testProgs/playCommon.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // Copyright (c) 1996-2012, Live Networks, Inc.  All rights reserved
00017 // A common framework, used for the "openRTSP" and "playSIP" applications
00018 // Implementation
00019 //
00020 // NOTE: If you want to develop your own RTSP client application (or embed RTSP client functionality into your own application),
00021 // then we don't recommend using this code as a model, because it is too complex (with many options).
00022 // Instead, we recommend using the "testRTSPClient" application code as a model.
00023 
00024 #include "playCommon.hh"
00025 #include "BasicUsageEnvironment.hh"
00026 #include "GroupsockHelper.hh"
00027 
00028 #if defined(__WIN32__) || defined(_WIN32)
00029 #define snprintf _snprintf
00030 #else
00031 #include <signal.h>
00032 #define USE_SIGNALS 1
00033 #endif
00034 
00035 // Forward function definitions:
00036 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00037 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00038 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00039 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00040 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00041 
00042 void setupStreams();
00043 void closeMediaSinks();
00044 void subsessionAfterPlaying(void* clientData);
00045 void subsessionByeHandler(void* clientData);
00046 void sessionAfterPlaying(void* clientData = NULL);
00047 void sessionTimerHandler(void* clientData);
00048 void shutdown(int exitCode = 1);
00049 void signalHandlerShutdown(int sig);
00050 void checkForPacketArrival(void* clientData);
00051 void checkInterPacketGaps(void* clientData);
00052 void beginQOSMeasurement();
00053 
00054 char const* progName;
00055 UsageEnvironment* env;
00056 Medium* ourClient = NULL;
00057 Authenticator* ourAuthenticator = NULL;
00058 char const* streamURL = NULL;
00059 MediaSession* session = NULL;
00060 TaskToken sessionTimerTask = NULL;
00061 TaskToken arrivalCheckTimerTask = NULL;
00062 TaskToken interPacketGapCheckTimerTask = NULL;
00063 TaskToken qosMeasurementTimerTask = NULL;
00064 Boolean createReceivers = True;
00065 Boolean outputQuickTimeFile = False;
00066 Boolean generateMP4Format = False;
00067 QuickTimeFileSink* qtOut = NULL;
00068 Boolean outputAVIFile = False;
00069 AVIFileSink* aviOut = NULL;
00070 Boolean audioOnly = False;
00071 Boolean videoOnly = False;
00072 char const* singleMedium = NULL;
00073 int verbosityLevel = 1; // by default, print verbose output
00074 double duration = 0;
00075 double durationSlop = -1.0; // extra seconds to play at the end
00076 double initialSeekTime = 0.0f;
00077 float scale = 1.0f;
00078 double endTime;
00079 unsigned interPacketGapMaxTime = 0;
00080 unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps
00081 Boolean playContinuously = False;
00082 int simpleRTPoffsetArg = -1;
00083 Boolean sendOptionsRequest = True;
00084 Boolean sendOptionsRequestOnly = False;
00085 Boolean oneFilePerFrame = False;
00086 Boolean notifyOnPacketArrival = False;
00087 Boolean streamUsingTCP = False;
00088 unsigned short desiredPortNum = 0;
00089 portNumBits tunnelOverHTTPPortNum = 0;
00090 char* username = NULL;
00091 char* password = NULL;
00092 char* proxyServerName = NULL;
00093 unsigned short proxyServerPortNum = 0;
00094 unsigned char desiredAudioRTPPayloadFormat = 0;
00095 char* mimeSubtype = NULL;
00096 unsigned short movieWidth = 240; // default
00097 Boolean movieWidthOptionSet = False;
00098 unsigned short movieHeight = 180; // default
00099 Boolean movieHeightOptionSet = False;
00100 unsigned movieFPS = 15; // default
00101 Boolean movieFPSOptionSet = False;
00102 char const* fileNamePrefix = "";
00103 unsigned fileSinkBufferSize = 100000;
00104 unsigned socketInputBufferSize = 0;
00105 Boolean packetLossCompensate = False;
00106 Boolean syncStreams = False;
00107 Boolean generateHintTracks = False;
00108 unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data
00109 
00110 struct timeval startTime;
00111 
00112 void usage() {
00113   *env << "Usage: " << progName
00114        << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00115            << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00116        << " [-u <username> <password>"
00117            << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00118        << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00119        << " [-s <initial-seek-time>] [-z <scale>]"
00120        << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] <url> (or " << progName << " -o [-V] <url>)\n";
00121   shutdown();
00122 }
00123 
00124 int main(int argc, char** argv) {
00125   // Begin by setting up our usage environment:
00126   TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00127   env = BasicUsageEnvironment::createNew(*scheduler);
00128 
00129   progName = argv[0];
00130 
00131   gettimeofday(&startTime, NULL);
00132 
00133 #ifdef USE_SIGNALS
00134   // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1:
00135   signal(SIGHUP, signalHandlerShutdown);
00136   signal(SIGUSR1, signalHandlerShutdown);
00137 #endif
00138 
00139   // unfortunately we can't use getopt() here, as Windoze doesn't have it
00140   while (argc > 2) {
00141     char* const opt = argv[1];
00142     if (opt[0] != '-') usage();
00143     switch (opt[1]) {
00144 
00145     case 'p': { // specify start port number
00146       int portArg;
00147       if (sscanf(argv[2], "%d", &portArg) != 1) {
00148         usage();
00149       }
00150       if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00151         *env << "bad port number: " << portArg
00152                 << " (must be even, and in the range (0,65536))\n";
00153         usage();
00154       }
00155       desiredPortNum = (unsigned short)portArg;
00156       ++argv; --argc;
00157       break;
00158     }
00159 
00160     case 'r': { // do not receive data (instead, just 'play' the stream(s))
00161       createReceivers = False;
00162       break;
00163     }
00164 
00165     case 'q': { // output a QuickTime file (to stdout)
00166       outputQuickTimeFile = True;
00167       break;
00168     }
00169 
00170     case '4': { // output a 'mp4'-format file (to stdout)
00171       outputQuickTimeFile = True;
00172       generateMP4Format = True;
00173       break;
00174     }
00175 
00176     case 'i': { // output an AVI file (to stdout)
00177       outputAVIFile = True;
00178       break;
00179     }
00180 
00181     case 'I': { // specify input interface...
00182       NetAddressList addresses(argv[2]);
00183       if (addresses.numAddresses() == 0) {
00184         *env << "Failed to find network address for \"" << argv[2] << "\"";
00185         break;
00186       }
00187       ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00188       ++argv; --argc;
00189       break;
00190     }
00191 
00192     case 'a': { // receive/record an audio stream only
00193       audioOnly = True;
00194       singleMedium = "audio";
00195       break;
00196     }
00197 
00198     case 'v': { // receive/record a video stream only
00199       videoOnly = True;
00200       singleMedium = "video";
00201       break;
00202     }
00203 
00204     case 'V': { // disable verbose output
00205       verbosityLevel = 0;
00206       break;
00207     }
00208 
00209     case 'd': { // specify duration, or how much to delay after end time
00210       float arg;
00211       if (sscanf(argv[2], "%g", &arg) != 1) {
00212         usage();
00213       }
00214       if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0"
00215         // a 'negative' argument was specified; use this for "durationSlop":
00216         duration = 0; // use whatever's in the SDP
00217         durationSlop = -arg;
00218       } else {
00219         duration = arg;
00220         durationSlop = 0;
00221       }
00222       ++argv; --argc;
00223       break;
00224     }
00225 
00226     case 'D': { // specify maximum number of seconds to wait for packets:
00227       if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00228         usage();
00229       }
00230       ++argv; --argc;
00231       break;
00232     }
00233 
00234     case 'c': { // play continuously
00235       playContinuously = True;
00236       break;
00237     }
00238 
00239     case 'S': { // specify an offset to use with "SimpleRTPSource"s
00240       if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00241         usage();
00242       }
00243       if (simpleRTPoffsetArg < 0) {
00244         *env << "offset argument to \"-S\" must be >= 0\n";
00245         usage();
00246       }
00247       ++argv; --argc;
00248       break;
00249     }
00250 
00251     case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE"
00252       sendOptionsRequest = False;
00253       break;
00254     }
00255 
00256     case 'o': { // Send only the "OPTIONS" request to the server
00257       sendOptionsRequestOnly = True;
00258       break;
00259     }
00260 
00261     case 'm': { // output multiple files - one for each frame
00262       oneFilePerFrame = True;
00263       break;
00264     }
00265 
00266     case 'n': { // notify the user when the first data packet arrives
00267       notifyOnPacketArrival = True;
00268       break;
00269     }
00270 
00271     case 't': {
00272       // stream RTP and RTCP over the TCP 'control' connection
00273       if (controlConnectionUsesTCP) {
00274         streamUsingTCP = True;
00275       } else {
00276         usage();
00277       }
00278       break;
00279     }
00280 
00281     case 'T': {
00282       // stream RTP and RTCP over a HTTP connection
00283       if (controlConnectionUsesTCP) {
00284         if (argc > 3 && argv[2][0] != '-') {
00285           // The next argument is the HTTP server port number:
00286           if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00287               && tunnelOverHTTPPortNum > 0) {
00288             ++argv; --argc;
00289             break;
00290           }
00291         }
00292       }
00293 
00294       // If we get here, the option was specified incorrectly:
00295       usage();
00296       break;
00297     }
00298 
00299     case 'u': { // specify a username and password
00300       if (argc < 4) usage(); // there's no argv[3] (for the "password")
00301       username = argv[2];
00302       password = argv[3];
00303       argv+=2; argc-=2;
00304       if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00305         // The next argument is the name of a proxy server:
00306         proxyServerName = argv[2];
00307         ++argv; --argc;
00308 
00309         if (argc > 3 && argv[2][0] != '-') {
00310           // The next argument is the proxy server port number:
00311           if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00312             usage();
00313           }
00314           ++argv; --argc;
00315         }
00316       }
00317 
00318       ourAuthenticator = new Authenticator(username, password);
00319       break;
00320     }
00321 
00322     case 'A': { // specify a desired audio RTP payload format
00323       unsigned formatArg;
00324       if (sscanf(argv[2], "%u", &formatArg) != 1
00325           || formatArg >= 96) {
00326         usage();
00327       }
00328       desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00329       ++argv; --argc;
00330       break;
00331     }
00332 
00333     case 'M': { // specify a MIME subtype for a dynamic RTP payload type
00334       mimeSubtype = argv[2];
00335       if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00336       ++argv; --argc;
00337       break;
00338     }
00339 
00340     case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie
00341       if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00342         usage();
00343       }
00344       movieWidthOptionSet = True;
00345       ++argv; --argc;
00346       break;
00347     }
00348 
00349     case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie
00350       if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00351         usage();
00352       }
00353       movieHeightOptionSet = True;
00354       ++argv; --argc;
00355       break;
00356     }
00357 
00358     case 'f': { // specify a frame rate (per second) for an output QT or AVI movie
00359       if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00360         usage();
00361       }
00362       movieFPSOptionSet = True;
00363       ++argv; --argc;
00364       break;
00365     }
00366 
00367     case 'F': { // specify a prefix for the audio and video output files
00368       fileNamePrefix = argv[2];
00369       ++argv; --argc;
00370       break;
00371     }
00372 
00373     case 'b': { // specify the size of buffers for "FileSink"s
00374       if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00375         usage();
00376       }
00377       ++argv; --argc;
00378       break;
00379     }
00380 
00381     case 'B': { // specify the size of input socket buffers
00382       if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00383         usage();
00384       }
00385       ++argv; --argc;
00386       break;
00387     }
00388 
00389     // Note: The following option is deprecated, and may someday be removed:
00390     case 'l': { // try to compensate for packet loss by repeating frames
00391       packetLossCompensate = True;
00392       break;
00393     }
00394 
00395     case 'y': { // synchronize audio and video streams
00396       syncStreams = True;
00397       break;
00398     }
00399 
00400     case 'H': { // generate hint tracks (as well as the regular data tracks)
00401       generateHintTracks = True;
00402       break;
00403     }
00404 
00405     case 'Q': { // output QOS measurements
00406       qosMeasurementIntervalMS = 1000; // default: 1 second
00407 
00408       if (argc > 3 && argv[2][0] != '-') {
00409         // The next argument is the measurement interval,
00410         // in multiples of 100 ms
00411         if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00412           usage();
00413         }
00414         qosMeasurementIntervalMS *= 100;
00415         ++argv; --argc;
00416       }
00417       break;
00418     }
00419 
00420     case 's': { // specify initial seek time (trick play)
00421       double arg;
00422       if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00423         usage();
00424       }
00425       initialSeekTime = arg;
00426       ++argv; --argc;
00427       break;
00428     }
00429 
00430     case 'z': { // scale (trick play)
00431       float arg;
00432       if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00433         usage();
00434       }
00435       scale = arg;
00436       ++argv; --argc;
00437       break;
00438     }
00439 
00440     default: {
00441       usage();
00442       break;
00443     }
00444     }
00445 
00446     ++argv; --argc;
00447   }
00448   if (argc != 2) usage(); // there must be exactly one "rtsp://" URL at the end
00449   if (outputQuickTimeFile && outputAVIFile) {
00450     *env << "The -i and -q (or -4) options cannot both be used!\n";
00451     usage();
00452   }
00453   Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00454   if (!createReceivers && outputCompositeFile) {
00455     *env << "The -r and -q (or -4 or -i) options cannot both be used!\n";
00456     usage();
00457   }
00458   if (outputCompositeFile && !movieWidthOptionSet) {
00459     *env << "Warning: The -q, -4 or -i option was used, but not -w.  Assuming a video width of "
00460          << movieWidth << " pixels\n";
00461   }
00462   if (outputCompositeFile && !movieHeightOptionSet) {
00463     *env << "Warning: The -q, -4 or -i option was used, but not -h.  Assuming a video height of "
00464          << movieHeight << " pixels\n";
00465   }
00466   if (outputCompositeFile && !movieFPSOptionSet) {
00467     *env << "Warning: The -q, -4 or -i option was used, but not -f.  Assuming a video frame rate of "
00468          << movieFPS << " frames-per-second\n";
00469   }
00470   if (audioOnly && videoOnly) {
00471     *env << "The -a and -v options cannot both be used!\n";
00472     usage();
00473   }
00474   if (sendOptionsRequestOnly && !sendOptionsRequest) {
00475     *env << "The -o and -O options cannot both be used!\n";
00476     usage();
00477   }
00478   if (tunnelOverHTTPPortNum > 0) {
00479     if (streamUsingTCP) {
00480       *env << "The -t and -T options cannot both be used!\n";
00481       usage();
00482     } else {
00483       streamUsingTCP = True;
00484     }
00485   }
00486   if (!createReceivers && notifyOnPacketArrival) {
00487     *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00488   }
00489   if (durationSlop < 0) {
00490     // This parameter wasn't set, so use a default value.
00491     // If we're measuring QOS stats, then don't add any slop, to avoid
00492     // having 'empty' measurement intervals at the end.
00493     durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00494   }
00495 
00496   streamURL = argv[1];
00497 
00498   // Create our client object:
00499   ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00500   if (ourClient == NULL) {
00501     *env << "Failed to create " << clientProtocolName
00502                 << " client: " << env->getResultMsg() << "\n";
00503     shutdown();
00504   }
00505 
00506   if (sendOptionsRequest) {
00507     // Begin by sending an "OPTIONS" command:
00508     getOptions(continueAfterOPTIONS);
00509   } else {
00510     continueAfterOPTIONS(NULL, 0, NULL);
00511   }
00512 
00513   // All subsequent activity takes place within the event loop:
00514   env->taskScheduler().doEventLoop(); // does not return
00515 
00516   return 0; // only to prevent compiler warning
00517 }
00518 
00519 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00520   if (sendOptionsRequestOnly) {
00521     if (resultCode != 0) {
00522       *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00523     } else {
00524       *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00525     }
00526     shutdown();
00527   }
00528   delete[] resultString;
00529 
00530   // Next, get a SDP description for the stream:
00531   getSDPDescription(continueAfterDESCRIBE);
00532 }
00533 
00534 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00535   if (resultCode != 0) {
00536     *env << "Failed to get a SDP description for the URL \"" << streamURL << "\": " << resultString << "\n";
00537     shutdown();
00538   }
00539 
00540   char* sdpDescription = resultString;
00541   *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00542 
00543   // Create a media session object from this SDP description:
00544   session = MediaSession::createNew(*env, sdpDescription);
00545   delete[] sdpDescription;
00546   if (session == NULL) {
00547     *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00548     shutdown();
00549   } else if (!session->hasSubsessions()) {
00550     *env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
00551     shutdown();
00552   }
00553 
00554   // Then, setup the "RTPSource"s for the session:
00555   MediaSubsessionIterator iter(*session);
00556   MediaSubsession *subsession;
00557   Boolean madeProgress = False;
00558   char const* singleMediumToTest = singleMedium;
00559   while ((subsession = iter.next()) != NULL) {
00560     // If we've asked to receive only a single medium, then check this now:
00561     if (singleMediumToTest != NULL) {
00562       if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00563                   *env << "Ignoring \"" << subsession->mediumName()
00564                           << "/" << subsession->codecName()
00565                           << "\" subsession, because we've asked to receive a single " << singleMedium
00566                           << " session only\n";
00567         continue;
00568       } else {
00569         // Receive this subsession only
00570         singleMediumToTest = "xxxxx";
00571             // this hack ensures that we get only 1 subsession of this type
00572       }
00573     }
00574 
00575     if (desiredPortNum != 0) {
00576       subsession->setClientPortNum(desiredPortNum);
00577       desiredPortNum += 2;
00578     }
00579 
00580     if (createReceivers) {
00581       if (!subsession->initiate(simpleRTPoffsetArg)) {
00582         *env << "Unable to create receiver for \"" << subsession->mediumName()
00583              << "/" << subsession->codecName()
00584              << "\" subsession: " << env->getResultMsg() << "\n";
00585       } else {
00586         *env << "Created receiver for \"" << subsession->mediumName()
00587              << "/" << subsession->codecName()
00588              << "\" subsession (client ports " << subsession->clientPortNum()
00589              << "-" << subsession->clientPortNum()+1 << ")\n";
00590         madeProgress = True;
00591         
00592         if (subsession->rtpSource() != NULL) {
00593           // Because we're saving the incoming data, rather than playing
00594           // it in real time, allow an especially large time threshold
00595           // (1 second) for reordering misordered incoming packets:
00596           unsigned const thresh = 1000000; // 1 second
00597           subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00598           
00599           // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
00600           // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
00601           // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
00602           // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
00603           int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00604           unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00605           if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00606             unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00607             newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00608             if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
00609               *env << "Changed socket receive buffer size for the \""
00610                    << subsession->mediumName()
00611                    << "/" << subsession->codecName()
00612                    << "\" subsession from "
00613                    << curBufferSize << " to "
00614                    << newBufferSize << " bytes\n";
00615             }
00616           }
00617         }
00618       }
00619     } else {
00620       if (subsession->clientPortNum() == 0) {
00621         *env << "No client port was specified for the \""
00622              << subsession->mediumName()
00623              << "/" << subsession->codecName()
00624              << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
00625       } else {
00626                 madeProgress = True;
00627       }
00628     }
00629   }
00630   if (!madeProgress) shutdown();
00631 
00632   // Perform additional 'setup' on each subsession, before playing them:
00633   setupStreams();
00634 }
00635 
00636 MediaSubsession *subsession;
00637 Boolean madeProgress = False;
00638 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00639   if (resultCode == 0) {
00640       *env << "Setup \"" << subsession->mediumName()
00641            << "/" << subsession->codecName()
00642            << "\" subsession (client ports " << subsession->clientPortNum()
00643            << "-" << subsession->clientPortNum()+1 << ")\n";
00644       madeProgress = True;
00645   } else {
00646     *env << "Failed to setup \"" << subsession->mediumName()
00647          << "/" << subsession->codecName()
00648          << "\" subsession: " << env->getResultMsg() << "\n";
00649   }
00650 
00651   // Set up the next subsession, if any:
00652   setupStreams();
00653 }
00654 
00655 void setupStreams() {
00656   static MediaSubsessionIterator* setupIter = NULL;
00657   if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00658   while ((subsession = setupIter->next()) != NULL) {
00659     // We have another subsession left to set up:
00660     if (subsession->clientPortNum() == 0) continue; // port # was not set
00661 
00662     setupSubsession(subsession, streamUsingTCP, continueAfterSETUP);
00663     return;
00664   }
00665 
00666   // We're done setting up subsessions.
00667   delete setupIter;
00668   if (!madeProgress) shutdown();
00669 
00670   // Create output files:
00671   if (createReceivers) {
00672     if (outputQuickTimeFile) {
00673       // Create a "QuickTimeFileSink", to write to 'stdout':
00674       qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout",
00675                                            fileSinkBufferSize,
00676                                            movieWidth, movieHeight,
00677                                            movieFPS,
00678                                            packetLossCompensate,
00679                                            syncStreams,
00680                                            generateHintTracks,
00681                                            generateMP4Format);
00682       if (qtOut == NULL) {
00683         *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg();
00684         shutdown();
00685       }
00686 
00687       qtOut->startPlaying(sessionAfterPlaying, NULL);
00688     } else if (outputAVIFile) {
00689       // Create an "AVIFileSink", to write to 'stdout':
00690       aviOut = AVIFileSink::createNew(*env, *session, "stdout",
00691                                       fileSinkBufferSize,
00692                                       movieWidth, movieHeight,
00693                                       movieFPS,
00694                                       packetLossCompensate);
00695       if (aviOut == NULL) {
00696         *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg();
00697         shutdown();
00698       }
00699 
00700       aviOut->startPlaying(sessionAfterPlaying, NULL);
00701     } else {
00702       // Create and start "FileSink"s for each subsession:
00703       madeProgress = False;
00704       MediaSubsessionIterator iter(*session);
00705       while ((subsession = iter.next()) != NULL) {
00706         if (subsession->readSource() == NULL) continue; // was not initiated
00707 
00708         // Create an output file for each desired stream:
00709         char outFileName[1000];
00710         if (singleMedium == NULL) {
00711           // Output file name is
00712           //     "<filename-prefix><medium_name>-<codec_name>-<counter>"
00713           static unsigned streamCounter = 0;
00714           snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00715                    fileNamePrefix, subsession->mediumName(),
00716                    subsession->codecName(), ++streamCounter);
00717         } else {
00718           sprintf(outFileName, "stdout");
00719         }
00720         FileSink* fileSink;
00721         if (strcmp(subsession->mediumName(), "audio") == 0 &&
00722             (strcmp(subsession->codecName(), "AMR") == 0 ||
00723              strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00724           // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00725           fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00726                                                  fileSinkBufferSize, oneFilePerFrame);
00727         } else if (strcmp(subsession->mediumName(), "video") == 0 &&
00728             (strcmp(subsession->codecName(), "H264") == 0)) {
00729           // For H.264 video stream, we use a special sink that insert start_codes:
00730           fileSink = H264VideoFileSink::createNew(*env, outFileName,
00731                                                   subsession->fmtp_spropparametersets(),
00732                                                   fileSinkBufferSize, oneFilePerFrame);
00733         } else {
00734           // Normal case:
00735           fileSink = FileSink::createNew(*env, outFileName,
00736                                          fileSinkBufferSize, oneFilePerFrame);
00737         }
00738         subsession->sink = fileSink;
00739         if (subsession->sink == NULL) {
00740           *env << "Failed to create FileSink for \"" << outFileName
00741                   << "\": " << env->getResultMsg() << "\n";
00742         } else {
00743           if (singleMedium == NULL) {
00744             *env << "Created output file: \"" << outFileName << "\"\n";
00745           } else {
00746             *env << "Outputting data from the \"" << subsession->mediumName()
00747                         << "/" << subsession->codecName()
00748                         << "\" subsession to 'stdout'\n";
00749           }
00750 
00751           if (strcmp(subsession->mediumName(), "video") == 0 &&
00752               strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00753               subsession->fmtp_config() != NULL) {
00754             // For MPEG-4 video RTP streams, the 'config' information
00755             // from the SDP description contains useful VOL etc. headers.
00756             // Insert this data at the front of the output file:
00757             unsigned configLen;
00758             unsigned char* configData
00759               = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00760             struct timeval timeNow;
00761             gettimeofday(&timeNow, NULL);
00762             fileSink->addData(configData, configLen, timeNow);
00763             delete[] configData;
00764           }
00765 
00766           subsession->sink->startPlaying(*(subsession->readSource()),
00767                                          subsessionAfterPlaying,
00768                                          subsession);
00769 
00770           // Also set a handler to be called if a RTCP "BYE" arrives
00771           // for this subsession:
00772           if (subsession->rtcpInstance() != NULL) {
00773             subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession);
00774           }
00775 
00776           madeProgress = True;
00777         }
00778       }
00779       if (!madeProgress) shutdown();
00780     }
00781   }
00782 
00783   // Finally, start playing each subsession, to start the data flow:
00784   if (duration == 0) {
00785     if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time
00786     else if (scale < 0) duration = initialSeekTime;
00787   }
00788   if (duration < 0) duration = 0.0;
00789 
00790   endTime = initialSeekTime;
00791   if (scale > 0) {
00792     if (duration <= 0) endTime = -1.0f;
00793     else endTime = initialSeekTime + duration;
00794   } else {
00795     endTime = initialSeekTime - duration;
00796     if (endTime < 0) endTime = 0.0f;
00797   }
00798 
00799   startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00800 }
00801 
00802 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
00803   if (resultCode != 0) {
00804     *env << "Failed to start playing session: " << resultString << "\n";
00805     shutdown();
00806   } else {
00807     *env << "Started playing session\n";
00808   }
00809 
00810   if (qosMeasurementIntervalMS > 0) {
00811     // Begin periodic QOS measurements:
00812     beginQOSMeasurement();
00813   }
00814 
00815   // Figure out how long to delay (if at all) before shutting down, or
00816   // repeating the playing
00817   Boolean timerIsBeingUsed = False;
00818   double secondsToDelay = duration;
00819   if (duration > 0) {
00820     // First, adjust "duration" based on any change to the play range (that was specified in the "PLAY" response):
00821     double rangeAdjustment = (session->playEndTime() - session->playStartTime()) - (endTime - initialSeekTime);
00822     if (duration + rangeAdjustment > 0.0) duration += rangeAdjustment;
00823 
00824     timerIsBeingUsed = True;
00825     double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0
00826     secondsToDelay = duration/absScale + durationSlop;
00827 
00828     int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
00829     sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
00830   }
00831 
00832   char const* actionString
00833     = createReceivers? "Receiving streamed data":"Data is being streamed";
00834   if (timerIsBeingUsed) {
00835     *env << actionString
00836                 << " (for up to " << secondsToDelay
00837                 << " seconds)...\n";
00838   } else {
00839 #ifdef USE_SIGNALS
00840     pid_t ourPid = getpid();
00841     *env << actionString
00842                 << " (signal with \"kill -HUP " << (int)ourPid
00843                 << "\" or \"kill -USR1 " << (int)ourPid
00844                 << "\" to terminate)...\n";
00845 #else
00846     *env << actionString << "...\n";
00847 #endif
00848   }
00849 
00850   // Watch for incoming packets (if desired):
00851   checkForPacketArrival(NULL);
00852   checkInterPacketGaps(NULL);
00853 }
00854 
00855 void closeMediaSinks() {
00856   Medium::close(qtOut);
00857   Medium::close(aviOut);
00858 
00859   if (session == NULL) return;
00860   MediaSubsessionIterator iter(*session);
00861   MediaSubsession* subsession;
00862   while ((subsession = iter.next()) != NULL) {
00863     Medium::close(subsession->sink);
00864     subsession->sink = NULL;
00865   }
00866 }
00867 
00868 void subsessionAfterPlaying(void* clientData) {
00869   // Begin by closing this media subsession's stream:
00870   MediaSubsession* subsession = (MediaSubsession*)clientData;
00871   Medium::close(subsession->sink);
00872   subsession->sink = NULL;
00873 
00874   // Next, check whether *all* subsessions' streams have now been closed:
00875   MediaSession& session = subsession->parentSession();
00876   MediaSubsessionIterator iter(session);
00877   while ((subsession = iter.next()) != NULL) {
00878     if (subsession->sink != NULL) return; // this subsession is still active
00879   }
00880 
00881   // All subsessions' streams have now been closed
00882   sessionAfterPlaying();
00883 }
00884 
00885 void subsessionByeHandler(void* clientData) {
00886   struct timeval timeNow;
00887   gettimeofday(&timeNow, NULL);
00888   unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
00889 
00890   MediaSubsession* subsession = (MediaSubsession*)clientData;
00891   *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
00892         << "/" << subsession->codecName()
00893         << "\" subsession (after " << secsDiff
00894         << " seconds)\n";
00895 
00896   // Act now as if the subsession had closed:
00897   subsessionAfterPlaying(subsession);
00898 }
00899 
00900 void sessionAfterPlaying(void* /*clientData*/) {
00901   if (!playContinuously) {
00902     shutdown(0);
00903   } else {
00904     // We've been asked to play the stream(s) over again.
00905     // First, reset state from the current session:
00906     if (env != NULL) {
00907       env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
00908       env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
00909       env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
00910       env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
00911     }
00912     totNumPacketsReceived = ~0;
00913 
00914     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00915   }
00916 }
00917 
00918 void sessionTimerHandler(void* /*clientData*/) {
00919   sessionTimerTask = NULL;
00920 
00921   sessionAfterPlaying();
00922 }
00923 
00924 class qosMeasurementRecord {
00925 public:
00926   qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
00927     : fSource(src), fNext(NULL),
00928       kbits_per_second_min(1e20), kbits_per_second_max(0),
00929       kBytesTotal(0.0),
00930       packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
00931       totNumPacketsReceived(0), totNumPacketsExpected(0) {
00932     measurementEndTime = measurementStartTime = startTime;
00933 
00934     RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
00935     // Assume that there's only one SSRC source (usually the case):
00936     RTPReceptionStats* stats = statsIter.next(True);
00937     if (stats != NULL) {
00938       kBytesTotal = stats->totNumKBytesReceived();
00939       totNumPacketsReceived = stats->totNumPacketsReceived();
00940       totNumPacketsExpected = stats->totNumPacketsExpected();
00941     }
00942   }
00943   virtual ~qosMeasurementRecord() { delete fNext; }
00944 
00945   void periodicQOSMeasurement(struct timeval const& timeNow);
00946 
00947 public:
00948   RTPSource* fSource;
00949   qosMeasurementRecord* fNext;
00950 
00951 public:
00952   struct timeval measurementStartTime, measurementEndTime;
00953   double kbits_per_second_min, kbits_per_second_max;
00954   double kBytesTotal;
00955   double packet_loss_fraction_min, packet_loss_fraction_max;
00956   unsigned totNumPacketsReceived, totNumPacketsExpected;
00957 };
00958 
00959 static qosMeasurementRecord* qosRecordHead = NULL;
00960 
00961 static void periodicQOSMeasurement(void* clientData); // forward
00962 
00963 static unsigned nextQOSMeasurementUSecs;
00964 
00965 static void scheduleNextQOSMeasurement() {
00966   nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00967   struct timeval timeNow;
00968   gettimeofday(&timeNow, NULL);
00969   unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00970   unsigned usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
00971      // Note: This works even when nextQOSMeasurementUSecs wraps around
00972 
00973   qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00974      usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
00975 }
00976 
00977 static void periodicQOSMeasurement(void* /*clientData*/) {
00978   struct timeval timeNow;
00979   gettimeofday(&timeNow, NULL);
00980 
00981   for (qosMeasurementRecord* qosRecord = qosRecordHead;
00982        qosRecord != NULL; qosRecord = qosRecord->fNext) {
00983     qosRecord->periodicQOSMeasurement(timeNow);
00984   }
00985 
00986   // Do this again later:
00987   scheduleNextQOSMeasurement();
00988 }
00989 
00990 void qosMeasurementRecord
00991 ::periodicQOSMeasurement(struct timeval const& timeNow) {
00992   unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00993   int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00994   double timeDiff = secsDiff + usecsDiff/1000000.0;
00995   measurementEndTime = timeNow;
00996 
00997   RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00998   // Assume that there's only one SSRC source (usually the case):
00999   RTPReceptionStats* stats = statsIter.next(True);
01000   if (stats != NULL) {
01001     double kBytesTotalNow = stats->totNumKBytesReceived();
01002     double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
01003     kBytesTotal = kBytesTotalNow;
01004 
01005     double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
01006     if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
01007     if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
01008     if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
01009 
01010     unsigned totReceivedNow = stats->totNumPacketsReceived();
01011     unsigned totExpectedNow = stats->totNumPacketsExpected();
01012     unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
01013     unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
01014     totNumPacketsReceived = totReceivedNow;
01015     totNumPacketsExpected = totExpectedNow;
01016 
01017     double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01018       : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01019     //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
01020     if (lossFractionNow < packet_loss_fraction_min) {
01021       packet_loss_fraction_min = lossFractionNow;
01022     }
01023     if (lossFractionNow > packet_loss_fraction_max) {
01024       packet_loss_fraction_max = lossFractionNow;
01025     }
01026   }
01027 }
01028 
01029 void beginQOSMeasurement() {
01030   // Set up a measurement record for each active subsession:
01031   struct timeval startTime;
01032   gettimeofday(&startTime, NULL);
01033   nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01034   qosMeasurementRecord* qosRecordTail = NULL;
01035   MediaSubsessionIterator iter(*session);
01036   MediaSubsession* subsession;
01037   while ((subsession = iter.next()) != NULL) {
01038     RTPSource* src = subsession->rtpSource();
01039     if (src == NULL) continue;
01040 
01041     qosMeasurementRecord* qosRecord
01042       = new qosMeasurementRecord(startTime, src);
01043     if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01044     if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01045     qosRecordTail  = qosRecord;
01046   }
01047 
01048   // Then schedule the first of the periodic measurements:
01049   scheduleNextQOSMeasurement();
01050 }
01051 
01052 void printQOSData(int exitCode) {
01053   *env << "begin_QOS_statistics\n";
01054   
01055   // Print out stats for each active subsession:
01056   qosMeasurementRecord* curQOSRecord = qosRecordHead;
01057   if (session != NULL) {
01058     MediaSubsessionIterator iter(*session);
01059     MediaSubsession* subsession;
01060     while ((subsession = iter.next()) != NULL) {
01061       RTPSource* src = subsession->rtpSource();
01062       if (src == NULL) continue;
01063       
01064       *env << "subsession\t" << subsession->mediumName()
01065            << "/" << subsession->codecName() << "\n";
01066       
01067       unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01068       
01069       if (curQOSRecord != NULL) {
01070         numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01071         numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01072       }
01073       *env << "num_packets_received\t" << numPacketsReceived << "\n";
01074       *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01075       
01076       if (curQOSRecord != NULL) {
01077         unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01078           - curQOSRecord->measurementStartTime.tv_sec;
01079         int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01080           - curQOSRecord->measurementStartTime.tv_usec;
01081         double measurementTime = secsDiff + usecsDiff/1000000.0;
01082         *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01083         
01084         *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01085         
01086         *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01087         
01088         if (curQOSRecord->kbits_per_second_max == 0) {
01089           // special case: we didn't receive any data:
01090           *env <<
01091             "kbits_per_second_min\tunavailable\n"
01092             "kbits_per_second_ave\tunavailable\n"
01093             "kbits_per_second_max\tunavailable\n";
01094         } else {
01095           *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01096           *env << "kbits_per_second_ave\t"
01097                << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01098           *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01099         }
01100         
01101         *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01102         double packetLossFraction = numPacketsExpected == 0 ? 1.0
01103           : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01104         if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01105         *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01106         *env << "packet_loss_percentage_max\t"
01107              << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01108         
01109         RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01110         // Assume that there's only one SSRC source (usually the case):
01111         RTPReceptionStats* stats = statsIter.next(True);
01112         if (stats != NULL) {
01113           *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01114           struct timeval totalGaps = stats->totalInterPacketGaps();
01115           double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01116           unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01117           *env << "inter_packet_gap_ms_ave\t"
01118                << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01119           *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01120         }
01121         
01122         curQOSRecord = curQOSRecord->fNext;
01123       }
01124     }
01125   }
01126 
01127   *env << "end_QOS_statistics\n";
01128   delete qosRecordHead;
01129 }
01130 
01131 Boolean areAlreadyShuttingDown = False;
01132 int shutdownExitCode;
01133 void shutdown(int exitCode) {
01134   if (areAlreadyShuttingDown) return; // in case we're called after receiving a RTCP "BYE" while in the middle of a "TEARDOWN".
01135   areAlreadyShuttingDown = True;
01136 
01137   shutdownExitCode = exitCode;
01138   if (env != NULL) {
01139     env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01140     env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01141     env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01142     env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01143   }
01144 
01145   if (qosMeasurementIntervalMS > 0) {
01146     printQOSData(exitCode);
01147   }
01148 
01149   // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
01150   if (session != NULL) {
01151     tearDownSession(session, continueAfterTEARDOWN);
01152   } else {
01153     continueAfterTEARDOWN(NULL, 0, NULL);
01154   }
01155 }
01156 
01157 void continueAfterTEARDOWN(RTSPClient*, int /*resultCode*/, char* /*resultString*/) {
01158   // Now that we've stopped any more incoming data from arriving, close our output files:
01159   closeMediaSinks();
01160   Medium::close(session);
01161 
01162   // Finally, shut down our client:
01163   delete ourAuthenticator;
01164   Medium::close(ourClient);
01165 
01166   // Adios...
01167   exit(shutdownExitCode);
01168 }
01169 
01170 void signalHandlerShutdown(int /*sig*/) {
01171   *env << "Got shutdown signal\n";
01172   shutdown(0);
01173 }
01174 
01175 void checkForPacketArrival(void* /*clientData*/) {
01176   if (!notifyOnPacketArrival) return; // we're not checking
01177 
01178   // Check each subsession, to see whether it has received data packets:
01179   unsigned numSubsessionsChecked = 0;
01180   unsigned numSubsessionsWithReceivedData = 0;
01181   unsigned numSubsessionsThatHaveBeenSynced = 0;
01182 
01183   MediaSubsessionIterator iter(*session);
01184   MediaSubsession* subsession;
01185   while ((subsession = iter.next()) != NULL) {
01186     RTPSource* src = subsession->rtpSource();
01187     if (src == NULL) continue;
01188     ++numSubsessionsChecked;
01189 
01190     if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01191       // At least one data packet has arrived
01192       ++numSubsessionsWithReceivedData;
01193     }
01194     if (src->hasBeenSynchronizedUsingRTCP()) {
01195       ++numSubsessionsThatHaveBeenSynced;
01196     }
01197   }
01198 
01199   unsigned numSubsessionsToCheck = numSubsessionsChecked;
01200   // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:
01201   // They might not use all of the input sources:
01202   if (qtOut != NULL) {
01203     numSubsessionsToCheck = qtOut->numActiveSubsessions();
01204   } else if (aviOut != NULL) {
01205     numSubsessionsToCheck = aviOut->numActiveSubsessions();
01206   }
01207 
01208   Boolean notifyTheUser;
01209   if (!syncStreams) {
01210     notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case
01211   } else {
01212     notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01213       && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01214     // Note: A subsession with no active sources is considered to be synced
01215   }
01216   if (notifyTheUser) {
01217     struct timeval timeNow;
01218     gettimeofday(&timeNow, NULL);
01219         char timestampStr[100];
01220         sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01221     *env << (syncStreams ? "Synchronized d" : "D")
01222                 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01223     return;
01224   }
01225 
01226   // No luck, so reschedule this check again, after a delay:
01227   int uSecsToDelay = 100000; // 100 ms
01228   arrivalCheckTimerTask
01229     = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01230                                (TaskFunc*)checkForPacketArrival, NULL);
01231 }
01232 
01233 void checkInterPacketGaps(void* /*clientData*/) {
01234   if (interPacketGapMaxTime == 0) return; // we're not checking
01235 
01236   // Check each subsession, counting up how many packets have been received:
01237   unsigned newTotNumPacketsReceived = 0;
01238 
01239   MediaSubsessionIterator iter(*session);
01240   MediaSubsession* subsession;
01241   while ((subsession = iter.next()) != NULL) {
01242     RTPSource* src = subsession->rtpSource();
01243     if (src == NULL) continue;
01244     newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01245   }
01246 
01247   if (newTotNumPacketsReceived == totNumPacketsReceived) {
01248     // No additional packets have been received since the last time we
01249     // checked, so end this stream:
01250     *env << "Closing session, because we stopped receiving packets.\n";
01251     interPacketGapCheckTimerTask = NULL;
01252     sessionAfterPlaying();
01253   } else {
01254     totNumPacketsReceived = newTotNumPacketsReceived;
01255     // Check again, after the specified delay:
01256     interPacketGapCheckTimerTask
01257       = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01258                                  (TaskFunc*)checkInterPacketGaps, NULL);
01259   }
01260 }

Generated on Thu May 17 07:11:47 2012 for live by  doxygen 1.5.2