00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include <GroupsockHelper.hh>
00024
00025 OnDemandServerMediaSubsession
00026 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00027 Boolean reuseFirstSource,
00028 portNumBits initialPortNum)
00029 : ServerMediaSubsession(env),
00030 fSDPLines(NULL), fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum), fLastStreamToken(NULL) {
00031 fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00032 gethostname(fCNAME, sizeof fCNAME);
00033 fCNAME[sizeof fCNAME-1] = '\0';
00034 }
00035
00036 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00037 delete[] fSDPLines;
00038
00039
00040 while (1) {
00041 Destinations* destinations
00042 = (Destinations*)(fDestinationsHashTable->RemoveNext());
00043 if (destinations == NULL) break;
00044 delete destinations;
00045 }
00046 delete fDestinationsHashTable;
00047 }
00048
00049 char const*
00050 OnDemandServerMediaSubsession::sdpLines() {
00051 if (fSDPLines == NULL) {
00052
00053
00054
00055
00056 unsigned estBitrate;
00057 FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00058 if (inputSource == NULL) return NULL;
00059
00060 struct in_addr dummyAddr;
00061 dummyAddr.s_addr = 0;
00062 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00063 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00064 RTPSink* dummyRTPSink
00065 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00066
00067 setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate);
00068 Medium::close(dummyRTPSink);
00069 closeStreamSource(inputSource);
00070 }
00071
00072 return fSDPLines;
00073 }
00074
00075 void OnDemandServerMediaSubsession
00076 ::getStreamParameters(unsigned clientSessionId,
00077 netAddressBits clientAddress,
00078 Port const& clientRTPPort,
00079 Port const& clientRTCPPort,
00080 int tcpSocketNum,
00081 unsigned char rtpChannelId,
00082 unsigned char rtcpChannelId,
00083 netAddressBits& destinationAddress,
00084 u_int8_t& ,
00085 Boolean& isMulticast,
00086 Port& serverRTPPort,
00087 Port& serverRTCPPort,
00088 void*& streamToken) {
00089 if (destinationAddress == 0) destinationAddress = clientAddress;
00090 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00091 isMulticast = False;
00092
00093 if (fLastStreamToken != NULL && fReuseFirstSource) {
00094
00095
00096 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00097 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00098 ++((StreamState*)fLastStreamToken)->referenceCount();
00099 streamToken = fLastStreamToken;
00100 } else {
00101
00102 unsigned streamBitrate;
00103 FramedSource* mediaSource
00104 = createNewStreamSource(clientSessionId, streamBitrate);
00105
00106
00107
00108 RTPSink* rtpSink;
00109 BasicUDPSink* udpSink;
00110 Groupsock* rtpGroupsock;
00111 Groupsock* rtcpGroupsock;
00112 portNumBits serverPortNum;
00113 if (clientRTCPPort.num() == 0) {
00114
00115 NoReuse dummy(envir());
00116 for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00117 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00118
00119 serverRTPPort = serverPortNum;
00120 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00121 if (rtpGroupsock->socketNum() >= 0) break;
00122 }
00123
00124 rtcpGroupsock = NULL;
00125 rtpSink = NULL;
00126 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00127 } else {
00128
00129
00130 NoReuse dummy(envir());
00131 for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00132 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00133
00134 serverRTPPort = serverPortNum;
00135 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00136 if (rtpGroupsock->socketNum() < 0) {
00137 delete rtpGroupsock;
00138 continue;
00139 }
00140
00141 serverRTCPPort = serverPortNum+1;
00142 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00143 if (rtcpGroupsock->socketNum() < 0) {
00144 delete rtpGroupsock;
00145 delete rtcpGroupsock;
00146 continue;
00147 }
00148
00149 break;
00150 }
00151
00152 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00153 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00154 udpSink = NULL;
00155 }
00156
00157
00158
00159 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00160 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00161
00162 if (rtpGroupsock != NULL) {
00163
00164
00165 unsigned rtpBufSize = streamBitrate * 25 / 2;
00166 if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
00167 increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
00168 }
00169
00170
00171 streamToken = fLastStreamToken
00172 = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00173 streamBitrate, mediaSource,
00174 rtpGroupsock, rtcpGroupsock);
00175 }
00176
00177
00178 Destinations* destinations;
00179 if (tcpSocketNum < 0) {
00180 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00181 } else {
00182 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00183 }
00184 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00185 }
00186
00187 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00188 void* streamToken,
00189 TaskFunc* rtcpRRHandler,
00190 void* rtcpRRHandlerClientData,
00191 unsigned short& rtpSeqNum,
00192 unsigned& rtpTimestamp,
00193 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00194 void* serverRequestAlternativeByteHandlerClientData) {
00195 StreamState* streamState = (StreamState*)streamToken;
00196 Destinations* destinations
00197 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00198 if (streamState != NULL) {
00199 streamState->startPlaying(destinations,
00200 rtcpRRHandler, rtcpRRHandlerClientData,
00201 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00202 if (streamState->rtpSink() != NULL) {
00203 rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00204 rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
00205 }
00206 }
00207 }
00208
00209 void OnDemandServerMediaSubsession::pauseStream(unsigned ,
00210 void* streamToken) {
00211
00212
00213 if (fReuseFirstSource) return;
00214
00215 StreamState* streamState = (StreamState*)streamToken;
00216 if (streamState != NULL) streamState->pause();
00217 }
00218
00219 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00220 void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) {
00221 numBytes = 0;
00222
00223
00224
00225 if (fReuseFirstSource) return;
00226
00227 StreamState* streamState = (StreamState*)streamToken;
00228 if (streamState != NULL && streamState->mediaSource() != NULL) {
00229 seekStreamSource(streamState->mediaSource(), seekNPT, streamDuration, numBytes);
00230 }
00231 }
00232
00233 void OnDemandServerMediaSubsession::setStreamScale(unsigned ,
00234 void* streamToken, float scale) {
00235
00236
00237 if (fReuseFirstSource) return;
00238
00239 StreamState* streamState = (StreamState*)streamToken;
00240 if (streamState != NULL && streamState->mediaSource() != NULL) {
00241 setStreamSourceScale(streamState->mediaSource(), scale);
00242 }
00243 }
00244
00245 FramedSource* OnDemandServerMediaSubsession::getStreamSource(void* streamToken) {
00246 if (streamToken == NULL) return NULL;
00247
00248 StreamState* streamState = (StreamState*)streamToken;
00249 return streamState->mediaSource();
00250 }
00251
00252 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00253 void*& streamToken) {
00254 StreamState* streamState = (StreamState*)streamToken;
00255
00256
00257 Destinations* destinations
00258 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00259 if (destinations != NULL) {
00260 fDestinationsHashTable->Remove((char const*)clientSessionId);
00261
00262
00263 if (streamState != NULL) streamState->endPlaying(destinations);
00264 }
00265
00266
00267 if (streamState != NULL) {
00268 if (streamState->referenceCount() > 0) --streamState->referenceCount();
00269 if (streamState->referenceCount() == 0) {
00270 delete streamState;
00271 streamToken = NULL;
00272 }
00273 }
00274
00275
00276 delete destinations;
00277 }
00278
00279 char const* OnDemandServerMediaSubsession
00280 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* ) {
00281
00282 return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00283 }
00284
00285 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00286 double& , double , u_int64_t& numBytes) {
00287
00288 }
00289
00290 void OnDemandServerMediaSubsession
00291 ::setStreamSourceScale(FramedSource* , float ) {
00292
00293 }
00294
00295 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00296 Medium::close(inputSource);
00297 }
00298
00299 void OnDemandServerMediaSubsession
00300 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
00301 if (rtpSink == NULL) return;
00302
00303 char const* mediaType = rtpSink->sdpMediaType();
00304 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00305 AddressString ipAddressStr(fServerAddressForSDP);
00306 char* rtpmapLine = rtpSink->rtpmapLine();
00307 char const* rangeLine = rangeSDPLine();
00308 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00309 if (auxSDPLine == NULL) auxSDPLine = "";
00310
00311 char const* const sdpFmt =
00312 "m=%s %u RTP/AVP %d\r\n"
00313 "c=IN IP4 %s\r\n"
00314 "b=AS:%u\r\n"
00315 "%s"
00316 "%s"
00317 "%s"
00318 "a=control:%s\r\n";
00319 unsigned sdpFmtSize = strlen(sdpFmt)
00320 + strlen(mediaType) + 5 + 3
00321 + strlen(ipAddressStr.val())
00322 + 20
00323 + strlen(rtpmapLine)
00324 + strlen(rangeLine)
00325 + strlen(auxSDPLine)
00326 + strlen(trackId());
00327 char* sdpLines = new char[sdpFmtSize];
00328 sprintf(sdpLines, sdpFmt,
00329 mediaType,
00330 fPortNumForSDP,
00331 rtpPayloadType,
00332 ipAddressStr.val(),
00333 estBitrate,
00334 rtpmapLine,
00335 rangeLine,
00336 auxSDPLine,
00337 trackId());
00338 delete[] (char*)rangeLine; delete[] rtpmapLine;
00339
00340 fSDPLines = strDup(sdpLines);
00341 delete[] sdpLines;
00342 }
00343
00344
00346
00347 static void afterPlayingStreamState(void* clientData) {
00348 StreamState* streamState = (StreamState*)clientData;
00349 if (streamState->streamDuration() == 0.0) {
00350
00351
00352
00353
00354 streamState->reclaim();
00355 }
00356
00357
00358
00359 }
00360
00361 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00362 Port const& serverRTPPort, Port const& serverRTCPPort,
00363 RTPSink* rtpSink, BasicUDPSink* udpSink,
00364 unsigned totalBW, FramedSource* mediaSource,
00365 Groupsock* rtpGS, Groupsock* rtcpGS)
00366 : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00367 fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00368 fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00369 fTotalBW(totalBW), fRTCPInstance(NULL) ,
00370 fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00371 }
00372
00373 StreamState::~StreamState() {
00374 reclaim();
00375 }
00376
00377 void StreamState
00378 ::startPlaying(Destinations* dests,
00379 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
00380 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00381 void* serverRequestAlternativeByteHandlerClientData) {
00382 if (dests == NULL) return;
00383
00384 if (fRTCPInstance == NULL && fRTPSink != NULL) {
00385
00386 fRTCPInstance
00387 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00388 fTotalBW, (unsigned char*)fMaster.fCNAME,
00389 fRTPSink, NULL );
00390
00391 }
00392
00393 if (dests->isTCP) {
00394
00395 if (fRTPSink != NULL) {
00396 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00397 fRTPSink->setServerRequestAlternativeByteHandler(dests->tcpSocketNum, serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00398 }
00399 if (fRTCPInstance != NULL) {
00400 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00401 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00402 rtcpRRHandler, rtcpRRHandlerClientData);
00403 }
00404 } else {
00405
00406
00407 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00408 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00409 if (fRTCPInstance != NULL) {
00410 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00411 rtcpRRHandler, rtcpRRHandlerClientData);
00412 }
00413 }
00414
00415 if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00416 if (fRTPSink != NULL) {
00417 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00418 fAreCurrentlyPlaying = True;
00419 } else if (fUDPSink != NULL) {
00420 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00421 fAreCurrentlyPlaying = True;
00422 }
00423 }
00424 }
00425
00426 void StreamState::pause() {
00427 if (fRTPSink != NULL) fRTPSink->stopPlaying();
00428 if (fUDPSink != NULL) fUDPSink->stopPlaying();
00429 fAreCurrentlyPlaying = False;
00430 }
00431
00432 void StreamState::endPlaying(Destinations* dests) {
00433 if (dests->isTCP) {
00434 if (fRTPSink != NULL) {
00435 fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00436 }
00437 if (fRTCPInstance != NULL) {
00438 fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00439 fRTCPInstance->unsetSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId);
00440 }
00441 } else {
00442
00443 if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00444 if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00445 if (fRTCPInstance != NULL) {
00446 fRTCPInstance->unsetSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort);
00447 }
00448 }
00449 }
00450
00451 void StreamState::reclaim() {
00452
00453 Medium::close(fRTCPInstance) ; fRTCPInstance = NULL;
00454 Medium::close(fRTPSink); fRTPSink = NULL;
00455 Medium::close(fUDPSink); fUDPSink = NULL;
00456
00457 fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00458 if (fMaster.fLastStreamToken == this) fMaster.fLastStreamToken = NULL;
00459
00460 delete fRTPgs; fRTPgs = NULL;
00461 delete fRTCPgs; fRTCPgs = NULL;
00462 }