00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MultiFramedRTPSource.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025
00027
00028 class ReorderingPacketBuffer {
00029 public:
00030 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
00031 virtual ~ReorderingPacketBuffer();
00032 void reset();
00033
00034 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
00035 Boolean storePacket(BufferedPacket* bPacket);
00036 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
00037 void releaseUsedPacket(BufferedPacket* packet);
00038 void freePacket(BufferedPacket* packet) {
00039 if (packet != fSavedPacket) {
00040 delete packet;
00041 } else {
00042 fSavedPacketFree = True;
00043 }
00044 }
00045 Boolean isEmpty() const { return fHeadPacket == NULL; }
00046
00047 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
00048 void resetHaveSeenFirstPacket() { fHaveSeenFirstPacket = False; }
00049
00050 private:
00051 BufferedPacketFactory* fPacketFactory;
00052 unsigned fThresholdTime;
00053 Boolean fHaveSeenFirstPacket;
00054 unsigned short fNextExpectedSeqNo;
00055 BufferedPacket* fHeadPacket;
00056 BufferedPacket* fTailPacket;
00057 BufferedPacket* fSavedPacket;
00058
00059 Boolean fSavedPacketFree;
00060 };
00061
00062
00064
00065 MultiFramedRTPSource
00066 ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
00067 unsigned char rtpPayloadFormat,
00068 unsigned rtpTimestampFrequency,
00069 BufferedPacketFactory* packetFactory)
00070 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
00071 reset();
00072 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
00073
00074
00075 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
00076 }
00077
00078 void MultiFramedRTPSource::reset() {
00079 fCurrentPacketBeginsFrame = True;
00080 fCurrentPacketCompletesFrame = True;
00081 fAreDoingNetworkReads = False;
00082 fPacketReadInProgress = NULL;
00083 fNeedDelivery = False;
00084 fPacketLossInFragmentedFrame = False;
00085 }
00086
00087 MultiFramedRTPSource::~MultiFramedRTPSource() {
00088 fRTPInterface.stopNetworkReading();
00089 delete fReorderingBuffer;
00090 }
00091
00092 Boolean MultiFramedRTPSource
00093 ::processSpecialHeader(BufferedPacket* ,
00094 unsigned& resultSpecialHeaderSize) {
00095
00096 resultSpecialHeaderSize = 0;
00097 return True;
00098 }
00099
00100 Boolean MultiFramedRTPSource
00101 ::packetIsUsableInJitterCalculation(unsigned char* ,
00102 unsigned ) {
00103
00104 return True;
00105 }
00106
00107 void MultiFramedRTPSource::doStopGettingFrames() {
00108 fRTPInterface.stopNetworkReading();
00109 fReorderingBuffer->reset();
00110 reset();
00111 }
00112
00113 void MultiFramedRTPSource::doGetNextFrame() {
00114 if (!fAreDoingNetworkReads) {
00115
00116 fAreDoingNetworkReads = True;
00117 TaskScheduler::BackgroundHandlerProc* handler
00118 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
00119 fRTPInterface.startNetworkReading(handler);
00120 }
00121
00122 fSavedTo = fTo;
00123 fSavedMaxSize = fMaxSize;
00124 fFrameSize = 0;
00125 fNeedDelivery = True;
00126 doGetNextFrame1();
00127 }
00128
00129 void MultiFramedRTPSource::doGetNextFrame1() {
00130 while (fNeedDelivery) {
00131
00132 Boolean packetLossPrecededThis;
00133 BufferedPacket* nextPacket
00134 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
00135 if (nextPacket == NULL) break;
00136
00137 fNeedDelivery = False;
00138
00139 if (nextPacket->useCount() == 0) {
00140
00141
00142 unsigned specialHeaderSize;
00143 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
00144
00145 fReorderingBuffer->releaseUsedPacket(nextPacket);
00146 fNeedDelivery = True;
00147 break;
00148 }
00149 nextPacket->skip(specialHeaderSize);
00150 }
00151
00152
00153
00154 if (fCurrentPacketBeginsFrame) {
00155 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
00156
00157
00158 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
00159 fFrameSize = 0;
00160 }
00161 fPacketLossInFragmentedFrame = False;
00162 } else if (packetLossPrecededThis) {
00163
00164 fPacketLossInFragmentedFrame = True;
00165 }
00166 if (fPacketLossInFragmentedFrame) {
00167
00168 fReorderingBuffer->releaseUsedPacket(nextPacket);
00169 fNeedDelivery = True;
00170 break;
00171 }
00172
00173
00174 unsigned frameSize;
00175 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
00176 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
00177 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
00178 fCurPacketMarkerBit);
00179 fFrameSize += frameSize;
00180
00181 if (!nextPacket->hasUsableData()) {
00182
00183 fReorderingBuffer->releaseUsedPacket(nextPacket);
00184 }
00185
00186 if (fCurrentPacketCompletesFrame) {
00187
00188 if (fNumTruncatedBytes > 0) {
00189 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
00190 << fSavedMaxSize << "). "
00191 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
00192 }
00193
00194 if (fReorderingBuffer->isEmpty()) {
00195
00196
00197
00198 afterGetting(this);
00199 } else {
00200
00201 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
00202 (TaskFunc*)FramedSource::afterGetting, this);
00203 }
00204 } else {
00205
00206
00207 fTo += frameSize; fMaxSize -= frameSize;
00208 fNeedDelivery = True;
00209 }
00210 }
00211 }
00212
00213 void MultiFramedRTPSource
00214 ::setPacketReorderingThresholdTime(unsigned uSeconds) {
00215 fReorderingBuffer->setThresholdTime(uSeconds);
00216 }
00217
00218 #define ADVANCE(n) do { bPacket->skip(n); } while (0)
00219
00220 void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int ) {
00221 source->networkReadHandler1();
00222 }
00223
00224 void MultiFramedRTPSource::networkReadHandler1() {
00225 BufferedPacket* bPacket = fPacketReadInProgress;
00226 if (bPacket == NULL) {
00227
00228 bPacket = fReorderingBuffer->getFreePacket(this);
00229 }
00230
00231
00232 Boolean readSuccess = False;
00233 do {
00234 Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
00235 if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) break;
00236 if (packetReadWasIncomplete) {
00237
00238 fPacketReadInProgress = bPacket;
00239 return;
00240 } else {
00241 fPacketReadInProgress = NULL;
00242 }
00243 #ifdef TEST_LOSS
00244 setPacketReorderingThresholdTime(0);
00245
00246 if ((our_random()%10) == 0) break;
00247 #endif
00248
00249
00250 if (bPacket->dataSize() < 12) break;
00251 unsigned rtpHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00252 Boolean rtpMarkerBit = (rtpHdr&0x00800000) != 0;
00253 unsigned rtpTimestamp = ntohl(*(u_int32_t*)(bPacket->data()));ADVANCE(4);
00254 unsigned rtpSSRC = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00255
00256
00257 if ((rtpHdr&0xC0000000) != 0x80000000) break;
00258
00259
00260 unsigned cc = (rtpHdr>>24)&0xF;
00261 if (bPacket->dataSize() < cc) break;
00262 ADVANCE(cc*4);
00263
00264
00265 if (rtpHdr&0x10000000) {
00266 if (bPacket->dataSize() < 4) break;
00267 unsigned extHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00268 unsigned remExtSize = 4*(extHdr&0xFFFF);
00269 if (bPacket->dataSize() < remExtSize) break;
00270 ADVANCE(remExtSize);
00271 }
00272
00273
00274 if (rtpHdr&0x20000000) {
00275 if (bPacket->dataSize() == 0) break;
00276 unsigned numPaddingBytes
00277 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
00278 if (bPacket->dataSize() < numPaddingBytes) break;
00279 bPacket->removePadding(numPaddingBytes);
00280 }
00281
00282 if ((unsigned char)((rtpHdr&0x007F0000)>>16)
00283 != rtpPayloadFormat()) {
00284 break;
00285 }
00286
00287
00288 if (rtpSSRC != fLastReceivedSSRC) {
00289
00290
00291 fLastReceivedSSRC = rtpSSRC;
00292 fReorderingBuffer->resetHaveSeenFirstPacket();
00293 }
00294 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
00295 Boolean usableInJitterCalculation
00296 = packetIsUsableInJitterCalculation((bPacket->data()),
00297 bPacket->dataSize());
00298 struct timeval presentationTime;
00299 Boolean hasBeenSyncedUsingRTCP;
00300 receptionStatsDB()
00301 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
00302 timestampFrequency(),
00303 usableInJitterCalculation, presentationTime,
00304 hasBeenSyncedUsingRTCP, bPacket->dataSize());
00305
00306
00307 struct timeval timeNow;
00308 gettimeofday(&timeNow, NULL);
00309 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
00310 hasBeenSyncedUsingRTCP, rtpMarkerBit,
00311 timeNow);
00312 if (!fReorderingBuffer->storePacket(bPacket)) break;
00313
00314 readSuccess = True;
00315 } while (0);
00316 if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
00317
00318 doGetNextFrame1();
00319
00320 }
00321
00322
00324
00325 #define MAX_PACKET_SIZE 10000
00326
00327 BufferedPacket::BufferedPacket()
00328 : fPacketSize(MAX_PACKET_SIZE),
00329 fBuf(new unsigned char[MAX_PACKET_SIZE]),
00330 fNextPacket(NULL) {
00331 }
00332
00333 BufferedPacket::~BufferedPacket() {
00334 delete fNextPacket;
00335 delete[] fBuf;
00336 }
00337
00338 void BufferedPacket::reset() {
00339 fHead = fTail = 0;
00340 fUseCount = 0;
00341 fIsFirstPacket = False;
00342 }
00343
00344
00345 unsigned BufferedPacket
00346 ::nextEnclosedFrameSize(unsigned char*& , unsigned dataSize) {
00347
00348
00349
00350 return dataSize;
00351 }
00352
00353 void BufferedPacket
00354 ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
00355 unsigned& frameSize,
00356 unsigned& frameDurationInMicroseconds) {
00357
00358
00359
00360
00361
00362
00363 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
00364
00365 frameDurationInMicroseconds = 0;
00366 }
00367
00368 Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, Boolean& packetReadWasIncomplete) {
00369 if (!packetReadWasIncomplete) reset();
00370
00371 unsigned numBytesRead;
00372 struct sockaddr_in fromAddress;
00373 if (!rtpInterface.handleRead(&fBuf[fTail], fPacketSize-fTail, numBytesRead, fromAddress, packetReadWasIncomplete)) {
00374 return False;
00375 }
00376 fTail += numBytesRead;
00377 return True;
00378 }
00379
00380 void BufferedPacket
00381 ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
00382 struct timeval presentationTime,
00383 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
00384 struct timeval timeReceived) {
00385 fRTPSeqNo = rtpSeqNo;
00386 fRTPTimestamp = rtpTimestamp;
00387 fPresentationTime = presentationTime;
00388 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
00389 fRTPMarkerBit = rtpMarkerBit;
00390 fTimeReceived = timeReceived;
00391 }
00392
00393 void BufferedPacket::skip(unsigned numBytes) {
00394 fHead += numBytes;
00395 if (fHead > fTail) fHead = fTail;
00396 }
00397
00398 void BufferedPacket::removePadding(unsigned numBytes) {
00399 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
00400 fTail -= numBytes;
00401 }
00402
00403 void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
00404 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
00405 memmove(&fBuf[fTail], newData, numBytes);
00406 fTail += numBytes;
00407 }
00408
00409 void BufferedPacket::use(unsigned char* to, unsigned toSize,
00410 unsigned& bytesUsed, unsigned& bytesTruncated,
00411 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
00412 struct timeval& presentationTime,
00413 Boolean& hasBeenSyncedUsingRTCP,
00414 Boolean& rtpMarkerBit) {
00415 unsigned char* origFramePtr = &fBuf[fHead];
00416 unsigned char* newFramePtr = origFramePtr;
00417 unsigned frameSize, frameDurationInMicroseconds;
00418 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
00419 frameSize, frameDurationInMicroseconds);
00420 if (frameSize > toSize) {
00421 bytesTruncated += frameSize - toSize;
00422 bytesUsed = toSize;
00423 } else {
00424 bytesTruncated = 0;
00425 bytesUsed = frameSize;
00426 }
00427
00428 memmove(to, newFramePtr, bytesUsed);
00429 fHead += (newFramePtr - origFramePtr) + frameSize;
00430 ++fUseCount;
00431
00432 rtpSeqNo = fRTPSeqNo;
00433 rtpTimestamp = fRTPTimestamp;
00434 presentationTime = fPresentationTime;
00435 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
00436 rtpMarkerBit = fRTPMarkerBit;
00437
00438
00439 fPresentationTime.tv_usec += frameDurationInMicroseconds;
00440 if (fPresentationTime.tv_usec >= 1000000) {
00441 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
00442 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
00443 }
00444 }
00445
00446 BufferedPacketFactory::BufferedPacketFactory() {
00447 }
00448
00449 BufferedPacketFactory::~BufferedPacketFactory() {
00450 }
00451
00452 BufferedPacket* BufferedPacketFactory
00453 ::createNewPacket(MultiFramedRTPSource* ) {
00454 return new BufferedPacket;
00455 }
00456
00457
00459
00460 ReorderingPacketBuffer
00461 ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
00462 : fThresholdTime(100000) ,
00463 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fTailPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) {
00464 fPacketFactory = (packetFactory == NULL)
00465 ? (new BufferedPacketFactory)
00466 : packetFactory;
00467 }
00468
00469 ReorderingPacketBuffer::~ReorderingPacketBuffer() {
00470 reset();
00471 delete fPacketFactory;
00472 }
00473
00474 void ReorderingPacketBuffer::reset() {
00475 if (fSavedPacketFree) delete fSavedPacket;
00476 delete fHeadPacket;
00477 resetHaveSeenFirstPacket();
00478 fHeadPacket = fTailPacket = fSavedPacket = NULL;
00479 }
00480
00481 BufferedPacket* ReorderingPacketBuffer::getFreePacket(MultiFramedRTPSource* ourSource) {
00482 if (fSavedPacket == NULL) {
00483 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
00484 fSavedPacketFree = True;
00485 }
00486
00487 if (fSavedPacketFree == True) {
00488 fSavedPacketFree = False;
00489 return fSavedPacket;
00490 } else {
00491 return fPacketFactory->createNewPacket(ourSource);
00492 }
00493 }
00494
00495 Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
00496 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
00497
00498 if (!fHaveSeenFirstPacket) {
00499 fNextExpectedSeqNo = rtpSeqNo;
00500 bPacket->isFirstPacket() = True;
00501 fHaveSeenFirstPacket = True;
00502 }
00503
00504
00505
00506 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False;
00507
00508 if (fTailPacket == NULL) {
00509
00510 bPacket->nextPacket() = NULL;
00511 fHeadPacket = fTailPacket = bPacket;
00512 return True;
00513 }
00514
00515 if (seqNumLT(fTailPacket->rtpSeqNo(), rtpSeqNo)) {
00516
00517 bPacket->nextPacket() = NULL;
00518 fTailPacket->nextPacket() = bPacket;
00519 fTailPacket = bPacket;
00520 return True;
00521 }
00522
00523 if (rtpSeqNo == fTailPacket->rtpSeqNo()) {
00524
00525 return False;
00526 }
00527
00528
00529 BufferedPacket* beforePtr = NULL;
00530 BufferedPacket* afterPtr = fHeadPacket;
00531 while (afterPtr != NULL) {
00532 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break;
00533 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
00534
00535 return False;
00536 }
00537
00538 beforePtr = afterPtr;
00539 afterPtr = afterPtr->nextPacket();
00540 }
00541
00542
00543 bPacket->nextPacket() = afterPtr;
00544 if (beforePtr == NULL) {
00545 fHeadPacket = bPacket;
00546 } else {
00547 beforePtr->nextPacket() = bPacket;
00548 }
00549
00550 return True;
00551 }
00552
00553 void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
00554
00555
00556 ++fNextExpectedSeqNo;
00557
00558 fHeadPacket = fHeadPacket->nextPacket();
00559 if (!fHeadPacket) {
00560 fTailPacket = NULL;
00561 }
00562 packet->nextPacket() = NULL;
00563
00564 freePacket(packet);
00565 }
00566
00567 BufferedPacket* ReorderingPacketBuffer
00568 ::getNextCompletedPacket(Boolean& packetLossPreceded) {
00569 if (fHeadPacket == NULL) return NULL;
00570
00571
00572
00573
00574 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
00575 packetLossPreceded = fHeadPacket->isFirstPacket();
00576
00577 return fHeadPacket;
00578 }
00579
00580
00581
00582
00583 Boolean timeThresholdHasBeenExceeded;
00584 if (fThresholdTime == 0) {
00585 timeThresholdHasBeenExceeded = True;
00586 } else {
00587 struct timeval timeNow;
00588 gettimeofday(&timeNow, NULL);
00589 unsigned uSecondsSinceReceived
00590 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
00591 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
00592 timeThresholdHasBeenExceeded = uSecondsSinceReceived > fThresholdTime;
00593 }
00594 if (timeThresholdHasBeenExceeded) {
00595 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
00596
00597 packetLossPreceded = True;
00598 return fHeadPacket;
00599 }
00600
00601
00602 return NULL;
00603 }