00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include "RTCP.hh"
00024 #include "BasicUDPSink.hh"
00025 #include <GroupsockHelper.hh>
00026
00027 OnDemandServerMediaSubsession
00028 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00029 Boolean reuseFirstSource,
00030 portNumBits initialPortNum)
00031 : ServerMediaSubsession(env),
00032 fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum),
00033 fLastStreamToken(NULL), fSDPLines(NULL) {
00034 fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00035 gethostname(fCNAME, sizeof fCNAME);
00036 fCNAME[sizeof fCNAME-1] = '\0';
00037 }
00038
00039 class Destinations {
00040 public:
00041 Destinations(struct in_addr const& destAddr,
00042 Port const& rtpDestPort,
00043 Port const& rtcpDestPort)
00044 : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
00045 }
00046 Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
00047 : isTCP(True), rtpPort(0) , rtcpPort(0) ,
00048 tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
00049 }
00050
00051 public:
00052 Boolean isTCP;
00053 struct in_addr addr;
00054 Port rtpPort;
00055 Port rtcpPort;
00056 int tcpSocketNum;
00057 unsigned char rtpChannelId, rtcpChannelId;
00058 };
00059
00060 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00061 delete[] fSDPLines;
00062
00063
00064 while (1) {
00065 Destinations* destinations
00066 = (Destinations*)(fDestinationsHashTable->RemoveNext());
00067 if (destinations == NULL) break;
00068 delete destinations;
00069 }
00070 delete fDestinationsHashTable;
00071 }
00072
00073 char const*
00074 OnDemandServerMediaSubsession::sdpLines() {
00075 if (fSDPLines == NULL) {
00076
00077
00078
00079
00080 unsigned estBitrate;
00081 FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00082 if (inputSource == NULL) return NULL;
00083
00084 struct in_addr dummyAddr;
00085 dummyAddr.s_addr = 0;
00086 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00087 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00088 RTPSink* dummyRTPSink
00089 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00090
00091 setSDPLinesFromRTPSink(dummyRTPSink, inputSource);
00092 Medium::close(dummyRTPSink);
00093 closeStreamSource(inputSource);
00094 }
00095
00096 return fSDPLines;
00097 }
00098
00099
00100 class StreamState {
00101 public:
00102 StreamState(OnDemandServerMediaSubsession& master,
00103 Port const& serverRTPPort, Port const& serverRTCPPort,
00104 RTPSink* rtpSink, BasicUDPSink* udpSink,
00105 unsigned totalBW, FramedSource* mediaSource,
00106 Groupsock* rtpGS, Groupsock* rtcpGS);
00107 virtual ~StreamState();
00108
00109 void startPlaying(Destinations* destinations,
00110 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData);
00111 void pause();
00112 void endPlaying(Destinations* destinations);
00113 void reclaim();
00114
00115 unsigned& referenceCount() { return fReferenceCount; }
00116
00117 Port const& serverRTPPort() const { return fServerRTPPort; }
00118 Port const& serverRTCPPort() const { return fServerRTCPPort; }
00119
00120 RTPSink* rtpSink() const { return fRTPSink; }
00121
00122 float streamDuration() const { return fStreamDuration; }
00123
00124 FramedSource* mediaSource() const { return fMediaSource; }
00125
00126 private:
00127 OnDemandServerMediaSubsession& fMaster;
00128 Boolean fAreCurrentlyPlaying;
00129 unsigned fReferenceCount;
00130
00131 Port fServerRTPPort, fServerRTCPPort;
00132
00133 RTPSink* fRTPSink;
00134 BasicUDPSink* fUDPSink;
00135
00136 float fStreamDuration;
00137 unsigned fTotalBW; RTCPInstance* fRTCPInstance;
00138
00139 FramedSource* fMediaSource;
00140
00141 Groupsock* fRTPgs; Groupsock* fRTCPgs;
00142 };
00143
00144 void OnDemandServerMediaSubsession
00145 ::getStreamParameters(unsigned clientSessionId,
00146 netAddressBits clientAddress,
00147 Port const& clientRTPPort,
00148 Port const& clientRTCPPort,
00149 int tcpSocketNum,
00150 unsigned char rtpChannelId,
00151 unsigned char rtcpChannelId,
00152 netAddressBits& destinationAddress,
00153 u_int8_t& ,
00154 Boolean& isMulticast,
00155 Port& serverRTPPort,
00156 Port& serverRTCPPort,
00157 void*& streamToken) {
00158 if (destinationAddress == 0) destinationAddress = clientAddress;
00159 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00160 isMulticast = False;
00161
00162 if (fLastStreamToken != NULL && fReuseFirstSource) {
00163
00164
00165 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00166 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00167 ++((StreamState*)fLastStreamToken)->referenceCount();
00168 streamToken = fLastStreamToken;
00169 } else {
00170
00171 unsigned streamBitrate;
00172 FramedSource* mediaSource
00173 = createNewStreamSource(clientSessionId, streamBitrate);
00174
00175
00176
00177 RTPSink* rtpSink;
00178 BasicUDPSink* udpSink;
00179 Groupsock* rtpGroupsock;
00180 Groupsock* rtcpGroupsock;
00181 portNumBits serverPortNum;
00182 if (clientRTCPPort.num() == 0) {
00183
00184 NoReuse dummy;
00185 for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00186 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00187
00188 serverRTPPort = serverPortNum;
00189 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00190 if (rtpGroupsock->socketNum() >= 0) break;
00191 }
00192
00193 rtcpGroupsock = NULL;
00194 rtpSink = NULL;
00195 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00196 } else {
00197
00198
00199 NoReuse dummy;
00200 for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00201 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00202
00203 serverRTPPort = serverPortNum;
00204 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00205 if (rtpGroupsock->socketNum() < 0) {
00206 delete rtpGroupsock;
00207 continue;
00208 }
00209
00210 serverRTCPPort = serverPortNum+1;
00211 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00212 if (rtcpGroupsock->socketNum() < 0) {
00213 delete rtpGroupsock;
00214 delete rtcpGroupsock;
00215 continue;
00216 }
00217
00218 break;
00219 }
00220
00221 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00222 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00223 udpSink = NULL;
00224 }
00225
00226
00227
00228 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00229 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00230
00231
00232 streamToken = fLastStreamToken
00233 = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00234 streamBitrate, mediaSource,
00235 rtpGroupsock, rtcpGroupsock);
00236 }
00237
00238
00239 Destinations* destinations;
00240 if (tcpSocketNum < 0) {
00241 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00242 } else {
00243 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00244 }
00245 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00246 }
00247
00248 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00249 void* streamToken,
00250 TaskFunc* rtcpRRHandler,
00251 void* rtcpRRHandlerClientData,
00252 unsigned short& rtpSeqNum,
00253 unsigned& rtpTimestamp) {
00254 StreamState* streamState = (StreamState*)streamToken;
00255 Destinations* destinations
00256 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00257 if (streamState != NULL) {
00258 streamState->startPlaying(destinations,
00259 rtcpRRHandler, rtcpRRHandlerClientData);
00260 if (streamState->rtpSink() != NULL) {
00261 rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00262 rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
00263 }
00264 }
00265 }
00266
00267 void OnDemandServerMediaSubsession::pauseStream(unsigned ,
00268 void* streamToken) {
00269
00270
00271 if (fReuseFirstSource) return;
00272
00273 StreamState* streamState = (StreamState*)streamToken;
00274 if (streamState != NULL) streamState->pause();
00275 }
00276
00277 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00278 void* streamToken, float seekNPT) {
00279
00280
00281 if (fReuseFirstSource) return;
00282
00283 StreamState* streamState = (StreamState*)streamToken;
00284 if (streamState != NULL && streamState->mediaSource() != NULL) {
00285 seekStreamSource(streamState->mediaSource(), seekNPT);
00286 }
00287 }
00288
00289 void OnDemandServerMediaSubsession::setStreamScale(unsigned ,
00290 void* streamToken, float scale) {
00291
00292
00293 if (fReuseFirstSource) return;
00294
00295 StreamState* streamState = (StreamState*)streamToken;
00296 if (streamState != NULL && streamState->mediaSource() != NULL) {
00297 setStreamSourceScale(streamState->mediaSource(), scale);
00298 }
00299 }
00300
00301 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00302 void*& streamToken) {
00303 StreamState* streamState = (StreamState*)streamToken;
00304
00305
00306 Destinations* destinations
00307 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00308 if (destinations != NULL) {
00309 fDestinationsHashTable->Remove((char const*)clientSessionId);
00310
00311
00312 if (streamState != NULL) streamState->endPlaying(destinations);
00313 }
00314
00315
00316 if (streamState != NULL) {
00317 if (streamState->referenceCount() > 0) --streamState->referenceCount();
00318 if (streamState->referenceCount() == 0) {
00319 delete streamState;
00320 if (fLastStreamToken == streamToken) fLastStreamToken = NULL;
00321 streamToken = NULL;
00322 }
00323 }
00324
00325
00326 delete destinations;
00327 }
00328
00329 char const* OnDemandServerMediaSubsession
00330 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* ) {
00331
00332 return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00333 }
00334
00335 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00336 float ) {
00337
00338 }
00339
00340 void OnDemandServerMediaSubsession
00341 ::setStreamSourceScale(FramedSource* , float ) {
00342
00343 }
00344
00345 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00346 Medium::close(inputSource);
00347 }
00348
00349 void OnDemandServerMediaSubsession
00350 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource) {
00351 if (rtpSink == NULL) return;
00352
00353 char const* mediaType = rtpSink->sdpMediaType();
00354 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00355 struct in_addr serverAddrForSDP; serverAddrForSDP.s_addr = fServerAddressForSDP;
00356 char* const ipAddressStr = strDup(our_inet_ntoa(serverAddrForSDP));
00357 char* rtpmapLine = rtpSink->rtpmapLine();
00358 char const* rangeLine = rangeSDPLine();
00359 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00360 if (auxSDPLine == NULL) auxSDPLine = "";
00361
00362 char const* const sdpFmt =
00363 "m=%s %u RTP/AVP %d\r\n"
00364 "c=IN IP4 %s\r\n"
00365 "%s"
00366 "%s"
00367 "%s"
00368 "a=control:%s\r\n";
00369 unsigned sdpFmtSize = strlen(sdpFmt)
00370 + strlen(mediaType) + 5 + 3
00371 + strlen(ipAddressStr)
00372 + strlen(rtpmapLine)
00373 + strlen(rangeLine)
00374 + strlen(auxSDPLine)
00375 + strlen(trackId());
00376 char* sdpLines = new char[sdpFmtSize];
00377 sprintf(sdpLines, sdpFmt,
00378 mediaType,
00379 fPortNumForSDP,
00380 rtpPayloadType,
00381 ipAddressStr,
00382 rtpmapLine,
00383 rangeLine,
00384 auxSDPLine,
00385 trackId());
00386 delete[] (char*)rangeLine; delete[] rtpmapLine; delete[] ipAddressStr;
00387
00388 fSDPLines = strDup(sdpLines);
00389 delete[] sdpLines;
00390 }
00391
00392
00394
00395 static void afterPlayingStreamState(void* clientData) {
00396 StreamState* streamState = (StreamState*)clientData;
00397 if (streamState->streamDuration() == 0.0) {
00398
00399
00400
00401
00402 streamState->reclaim();
00403 }
00404
00405
00406
00407 }
00408
00409 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00410 Port const& serverRTPPort, Port const& serverRTCPPort,
00411 RTPSink* rtpSink, BasicUDPSink* udpSink,
00412 unsigned totalBW, FramedSource* mediaSource,
00413 Groupsock* rtpGS, Groupsock* rtcpGS)
00414 : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00415 fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00416 fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00417 fTotalBW(totalBW), fRTCPInstance(NULL) ,
00418 fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00419 }
00420
00421 StreamState::~StreamState() {
00422 reclaim();
00423 }
00424
00425 void StreamState
00426 ::startPlaying(Destinations* dests,
00427 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) {
00428 if (dests == NULL) return;
00429 if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00430 if (fRTPSink != NULL) {
00431 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00432 fAreCurrentlyPlaying = True;
00433 } else if (fUDPSink != NULL) {
00434 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00435 fAreCurrentlyPlaying = True;
00436 }
00437 }
00438
00439 if (fRTCPInstance == NULL && fRTPSink != NULL) {
00440
00441 fRTCPInstance
00442 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00443 fTotalBW, (unsigned char*)fMaster.fCNAME,
00444 fRTPSink, NULL );
00445
00446 }
00447
00448 if (dests->isTCP) {
00449
00450 if (fRTPSink != NULL) {
00451 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00452 }
00453 if (fRTCPInstance != NULL) {
00454 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00455 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00456 rtcpRRHandler, rtcpRRHandlerClientData);
00457 }
00458 } else {
00459
00460
00461 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00462 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00463 if (fRTCPInstance != NULL) {
00464 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00465 rtcpRRHandler, rtcpRRHandlerClientData);
00466 }
00467 }
00468 }
00469
00470 void StreamState::pause() {
00471 if (fRTPSink != NULL) fRTPSink->stopPlaying();
00472 if (fUDPSink != NULL) fUDPSink->stopPlaying();
00473 fAreCurrentlyPlaying = False;
00474 }
00475
00476 void StreamState::endPlaying(Destinations* dests) {
00477 if (dests->isTCP) {
00478 if (fRTPSink != NULL) {
00479 fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00480 }
00481 if (fRTCPInstance != NULL) {
00482 fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00483 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00484 NULL, NULL);
00485 }
00486 } else {
00487
00488 if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00489 if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00490 if (fRTCPInstance != NULL) {
00491 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00492 NULL, NULL);
00493 }
00494 }
00495 }
00496
00497 void StreamState::reclaim() {
00498
00499 Medium::close(fRTCPInstance) ; fRTCPInstance = NULL;
00500 Medium::close(fRTPSink); fRTPSink = NULL;
00501 Medium::close(fUDPSink); fUDPSink = NULL;
00502
00503 fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00504
00505 delete fRTPgs; fRTPgs = NULL;
00506 delete fRTCPgs; fRTCPgs = NULL;
00507 }