liveMedia/RTPInterface.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2008 Live Networks, Inc.  All rights reserved.
00018 // An abstraction of a network interface used for RTP (or RTCP).
00019 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
00020 // be implemented transparently.)
00021 // Implementation
00022 
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026 
00028 
00029 // Helper routines and data structures, used to implement
00030 // sending/receiving RTP/RTCP over a TCP socket:
00031 
00032 static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00033                            int socketNum, unsigned char streamChannelId);
00034 
00035 // Reading RTP-over-TCP is implemented using two levels of hash tables.
00036 // The top-level hash table maps TCP socket numbers to a
00037 // "SocketDescriptor" that contains a hash table for each of the
00038 // sub-channels that are reading from this socket.
00039 
00040 static HashTable* socketHashTable(UsageEnvironment& env) {
00041   _Tables* ourTables = _Tables::getOurTables(env);
00042   if (ourTables->socketTable == NULL) {
00043     // Create a new socket number -> SocketDescriptor mapping table:
00044     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00045   }
00046   return (HashTable*)(ourTables->socketTable);
00047 }
00048 
00049 class SocketDescriptor {
00050 public:
00051   SocketDescriptor(UsageEnvironment& env, int socketNum);
00052   virtual ~SocketDescriptor();
00053 
00054   void registerRTPInterface(unsigned char streamChannelId,
00055                             RTPInterface* rtpInterface);
00056   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00057   void deregisterRTPInterface(unsigned char streamChannelId);
00058 
00059 private:
00060   static void tcpReadHandler(SocketDescriptor*, int mask);
00061 
00062 private:
00063   UsageEnvironment& fEnv;
00064   int fOurSocketNum;
00065   HashTable* fSubChannelHashTable;
00066 };
00067 
00068 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env,
00069                                                 int sockNum) {
00070   char const* key = (char const*)(long)sockNum;
00071   return (SocketDescriptor*)(socketHashTable(env)->Lookup(key));
00072 }
00073 
00074 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00075   char const* key = (char const*)(long)sockNum;
00076   HashTable* table = socketHashTable(env);
00077   table->Remove(key);
00078 
00079   if (table->IsEmpty()) {
00080     // We can also delete the table (to reclaim space):
00081     _Tables* ourTables = _Tables::getOurTables(env);
00082     delete table;
00083     ourTables->socketTable = NULL;
00084     ourTables->reclaimIfPossible();
00085   }
00086 }
00087 
00088 
00090 
00091 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00092   : fOwner(owner), fGS(gs),
00093     fTCPStreams(NULL),
00094     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00095     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00096     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00097   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
00098   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
00099   // even if the socket was previously reported (e.g., by "select()") as having data available.
00100   // (This can supposedly happen if the UDP checksum fails, for example.)
00101   makeSocketNonBlocking(fGS->socketNum());
00102   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00103 }
00104 
00105 RTPInterface::~RTPInterface() {
00106   delete fTCPStreams;
00107 }
00108 
00109 Boolean RTPOverTCP_OK = True; // HACK: For detecting TCP socket failure externally #####
00110 
00111 void RTPInterface::setStreamSocket(int sockNum,
00112                                    unsigned char streamChannelId) {
00113   fGS->removeAllDestinations();
00114   addStreamSocket(sockNum, streamChannelId);
00115 }
00116 
00117 void RTPInterface::addStreamSocket(int sockNum,
00118                                    unsigned char streamChannelId) {
00119   if (sockNum < 0) return;
00120   else RTPOverTCP_OK = True; //##### HACK
00121 
00122   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00123        streams = streams->fNext) {
00124     if (streams->fStreamSocketNum == sockNum
00125         && streams->fStreamChannelId == streamChannelId) {
00126       return; // we already have it
00127     }
00128   }
00129 
00130   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00131 }
00132 
00133 void RTPInterface::removeStreamSocket(int sockNum,
00134                                       unsigned char streamChannelId) {
00135   for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00136        streamsPtr = &((*streamsPtr)->fNext)) {
00137     if ((*streamsPtr)->fStreamSocketNum == sockNum
00138         && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00139       // Remove the record pointed to by *streamsPtr :
00140       tcpStreamRecord* next = (*streamsPtr)->fNext;
00141       (*streamsPtr)->fNext = NULL;
00142       delete (*streamsPtr);
00143       *streamsPtr = next;
00144       return;
00145     }
00146   }
00147 }
00148 
00149 void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00150   // Normal case: Send as a UDP packet:
00151   fGS->output(envir(), fGS->ttl(), packet, packetSize);
00152 
00153   // Also, send over each of our TCP sockets:
00154   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00155        streams = streams->fNext) {
00156     sendRTPOverTCP(packet, packetSize,
00157                    streams->fStreamSocketNum, streams->fStreamChannelId);
00158   }
00159 }
00160 
00161 void RTPInterface
00162 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00163   // Normal case: Arrange to read UDP packets:
00164   envir().taskScheduler().
00165     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00166 
00167   // Also, receive RTP over TCP, on each of our TCP connections:
00168   fReadHandlerProc = handlerProc;
00169   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00170        streams = streams->fNext) {
00171     // Get a socket descriptor for "streams->fStreamSocketNum":
00172     SocketDescriptor* socketDescriptor
00173       = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00174     if (socketDescriptor == NULL) {
00175       socketDescriptor
00176         = new SocketDescriptor(envir(), streams->fStreamSocketNum);
00177       socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum),
00178                                     socketDescriptor);
00179     }
00180 
00181     // Tell it about our subChannel:
00182     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00183   }
00184 }
00185 
00186 Boolean RTPInterface::handleRead(unsigned char* buffer,
00187                                  unsigned bufferMaxSize,
00188                                  unsigned& bytesRead,
00189                                  struct sockaddr_in& fromAddress) {
00190   Boolean readSuccess;
00191   if (fNextTCPReadStreamSocketNum < 0) {
00192     // Normal case: read from the (datagram) 'groupsock':
00193     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00194   } else {
00195     // Read from the TCP connection:
00196     bytesRead = 0;
00197     unsigned totBytesToRead = fNextTCPReadSize;
00198     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize; 
00199     unsigned curBytesToRead = totBytesToRead;
00200     int curBytesRead;
00201     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00202                                       &buffer[bytesRead], curBytesToRead,
00203                                       fromAddress)) > 0) {
00204       bytesRead += curBytesRead;
00205       if (bytesRead >= totBytesToRead) break;
00206       curBytesToRead -= curBytesRead;
00207     }
00208     if (curBytesRead <= 0) {
00209       bytesRead = 0;
00210       readSuccess = False;
00211       RTPOverTCP_OK = False; // HACK #####
00212     } else {
00213       readSuccess = True;
00214     }
00215     fNextTCPReadStreamSocketNum = -1; // default, for next time
00216   }
00217 
00218   if (readSuccess && fAuxReadHandlerFunc != NULL) {
00219     // Also pass the newly-read packet data to our auxilliary handler:
00220     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00221   }
00222   return readSuccess;
00223 }
00224 
00225 void RTPInterface::stopNetworkReading() {
00226   // Normal case
00227   envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00228 
00229   // Also turn off read handling on each of our TCP connections:
00230   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00231        streams = streams->fNext) {
00232     SocketDescriptor* socketDescriptor
00233       = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00234     if (socketDescriptor != NULL) {
00235       socketDescriptor->deregisterRTPInterface(streams->fStreamChannelId);
00236         // Note: This may delete "socketDescriptor",
00237         // if no more interfaces are using this socket
00238     }
00239   }
00240 }
00241 
00242 
00244 
00245 void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00246                     int socketNum, unsigned char streamChannelId) {
00247 #ifdef DEBUG
00248   fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00249           packetSize, streamChannelId, socketNum); fflush(stderr);
00250 #endif
00251   // Send RTP over TCP, using the encoding defined in
00252   // RFC 2326, section 10.12:
00253   do {
00254     char const dollar = '$';
00255     if (send(socketNum, &dollar, 1, 0) != 1) break;
00256     if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00257 
00258     char netPacketSize[2];
00259     netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00260     netPacketSize[1] = (char) (packetSize&0xFF);
00261     if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00262 
00263     if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00264 
00265 #ifdef DEBUG
00266     fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00267 #endif
00268 
00269     return;
00270   } while (0);
00271 
00272   RTPOverTCP_OK = False; // HACK #####
00273 #ifdef DEBUG
00274   fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00275 #endif
00276 }
00277 
00278 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00279   : fEnv(env), fOurSocketNum(socketNum),
00280     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00281 }
00282 
00283 SocketDescriptor::~SocketDescriptor() {
00284   delete fSubChannelHashTable;
00285 }
00286 
00287 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00288                                             RTPInterface* rtpInterface) {
00289   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00290   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00291                             rtpInterface);
00292 
00293   if (isFirstRegistration) {
00294     // Arrange to handle reads on this TCP socket:
00295     TaskScheduler::BackgroundHandlerProc* handler
00296       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00297     fEnv.taskScheduler().
00298       turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00299   }
00300 }
00301 
00302 RTPInterface* SocketDescriptor
00303 ::lookupRTPInterface(unsigned char streamChannelId) {
00304   char const* lookupArg = (char const*)(long)streamChannelId;
00305   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00306 }
00307 
00308 void SocketDescriptor
00309 ::deregisterRTPInterface(unsigned char streamChannelId) {
00310   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00311 
00312   if (fSubChannelHashTable->IsEmpty()) {
00313     // No more interfaces are using us, so it's curtains for us now
00314     fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00315     removeSocketDescription(fEnv, fOurSocketNum);
00316     delete this;
00317   }
00318 }
00319 
00320 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor,
00321                                       int mask) {
00322   do {
00323     UsageEnvironment& env = socketDescriptor->fEnv; // abbrev
00324     int socketNum = socketDescriptor->fOurSocketNum;
00325 
00326     // Begin by reading and discarding any characters that aren't '$'.
00327     // Any such characters are probably regular RTSP responses or
00328     // commands from the server.  At present, we can't do anything with
00329     // these, because we have taken complete control of reading this socket.
00330     // (Later, fix) #####
00331     unsigned char c;
00332     struct sockaddr_in fromAddress;
00333     struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 0;
00334     do {
00335       int result = readSocket(env, socketNum, &c, 1, fromAddress, &timeout);
00336       if (result != 1) { // error reading TCP socket
00337         if (result < 0) {
00338           env.taskScheduler().turnOffBackgroundReadHandling(socketNum); // stops further calls to us
00339         }
00340         return;
00341       }
00342     } while (c != '$');
00343 
00344     // The next byte is the stream channel id:
00345     unsigned char streamChannelId;
00346     if (readSocket(env, socketNum, &streamChannelId, 1, fromAddress)
00347         != 1) break;
00348     RTPInterface* rtpInterface
00349       = socketDescriptor->lookupRTPInterface(streamChannelId);
00350     if (rtpInterface == NULL) break; // we're not interested in this channel
00351 
00352     // The next two bytes are the RTP or RTCP packet size (in network order)
00353     unsigned short size;
00354     if (readSocketExact(env, socketNum, (unsigned char*)&size, 2,
00355                         fromAddress) != 2) break;
00356     rtpInterface->fNextTCPReadSize = ntohs(size);
00357     rtpInterface->fNextTCPReadStreamSocketNum = socketNum;
00358     rtpInterface->fNextTCPReadStreamChannelId = streamChannelId;
00359 #ifdef DEBUG
00360     fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, streamChannelId);
00361 #endif
00362 
00363     // Now that we have the data set up, call this subchannel's
00364     // read handler:
00365     if (rtpInterface->fReadHandlerProc != NULL) {
00366       rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00367     }
00368 
00369   } while (0);
00370 }
00371 
00372 
00374 
00375 tcpStreamRecord
00376 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00377                   tcpStreamRecord* next)
00378   : fNext(next),
00379     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00380 }
00381 
00382 tcpStreamRecord::~tcpStreamRecord() {
00383   delete fNext;
00384 }
00385                                                                                 

Generated on Tue Jul 22 06:39:06 2008 for live by  doxygen 1.5.2