liveMedia/ProxyServerMediaSession.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
00018 // A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP server that acts as a 'proxy' for
00019 // another (unicast or multicast) RTSP/RTP stream.
00020 // Implementation
00021 
00022 #include "liveMedia.hh"
00023 #include "RTSPCommon.hh"
00024 #include "GroupsockHelper.hh" // for "our_random()"
00025 
00026 #ifndef MILLION
00027 #define MILLION 1000000
00028 #endif
00029 
00030 // A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream:
00031 
00032 class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession {
00033 public:
00034   ProxyServerMediaSubsession(MediaSubsession& mediaSubsession);
00035   virtual ~ProxyServerMediaSubsession();
00036 
00037   char const* codecName() const { return fClientMediaSubsession.codecName(); }
00038 
00039 private: // redefined virtual functions
00040   virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
00041                                               unsigned& estBitrate);
00042   virtual void closeStreamSource(FramedSource *inputSource);
00043   virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
00044                                     unsigned char rtpPayloadTypeIfDynamic,
00045                                     FramedSource* inputSource);
00046 
00047 private:
00048   static void subsessionByeHandler(void* clientData);
00049   void subsessionByeHandler();
00050 
00051   int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; }
00052 
00053 private:
00054   friend class ProxyRTSPClient;
00055   MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession
00056   ProxyServerMediaSubsession* fNext; // used when we're part of a queue
00057   Boolean fHaveSetupStream;
00058 };
00059 
00060 
00062 
00063 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging
00064   return env << "ProxyServerMediaSession[\"" << psms.url() << "\"]";
00065 }
00066 
00067 ProxyServerMediaSession* ProxyServerMediaSession
00068 ::createNew(UsageEnvironment& env, RTSPServer* ourRTSPServer,
00069             char const* inputStreamURL, char const* streamName,
00070             char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel) {
00071   return new ProxyServerMediaSession(env, ourRTSPServer, inputStreamURL, streamName, username, password, tunnelOverHTTPPortNum, verbosityLevel);
00072 }
00073 
00074 
00075 ProxyServerMediaSession::ProxyServerMediaSession(UsageEnvironment& env, RTSPServer* ourRTSPServer,
00076                                                  char const* inputStreamURL, char const* streamName,
00077                                                  char const* username, char const* password,
00078                                                  portNumBits tunnelOverHTTPPortNum, int verbosityLevel)
00079   : ServerMediaSession(env, streamName, NULL, NULL, False, NULL),
00080     describeCompletedFlag(0), fOurRTSPServer(ourRTSPServer), fClientMediaSession(NULL),
00081     fVerbosityLevel(verbosityLevel), fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())) {
00082   // Open a RTSP connection to the input stream, and send a "DESCRIBE" command.
00083   // We'll use the SDP description in the response to set ourselves up.
00084   fProxyRTSPClient = createNewProxyRTSPClient(inputStreamURL, username, password,
00085                                               tunnelOverHTTPPortNum, verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel);
00086   ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient);
00087 }
00088 
00089 ProxyServerMediaSession::~ProxyServerMediaSession() {
00090   if (fVerbosityLevel > 0) {
00091     envir() << *this << "::~ProxyServerMediaSession()\n";
00092   }
00093 
00094   // Begin by sending a "TEARDOWN" command (without checking for a response):
00095   if (fProxyRTSPClient != NULL) fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth());
00096 
00097   // Then delete our state:
00098   Medium::close(fClientMediaSession);
00099   Medium::close(fProxyRTSPClient);
00100   delete fPresentationTimeSessionNormalizer;
00101 }
00102 
00103 char const* ProxyServerMediaSession::url() const {
00104   return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url();
00105 }
00106 
00107 ProxyRTSPClient* ProxyServerMediaSession
00108 ::createNewProxyRTSPClient(char const* rtspURL, char const* username, char const* password,
00109                            portNumBits tunnelOverHTTPPortNum, int verbosityLevel){
00110   // default implementation:
00111   return new ProxyRTSPClient(*this, rtspURL, username, password, tunnelOverHTTPPortNum, verbosityLevel);
00112 }
00113 
00114 void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) {
00115   describeCompletedFlag = 1;
00116 
00117   // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its
00118   // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks.
00119   do {
00120     fClientMediaSession = MediaSession::createNew(envir(), sdpDescription);
00121     if (fClientMediaSession == NULL) break;
00122 
00123     MediaSubsessionIterator iter(*fClientMediaSession);
00124     for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) {
00125       ServerMediaSubsession* smss = new ProxyServerMediaSubsession(*mss);
00126       addSubsession(smss);
00127       if (fVerbosityLevel > 0) {
00128         envir() << *this << " added new \"ProxyServerMediaSubsession\" for "
00129                 << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n";
00130       }
00131     }
00132   } while (0);
00133 }
00134 
00135 void ProxyServerMediaSession::resetDESCRIBEState() {
00136   // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE".
00137   if (fOurRTSPServer != NULL) {
00138     // First, close any RTSP client connections that may have already been set up:
00139     fOurRTSPServer->closeAllClientSessionsForServerMediaSession(this);
00140   }
00141   deleteAllSubsessions();
00142 
00143   // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE":
00144   Medium::close(fClientMediaSession); fClientMediaSession = NULL;
00145 }
00146 
00148 
00149 static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
00150   char const* res;
00151 
00152   if (resultCode == 0) {
00153     // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description.
00154     res = resultString;
00155   } else {
00156     // The "DESCRIBE" command failed.
00157     res = NULL;
00158   }
00159   ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res);
00160   delete[] resultString;
00161 }
00162 
00163 static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
00164   if (resultCode == 0) {
00165     ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP();
00166   }
00167   delete[] resultString;
00168 }
00169 
00170 static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) {
00171   Boolean serverSupportsGetParameter = False;
00172   if (resultCode == 0) {
00173     // Note whether the server told us that it supports the "GET_PARAMETER" command:
00174     serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER", resultString);
00175   }
00176   ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter);
00177   delete[] resultString;
00178 }
00179 
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
// RTSP response handler for "GET_PARAMETER" (used as a 'liveness' command).  Because we sent this command,
// the server is reported as supporting it.  Frees "resultString".
static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
  ProxyRTSPClient* const proxyClient = (ProxyRTSPClient*)rtspClient;
  proxyClient->continueAfterLivenessCommand(resultCode, True);
  delete[] resultString;
}
#endif
00186 
00187 
00189 
00190 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging
00191   return env << "ProxyRTSPClient[\"" << proxyRTSPClient.url() << "\"]";
00192 }
00193 
00194 ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL,
00195                                  char const* username, char const* password, portNumBits tunnelOverHTTPPortNum, int verbosityLevel)
00196   : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient",
00197                tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum),
00198     fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0),
00199     fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1),
00200     fServerSupportsGetParameter(False), fLastCommandWasPLAY(False),
00201     fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL) { 
00202   if (username != NULL && password != NULL) {
00203     fOurAuthenticator = new Authenticator(username, password);
00204   } else {
00205     fOurAuthenticator = NULL;
00206   }
00207 }
00208 
00209 void ProxyRTSPClient::reset() {
00210   envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL;
00211   envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL;
00212   envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL;
00213 
00214   fSetupQueueHead = fSetupQueueTail = NULL;
00215   fNumSetupsDone = 0;
00216   fNextDESCRIBEDelay = 1;
00217   fLastCommandWasPLAY = False;
00218 
00219   RTSPClient::reset();
00220 }
00221 
00222 ProxyRTSPClient::~ProxyRTSPClient() {
00223   reset();
00224 
00225   delete fOurAuthenticator;
00226   delete[] fOurURL;
00227 }
00228 
00229 void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) {
00230   if (sdpDescription != NULL) {
00231     fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription);
00232 
00233     // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the
00234     // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream.
00235     // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness'
00236     // ("OPTIONS" or "GET_PARAMETER") commands.  (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets
00237     // don't get sent until after the "PLAY" command.)
00238     scheduleLivenessCommand();
00239   } else {
00240     // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running.
00241     // Reschedule another "DESCRIBE" command to take place later:
00242     scheduleDESCRIBECommand();
00243   }
00244 }
00245 
00246 void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) {
00247   if (resultCode != 0) {
00248     // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive.
00249     // We handle this by resetting our connection state with this server.  Any current clients will be closed, but
00250     // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream.
00251     // Then continue by sending more "DESCRIBE" commands, to try to restore the stream.
00252 
00253     fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command
00254 
00255     if (resultCode < 0) {
00256       // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0).
00257       // This suggests that the RTSP connection itself has failed.  Print this error code, in case it's useful for debugging:
00258       if (fVerbosityLevel > 0) {
00259         envir() << *this << ": lost connection to server ('errno': " << -resultCode << ").  Resetting...\n";
00260       }
00261     }
00262 
00263     reset();
00264     fOurServerMediaSession.resetDESCRIBEState();
00265 
00266     setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again
00267     sendDESCRIBE(this);
00268     return;
00269   }
00270 
00271   fServerSupportsGetParameter = serverSupportsGetParameter;
00272 
00273   // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive):
00274   scheduleLivenessCommand();
00275 }
00276 
00277 #define SUBSESSION_TIMEOUT_SECONDS 10 // how many seconds to wait for the last track's "SETUP" to be done (note below)
00278 
00279 void ProxyRTSPClient::continueAfterSETUP() {
00280   if (fVerbosityLevel > 0) {
00281     envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->fClientMediaSubsession.codecName()
00282             << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:";
00283     for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) {
00284       envir() << "\t" << p->fClientMediaSubsession.codecName();
00285     }
00286     envir() << "\n";
00287   }
00288   envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set
00289 
00290   // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'.  It will be the one for which this "SETUP" was done:
00291   ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL
00292   fSetupQueueHead = fSetupQueueHead->fNext;
00293   if (fSetupQueueHead == NULL) fSetupQueueTail = NULL;
00294 
00295   if (fSetupQueueHead != NULL) {
00296     // There are still entries in the queue, for tracks for which we have still to do a "SETUP".
00297     // "SETUP" the first of these now:
00298     sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
00299                      False, fStreamRTPOverTCP, False, fOurAuthenticator);
00300     ++fNumSetupsDone;
00301     fSetupQueueHead->fHaveSetupStream = True;
00302   } else {
00303     if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
00304       // We've now finished setting up each of our subsessions (i.e., 'tracks').
00305       // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
00306       sendPlayCommand(smss->fClientMediaSubsession.parentSession(), NULL, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
00307           // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
00308           // a "PLAY" before (as a result of a 'subsession timeout' (note below))
00309       fLastCommandWasPLAY = True;
00310     } else {
00311       // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP".  They might get "SETUP" very soon, but it's
00312       // also possible - if the remote client chose to play only some of the session's tracks - that they might not.
00313       // To allow for this possibility, we set a timer.  If the timer expires without the remaining subsessions getting "SETUP",
00314       // then we send a "PLAY" command anyway:
00315       fSubsessionTimerTask
00316         = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
00317     }
00318   }
00319 }
00320 
00321 void ProxyRTSPClient::scheduleLivenessCommand() {
00322   // Delay a random time before sending another 'liveness' command.
00323   unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that
00324   if (delayMax == 0) {
00325     delayMax = 60;
00326   }
00327 
00328   // Choose a random time from [delayMax/2,delayMax) seconds:
00329   unsigned const dM5 = delayMax*500000;
00330   unsigned uSecondsToDelay = dM5 + (dM5*our_random())%dM5;
00331   fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this);
00332 }
00333 
00334 void ProxyRTSPClient::sendLivenessCommand(void* clientData) {
00335   ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
00336 
00337   // Note.  By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously
00338   // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER".  This is because
00339   // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER").
00340 #ifdef SEND_GET_PARAMETER_IF_SUPPORTED
00341   MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession;
00342 
00343   if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) {
00344     rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth());
00345   } else {
00346 #endif
00347     rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth());
00348 #ifdef SEND_GET_PARAMETER_IF_SUPPORTED
00349   }
00350 #endif
00351 }
00352 
00353 void ProxyRTSPClient::scheduleDESCRIBECommand() {
00354   // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE".  Then, keep delaying a random time from [256..511] seconds:
00355   unsigned secondsToDelay;
00356   if (fNextDESCRIBEDelay <= 256) {
00357     secondsToDelay = fNextDESCRIBEDelay;
00358     fNextDESCRIBEDelay *= 2;
00359   } else {
00360     secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds
00361   }
00362 
00363   if (fVerbosityLevel > 0) {
00364     envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n";
00365   }
00366   fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this);
00367 }
00368 
00369 void ProxyRTSPClient::sendDESCRIBE(void* clientData) {
00370   ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
00371   if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth());
00372 }
00373 
00374 void ProxyRTSPClient::subsessionTimeout(void* clientData) {
00375   ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout();
00376 }
00377 
00378 void ProxyRTSPClient::handleSubsessionTimeout() {
00379   // We still have one or more subsessions ('tracks') left to "SETUP".  But we can't wait any longer for them.  Send a "PLAY" now:
00380   MediaSession* sess = fOurServerMediaSession.fClientMediaSession;
00381   if (sess != NULL) sendPlayCommand(*sess, NULL, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
00382   fLastCommandWasPLAY = True;
00383 }
00384 
00385 
00387 
00388 ProxyServerMediaSubsession::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession)
00389   : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/),
00390     fClientMediaSubsession(mediaSubsession), fNext(NULL), fHaveSetupStream(False) {
00391 }
00392 
00393 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging
00394   return env << "ProxyServerMediaSubsession[\"" << psmss.codecName() << "\"]";
00395 }
00396 
00397 ProxyServerMediaSubsession::~ProxyServerMediaSubsession() {
00398   if (verbosityLevel() > 0) {
00399     envir() << *this << "::~ProxyServerMediaSubsession()\n";
00400   }
00401 }
00402 
00403 FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) {
00404   ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
00405 
00406   if (verbosityLevel() > 0) {
00407     envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n";
00408   }
00409 
00410   // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so:
00411   if (fClientMediaSubsession.readSource() == NULL) {
00412     fClientMediaSubsession.receiveRawMP3ADUs(); // hack for MPA-ROBUST streams
00413     fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams. (Don't do this if we're transcoding.)
00414     fClientMediaSubsession.initiate();
00415     if (verbosityLevel() > 0) {
00416       envir() << "\tInitiated: " << *this << "\n";
00417     }
00418 
00419     if (fClientMediaSubsession.readSource() != NULL) {
00420       // Add to the front of all data sources a filter that will 'normalize' their frames' presentation times,
00421       // before the frames get re-transmitted by our server:
00422       char const* const codecName = fClientMediaSubsession.codecName();
00423       FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer
00424         ->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(), fClientMediaSubsession.rtpSource(),
00425                                                         codecName);
00426       fClientMediaSubsession.addFilter(normalizerFilter);
00427 
00428       // Some data sources require a 'framer' object to be added, before they can be fed into a "RTPSink".  Adjust for this now:
00429       if (strcmp(codecName, "H264") == 0) {
00430         fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer::createNew(envir(), fClientMediaSubsession.readSource()));
00431       } else if (strcmp(codecName, "MP4V-ES") == 0) {
00432         fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer
00433                                          ::createNew(envir(), fClientMediaSubsession.readSource(), True/* leave PTs unmodified*/));
00434       } else if (strcmp(codecName, "MPV") == 0) {
00435         fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer::createNew(envir(), fClientMediaSubsession.readSource(),
00436                                                                                       False, 5.0, True/* leave PTs unmodified*/));
00437       } else if (strcmp(codecName, "DV") == 0) {
00438         fClientMediaSubsession.addFilter(DVVideoStreamFramer::createNew(envir(), fClientMediaSubsession.readSource(),
00439                                                                         False, True/* leave PTs unmodified*/));
00440       }
00441     }
00442 
00443     if (fClientMediaSubsession.rtcpInstance() != NULL) {
00444       fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this);
00445     }
00446   }
00447 
00448   ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
00449   if (clientSessionId != 0) {
00450     // We're being called as a result of implementing a RTSP "SETUP".
00451     if (!fHaveSetupStream) {
00452       // This is our first "SETUP".  Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming:
00453       // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct
00454       //  "ProxyServerMediaSubsession" to handle the response.  (Note that responses come back in the same order as requests.))
00455       Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL;
00456       if (queueWasEmpty) {
00457         proxyRTSPClient->fSetupQueueHead = this;
00458       } else {
00459         proxyRTSPClient->fSetupQueueTail->fNext = this;
00460       }
00461       proxyRTSPClient->fSetupQueueTail = this;
00462 
00463       // Hack: If there's already a pending "SETUP" request (for another track), don't send this track's "SETUP" right away, because
00464       // the server might not properly handle 'pipelined' requests.  Instead, wait until after previous "SETUP" responses come back.
00465       if (queueWasEmpty) {
00466         proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP,
00467                                           False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth());
00468         ++proxyRTSPClient->fNumSetupsDone;
00469         fHaveSetupStream = True;
00470       }
00471     } else {
00472       // This is a "SETUP" from a new client.  We know that there are no other currently active clients (otherwise we wouldn't
00473       // have been called here), so we know that the substream was previously "PAUSE"d.  Send "PLAY" downstream once again,
00474       // to resume the stream:
00475       if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession
00476         proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), NULL, -1.0f/*resume from previous point*/,
00477                                          -1.0f, 1.0f, proxyRTSPClient->auth());
00478         proxyRTSPClient->fLastCommandWasPLAY = True;
00479       }
00480     }
00481   }
00482 
00483   estBitrate = fClientMediaSubsession.bandwidth();
00484   if (estBitrate == 0) estBitrate = 50; // kbps, estimate
00485   return fClientMediaSubsession.readSource();
00486 }
00487 
00488 void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) {
00489   if (verbosityLevel() > 0) {
00490     envir() << *this << "::closeStreamSource()\n";
00491   }
00492   // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it),
00493   // we don't close the input source here.  (Instead, we wait until *this* object gets deleted.)
00494   // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream,
00495   // then we "PAUSE" the downstream proxied stream, until a new client arrives:
00496   if (fHaveSetupStream) {
00497     ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
00498     ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
00499     if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession
00500       proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth());
00501       proxyRTSPClient->fLastCommandWasPLAY = False;
00502     }
00503   }
00504 }
00505 
00506 RTPSink* ProxyServerMediaSubsession
00507 ::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) {
00508   if (verbosityLevel() > 0) {
00509     envir() << *this << "::createNewRTPSink()\n";
00510   }
00511 
00512   // Create (and return) the appropriate "RTPSink" object for our codec:
00513   RTPSink* newSink;
00514   char const* const codecName = fClientMediaSubsession.codecName();
00515   if (strcmp(codecName, "AC3") == 0 || strcmp(codecName, "EAC3") == 0) {
00516     newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00517                                          fClientMediaSubsession.rtpTimestampFrequency()); 
00518 #if 0 // This code does not work; do *not* enable it:
00519   } else if (strcmp(codecName, "AMR") == 0 || strcmp(codecName, "AMR-WB") == 0) {
00520     Boolean isWideband = strcmp(codecName, "AMR-WB") == 0;
00521     newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00522                                          isWideband, fClientMediaSubsession.numChannels());
00523 #endif
00524   } else if (strcmp(codecName, "DV") == 0) {
00525     newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00526   } else if (strcmp(codecName, "GSM") == 0) {
00527     newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock);
00528   } else if (strcmp(codecName, "H263-1998") == 0 || strcmp(codecName, "H263-2000") == 0) {
00529     newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00530                                               fClientMediaSubsession.rtpTimestampFrequency()); 
00531   } else if (strcmp(codecName, "H264") == 0) {
00532     newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00533                                           fClientMediaSubsession.fmtp_spropparametersets());
00534   } else if (strcmp(codecName, "JPEG") == 0) {
00535     newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video", "JPEG",
00536                                        1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/);
00537   } else if (strcmp(codecName, "MP4A-LATM") == 0) {
00538     newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00539                                                fClientMediaSubsession.rtpTimestampFrequency(),
00540                                                fClientMediaSubsession.fmtp_config(),
00541                                                fClientMediaSubsession.numChannels());
00542   } else if (strcmp(codecName, "MP4V-ES") == 0) {
00543     newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00544                                              fClientMediaSubsession.rtpTimestampFrequency(),
00545                                              fClientMediaSubsession.fmtp_profile_level_id(), fClientMediaSubsession.fmtp_config()); 
00546   } else if (strcmp(codecName, "MPA") == 0) {
00547     newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock);
00548   } else if (strcmp(codecName, "MPA-ROBUST") == 0) {
00549     newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00550   } else if (strcmp(codecName, "MPEG4-GENERIC") == 0) {
00551     newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock,
00552                                              rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
00553                                              fClientMediaSubsession.mediumName(), fClientMediaSubsession.fmtp_mode(),
00554                                              fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels());
00555   } else if (strcmp(codecName, "MPV") == 0) {
00556     newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock);
00557   } else if (strcmp(codecName, "T140") == 0) {
00558     newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00559   } else if (strcmp(codecName, "VORBIS") == 0) {
00560     newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00561                                             fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(),
00562                                             fClientMediaSubsession.fmtp_config()); 
00563   } else if (strcmp(codecName, "VP8") == 0) {
00564     newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00565   } else if (strcmp(codecName, "AMR") == 0 || strcmp(codecName, "AMR-WB") == 0) {
00566     // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a
00567     // form that can be fed directly into a corresponding "RTPSink" object.
00568     if (verbosityLevel() > 0) {
00569       envir() << "\treturns NULL (because we currently don't support the proxying of \""
00570               << fClientMediaSubsession.mediumName() << "/" << codecName << "\" streams)\n";
00571     }
00572     return NULL;
00573   } else if (strcmp(codecName, "QCELP") == 0 ||
00574              strcmp(codecName, "H261") == 0 ||
00575              strcmp(codecName, "H263-1998") == 0 || strcmp(codecName, "H263-2000") == 0 ||
00576              strcmp(codecName, "X-QT") == 0 || strcmp(codecName, "X-QUICKTIME") == 0) {
00577     // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it:
00578     if (verbosityLevel() > 0) {
00579       envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n";
00580     }
00581     return NULL;
00582   } else {
00583     // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink":
00584     Boolean allowMultipleFramesPerPacket = True; // by default
00585     Boolean doNormalMBitRule = True; // by default
00586     // Some codecs change the above default parameters:
00587     if (strcmp(codecName, "MP2T") == 0) {
00588       doNormalMBitRule = False; // no RTP 'M' bit
00589     }
00590     newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock,
00591                                        rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
00592                                        fClientMediaSubsession.mediumName(), fClientMediaSubsession.codecName(),
00593                                        fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule);
00594   }
00595 
00596   // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized,
00597   // we temporarily disable RTCP "SR" reports for this "RTPSink" object:
00598   newSink->enableRTCPReports() = False;
00599 
00600   // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later:
00601   PresentationTimeSubsessionNormalizer* ssNormalizer;
00602   if (strcmp(codecName, "H264") == 0 ||
00603       strcmp(codecName, "MP4V-ES") == 0 ||
00604       strcmp(codecName, "MPV") == 0 ||
00605       strcmp(codecName, "DV") == 0) {
00606     // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it:
00607     ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource());
00608   } else {
00609     ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource;
00610   }
00611   ssNormalizer->setRTPSink(newSink);
00612 
00613   return newSink;
00614 }
00615 
00616 void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) {
00617   ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler();
00618 }
00619 
00620 void ProxyServerMediaSubsession::subsessionByeHandler() {
00621   if (verbosityLevel() > 0) {
00622     envir() << *this << ": received RTCP \"BYE\"\n";
00623   }
00624 
00625   // This "BYE" signals that our input source has (effectively) closed, so handle this accordingly:
00626   FramedSource::handleClosure(fClientMediaSubsession.readSource());
00627 
00628   // Then, close our input source for real:
00629   fClientMediaSubsession.deInitiate();
00630 }
00631 
00632 
00634 
00635 // PresentationTimeSessionNormalizer:
00636 
00637 PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env)
00638   : Medium(env),
00639     fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) {
00640 }
00641 
00642 PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() {
00643   while (fSubsessionNormalizers != NULL) {
00644     delete fSubsessionNormalizers;
00645   }
00646 }
00647 
00648 PresentationTimeSubsessionNormalizer*
00649 PresentationTimeSessionNormalizer::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource,
00650                                                                                  char const* codecName) {
00651   fSubsessionNormalizers
00652     = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers);
00653   return fSubsessionNormalizers;
00654 }
00655 
00656 void PresentationTimeSessionNormalizer::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer,
00657                                                                   struct timeval& toPT, struct timeval const& fromPT) {
00658   Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP();
00659 
00660   if (!hasBeenSynced) {
00661     // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus
00662     // is already aligned with 'wall-clock' time.  Just copy it 'as is' to "toPT":
00663     toPT = fromPT;
00664   } else {
00665     if (fMasterSSNormalizer == NULL) {
00666       // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock'
00667       // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with
00668       // those of the master:
00669       fMasterSSNormalizer = ssNormalizer;
00670 
00671       struct timeval timeNow;
00672       gettimeofday(&timeNow, NULL);
00673 
00674       // Compute: fPTAdjustment = timeNow - fromPT
00675       fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec;
00676       fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec;
00677       // Note: It's OK if one or both of these fields underflows; the result still works out OK later.
00678     }
00679 
00680     // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment
00681     toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1;
00682     toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION;
00683     while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; }
00684 
00685     // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink":
00686     RTPSink* const rtpSink = ssNormalizer->fRTPSink;
00687     if (rtpSink != NULL) { // sanity check; should always be true
00688       rtpSink->enableRTCPReports() = True;
00689     }
00690   }
00691 }
00692 
00693 void PresentationTimeSessionNormalizer
00694 ::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) {
00695   // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"):
00696   if (fSubsessionNormalizers == ssNormalizer) {
00697     fSubsessionNormalizers = fSubsessionNormalizers->fNext;
00698   } else {
00699     PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext);
00700     while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext);
00701     *ssPtrPtr = (*ssPtrPtr)->fNext;
00702   }
00703 }
00704 
00705 // PresentationTimeSubsessionNormalizer:
00706 
00707 PresentationTimeSubsessionNormalizer
00708 ::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource,
00709                                        char const* codecName, PresentationTimeSubsessionNormalizer* next)
00710   : FramedFilter(parent.envir(), inputSource),
00711     fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) {
00712 }
00713 
00714 PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() {
00715   fParent.removePresentationTimeSubsessionNormalizer(this);
00716 }
00717 
00718 void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize,
00719                                                              unsigned numTruncatedBytes,
00720                                                              struct timeval presentationTime,
00721                                                              unsigned durationInMicroseconds) {
00722   ((PresentationTimeSubsessionNormalizer*)clientData)
00723     ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
00724 }
00725 
00726 void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize,
00727                                                              unsigned numTruncatedBytes,
00728                                                              struct timeval presentationTime,
00729                                                              unsigned durationInMicroseconds) {
00730   // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed:
00731   fFrameSize = frameSize;
00732   fNumTruncatedBytes = numTruncatedBytes;
00733   fDurationInMicroseconds = durationInMicroseconds;
00734 
00735   fParent.normalizePresentationTime(this, fPresentationTime, presentationTime);
00736 
00737   // Hack for JPEG/RTP proxying.  Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them,
00738   // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink":
00739   if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG") == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket();
00740 
00741   // Complete delivery:
00742   FramedSource::afterGetting(this);
00743 }
00744 
00745 void PresentationTimeSubsessionNormalizer::doGetNextFrame() {
00746   fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this);
00747 }

Generated on Mon Apr 29 13:28:03 2013 for live by  doxygen 1.5.2