testProgs/playCommon.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // Copyright (c) 1996-2014, Live Networks, Inc.  All rights reserved
00017 // A common framework, used for the "openRTSP" and "playSIP" applications
00018 // Implementation
00019 //
00020 // NOTE: If you want to develop your own RTSP client application (or embed RTSP client functionality into your own application),
00021 // then we don't recommend using this code as a model, because it is too complex (with many options).
00022 // Instead, we recommend using the "testRTSPClient" application code as a model.
00023 
00024 #include "playCommon.hh"
00025 #include "BasicUsageEnvironment.hh"
00026 #include "GroupsockHelper.hh"
00027 
00028 #if defined(__WIN32__) || defined(_WIN32)
00029 #define snprintf _snprintf
00030 #else
00031 #include <signal.h>
00032 #define USE_SIGNALS 1
00033 #endif
00034 
00035 // Forward function declarations:
// (The functions are defined later in this file; "sessionAfterPlaying()" and "shutdown()" carry default arguments here.)
// Steps of the client start-up sequence.  "continueAfterClientCreation0" is the callback that main() passes to
// "HandlerServerForREGISTERCommand::createNew()"; "continueAfterOPTIONS" and "continueAfterDESCRIBE" are the
// handlers passed to "getOptions()" and "getSDPDescription()" respectively:
00036 void continueAfterClientCreation0(RTSPClient* client, Boolean requestStreamingOverTCP);
00037 void continueAfterClientCreation1();
00038 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00039 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00040 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00041 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00042 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00043 
// Output-file creation, stream setup/teardown helpers, and timer handlers:
00044 void createOutputFiles(char const* periodicFilenameSuffix);
00045 void createPeriodicOutputFiles();
00046 void setupStreams();
00047 void closeMediaSinks();
00048 void subsessionAfterPlaying(void* clientData);
00049 void subsessionByeHandler(void* clientData);
00050 void sessionAfterPlaying(void* clientData = NULL);
00051 void sessionTimerHandler(void* clientData);
00052 void periodicFileOutputTimerHandler(void* clientData);
00053 void shutdown(int exitCode = 1); // called on every fatal error path in this file; the exit code defaults to 1
00054 void signalHandlerShutdown(int sig); // installed by main() (via "signal()") for SIGHUP and SIGUSR1
00055 void checkForPacketArrival(void* clientData);
00056 void checkInterPacketGaps(void* clientData);
00057 void beginQOSMeasurement();
00058 
// Global state.  Most of these variables hold command-line settings; the option letters noted below
// refer to the "switch" in main() that assigns them.
00059 char const* progName; // set from argv[0] in main()
00060 UsageEnvironment* env; // created at the start of main(); used for all message output
00061 Medium* ourClient = NULL;
00062 Authenticator* ourAuthenticator = NULL; // created by -u
00063 char const* streamURL = NULL; // argv[1] after option parsing, or the URL of the client created from a "REGISTER" command
00064 MediaSession* session = NULL; // created in continueAfterDESCRIBE() from the SDP description
00065 TaskToken sessionTimerTask = NULL;
00066 TaskToken arrivalCheckTimerTask = NULL;
00067 TaskToken interPacketGapCheckTimerTask = NULL;
00068 TaskToken qosMeasurementTimerTask = NULL;
00069 TaskToken periodicFileOutputTask = NULL;
00070 Boolean createReceivers = True; // cleared by -r
00071 Boolean outputQuickTimeFile = False; // set by -q and by -4
00072 Boolean generateMP4Format = False; // set by -4
00073 QuickTimeFileSink* qtOut = NULL;
00074 Boolean outputAVIFile = False; // set by -i
00075 AVIFileSink* aviOut = NULL;
00076 Boolean audioOnly = False; // set by -a
00077 Boolean videoOnly = False; // set by -v
00078 char const* singleMedium = NULL; // "audio" (-a) or "video" (-v)
00079 int verbosityLevel = 1; // by default, print verbose output
00080 double duration = 0; // set by -d (a non-negative argument)
00081 double durationSlop = -1.0; // extra seconds to play at the end
00082 double initialSeekTime = 0.0f; // set by -s
00083 char* initialAbsoluteSeekTime = NULL; // set by -U
00084 float scale = 1.0f; // set by -z
00085 double endTime;
00086 unsigned interPacketGapMaxTime = 0; // set by -D
00087 unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps
00088 Boolean playContinuously = False; // set by -c
00089 int simpleRTPoffsetArg = -1; // set by -S; passed to "MediaSubsession::initiate()"
00090 Boolean sendOptionsRequest = True; // cleared by -O
00091 Boolean sendOptionsRequestOnly = False; // set by -o
00092 Boolean oneFilePerFrame = False; // set by -m
00093 Boolean notifyOnPacketArrival = False; // set by -n
00094 Boolean streamUsingTCP = False; // set by -t, implied by -T, or taken from an incoming "REGISTER" command
00095 Boolean forceMulticastOnUnspecified = False; // set by -C
00096 unsigned short desiredPortNum = 0; // set by -p; advanced by 2 for each subsession in continueAfterDESCRIBE()
00097 portNumBits tunnelOverHTTPPortNum = 0; // set by -T
00098 char* username = NULL; // set by -u
00099 char* password = NULL; // set by -u
00100 char* proxyServerName = NULL; // optional extra argument to -u
00101 unsigned short proxyServerPortNum = 0; // optional extra argument to -u
00102 unsigned char desiredAudioRTPPayloadFormat = 0; // set by -A (or to 96 by -M, if not already set)
00103 char* mimeSubtype = NULL; // set by -M
00104 unsigned short movieWidth = 240; // default
00105 Boolean movieWidthOptionSet = False;
00106 unsigned short movieHeight = 180; // default
00107 Boolean movieHeightOptionSet = False;
00108 unsigned movieFPS = 15; // default
00109 Boolean movieFPSOptionSet = False;
00110 char const* fileNamePrefix = ""; // set by -F
00111 unsigned fileSinkBufferSize = 100000; // set by -b
00112 unsigned socketInputBufferSize = 0; // set by -B
00113 Boolean packetLossCompensate = False; // set by -l (deprecated)
00114 Boolean syncStreams = False; // set by -y
00115 Boolean generateHintTracks = False; // set by -H
00116 Boolean waitForResponseToTEARDOWN = True;
00117 unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data
00118 char* userAgent = NULL; // set by -g
00119 unsigned fileOutputInterval = 0; // seconds
00120 unsigned fileOutputSecondsSoFar = 0; // seconds
00121 Boolean createHandlerServerForREGISTERCommand = False; // set by -R
00122 portNumBits handlerServerForREGISTERCommandPortNum = 0; // optional argument to -R
00123 HandlerServerForREGISTERCommand* handlerServerForREGISTERCommand;
00124 char* usernameForREGISTER = NULL; // set by -k
00125 char* passwordForREGISTER = NULL; // set by -k
00126 UserAuthenticationDatabase* authDBForREGISTER = NULL; // created on the first -k
00127 
00128 struct timeval startTime; // filled in by "gettimeofday()" at the start of main()
00129 
00130 void usage() {
00131   *env << "Usage: " << progName
00132        << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00133            << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00134        << " [-u <username> <password>"
00135            << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00136        << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00137        << " [-s <initial-seek-time>]|[-U <absolute-seek-time>] [-z <scale>] [-g user-agent]"
00138        << " [-k <username-for-REGISTER> <password-for-REGISTER>]"
00139        << " [-P <interval-in-seconds>]"
00140        << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] [<url>|-R [<port-num>]] (or " << progName << " -o [-V] <url>)\n";
00141   shutdown();
00142 }
00143 
00144 int main(int argc, char** argv) {
00145   // Begin by setting up our usage environment:
00146   TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00147   env = BasicUsageEnvironment::createNew(*scheduler);
00148 
00149   progName = argv[0];
00150 
00151   gettimeofday(&startTime, NULL);
00152 
00153 #ifdef USE_SIGNALS
00154   // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1:
00155   signal(SIGHUP, signalHandlerShutdown);
00156   signal(SIGUSR1, signalHandlerShutdown);
00157 #endif
00158 
00159   // unfortunately we can't use getopt() here, as Windoze doesn't have it
00160   while (argc > 1) {
00161     char* const opt = argv[1];
00162     if (opt[0] != '-') {
00163       if (argc == 2) break; // only the URL is left
00164       usage();
00165     }
00166 
00167     switch (opt[1]) {
00168     case 'p': { // specify start port number
00169       int portArg;
00170       if (sscanf(argv[2], "%d", &portArg) != 1) {
00171         usage();
00172       }
00173       if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00174         *env << "bad port number: " << portArg
00175                 << " (must be even, and in the range (0,65536))\n";
00176         usage();
00177       }
00178       desiredPortNum = (unsigned short)portArg;
00179       ++argv; --argc;
00180       break;
00181     }
00182 
00183     case 'r': { // do not receive data (instead, just 'play' the stream(s))
00184       createReceivers = False;
00185       break;
00186     }
00187 
00188     case 'q': { // output a QuickTime file (to stdout)
00189       outputQuickTimeFile = True;
00190       break;
00191     }
00192 
00193     case '4': { // output a 'mp4'-format file (to stdout)
00194       outputQuickTimeFile = True;
00195       generateMP4Format = True;
00196       break;
00197     }
00198 
00199     case 'i': { // output an AVI file (to stdout)
00200       outputAVIFile = True;
00201       break;
00202     }
00203 
00204     case 'I': { // specify input interface...
00205       NetAddressList addresses(argv[2]);
00206       if (addresses.numAddresses() == 0) {
00207         *env << "Failed to find network address for \"" << argv[2] << "\"";
00208         break;
00209       }
00210       ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00211       ++argv; --argc;
00212       break;
00213     }
00214 
00215     case 'a': { // receive/record an audio stream only
00216       audioOnly = True;
00217       singleMedium = "audio";
00218       break;
00219     }
00220 
00221     case 'v': { // receive/record a video stream only
00222       videoOnly = True;
00223       singleMedium = "video";
00224       break;
00225     }
00226 
00227     case 'V': { // disable verbose output
00228       verbosityLevel = 0;
00229       break;
00230     }
00231 
00232     case 'd': { // specify duration, or how much to delay after end time
00233       float arg;
00234       if (sscanf(argv[2], "%g", &arg) != 1) {
00235         usage();
00236       }
00237       if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0"
00238         // a 'negative' argument was specified; use this for "durationSlop":
00239         duration = 0; // use whatever's in the SDP
00240         durationSlop = -arg;
00241       } else {
00242         duration = arg;
00243         durationSlop = 0;
00244       }
00245       ++argv; --argc;
00246       break;
00247     }
00248 
00249     case 'D': { // specify maximum number of seconds to wait for packets:
00250       if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00251         usage();
00252       }
00253       ++argv; --argc;
00254       break;
00255     }
00256 
00257     case 'c': { // play continuously
00258       playContinuously = True;
00259       break;
00260     }
00261 
00262     case 'S': { // specify an offset to use with "SimpleRTPSource"s
00263       if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00264         usage();
00265       }
00266       if (simpleRTPoffsetArg < 0) {
00267         *env << "offset argument to \"-S\" must be >= 0\n";
00268         usage();
00269       }
00270       ++argv; --argc;
00271       break;
00272     }
00273 
00274     case 'm': { // output multiple files - one for each frame
00275       oneFilePerFrame = True;
00276       break;
00277     }
00278 
00279     case 'n': { // notify the user when the first data packet arrives
00280       notifyOnPacketArrival = True;
00281       break;
00282     }
00283 
00284     case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE"
00285       sendOptionsRequest = False;
00286       break;
00287     }
00288 
00289     case 'o': { // Send only the "OPTIONS" request to the server
00290       sendOptionsRequestOnly = True;
00291       break;
00292     }
00293 
00294     case 'P': { // specify an interval (in seconds) between writing successive output files
00295       int fileOutputIntervalInt;
00296       if (sscanf(argv[2], "%d", &fileOutputIntervalInt) != 1 || fileOutputIntervalInt <= 0) {
00297         usage();
00298       }
00299       fileOutputInterval = (unsigned)fileOutputIntervalInt;
00300       ++argv; --argc;
00301       break;
00302     }
00303 
00304     case 't': {
00305       // stream RTP and RTCP over the TCP 'control' connection
00306       if (controlConnectionUsesTCP) {
00307         streamUsingTCP = True;
00308       } else {
00309         usage();
00310       }
00311       break;
00312     }
00313 
00314     case 'T': {
00315       // stream RTP and RTCP over a HTTP connection
00316       if (controlConnectionUsesTCP) {
00317         if (argc > 3 && argv[2][0] != '-') {
00318           // The next argument is the HTTP server port number:
00319           if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00320               && tunnelOverHTTPPortNum > 0) {
00321             ++argv; --argc;
00322             break;
00323           }
00324         }
00325       }
00326 
00327       // If we get here, the option was specified incorrectly:
00328       usage();
00329       break;
00330     }
00331 
00332     case 'u': { // specify a username and password
00333       if (argc < 4) usage(); // there's no argv[3] (for the "password")
00334       username = argv[2];
00335       password = argv[3];
00336       argv+=2; argc-=2;
00337       if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00338         // The next argument is the name of a proxy server:
00339         proxyServerName = argv[2];
00340         ++argv; --argc;
00341 
00342         if (argc > 3 && argv[2][0] != '-') {
00343           // The next argument is the proxy server port number:
00344           if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00345             usage();
00346           }
00347           ++argv; --argc;
00348         }
00349       }
00350 
00351       ourAuthenticator = new Authenticator(username, password);
00352       break;
00353     }
00354 
00355     case 'k': { // specify a username and password to be used to authentication an incoming "REGISTER" command (for use with -R)
00356       if (argc < 4) usage(); // there's no argv[3] (for the "password")
00357       usernameForREGISTER = argv[2];
00358       passwordForREGISTER = argv[3];
00359       argv+=2; argc-=2;
00360 
00361       if (authDBForREGISTER == NULL) authDBForREGISTER = new UserAuthenticationDatabase;
00362       authDBForREGISTER->addUserRecord(usernameForREGISTER, passwordForREGISTER);
00363       break;
00364     }
00365 
00366     case 'A': { // specify a desired audio RTP payload format
00367       unsigned formatArg;
00368       if (sscanf(argv[2], "%u", &formatArg) != 1
00369           || formatArg >= 96) {
00370         usage();
00371       }
00372       desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00373       ++argv; --argc;
00374       break;
00375     }
00376 
00377     case 'M': { // specify a MIME subtype for a dynamic RTP payload type
00378       mimeSubtype = argv[2];
00379       if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00380       ++argv; --argc;
00381       break;
00382     }
00383 
00384     case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie
00385       if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00386         usage();
00387       }
00388       movieWidthOptionSet = True;
00389       ++argv; --argc;
00390       break;
00391     }
00392 
00393     case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie
00394       if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00395         usage();
00396       }
00397       movieHeightOptionSet = True;
00398       ++argv; --argc;
00399       break;
00400     }
00401 
00402     case 'f': { // specify a frame rate (per second) for an output QT or AVI movie
00403       if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00404         usage();
00405       }
00406       movieFPSOptionSet = True;
00407       ++argv; --argc;
00408       break;
00409     }
00410 
00411     case 'F': { // specify a prefix for the audio and video output files
00412       fileNamePrefix = argv[2];
00413       ++argv; --argc;
00414       break;
00415     }
00416 
00417     case 'g': { // specify a user agent name to use in outgoing requests
00418       userAgent = argv[2];
00419       ++argv; --argc;
00420       break;
00421     }
00422 
00423     case 'b': { // specify the size of buffers for "FileSink"s
00424       if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00425         usage();
00426       }
00427       ++argv; --argc;
00428       break;
00429     }
00430 
00431     case 'B': { // specify the size of input socket buffers
00432       if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00433         usage();
00434       }
00435       ++argv; --argc;
00436       break;
00437     }
00438 
00439     // Note: The following option is deprecated, and may someday be removed:
00440     case 'l': { // try to compensate for packet loss by repeating frames
00441       packetLossCompensate = True;
00442       break;
00443     }
00444 
00445     case 'y': { // synchronize audio and video streams
00446       syncStreams = True;
00447       break;
00448     }
00449 
00450     case 'H': { // generate hint tracks (as well as the regular data tracks)
00451       generateHintTracks = True;
00452       break;
00453     }
00454 
00455     case 'Q': { // output QOS measurements
00456       qosMeasurementIntervalMS = 1000; // default: 1 second
00457 
00458       if (argc > 3 && argv[2][0] != '-') {
00459         // The next argument is the measurement interval,
00460         // in multiples of 100 ms
00461         if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00462           usage();
00463         }
00464         qosMeasurementIntervalMS *= 100;
00465         ++argv; --argc;
00466       }
00467       break;
00468     }
00469 
00470     case 's': { // specify initial seek time (trick play)
00471       double arg;
00472       if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00473         usage();
00474       }
00475       initialSeekTime = arg;
00476       ++argv; --argc;
00477       break;
00478     }
00479 
00480     case 'U': {
00481       // specify initial absolute seek time (trick play), using a string of the form "YYYYMMDDTHHMMSSZ" or "YYYYMMDDTHHMMSS.<frac>Z"
00482       initialAbsoluteSeekTime = argv[2];
00483       ++argv; --argc;
00484       break;
00485     }
00486 
00487     case 'z': { // scale (trick play)
00488       float arg;
00489       if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00490         usage();
00491       }
00492       scale = arg;
00493       ++argv; --argc;
00494       break;
00495     }
00496 
00497     case 'R': {
00498       // set up a handler server for incoming "REGISTER" commands
00499       createHandlerServerForREGISTERCommand = True;
00500       if (argc > 2 && argv[2][0] != '-') {
00501         // The next argument is the REGISTER handler server port number:
00502         if (sscanf(argv[2], "%hu", &handlerServerForREGISTERCommandPortNum) == 1 && handlerServerForREGISTERCommandPortNum > 0) {
00503           ++argv; --argc;
00504           break;
00505         }
00506       }
00507       break;
00508     }
00509 
00510     case 'C': {
00511       forceMulticastOnUnspecified = True;
00512       break;
00513     }
00514 
00515     default: {
00516       *env << "Invalid option: " << opt << "\n";
00517       usage();
00518       break;
00519     }
00520     }
00521 
00522     ++argv; --argc;
00523   }
00524 
00525   // There must be exactly one "rtsp://" URL at the end (unless '-R' was used, in which case there's no URL)
00526   if (!( (argc == 2 && !createHandlerServerForREGISTERCommand) || (argc == 1 && createHandlerServerForREGISTERCommand) )) usage();
00527   if (outputQuickTimeFile && outputAVIFile) {
00528     *env << "The -i and -q (or -4) options cannot both be used!\n";
00529     usage();
00530   }
00531   Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00532   if (!createReceivers && (outputCompositeFile || oneFilePerFrame || fileOutputInterval > 0)) {
00533     *env << "The -r option cannot be used with -q, -4, -i, -m, or -P!\n";
00534     usage();
00535   }
00536   if (oneFilePerFrame && fileOutputInterval > 0) {
00537     *env << "The -m and -P options cannot both be used!\n";
00538     usage();
00539   }
00540   if (outputCompositeFile && !movieWidthOptionSet) {
00541     *env << "Warning: The -q, -4 or -i option was used, but not -w.  Assuming a video width of "
00542          << movieWidth << " pixels\n";
00543   }
00544   if (outputCompositeFile && !movieHeightOptionSet) {
00545     *env << "Warning: The -q, -4 or -i option was used, but not -h.  Assuming a video height of "
00546          << movieHeight << " pixels\n";
00547   }
00548   if (outputCompositeFile && !movieFPSOptionSet) {
00549     *env << "Warning: The -q, -4 or -i option was used, but not -f.  Assuming a video frame rate of "
00550          << movieFPS << " frames-per-second\n";
00551   }
00552   if (audioOnly && videoOnly) {
00553     *env << "The -a and -v options cannot both be used!\n";
00554     usage();
00555   }
00556   if (sendOptionsRequestOnly && !sendOptionsRequest) {
00557     *env << "The -o and -O options cannot both be used!\n";
00558     usage();
00559   }
00560   if (initialAbsoluteSeekTime != NULL && initialSeekTime != 0.0f) {
00561     *env << "The -s and -U options cannot both be used!\n";
00562     usage();
00563   }
00564   if (authDBForREGISTER != NULL && !createHandlerServerForREGISTERCommand) {
00565     *env << "If \"-k <username> <password>\" is used, then -R (or \"-R <port-num>\") must also be used!\n";
00566     usage();
00567   }
00568   if (tunnelOverHTTPPortNum > 0) {
00569     if (streamUsingTCP) {
00570       *env << "The -t and -T options cannot both be used!\n";
00571       usage();
00572     } else {
00573       streamUsingTCP = True;
00574     }
00575   }
00576   if (!createReceivers && notifyOnPacketArrival) {
00577     *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00578   }
00579   if (durationSlop < 0) {
00580     // This parameter wasn't set, so use a default value.
00581     // If we're measuring QOS stats, then don't add any slop, to avoid
00582     // having 'empty' measurement intervals at the end.
00583     durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00584   }
00585 
00586   streamURL = argv[1];
00587 
00588   // Create (or arrange to create) our client object:
00589   if (createHandlerServerForREGISTERCommand) {
00590     handlerServerForREGISTERCommand
00591       = HandlerServerForREGISTERCommand::createNew(*env, continueAfterClientCreation0,
00592                                                    handlerServerForREGISTERCommandPortNum, authDBForREGISTER,
00593                                                    verbosityLevel, progName);
00594     if (handlerServerForREGISTERCommand == NULL) {
00595       *env << "Failed to create a server for handling incoming \"REGISTER\" commands: " << env->getResultMsg() << "\n";
00596     } else {
00597       *env << "Awaiting an incoming \"REGISTER\" command on port " << handlerServerForREGISTERCommand->serverPortNum() << "\n";
00598     }
00599   } else {
00600     ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00601     if (ourClient == NULL) {
00602       *env << "Failed to create " << clientProtocolName << " client: " << env->getResultMsg() << "\n";
00603       shutdown();
00604     }
00605     continueAfterClientCreation1();
00606   }
00607 
00608   // All subsequent activity takes place within the event loop:
00609   env->taskScheduler().doEventLoop(); // does not return
00610 
00611   return 0; // only to prevent compiler warning
00612 }
00613 
00614 void continueAfterClientCreation0(RTSPClient* newRTSPClient, Boolean requestStreamingOverTCP) {
00615   if (newRTSPClient == NULL) return;
00616 
00617   streamUsingTCP = requestStreamingOverTCP;
00618 
00619   assignClient(ourClient = newRTSPClient);
00620   streamURL = newRTSPClient->url();
00621 
00622   // Having handled one "REGISTER" command (giving us a "rtsp://" URL to stream from), we don't handle any more:
00623   Medium::close(handlerServerForREGISTERCommand); handlerServerForREGISTERCommand = NULL;
00624 
00625   continueAfterClientCreation1();
00626 }
00627 
00628 void continueAfterClientCreation1() {
00629   setUserAgentString(userAgent);
00630 
00631   if (sendOptionsRequest) {
00632     // Begin by sending an "OPTIONS" command:
00633     getOptions(continueAfterOPTIONS);
00634   } else {
00635     continueAfterOPTIONS(NULL, 0, NULL);
00636   }
00637 }
00638 
00639 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00640   if (sendOptionsRequestOnly) {
00641     if (resultCode != 0) {
00642       *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00643     } else {
00644       *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00645     }
00646     shutdown();
00647   }
00648   delete[] resultString;
00649 
00650   // Next, get a SDP description for the stream:
00651   getSDPDescription(continueAfterDESCRIBE);
00652 }
00653 
00654 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00655   if (resultCode != 0) {
00656     *env << "Failed to get a SDP description for the URL \"" << streamURL << "\": " << resultString << "\n";
00657     delete[] resultString;
00658     shutdown();
00659   }
00660 
00661   char* sdpDescription = resultString;
00662   *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00663 
00664   // Create a media session object from this SDP description:
00665   session = MediaSession::createNew(*env, sdpDescription);
00666   delete[] sdpDescription;
00667   if (session == NULL) {
00668     *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00669     shutdown();
00670   } else if (!session->hasSubsessions()) {
00671     *env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
00672     shutdown();
00673   }
00674 
00675   // Then, setup the "RTPSource"s for the session:
00676   MediaSubsessionIterator iter(*session);
00677   MediaSubsession *subsession;
00678   Boolean madeProgress = False;
00679   char const* singleMediumToTest = singleMedium;
00680   while ((subsession = iter.next()) != NULL) {
00681     // If we've asked to receive only a single medium, then check this now:
00682     if (singleMediumToTest != NULL) {
00683       if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00684                   *env << "Ignoring \"" << subsession->mediumName()
00685                           << "/" << subsession->codecName()
00686                           << "\" subsession, because we've asked to receive a single " << singleMedium
00687                           << " session only\n";
00688         continue;
00689       } else {
00690         // Receive this subsession only
00691         singleMediumToTest = "xxxxx";
00692             // this hack ensures that we get only 1 subsession of this type
00693       }
00694     }
00695 
00696     if (desiredPortNum != 0) {
00697       subsession->setClientPortNum(desiredPortNum);
00698       desiredPortNum += 2;
00699     }
00700 
00701     if (createReceivers) {
00702       if (!subsession->initiate(simpleRTPoffsetArg)) {
00703         *env << "Unable to create receiver for \"" << subsession->mediumName()
00704              << "/" << subsession->codecName()
00705              << "\" subsession: " << env->getResultMsg() << "\n";
00706       } else {
00707         *env << "Created receiver for \"" << subsession->mediumName()
00708              << "/" << subsession->codecName() << "\" subsession (";
00709         if (subsession->rtcpIsMuxed()) {
00710           *env << "client port " << subsession->clientPortNum();
00711         } else {
00712           *env << "client ports " << subsession->clientPortNum()
00713                << "-" << subsession->clientPortNum()+1;
00714         }
00715         *env << ")\n";
00716         madeProgress = True;
00717         
00718         if (subsession->rtpSource() != NULL) {
00719           // Because we're saving the incoming data, rather than playing
00720           // it in real time, allow an especially large time threshold
00721           // (1 second) for reordering misordered incoming packets:
00722           unsigned const thresh = 1000000; // 1 second
00723           subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00724           
00725           // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
00726           // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
00727           // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
00728           // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
00729           int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00730           unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00731           if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00732             unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00733             newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00734             if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
00735               *env << "Changed socket receive buffer size for the \""
00736                    << subsession->mediumName()
00737                    << "/" << subsession->codecName()
00738                    << "\" subsession from "
00739                    << curBufferSize << " to "
00740                    << newBufferSize << " bytes\n";
00741             }
00742           }
00743         }
00744       }
00745     } else {
00746       if (subsession->clientPortNum() == 0) {
00747         *env << "No client port was specified for the \""
00748              << subsession->mediumName()
00749              << "/" << subsession->codecName()
00750              << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
00751       } else {
00752                 madeProgress = True;
00753       }
00754     }
00755   }
00756   if (!madeProgress) shutdown();
00757 
00758   // Perform additional 'setup' on each subsession, before playing them:
00759   setupStreams();
00760 }
00761 
00762 MediaSubsession *subsession;
00763 Boolean madeProgress = False;
00764 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00765   if (resultCode == 0) {
00766       *env << "Setup \"" << subsession->mediumName()
00767            << "/" << subsession->codecName()
00768            << "\" subsession (";
00769       if (subsession->rtcpIsMuxed()) {
00770         *env << "client port " << subsession->clientPortNum();
00771       } else {
00772         *env << "client ports " << subsession->clientPortNum()
00773              << "-" << subsession->clientPortNum()+1;
00774       }
00775       *env << ")\n";
00776       madeProgress = True;
00777   } else {
00778     *env << "Failed to setup \"" << subsession->mediumName()
00779          << "/" << subsession->codecName()
00780          << "\" subsession: " << resultString << "\n";
00781   }
00782   delete[] resultString;
00783 
00784   // Set up the next subsession, if any:
00785   setupStreams();
00786 }
00787 
00788 void createOutputFiles(char const* periodicFilenameSuffix) {
00789   char outFileName[1000];
00790 
00791   if (outputQuickTimeFile || outputAVIFile) {
00792     if (periodicFilenameSuffix[0] == '\0') {
00793       // Normally (unless the '-P <interval-in-seconds>' option was given) we output to 'stdout':
00794       sprintf(outFileName, "stdout");
00795     } else {
00796       // Otherwise output to a type-specific file name, containing "periodicFilenameSuffix":
00797       char const* prefix = fileNamePrefix[0] == '\0' ? "output" : fileNamePrefix;
00798       snprintf(outFileName, sizeof outFileName, "%s%s.%s", prefix, periodicFilenameSuffix,
00799                outputAVIFile ? "avi" : generateMP4Format ? "mp4" : "mov");
00800     }
00801 
00802     if (outputQuickTimeFile) {
00803       qtOut = QuickTimeFileSink::createNew(*env, *session, outFileName,
00804                                            fileSinkBufferSize,
00805                                            movieWidth, movieHeight,
00806                                            movieFPS,
00807                                            packetLossCompensate,
00808                                            syncStreams,
00809                                            generateHintTracks,
00810                                            generateMP4Format);
00811       if (qtOut == NULL) {
00812         *env << "Failed to create a \"QuickTimeFileSink\" for outputting to \""
00813              << outFileName << "\": " << env->getResultMsg() << "\n";
00814         shutdown();
00815       } else {
00816         *env << "Outputting to the file: \"" << outFileName << "\"\n";
00817       }
00818       
00819       qtOut->startPlaying(sessionAfterPlaying, NULL);
00820     } else { // outputAVIFile
00821       aviOut = AVIFileSink::createNew(*env, *session, outFileName,
00822                                       fileSinkBufferSize,
00823                                       movieWidth, movieHeight,
00824                                       movieFPS,
00825                                       packetLossCompensate);
00826       if (aviOut == NULL) {
00827         *env << "Failed to create an \"AVIFileSink\" for outputting to \""
00828              << outFileName << "\": " << env->getResultMsg() << "\n";
00829         shutdown();
00830       } else {
00831         *env << "Outputting to the file: \"" << outFileName << "\"\n";
00832       }
00833       
00834       aviOut->startPlaying(sessionAfterPlaying, NULL);
00835     }
00836   } else {
00837     // Create and start "FileSink"s for each subsession:
00838     madeProgress = False;
00839     MediaSubsessionIterator iter(*session);
00840     while ((subsession = iter.next()) != NULL) {
00841       if (subsession->readSource() == NULL) continue; // was not initiated
00842       
00843       // Create an output file for each desired stream:
00844       if (singleMedium == NULL || periodicFilenameSuffix[0] != '\0') {
00845         // Output file name is
00846         //     "<filename-prefix><medium_name>-<codec_name>-<counter><periodicFilenameSuffix>"
00847         static unsigned streamCounter = 0;
00848         snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d%s",
00849                  fileNamePrefix, subsession->mediumName(),
00850                  subsession->codecName(), ++streamCounter, periodicFilenameSuffix);
00851       } else {
00852         // When outputting a single medium only, we output to 'stdout
00853         // (unless the '-P <interval-in-seconds>' option was given):
00854         sprintf(outFileName, "stdout");
00855       }
00856 
00857       FileSink* fileSink = NULL;
00858       Boolean createOggFileSink = False; // by default
00859       if (strcmp(subsession->mediumName(), "video") == 0) {
00860         if (strcmp(subsession->codecName(), "H264") == 0) {
00861           // For H.264 video stream, we use a special sink that adds 'start codes',
00862           // and (at the start) the SPS and PPS NAL units:
00863           fileSink = H264VideoFileSink::createNew(*env, outFileName,
00864                                                   subsession->fmtp_spropparametersets(),
00865                                                   fileSinkBufferSize, oneFilePerFrame);
00866         } else if (strcmp(subsession->codecName(), "H265") == 0) {
00867           // For H.265 video stream, we use a special sink that adds 'start codes',
00868           // and (at the start) the VPS, SPS, and PPS NAL units:
00869           fileSink = H265VideoFileSink::createNew(*env, outFileName,
00870                                                   subsession->fmtp_spropvps(),
00871                                                   subsession->fmtp_spropsps(),
00872                                                   subsession->fmtp_sproppps(),
00873                                                   fileSinkBufferSize, oneFilePerFrame);
00874         } else if (strcmp(subsession->codecName(), "THEORA") == 0) {
00875           createOggFileSink = True;
00876         }
00877       } else if (strcmp(subsession->mediumName(), "audio") == 0) {
00878         if (strcmp(subsession->codecName(), "AMR") == 0 ||
00879             strcmp(subsession->codecName(), "AMR-WB") == 0) {
00880           // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00881           fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00882                                                  fileSinkBufferSize, oneFilePerFrame);
00883         } else if (strcmp(subsession->codecName(), "VORBIS") == 0 ||
00884                    strcmp(subsession->codecName(), "OPUS") == 0) {
00885           createOggFileSink = True;
00886         }
00887       }
00888       if (createOggFileSink) {
00889         fileSink = OggFileSink
00890 	  ::createNew(*env, outFileName,
00891                       subsession->rtpTimestampFrequency(), subsession->fmtp_config());
00892       } else if (fileSink == NULL) {
00893         // Normal case:
00894         fileSink = FileSink::createNew(*env, outFileName,
00895                                        fileSinkBufferSize, oneFilePerFrame);
00896       }
00897       subsession->sink = fileSink;
00898 
00899       if (subsession->sink == NULL) {
00900         *env << "Failed to create FileSink for \"" << outFileName
00901              << "\": " << env->getResultMsg() << "\n";
00902       } else {
00903         if (singleMedium == NULL) {
00904           *env << "Created output file: \"" << outFileName << "\"\n";
00905         } else {
00906           *env << "Outputting data from the \"" << subsession->mediumName()
00907                << "/" << subsession->codecName()
00908                << "\" subsession to \"" << outFileName << "\"\n";
00909         }
00910         
00911         if (strcmp(subsession->mediumName(), "video") == 0 &&
00912             strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00913             subsession->fmtp_config() != NULL) {
00914           // For MPEG-4 video RTP streams, the 'config' information
00915           // from the SDP description contains useful VOL etc. headers.
00916           // Insert this data at the front of the output file:
00917           unsigned configLen;
00918           unsigned char* configData
00919             = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00920           struct timeval timeNow;
00921           gettimeofday(&timeNow, NULL);
00922           fileSink->addData(configData, configLen, timeNow);
00923           delete[] configData;
00924         }
00925         
00926         subsession->sink->startPlaying(*(subsession->readSource()),
00927                                        subsessionAfterPlaying,
00928                                        subsession);
00929         
00930         // Also set a handler to be called if a RTCP "BYE" arrives
00931         // for this subsession:
00932         if (subsession->rtcpInstance() != NULL) {
00933           subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession);
00934         }
00935         
00936         madeProgress = True;
00937       }
00938     }
00939     if (!madeProgress) shutdown();
00940   }
00941 }
00942 
00943 void createPeriodicOutputFiles() {
00944   // Create a filename suffix that notes the time interval that's being recorded:
00945   char periodicFileNameSuffix[100];
00946   snprintf(periodicFileNameSuffix, sizeof periodicFileNameSuffix, "-%05d-%05d",
00947            fileOutputSecondsSoFar, fileOutputSecondsSoFar + fileOutputInterval);
00948   createOutputFiles(periodicFileNameSuffix);
00949 
00950   // Schedule an event for writing the next output file:
00951   periodicFileOutputTask
00952     = env->taskScheduler().scheduleDelayedTask(fileOutputInterval*1000000,
00953                                                (TaskFunc*)periodicFileOutputTimerHandler,
00954                                                (void*)NULL);
00955 }
00956 
00957 void setupStreams() {
00958   static MediaSubsessionIterator* setupIter = NULL;
00959   if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00960   while ((subsession = setupIter->next()) != NULL) {
00961     // We have another subsession left to set up:
00962     if (subsession->clientPortNum() == 0) continue; // port # was not set
00963 
00964     setupSubsession(subsession, streamUsingTCP, forceMulticastOnUnspecified, continueAfterSETUP);
00965     return;
00966   }
00967 
00968   // We're done setting up subsessions.
00969   delete setupIter;
00970   if (!madeProgress) shutdown();
00971 
00972   // Create output files:
00973   if (createReceivers) {
00974     if (fileOutputInterval > 0) {
00975       createPeriodicOutputFiles();
00976     } else {
00977       createOutputFiles("");
00978     }
00979   }
00980 
00981   // Finally, start playing each subsession, to start the data flow:
00982   if (duration == 0) {
00983     if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time
00984     else if (scale < 0) duration = initialSeekTime;
00985   }
00986   if (duration < 0) duration = 0.0;
00987 
00988   endTime = initialSeekTime;
00989   if (scale > 0) {
00990     if (duration <= 0) endTime = -1.0f;
00991     else endTime = initialSeekTime + duration;
00992   } else {
00993     endTime = initialSeekTime - duration;
00994     if (endTime < 0) endTime = 0.0f;
00995   }
00996 
00997   char const* absStartTime = initialAbsoluteSeekTime != NULL ? initialAbsoluteSeekTime : session->absStartTime();
00998   if (absStartTime != NULL) {
00999     // Either we or the server have specified that seeking should be done by 'absolute' time:
01000     startPlayingSession(session, absStartTime, session->absEndTime(), scale, continueAfterPLAY);
01001   } else {
01002     // Normal case: Seek by relative time (NPT):
01003     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
01004   }
01005 }
01006 
01007 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
01008   if (resultCode != 0) {
01009     *env << "Failed to start playing session: " << resultString << "\n";
01010     delete[] resultString;
01011     shutdown();
01012     return;
01013   } else {
01014     *env << "Started playing session\n";
01015   }
01016   delete[] resultString;
01017 
01018   if (qosMeasurementIntervalMS > 0) {
01019     // Begin periodic QOS measurements:
01020     beginQOSMeasurement();
01021   }
01022 
01023   // Figure out how long to delay (if at all) before shutting down, or
01024   // repeating the playing
01025   Boolean timerIsBeingUsed = False;
01026   double secondsToDelay = duration;
01027   if (duration > 0) {
01028     // First, adjust "duration" based on any change to the play range (that was specified in the "PLAY" response):
01029     double rangeAdjustment = (session->playEndTime() - session->playStartTime()) - (endTime - initialSeekTime);
01030     if (duration + rangeAdjustment > 0.0) duration += rangeAdjustment;
01031 
01032     timerIsBeingUsed = True;
01033     double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0
01034     secondsToDelay = duration/absScale + durationSlop;
01035 
01036     int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
01037     sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
01038   }
01039 
01040   char const* actionString
01041     = createReceivers? "Receiving streamed data":"Data is being streamed";
01042   if (timerIsBeingUsed) {
01043     *env << actionString
01044                 << " (for up to " << secondsToDelay
01045                 << " seconds)...\n";
01046   } else {
01047 #ifdef USE_SIGNALS
01048     pid_t ourPid = getpid();
01049     *env << actionString
01050                 << " (signal with \"kill -HUP " << (int)ourPid
01051                 << "\" or \"kill -USR1 " << (int)ourPid
01052                 << "\" to terminate)...\n";
01053 #else
01054     *env << actionString << "...\n";
01055 #endif
01056   }
01057 
01058   // Watch for incoming packets (if desired):
01059   checkForPacketArrival(NULL);
01060   checkInterPacketGaps(NULL);
01061 }
01062 
01063 void closeMediaSinks() {
01064   Medium::close(qtOut); qtOut = NULL;
01065   Medium::close(aviOut); aviOut = NULL;
01066 
01067   if (session == NULL) return;
01068   MediaSubsessionIterator iter(*session);
01069   MediaSubsession* subsession;
01070   while ((subsession = iter.next()) != NULL) {
01071     Medium::close(subsession->sink);
01072     subsession->sink = NULL;
01073   }
01074 }
01075 
01076 void subsessionAfterPlaying(void* clientData) {
01077   // Begin by closing this media subsession's stream:
01078   MediaSubsession* subsession = (MediaSubsession*)clientData;
01079   Medium::close(subsession->sink);
01080   subsession->sink = NULL;
01081 
01082   // Next, check whether *all* subsessions' streams have now been closed:
01083   MediaSession& session = subsession->parentSession();
01084   MediaSubsessionIterator iter(session);
01085   while ((subsession = iter.next()) != NULL) {
01086     if (subsession->sink != NULL) return; // this subsession is still active
01087   }
01088 
01089   // All subsessions' streams have now been closed
01090   sessionAfterPlaying();
01091 }
01092 
01093 void subsessionByeHandler(void* clientData) {
01094   struct timeval timeNow;
01095   gettimeofday(&timeNow, NULL);
01096   unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
01097 
01098   MediaSubsession* subsession = (MediaSubsession*)clientData;
01099   *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
01100         << "/" << subsession->codecName()
01101         << "\" subsession (after " << secsDiff
01102         << " seconds)\n";
01103 
01104   // Act now as if the subsession had closed:
01105   subsessionAfterPlaying(subsession);
01106 }
01107 
01108 void sessionAfterPlaying(void* /*clientData*/) {
01109   if (!playContinuously) {
01110     shutdown(0);
01111   } else {
01112     // We've been asked to play the stream(s) over again.
01113     // First, reset state from the current session:
01114     if (env != NULL) {
01115       env->taskScheduler().unscheduleDelayedTask(periodicFileOutputTask);
01116       env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01117       env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01118       env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01119       env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01120     }
01121     totNumPacketsReceived = ~0;
01122 
01123     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
01124   }
01125 }
01126 
01127 void sessionTimerHandler(void* /*clientData*/) {
01128   sessionTimerTask = NULL;
01129 
01130   sessionAfterPlaying();
01131 }
01132 
01133 void periodicFileOutputTimerHandler(void* /*clientData*/) {
01134   fileOutputSecondsSoFar += fileOutputInterval;
01135 
01136   // First, close the existing output files:
01137   closeMediaSinks();
01138 
01139   // Then, create new output files:
01140   createPeriodicOutputFiles();
01141 }
01142 
01143 class qosMeasurementRecord {
01144 public:
01145   qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
01146     : fSource(src), fNext(NULL),
01147       kbits_per_second_min(1e20), kbits_per_second_max(0),
01148       kBytesTotal(0.0),
01149       packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
01150       totNumPacketsReceived(0), totNumPacketsExpected(0) {
01151     measurementEndTime = measurementStartTime = startTime;
01152 
01153     RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01154     // Assume that there's only one SSRC source (usually the case):
01155     RTPReceptionStats* stats = statsIter.next(True);
01156     if (stats != NULL) {
01157       kBytesTotal = stats->totNumKBytesReceived();
01158       totNumPacketsReceived = stats->totNumPacketsReceived();
01159       totNumPacketsExpected = stats->totNumPacketsExpected();
01160     }
01161   }
01162   virtual ~qosMeasurementRecord() { delete fNext; }
01163 
01164   void periodicQOSMeasurement(struct timeval const& timeNow);
01165 
01166 public:
01167   RTPSource* fSource;
01168   qosMeasurementRecord* fNext;
01169 
01170 public:
01171   struct timeval measurementStartTime, measurementEndTime;
01172   double kbits_per_second_min, kbits_per_second_max;
01173   double kBytesTotal;
01174   double packet_loss_fraction_min, packet_loss_fraction_max;
01175   unsigned totNumPacketsReceived, totNumPacketsExpected;
01176 };
01177 
01178 static qosMeasurementRecord* qosRecordHead = NULL;
01179 
01180 static void periodicQOSMeasurement(void* clientData); // forward
01181 
01182 static unsigned nextQOSMeasurementUSecs;
01183 
01184 static void scheduleNextQOSMeasurement() {
01185   nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
01186   struct timeval timeNow;
01187   gettimeofday(&timeNow, NULL);
01188   unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
01189   int usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
01190 
01191   qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
01192      usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
01193 }
01194 
01195 static void periodicQOSMeasurement(void* /*clientData*/) {
01196   struct timeval timeNow;
01197   gettimeofday(&timeNow, NULL);
01198 
01199   for (qosMeasurementRecord* qosRecord = qosRecordHead;
01200        qosRecord != NULL; qosRecord = qosRecord->fNext) {
01201     qosRecord->periodicQOSMeasurement(timeNow);
01202   }
01203 
01204   // Do this again later:
01205   scheduleNextQOSMeasurement();
01206 }
01207 
01208 void qosMeasurementRecord
01209 ::periodicQOSMeasurement(struct timeval const& timeNow) {
01210   unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
01211   int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
01212   double timeDiff = secsDiff + usecsDiff/1000000.0;
01213   measurementEndTime = timeNow;
01214 
01215   RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
01216   // Assume that there's only one SSRC source (usually the case):
01217   RTPReceptionStats* stats = statsIter.next(True);
01218   if (stats != NULL) {
01219     double kBytesTotalNow = stats->totNumKBytesReceived();
01220     double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
01221     kBytesTotal = kBytesTotalNow;
01222 
01223     double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
01224     if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
01225     if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
01226     if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
01227 
01228     unsigned totReceivedNow = stats->totNumPacketsReceived();
01229     unsigned totExpectedNow = stats->totNumPacketsExpected();
01230     unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
01231     unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
01232     totNumPacketsReceived = totReceivedNow;
01233     totNumPacketsExpected = totExpectedNow;
01234 
01235     double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01236       : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01237     //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
01238     if (lossFractionNow < packet_loss_fraction_min) {
01239       packet_loss_fraction_min = lossFractionNow;
01240     }
01241     if (lossFractionNow > packet_loss_fraction_max) {
01242       packet_loss_fraction_max = lossFractionNow;
01243     }
01244   }
01245 }
01246 
01247 void beginQOSMeasurement() {
01248   // Set up a measurement record for each active subsession:
01249   struct timeval startTime;
01250   gettimeofday(&startTime, NULL);
01251   nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01252   qosMeasurementRecord* qosRecordTail = NULL;
01253   MediaSubsessionIterator iter(*session);
01254   MediaSubsession* subsession;
01255   while ((subsession = iter.next()) != NULL) {
01256     RTPSource* src = subsession->rtpSource();
01257     if (src == NULL) continue;
01258 
01259     qosMeasurementRecord* qosRecord
01260       = new qosMeasurementRecord(startTime, src);
01261     if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01262     if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01263     qosRecordTail  = qosRecord;
01264   }
01265 
01266   // Then schedule the first of the periodic measurements:
01267   scheduleNextQOSMeasurement();
01268 }
01269 
01270 void printQOSData(int exitCode) {
01271   *env << "begin_QOS_statistics\n";
01272   
01273   // Print out stats for each active subsession:
01274   qosMeasurementRecord* curQOSRecord = qosRecordHead;
01275   if (session != NULL) {
01276     MediaSubsessionIterator iter(*session);
01277     MediaSubsession* subsession;
01278     while ((subsession = iter.next()) != NULL) {
01279       RTPSource* src = subsession->rtpSource();
01280       if (src == NULL) continue;
01281       
01282       *env << "subsession\t" << subsession->mediumName()
01283            << "/" << subsession->codecName() << "\n";
01284       
01285       unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01286       
01287       if (curQOSRecord != NULL) {
01288         numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01289         numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01290       }
01291       *env << "num_packets_received\t" << numPacketsReceived << "\n";
01292       *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01293       
01294       if (curQOSRecord != NULL) {
01295         unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01296           - curQOSRecord->measurementStartTime.tv_sec;
01297         int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01298           - curQOSRecord->measurementStartTime.tv_usec;
01299         double measurementTime = secsDiff + usecsDiff/1000000.0;
01300         *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01301         
01302         *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01303         
01304         *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01305         
01306         if (curQOSRecord->kbits_per_second_max == 0) {
01307           // special case: we didn't receive any data:
01308           *env <<
01309             "kbits_per_second_min\tunavailable\n"
01310             "kbits_per_second_ave\tunavailable\n"
01311             "kbits_per_second_max\tunavailable\n";
01312         } else {
01313           *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01314           *env << "kbits_per_second_ave\t"
01315                << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01316           *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01317         }
01318         
01319         *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01320         double packetLossFraction = numPacketsExpected == 0 ? 1.0
01321           : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01322         if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01323         *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01324         *env << "packet_loss_percentage_max\t"
01325              << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01326         
01327         RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01328         // Assume that there's only one SSRC source (usually the case):
01329         RTPReceptionStats* stats = statsIter.next(True);
01330         if (stats != NULL) {
01331           *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01332           struct timeval totalGaps = stats->totalInterPacketGaps();
01333           double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01334           unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01335           *env << "inter_packet_gap_ms_ave\t"
01336                << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01337           *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01338         }
01339         
01340         curQOSRecord = curQOSRecord->fNext;
01341       }
01342     }
01343   }
01344 
01345   *env << "end_QOS_statistics\n";
01346   delete qosRecordHead;
01347 }
01348 
01349 Boolean areAlreadyShuttingDown = False;
01350 int shutdownExitCode;
01351 void shutdown(int exitCode) {
01352   if (areAlreadyShuttingDown) return; // in case we're called after receiving a RTCP "BYE" while in the middle of a "TEARDOWN".
01353   areAlreadyShuttingDown = True;
01354 
01355   shutdownExitCode = exitCode;
01356   if (env != NULL) {
01357     env->taskScheduler().unscheduleDelayedTask(periodicFileOutputTask);
01358     env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01359     env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01360     env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01361     env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01362   }
01363 
01364   if (qosMeasurementIntervalMS > 0) {
01365     printQOSData(exitCode);
01366   }
01367 
01368   // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
01369   Boolean shutdownImmediately = True; // by default
01370   if (session != NULL) {
01371     RTSPClient::responseHandler* responseHandlerForTEARDOWN = NULL; // unless:
01372     if (waitForResponseToTEARDOWN) {
01373       shutdownImmediately = False;
01374       responseHandlerForTEARDOWN = continueAfterTEARDOWN;
01375     }
01376     tearDownSession(session, responseHandlerForTEARDOWN);
01377   }
01378 
01379   if (shutdownImmediately) continueAfterTEARDOWN(NULL, 0, NULL);
01380 }
01381 
01382 void continueAfterTEARDOWN(RTSPClient*, int /*resultCode*/, char* resultString) {
01383   delete[] resultString;
01384 
01385   // Now that we've stopped any more incoming data from arriving, close our output files:
01386   closeMediaSinks();
01387   Medium::close(session);
01388 
01389   // Finally, shut down our client:
01390   delete ourAuthenticator;
01391   delete authDBForREGISTER;
01392   Medium::close(ourClient);
01393 
01394   // Adios...
01395   exit(shutdownExitCode);
01396 }
01397 
01398 void signalHandlerShutdown(int /*sig*/) {
01399   *env << "Got shutdown signal\n";
01400   waitForResponseToTEARDOWN = False; // to ensure that we end, even if the server does not respond to our TEARDOWN
01401   shutdown(0);
01402 }
01403 
01404 void checkForPacketArrival(void* /*clientData*/) {
01405   if (!notifyOnPacketArrival) return; // we're not checking
01406 
01407   // Check each subsession, to see whether it has received data packets:
01408   unsigned numSubsessionsChecked = 0;
01409   unsigned numSubsessionsWithReceivedData = 0;
01410   unsigned numSubsessionsThatHaveBeenSynced = 0;
01411 
01412   MediaSubsessionIterator iter(*session);
01413   MediaSubsession* subsession;
01414   while ((subsession = iter.next()) != NULL) {
01415     RTPSource* src = subsession->rtpSource();
01416     if (src == NULL) continue;
01417     ++numSubsessionsChecked;
01418 
01419     if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01420       // At least one data packet has arrived
01421       ++numSubsessionsWithReceivedData;
01422     }
01423     if (src->hasBeenSynchronizedUsingRTCP()) {
01424       ++numSubsessionsThatHaveBeenSynced;
01425     }
01426   }
01427 
01428   unsigned numSubsessionsToCheck = numSubsessionsChecked;
01429   // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:
01430   // They might not use all of the input sources:
01431   if (qtOut != NULL) {
01432     numSubsessionsToCheck = qtOut->numActiveSubsessions();
01433   } else if (aviOut != NULL) {
01434     numSubsessionsToCheck = aviOut->numActiveSubsessions();
01435   }
01436 
01437   Boolean notifyTheUser;
01438   if (!syncStreams) {
01439     notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case
01440   } else {
01441     notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01442       && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01443     // Note: A subsession with no active sources is considered to be synced
01444   }
01445   if (notifyTheUser) {
01446     struct timeval timeNow;
01447     gettimeofday(&timeNow, NULL);
01448         char timestampStr[100];
01449         sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01450     *env << (syncStreams ? "Synchronized d" : "D")
01451                 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01452     return;
01453   }
01454 
01455   // No luck, so reschedule this check again, after a delay:
01456   int uSecsToDelay = 100000; // 100 ms
01457   arrivalCheckTimerTask
01458     = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01459                                (TaskFunc*)checkForPacketArrival, NULL);
01460 }
01461 
01462 void checkInterPacketGaps(void* /*clientData*/) {
01463   if (interPacketGapMaxTime == 0) return; // we're not checking
01464 
01465   // Check each subsession, counting up how many packets have been received:
01466   unsigned newTotNumPacketsReceived = 0;
01467 
01468   MediaSubsessionIterator iter(*session);
01469   MediaSubsession* subsession;
01470   while ((subsession = iter.next()) != NULL) {
01471     RTPSource* src = subsession->rtpSource();
01472     if (src == NULL) continue;
01473     newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01474   }
01475 
01476   if (newTotNumPacketsReceived == totNumPacketsReceived) {
01477     // No additional packets have been received since the last time we
01478     // checked, so end this stream:
01479     *env << "Closing session, because we stopped receiving packets.\n";
01480     interPacketGapCheckTimerTask = NULL;
01481     sessionAfterPlaying();
01482   } else {
01483     totNumPacketsReceived = newTotNumPacketsReceived;
01484     // Check again, after the specified delay:
01485     interPacketGapCheckTimerTask
01486       = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01487                                  (TaskFunc*)checkInterPacketGaps, NULL);
01488   }
01489 }

Generated on Wed Apr 23 16:12:01 2014 for live by  doxygen 1.5.2