00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MultiFramedRTPSource.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025
00027
00028 class ReorderingPacketBuffer {
00029 public:
00030 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
00031 virtual ~ReorderingPacketBuffer();
00032 void reset();
00033
00034 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
00035 Boolean storePacket(BufferedPacket* bPacket);
00036 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
00037 void releaseUsedPacket(BufferedPacket* packet);
00038 void freePacket(BufferedPacket* packet) {
00039 if (packet != fSavedPacket) {
00040 delete packet;
00041 } else {
00042 fSavedPacketFree = True;
00043 }
00044 }
00045 Boolean isEmpty() const { return fHeadPacket == NULL; }
00046
00047 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
00048 void resetHaveSeenFirstPacket() { fHaveSeenFirstPacket = False; }
00049
00050 private:
00051 BufferedPacketFactory* fPacketFactory;
00052 unsigned fThresholdTime;
00053 Boolean fHaveSeenFirstPacket;
00054 unsigned short fNextExpectedSeqNo;
00055 BufferedPacket* fHeadPacket;
00056 BufferedPacket* fTailPacket;
00057 BufferedPacket* fSavedPacket;
00058
00059 Boolean fSavedPacketFree;
00060 };
00061
00062
00064
00065 MultiFramedRTPSource
00066 ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
00067 unsigned char rtpPayloadFormat,
00068 unsigned rtpTimestampFrequency,
00069 BufferedPacketFactory* packetFactory)
00070 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
00071 reset();
00072 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
00073
00074
00075 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
00076 }
00077
00078 void MultiFramedRTPSource::reset() {
00079 fCurrentPacketBeginsFrame = True;
00080 fCurrentPacketCompletesFrame = True;
00081 fAreDoingNetworkReads = False;
00082 fPacketReadInProgress = NULL;
00083 fNeedDelivery = False;
00084 fPacketLossInFragmentedFrame = False;
00085 }
00086
00087 MultiFramedRTPSource::~MultiFramedRTPSource() {
00088 fRTPInterface.stopNetworkReading();
00089 delete fReorderingBuffer;
00090 }
00091
00092 Boolean MultiFramedRTPSource
00093 ::processSpecialHeader(BufferedPacket* ,
00094 unsigned& resultSpecialHeaderSize) {
00095
00096 resultSpecialHeaderSize = 0;
00097 return True;
00098 }
00099
00100 Boolean MultiFramedRTPSource
00101 ::packetIsUsableInJitterCalculation(unsigned char* ,
00102 unsigned ) {
00103
00104 return True;
00105 }
00106
00107 void MultiFramedRTPSource::doStopGettingFrames() {
00108 envir().taskScheduler().unscheduleDelayedTask(nextTask());
00109 fRTPInterface.stopNetworkReading();
00110 fReorderingBuffer->reset();
00111 reset();
00112 }
00113
00114 void MultiFramedRTPSource::doGetNextFrame() {
00115 if (!fAreDoingNetworkReads) {
00116
00117 fAreDoingNetworkReads = True;
00118 TaskScheduler::BackgroundHandlerProc* handler
00119 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
00120 fRTPInterface.startNetworkReading(handler);
00121 }
00122
00123 fSavedTo = fTo;
00124 fSavedMaxSize = fMaxSize;
00125 fFrameSize = 0;
00126 fNeedDelivery = True;
00127 doGetNextFrame1();
00128 }
00129
00130 void MultiFramedRTPSource::doGetNextFrame1() {
00131 while (fNeedDelivery) {
00132
00133 Boolean packetLossPrecededThis;
00134 BufferedPacket* nextPacket
00135 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
00136 if (nextPacket == NULL) break;
00137
00138 fNeedDelivery = False;
00139
00140 if (nextPacket->useCount() == 0) {
00141
00142
00143 unsigned specialHeaderSize;
00144 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
00145
00146 fReorderingBuffer->releaseUsedPacket(nextPacket);
00147 fNeedDelivery = True;
00148 break;
00149 }
00150 nextPacket->skip(specialHeaderSize);
00151 }
00152
00153
00154
00155 if (fCurrentPacketBeginsFrame) {
00156 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
00157
00158
00159 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
00160 fFrameSize = 0;
00161 }
00162 fPacketLossInFragmentedFrame = False;
00163 } else if (packetLossPrecededThis) {
00164
00165 fPacketLossInFragmentedFrame = True;
00166 }
00167 if (fPacketLossInFragmentedFrame) {
00168
00169 fReorderingBuffer->releaseUsedPacket(nextPacket);
00170 fNeedDelivery = True;
00171 break;
00172 }
00173
00174
00175 unsigned frameSize;
00176 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
00177 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
00178 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
00179 fCurPacketMarkerBit);
00180 fFrameSize += frameSize;
00181
00182 if (!nextPacket->hasUsableData()) {
00183
00184 fReorderingBuffer->releaseUsedPacket(nextPacket);
00185 }
00186
00187 if (fCurrentPacketCompletesFrame) {
00188
00189 if (fNumTruncatedBytes > 0) {
00190 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
00191 << fSavedMaxSize << "). "
00192 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
00193 }
00194
00195 if (fReorderingBuffer->isEmpty()) {
00196
00197
00198
00199 afterGetting(this);
00200 } else {
00201
00202 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
00203 (TaskFunc*)FramedSource::afterGetting, this);
00204 }
00205 } else {
00206
00207
00208 fTo += frameSize; fMaxSize -= frameSize;
00209 fNeedDelivery = True;
00210 }
00211 }
00212 }
00213
00214 void MultiFramedRTPSource
00215 ::setPacketReorderingThresholdTime(unsigned uSeconds) {
00216 fReorderingBuffer->setThresholdTime(uSeconds);
00217 }
00218
00219 #define ADVANCE(n) do { bPacket->skip(n); } while (0)
00220
00221 void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int ) {
00222 source->networkReadHandler1();
00223 }
00224
00225 void MultiFramedRTPSource::networkReadHandler1() {
00226 BufferedPacket* bPacket = fPacketReadInProgress;
00227 if (bPacket == NULL) {
00228
00229 bPacket = fReorderingBuffer->getFreePacket(this);
00230 }
00231
00232
00233 Boolean readSuccess = False;
00234 do {
00235 Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
00236 if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) {
00237 if (bPacket->bytesAvailable() == 0) {
00238 envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \"MAX_PACKET_SIZE\"\n";
00239 }
00240 fPacketReadInProgress = NULL;
00241 break;
00242 }
00243 if (packetReadWasIncomplete) {
00244
00245 fPacketReadInProgress = bPacket;
00246 return;
00247 } else {
00248 fPacketReadInProgress = NULL;
00249 }
00250 #ifdef TEST_LOSS
00251 setPacketReorderingThresholdTime(0);
00252
00253 if ((our_random()%10) == 0) break;
00254 #endif
00255
00256
00257 if (bPacket->dataSize() < 12) break;
00258 unsigned rtpHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00259 Boolean rtpMarkerBit = (rtpHdr&0x00800000) != 0;
00260 unsigned rtpTimestamp = ntohl(*(u_int32_t*)(bPacket->data()));ADVANCE(4);
00261 unsigned rtpSSRC = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00262
00263
00264 if ((rtpHdr&0xC0000000) != 0x80000000) break;
00265
00266
00267 unsigned cc = (rtpHdr>>24)&0xF;
00268 if (bPacket->dataSize() < cc) break;
00269 ADVANCE(cc*4);
00270
00271
00272 if (rtpHdr&0x10000000) {
00273 if (bPacket->dataSize() < 4) break;
00274 unsigned extHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
00275 unsigned remExtSize = 4*(extHdr&0xFFFF);
00276 if (bPacket->dataSize() < remExtSize) break;
00277 ADVANCE(remExtSize);
00278 }
00279
00280
00281 if (rtpHdr&0x20000000) {
00282 if (bPacket->dataSize() == 0) break;
00283 unsigned numPaddingBytes
00284 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
00285 if (bPacket->dataSize() < numPaddingBytes) break;
00286 bPacket->removePadding(numPaddingBytes);
00287 }
00288
00289 if ((unsigned char)((rtpHdr&0x007F0000)>>16)
00290 != rtpPayloadFormat()) {
00291 break;
00292 }
00293
00294
00295 if (rtpSSRC != fLastReceivedSSRC) {
00296
00297
00298 fLastReceivedSSRC = rtpSSRC;
00299 fReorderingBuffer->resetHaveSeenFirstPacket();
00300 }
00301 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
00302 Boolean usableInJitterCalculation
00303 = packetIsUsableInJitterCalculation((bPacket->data()),
00304 bPacket->dataSize());
00305 struct timeval presentationTime;
00306 Boolean hasBeenSyncedUsingRTCP;
00307 receptionStatsDB()
00308 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
00309 timestampFrequency(),
00310 usableInJitterCalculation, presentationTime,
00311 hasBeenSyncedUsingRTCP, bPacket->dataSize());
00312
00313
00314 struct timeval timeNow;
00315 gettimeofday(&timeNow, NULL);
00316 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
00317 hasBeenSyncedUsingRTCP, rtpMarkerBit,
00318 timeNow);
00319 if (!fReorderingBuffer->storePacket(bPacket)) break;
00320
00321 readSuccess = True;
00322 } while (0);
00323 if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
00324
00325 doGetNextFrame1();
00326
00327 }
00328
00329
00331
00332 #define MAX_PACKET_SIZE 20000
00333
00334 BufferedPacket::BufferedPacket()
00335 : fPacketSize(MAX_PACKET_SIZE),
00336 fBuf(new unsigned char[MAX_PACKET_SIZE]),
00337 fNextPacket(NULL) {
00338 }
00339
00340 BufferedPacket::~BufferedPacket() {
00341 delete fNextPacket;
00342 delete[] fBuf;
00343 }
00344
00345 void BufferedPacket::reset() {
00346 fHead = fTail = 0;
00347 fUseCount = 0;
00348 fIsFirstPacket = False;
00349 }
00350
00351
00352 unsigned BufferedPacket
00353 ::nextEnclosedFrameSize(unsigned char*& , unsigned dataSize) {
00354
00355
00356
00357 return dataSize;
00358 }
00359
00360 void BufferedPacket
00361 ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
00362 unsigned& frameSize,
00363 unsigned& frameDurationInMicroseconds) {
00364
00365
00366
00367
00368
00369
00370 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
00371
00372 frameDurationInMicroseconds = 0;
00373 }
00374
00375 Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, Boolean& packetReadWasIncomplete) {
00376 if (!packetReadWasIncomplete) reset();
00377
00378 unsigned numBytesRead;
00379 struct sockaddr_in fromAddress;
00380 unsigned const maxBytesToRead = bytesAvailable();
00381 if (maxBytesToRead == 0) return False;
00382 if (!rtpInterface.handleRead(&fBuf[fTail], maxBytesToRead, numBytesRead, fromAddress, packetReadWasIncomplete)) {
00383 return False;
00384 }
00385 fTail += numBytesRead;
00386 return True;
00387 }
00388
00389 void BufferedPacket
00390 ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
00391 struct timeval presentationTime,
00392 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
00393 struct timeval timeReceived) {
00394 fRTPSeqNo = rtpSeqNo;
00395 fRTPTimestamp = rtpTimestamp;
00396 fPresentationTime = presentationTime;
00397 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
00398 fRTPMarkerBit = rtpMarkerBit;
00399 fTimeReceived = timeReceived;
00400 }
00401
00402 void BufferedPacket::skip(unsigned numBytes) {
00403 fHead += numBytes;
00404 if (fHead > fTail) fHead = fTail;
00405 }
00406
00407 void BufferedPacket::removePadding(unsigned numBytes) {
00408 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
00409 fTail -= numBytes;
00410 }
00411
00412 void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
00413 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
00414 memmove(&fBuf[fTail], newData, numBytes);
00415 fTail += numBytes;
00416 }
00417
00418 void BufferedPacket::use(unsigned char* to, unsigned toSize,
00419 unsigned& bytesUsed, unsigned& bytesTruncated,
00420 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
00421 struct timeval& presentationTime,
00422 Boolean& hasBeenSyncedUsingRTCP,
00423 Boolean& rtpMarkerBit) {
00424 unsigned char* origFramePtr = &fBuf[fHead];
00425 unsigned char* newFramePtr = origFramePtr;
00426 unsigned frameSize, frameDurationInMicroseconds;
00427 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
00428 frameSize, frameDurationInMicroseconds);
00429 if (frameSize > toSize) {
00430 bytesTruncated += frameSize - toSize;
00431 bytesUsed = toSize;
00432 } else {
00433 bytesTruncated = 0;
00434 bytesUsed = frameSize;
00435 }
00436
00437 memmove(to, newFramePtr, bytesUsed);
00438 fHead += (newFramePtr - origFramePtr) + frameSize;
00439 ++fUseCount;
00440
00441 rtpSeqNo = fRTPSeqNo;
00442 rtpTimestamp = fRTPTimestamp;
00443 presentationTime = fPresentationTime;
00444 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
00445 rtpMarkerBit = fRTPMarkerBit;
00446
00447
00448 fPresentationTime.tv_usec += frameDurationInMicroseconds;
00449 if (fPresentationTime.tv_usec >= 1000000) {
00450 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
00451 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
00452 }
00453 }
00454
00455 BufferedPacketFactory::BufferedPacketFactory() {
00456 }
00457
00458 BufferedPacketFactory::~BufferedPacketFactory() {
00459 }
00460
00461 BufferedPacket* BufferedPacketFactory
00462 ::createNewPacket(MultiFramedRTPSource* ) {
00463 return new BufferedPacket;
00464 }
00465
00466
00468
00469 ReorderingPacketBuffer
00470 ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
00471 : fThresholdTime(100000) ,
00472 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fTailPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) {
00473 fPacketFactory = (packetFactory == NULL)
00474 ? (new BufferedPacketFactory)
00475 : packetFactory;
00476 }
00477
00478 ReorderingPacketBuffer::~ReorderingPacketBuffer() {
00479 reset();
00480 delete fPacketFactory;
00481 }
00482
00483 void ReorderingPacketBuffer::reset() {
00484 if (fSavedPacketFree) delete fSavedPacket;
00485 delete fHeadPacket;
00486 resetHaveSeenFirstPacket();
00487 fHeadPacket = fTailPacket = fSavedPacket = NULL;
00488 }
00489
00490 BufferedPacket* ReorderingPacketBuffer::getFreePacket(MultiFramedRTPSource* ourSource) {
00491 if (fSavedPacket == NULL) {
00492 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
00493 fSavedPacketFree = True;
00494 }
00495
00496 if (fSavedPacketFree == True) {
00497 fSavedPacketFree = False;
00498 return fSavedPacket;
00499 } else {
00500 return fPacketFactory->createNewPacket(ourSource);
00501 }
00502 }
00503
00504 Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
00505 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
00506
00507 if (!fHaveSeenFirstPacket) {
00508 fNextExpectedSeqNo = rtpSeqNo;
00509 bPacket->isFirstPacket() = True;
00510 fHaveSeenFirstPacket = True;
00511 }
00512
00513
00514
00515 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False;
00516
00517 if (fTailPacket == NULL) {
00518
00519 bPacket->nextPacket() = NULL;
00520 fHeadPacket = fTailPacket = bPacket;
00521 return True;
00522 }
00523
00524 if (seqNumLT(fTailPacket->rtpSeqNo(), rtpSeqNo)) {
00525
00526 bPacket->nextPacket() = NULL;
00527 fTailPacket->nextPacket() = bPacket;
00528 fTailPacket = bPacket;
00529 return True;
00530 }
00531
00532 if (rtpSeqNo == fTailPacket->rtpSeqNo()) {
00533
00534 return False;
00535 }
00536
00537
00538 BufferedPacket* beforePtr = NULL;
00539 BufferedPacket* afterPtr = fHeadPacket;
00540 while (afterPtr != NULL) {
00541 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break;
00542 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
00543
00544 return False;
00545 }
00546
00547 beforePtr = afterPtr;
00548 afterPtr = afterPtr->nextPacket();
00549 }
00550
00551
00552 bPacket->nextPacket() = afterPtr;
00553 if (beforePtr == NULL) {
00554 fHeadPacket = bPacket;
00555 } else {
00556 beforePtr->nextPacket() = bPacket;
00557 }
00558
00559 return True;
00560 }
00561
00562 void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
00563
00564
00565 ++fNextExpectedSeqNo;
00566
00567 fHeadPacket = fHeadPacket->nextPacket();
00568 if (!fHeadPacket) {
00569 fTailPacket = NULL;
00570 }
00571 packet->nextPacket() = NULL;
00572
00573 freePacket(packet);
00574 }
00575
00576 BufferedPacket* ReorderingPacketBuffer
00577 ::getNextCompletedPacket(Boolean& packetLossPreceded) {
00578 if (fHeadPacket == NULL) return NULL;
00579
00580
00581
00582
00583 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
00584 packetLossPreceded = fHeadPacket->isFirstPacket();
00585
00586 return fHeadPacket;
00587 }
00588
00589
00590
00591
00592 Boolean timeThresholdHasBeenExceeded;
00593 if (fThresholdTime == 0) {
00594 timeThresholdHasBeenExceeded = True;
00595 } else {
00596 struct timeval timeNow;
00597 gettimeofday(&timeNow, NULL);
00598 unsigned uSecondsSinceReceived
00599 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
00600 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
00601 timeThresholdHasBeenExceeded = uSecondsSinceReceived > fThresholdTime;
00602 }
00603 if (timeThresholdHasBeenExceeded) {
00604 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
00605
00606 packetLossPreceded = True;
00607 return fHeadPacket;
00608 }
00609
00610
00611 return NULL;
00612 }