testProgs/playCommon.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // Copyright (c) 1996-2012, Live Networks, Inc.  All rights reserved
00017 // A common framework, used for the "openRTSP" and "playSIP" applications
00018 // Implementation
00019 //
00020 // NOTE: If you want to develop your own RTSP client application (or embed RTSP client functionality into your own application),
00021 // then we don't recommend using this code as a model, because it is too complex (with many options).
00022 // Instead, we recommend using the "testRTSPClient" application code as a model.
00023 
00024 #include "playCommon.hh"
00025 #include "BasicUsageEnvironment.hh"
00026 #include "GroupsockHelper.hh"
00027 
00028 #if defined(__WIN32__) || defined(_WIN32)
00029 #define snprintf _snprintf
00030 #else
00031 #include <signal.h>
00032 #define USE_SIGNALS 1
00033 #endif
00034 
00035 // Forward function definitions:
00036 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00037 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00038 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00039 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00040 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00041 
00042 void setupStreams();
00043 void closeMediaSinks();
00044 void subsessionAfterPlaying(void* clientData);
00045 void subsessionByeHandler(void* clientData);
00046 void sessionAfterPlaying(void* clientData = NULL);
00047 void sessionTimerHandler(void* clientData);
00048 void shutdown(int exitCode = 1);
00049 void signalHandlerShutdown(int sig);
00050 void checkForPacketArrival(void* clientData);
00051 void checkInterPacketGaps(void* clientData);
00052 void beginQOSMeasurement();
00053 
00054 char const* progName;
00055 UsageEnvironment* env;
00056 Medium* ourClient = NULL;
00057 Authenticator* ourAuthenticator = NULL;
00058 char const* streamURL = NULL;
00059 MediaSession* session = NULL;
00060 TaskToken sessionTimerTask = NULL;
00061 TaskToken arrivalCheckTimerTask = NULL;
00062 TaskToken interPacketGapCheckTimerTask = NULL;
00063 TaskToken qosMeasurementTimerTask = NULL;
00064 Boolean createReceivers = True;
00065 Boolean outputQuickTimeFile = False;
00066 Boolean generateMP4Format = False;
00067 QuickTimeFileSink* qtOut = NULL;
00068 Boolean outputAVIFile = False;
00069 AVIFileSink* aviOut = NULL;
00070 Boolean audioOnly = False;
00071 Boolean videoOnly = False;
00072 char const* singleMedium = NULL;
00073 int verbosityLevel = 1; // by default, print verbose output
00074 double duration = 0;
00075 double durationSlop = -1.0; // extra seconds to play at the end
00076 double initialSeekTime = 0.0f;
00077 float scale = 1.0f;
00078 double endTime;
00079 unsigned interPacketGapMaxTime = 0;
00080 unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps
00081 Boolean playContinuously = False;
00082 int simpleRTPoffsetArg = -1;
00083 Boolean sendOptionsRequest = True;
00084 Boolean sendOptionsRequestOnly = False;
00085 Boolean oneFilePerFrame = False;
00086 Boolean notifyOnPacketArrival = False;
00087 Boolean streamUsingTCP = False;
00088 unsigned short desiredPortNum = 0;
00089 portNumBits tunnelOverHTTPPortNum = 0;
00090 char* username = NULL;
00091 char* password = NULL;
00092 char* proxyServerName = NULL;
00093 unsigned short proxyServerPortNum = 0;
00094 unsigned char desiredAudioRTPPayloadFormat = 0;
00095 char* mimeSubtype = NULL;
00096 unsigned short movieWidth = 240; // default
00097 Boolean movieWidthOptionSet = False;
00098 unsigned short movieHeight = 180; // default
00099 Boolean movieHeightOptionSet = False;
00100 unsigned movieFPS = 15; // default
00101 Boolean movieFPSOptionSet = False;
00102 char const* fileNamePrefix = "";
00103 unsigned fileSinkBufferSize = 100000;
00104 unsigned socketInputBufferSize = 0;
00105 Boolean packetLossCompensate = False;
00106 Boolean syncStreams = False;
00107 Boolean generateHintTracks = False;
00108 unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data
00109 
00110 struct timeval startTime;
00111 
00112 void usage() {
00113   *env << "Usage: " << progName
00114        << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00115            << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00116        << " [-u <username> <password>"
00117            << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00118        << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00119        << " [-s <initial-seek-time>] [-z <scale>]"
00120        << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] <url> (or " << progName << " -o [-V] <url>)\n";
00121   shutdown();
00122 }
00123 
00124 int main(int argc, char** argv) {
00125   // Begin by setting up our usage environment:
00126   TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00127   env = BasicUsageEnvironment::createNew(*scheduler);
00128 
00129   progName = argv[0];
00130 
00131   gettimeofday(&startTime, NULL);
00132 
00133 #ifdef USE_SIGNALS
00134   // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1:
00135   signal(SIGHUP, signalHandlerShutdown);
00136   signal(SIGUSR1, signalHandlerShutdown);
00137 #endif
00138 
00139   // unfortunately we can't use getopt() here, as Windoze doesn't have it
00140   while (argc > 2) {
00141     char* const opt = argv[1];
00142     if (opt[0] != '-') usage();
00143     switch (opt[1]) {
00144 
00145     case 'p': { // specify start port number
00146       int portArg;
00147       if (sscanf(argv[2], "%d", &portArg) != 1) {
00148         usage();
00149       }
00150       if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00151         *env << "bad port number: " << portArg
00152                 << " (must be even, and in the range (0,65536))\n";
00153         usage();
00154       }
00155       desiredPortNum = (unsigned short)portArg;
00156       ++argv; --argc;
00157       break;
00158     }
00159 
00160     case 'r': { // do not receive data (instead, just 'play' the stream(s))
00161       createReceivers = False;
00162       break;
00163     }
00164 
00165     case 'q': { // output a QuickTime file (to stdout)
00166       outputQuickTimeFile = True;
00167       break;
00168     }
00169 
00170     case '4': { // output a 'mp4'-format file (to stdout)
00171       outputQuickTimeFile = True;
00172       generateMP4Format = True;
00173       break;
00174     }
00175 
00176     case 'i': { // output an AVI file (to stdout)
00177       outputAVIFile = True;
00178       break;
00179     }
00180 
00181     case 'I': { // specify input interface...
00182       NetAddressList addresses(argv[2]);
00183       if (addresses.numAddresses() == 0) {
00184         *env << "Failed to find network address for \"" << argv[2] << "\"";
00185         break;
00186       }
00187       ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00188       ++argv; --argc;
00189       break;
00190     }
00191 
00192     case 'a': { // receive/record an audio stream only
00193       audioOnly = True;
00194       singleMedium = "audio";
00195       break;
00196     }
00197 
00198     case 'v': { // receive/record a video stream only
00199       videoOnly = True;
00200       singleMedium = "video";
00201       break;
00202     }
00203 
00204     case 'V': { // disable verbose output
00205       verbosityLevel = 0;
00206       break;
00207     }
00208 
00209     case 'd': { // specify duration, or how much to delay after end time
00210       float arg;
00211       if (sscanf(argv[2], "%g", &arg) != 1) {
00212         usage();
00213       }
00214       if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0"
00215         // a 'negative' argument was specified; use this for "durationSlop":
00216         duration = 0; // use whatever's in the SDP
00217         durationSlop = -arg;
00218       } else {
00219         duration = arg;
00220         durationSlop = 0;
00221       }
00222       ++argv; --argc;
00223       break;
00224     }
00225 
00226     case 'D': { // specify maximum number of seconds to wait for packets:
00227       if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00228         usage();
00229       }
00230       ++argv; --argc;
00231       break;
00232     }
00233 
00234     case 'c': { // play continuously
00235       playContinuously = True;
00236       break;
00237     }
00238 
00239     case 'S': { // specify an offset to use with "SimpleRTPSource"s
00240       if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00241         usage();
00242       }
00243       if (simpleRTPoffsetArg < 0) {
00244         *env << "offset argument to \"-S\" must be >= 0\n";
00245         usage();
00246       }
00247       ++argv; --argc;
00248       break;
00249     }
00250 
00251     case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE"
00252       sendOptionsRequest = False;
00253       break;
00254     }
00255 
00256     case 'o': { // Send only the "OPTIONS" request to the server
00257       sendOptionsRequestOnly = True;
00258       break;
00259     }
00260 
00261     case 'm': { // output multiple files - one for each frame
00262       oneFilePerFrame = True;
00263       break;
00264     }
00265 
00266     case 'n': { // notify the user when the first data packet arrives
00267       notifyOnPacketArrival = True;
00268       break;
00269     }
00270 
00271     case 't': {
00272       // stream RTP and RTCP over the TCP 'control' connection
00273       if (controlConnectionUsesTCP) {
00274         streamUsingTCP = True;
00275       } else {
00276         usage();
00277       }
00278       break;
00279     }
00280 
00281     case 'T': {
00282       // stream RTP and RTCP over a HTTP connection
00283       if (controlConnectionUsesTCP) {
00284         if (argc > 3 && argv[2][0] != '-') {
00285           // The next argument is the HTTP server port number:
00286           if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00287               && tunnelOverHTTPPortNum > 0) {
00288             ++argv; --argc;
00289             break;
00290           }
00291         }
00292       }
00293 
00294       // If we get here, the option was specified incorrectly:
00295       usage();
00296       break;
00297     }
00298 
00299     case 'u': { // specify a username and password
00300       username = argv[2];
00301       password = argv[3];
00302       argv+=2; argc-=2;
00303       if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00304         // The next argument is the name of a proxy server:
00305         proxyServerName = argv[2];
00306         ++argv; --argc;
00307 
00308         if (argc > 3 && argv[2][0] != '-') {
00309           // The next argument is the proxy server port number:
00310           if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00311             usage();
00312           }
00313           ++argv; --argc;
00314         }
00315       }
00316 
00317       ourAuthenticator = new Authenticator(username, password);
00318       break;
00319     }
00320 
00321     case 'A': { // specify a desired audio RTP payload format
00322       unsigned formatArg;
00323       if (sscanf(argv[2], "%u", &formatArg) != 1
00324           || formatArg >= 96) {
00325         usage();
00326       }
00327       desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00328       ++argv; --argc;
00329       break;
00330     }
00331 
00332     case 'M': { // specify a MIME subtype for a dynamic RTP payload type
00333       mimeSubtype = argv[2];
00334       if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00335       ++argv; --argc;
00336       break;
00337     }
00338 
00339     case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie
00340       if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00341         usage();
00342       }
00343       movieWidthOptionSet = True;
00344       ++argv; --argc;
00345       break;
00346     }
00347 
00348     case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie
00349       if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00350         usage();
00351       }
00352       movieHeightOptionSet = True;
00353       ++argv; --argc;
00354       break;
00355     }
00356 
00357     case 'f': { // specify a frame rate (per second) for an output QT or AVI movie
00358       if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00359         usage();
00360       }
00361       movieFPSOptionSet = True;
00362       ++argv; --argc;
00363       break;
00364     }
00365 
00366     case 'F': { // specify a prefix for the audio and video output files
00367       fileNamePrefix = argv[2];
00368       ++argv; --argc;
00369       break;
00370     }
00371 
00372     case 'b': { // specify the size of buffers for "FileSink"s
00373       if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00374         usage();
00375       }
00376       ++argv; --argc;
00377       break;
00378     }
00379 
00380     case 'B': { // specify the size of input socket buffers
00381       if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00382         usage();
00383       }
00384       ++argv; --argc;
00385       break;
00386     }
00387 
00388     // Note: The following option is deprecated, and may someday be removed:
00389     case 'l': { // try to compensate for packet loss by repeating frames
00390       packetLossCompensate = True;
00391       break;
00392     }
00393 
00394     case 'y': { // synchronize audio and video streams
00395       syncStreams = True;
00396       break;
00397     }
00398 
00399     case 'H': { // generate hint tracks (as well as the regular data tracks)
00400       generateHintTracks = True;
00401       break;
00402     }
00403 
00404     case 'Q': { // output QOS measurements
00405       qosMeasurementIntervalMS = 1000; // default: 1 second
00406 
00407       if (argc > 3 && argv[2][0] != '-') {
00408         // The next argument is the measurement interval,
00409         // in multiples of 100 ms
00410         if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00411           usage();
00412         }
00413         qosMeasurementIntervalMS *= 100;
00414         ++argv; --argc;
00415       }
00416       break;
00417     }
00418 
00419     case 's': { // specify initial seek time (trick play)
00420       double arg;
00421       if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00422         usage();
00423       }
00424       initialSeekTime = arg;
00425       ++argv; --argc;
00426       break;
00427     }
00428 
00429     case 'z': { // scale (trick play)
00430       float arg;
00431       if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00432         usage();
00433       }
00434       scale = arg;
00435       ++argv; --argc;
00436       break;
00437     }
00438 
00439     default: {
00440       usage();
00441       break;
00442     }
00443     }
00444 
00445     ++argv; --argc;
00446   }
00447   if (argc != 2) usage();
00448   if (outputQuickTimeFile && outputAVIFile) {
00449     *env << "The -i and -q (or -4) flags cannot both be used!\n";
00450     usage();
00451   }
00452   Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00453   if (!createReceivers && outputCompositeFile) {
00454     *env << "The -r and -q (or -4 or -i) flags cannot both be used!\n";
00455     usage();
00456   }
00457   if (outputCompositeFile && !movieWidthOptionSet) {
00458     *env << "Warning: The -q, -4 or -i option was used, but not -w.  Assuming a video width of "
00459          << movieWidth << " pixels\n";
00460   }
00461   if (outputCompositeFile && !movieHeightOptionSet) {
00462     *env << "Warning: The -q, -4 or -i option was used, but not -h.  Assuming a video height of "
00463          << movieHeight << " pixels\n";
00464   }
00465   if (outputCompositeFile && !movieFPSOptionSet) {
00466     *env << "Warning: The -q, -4 or -i option was used, but not -f.  Assuming a video frame rate of "
00467          << movieFPS << " frames-per-second\n";
00468   }
00469   if (audioOnly && videoOnly) {
00470     *env << "The -a and -v flags cannot both be used!\n";
00471     usage();
00472   }
00473   if (sendOptionsRequestOnly && !sendOptionsRequest) {
00474     *env << "The -o and -O flags cannot both be used!\n";
00475     usage();
00476   }
00477   if (tunnelOverHTTPPortNum > 0) {
00478     if (streamUsingTCP) {
00479       *env << "The -t and -T flags cannot both be used!\n";
00480       usage();
00481     } else {
00482       streamUsingTCP = True;
00483     }
00484   }
00485   if (!createReceivers && notifyOnPacketArrival) {
00486     *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00487   }
00488   if (durationSlop < 0) {
00489     // This parameter wasn't set, so use a default value.
00490     // If we're measuring QOS stats, then don't add any slop, to avoid
00491     // having 'empty' measurement intervals at the end.
00492     durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00493   }
00494 
00495   streamURL = argv[1];
00496 
00497   // Create our client object:
00498   ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00499   if (ourClient == NULL) {
00500     *env << "Failed to create " << clientProtocolName
00501                 << " client: " << env->getResultMsg() << "\n";
00502     shutdown();
00503   }
00504 
00505   if (sendOptionsRequest) {
00506     // Begin by sending an "OPTIONS" command:
00507     getOptions(continueAfterOPTIONS);
00508   } else {
00509     continueAfterOPTIONS(NULL, 0, NULL);
00510   }
00511 
00512   // All subsequent activity takes place within the event loop:
00513   env->taskScheduler().doEventLoop(); // does not return
00514 
00515   return 0; // only to prevent compiler warning
00516 }
00517 
00518 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00519   if (sendOptionsRequestOnly) {
00520     if (resultCode != 0) {
00521       *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00522     } else {
00523       *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00524     }
00525     shutdown();
00526   }
00527   delete[] resultString;
00528 
00529   // Next, get a SDP description for the stream:
00530   getSDPDescription(continueAfterDESCRIBE);
00531 }
00532 
00533 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00534   if (resultCode != 0) {
00535     *env << "Failed to get a SDP description for the URL \"" << streamURL << "\": " << resultString << "\n";
00536     shutdown();
00537   }
00538 
00539   char* sdpDescription = resultString;
00540   *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00541 
00542   // Create a media session object from this SDP description:
00543   session = MediaSession::createNew(*env, sdpDescription);
00544   delete[] sdpDescription;
00545   if (session == NULL) {
00546     *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00547     shutdown();
00548   } else if (!session->hasSubsessions()) {
00549     *env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
00550     shutdown();
00551   }
00552 
00553   // Then, setup the "RTPSource"s for the session:
00554   MediaSubsessionIterator iter(*session);
00555   MediaSubsession *subsession;
00556   Boolean madeProgress = False;
00557   char const* singleMediumToTest = singleMedium;
00558   while ((subsession = iter.next()) != NULL) {
00559     // If we've asked to receive only a single medium, then check this now:
00560     if (singleMediumToTest != NULL) {
00561       if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00562                   *env << "Ignoring \"" << subsession->mediumName()
00563                           << "/" << subsession->codecName()
00564                           << "\" subsession, because we've asked to receive a single " << singleMedium
00565                           << " session only\n";
00566         continue;
00567       } else {
00568         // Receive this subsession only
00569         singleMediumToTest = "xxxxx";
00570             // this hack ensures that we get only 1 subsession of this type
00571       }
00572     }
00573 
00574     if (desiredPortNum != 0) {
00575       subsession->setClientPortNum(desiredPortNum);
00576       desiredPortNum += 2;
00577     }
00578 
00579     if (createReceivers) {
00580       if (!subsession->initiate(simpleRTPoffsetArg)) {
00581         *env << "Unable to create receiver for \"" << subsession->mediumName()
00582              << "/" << subsession->codecName()
00583              << "\" subsession: " << env->getResultMsg() << "\n";
00584       } else {
00585         *env << "Created receiver for \"" << subsession->mediumName()
00586              << "/" << subsession->codecName()
00587              << "\" subsession (client ports " << subsession->clientPortNum()
00588              << "-" << subsession->clientPortNum()+1 << ")\n";
00589         madeProgress = True;
00590         
00591         if (subsession->rtpSource() != NULL) {
00592           // Because we're saving the incoming data, rather than playing
00593           // it in real time, allow an especially large time threshold
00594           // (1 second) for reordering misordered incoming packets:
00595           unsigned const thresh = 1000000; // 1 second
00596           subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00597           
00598           // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
00599           // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
00600           // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
00601           // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
00602           int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00603           unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00604           if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00605             unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00606             newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00607             if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
00608               *env << "Changed socket receive buffer size for the \""
00609                    << subsession->mediumName()
00610                    << "/" << subsession->codecName()
00611                    << "\" subsession from "
00612                    << curBufferSize << " to "
00613                    << newBufferSize << " bytes\n";
00614             }
00615           }
00616         }
00617       }
00618     } else {
00619       if (subsession->clientPortNum() == 0) {
00620         *env << "No client port was specified for the \""
00621              << subsession->mediumName()
00622              << "/" << subsession->codecName()
00623              << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
00624       } else {
00625                 madeProgress = True;
00626       }
00627     }
00628   }
00629   if (!madeProgress) shutdown();
00630 
00631   // Perform additional 'setup' on each subsession, before playing them:
00632   setupStreams();
00633 }
00634 
00635 MediaSubsession *subsession;
00636 Boolean madeProgress = False;
00637 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00638   if (resultCode == 0) {
00639       *env << "Setup \"" << subsession->mediumName()
00640            << "/" << subsession->codecName()
00641            << "\" subsession (client ports " << subsession->clientPortNum()
00642            << "-" << subsession->clientPortNum()+1 << ")\n";
00643       madeProgress = True;
00644   } else {
00645     *env << "Failed to setup \"" << subsession->mediumName()
00646          << "/" << subsession->codecName()
00647          << "\" subsession: " << env->getResultMsg() << "\n";
00648   }
00649 
00650   // Set up the next subsession, if any:
00651   setupStreams();
00652 }
00653 
00654 void setupStreams() {
00655   static MediaSubsessionIterator* setupIter = NULL;
00656   if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00657   while ((subsession = setupIter->next()) != NULL) {
00658     // We have another subsession left to set up:
00659     if (subsession->clientPortNum() == 0) continue; // port # was not set
00660 
00661     setupSubsession(subsession, streamUsingTCP, continueAfterSETUP);
00662     return;
00663   }
00664 
00665   // We're done setting up subsessions.
00666   delete setupIter;
00667   if (!madeProgress) shutdown();
00668 
00669   // Create output files:
00670   if (createReceivers) {
00671     if (outputQuickTimeFile) {
00672       // Create a "QuickTimeFileSink", to write to 'stdout':
00673       qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout",
00674                                            fileSinkBufferSize,
00675                                            movieWidth, movieHeight,
00676                                            movieFPS,
00677                                            packetLossCompensate,
00678                                            syncStreams,
00679                                            generateHintTracks,
00680                                            generateMP4Format);
00681       if (qtOut == NULL) {
00682         *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg();
00683         shutdown();
00684       }
00685 
00686       qtOut->startPlaying(sessionAfterPlaying, NULL);
00687     } else if (outputAVIFile) {
00688       // Create an "AVIFileSink", to write to 'stdout':
00689       aviOut = AVIFileSink::createNew(*env, *session, "stdout",
00690                                       fileSinkBufferSize,
00691                                       movieWidth, movieHeight,
00692                                       movieFPS,
00693                                       packetLossCompensate);
00694       if (aviOut == NULL) {
00695         *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg();
00696         shutdown();
00697       }
00698 
00699       aviOut->startPlaying(sessionAfterPlaying, NULL);
00700     } else {
00701       // Create and start "FileSink"s for each subsession:
00702       madeProgress = False;
00703       MediaSubsessionIterator iter(*session);
00704       while ((subsession = iter.next()) != NULL) {
00705         if (subsession->readSource() == NULL) continue; // was not initiated
00706 
00707         // Create an output file for each desired stream:
00708         char outFileName[1000];
00709         if (singleMedium == NULL) {
00710           // Output file name is
00711           //     "<filename-prefix><medium_name>-<codec_name>-<counter>"
00712           static unsigned streamCounter = 0;
00713           snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00714                    fileNamePrefix, subsession->mediumName(),
00715                    subsession->codecName(), ++streamCounter);
00716         } else {
00717           sprintf(outFileName, "stdout");
00718         }
00719         FileSink* fileSink;
00720         if (strcmp(subsession->mediumName(), "audio") == 0 &&
00721             (strcmp(subsession->codecName(), "AMR") == 0 ||
00722              strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00723           // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00724           fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00725                                                  fileSinkBufferSize, oneFilePerFrame);
00726         } else if (strcmp(subsession->mediumName(), "video") == 0 &&
00727             (strcmp(subsession->codecName(), "H264") == 0)) {
00728           // For H.264 video stream, we use a special sink that insert start_codes:
00729           fileSink = H264VideoFileSink::createNew(*env, outFileName,
00730                                                   subsession->fmtp_spropparametersets(),
00731                                                   fileSinkBufferSize, oneFilePerFrame);
00732         } else {
00733           // Normal case:
00734           fileSink = FileSink::createNew(*env, outFileName,
00735                                          fileSinkBufferSize, oneFilePerFrame);
00736         }
00737         subsession->sink = fileSink;
00738         if (subsession->sink == NULL) {
00739           *env << "Failed to create FileSink for \"" << outFileName
00740                   << "\": " << env->getResultMsg() << "\n";
00741         } else {
00742           if (singleMedium == NULL) {
00743             *env << "Created output file: \"" << outFileName << "\"\n";
00744           } else {
00745             *env << "Outputting data from the \"" << subsession->mediumName()
00746                         << "/" << subsession->codecName()
00747                         << "\" subsession to 'stdout'\n";
00748           }
00749 
00750           if (strcmp(subsession->mediumName(), "video") == 0 &&
00751               strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00752               subsession->fmtp_config() != NULL) {
00753             // For MPEG-4 video RTP streams, the 'config' information
00754             // from the SDP description contains useful VOL etc. headers.
00755             // Insert this data at the front of the output file:
00756             unsigned configLen;
00757             unsigned char* configData
00758               = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00759             struct timeval timeNow;
00760             gettimeofday(&timeNow, NULL);
00761             fileSink->addData(configData, configLen, timeNow);
00762             delete[] configData;
00763           }
00764 
00765           subsession->sink->startPlaying(*(subsession->readSource()),
00766                                          subsessionAfterPlaying,
00767                                          subsession);
00768 
00769           // Also set a handler to be called if a RTCP "BYE" arrives
00770           // for this subsession:
00771           if (subsession->rtcpInstance() != NULL) {
00772             subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession);
00773           }
00774 
00775           madeProgress = True;
00776         }
00777       }
00778       if (!madeProgress) shutdown();
00779     }
00780   }
00781 
00782   // Finally, start playing each subsession, to start the data flow:
00783   if (duration == 0) {
00784     if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time
00785     else if (scale < 0) duration = initialSeekTime;
00786   }
00787   if (duration < 0) duration = 0.0;
00788 
00789   endTime = initialSeekTime;
00790   if (scale > 0) {
00791     if (duration <= 0) endTime = -1.0f;
00792     else endTime = initialSeekTime + duration;
00793   } else {
00794     endTime = initialSeekTime - duration;
00795     if (endTime < 0) endTime = 0.0f;
00796   }
00797 
00798   startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00799 }
00800 
00801 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
00802   if (resultCode != 0) {
00803     *env << "Failed to start playing session: " << resultString << "\n";
00804     shutdown();
00805   } else {
00806     *env << "Started playing session\n";
00807   }
00808 
00809   if (qosMeasurementIntervalMS > 0) {
00810     // Begin periodic QOS measurements:
00811     beginQOSMeasurement();
00812   }
00813 
00814   // Figure out how long to delay (if at all) before shutting down, or
00815   // repeating the playing
00816   Boolean timerIsBeingUsed = False;
00817   double secondsToDelay = duration;
00818   if (duration > 0) {
00819     // First, adjust "duration" based on any change to the play range (that was specified in the "PLAY" response):
00820     double rangeAdjustment = (session->playEndTime() - session->playStartTime()) - (endTime - initialSeekTime);
00821     if (duration + rangeAdjustment > 0.0) duration += rangeAdjustment;
00822 
00823     timerIsBeingUsed = True;
00824     double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0
00825     secondsToDelay = duration/absScale + durationSlop;
00826 
00827     int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
00828     sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
00829   }
00830 
00831   char const* actionString
00832     = createReceivers? "Receiving streamed data":"Data is being streamed";
00833   if (timerIsBeingUsed) {
00834     *env << actionString
00835                 << " (for up to " << secondsToDelay
00836                 << " seconds)...\n";
00837   } else {
00838 #ifdef USE_SIGNALS
00839     pid_t ourPid = getpid();
00840     *env << actionString
00841                 << " (signal with \"kill -HUP " << (int)ourPid
00842                 << "\" or \"kill -USR1 " << (int)ourPid
00843                 << "\" to terminate)...\n";
00844 #else
00845     *env << actionString << "...\n";
00846 #endif
00847   }
00848 
00849   // Watch for incoming packets (if desired):
00850   checkForPacketArrival(NULL);
00851   checkInterPacketGaps(NULL);
00852 }
00853 
00854 void closeMediaSinks() {
00855   Medium::close(qtOut);
00856   Medium::close(aviOut);
00857 
00858   if (session == NULL) return;
00859   MediaSubsessionIterator iter(*session);
00860   MediaSubsession* subsession;
00861   while ((subsession = iter.next()) != NULL) {
00862     Medium::close(subsession->sink);
00863     subsession->sink = NULL;
00864   }
00865 }
00866 
00867 void subsessionAfterPlaying(void* clientData) {
00868   // Begin by closing this media subsession's stream:
00869   MediaSubsession* subsession = (MediaSubsession*)clientData;
00870   Medium::close(subsession->sink);
00871   subsession->sink = NULL;
00872 
00873   // Next, check whether *all* subsessions' streams have now been closed:
00874   MediaSession& session = subsession->parentSession();
00875   MediaSubsessionIterator iter(session);
00876   while ((subsession = iter.next()) != NULL) {
00877     if (subsession->sink != NULL) return; // this subsession is still active
00878   }
00879 
00880   // All subsessions' streams have now been closed
00881   sessionAfterPlaying();
00882 }
00883 
00884 void subsessionByeHandler(void* clientData) {
00885   struct timeval timeNow;
00886   gettimeofday(&timeNow, NULL);
00887   unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
00888 
00889   MediaSubsession* subsession = (MediaSubsession*)clientData;
00890   *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
00891         << "/" << subsession->codecName()
00892         << "\" subsession (after " << secsDiff
00893         << " seconds)\n";
00894 
00895   // Act now as if the subsession had closed:
00896   subsessionAfterPlaying(subsession);
00897 }
00898 
00899 void sessionAfterPlaying(void* /*clientData*/) {
00900   if (!playContinuously) {
00901     shutdown(0);
00902   } else {
00903     // We've been asked to play the stream(s) over again.
00904     // First, reset state from the current session:
00905     if (env != NULL) {
00906       env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
00907       env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
00908       env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
00909       env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
00910     }
00911     totNumPacketsReceived = ~0;
00912 
00913     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00914   }
00915 }
00916 
00917 void sessionTimerHandler(void* /*clientData*/) {
00918   sessionTimerTask = NULL;
00919 
00920   sessionAfterPlaying();
00921 }
00922 
00923 class qosMeasurementRecord {
00924 public:
00925   qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
00926     : fSource(src), fNext(NULL),
00927       kbits_per_second_min(1e20), kbits_per_second_max(0),
00928       kBytesTotal(0.0),
00929       packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
00930       totNumPacketsReceived(0), totNumPacketsExpected(0) {
00931     measurementEndTime = measurementStartTime = startTime;
00932 
00933     RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
00934     // Assume that there's only one SSRC source (usually the case):
00935     RTPReceptionStats* stats = statsIter.next(True);
00936     if (stats != NULL) {
00937       kBytesTotal = stats->totNumKBytesReceived();
00938       totNumPacketsReceived = stats->totNumPacketsReceived();
00939       totNumPacketsExpected = stats->totNumPacketsExpected();
00940     }
00941   }
00942   virtual ~qosMeasurementRecord() { delete fNext; }
00943 
00944   void periodicQOSMeasurement(struct timeval const& timeNow);
00945 
00946 public:
00947   RTPSource* fSource;
00948   qosMeasurementRecord* fNext;
00949 
00950 public:
00951   struct timeval measurementStartTime, measurementEndTime;
00952   double kbits_per_second_min, kbits_per_second_max;
00953   double kBytesTotal;
00954   double packet_loss_fraction_min, packet_loss_fraction_max;
00955   unsigned totNumPacketsReceived, totNumPacketsExpected;
00956 };
00957 
00958 static qosMeasurementRecord* qosRecordHead = NULL;
00959 
00960 static void periodicQOSMeasurement(void* clientData); // forward
00961 
00962 static unsigned nextQOSMeasurementUSecs;
00963 
00964 static void scheduleNextQOSMeasurement() {
00965   nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00966   struct timeval timeNow;
00967   gettimeofday(&timeNow, NULL);
00968   unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00969   unsigned usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
00970      // Note: This works even when nextQOSMeasurementUSecs wraps around
00971 
00972   qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00973      usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
00974 }
00975 
00976 static void periodicQOSMeasurement(void* /*clientData*/) {
00977   struct timeval timeNow;
00978   gettimeofday(&timeNow, NULL);
00979 
00980   for (qosMeasurementRecord* qosRecord = qosRecordHead;
00981        qosRecord != NULL; qosRecord = qosRecord->fNext) {
00982     qosRecord->periodicQOSMeasurement(timeNow);
00983   }
00984 
00985   // Do this again later:
00986   scheduleNextQOSMeasurement();
00987 }
00988 
00989 void qosMeasurementRecord
00990 ::periodicQOSMeasurement(struct timeval const& timeNow) {
00991   unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00992   int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00993   double timeDiff = secsDiff + usecsDiff/1000000.0;
00994   measurementEndTime = timeNow;
00995 
00996   RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00997   // Assume that there's only one SSRC source (usually the case):
00998   RTPReceptionStats* stats = statsIter.next(True);
00999   if (stats != NULL) {
01000     double kBytesTotalNow = stats->totNumKBytesReceived();
01001     double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
01002     kBytesTotal = kBytesTotalNow;
01003 
01004     double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
01005     if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
01006     if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
01007     if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
01008 
01009     unsigned totReceivedNow = stats->totNumPacketsReceived();
01010     unsigned totExpectedNow = stats->totNumPacketsExpected();
01011     unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
01012     unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
01013     totNumPacketsReceived = totReceivedNow;
01014     totNumPacketsExpected = totExpectedNow;
01015 
01016     double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01017       : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01018     //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
01019     if (lossFractionNow < packet_loss_fraction_min) {
01020       packet_loss_fraction_min = lossFractionNow;
01021     }
01022     if (lossFractionNow > packet_loss_fraction_max) {
01023       packet_loss_fraction_max = lossFractionNow;
01024     }
01025   }
01026 }
01027 
01028 void beginQOSMeasurement() {
01029   // Set up a measurement record for each active subsession:
01030   struct timeval startTime;
01031   gettimeofday(&startTime, NULL);
01032   nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01033   qosMeasurementRecord* qosRecordTail = NULL;
01034   MediaSubsessionIterator iter(*session);
01035   MediaSubsession* subsession;
01036   while ((subsession = iter.next()) != NULL) {
01037     RTPSource* src = subsession->rtpSource();
01038     if (src == NULL) continue;
01039 
01040     qosMeasurementRecord* qosRecord
01041       = new qosMeasurementRecord(startTime, src);
01042     if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01043     if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01044     qosRecordTail  = qosRecord;
01045   }
01046 
01047   // Then schedule the first of the periodic measurements:
01048   scheduleNextQOSMeasurement();
01049 }
01050 
01051 void printQOSData(int exitCode) {
01052   *env << "begin_QOS_statistics\n";
01053   
01054   // Print out stats for each active subsession:
01055   qosMeasurementRecord* curQOSRecord = qosRecordHead;
01056   if (session != NULL) {
01057     MediaSubsessionIterator iter(*session);
01058     MediaSubsession* subsession;
01059     while ((subsession = iter.next()) != NULL) {
01060       RTPSource* src = subsession->rtpSource();
01061       if (src == NULL) continue;
01062       
01063       *env << "subsession\t" << subsession->mediumName()
01064            << "/" << subsession->codecName() << "\n";
01065       
01066       unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01067       
01068       if (curQOSRecord != NULL) {
01069         numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01070         numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01071       }
01072       *env << "num_packets_received\t" << numPacketsReceived << "\n";
01073       *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01074       
01075       if (curQOSRecord != NULL) {
01076         unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01077           - curQOSRecord->measurementStartTime.tv_sec;
01078         int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01079           - curQOSRecord->measurementStartTime.tv_usec;
01080         double measurementTime = secsDiff + usecsDiff/1000000.0;
01081         *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01082         
01083         *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01084         
01085         *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01086         
01087         if (curQOSRecord->kbits_per_second_max == 0) {
01088           // special case: we didn't receive any data:
01089           *env <<
01090             "kbits_per_second_min\tunavailable\n"
01091             "kbits_per_second_ave\tunavailable\n"
01092             "kbits_per_second_max\tunavailable\n";
01093         } else {
01094           *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01095           *env << "kbits_per_second_ave\t"
01096                << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01097           *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01098         }
01099         
01100         *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01101         double packetLossFraction = numPacketsExpected == 0 ? 1.0
01102           : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01103         if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01104         *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01105         *env << "packet_loss_percentage_max\t"
01106              << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01107         
01108         RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01109         // Assume that there's only one SSRC source (usually the case):
01110         RTPReceptionStats* stats = statsIter.next(True);
01111         if (stats != NULL) {
01112           *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01113           struct timeval totalGaps = stats->totalInterPacketGaps();
01114           double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01115           unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01116           *env << "inter_packet_gap_ms_ave\t"
01117                << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01118           *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01119         }
01120         
01121         curQOSRecord = curQOSRecord->fNext;
01122       }
01123     }
01124   }
01125 
01126   *env << "end_QOS_statistics\n";
01127   delete qosRecordHead;
01128 }
01129 
01130 Boolean areAlreadyShuttingDown = False;
01131 int shutdownExitCode;
01132 void shutdown(int exitCode) {
01133   if (areAlreadyShuttingDown) return; // in case we're called after receiving a RTCP "BYE" while in the middle of a "TEARDOWN".
01134   areAlreadyShuttingDown = True;
01135 
01136   shutdownExitCode = exitCode;
01137   if (env != NULL) {
01138     env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01139     env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01140     env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01141     env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01142   }
01143 
01144   if (qosMeasurementIntervalMS > 0) {
01145     printQOSData(exitCode);
01146   }
01147 
01148   // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
01149   if (session != NULL) {
01150     tearDownSession(session, continueAfterTEARDOWN);
01151   } else {
01152     continueAfterTEARDOWN(NULL, 0, NULL);
01153   }
01154 }
01155 
01156 void continueAfterTEARDOWN(RTSPClient*, int /*resultCode*/, char* /*resultString*/) {
01157   // Now that we've stopped any more incoming data from arriving, close our output files:
01158   closeMediaSinks();
01159   Medium::close(session);
01160 
01161   // Finally, shut down our client:
01162   delete ourAuthenticator;
01163   Medium::close(ourClient);
01164 
01165   // Adios...
01166   exit(shutdownExitCode);
01167 }
01168 
01169 void signalHandlerShutdown(int /*sig*/) {
01170   *env << "Got shutdown signal\n";
01171   shutdown(0);
01172 }
01173 
01174 void checkForPacketArrival(void* /*clientData*/) {
01175   if (!notifyOnPacketArrival) return; // we're not checking
01176 
01177   // Check each subsession, to see whether it has received data packets:
01178   unsigned numSubsessionsChecked = 0;
01179   unsigned numSubsessionsWithReceivedData = 0;
01180   unsigned numSubsessionsThatHaveBeenSynced = 0;
01181 
01182   MediaSubsessionIterator iter(*session);
01183   MediaSubsession* subsession;
01184   while ((subsession = iter.next()) != NULL) {
01185     RTPSource* src = subsession->rtpSource();
01186     if (src == NULL) continue;
01187     ++numSubsessionsChecked;
01188 
01189     if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01190       // At least one data packet has arrived
01191       ++numSubsessionsWithReceivedData;
01192     }
01193     if (src->hasBeenSynchronizedUsingRTCP()) {
01194       ++numSubsessionsThatHaveBeenSynced;
01195     }
01196   }
01197 
01198   unsigned numSubsessionsToCheck = numSubsessionsChecked;
01199   // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:
01200   // They might not use all of the input sources:
01201   if (qtOut != NULL) {
01202     numSubsessionsToCheck = qtOut->numActiveSubsessions();
01203   } else if (aviOut != NULL) {
01204     numSubsessionsToCheck = aviOut->numActiveSubsessions();
01205   }
01206 
01207   Boolean notifyTheUser;
01208   if (!syncStreams) {
01209     notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case
01210   } else {
01211     notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01212       && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01213     // Note: A subsession with no active sources is considered to be synced
01214   }
01215   if (notifyTheUser) {
01216     struct timeval timeNow;
01217     gettimeofday(&timeNow, NULL);
01218         char timestampStr[100];
01219         sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01220     *env << (syncStreams ? "Synchronized d" : "D")
01221                 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01222     return;
01223   }
01224 
01225   // No luck, so reschedule this check again, after a delay:
01226   int uSecsToDelay = 100000; // 100 ms
01227   arrivalCheckTimerTask
01228     = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01229                                (TaskFunc*)checkForPacketArrival, NULL);
01230 }
01231 
01232 void checkInterPacketGaps(void* /*clientData*/) {
01233   if (interPacketGapMaxTime == 0) return; // we're not checking
01234 
01235   // Check each subsession, counting up how many packets have been received:
01236   unsigned newTotNumPacketsReceived = 0;
01237 
01238   MediaSubsessionIterator iter(*session);
01239   MediaSubsession* subsession;
01240   while ((subsession = iter.next()) != NULL) {
01241     RTPSource* src = subsession->rtpSource();
01242     if (src == NULL) continue;
01243     newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01244   }
01245 
01246   if (newTotNumPacketsReceived == totNumPacketsReceived) {
01247     // No additional packets have been received since the last time we
01248     // checked, so end this stream:
01249     *env << "Closing session, because we stopped receiving packets.\n";
01250     interPacketGapCheckTimerTask = NULL;
01251     sessionAfterPlaying();
01252   } else {
01253     totNumPacketsReceived = newTotNumPacketsReceived;
01254     // Check again, after the specified delay:
01255     interPacketGapCheckTimerTask
01256       = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01257                                  (TaskFunc*)checkInterPacketGaps, NULL);
01258   }
01259 }

Generated on Thu Feb 2 23:51:31 2012 for live by  doxygen 1.5.2