00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
// Forward declaration of the file-local helper (defined further down in this
// file) that writes one packet to the TCP socket "socketNum", preceded by a
// 4-byte framing header: '$', "streamChannelId", and the packet size as a
// 16-bit big-endian value.  Returns True only if every send() call wrote the
// expected number of bytes.
00032 static Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00033 int socketNum, unsigned char streamChannelId);
00034
00035
00036
00037
00038
00039
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042 if (ourTables == NULL) return NULL;
00043
00044 if (ourTables->socketTable == NULL) {
00045
00046 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047 }
00048 return (HashTable*)(ourTables->socketTable);
00049 }
00050
00051 class SocketDescriptor {
00052 public:
00053 SocketDescriptor(UsageEnvironment& env, int socketNum);
00054 virtual ~SocketDescriptor();
00055
00056 void registerRTPInterface(unsigned char streamChannelId,
00057 RTPInterface* rtpInterface);
00058 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059 void deregisterRTPInterface(unsigned char streamChannelId);
00060
00061 void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062 fServerRequestAlternativeByteHandler = handler;
00063 fServerRequestAlternativeByteHandlerClientData = clientData;
00064 }
00065
00066 private:
00067 static void tcpReadHandler(SocketDescriptor*, int mask);
00068 void tcpReadHandler1(int mask);
00069
00070 private:
00071 UsageEnvironment& fEnv;
00072 int fOurSocketNum;
00073 HashTable* fSubChannelHashTable;
00074 ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075 void* fServerRequestAlternativeByteHandlerClientData;
00076 u_int8_t fStreamChannelId, fSizeByte1;
00077 enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081 HashTable* table = socketHashTable(env, createIfNotFound);
00082 if (table == NULL) return NULL;
00083
00084 char const* key = (char const*)(long)sockNum;
00085 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086 if (socketDescriptor == NULL && createIfNotFound) {
00087 socketDescriptor = new SocketDescriptor(env, sockNum);
00088 table->Add((char const*)(long)(sockNum), socketDescriptor);
00089 }
00090
00091 return socketDescriptor;
00092 }
00093
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095 char const* key = (char const*)(long)sockNum;
00096 HashTable* table = socketHashTable(env);
00097 table->Remove(key);
00098
00099 if (table->IsEmpty()) {
00100
00101 _Tables* ourTables = _Tables::getOurTables(env);
00102 delete table;
00103 ourTables->socketTable = NULL;
00104 ourTables->reclaimIfPossible();
00105 }
00106 }
00107
00108
00110
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112 : fOwner(owner), fGS(gs),
00113 fTCPStreams(NULL),
00114 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117
00118
00119
00120
00121 makeSocketNonBlocking(fGS->socketNum());
00122 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124
00125 RTPInterface::~RTPInterface() {
00126 delete fTCPStreams;
00127 }
00128
00129 void RTPInterface::setStreamSocket(int sockNum,
00130 unsigned char streamChannelId) {
00131 fGS->removeAllDestinations();
00132 addStreamSocket(sockNum, streamChannelId);
00133 }
00134
00135 void RTPInterface::addStreamSocket(int sockNum,
00136 unsigned char streamChannelId) {
00137 if (sockNum < 0) return;
00138
00139 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140 streams = streams->fNext) {
00141 if (streams->fStreamSocketNum == sockNum
00142 && streams->fStreamChannelId == streamChannelId) {
00143 return;
00144 }
00145 }
00146
00147 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148
00149
00150 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
00151 socketDescriptor->registerRTPInterface(streamChannelId, this);
00152 }
00153
00154 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00155 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00156 if (socketDescriptor != NULL) {
00157 socketDescriptor->deregisterRTPInterface(streamChannelId);
00158
00159
00160 }
00161 }
00162
00163 void RTPInterface::removeStreamSocket(int sockNum,
00164 unsigned char streamChannelId) {
00165 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00166 streamsPtr = &((*streamsPtr)->fNext)) {
00167 if ((*streamsPtr)->fStreamSocketNum == sockNum
00168 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00169 deregisterSocket(envir(), sockNum, streamChannelId);
00170
00171
00172 tcpStreamRecord* next = (*streamsPtr)->fNext;
00173 (*streamsPtr)->fNext = NULL;
00174 delete (*streamsPtr);
00175 *streamsPtr = next;
00176 return;
00177 }
00178 }
00179 }
00180
00181 void RTPInterface
00182 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00183 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00184
00185 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00186 }
00187
00188
00189 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00190 Boolean success = True;
00191
00192
00193 if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00194
00195
00196 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00197 streams = streams->fNext) {
00198 if (!sendRTPOverTCP(packet, packetSize,
00199 streams->fStreamSocketNum, streams->fStreamChannelId)) {
00200 success = False;
00201 }
00202 }
00203
00204 return success;
00205 }
00206
00207 void RTPInterface
00208 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00209
00210 envir().taskScheduler().
00211 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00212
00213
00214 fReadHandlerProc = handlerProc;
00215 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00216 streams = streams->fNext) {
00217
00218 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00219
00220
00221 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00222 }
00223 }
00224
00225 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00226 unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00227 packetReadWasIncomplete = False;
00228 Boolean readSuccess;
00229 if (fNextTCPReadStreamSocketNum < 0) {
00230
00231 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00232 } else {
00233
00234 bytesRead = 0;
00235 unsigned totBytesToRead = fNextTCPReadSize;
00236 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00237 unsigned curBytesToRead = totBytesToRead;
00238 int curBytesRead;
00239 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00240 &buffer[bytesRead], curBytesToRead,
00241 fromAddress)) > 0) {
00242 bytesRead += curBytesRead;
00243 if (bytesRead >= totBytesToRead) break;
00244 curBytesToRead -= curBytesRead;
00245 }
00246 fNextTCPReadSize -= bytesRead;
00247 if (fNextTCPReadSize > 0) {
00248 packetReadWasIncomplete = True;
00249 return True;
00250 } else if (curBytesRead < 0) {
00251 bytesRead = 0;
00252 readSuccess = False;
00253 } else {
00254 readSuccess = True;
00255 }
00256 fNextTCPReadStreamSocketNum = -1;
00257 }
00258
00259 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00260
00261 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00262 }
00263 return readSuccess;
00264 }
00265
00266 void RTPInterface::stopNetworkReading() {
00267
00268 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00269
00270
00271 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00272 streams = streams->fNext) {
00273 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00274 }
00275 }
00276
00277
00279
00280 Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00281 int socketNum, unsigned char streamChannelId) {
00282 #ifdef DEBUG
00283 fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00284 packetSize, streamChannelId, socketNum); fflush(stderr);
00285 #endif
00286
00287
00288 do {
00289 char const dollar = '$';
00290 if (send(socketNum, &dollar, 1, 0) != 1) break;
00291 if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00292
00293 char netPacketSize[2];
00294 netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00295 netPacketSize[1] = (char) (packetSize&0xFF);
00296 if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00297
00298 if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00299
00300 #ifdef DEBUG
00301 fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00302 #endif
00303
00304 return True;
00305 } while (0);
00306
00307 #ifdef DEBUG
00308 fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00309 #endif
00310 return False;
00311 }
00312
00313 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00314 :fEnv(env), fOurSocketNum(socketNum),
00315 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00316 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00317 fTCPReadingState(AWAITING_DOLLAR) {
00318 }
00319
00320 SocketDescriptor::~SocketDescriptor() {
00321 delete fSubChannelHashTable;
00322 }
00323
00324 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00325 RTPInterface* rtpInterface) {
00326 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00327 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00328 rtpInterface);
00329
00330 if (isFirstRegistration) {
00331
00332 TaskScheduler::BackgroundHandlerProc* handler
00333 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00334 fEnv.taskScheduler().
00335 turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00336 }
00337 }
00338
00339 RTPInterface* SocketDescriptor
00340 ::lookupRTPInterface(unsigned char streamChannelId) {
00341 char const* lookupArg = (char const*)(long)streamChannelId;
00342 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00343 }
00344
00345 void SocketDescriptor
00346 ::deregisterRTPInterface(unsigned char streamChannelId) {
00347 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00348
00349 if (fSubChannelHashTable->IsEmpty()) {
00350
00351 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00352 removeSocketDescription(fEnv, fOurSocketNum);
00353 delete this;
00354 }
00355 }
00356
00357 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00358 socketDescriptor->tcpReadHandler1(mask);
00359 }
00360
00361 void SocketDescriptor::tcpReadHandler1(int mask) {
00362
00363
00364
00365
00366
00367
00368
00369
00370 u_int8_t c;
00371 struct sockaddr_in fromAddress;
00372 if (fTCPReadingState != AWAITING_PACKET_DATA) {
00373 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00374 if (result != 1) {
00375 if (result < 0) {
00376 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00377 }
00378 return;
00379 }
00380 }
00381
00382 switch (fTCPReadingState) {
00383 case AWAITING_DOLLAR: {
00384 if (c == '$') {
00385 fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00386 } else {
00387
00388 if (fServerRequestAlternativeByteHandler != NULL) {
00389 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00390 }
00391 }
00392 break;
00393 }
00394 case AWAITING_STREAM_CHANNEL_ID: {
00395
00396 if (lookupRTPInterface(c) != NULL) {
00397 fStreamChannelId = c;
00398 fTCPReadingState = AWAITING_SIZE1;
00399 } else {
00400
00401 fTCPReadingState = AWAITING_DOLLAR;
00402 }
00403 break;
00404 }
00405 case AWAITING_SIZE1: {
00406
00407 fSizeByte1 = c;
00408 fTCPReadingState = AWAITING_SIZE2;
00409 break;
00410 }
00411 case AWAITING_SIZE2: {
00412
00413 unsigned short size = (fSizeByte1<<8)|c;
00414
00415
00416 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00417 if (rtpInterface != NULL) {
00418 rtpInterface->fNextTCPReadSize = size;
00419 rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00420 rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00421 }
00422 fTCPReadingState = AWAITING_PACKET_DATA;
00423 break;
00424 }
00425 case AWAITING_PACKET_DATA: {
00426
00427 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00428 if (rtpInterface != NULL) {
00429 if (rtpInterface->fNextTCPReadSize == 0) {
00430
00431 fTCPReadingState = AWAITING_DOLLAR;
00432 break;
00433 }
00434 if (rtpInterface->fReadHandlerProc != NULL) {
00435 #ifdef DEBUG
00436 fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00437 #endif
00438 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00439 }
00440 }
00441 return;
00442 }
00443 }
00444 }
00445
00446
00448
00449 tcpStreamRecord
00450 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00451 tcpStreamRecord* next)
00452 : fNext(next),
00453 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00454 }
00455
00456 tcpStreamRecord::~tcpStreamRecord() {
00457 delete fNext;
00458 }
00459