liveMedia/RTCP.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2014 Live Networks, Inc.  All rights reserved.
00018 // RTCP
00019 // Implementation
00020 
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024 
00026 
00027 class RTCPMemberDatabase {
00028 public:
00029   RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030     : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 /*ourself*/),
00031       fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032   }
00033 
00034   virtual ~RTCPMemberDatabase() {
00035         delete fTable;
00036   }
00037 
00038   Boolean isMember(unsigned ssrc) const {
00039     return fTable->Lookup((char*)(long)ssrc) != NULL;
00040   }
00041 
00042   Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043     Boolean isNew = !isMember(ssrc);
00044 
00045     if (isNew) {
00046       ++fNumMembers;
00047     }
00048 
00049     // Record the current time, so we can age stale members
00050     fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051 
00052     return isNew;
00053   }
00054 
00055   Boolean remove(unsigned ssrc) {
00056     Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057     if (wasPresent) {
00058       --fNumMembers;
00059     }
00060     return wasPresent;
00061   }
00062 
00063   unsigned numMembers() const {
00064     return fNumMembers;
00065   }
00066 
00067   void reapOldMembers(unsigned threshold);
00068 
00069 private:
00070   RTCPInstance& fOurRTCPInstance;
00071   unsigned fNumMembers;
00072   HashTable* fTable;
00073 };
00074 
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076   Boolean foundOldMember;
00077   u_int32_t oldSSRC = 0;
00078 
00079   do {
00080     foundOldMember = False;
00081 
00082     HashTable::Iterator* iter
00083       = HashTable::Iterator::create(*fTable);
00084     uintptr_t timeCount;
00085     char const* key;
00086     while ((timeCount = (uintptr_t)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088       fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090       if (timeCount < (uintptr_t)threshold) { // this SSRC is old
00091         uintptr_t ssrc = (uintptr_t)key;
00092         oldSSRC = (u_int32_t)ssrc;
00093         foundOldMember = True;
00094       }
00095     }
00096     delete iter;
00097 
00098     if (foundOldMember) {
00099 #ifdef DEBUG
00100         fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102       fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103     }
00104   } while (foundOldMember);
00105 }
00106 
00107 
00109 
00110 static double dTimeNow() {
00111     struct timeval timeNow;
00112     gettimeofday(&timeNow, NULL);
00113     return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115 
00116 static unsigned const maxRTCPPacketSize = 1450;
00117         // bytes (1500, minus some allowance for IP, UDP, UMTP headers)
00118 static unsigned const preferredPacketSize = 1000; // bytes
00119 
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121                            unsigned totSessionBW,
00122                            unsigned char const* cname,
00123                            RTPSink* sink, RTPSource* source,
00124                            Boolean isSSMSource)
00125   : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126     fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127     fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128     fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129     fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130     fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131     fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132     fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133     fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134     fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135     fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137   fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139   if (fTotSessionBW == 0) { // not allowed!
00140     env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141     fTotSessionBW = 1;
00142   }
00143 
00144   if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast
00145 
00146   double timeNow = dTimeNow();
00147   fPrevReportTime = fNextReportTime = timeNow;
00148 
00149   fKnownMembers = new RTCPMemberDatabase(*this);
00150   fInBuf = new unsigned char[maxRTCPPacketSize];
00151   if (fKnownMembers == NULL || fInBuf == NULL) return;
00152   fNumBytesAlreadyRead = 0;
00153 
00154   // A hack to save buffer space, because RTCP packets are always small:
00155   unsigned savedMaxSize = OutPacketBuffer::maxSize;
00156   OutPacketBuffer::maxSize = maxRTCPPacketSize;
00157   fOutBuf = new OutPacketBuffer(preferredPacketSize, maxRTCPPacketSize);
00158   OutPacketBuffer::maxSize = savedMaxSize;
00159   if (fOutBuf == NULL) return;
00160 
00161   if (fSource != NULL && fSource->RTPgs() == RTCPgs) {
00162     // We're receiving RTCP reports that are multiplexed with RTP, so ask the RTP source
00163     // to give them to us:
00164     fSource->registerForMultiplexedRTCPPackets(this);
00165   } else {
00166     // Arrange to handle incoming reports from the network:
00167     TaskScheduler::BackgroundHandlerProc* handler
00168       = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00169     fRTCPInterface.startNetworkReading(handler);
00170   }
00171 
00172   // Send our first report.
00173   fTypeOfEvent = EVENT_REPORT;
00174   onExpire(this);
00175 }
00176 
00177 struct RRHandlerRecord {
00178   TaskFunc* rrHandlerTask;
00179   void* rrHandlerClientData;
00180 };
00181 
00182 RTCPInstance::~RTCPInstance() {
00183 #ifdef DEBUG
00184   fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00185 #endif
00186   if (fSource != NULL) fSource->deregisterForMultiplexedRTCPPackets();
00187 
00188   // Begin by sending a BYE.  We have to do this immediately, without
00189   // 'reconsideration', because "this" is going away.
00190   fTypeOfEvent = EVENT_BYE; // not used, but...
00191   sendBYE();
00192 
00193   if (fSpecificRRHandlerTable != NULL) {
00194     AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00195     RRHandlerRecord* rrHandler;
00196     while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00197       delete rrHandler;
00198     }
00199     delete fSpecificRRHandlerTable;
00200   }
00201 
00202   delete fKnownMembers;
00203   delete fOutBuf;
00204   delete[] fInBuf;
00205 }
00206 
00207 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00208                                       unsigned totSessionBW,
00209                                       unsigned char const* cname,
00210                                       RTPSink* sink, RTPSource* source,
00211                                       Boolean isSSMSource) {
00212   return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00213                           isSSMSource);
00214 }
00215 
00216 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00217                                    char const* instanceName,
00218                                    RTCPInstance*& resultInstance) {
00219   resultInstance = NULL; // unless we succeed
00220 
00221   Medium* medium;
00222   if (!Medium::lookupByName(env, instanceName, medium)) return False;
00223 
00224   if (!medium->isRTCPInstance()) {
00225     env.setResultMsg(instanceName, " is not a RTCP instance");
00226     return False;
00227   }
00228 
00229   resultInstance = (RTCPInstance*)medium;
00230   return True;
00231 }
00232 
00233 Boolean RTCPInstance::isRTCPInstance() const {
00234   return True;
00235 }
00236 
00237 unsigned RTCPInstance::numMembers() const {
00238   if (fKnownMembers == NULL) return 0;
00239 
00240   return fKnownMembers->numMembers();
00241 }
00242 
00243 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00244                                  Boolean handleActiveParticipantsOnly) {
00245   fByeHandlerTask = handlerTask;
00246   fByeHandlerClientData = clientData;
00247   fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00248 }
00249 
00250 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00251   fSRHandlerTask = handlerTask;
00252   fSRHandlerClientData = clientData;
00253 }
00254 
00255 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00256   fRRHandlerTask = handlerTask;
00257   fRRHandlerClientData = clientData;
00258 }
00259 
00260 void RTCPInstance
00261 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00262                        TaskFunc* handlerTask, void* clientData) {
00263   if (handlerTask == NULL && clientData == NULL) {
00264     unsetSpecificRRHandler(fromAddress, fromPort);
00265     return;
00266   }
00267 
00268   RRHandlerRecord* rrHandler = new RRHandlerRecord;
00269   rrHandler->rrHandlerTask = handlerTask;
00270   rrHandler->rrHandlerClientData = clientData;
00271   if (fSpecificRRHandlerTable == NULL) {
00272     fSpecificRRHandlerTable = new AddressPortLookupTable;
00273   }
00274   RRHandlerRecord* existingRecord = (RRHandlerRecord*)fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00275   delete existingRecord; // if any
00276 
00277 }
00278 
00279 void RTCPInstance
00280 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00281   if (fSpecificRRHandlerTable == NULL) return;
00282 
00283   RRHandlerRecord* rrHandler
00284     = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00285   if (rrHandler != NULL) {
00286     fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00287     delete rrHandler;
00288   }
00289 }
00290 
00291 void RTCPInstance::setStreamSocket(int sockNum,
00292                                    unsigned char streamChannelId) {
00293   // Turn off background read handling:
00294   fRTCPInterface.stopNetworkReading();
00295 
00296   // Switch to RTCP-over-TCP:
00297   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00298 
00299   // Turn background reading back on:
00300   TaskScheduler::BackgroundHandlerProc* handler
00301     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00302   fRTCPInterface.startNetworkReading(handler);
00303 }
00304 
00305 void RTCPInstance::addStreamSocket(int sockNum,
00306                                    unsigned char streamChannelId) {
00307   // First, turn off background read handling for the default (UDP) socket:
00308   envir().taskScheduler().turnOffBackgroundReadHandling(fRTCPInterface.gs()->socketNum());
00309 
00310   // Add the RTCP-over-TCP interface:
00311   fRTCPInterface.addStreamSocket(sockNum, streamChannelId);
00312 
00313   // Turn on background reading for this socket (in case it's not on already):
00314   TaskScheduler::BackgroundHandlerProc* handler
00315     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00316   fRTCPInterface.startNetworkReading(handler);
00317 }
00318 
00319 void RTCPInstance
00320 ::injectReport(u_int8_t const* packet, unsigned packetSize, struct sockaddr_in const& fromAddress) {
00321   if (packetSize > maxRTCPPacketSize) packetSize = maxRTCPPacketSize;
00322   memmove(fInBuf, packet, packetSize);
00323   processIncomingReport(packetSize, fromAddress);
00324 }
00325 
00326 static unsigned const IP_UDP_HDR_SIZE = 28;
00327     // overhead (bytes) of IP and UDP hdrs
00328 
00329 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00330 
00331 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00332                                          int /*mask*/) {
00333   instance->incomingReportHandler1();
00334 }
00335 
00336 void RTCPInstance::incomingReportHandler1() {
00337   do {
00338     unsigned packetSize = 0;
00339     unsigned numBytesRead;
00340     struct sockaddr_in fromAddress;
00341     Boolean packetReadWasIncomplete;
00342     if (fNumBytesAlreadyRead >= maxRTCPPacketSize) {
00343       envir() << "RTCPInstance error: Hit limit when reading incoming packet over TCP. Increase \"maxRTCPPacketSize\"\n";
00344       break;
00345     }
00346     Boolean readResult
00347       = fRTCPInterface.handleRead(&fInBuf[fNumBytesAlreadyRead], maxRTCPPacketSize - fNumBytesAlreadyRead,
00348                                   numBytesRead, fromAddress, packetReadWasIncomplete);
00349     if (packetReadWasIncomplete) {
00350       fNumBytesAlreadyRead += numBytesRead;
00351       return; // more reads are needed to get the entire packet
00352     } else { // normal case: We've read the entire packet 
00353       packetSize = fNumBytesAlreadyRead + numBytesRead;
00354       fNumBytesAlreadyRead = 0; // for next time
00355     }
00356     if (!readResult) break;
00357 
00358     // Ignore the packet if it was looped-back from ourself:
00359     Boolean packetWasFromOurHost = False;
00360     if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00361       packetWasFromOurHost = True;
00362       // However, we still want to handle incoming RTCP packets from
00363       // *other processes* on the same machine.  To distinguish this
00364       // case from a true loop-back, check whether we've just sent a
00365       // packet of the same size.  (This check isn't perfect, but it seems
00366       // to be the best we can do.)
00367       if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00368         // This is a true loop-back:
00369         fHaveJustSentPacket = False;
00370         break; // ignore this packet
00371       }
00372     }
00373 
00374     if (fIsSSMSource && !packetWasFromOurHost) {
00375       // This packet is assumed to have been received via unicast (because we're a SSM source,
00376       // and SSM receivers send back RTCP "RR" packets via unicast).
00377       // 'Reflect' the packet by resending it to the multicast group, so that any other receivers
00378       // can also get to see it.
00379 
00380       // NOTE: Denial-of-service attacks are possible here.
00381       // Users of this software may wish to add their own,
00382       // application-specific mechanism for 'authenticating' the
00383       // validity of this packet before reflecting it.
00384 
00385       // NOTE: The test for "!packetWasFromOurHost" means that we won't reflect RTCP packets
00386       // that come from other processes on the same host as us.  The reason for this is that the
00387       // 'packet size' test above is not 100% reliable; some packets that were truly looped back
00388       // from us might not be detected as such, and this might lead to infinite
00389       // forwarding/receiving of some packets.  To avoid this possibility, we reflect only
00390       // RTCP packets that we know for sure originated elsewhere.
00391       // (Note, though, that if we ever re-enable the code in "Groupsock::multicastSendOnly()",
00392       // then we could remove the test for "!packetWasFromOurHost".)
00393       fRTCPInterface.sendPacket(fInBuf, packetSize);
00394       fHaveJustSentPacket = True;
00395       fLastPacketSentSize = packetSize;
00396     }
00397 
00398     processIncomingReport(packetSize, fromAddress);
00399   } while (0);
00400 }
00401 
00402 void RTCPInstance
00403 ::processIncomingReport(unsigned packetSize, struct sockaddr_in const& fromAddress) {
00404   do {
00405     Boolean callByeHandler = False;
00406     int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00407     unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00408     unsigned char* pkt = fInBuf;
00409 
00410 #ifdef DEBUG
00411     fprintf(stderr, "[%p]saw incoming RTCP packet", this);
00412     if (tcpReadStreamSocketNum < 0) {
00413       // Note that "fromAddress" is valid only if we're receiving over UDP (not over TCP):
00414       fprintf(stderr, " (from address %s, port %d)", AddressString(fromAddress).val(), ntohs(fromAddress.sin_port));
00415     }
00416     fprintf(stderr, "\n");
00417     for (unsigned i = 0; i < packetSize; ++i) {
00418       if (i%4 == 0) fprintf(stderr, " ");
00419       fprintf(stderr, "%02x", pkt[i]);
00420     }
00421     fprintf(stderr, "\n");
00422 #endif
00423     int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00424 
00425     // Check the RTCP packet for validity:
00426     // It must at least contain a header (4 bytes), and this header
00427     // must be version=2, with no padding bit, and a payload type of
00428     // SR (200) or RR (201):
00429     if (packetSize < 4) break;
00430     unsigned rtcpHdr = ntohl(*(u_int32_t*)pkt);
00431     if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00432 #ifdef DEBUG
00433       fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00434 #endif
00435       break;
00436     }
00437 
00438     // Process each of the individual RTCP 'subpackets' in (what may be)
00439     // a compound RTCP packet.
00440     int typeOfPacket = PACKET_UNKNOWN_TYPE;
00441     unsigned reportSenderSSRC = 0;
00442     Boolean packetOK = False;
00443     while (1) {
00444       unsigned rc = (rtcpHdr>>24)&0x1F;
00445       unsigned pt = (rtcpHdr>>16)&0xFF;
00446       unsigned length = 4*(rtcpHdr&0xFFFF); // doesn't count hdr
00447       ADVANCE(4); // skip over the header
00448       if (length > packetSize) break;
00449 
00450       // Assume that each RTCP subpacket begins with a 4-byte SSRC:
00451       if (length < 4) break; length -= 4;
00452       reportSenderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00453 
00454       Boolean subPacketOK = False;
00455       switch (pt) {
00456         case RTCP_PT_SR: {
00457 #ifdef DEBUG
00458           fprintf(stderr, "SR\n");
00459 #endif
00460           if (length < 20) break; length -= 20;
00461 
00462           // Extract the NTP timestamp, and note this:
00463           unsigned NTPmsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00464           unsigned NTPlsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00465           unsigned rtpTimestamp = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00466           if (fSource != NULL) {
00467             RTPReceptionStatsDB& receptionStats
00468               = fSource->receptionStatsDB();
00469             receptionStats.noteIncomingSR(reportSenderSSRC,
00470                                           NTPmsw, NTPlsw, rtpTimestamp);
00471           }
00472           ADVANCE(8); // skip over packet count, octet count
00473 
00474           // If a 'SR handler' was set, call it now:
00475           if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00476 
00477           // The rest of the SR is handled like a RR (so, no "break;" here)
00478         }
00479         case RTCP_PT_RR: {
00480 #ifdef DEBUG
00481           fprintf(stderr, "RR\n");
00482 #endif
00483           unsigned reportBlocksSize = rc*(6*4);
00484           if (length < reportBlocksSize) break;
00485           length -= reportBlocksSize;
00486 
00487           if (fSink != NULL) {
00488             // Use this information to update stats about our transmissions:
00489             RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00490             for (unsigned i = 0; i < rc; ++i) {
00491               unsigned senderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00492               // We care only about reports about our own transmission, not others'
00493               if (senderSSRC == fSink->SSRC()) {
00494                 unsigned lossStats = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00495                 unsigned highestReceived = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00496                 unsigned jitter = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00497                 unsigned timeLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00498                 unsigned timeSinceLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00499                 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00500                                                  lossStats,
00501                                                  highestReceived, jitter,
00502                                                  timeLastSR, timeSinceLastSR);
00503               } else {
00504                 ADVANCE(4*5);
00505               }
00506             }
00507           } else {
00508             ADVANCE(reportBlocksSize);
00509           }
00510 
00511           if (pt == RTCP_PT_RR) { // i.e., we didn't fall through from 'SR'
00512             // If a 'RR handler' was set, call it now:
00513 
00514             // Specific RR handler:
00515             if (fSpecificRRHandlerTable != NULL) {
00516               netAddressBits fromAddr;
00517               portNumBits fromPortNum;
00518               if (tcpReadStreamSocketNum < 0) {
00519                 // Normal case: We read the RTCP packet over UDP
00520                 fromAddr = fromAddress.sin_addr.s_addr;
00521                 fromPortNum = ntohs(fromAddress.sin_port);
00522               } else {
00523                 // Special case: We read the RTCP packet over TCP (interleaved)
00524                 // Hack: Use the TCP socket and channel id to look up the handler
00525                 fromAddr = tcpReadStreamSocketNum;
00526                 fromPortNum = tcpReadStreamChannelId;
00527               }
00528               Port fromPort(fromPortNum);
00529               RRHandlerRecord* rrHandler
00530                 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00531               if (rrHandler != NULL) {
00532                 if (rrHandler->rrHandlerTask != NULL) {
00533                   (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00534                 }
00535               }
00536             }
00537 
00538             // General RR handler:
00539             if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00540           }
00541 
00542           subPacketOK = True;
00543           typeOfPacket = PACKET_RTCP_REPORT;
00544           break;
00545         }
00546         case RTCP_PT_BYE: {
00547 #ifdef DEBUG
00548           fprintf(stderr, "BYE\n");
00549 #endif
00550           // If a 'BYE handler' was set, arrange for it to be called at the end of this routine.
00551           // (Note: We don't call it immediately, in case it happens to cause "this" to be deleted.)
00552           if (fByeHandlerTask != NULL
00553               && (!fByeHandleActiveParticipantsOnly
00554                   || (fSource != NULL
00555                       && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00556                   || (fSink != NULL
00557                       && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00558             callByeHandler = True;
00559           }
00560 
00561           // We should really check for & handle >1 SSRCs being present #####
00562 
00563           subPacketOK = True;
00564           typeOfPacket = PACKET_BYE;
00565           break;
00566         }
00567         // Later handle SDES, APP, and compound RTCP packets #####
00568         default:
00569 #ifdef DEBUG
00570           fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00571 #endif
00572           subPacketOK = True;
00573           break;
00574       }
00575       if (!subPacketOK) break;
00576 
00577       // need to check for (& handle) SSRC collision! #####
00578 
00579 #ifdef DEBUG
00580       fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00581 #endif
00582 
00583       // Skip over any remaining bytes in this subpacket:
00584       ADVANCE(length);
00585 
00586       // Check whether another RTCP 'subpacket' follows:
00587       if (packetSize == 0) {
00588         packetOK = True;
00589         break;
00590       } else if (packetSize < 4) {
00591 #ifdef DEBUG
00592         fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00593 #endif
00594         break;
00595       }
00596       rtcpHdr = ntohl(*(u_int32_t*)pkt);
00597       if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00598 #ifdef DEBUG
00599         fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00600 #endif
00601         break;
00602       }
00603     }
00604 
00605     if (!packetOK) {
00606 #ifdef DEBUG
00607       fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00608 #endif
00609       break;
00610     } else {
00611 #ifdef DEBUG
00612       fprintf(stderr, "validated entire RTCP packet\n");
00613 #endif
00614     }
00615 
00616     onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00617 
00618     // Finally, if we need to call a "BYE" handler, do so now (in case it causes "this" to get deleted):
00619     if (callByeHandler && fByeHandlerTask != NULL/*sanity check*/) {
00620       TaskFunc* byeHandler = fByeHandlerTask;
00621       fByeHandlerTask = NULL; // because we call the handler only once, by default
00622       (*byeHandler)(fByeHandlerClientData);
00623     }
00624   } while (0);
00625 }
00626 
00627 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00628                              unsigned ssrc) {
00629   fTypeOfPacket = typeOfPacket;
00630   fLastReceivedSize = totPacketSize;
00631   fLastReceivedSSRC = ssrc;
00632 
00633   int members = (int)numMembers();
00634   int senders = (fSink != NULL) ? 1 : 0;
00635 
00636   OnReceive(this, // p
00637             this, // e
00638             &members, // members
00639             &fPrevNumMembers, // pmembers
00640             &senders, // senders
00641             &fAveRTCPSize, // avg_rtcp_size
00642             &fPrevReportTime, // tp
00643             dTimeNow(), // tc
00644             fNextReportTime);
00645 }
00646 
00647 void RTCPInstance::sendReport() {
00648 #ifdef DEBUG
00649   fprintf(stderr, "sending REPORT\n");
00650 #endif
00651   // Begin by including a SR and/or RR report:
00652   if (!addReport()) return;
00653 
00654   // Then, include a SDES:
00655   addSDES();
00656 
00657   // Send the report:
00658   sendBuiltPacket();
00659 
00660   // Periodically clean out old members from our SSRC membership database:
00661   const unsigned membershipReapPeriod = 5;
00662   if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00663     unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00664     fKnownMembers->reapOldMembers(threshold);
00665   }
00666 }
00667 
00668 void RTCPInstance::sendBYE() {
00669 #ifdef DEBUG
00670   fprintf(stderr, "sending BYE\n");
00671 #endif
00672   // The packet must begin with a SR and/or RR report:
00673   (void)addReport(True);
00674 
00675   addBYE();
00676   sendBuiltPacket();
00677 }
00678 
00679 void RTCPInstance::sendBuiltPacket() {
00680 #ifdef DEBUG
00681   fprintf(stderr, "sending RTCP packet\n");
00682   unsigned char* p = fOutBuf->packet();
00683   for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00684     if (i%4 == 0) fprintf(stderr," ");
00685     fprintf(stderr, "%02x", p[i]);
00686   }
00687   fprintf(stderr, "\n");
00688 #endif
00689   unsigned reportSize = fOutBuf->curPacketSize();
00690   fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00691   fOutBuf->resetOffset();
00692 
00693   fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00694   fHaveJustSentPacket = True;
00695   fLastPacketSentSize = reportSize;
00696 }
00697 
00698 int RTCPInstance::checkNewSSRC() {
00699   return fKnownMembers->noteMembership(fLastReceivedSSRC,
00700                                        fOutgoingReportCount);
00701 }
00702 
00703 void RTCPInstance::removeLastReceivedSSRC() {
00704   removeSSRC(fLastReceivedSSRC, False/*keep stats around*/);
00705 }
00706 
00707 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00708   fKnownMembers->remove(ssrc);
00709 
00710   if (alsoRemoveStats) {
00711     // Also, remove records of this SSRC from any reception or transmission stats
00712     if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00713     if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00714   }
00715 }
00716 
00717 void RTCPInstance::onExpire(RTCPInstance* instance) {
00718   instance->onExpire1();
00719 }
00720 
00721 // Member functions to build specific kinds of report:
00722 
00723 Boolean RTCPInstance::addReport(Boolean alwaysAdd) {
00724   // Include a SR or a RR, depending on whether we have an associated sink or source:
00725   if (fSink != NULL) {
00726     if (!alwaysAdd) {
00727       if (!fSink->enableRTCPReports()) return False;
00728 
00729       // Hack: Don't send a SR during those (brief) times when the timestamp of the
00730       // next outgoing RTP packet has been preset, to ensure that that timestamp gets
00731       // used for that outgoing packet. (David Bertrand, 2006.07.18)
00732       if (fSink->nextTimestampHasBeenPreset()) return False;
00733     }
00734 
00735     addSR();
00736   } else if (fSource != NULL) {
00737     if (!alwaysAdd) {
00738       if (!fSource->enableRTCPReports()) return False;
00739     }
00740 
00741     addRR();
00742   }
00743 
00744   return True;
00745 }
00746 
00747 void RTCPInstance::addSR() {
00748   // ASSERT: fSink != NULL
00749 
00750   enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00751                             5 /* extra words in a SR */);
00752 
00753   // Now, add the 'sender info' for our sink
00754 
00755   // Insert the NTP and RTP timestamps for the 'wallclock time':
00756   struct timeval timeNow;
00757   gettimeofday(&timeNow, NULL);
00758   fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00759       // NTP timestamp most-significant word (1970 epoch -> 1900 epoch)
00760   double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000; // 2^32/10^6
00761   fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00762       // NTP timestamp least-significant word
00763   unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00764   fOutBuf->enqueueWord(rtpTimestamp); // RTP ts
00765 
00766   // Insert the packet and byte counts:
00767   fOutBuf->enqueueWord(fSink->packetCount());
00768   fOutBuf->enqueueWord(fSink->octetCount());
00769 
00770   enqueueCommonReportSuffix();
00771 }
00772 
00773 void RTCPInstance::addRR() {
00774   // ASSERT: fSource != NULL
00775 
00776   enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00777   enqueueCommonReportSuffix();
00778 }
00779 
00780 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00781                                              unsigned SSRC,
00782                                              unsigned numExtraWords) {
00783   unsigned numReportingSources;
00784   if (fSource == NULL) {
00785     numReportingSources = 0; // we don't receive anything
00786   } else {
00787     RTPReceptionStatsDB& allReceptionStats
00788       = fSource->receptionStatsDB();
00789     numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00790     // This must be <32, to fit in 5 bits:
00791     if (numReportingSources >= 32) { numReportingSources = 32; }
00792     // Later: support adding more reports to handle >32 sources (unlikely)#####
00793   }
00794 
00795   unsigned rtcpHdr = 0x80000000; // version 2, no padding
00796   rtcpHdr |= (numReportingSources<<24);
00797   rtcpHdr |= (packetType<<16);
00798   rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00799       // each report block is 6 32-bit words long
00800   fOutBuf->enqueueWord(rtcpHdr);
00801 
00802   fOutBuf->enqueueWord(SSRC);
00803 }
00804 
00805 void RTCPInstance::enqueueCommonReportSuffix() {
00806   // Output the report blocks for each source:
00807   if (fSource != NULL) {
00808     RTPReceptionStatsDB& allReceptionStats
00809       = fSource->receptionStatsDB();
00810 
00811     RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00812     while (1) {
00813       RTPReceptionStats* receptionStats = iterator.next();
00814       if (receptionStats == NULL) break;
00815       enqueueReportBlock(receptionStats);
00816     }
00817 
00818     allReceptionStats.reset(); // because we have just generated a report
00819   }
00820 }
00821 
00822 void
00823 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00824   fOutBuf->enqueueWord(stats->SSRC());
00825 
00826   unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00827 
00828   unsigned totNumExpected
00829     = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00830   int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00831   // 'Clamp' this loss number to a 24-bit signed value:
00832   if (totNumLost > 0x007FFFFF) {
00833     totNumLost = 0x007FFFFF;
00834   } else if (totNumLost < 0) {
00835     if (totNumLost < -0x00800000) totNumLost = 0x00800000; // unlikely, but...
00836     totNumLost &= 0x00FFFFFF;
00837   }
00838 
00839   unsigned numExpectedSinceLastReset
00840     = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00841   int numLostSinceLastReset
00842     = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00843   unsigned char lossFraction;
00844   if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00845     lossFraction = 0;
00846   } else {
00847     lossFraction = (unsigned char)
00848       ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00849   }
00850 
00851   fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00852   fOutBuf->enqueueWord(highestExtSeqNumReceived);
00853 
00854   fOutBuf->enqueueWord(stats->jitter());
00855 
00856   unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00857   unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00858   unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16); // middle 32 bits
00859   fOutBuf->enqueueWord(LSR);
00860 
00861   // Figure out how long has elapsed since the last SR rcvd from this src:
00862   struct timeval const& LSRtime = stats->lastReceivedSR_time(); // "last SR"
00863   struct timeval timeNow, timeSinceLSR;
00864   gettimeofday(&timeNow, NULL);
00865   if (timeNow.tv_usec < LSRtime.tv_usec) {
00866     timeNow.tv_usec += 1000000;
00867     timeNow.tv_sec -= 1;
00868   }
00869   timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00870   timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00871   // The enqueued time is in units of 1/65536 seconds.
00872   // (Note that 65536/1000000 == 1024/15625)
00873   unsigned DLSR;
00874   if (LSR == 0) {
00875     DLSR = 0;
00876   } else {
00877     DLSR = (timeSinceLSR.tv_sec<<16)
00878          | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00879   }
00880   fOutBuf->enqueueWord(DLSR);
00881 }
00882 
00883 void RTCPInstance::addSDES() {
00884   // For now we support only the CNAME item; later support more #####
00885 
00886   // Begin by figuring out the size of the entire SDES report:
00887   unsigned numBytes = 4;
00888       // counts the SSRC, but not the header; it'll get subtracted out
00889   numBytes += fCNAME.totalSize(); // includes id and length
00890   numBytes += 1; // the special END item
00891 
00892   unsigned num4ByteWords = (numBytes + 3)/4;
00893 
00894   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC chunk
00895   rtcpHdr |= (RTCP_PT_SDES<<16);
00896   rtcpHdr |= num4ByteWords;
00897   fOutBuf->enqueueWord(rtcpHdr);
00898 
00899   if (fSource != NULL) {
00900     fOutBuf->enqueueWord(fSource->SSRC());
00901   } else if (fSink != NULL) {
00902     fOutBuf->enqueueWord(fSink->SSRC());
00903   }
00904 
00905   // Add the CNAME:
00906   fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00907 
00908   // Add the 'END' item (i.e., a zero byte), plus any more needed to pad:
00909   unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00910   unsigned char const zero = '\0';
00911   while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00912 }
00913 
00914 void RTCPInstance::addBYE() {
00915   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC
00916   rtcpHdr |= (RTCP_PT_BYE<<16);
00917   rtcpHdr |= 1; // 2 32-bit words total (i.e., with 1 SSRC)
00918   fOutBuf->enqueueWord(rtcpHdr);
00919 
00920   if (fSource != NULL) {
00921     fOutBuf->enqueueWord(fSource->SSRC());
00922   } else if (fSink != NULL) {
00923     fOutBuf->enqueueWord(fSink->SSRC());
00924   }
00925 }
00926 
00927 void RTCPInstance::schedule(double nextTime) {
00928   fNextReportTime = nextTime;
00929 
00930   double secondsToDelay = nextTime - dTimeNow();
00931   if (secondsToDelay < 0) secondsToDelay = 0;
00932 #ifdef DEBUG
00933   fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00934 #endif
00935   int64_t usToGo = (int64_t)(secondsToDelay * 1000000);
00936   nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00937                                 (TaskFunc*)RTCPInstance::onExpire, this);
00938 }
00939 
00940 void RTCPInstance::reschedule(double nextTime) {
00941   envir().taskScheduler().unscheduleDelayedTask(nextTask());
00942   schedule(nextTime);
00943 }
00944 
00945 void RTCPInstance::onExpire1() {
00946   // Note: fTotSessionBW is kbits per second
00947   double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second
00948 
00949   OnExpire(this, // event
00950            numMembers(), // members
00951            (fSink != NULL) ? 1 : 0, // senders
00952            rtcpBW, // rtcp_bw
00953            (fSink != NULL) ? 1 : 0, // we_sent
00954            &fAveRTCPSize, // ave_rtcp_size
00955            &fIsInitial, // initial
00956            dTimeNow(), // tc
00957            &fPrevReportTime, // tp
00958            &fPrevNumMembers // pmembers
00959            );
00960 }
00961 
00963 
00964 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00965   unsigned length = strlen((char const*)value);
00966   if (length > 0xFF) length = 0xFF; // maximum data length for a SDES item
00967 
00968   fData[0] = tag;
00969   fData[1] = (unsigned char)length;
00970   memmove(&fData[2], value, length);
00971 }
00972 
00973 unsigned SDESItem::totalSize() const {
00974   return 2 + (unsigned)fData[1];
00975 }
00976 
00977 
00979 
00980 extern "C" void Schedule(double nextTime, event e) {
00981   RTCPInstance* instance = (RTCPInstance*)e;
00982   if (instance == NULL) return;
00983 
00984   instance->schedule(nextTime);
00985 }
00986 
00987 extern "C" void Reschedule(double nextTime, event e) {
00988   RTCPInstance* instance = (RTCPInstance*)e;
00989   if (instance == NULL) return;
00990 
00991   instance->reschedule(nextTime);
00992 }
00993 
00994 extern "C" void SendRTCPReport(event e) {
00995   RTCPInstance* instance = (RTCPInstance*)e;
00996   if (instance == NULL) return;
00997 
00998   instance->sendReport();
00999 }
01000 
01001 extern "C" void SendBYEPacket(event e) {
01002   RTCPInstance* instance = (RTCPInstance*)e;
01003   if (instance == NULL) return;
01004 
01005   instance->sendBYE();
01006 }
01007 
01008 extern "C" int TypeOfEvent(event e) {
01009   RTCPInstance* instance = (RTCPInstance*)e;
01010   if (instance == NULL) return EVENT_UNKNOWN;
01011 
01012   return instance->typeOfEvent();
01013 }
01014 
01015 extern "C" int SentPacketSize(event e) {
01016   RTCPInstance* instance = (RTCPInstance*)e;
01017   if (instance == NULL) return 0;
01018 
01019   return instance->sentPacketSize();
01020 }
01021 
01022 extern "C" int PacketType(packet p) {
01023   RTCPInstance* instance = (RTCPInstance*)p;
01024   if (instance == NULL) return PACKET_UNKNOWN_TYPE;
01025 
01026   return instance->packetType();
01027 }
01028 
01029 extern "C" int ReceivedPacketSize(packet p) {
01030   RTCPInstance* instance = (RTCPInstance*)p;
01031   if (instance == NULL) return 0;
01032 
01033   return instance->receivedPacketSize();
01034 }
01035 
01036 extern "C" int NewMember(packet p) {
01037   RTCPInstance* instance = (RTCPInstance*)p;
01038   if (instance == NULL) return 0;
01039 
01040   return instance->checkNewSSRC();
01041 }
01042 
01043 extern "C" int NewSender(packet /*p*/) {
01044   return 0; // we don't yet recognize senders other than ourselves #####
01045 }
01046 
01047 extern "C" void AddMember(packet /*p*/) {
01048   // Do nothing; all of the real work was done when NewMember() was called
01049 }
01050 
01051 extern "C" void AddSender(packet /*p*/) {
01052   // we don't yet recognize senders other than ourselves #####
01053 }
01054 
01055 extern "C" void RemoveMember(packet p) {
01056   RTCPInstance* instance = (RTCPInstance*)p;
01057   if (instance == NULL) return;
01058 
01059   instance->removeLastReceivedSSRC();
01060 }
01061 
01062 extern "C" void RemoveSender(packet /*p*/) {
01063   // we don't yet recognize senders other than ourselves #####
01064 }
01065 
01066 extern "C" double drand30() {
01067   unsigned tmp = our_random()&0x3FFFFFFF; // a random 30-bit integer
01068   return tmp/(double)(1024*1024*1024);
01069 }

Generated on Tue Mar 25 14:35:36 2014 for live by  doxygen 1.5.2