liveMedia/RTCP.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2008 Live Networks, Inc.  All rights reserved.
00018 // RTCP
00019 // Implementation
00020 
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024 
00026 
00027 class RTCPMemberDatabase {
00028 public:
00029   RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030     : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 /*ourself*/),
00031       fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032   }
00033 
00034   virtual ~RTCPMemberDatabase() {
00035         delete fTable;
00036   }
00037 
00038   Boolean isMember(unsigned ssrc) const {
00039     return fTable->Lookup((char*)(long)ssrc) != NULL;
00040   }
00041 
00042   Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043     Boolean isNew = !isMember(ssrc);
00044 
00045     if (isNew) {
00046       ++fNumMembers;
00047     }
00048 
00049     // Record the current time, so we can age stale members
00050     fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051 
00052     return isNew;
00053   }
00054 
00055   Boolean remove(unsigned ssrc) {
00056     Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057     if (wasPresent) {
00058       --fNumMembers;
00059     }
00060     return wasPresent;
00061   }
00062 
00063   unsigned numMembers() const {
00064     return fNumMembers;
00065   }
00066 
00067   void reapOldMembers(unsigned threshold);
00068 
00069 private:
00070   RTCPInstance& fOurRTCPInstance;
00071   unsigned fNumMembers;
00072   HashTable* fTable;
00073 };
00074 
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076   Boolean foundOldMember;
00077   unsigned oldSSRC = 0;
00078 
00079   do {
00080     foundOldMember = False;
00081 
00082     HashTable::Iterator* iter
00083       = HashTable::Iterator::create(*fTable);
00084     unsigned long timeCount;
00085     char const* key;
00086     while ((timeCount = (unsigned long)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088       fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090       if (timeCount < (unsigned long)threshold) { // this SSRC is old
00091         unsigned long ssrc = (unsigned long)key;
00092         oldSSRC = (unsigned)ssrc;
00093         foundOldMember = True;
00094       }
00095     }
00096     delete iter;
00097 
00098     if (foundOldMember) {
00099 #ifdef DEBUG
00100         fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102       fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103     }
00104   } while (foundOldMember);
00105 }
00106 
00107 
00109 
00110 static double dTimeNow() {
00111     struct timeval timeNow;
00112     gettimeofday(&timeNow, NULL);
00113     return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115 
00116 static unsigned const maxPacketSize = 1450;
00117         // bytes (1500, minus some allowance for IP, UDP, UMTP headers)
00118 static unsigned const preferredPacketSize = 1000; // bytes
00119 
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121                            unsigned totSessionBW,
00122                            unsigned char const* cname,
00123                            RTPSink* sink, RTPSource const* source,
00124                            Boolean isSSMSource)
00125   : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126     fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127     fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128     fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129     fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130     fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131     fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132     fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133     fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134     fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135     fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137   fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139   if (fTotSessionBW == 0) { // not allowed!
00140     env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141     fTotSessionBW = 1;
00142   }
00143 
00144   if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast
00145     
00146   double timeNow = dTimeNow();
00147   fPrevReportTime = fNextReportTime = timeNow;
00148 
00149   fKnownMembers = new RTCPMemberDatabase(*this);
00150   fInBuf = new unsigned char[maxPacketSize];
00151   if (fKnownMembers == NULL || fInBuf == NULL) return;
00152 
00153   // A hack to save buffer space, because RTCP packets are always small:
00154   unsigned savedMaxSize = OutPacketBuffer::maxSize;
00155   OutPacketBuffer::maxSize = maxPacketSize;
00156   fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00157   OutPacketBuffer::maxSize = savedMaxSize;
00158   if (fOutBuf == NULL) return;
00159 
00160   // Arrange to handle incoming reports from others:
00161   TaskScheduler::BackgroundHandlerProc* handler
00162     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00163   fRTCPInterface.startNetworkReading(handler);
00164   
00165   // Send our first report.
00166   fTypeOfEvent = EVENT_REPORT;
00167   onExpire(this);
00168 }
00169 
00170 struct RRHandlerRecord {
00171   TaskFunc* rrHandlerTask;
00172   void* rrHandlerClientData;
00173 };
00174 
00175 RTCPInstance::~RTCPInstance() {
00176 #ifdef DEBUG
00177   fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00178 #endif
00179   // Turn off background read handling:
00180   fRTCPInterface.stopNetworkReading();
00181 
00182   // Begin by sending a BYE.  We have to do this immediately, without
00183   // 'reconsideration', because "this" is going away.
00184   fTypeOfEvent = EVENT_BYE; // not used, but...
00185   sendBYE();
00186 
00187   if (fSpecificRRHandlerTable != NULL) {
00188     AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00189     RRHandlerRecord* rrHandler;
00190     while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00191       delete rrHandler;
00192     }
00193     delete fSpecificRRHandlerTable;
00194   }
00195 
00196   delete fKnownMembers;
00197   delete fOutBuf;
00198   delete[] fInBuf;
00199 }
00200 
00201 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00202                                       unsigned totSessionBW,
00203                                       unsigned char const* cname,
00204                                       RTPSink* sink, RTPSource const* source,
00205                                       Boolean isSSMSource) {
00206   return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00207                           isSSMSource);
00208 }
00209 
00210 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00211                                    char const* instanceName,
00212                                    RTCPInstance*& resultInstance) {
00213   resultInstance = NULL; // unless we succeed
00214 
00215   Medium* medium;
00216   if (!Medium::lookupByName(env, instanceName, medium)) return False;
00217 
00218   if (!medium->isRTCPInstance()) {
00219     env.setResultMsg(instanceName, " is not a RTCP instance");
00220     return False;
00221   }
00222 
00223   resultInstance = (RTCPInstance*)medium;
00224   return True;
00225 }
00226 
00227 Boolean RTCPInstance::isRTCPInstance() const {
00228   return True;
00229 }
00230 
00231 unsigned RTCPInstance::numMembers() const {
00232   if (fKnownMembers == NULL) return 0;
00233 
00234   return fKnownMembers->numMembers();
00235 }
00236 
00237 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00238                                  Boolean handleActiveParticipantsOnly) {
00239   fByeHandlerTask = handlerTask;
00240   fByeHandlerClientData = clientData;
00241   fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00242 }
00243 
00244 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00245   fSRHandlerTask = handlerTask;
00246   fSRHandlerClientData = clientData;
00247 }
00248 
00249 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00250   fRRHandlerTask = handlerTask;
00251   fRRHandlerClientData = clientData;
00252 }
00253 
00254 void RTCPInstance
00255 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00256                        TaskFunc* handlerTask, void* clientData) {
00257   if (handlerTask == NULL && clientData == NULL) {
00258     unsetSpecificRRHandler(fromAddress, fromPort);
00259     return;
00260   }
00261 
00262   RRHandlerRecord* rrHandler = new RRHandlerRecord;
00263   rrHandler->rrHandlerTask = handlerTask;
00264   rrHandler->rrHandlerClientData = clientData;
00265   if (fSpecificRRHandlerTable == NULL) {
00266     fSpecificRRHandlerTable = new AddressPortLookupTable;
00267   }
00268   fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00269 }
00270 
00271 void RTCPInstance
00272 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00273   if (fSpecificRRHandlerTable == NULL) return;
00274 
00275   RRHandlerRecord* rrHandler
00276     = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00277   if (rrHandler != NULL) {
00278     fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00279     delete rrHandler;
00280   }
00281 }
00282 
00283 void RTCPInstance::setStreamSocket(int sockNum,
00284                                    unsigned char streamChannelId) {
00285   // Turn off background read handling:
00286   fRTCPInterface.stopNetworkReading();
00287 
00288   // Switch to RTCP-over-TCP:
00289   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00290 
00291   // Turn background reading back on:
00292   TaskScheduler::BackgroundHandlerProc* handler
00293     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00294   fRTCPInterface.startNetworkReading(handler);
00295 }
00296 
00297 void RTCPInstance::addStreamSocket(int sockNum,
00298                                    unsigned char streamChannelId) {
00299   // First, turn off background read handling for the default (UDP) socket:
00300   fRTCPInterface.stopNetworkReading();
00301   
00302   // Add the RTCP-over-TCP interface:
00303   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00304 
00305   // Turn on background reading for this socket (in case it's not on already):
00306   TaskScheduler::BackgroundHandlerProc* handler
00307     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00308   fRTCPInterface.startNetworkReading(handler);
00309 }
00310 
00311 static unsigned const IP_UDP_HDR_SIZE = 28;
00312     // overhead (bytes) of IP and UDP hdrs
00313 
00314 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00315 
00316 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00317                                          int /*mask*/) {
00318   instance->incomingReportHandler1();
00319 }
00320 
00321 void RTCPInstance::incomingReportHandler1() {
00322   unsigned char* pkt = fInBuf;
00323   unsigned packetSize;
00324   struct sockaddr_in fromAddress;
00325   int typeOfPacket = PACKET_UNKNOWN_TYPE;
00326 
00327   do {
00328     int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00329     unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00330     if (!fRTCPInterface.handleRead(pkt, maxPacketSize,
00331                                    packetSize, fromAddress)) {
00332       break;
00333     }
00334 
00335     // Ignore the packet if it was looped-back from ourself:
00336     if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00337       // However, we still want to handle incoming RTCP packets from
00338       // *other processes* on the same machine.  To distinguish this
00339       // case from a true loop-back, check whether we've just sent a
00340       // packet of the same size.  (This check isn't perfect, but it seems
00341       // to be the best we can do.)
00342       if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00343         // This is a true loop-back:
00344         fHaveJustSentPacket = False;
00345         break; // ignore this packet
00346       }
00347     }
00348 
00349     if (fIsSSMSource) {
00350       // This packet was received via unicast.  'Reflect' it by resending
00351       // it to the multicast group.
00352       // NOTE: Denial-of-service attacks are possible here.
00353       // Users of this software may wish to add their own,
00354       // application-specific mechanism for 'authenticating' the
00355       // validity of this packet before reflecting it.
00356       fRTCPInterface.sendPacket(pkt, packetSize);
00357       fHaveJustSentPacket = True;
00358       fLastPacketSentSize = packetSize;
00359     }
00360 
00361 #ifdef DEBUG
00362     fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, our_inet_ntoa(fromAddress.sin_addr), ntohs(fromAddress.sin_port));
00363     unsigned char* p = pkt;
00364     for (unsigned i = 0; i < packetSize; ++i) {
00365       if (i%4 == 0) fprintf(stderr, " ");
00366       fprintf(stderr, "%02x", p[i]);
00367     }
00368     fprintf(stderr, "\n");
00369 #endif
00370     int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00371 
00372     // Check the RTCP packet for validity:
00373     // It must at least contain a header (4 bytes), and this header
00374     // must be version=2, with no padding bit, and a payload type of
00375     // SR (200) or RR (201):
00376     if (packetSize < 4) break;
00377     unsigned rtcpHdr = ntohl(*(unsigned*)pkt);
00378     if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00379 #ifdef DEBUG
00380       fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00381 #endif
00382       break;
00383     }
00384 
00385     // Process each of the individual RTCP 'subpackets' in (what may be)
00386     // a compound RTCP packet.
00387     unsigned reportSenderSSRC = 0;
00388     Boolean packetOK = False;
00389     while (1) {
00390       unsigned rc = (rtcpHdr>>24)&0x1F;
00391       unsigned pt = (rtcpHdr>>16)&0xFF;
00392       unsigned length = 4*(rtcpHdr&0xFFFF); // doesn't count hdr
00393       ADVANCE(4); // skip over the header
00394       if (length > packetSize) break;
00395 
00396       // Assume that each RTCP subpacket begins with a 4-byte SSRC:
00397       if (length < 4) break; length -= 4;
00398       reportSenderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00399 
00400       Boolean subPacketOK = False;
00401       switch (pt) {
00402         case RTCP_PT_SR: {
00403 #ifdef DEBUG
00404           fprintf(stderr, "SR\n");
00405 #endif
00406           if (length < 20) break; length -= 20;
00407 
00408           // Extract the NTP timestamp, and note this:
00409           unsigned NTPmsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00410           unsigned NTPlsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00411           unsigned rtpTimestamp = ntohl(*(unsigned*)pkt); ADVANCE(4);
00412           if (fSource != NULL) {
00413             RTPReceptionStatsDB& receptionStats
00414               = fSource->receptionStatsDB();
00415             receptionStats.noteIncomingSR(reportSenderSSRC,
00416                                           NTPmsw, NTPlsw, rtpTimestamp);
00417           }
00418           ADVANCE(8); // skip over packet count, octet count
00419 
00420           // If a 'SR handler' was set, call it now:
00421           if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00422 
00423           // The rest of the SR is handled like a RR (so, no "break;" here)
00424         }
00425         case RTCP_PT_RR: {
00426 #ifdef DEBUG
00427           fprintf(stderr, "RR\n");
00428 #endif
00429           unsigned reportBlocksSize = rc*(6*4);
00430           if (length < reportBlocksSize) break;
00431           length -= reportBlocksSize;
00432 
00433           if (fSink != NULL) {
00434             // Use this information to update stats about our transmissions:
00435             RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00436             for (unsigned i = 0; i < rc; ++i) {
00437               unsigned senderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00438               // We care only about reports about our own transmission, not others'
00439               if (senderSSRC == fSink->SSRC()) {
00440                 unsigned lossStats = ntohl(*(unsigned*)pkt); ADVANCE(4);
00441                 unsigned highestReceived = ntohl(*(unsigned*)pkt); ADVANCE(4);
00442                 unsigned jitter = ntohl(*(unsigned*)pkt); ADVANCE(4);
00443                 unsigned timeLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00444                 unsigned timeSinceLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00445                 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00446                                                  lossStats,
00447                                                  highestReceived, jitter,
00448                                                  timeLastSR, timeSinceLastSR);
00449               } else {
00450                 ADVANCE(4*5);
00451               }
00452             }
00453           } else {
00454             ADVANCE(reportBlocksSize);
00455           }
00456 
00457           if (pt == RTCP_PT_RR) { // i.e., we didn't fall through from 'SR'
00458             // If a 'RR handler' was set, call it now:
00459 
00460             // Specific RR handler:
00461             if (fSpecificRRHandlerTable != NULL) {
00462               netAddressBits fromAddr;
00463               portNumBits fromPortNum;
00464               if (tcpReadStreamSocketNum < 0) {
00465                 // Normal case: We read the RTCP packet over UDP
00466                 fromAddr = fromAddress.sin_addr.s_addr;
00467                 fromPortNum = ntohs(fromAddress.sin_port);
00468               } else {
00469                 // Special case: We read the RTCP packet over TCP (interleaved)
00470                 // Hack: Use the TCP socket and channel id to look up the handler
00471                 fromAddr = tcpReadStreamSocketNum;
00472                 fromPortNum = tcpReadStreamChannelId;
00473               }
00474               Port fromPort(fromPortNum); 
00475               RRHandlerRecord* rrHandler
00476                 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00477               if (rrHandler != NULL) {
00478                 if (rrHandler->rrHandlerTask != NULL) {
00479                   (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00480                 }
00481               }
00482             }
00483 
00484             // General RR handler:
00485             if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00486           }
00487 
00488           subPacketOK = True;
00489           typeOfPacket = PACKET_RTCP_REPORT;
00490           break;
00491         }
00492         case RTCP_PT_BYE: {
00493 #ifdef DEBUG
00494           fprintf(stderr, "BYE\n");
00495 #endif
00496           // If a 'BYE handler' was set, call it now:
00497           TaskFunc* byeHandler = fByeHandlerTask;
00498           if (byeHandler != NULL
00499               && (!fByeHandleActiveParticipantsOnly
00500                   || (fSource != NULL
00501                       && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00502                   || (fSink != NULL
00503                       && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00504             fByeHandlerTask = NULL;
00505                 // we call this only once by default 
00506             (*byeHandler)(fByeHandlerClientData);
00507           }
00508 
00509           // We should really check for & handle >1 SSRCs being present #####
00510 
00511           subPacketOK = True;
00512           typeOfPacket = PACKET_BYE;
00513           break;
00514         }
00515         // Later handle SDES, APP, and compound RTCP packets #####
00516         default:
00517 #ifdef DEBUG
00518           fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00519 #endif
00520           subPacketOK = True;
00521           break;
00522       }  
00523       if (!subPacketOK) break;
00524 
00525       // need to check for (& handle) SSRC collision! #####
00526 
00527 #ifdef DEBUG
00528       fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00529 #endif
00530       
00531       // Skip over any remaining bytes in this subpacket:
00532       ADVANCE(length);
00533 
00534       // Check whether another RTCP 'subpacket' follows:
00535       if (packetSize == 0) {
00536         packetOK = True;
00537         break;
00538       } else if (packetSize < 4) {
00539 #ifdef DEBUG
00540         fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00541 #endif
00542         break;
00543       }
00544       rtcpHdr = ntohl(*(unsigned*)pkt);
00545       if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00546 #ifdef DEBUG
00547         fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00548 #endif
00549         break;
00550       }
00551     }
00552       
00553     if (!packetOK) {
00554 #ifdef DEBUG
00555       fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00556 #endif
00557       break;
00558     } else {
00559 #ifdef DEBUG
00560       fprintf(stderr, "validated entire RTCP packet\n");
00561 #endif
00562     }
00563       
00564     onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00565   } while (0);
00566 }
00567 
00568 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00569                              unsigned ssrc) {
00570   fTypeOfPacket = typeOfPacket;
00571   fLastReceivedSize = totPacketSize;
00572   fLastReceivedSSRC = ssrc;
00573 
00574   int members = (int)numMembers();
00575   int senders = (fSink != NULL) ? 1 : 0;
00576 
00577   OnReceive(this, // p
00578             this, // e
00579             &members, // members
00580             &fPrevNumMembers, // pmembers
00581             &senders, // senders
00582             &fAveRTCPSize, // avg_rtcp_size
00583             &fPrevReportTime, // tp
00584             dTimeNow(), // tc
00585             fNextReportTime);
00586 }
00587 
00588 void RTCPInstance::sendReport() {
00589   // Hack: Don't send a SR during those (brief) times when the timestamp of the
00590   // next outgoing RTP packet has been preset, to ensure that that timestamp gets
00591   // used for that outgoing packet. (David Bertrand, 2006.07.18)
00592   if (fSink != NULL && fSink->nextTimestampHasBeenPreset()) return;
00593 
00594 #ifdef DEBUG
00595   fprintf(stderr, "sending REPORT\n");
00596 #endif
00597   // Begin by including a SR and/or RR report:
00598   addReport();
00599 
00600   // Then, include a SDES:
00601   addSDES();
00602 
00603   // Send the report:
00604   sendBuiltPacket();
00605 
00606   // Periodically clean out old members from our SSRC membership database:
00607   const unsigned membershipReapPeriod = 5;
00608   if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00609     unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00610     fKnownMembers->reapOldMembers(threshold);
00611   }
00612 }
00613 
00614 void RTCPInstance::sendBYE() {
00615 #ifdef DEBUG
00616   fprintf(stderr, "sending BYE\n");
00617 #endif
00618   // The packet must begin with a SR and/or RR report:
00619   addReport();
00620 
00621   addBYE();
00622   sendBuiltPacket();
00623 }
00624 
00625 void RTCPInstance::sendBuiltPacket() {
00626 #ifdef DEBUG
00627   fprintf(stderr, "sending RTCP packet\n");
00628   unsigned char* p = fOutBuf->packet();
00629   for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00630     if (i%4 == 0) fprintf(stderr," ");
00631     fprintf(stderr, "%02x", p[i]);
00632   }
00633   fprintf(stderr, "\n");
00634 #endif
00635   unsigned reportSize = fOutBuf->curPacketSize();
00636   fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00637   fOutBuf->resetOffset();
00638 
00639   fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00640   fHaveJustSentPacket = True;
00641   fLastPacketSentSize = reportSize;
00642 }
00643 
00644 int RTCPInstance::checkNewSSRC() {
00645   return fKnownMembers->noteMembership(fLastReceivedSSRC,
00646                                        fOutgoingReportCount);
00647 }
00648 
00649 void RTCPInstance::removeLastReceivedSSRC() {
00650   removeSSRC(fLastReceivedSSRC, False/*keep stats around*/);
00651 }
00652 
00653 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00654   fKnownMembers->remove(ssrc);
00655 
00656   if (alsoRemoveStats) {
00657     // Also, remove records of this SSRC from any reception or transmission stats
00658     if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00659     if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00660   }
00661 }
00662 
00663 void RTCPInstance::onExpire(RTCPInstance* instance) {
00664   instance->onExpire1();
00665 }
00666 
00667 // Member functions to build specific kinds of report:
00668 
00669 void RTCPInstance::addReport() {
00670   // Include a SR or a RR, depending on whether we
00671   // have an associated sink or source:
00672   if (fSink != NULL) {
00673     addSR();
00674   } else if (fSource != NULL) {
00675     addRR();
00676   }
00677 }
00678 
00679 void RTCPInstance::addSR() {
00680   // ASSERT: fSink != NULL
00681 
00682   enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00683                             5 /* extra words in a SR */);
00684 
00685   // Now, add the 'sender info' for our sink
00686 
00687   // Insert the NTP and RTP timestamps for the 'wallclock time':
00688   struct timeval timeNow;
00689   gettimeofday(&timeNow, NULL);
00690   fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00691       // NTP timestamp most-significant word (1970 epoch -> 1900 epoch)
00692   double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000; // 2^32/10^6
00693   fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00694       // NTP timestamp least-significant word
00695   unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00696   fOutBuf->enqueueWord(rtpTimestamp); // RTP ts
00697 
00698   // Insert the packet and byte counts:
00699   fOutBuf->enqueueWord(fSink->packetCount());
00700   fOutBuf->enqueueWord(fSink->octetCount());
00701 
00702   enqueueCommonReportSuffix();
00703 }
00704 
00705 void RTCPInstance::addRR() {
00706   // ASSERT: fSource != NULL
00707 
00708   enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00709   enqueueCommonReportSuffix();
00710 }
00711 
00712 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00713                                              unsigned SSRC,
00714                                              unsigned numExtraWords) {
00715   unsigned numReportingSources;
00716   if (fSource == NULL) {
00717     numReportingSources = 0; // we don't receive anything
00718   } else {
00719     RTPReceptionStatsDB& allReceptionStats
00720       = fSource->receptionStatsDB();
00721     numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00722     // This must be <32, to fit in 5 bits:
00723     if (numReportingSources >= 32) { numReportingSources = 32; }
00724     // Later: support adding more reports to handle >32 sources (unlikely)#####
00725   }
00726 
00727   unsigned rtcpHdr = 0x80000000; // version 2, no padding
00728   rtcpHdr |= (numReportingSources<<24);
00729   rtcpHdr |= (packetType<<16);
00730   rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00731       // each report block is 6 32-bit words long
00732   fOutBuf->enqueueWord(rtcpHdr);
00733 
00734   fOutBuf->enqueueWord(SSRC);
00735 }
00736 
00737 void RTCPInstance::enqueueCommonReportSuffix() {
00738   // Output the report blocks for each source:
00739   if (fSource != NULL) { 
00740     RTPReceptionStatsDB& allReceptionStats
00741       = fSource->receptionStatsDB();
00742 
00743     RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00744     while (1) {
00745       RTPReceptionStats* receptionStats = iterator.next();
00746       if (receptionStats == NULL) break;
00747       enqueueReportBlock(receptionStats);
00748     }
00749 
00750     allReceptionStats.reset(); // because we have just generated a report
00751   }
00752 }
00753 
00754 void
00755 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00756   fOutBuf->enqueueWord(stats->SSRC());
00757 
00758   unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00759 
00760   unsigned totNumExpected
00761     = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00762   int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00763   // 'Clamp' this loss number to a 24-bit signed value:
00764   if (totNumLost > 0x007FFFFF) {
00765     totNumLost = 0x007FFFFF;
00766   } else if (totNumLost < 0) {
00767     if (totNumLost < -0x00800000) totNumLost = 0x00800000; // unlikely, but...
00768     totNumLost &= 0x00FFFFFF;
00769   }
00770 
00771   unsigned numExpectedSinceLastReset
00772     = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00773   int numLostSinceLastReset
00774     = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset(); 
00775   unsigned char lossFraction;
00776   if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00777     lossFraction = 0;
00778   } else {
00779     lossFraction = (unsigned char)
00780       ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00781   }
00782   
00783   fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00784   fOutBuf->enqueueWord(highestExtSeqNumReceived);
00785 
00786   fOutBuf->enqueueWord(stats->jitter());
00787 
00788   unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00789   unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00790   unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16); // middle 32 bits
00791   fOutBuf->enqueueWord(LSR);
00792 
00793   // Figure out how long has elapsed since the last SR rcvd from this src:
00794   struct timeval const& LSRtime = stats->lastReceivedSR_time(); // "last SR"
00795   struct timeval timeNow, timeSinceLSR;
00796   gettimeofday(&timeNow, NULL);
00797   if (timeNow.tv_usec < LSRtime.tv_usec) {
00798     timeNow.tv_usec += 1000000;
00799     timeNow.tv_sec -= 1;
00800   }
00801   timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00802   timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00803   // The enqueued time is in units of 1/65536 seconds.
00804   // (Note that 65536/1000000 == 1024/15625) 
00805   unsigned DLSR;
00806   if (LSR == 0) {
00807     DLSR = 0;
00808   } else {
00809     DLSR = (timeSinceLSR.tv_sec<<16)
00810          | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00811   }
00812   fOutBuf->enqueueWord(DLSR);
00813 }
00814 
00815 void RTCPInstance::addSDES() {
00816   // For now we support only the CNAME item; later support more #####
00817 
00818   // Begin by figuring out the size of the entire SDES report:
00819   unsigned numBytes = 4;
00820       // counts the SSRC, but not the header; it'll get subtracted out
00821   numBytes += fCNAME.totalSize(); // includes id and length
00822   numBytes += 1; // the special END item
00823 
00824   unsigned num4ByteWords = (numBytes + 3)/4;
00825 
00826   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC chunk
00827   rtcpHdr |= (RTCP_PT_SDES<<16);
00828   rtcpHdr |= num4ByteWords;
00829   fOutBuf->enqueueWord(rtcpHdr);
00830 
00831   if (fSource != NULL) {
00832     fOutBuf->enqueueWord(fSource->SSRC());
00833   } else if (fSink != NULL) {
00834     fOutBuf->enqueueWord(fSink->SSRC());
00835   }
00836 
00837   // Add the CNAME:
00838   fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00839 
00840   // Add the 'END' item (i.e., a zero byte), plus any more needed to pad:
00841   unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00842   unsigned char const zero = '\0';
00843   while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00844 }
00845 
00846 void RTCPInstance::addBYE() {
00847   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC
00848   rtcpHdr |= (RTCP_PT_BYE<<16);
00849   rtcpHdr |= 1; // 2 32-bit words total (i.e., with 1 SSRC)
00850   fOutBuf->enqueueWord(rtcpHdr);
00851 
00852   if (fSource != NULL) {
00853     fOutBuf->enqueueWord(fSource->SSRC());
00854   } else if (fSink != NULL) {
00855     fOutBuf->enqueueWord(fSink->SSRC());
00856   }
00857 }
00858 
00859 void RTCPInstance::schedule(double nextTime) {
00860   fNextReportTime = nextTime;
00861 
00862   double secondsToDelay = nextTime - dTimeNow();
00863 #ifdef DEBUG
00864   fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00865 #endif
00866   int usToGo = (int)(secondsToDelay * 1000000);
00867   nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00868                                 (TaskFunc*)RTCPInstance::onExpire, this);
00869 }
00870 
00871 void RTCPInstance::reschedule(double nextTime) {
00872   envir().taskScheduler().unscheduleDelayedTask(nextTask());
00873   schedule(nextTime);
00874 }
00875 
00876 void RTCPInstance::onExpire1() {
00877   // Note: fTotSessionBW is kbits per second
00878   double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second
00879 
00880   OnExpire(this, // event
00881            numMembers(), // members
00882            (fSink != NULL) ? 1 : 0, // senders
00883            rtcpBW, // rtcp_bw
00884            (fSink != NULL) ? 1 : 0, // we_sent
00885            &fAveRTCPSize, // ave_rtcp_size
00886            &fIsInitial, // initial
00887            dTimeNow(), // tc
00888            &fPrevReportTime, // tp
00889            &fPrevNumMembers // pmembers
00890            );
00891 }
00892 
00894 
00895 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00896   unsigned length = strlen((char const*)value);
00897   if (length > 511) length = 511;
00898 
00899   fData[0] = tag;
00900   fData[1] = (unsigned char)length;
00901   memmove(&fData[2], value, length);
00902 
00903   // Pad the trailing bytes to a 4-byte boundary:
00904   while ((length)%4 > 0) fData[2 + length++] = '\0';
00905 }
00906 
00907 unsigned SDESItem::totalSize() const {
00908   return 2 + (unsigned)fData[1];
00909 }
00910 
00911 
00913 
00914 extern "C" void Schedule(double nextTime, event e) {
00915   RTCPInstance* instance = (RTCPInstance*)e;
00916   if (instance == NULL) return;
00917 
00918   instance->schedule(nextTime);
00919 }
00920 
00921 extern "C" void Reschedule(double nextTime, event e) {
00922   RTCPInstance* instance = (RTCPInstance*)e;
00923   if (instance == NULL) return;
00924 
00925   instance->reschedule(nextTime);
00926 }
00927 
00928 extern "C" void SendRTCPReport(event e) {
00929   RTCPInstance* instance = (RTCPInstance*)e;
00930   if (instance == NULL) return;
00931 
00932   instance->sendReport();
00933 }
00934