00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
// Forward declaration of the file-local helper that writes one packet to a
// TCP socket, tagged with the given stream channel id.
static void sendRTPOverTCP(unsigned char* packet,
                           unsigned packetSize,
                           int socketNum,
                           unsigned char streamChannelId);
00034
00035
00036
00037
00038
00039
00040 static HashTable* socketHashTable(UsageEnvironment& env) {
00041 _Tables* ourTables = _Tables::getOurTables(env);
00042 if (ourTables->socketTable == NULL) {
00043
00044 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00045 }
00046 return (HashTable*)(ourTables->socketTable);
00047 }
00048
00049 class SocketDescriptor {
00050 public:
00051 SocketDescriptor(UsageEnvironment& env, int socketNum);
00052 virtual ~SocketDescriptor();
00053
00054 void registerRTPInterface(unsigned char streamChannelId,
00055 RTPInterface* rtpInterface);
00056 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00057 void deregisterRTPInterface(unsigned char streamChannelId);
00058
00059 private:
00060 static void tcpReadHandler(SocketDescriptor*, int mask);
00061
00062 private:
00063 UsageEnvironment& fEnv;
00064 int fOurSocketNum;
00065 HashTable* fSubChannelHashTable;
00066 };
00067
00068 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env,
00069 int sockNum) {
00070 char const* key = (char const*)(long)sockNum;
00071 return (SocketDescriptor*)(socketHashTable(env)->Lookup(key));
00072 }
00073
00074 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00075 char const* key = (char const*)(long)sockNum;
00076 HashTable* table = socketHashTable(env);
00077 table->Remove(key);
00078
00079 if (table->IsEmpty()) {
00080
00081 _Tables* ourTables = _Tables::getOurTables(env);
00082 delete table;
00083 ourTables->socketTable = NULL;
00084 ourTables->reclaimIfPossible();
00085 }
00086 }
00087
00088
00090
00091 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00092 : fOwner(owner), fGS(gs),
00093 fTCPStreams(NULL),
00094 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00095 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00096 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00097
00098
00099
00100
00101 makeSocketNonBlocking(fGS->socketNum());
00102 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00103 }
00104
00105 RTPInterface::~RTPInterface() {
00106 delete fTCPStreams;
00107 }
00108
00109 Boolean RTPOverTCP_OK = True;
00110
00111 void RTPInterface::setStreamSocket(int sockNum,
00112 unsigned char streamChannelId) {
00113 fGS->removeAllDestinations();
00114 addStreamSocket(sockNum, streamChannelId);
00115 }
00116
00117 void RTPInterface::addStreamSocket(int sockNum,
00118 unsigned char streamChannelId) {
00119 if (sockNum < 0) return;
00120 else RTPOverTCP_OK = True;
00121
00122 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00123 streams = streams->fNext) {
00124 if (streams->fStreamSocketNum == sockNum
00125 && streams->fStreamChannelId == streamChannelId) {
00126 return;
00127 }
00128 }
00129
00130 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00131 }
00132
00133 void RTPInterface::removeStreamSocket(int sockNum,
00134 unsigned char streamChannelId) {
00135 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00136 streamsPtr = &((*streamsPtr)->fNext)) {
00137 if ((*streamsPtr)->fStreamSocketNum == sockNum
00138 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00139
00140 tcpStreamRecord* next = (*streamsPtr)->fNext;
00141 (*streamsPtr)->fNext = NULL;
00142 delete (*streamsPtr);
00143 *streamsPtr = next;
00144 return;
00145 }
00146 }
00147 }
00148
00149 void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00150
00151 fGS->output(envir(), fGS->ttl(), packet, packetSize);
00152
00153
00154 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00155 streams = streams->fNext) {
00156 sendRTPOverTCP(packet, packetSize,
00157 streams->fStreamSocketNum, streams->fStreamChannelId);
00158 }
00159 }
00160
00161 void RTPInterface
00162 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00163
00164 envir().taskScheduler().
00165 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00166
00167
00168 fReadHandlerProc = handlerProc;
00169 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00170 streams = streams->fNext) {
00171
00172 SocketDescriptor* socketDescriptor
00173 = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00174 if (socketDescriptor == NULL) {
00175 socketDescriptor
00176 = new SocketDescriptor(envir(), streams->fStreamSocketNum);
00177 socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum),
00178 socketDescriptor);
00179 }
00180
00181
00182 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00183 }
00184 }
00185
00186 Boolean RTPInterface::handleRead(unsigned char* buffer,
00187 unsigned bufferMaxSize,
00188 unsigned& bytesRead,
00189 struct sockaddr_in& fromAddress) {
00190 Boolean readSuccess;
00191 if (fNextTCPReadStreamSocketNum < 0) {
00192
00193 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00194 } else {
00195
00196 bytesRead = 0;
00197 unsigned totBytesToRead = fNextTCPReadSize;
00198 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00199 unsigned curBytesToRead = totBytesToRead;
00200 int curBytesRead;
00201 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00202 &buffer[bytesRead], curBytesToRead,
00203 fromAddress)) > 0) {
00204 bytesRead += curBytesRead;
00205 if (bytesRead >= totBytesToRead) break;
00206 curBytesToRead -= curBytesRead;
00207 }
00208 if (curBytesRead <= 0) {
00209 bytesRead = 0;
00210 readSuccess = False;
00211 RTPOverTCP_OK = False;
00212 } else {
00213 readSuccess = True;
00214 }
00215 fNextTCPReadStreamSocketNum = -1;
00216 }
00217
00218 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00219
00220 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00221 }
00222 return readSuccess;
00223 }
00224
00225 void RTPInterface::stopNetworkReading() {
00226
00227 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00228
00229
00230 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00231 streams = streams->fNext) {
00232 SocketDescriptor* socketDescriptor
00233 = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00234 if (socketDescriptor != NULL) {
00235 socketDescriptor->deregisterRTPInterface(streams->fStreamChannelId);
00236
00237
00238 }
00239 }
00240 }
00241
00242
00244
00245 void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00246 int socketNum, unsigned char streamChannelId) {
00247 #ifdef DEBUG
00248 fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00249 packetSize, streamChannelId, socketNum); fflush(stderr);
00250 #endif
00251
00252
00253 do {
00254 char const dollar = '$';
00255 if (send(socketNum, &dollar, 1, 0) != 1) break;
00256 if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00257
00258 char netPacketSize[2];
00259 netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00260 netPacketSize[1] = (char) (packetSize&0xFF);
00261 if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00262
00263 if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00264
00265 #ifdef DEBUG
00266 fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00267 #endif
00268
00269 return;
00270 } while (0);
00271
00272 RTPOverTCP_OK = False;
00273 #ifdef DEBUG
00274 fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00275 #endif
00276 }
00277
00278 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00279 : fEnv(env), fOurSocketNum(socketNum),
00280 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00281 }
00282
00283 SocketDescriptor::~SocketDescriptor() {
00284 delete fSubChannelHashTable;
00285 }
00286
00287 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00288 RTPInterface* rtpInterface) {
00289 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00290 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00291 rtpInterface);
00292
00293 if (isFirstRegistration) {
00294
00295 TaskScheduler::BackgroundHandlerProc* handler
00296 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00297 fEnv.taskScheduler().
00298 turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00299 }
00300 }
00301
00302 RTPInterface* SocketDescriptor
00303 ::lookupRTPInterface(unsigned char streamChannelId) {
00304 char const* lookupArg = (char const*)(long)streamChannelId;
00305 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00306 }
00307
00308 void SocketDescriptor
00309 ::deregisterRTPInterface(unsigned char streamChannelId) {
00310 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00311
00312 if (fSubChannelHashTable->IsEmpty()) {
00313
00314 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00315 removeSocketDescription(fEnv, fOurSocketNum);
00316 delete this;
00317 }
00318 }
00319
00320 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor,
00321 int mask) {
00322 do {
00323 UsageEnvironment& env = socketDescriptor->fEnv;
00324 int socketNum = socketDescriptor->fOurSocketNum;
00325
00326
00327
00328
00329
00330
00331 unsigned char c;
00332 struct sockaddr_in fromAddress;
00333 struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 0;
00334 do {
00335 int result = readSocket(env, socketNum, &c, 1, fromAddress, &timeout);
00336 if (result != 1) {
00337 if (result < 0) {
00338 env.taskScheduler().turnOffBackgroundReadHandling(socketNum);
00339 }
00340 return;
00341 }
00342 } while (c != '$');
00343
00344
00345 unsigned char streamChannelId;
00346 if (readSocket(env, socketNum, &streamChannelId, 1, fromAddress)
00347 != 1) break;
00348 RTPInterface* rtpInterface
00349 = socketDescriptor->lookupRTPInterface(streamChannelId);
00350 if (rtpInterface == NULL) break;
00351
00352
00353 unsigned short size;
00354 if (readSocketExact(env, socketNum, (unsigned char*)&size, 2,
00355 fromAddress) != 2) break;
00356 rtpInterface->fNextTCPReadSize = ntohs(size);
00357 rtpInterface->fNextTCPReadStreamSocketNum = socketNum;
00358 rtpInterface->fNextTCPReadStreamChannelId = streamChannelId;
00359 #ifdef DEBUG
00360 fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, streamChannelId);
00361 #endif
00362
00363
00364
00365 if (rtpInterface->fReadHandlerProc != NULL) {
00366 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00367 }
00368
00369 } while (0);
00370 }
00371
00372
00374
00375 tcpStreamRecord
00376 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00377 tcpStreamRecord* next)
00378 : fNext(next),
00379 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00380 }
00381
00382 tcpStreamRecord::~tcpStreamRecord() {
00383 delete fNext;
00384 }
00385