00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024
00026
00027 class RTCPMemberDatabase {
00028 public:
00029 RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030 : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 ),
00031 fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032 }
00033
00034 virtual ~RTCPMemberDatabase() {
00035 delete fTable;
00036 }
00037
00038 Boolean isMember(unsigned ssrc) const {
00039 return fTable->Lookup((char*)(long)ssrc) != NULL;
00040 }
00041
00042 Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043 Boolean isNew = !isMember(ssrc);
00044
00045 if (isNew) {
00046 ++fNumMembers;
00047 }
00048
00049
00050 fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051
00052 return isNew;
00053 }
00054
00055 Boolean remove(unsigned ssrc) {
00056 Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057 if (wasPresent) {
00058 --fNumMembers;
00059 }
00060 return wasPresent;
00061 }
00062
00063 unsigned numMembers() const {
00064 return fNumMembers;
00065 }
00066
00067 void reapOldMembers(unsigned threshold);
00068
00069 private:
00070 RTCPInstance& fOurRTCPInstance;
00071 unsigned fNumMembers;
00072 HashTable* fTable;
00073 };
00074
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076 Boolean foundOldMember;
00077 unsigned oldSSRC = 0;
00078
00079 do {
00080 foundOldMember = False;
00081
00082 HashTable::Iterator* iter
00083 = HashTable::Iterator::create(*fTable);
00084 unsigned long timeCount;
00085 char const* key;
00086 while ((timeCount = (unsigned long)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088 fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090 if (timeCount < (unsigned long)threshold) {
00091 unsigned long ssrc = (unsigned long)key;
00092 oldSSRC = (unsigned)ssrc;
00093 foundOldMember = True;
00094 }
00095 }
00096 delete iter;
00097
00098 if (foundOldMember) {
00099 #ifdef DEBUG
00100 fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102 fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103 }
00104 } while (foundOldMember);
00105 }
00106
00107
00109
00110 static double dTimeNow() {
00111 struct timeval timeNow;
00112 gettimeofday(&timeNow, NULL);
00113 return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115
00116 static unsigned const maxPacketSize = 1450;
00117
00118 static unsigned const preferredPacketSize = 1000;
00119
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121 unsigned totSessionBW,
00122 unsigned char const* cname,
00123 RTPSink* sink, RTPSource const* source,
00124 Boolean isSSMSource)
00125 : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126 fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127 fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128 fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129 fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130 fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131 fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132 fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133 fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134 fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135 fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137 fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139 if (fTotSessionBW == 0) {
00140 env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141 fTotSessionBW = 1;
00142 }
00143
00144 if (isSSMSource) RTCPgs->multicastSendOnly();
00145
00146 double timeNow = dTimeNow();
00147 fPrevReportTime = fNextReportTime = timeNow;
00148
00149 fKnownMembers = new RTCPMemberDatabase(*this);
00150 fInBuf = new unsigned char[maxPacketSize];
00151 if (fKnownMembers == NULL || fInBuf == NULL) return;
00152
00153
00154 unsigned savedMaxSize = OutPacketBuffer::maxSize;
00155 OutPacketBuffer::maxSize = maxPacketSize;
00156 fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00157 OutPacketBuffer::maxSize = savedMaxSize;
00158 if (fOutBuf == NULL) return;
00159
00160
00161 TaskScheduler::BackgroundHandlerProc* handler
00162 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00163 fRTCPInterface.startNetworkReading(handler);
00164
00165
00166 fTypeOfEvent = EVENT_REPORT;
00167 onExpire(this);
00168 }
00169
00170 struct RRHandlerRecord {
00171 TaskFunc* rrHandlerTask;
00172 void* rrHandlerClientData;
00173 };
00174
00175 RTCPInstance::~RTCPInstance() {
00176 #ifdef DEBUG
00177 fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00178 #endif
00179
00180 fRTCPInterface.stopNetworkReading();
00181
00182
00183
00184 fTypeOfEvent = EVENT_BYE;
00185 sendBYE();
00186
00187 if (fSpecificRRHandlerTable != NULL) {
00188 AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00189 RRHandlerRecord* rrHandler;
00190 while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00191 delete rrHandler;
00192 }
00193 delete fSpecificRRHandlerTable;
00194 }
00195
00196 delete fKnownMembers;
00197 delete fOutBuf;
00198 delete[] fInBuf;
00199 }
00200
00201 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00202 unsigned totSessionBW,
00203 unsigned char const* cname,
00204 RTPSink* sink, RTPSource const* source,
00205 Boolean isSSMSource) {
00206 return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00207 isSSMSource);
00208 }
00209
00210 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00211 char const* instanceName,
00212 RTCPInstance*& resultInstance) {
00213 resultInstance = NULL;
00214
00215 Medium* medium;
00216 if (!Medium::lookupByName(env, instanceName, medium)) return False;
00217
00218 if (!medium->isRTCPInstance()) {
00219 env.setResultMsg(instanceName, " is not a RTCP instance");
00220 return False;
00221 }
00222
00223 resultInstance = (RTCPInstance*)medium;
00224 return True;
00225 }
00226
00227 Boolean RTCPInstance::isRTCPInstance() const {
00228 return True;
00229 }
00230
00231 unsigned RTCPInstance::numMembers() const {
00232 if (fKnownMembers == NULL) return 0;
00233
00234 return fKnownMembers->numMembers();
00235 }
00236
00237 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00238 Boolean handleActiveParticipantsOnly) {
00239 fByeHandlerTask = handlerTask;
00240 fByeHandlerClientData = clientData;
00241 fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00242 }
00243
00244 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00245 fSRHandlerTask = handlerTask;
00246 fSRHandlerClientData = clientData;
00247 }
00248
00249 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00250 fRRHandlerTask = handlerTask;
00251 fRRHandlerClientData = clientData;
00252 }
00253
00254 void RTCPInstance
00255 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00256 TaskFunc* handlerTask, void* clientData) {
00257 if (handlerTask == NULL && clientData == NULL) {
00258 unsetSpecificRRHandler(fromAddress, fromPort);
00259 return;
00260 }
00261
00262 RRHandlerRecord* rrHandler = new RRHandlerRecord;
00263 rrHandler->rrHandlerTask = handlerTask;
00264 rrHandler->rrHandlerClientData = clientData;
00265 if (fSpecificRRHandlerTable == NULL) {
00266 fSpecificRRHandlerTable = new AddressPortLookupTable;
00267 }
00268 fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00269 }
00270
00271 void RTCPInstance
00272 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00273 if (fSpecificRRHandlerTable == NULL) return;
00274
00275 RRHandlerRecord* rrHandler
00276 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00277 if (rrHandler != NULL) {
00278 fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00279 delete rrHandler;
00280 }
00281 }
00282
00283 void RTCPInstance::setStreamSocket(int sockNum,
00284 unsigned char streamChannelId) {
00285
00286 fRTCPInterface.stopNetworkReading();
00287
00288
00289 fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00290
00291
00292 TaskScheduler::BackgroundHandlerProc* handler
00293 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00294 fRTCPInterface.startNetworkReading(handler);
00295 }
00296
00297 void RTCPInstance::addStreamSocket(int sockNum,
00298 unsigned char streamChannelId) {
00299
00300 fRTCPInterface.stopNetworkReading();
00301
00302
00303 fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00304
00305
00306 TaskScheduler::BackgroundHandlerProc* handler
00307 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00308 fRTCPInterface.startNetworkReading(handler);
00309 }
00310
00311 static unsigned const IP_UDP_HDR_SIZE = 28;
00312
00313
00314 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00315
00316 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00317 int ) {
00318 instance->incomingReportHandler1();
00319 }
00320
00321 void RTCPInstance::incomingReportHandler1() {
00322 unsigned char* pkt = fInBuf;
00323 unsigned packetSize;
00324 struct sockaddr_in fromAddress;
00325 int typeOfPacket = PACKET_UNKNOWN_TYPE;
00326
00327 do {
00328 int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00329 unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00330 if (!fRTCPInterface.handleRead(pkt, maxPacketSize,
00331 packetSize, fromAddress)) {
00332 break;
00333 }
00334
00335
00336 if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00337
00338
00339
00340
00341
00342 if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00343
00344 fHaveJustSentPacket = False;
00345 break;
00346 }
00347 }
00348
00349 if (fIsSSMSource) {
00350
00351
00352
00353
00354
00355
00356 fRTCPInterface.sendPacket(pkt, packetSize);
00357 fHaveJustSentPacket = True;
00358 fLastPacketSentSize = packetSize;
00359 }
00360
00361 #ifdef DEBUG
00362 fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, our_inet_ntoa(fromAddress.sin_addr), ntohs(fromAddress.sin_port));
00363 unsigned char* p = pkt;
00364 for (unsigned i = 0; i < packetSize; ++i) {
00365 if (i%4 == 0) fprintf(stderr, " ");
00366 fprintf(stderr, "%02x", p[i]);
00367 }
00368 fprintf(stderr, "\n");
00369 #endif
00370 int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00371
00372
00373
00374
00375
00376 if (packetSize < 4) break;
00377 unsigned rtcpHdr = ntohl(*(unsigned*)pkt);
00378 if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00379 #ifdef DEBUG
00380 fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00381 #endif
00382 break;
00383 }
00384
00385
00386
00387 unsigned reportSenderSSRC = 0;
00388 Boolean packetOK = False;
00389 while (1) {
00390 unsigned rc = (rtcpHdr>>24)&0x1F;
00391 unsigned pt = (rtcpHdr>>16)&0xFF;
00392 unsigned length = 4*(rtcpHdr&0xFFFF);
00393 ADVANCE(4);
00394 if (length > packetSize) break;
00395
00396
00397 if (length < 4) break; length -= 4;
00398 reportSenderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00399
00400 Boolean subPacketOK = False;
00401 switch (pt) {
00402 case RTCP_PT_SR: {
00403 #ifdef DEBUG
00404 fprintf(stderr, "SR\n");
00405 #endif
00406 if (length < 20) break; length -= 20;
00407
00408
00409 unsigned NTPmsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00410 unsigned NTPlsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00411 unsigned rtpTimestamp = ntohl(*(unsigned*)pkt); ADVANCE(4);
00412 if (fSource != NULL) {
00413 RTPReceptionStatsDB& receptionStats
00414 = fSource->receptionStatsDB();
00415 receptionStats.noteIncomingSR(reportSenderSSRC,
00416 NTPmsw, NTPlsw, rtpTimestamp);
00417 }
00418 ADVANCE(8);
00419
00420
00421 if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00422
00423
00424 }
00425 case RTCP_PT_RR: {
00426 #ifdef DEBUG
00427 fprintf(stderr, "RR\n");
00428 #endif
00429 unsigned reportBlocksSize = rc*(6*4);
00430 if (length < reportBlocksSize) break;
00431 length -= reportBlocksSize;
00432
00433 if (fSink != NULL) {
00434
00435 RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00436 for (unsigned i = 0; i < rc; ++i) {
00437 unsigned senderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00438
00439 if (senderSSRC == fSink->SSRC()) {
00440 unsigned lossStats = ntohl(*(unsigned*)pkt); ADVANCE(4);
00441 unsigned highestReceived = ntohl(*(unsigned*)pkt); ADVANCE(4);
00442 unsigned jitter = ntohl(*(unsigned*)pkt); ADVANCE(4);
00443 unsigned timeLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00444 unsigned timeSinceLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00445 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00446 lossStats,
00447 highestReceived, jitter,
00448 timeLastSR, timeSinceLastSR);
00449 } else {
00450 ADVANCE(4*5);
00451 }
00452 }
00453 } else {
00454 ADVANCE(reportBlocksSize);
00455 }
00456
00457 if (pt == RTCP_PT_RR) {
00458
00459
00460
00461 if (fSpecificRRHandlerTable != NULL) {
00462 netAddressBits fromAddr;
00463 portNumBits fromPortNum;
00464 if (tcpReadStreamSocketNum < 0) {
00465
00466 fromAddr = fromAddress.sin_addr.s_addr;
00467 fromPortNum = ntohs(fromAddress.sin_port);
00468 } else {
00469
00470
00471 fromAddr = tcpReadStreamSocketNum;
00472 fromPortNum = tcpReadStreamChannelId;
00473 }
00474 Port fromPort(fromPortNum);
00475 RRHandlerRecord* rrHandler
00476 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00477 if (rrHandler != NULL) {
00478 if (rrHandler->rrHandlerTask != NULL) {
00479 (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00480 }
00481 }
00482 }
00483
00484
00485 if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00486 }
00487
00488 subPacketOK = True;
00489 typeOfPacket = PACKET_RTCP_REPORT;
00490 break;
00491 }
00492 case RTCP_PT_BYE: {
00493 #ifdef DEBUG
00494 fprintf(stderr, "BYE\n");
00495 #endif
00496
00497 TaskFunc* byeHandler = fByeHandlerTask;
00498 if (byeHandler != NULL
00499 && (!fByeHandleActiveParticipantsOnly
00500 || (fSource != NULL
00501 && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00502 || (fSink != NULL
00503 && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00504 fByeHandlerTask = NULL;
00505
00506 (*byeHandler)(fByeHandlerClientData);
00507 }
00508
00509
00510
00511 subPacketOK = True;
00512 typeOfPacket = PACKET_BYE;
00513 break;
00514 }
00515
00516 default:
00517 #ifdef DEBUG
00518 fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00519 #endif
00520 subPacketOK = True;
00521 break;
00522 }
00523 if (!subPacketOK) break;
00524
00525
00526
00527 #ifdef DEBUG
00528 fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00529 #endif
00530
00531
00532 ADVANCE(length);
00533
00534
00535 if (packetSize == 0) {
00536 packetOK = True;
00537 break;
00538 } else if (packetSize < 4) {
00539 #ifdef DEBUG
00540 fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00541 #endif
00542 break;
00543 }
00544 rtcpHdr = ntohl(*(unsigned*)pkt);
00545 if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00546 #ifdef DEBUG
00547 fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00548 #endif
00549 break;
00550 }
00551 }
00552
00553 if (!packetOK) {
00554 #ifdef DEBUG
00555 fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00556 #endif
00557 break;
00558 } else {
00559 #ifdef DEBUG
00560 fprintf(stderr, "validated entire RTCP packet\n");
00561 #endif
00562 }
00563
00564 onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00565 } while (0);
00566 }
00567
00568 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00569 unsigned ssrc) {
00570 fTypeOfPacket = typeOfPacket;
00571 fLastReceivedSize = totPacketSize;
00572 fLastReceivedSSRC = ssrc;
00573
00574 int members = (int)numMembers();
00575 int senders = (fSink != NULL) ? 1 : 0;
00576
00577 OnReceive(this,
00578 this,
00579 &members,
00580 &fPrevNumMembers,
00581 &senders,
00582 &fAveRTCPSize,
00583 &fPrevReportTime,
00584 dTimeNow(),
00585 fNextReportTime);
00586 }
00587
00588 void RTCPInstance::sendReport() {
00589
00590
00591
00592 if (fSink != NULL && fSink->nextTimestampHasBeenPreset()) return;
00593
00594 #ifdef DEBUG
00595 fprintf(stderr, "sending REPORT\n");
00596 #endif
00597
00598 addReport();
00599
00600
00601 addSDES();
00602
00603
00604 sendBuiltPacket();
00605
00606
00607 const unsigned membershipReapPeriod = 5;
00608 if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00609 unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00610 fKnownMembers->reapOldMembers(threshold);
00611 }
00612 }
00613
00614 void RTCPInstance::sendBYE() {
00615 #ifdef DEBUG
00616 fprintf(stderr, "sending BYE\n");
00617 #endif
00618
00619 addReport();
00620
00621 addBYE();
00622 sendBuiltPacket();
00623 }
00624
00625 void RTCPInstance::sendBuiltPacket() {
00626 #ifdef DEBUG
00627 fprintf(stderr, "sending RTCP packet\n");
00628 unsigned char* p = fOutBuf->packet();
00629 for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00630 if (i%4 == 0) fprintf(stderr," ");
00631 fprintf(stderr, "%02x", p[i]);
00632 }
00633 fprintf(stderr, "\n");
00634 #endif
00635 unsigned reportSize = fOutBuf->curPacketSize();
00636 fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00637 fOutBuf->resetOffset();
00638
00639 fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00640 fHaveJustSentPacket = True;
00641 fLastPacketSentSize = reportSize;
00642 }
00643
00644 int RTCPInstance::checkNewSSRC() {
00645 return fKnownMembers->noteMembership(fLastReceivedSSRC,
00646 fOutgoingReportCount);
00647 }
00648
00649 void RTCPInstance::removeLastReceivedSSRC() {
00650 removeSSRC(fLastReceivedSSRC, False);
00651 }
00652
00653 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00654 fKnownMembers->remove(ssrc);
00655
00656 if (alsoRemoveStats) {
00657
00658 if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00659 if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00660 }
00661 }
00662
00663 void RTCPInstance::onExpire(RTCPInstance* instance) {
00664 instance->onExpire1();
00665 }
00666
00667
00668
00669 void RTCPInstance::addReport() {
00670
00671
00672 if (fSink != NULL) {
00673 addSR();
00674 } else if (fSource != NULL) {
00675 addRR();
00676 }
00677 }
00678
00679 void RTCPInstance::addSR() {
00680
00681
00682 enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00683 5 );
00684
00685
00686
00687
00688 struct timeval timeNow;
00689 gettimeofday(&timeNow, NULL);
00690 fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00691
00692 double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000;
00693 fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00694
00695 unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00696 fOutBuf->enqueueWord(rtpTimestamp);
00697
00698
00699 fOutBuf->enqueueWord(fSink->packetCount());
00700 fOutBuf->enqueueWord(fSink->octetCount());
00701
00702 enqueueCommonReportSuffix();
00703 }
00704
00705 void RTCPInstance::addRR() {
00706
00707
00708 enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00709 enqueueCommonReportSuffix();
00710 }
00711
00712 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00713 unsigned SSRC,
00714 unsigned numExtraWords) {
00715 unsigned numReportingSources;
00716 if (fSource == NULL) {
00717 numReportingSources = 0;
00718 } else {
00719 RTPReceptionStatsDB& allReceptionStats
00720 = fSource->receptionStatsDB();
00721 numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00722
00723 if (numReportingSources >= 32) { numReportingSources = 32; }
00724
00725 }
00726
00727 unsigned rtcpHdr = 0x80000000;
00728 rtcpHdr |= (numReportingSources<<24);
00729 rtcpHdr |= (packetType<<16);
00730 rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00731
00732 fOutBuf->enqueueWord(rtcpHdr);
00733
00734 fOutBuf->enqueueWord(SSRC);
00735 }
00736
00737 void RTCPInstance::enqueueCommonReportSuffix() {
00738
00739 if (fSource != NULL) {
00740 RTPReceptionStatsDB& allReceptionStats
00741 = fSource->receptionStatsDB();
00742
00743 RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00744 while (1) {
00745 RTPReceptionStats* receptionStats = iterator.next();
00746 if (receptionStats == NULL) break;
00747 enqueueReportBlock(receptionStats);
00748 }
00749
00750 allReceptionStats.reset();
00751 }
00752 }
00753
00754 void
00755 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00756 fOutBuf->enqueueWord(stats->SSRC());
00757
00758 unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00759
00760 unsigned totNumExpected
00761 = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00762 int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00763
00764 if (totNumLost > 0x007FFFFF) {
00765 totNumLost = 0x007FFFFF;
00766 } else if (totNumLost < 0) {
00767 if (totNumLost < -0x00800000) totNumLost = 0x00800000;
00768 totNumLost &= 0x00FFFFFF;
00769 }
00770
00771 unsigned numExpectedSinceLastReset
00772 = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00773 int numLostSinceLastReset
00774 = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00775 unsigned char lossFraction;
00776 if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00777 lossFraction = 0;
00778 } else {
00779 lossFraction = (unsigned char)
00780 ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00781 }
00782
00783 fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00784 fOutBuf->enqueueWord(highestExtSeqNumReceived);
00785
00786 fOutBuf->enqueueWord(stats->jitter());
00787
00788 unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00789 unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00790 unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16);
00791 fOutBuf->enqueueWord(LSR);
00792
00793
00794 struct timeval const& LSRtime = stats->lastReceivedSR_time();
00795 struct timeval timeNow, timeSinceLSR;
00796 gettimeofday(&timeNow, NULL);
00797 if (timeNow.tv_usec < LSRtime.tv_usec) {
00798 timeNow.tv_usec += 1000000;
00799 timeNow.tv_sec -= 1;
00800 }
00801 timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00802 timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00803
00804
00805 unsigned DLSR;
00806 if (LSR == 0) {
00807 DLSR = 0;
00808 } else {
00809 DLSR = (timeSinceLSR.tv_sec<<16)
00810 | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00811 }
00812 fOutBuf->enqueueWord(DLSR);
00813 }
00814
00815 void RTCPInstance::addSDES() {
00816
00817
00818
00819 unsigned numBytes = 4;
00820
00821 numBytes += fCNAME.totalSize();
00822 numBytes += 1;
00823
00824 unsigned num4ByteWords = (numBytes + 3)/4;
00825
00826 unsigned rtcpHdr = 0x81000000;
00827 rtcpHdr |= (RTCP_PT_SDES<<16);
00828 rtcpHdr |= num4ByteWords;
00829 fOutBuf->enqueueWord(rtcpHdr);
00830
00831 if (fSource != NULL) {
00832 fOutBuf->enqueueWord(fSource->SSRC());
00833 } else if (fSink != NULL) {
00834 fOutBuf->enqueueWord(fSink->SSRC());
00835 }
00836
00837
00838 fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00839
00840
00841 unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00842 unsigned char const zero = '\0';
00843 while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00844 }
00845
00846 void RTCPInstance::addBYE() {
00847 unsigned rtcpHdr = 0x81000000;
00848 rtcpHdr |= (RTCP_PT_BYE<<16);
00849 rtcpHdr |= 1;
00850 fOutBuf->enqueueWord(rtcpHdr);
00851
00852 if (fSource != NULL) {
00853 fOutBuf->enqueueWord(fSource->SSRC());
00854 } else if (fSink != NULL) {
00855 fOutBuf->enqueueWord(fSink->SSRC());
00856 }
00857 }
00858
00859 void RTCPInstance::schedule(double nextTime) {
00860 fNextReportTime = nextTime;
00861
00862 double secondsToDelay = nextTime - dTimeNow();
00863 #ifdef DEBUG
00864 fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00865 #endif
00866 int usToGo = (int)(secondsToDelay * 1000000);
00867 nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00868 (TaskFunc*)RTCPInstance::onExpire, this);
00869 }
00870
00871 void RTCPInstance::reschedule(double nextTime) {
00872 envir().taskScheduler().unscheduleDelayedTask(nextTask());
00873 schedule(nextTime);
00874 }
00875
00876 void RTCPInstance::onExpire1() {
00877
00878 double rtcpBW = 0.05*fTotSessionBW*1024/8;
00879
00880 OnExpire(this,
00881 numMembers(),
00882 (fSink != NULL) ? 1 : 0,
00883 rtcpBW,
00884 (fSink != NULL) ? 1 : 0,
00885 &fAveRTCPSize,
00886 &fIsInitial,
00887 dTimeNow(),
00888 &fPrevReportTime,
00889 &fPrevNumMembers
00890 );
00891 }
00892
00894
00895 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00896 unsigned length = strlen((char const*)value);
00897 if (length > 511) length = 511;
00898
00899 fData[0] = tag;
00900 fData[1] = (unsigned char)length;
00901 memmove(&fData[2], value, length);
00902
00903
00904 while ((length)%4 > 0) fData[2 + length++] = '\0';
00905 }
00906
00907 unsigned SDESItem::totalSize() const {
00908 return 2 + (unsigned)fData[1];
00909 }
00910
00911
00913
00914 extern "C" void Schedule(double nextTime, event e) {
00915 RTCPInstance* instance = (RTCPInstance*)e;
00916 if (instance == NULL) return;
00917
00918 instance->schedule(nextTime);
00919 }
00920
00921 extern "C" void Reschedule(double nextTime, event e) {
00922 RTCPInstance* instance = (RTCPInstance*)e;
00923 if (instance == NULL) return;
00924
00925 instance->reschedule(nextTime);
00926 }
00927
00928 extern "C" void SendRTCPReport(event e) {
00929 RTCPInstance* instance = (RTCPInstance*)e;
00930 if (instance == NULL) return;
00931
00932 instance->sendReport();
00933 }
00934