00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MultiFramedRTPSource.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025
00027
00028 class ReorderingPacketBuffer {
00029 public:
00030 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
00031 virtual ~ReorderingPacketBuffer();
00032 void reset();
00033
00034 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
00035 Boolean storePacket(BufferedPacket* bPacket);
00036 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
00037 void releaseUsedPacket(BufferedPacket* packet);
00038 void freePacket(BufferedPacket* packet) {
00039 if (packet != fSavedPacket) {
00040 delete packet;
00041 } else {
00042 fSavedPacketFree = True;
00043 }
00044 }
00045 Boolean isEmpty() const { return fHeadPacket == NULL; }
00046
00047 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
00048
00049 private:
00050 BufferedPacketFactory* fPacketFactory;
00051 unsigned fThresholdTime;
00052 Boolean fHaveSeenFirstPacket;
00053 unsigned short fNextExpectedSeqNo;
00054 BufferedPacket* fHeadPacket;
00055 BufferedPacket* fSavedPacket;
00056
00057 Boolean fSavedPacketFree;
00058 };
00059
00060
00062
00063 MultiFramedRTPSource
00064 ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
00065 unsigned char rtpPayloadFormat,
00066 unsigned rtpTimestampFrequency,
00067 BufferedPacketFactory* packetFactory)
00068 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
00069 reset();
00070 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
00071
00072
00073 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
00074 }
00075
00076 void MultiFramedRTPSource::reset() {
00077 fCurrentPacketBeginsFrame = True;
00078 fCurrentPacketCompletesFrame = True;
00079 fAreDoingNetworkReads = False;
00080 fNeedDelivery = False;
00081 fPacketLossInFragmentedFrame = False;
00082 }
00083
00084 MultiFramedRTPSource::~MultiFramedRTPSource() {
00085 fRTPInterface.stopNetworkReading();
00086 delete fReorderingBuffer;
00087 }
00088
00089 Boolean MultiFramedRTPSource
00090 ::processSpecialHeader(BufferedPacket* ,
00091 unsigned& resultSpecialHeaderSize) {
00092
00093 resultSpecialHeaderSize = 0;
00094 return True;
00095 }
00096
00097 Boolean MultiFramedRTPSource
00098 ::packetIsUsableInJitterCalculation(unsigned char* ,
00099 unsigned ) {
00100
00101 return True;
00102 }
00103
00104 void MultiFramedRTPSource::doStopGettingFrames() {
00105 fRTPInterface.stopNetworkReading();
00106 fReorderingBuffer->reset();
00107 reset();
00108 }
00109
00110 void MultiFramedRTPSource::doGetNextFrame() {
00111 if (!fAreDoingNetworkReads) {
00112
00113 fAreDoingNetworkReads = True;
00114 TaskScheduler::BackgroundHandlerProc* handler
00115 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
00116 fRTPInterface.startNetworkReading(handler);
00117 }
00118
00119 fSavedTo = fTo;
00120 fSavedMaxSize = fMaxSize;
00121 fFrameSize = 0;
00122 fNeedDelivery = True;
00123 doGetNextFrame1();
00124 }
00125
00126 void MultiFramedRTPSource::doGetNextFrame1() {
00127 while (fNeedDelivery) {
00128
00129 Boolean packetLossPrecededThis;
00130 BufferedPacket* nextPacket
00131 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
00132 if (nextPacket == NULL) break;
00133
00134 fNeedDelivery = False;
00135
00136 if (nextPacket->useCount() == 0) {
00137
00138
00139 unsigned specialHeaderSize;
00140 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
00141
00142 fReorderingBuffer->releaseUsedPacket(nextPacket);
00143 fNeedDelivery = True;
00144 break;
00145 }
00146 nextPacket->skip(specialHeaderSize);
00147 }
00148
00149
00150
00151 if (fCurrentPacketBeginsFrame) {
00152 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
00153
00154
00155 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
00156 fFrameSize = 0;
00157 }
00158 fPacketLossInFragmentedFrame = False;
00159 } else if (packetLossPrecededThis) {
00160
00161 fPacketLossInFragmentedFrame = True;
00162 }
00163 if (fPacketLossInFragmentedFrame) {
00164
00165 fReorderingBuffer->releaseUsedPacket(nextPacket);
00166 fNeedDelivery = True;
00167 break;
00168 }
00169
00170
00171 unsigned frameSize;
00172 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
00173 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
00174 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
00175 fCurPacketMarkerBit);
00176 fFrameSize += frameSize;
00177
00178 if (!nextPacket->hasUsableData()) {
00179
00180 fReorderingBuffer->releaseUsedPacket(nextPacket);
00181 }
00182
00183 if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) {
00184
00185 if (fNumTruncatedBytes > 0) {
00186 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
00187 << fSavedMaxSize << "). "
00188 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
00189 }
00190
00191 if (fReorderingBuffer->isEmpty()) {
00192
00193
00194
00195 afterGetting(this);
00196 } else {
00197
00198 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
00199 (TaskFunc*)FramedSource::afterGetting, this);
00200 }
00201 } else {
00202
00203
00204 fTo += frameSize; fMaxSize -= frameSize;
00205 fNeedDelivery = True;
00206 }
00207 }
00208 }
00209
00210 void MultiFramedRTPSource
00211 ::setPacketReorderingThresholdTime(unsigned uSeconds) {
00212 fReorderingBuffer->setThresholdTime(uSeconds);
00213 }
00214
00215 #define ADVANCE(n) do { bPacket->skip(n); } while (0)
00216
00217 void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source,
00218 int ) {
00219
00220 BufferedPacket* bPacket
00221 = source->fReorderingBuffer->getFreePacket(source);
00222
00223
00224 Boolean readSuccess = False;
00225 do {
00226 if (!bPacket->fillInData(source->fRTPInterface)) break;
00227 #ifdef TEST_LOSS
00228 source->setPacketReorderingThresholdTime(0);
00229
00230 if ((our_random()%10) == 0) break;
00231 #endif
00232
00233
00234 if (bPacket->dataSize() < 12) break;
00235 unsigned rtpHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00236 Boolean rtpMarkerBit = (rtpHdr&0x00800000) >> 23;
00237 unsigned rtpTimestamp = ntohl(*(unsigned*)(bPacket->data()));ADVANCE(4);
00238 unsigned rtpSSRC = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00239
00240
00241 if ((rtpHdr&0xC0000000) != 0x80000000) break;
00242
00243
00244 unsigned cc = (rtpHdr>>24)&0xF;
00245 if (bPacket->dataSize() < cc) break;
00246 ADVANCE(cc*4);
00247
00248
00249 if (rtpHdr&0x10000000) {
00250 if (bPacket->dataSize() < 4) break;
00251 unsigned extHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00252 unsigned remExtSize = 4*(extHdr&0xFFFF);
00253 if (bPacket->dataSize() < remExtSize) break;
00254 ADVANCE(remExtSize);
00255 }
00256
00257
00258 if (rtpHdr&0x20000000) {
00259 if (bPacket->dataSize() == 0) break;
00260 unsigned numPaddingBytes
00261 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
00262 if (bPacket->dataSize() < numPaddingBytes) break;
00263 bPacket->removePadding(numPaddingBytes);
00264 }
00265
00266 if ((unsigned char)((rtpHdr&0x007F0000)>>16)
00267 != source->rtpPayloadFormat()) {
00268 break;
00269 }
00270
00271
00272 source->fLastReceivedSSRC = rtpSSRC;
00273 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
00274 Boolean usableInJitterCalculation
00275 = source->packetIsUsableInJitterCalculation((bPacket->data()),
00276 bPacket->dataSize());
00277 struct timeval presentationTime;
00278 Boolean hasBeenSyncedUsingRTCP;
00279 source->receptionStatsDB()
00280 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
00281 source->timestampFrequency(),
00282 usableInJitterCalculation, presentationTime,
00283 hasBeenSyncedUsingRTCP, bPacket->dataSize());
00284
00285
00286 struct timeval timeNow;
00287 gettimeofday(&timeNow, NULL);
00288 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
00289 hasBeenSyncedUsingRTCP, rtpMarkerBit,
00290 timeNow);
00291 if (!source->fReorderingBuffer->storePacket(bPacket)) break;
00292
00293 readSuccess = True;
00294 } while (0);
00295 if (!readSuccess) source->fReorderingBuffer->freePacket(bPacket);
00296
00297 source->doGetNextFrame1();
00298
00299 }
00300
00301
00303
00304 #define MAX_PACKET_SIZE 10000
00305
00306 BufferedPacket::BufferedPacket()
00307 : fPacketSize(MAX_PACKET_SIZE),
00308 fBuf(new unsigned char[MAX_PACKET_SIZE]),
00309 fNextPacket(NULL) {
00310 }
00311
00312 BufferedPacket::~BufferedPacket() {
00313 delete fNextPacket;
00314 delete[] fBuf;
00315 }
00316
00317 void BufferedPacket::reset() {
00318 fHead = fTail = 0;
00319 fUseCount = 0;
00320 }
00321
00322
00323 unsigned BufferedPacket
00324 ::nextEnclosedFrameSize(unsigned char*& , unsigned dataSize) {
00325
00326
00327
00328 return dataSize;
00329 }
00330
00331 void BufferedPacket
00332 ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
00333 unsigned& frameSize,
00334 unsigned& frameDurationInMicroseconds) {
00335
00336
00337
00338
00339
00340
00341 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
00342
00343 frameDurationInMicroseconds = 0;
00344 }
00345
00346 Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface) {
00347 reset();
00348
00349 unsigned numBytesRead;
00350 struct sockaddr_in fromAddress;
00351 if (!rtpInterface.handleRead(&fBuf[fTail], fPacketSize-fTail, numBytesRead,
00352 fromAddress)) {
00353 return False;
00354 }
00355 fTail += numBytesRead;
00356 return True;
00357 }
00358
00359 void BufferedPacket
00360 ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
00361 struct timeval presentationTime,
00362 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
00363 struct timeval timeReceived) {
00364 fRTPSeqNo = rtpSeqNo;
00365 fRTPTimestamp = rtpTimestamp;
00366 fPresentationTime = presentationTime;
00367 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
00368 fRTPMarkerBit = rtpMarkerBit;
00369 fTimeReceived = timeReceived;
00370 }
00371
00372 void BufferedPacket::skip(unsigned numBytes) {
00373 fHead += numBytes;
00374 if (fHead > fTail) fHead = fTail;
00375 }
00376
00377 void BufferedPacket::removePadding(unsigned numBytes) {
00378 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
00379 fTail -= numBytes;
00380 }
00381
00382 void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
00383 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
00384 memmove(&fBuf[fTail], newData, numBytes);
00385 fTail += numBytes;
00386 }
00387
00388 void BufferedPacket::use(unsigned char* to, unsigned toSize,
00389 unsigned& bytesUsed, unsigned& bytesTruncated,
00390 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
00391 struct timeval& presentationTime,
00392 Boolean& hasBeenSyncedUsingRTCP,
00393 Boolean& rtpMarkerBit) {
00394 unsigned char* origFramePtr = &fBuf[fHead];
00395 unsigned char* newFramePtr = origFramePtr;
00396 unsigned frameSize, frameDurationInMicroseconds;
00397 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
00398 frameSize, frameDurationInMicroseconds);
00399 if (frameSize > toSize) {
00400 bytesTruncated = frameSize - toSize;
00401 bytesUsed = toSize;
00402 } else {
00403 bytesTruncated = 0;
00404 bytesUsed = frameSize;
00405 }
00406
00407 memmove(to, newFramePtr, bytesUsed);
00408 fHead += (newFramePtr - origFramePtr) + frameSize;
00409 ++fUseCount;
00410
00411 rtpSeqNo = fRTPSeqNo;
00412 rtpTimestamp = fRTPTimestamp;
00413 presentationTime = fPresentationTime;
00414 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
00415 rtpMarkerBit = fRTPMarkerBit;
00416
00417
00418 fPresentationTime.tv_usec += frameDurationInMicroseconds;
00419 if (fPresentationTime.tv_usec >= 1000000) {
00420 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
00421 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
00422 }
00423 }
00424
00425 BufferedPacketFactory::BufferedPacketFactory() {
00426 }
00427
00428 BufferedPacketFactory::~BufferedPacketFactory() {
00429 }
00430
00431 BufferedPacket* BufferedPacketFactory
00432 ::createNewPacket(MultiFramedRTPSource* ) {
00433 return new BufferedPacket;
00434 }
00435
00436
00438
00439 ReorderingPacketBuffer
00440 ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
00441 : fThresholdTime(100000) ,
00442 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) {
00443 fPacketFactory = (packetFactory == NULL)
00444 ? (new BufferedPacketFactory)
00445 : packetFactory;
00446 }
00447
00448 ReorderingPacketBuffer::~ReorderingPacketBuffer() {
00449 reset();
00450 delete fPacketFactory;
00451 }
00452
00453 void ReorderingPacketBuffer::reset() {
00454 if (fSavedPacketFree) delete fSavedPacket;
00455 delete fHeadPacket;
00456 fHaveSeenFirstPacket = False;
00457 fHeadPacket = NULL;
00458 fSavedPacket = NULL;
00459 }
00460
00461 BufferedPacket* ReorderingPacketBuffer
00462 ::getFreePacket(MultiFramedRTPSource* ourSource) {
00463 if (fSavedPacket == NULL) {
00464 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
00465 fSavedPacketFree = True;
00466 }
00467
00468 if (fSavedPacketFree == True) {
00469 fSavedPacketFree = False;
00470 return fSavedPacket;
00471 } else {
00472 return fPacketFactory->createNewPacket(ourSource);
00473 }
00474 }
00475
00476 Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
00477 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
00478
00479 if (!fHaveSeenFirstPacket) {
00480 fNextExpectedSeqNo = rtpSeqNo;
00481 fHaveSeenFirstPacket = True;
00482 }
00483
00484
00485
00486
00487
00488 unsigned short const seqNoThreshold = 100;
00489 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)
00490 && seqNumLT(fNextExpectedSeqNo, rtpSeqNo+seqNoThreshold)) {
00491 return False;
00492 }
00493
00494
00495 BufferedPacket* beforePtr = NULL;
00496 BufferedPacket* afterPtr = fHeadPacket;
00497 while (afterPtr != NULL) {
00498 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break;
00499 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
00500
00501 return False;
00502 }
00503
00504 beforePtr = afterPtr;
00505 afterPtr = afterPtr->nextPacket();
00506 }
00507
00508
00509 bPacket->nextPacket() = afterPtr;
00510 if (beforePtr == NULL) {
00511 fHeadPacket = bPacket;
00512 } else {
00513 beforePtr->nextPacket() = bPacket;
00514 }
00515
00516 return True;
00517 }
00518
00519 void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
00520
00521
00522 ++fNextExpectedSeqNo;
00523
00524 fHeadPacket = fHeadPacket->nextPacket();
00525 packet->nextPacket() = NULL;
00526
00527 freePacket(packet);
00528 }
00529
00530 BufferedPacket* ReorderingPacketBuffer
00531 ::getNextCompletedPacket(Boolean& packetLossPreceded) {
00532 if (fHeadPacket == NULL) return NULL;
00533
00534
00535
00536
00537 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
00538 packetLossPreceded = False;
00539 return fHeadPacket;
00540 }
00541
00542
00543
00544
00545 struct timeval timeNow;
00546 gettimeofday(&timeNow, NULL);
00547 unsigned uSecondsSinceReceived
00548 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
00549 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
00550 if (uSecondsSinceReceived > fThresholdTime) {
00551 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
00552
00553 packetLossPreceded = True;
00554 return fHeadPacket;
00555 }
00556
00557
00558 return NULL;
00559 }