liveMedia/OnDemandServerMediaSubsession.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2008 Live Networks, Inc.  All rights reserved.
00018 // A 'ServerMediaSubsession' object that creates new, unicast, "RTPSink"s
00019 // on demand.
00020 // Implementation
00021 
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include "RTCP.hh"
00024 #include "BasicUDPSink.hh"
00025 #include <GroupsockHelper.hh>
00026 
00027 OnDemandServerMediaSubsession
00028 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00029                                 Boolean reuseFirstSource,
00030                                 portNumBits initialPortNum)
00031   : ServerMediaSubsession(env),
00032     fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum),
00033     fLastStreamToken(NULL), fSDPLines(NULL) {
00034   fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00035   gethostname(fCNAME, sizeof fCNAME);
00036   fCNAME[sizeof fCNAME-1] = '\0'; // just in case
00037 }
00038 
00039 class Destinations {
00040 public:
00041   Destinations(struct in_addr const& destAddr,
00042                Port const& rtpDestPort,
00043                Port const& rtcpDestPort)
00044     : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
00045   }
00046   Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
00047     : isTCP(True), rtpPort(0) /*dummy*/, rtcpPort(0) /*dummy*/,
00048       tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
00049   }
00050 
00051 public:
00052   Boolean isTCP;
00053   struct in_addr addr;
00054   Port rtpPort;
00055   Port rtcpPort;
00056   int tcpSocketNum;
00057   unsigned char rtpChannelId, rtcpChannelId;
00058 };
00059 
00060 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00061   delete[] fSDPLines;
00062 
00063   // Clean out the destinations hash table:
00064   while (1) {
00065     Destinations* destinations
00066       = (Destinations*)(fDestinationsHashTable->RemoveNext());
00067     if (destinations == NULL) break;
00068     delete destinations;
00069   }
00070   delete fDestinationsHashTable;
00071 }
00072 
00073 char const*
00074 OnDemandServerMediaSubsession::sdpLines() {
00075   if (fSDPLines == NULL) {
00076     // We need to construct a set of SDP lines that describe this
00077     // subsession (as a unicast stream).  To do so, we first create
00078     // dummy (unused) source and "RTPSink" objects,
00079     // whose parameters we use for the SDP lines: 
00080     unsigned estBitrate; // unused
00081     FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00082     if (inputSource == NULL) return NULL; // file not found
00083 
00084     struct in_addr dummyAddr;
00085     dummyAddr.s_addr = 0;
00086     Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00087     unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
00088     RTPSink* dummyRTPSink
00089       = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00090 
00091     setSDPLinesFromRTPSink(dummyRTPSink, inputSource);
00092     Medium::close(dummyRTPSink);
00093     closeStreamSource(inputSource);
00094   }
00095 
00096   return fSDPLines;
00097 }
00098 
00099 // A class that represents the state of an ongoing stream
00100 class StreamState {
00101 public:
00102   StreamState(OnDemandServerMediaSubsession& master,
00103               Port const& serverRTPPort, Port const& serverRTCPPort,
00104               RTPSink* rtpSink, BasicUDPSink* udpSink,
00105               unsigned totalBW, FramedSource* mediaSource,
00106               Groupsock* rtpGS, Groupsock* rtcpGS);
00107   virtual ~StreamState();
00108 
00109   void startPlaying(Destinations* destinations,
00110                     TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData);
00111   void pause();
00112   void endPlaying(Destinations* destinations);
00113   void reclaim();
00114 
00115   unsigned& referenceCount() { return fReferenceCount; }
00116 
00117   Port const& serverRTPPort() const { return fServerRTPPort; }
00118   Port const& serverRTCPPort() const { return fServerRTCPPort; }
00119 
00120   RTPSink* rtpSink() const { return fRTPSink; }
00121 
00122   float streamDuration() const { return fStreamDuration; }
00123 
00124   FramedSource* mediaSource() const { return fMediaSource; }
00125 
00126 private:
00127   OnDemandServerMediaSubsession& fMaster;
00128   Boolean fAreCurrentlyPlaying;
00129   unsigned fReferenceCount;
00130 
00131   Port fServerRTPPort, fServerRTCPPort;
00132 
00133   RTPSink* fRTPSink;
00134   BasicUDPSink* fUDPSink;
00135 
00136   float fStreamDuration;
00137   unsigned fTotalBW; RTCPInstance* fRTCPInstance;
00138 
00139   FramedSource* fMediaSource;
00140 
00141   Groupsock* fRTPgs; Groupsock* fRTCPgs;
00142 };
00143 
00144 void OnDemandServerMediaSubsession
00145 ::getStreamParameters(unsigned clientSessionId,
00146                       netAddressBits clientAddress,
00147                       Port const& clientRTPPort,
00148                       Port const& clientRTCPPort,
00149                       int tcpSocketNum,
00150                       unsigned char rtpChannelId,
00151                       unsigned char rtcpChannelId,
00152                       netAddressBits& destinationAddress,
00153                       u_int8_t& /*destinationTTL*/,
00154                       Boolean& isMulticast,
00155                       Port& serverRTPPort,
00156                       Port& serverRTCPPort,
00157                       void*& streamToken) {
00158   if (destinationAddress == 0) destinationAddress = clientAddress;
00159   struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00160   isMulticast = False;
00161 
00162   if (fLastStreamToken != NULL && fReuseFirstSource) {
00163     // Special case: Rather than creating a new 'StreamState',
00164     // we reuse the one that we've already created:
00165     serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00166     serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00167     ++((StreamState*)fLastStreamToken)->referenceCount();
00168     streamToken = fLastStreamToken;
00169   } else {
00170     // Normal case: Create a new media source:
00171     unsigned streamBitrate;
00172     FramedSource* mediaSource
00173       = createNewStreamSource(clientSessionId, streamBitrate);
00174 
00175     // Create 'groupsock' and 'sink' objects for the destination,
00176     // using previously unused server port numbers:
00177     RTPSink* rtpSink;
00178     BasicUDPSink* udpSink;
00179     Groupsock* rtpGroupsock;
00180     Groupsock* rtcpGroupsock;
00181     portNumBits serverPortNum;
00182     if (clientRTCPPort.num() == 0) {
00183       // We're streaming raw UDP (not RTP). Create a single groupsock:
00184       NoReuse dummy; // ensures that we skip over ports that are already in use
00185       for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00186         struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00187 
00188         serverRTPPort = serverPortNum;
00189         rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00190         if (rtpGroupsock->socketNum() >= 0) break; // success
00191       }
00192 
00193       rtcpGroupsock = NULL;
00194       rtpSink = NULL;
00195       udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00196     } else {
00197       // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
00198       // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even):
00199       NoReuse dummy; // ensures that we skip over ports that are already in use
00200       for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00201         struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00202 
00203         serverRTPPort = serverPortNum;
00204         rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00205         if (rtpGroupsock->socketNum() < 0) {
00206           delete rtpGroupsock;
00207           continue; // try again
00208         }
00209 
00210         serverRTCPPort = serverPortNum+1;
00211         rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00212         if (rtcpGroupsock->socketNum() < 0) {
00213           delete rtpGroupsock;
00214           delete rtcpGroupsock;
00215           continue; // try again
00216         }
00217 
00218         break; // success
00219       }
00220       
00221       unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
00222       rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00223       udpSink = NULL;
00224     }
00225     
00226     // Turn off the destinations for each groupsock.  They'll get set later
00227     // (unless TCP is used instead):
00228     if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00229     if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00230 
00231     // Set up the state of the stream.  The stream will get started later:
00232     streamToken = fLastStreamToken
00233       = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00234                         streamBitrate, mediaSource,
00235                         rtpGroupsock, rtcpGroupsock);
00236   }
00237   
00238   // Record these destinations as being for this client session id:
00239   Destinations* destinations;
00240   if (tcpSocketNum < 0) { // UDP
00241     destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00242   } else { // TCP
00243     destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00244   }
00245   fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00246 }
00247 
00248 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00249                                                 void* streamToken,
00250                                                 TaskFunc* rtcpRRHandler,
00251                                                 void* rtcpRRHandlerClientData,
00252                                                 unsigned short& rtpSeqNum,
00253                                                 unsigned& rtpTimestamp) {
00254   StreamState* streamState = (StreamState*)streamToken; 
00255   Destinations* destinations
00256     = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00257   if (streamState != NULL) {
00258     streamState->startPlaying(destinations,
00259                               rtcpRRHandler, rtcpRRHandlerClientData);
00260     if (streamState->rtpSink() != NULL) {
00261       rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00262       rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
00263     }
00264   }
00265 }
00266 
00267 void OnDemandServerMediaSubsession::pauseStream(unsigned /*clientSessionId*/,
00268                                                 void* streamToken) {
00269   // Pausing isn't allowed if multiple clients are receiving data from
00270   // the same source:
00271   if (fReuseFirstSource) return;
00272 
00273   StreamState* streamState = (StreamState*)streamToken; 
00274   if (streamState != NULL) streamState->pause();
00275 }
00276 
00277 void OnDemandServerMediaSubsession::seekStream(unsigned /*clientSessionId*/,
00278                                                void* streamToken, float seekNPT) {
00279   // Seeking isn't allowed if multiple clients are receiving data from
00280   // the same source:
00281   if (fReuseFirstSource) return;
00282 
00283   StreamState* streamState = (StreamState*)streamToken; 
00284   if (streamState != NULL && streamState->mediaSource() != NULL) {
00285     seekStreamSource(streamState->mediaSource(), seekNPT);
00286   }
00287 }
00288 
00289 void OnDemandServerMediaSubsession::setStreamScale(unsigned /*clientSessionId*/,
00290                                                    void* streamToken, float scale) {
00291   // Changing the scale factor isn't allowed if multiple clients are receiving data
00292   // from the same source:
00293   if (fReuseFirstSource) return;
00294 
00295   StreamState* streamState = (StreamState*)streamToken; 
00296   if (streamState != NULL && streamState->mediaSource() != NULL) {
00297     setStreamSourceScale(streamState->mediaSource(), scale);
00298   }
00299 }
00300 
00301 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00302                                                  void*& streamToken) {
00303   StreamState* streamState = (StreamState*)streamToken; 
00304 
00305   // Look up (and remove) the destinations for this client session:
00306   Destinations* destinations
00307     = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00308   if (destinations != NULL) {
00309     fDestinationsHashTable->Remove((char const*)clientSessionId);
00310 
00311     // Stop streaming to these destinations:
00312     if (streamState != NULL) streamState->endPlaying(destinations);
00313   }
00314 
00315   // Delete the "StreamState" structure if it's no longer being used:
00316   if (streamState != NULL) {
00317     if (streamState->referenceCount() > 0) --streamState->referenceCount();
00318     if (streamState->referenceCount() == 0) {
00319       delete streamState;
00320       if (fLastStreamToken == streamToken) fLastStreamToken = NULL; 
00321       streamToken = NULL;
00322     }
00323   }
00324 
00325   // Finally, delete the destinations themselves:
00326   delete destinations;
00327 }
00328 
00329 char const* OnDemandServerMediaSubsession
00330 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* /*inputSource*/) {
00331   // Default implementation:
00332   return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00333 }
00334 
00335 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
00336                                                      float /*seekNPT*/) {
00337   // Default implementation: Do nothing
00338 }
00339 
00340 void OnDemandServerMediaSubsession
00341 ::setStreamSourceScale(FramedSource* /*inputSource*/, float /*scale*/) {
00342   // Default implementation: Do nothing
00343 }
00344 
00345 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00346   Medium::close(inputSource);
00347 }
00348 
00349 void OnDemandServerMediaSubsession
00350 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource) {
00351   if (rtpSink == NULL) return;
00352 
00353   char const* mediaType = rtpSink->sdpMediaType();
00354   unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00355   struct in_addr serverAddrForSDP; serverAddrForSDP.s_addr = fServerAddressForSDP;
00356   char* const ipAddressStr = strDup(our_inet_ntoa(serverAddrForSDP));
00357   char* rtpmapLine = rtpSink->rtpmapLine();
00358   char const* rangeLine = rangeSDPLine();
00359   char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00360   if (auxSDPLine == NULL) auxSDPLine = "";
00361   
00362   char const* const sdpFmt =
00363     "m=%s %u RTP/AVP %d\r\n"
00364     "c=IN IP4 %s\r\n"
00365     "%s"
00366     "%s"
00367     "%s"
00368     "a=control:%s\r\n";
00369   unsigned sdpFmtSize = strlen(sdpFmt)
00370     + strlen(mediaType) + 5 /* max short len */ + 3 /* max char len */
00371     + strlen(ipAddressStr)
00372     + strlen(rtpmapLine)
00373     + strlen(rangeLine)
00374     + strlen(auxSDPLine)
00375     + strlen(trackId());
00376   char* sdpLines = new char[sdpFmtSize];
00377   sprintf(sdpLines, sdpFmt,
00378           mediaType, // m= <media>
00379           fPortNumForSDP, // m= <port>
00380           rtpPayloadType, // m= <fmt list>
00381           ipAddressStr, // c= address
00382           rtpmapLine, // a=rtpmap:... (if present)
00383           rangeLine, // a=range:... (if present)
00384           auxSDPLine, // optional extra SDP line
00385           trackId()); // a=control:<track-id>
00386   delete[] (char*)rangeLine; delete[] rtpmapLine; delete[] ipAddressStr;
00387   
00388   fSDPLines = strDup(sdpLines);
00389   delete[] sdpLines;
00390 }
00391 
00392 
00394 
00395 static void afterPlayingStreamState(void* clientData) {
00396   StreamState* streamState = (StreamState*)clientData;
00397   if (streamState->streamDuration() == 0.0) {
00398     // When the input stream ends, tear it down.  This will cause a RTCP "BYE"
00399     // to be sent to each client, teling it that the stream has ended.
00400     // (Because the stream didn't have a known duration, there was no other
00401     //  way for clients to know when the stream ended.)
00402     streamState->reclaim();
00403   }
00404   // Otherwise, keep the stream alive, in case a client wants to
00405   // subsequently re-play the stream starting from somewhere other than the end.
00406   // (This can be done only on streams that have a known duration.)
00407 }
00408 
00409 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00410                          Port const& serverRTPPort, Port const& serverRTCPPort,
00411                          RTPSink* rtpSink, BasicUDPSink* udpSink,
00412                          unsigned totalBW, FramedSource* mediaSource,
00413                          Groupsock* rtpGS, Groupsock* rtcpGS)
00414   : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00415     fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00416     fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00417     fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */,
00418     fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00419 }  
00420 
00421 StreamState::~StreamState() {
00422   reclaim();
00423 }
00424 
00425 void StreamState
00426 ::startPlaying(Destinations* dests,
00427                TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) {
00428   if (dests == NULL) return;
00429   if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00430     if (fRTPSink != NULL) {
00431       fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00432       fAreCurrentlyPlaying = True;
00433     } else if (fUDPSink != NULL) {
00434       fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00435       fAreCurrentlyPlaying = True;
00436     }
00437   }
00438 
00439   if (fRTCPInstance == NULL && fRTPSink != NULL) {
00440     // Create (and start) a 'RTCP instance' for this RTP sink:
00441     fRTCPInstance
00442       = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00443                                 fTotalBW, (unsigned char*)fMaster.fCNAME,
00444                                 fRTPSink, NULL /* we're a server */);
00445         // Note: This starts RTCP running automatically
00446   }
00447 
00448   if (dests->isTCP) {
00449     // Change RTP and RTCP to use the TCP socket instead of UDP:
00450     if (fRTPSink != NULL) {
00451       fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00452     }
00453     if (fRTCPInstance != NULL) {
00454       fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00455       fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00456                                           rtcpRRHandler, rtcpRRHandlerClientData);
00457     }
00458   } else {
00459     // Tell the RTP and RTCP 'groupsocks' about this destination
00460     // (in case they don't already have it):
00461     if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00462     if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00463     if (fRTCPInstance != NULL) {
00464       fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00465                                           rtcpRRHandler, rtcpRRHandlerClientData);
00466     }
00467   }
00468 }
00469 
00470 void StreamState::pause() {
00471   if (fRTPSink != NULL) fRTPSink->stopPlaying();
00472   if (fUDPSink != NULL) fUDPSink->stopPlaying();
00473   fAreCurrentlyPlaying = False;
00474 }
00475 
00476 void StreamState::endPlaying(Destinations* dests) {
00477   if (dests->isTCP) {
00478     if (fRTPSink != NULL) {
00479       fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00480     }
00481     if (fRTCPInstance != NULL) {
00482       fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00483       fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00484                                           NULL, NULL);
00485     }
00486   } else {
00487     // Tell the RTP and RTCP 'groupsocks' to stop using these destinations:
00488     if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00489     if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00490     if (fRTCPInstance != NULL) {
00491       fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00492                                           NULL, NULL);
00493     }
00494   }
00495 }
00496 
00497 void StreamState::reclaim() {
00498   // Delete allocated media objects
00499   Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL;
00500   Medium::close(fRTPSink); fRTPSink = NULL;
00501   Medium::close(fUDPSink); fUDPSink = NULL;
00502 
00503   fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00504 
00505   delete fRTPgs; fRTPgs = NULL;
00506   delete fRTCPgs; fRTCPgs = NULL;
00507 }

Generated on Tue Jul 22 06:39:06 2008 for live by  doxygen 1.5.2