liveMedia/RTPInterface.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2012 Live Networks, Inc.  All rights reserved.
00018 // An abstraction of a network interface used for RTP (or RTCP).
00019 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
00020 // be implemented transparently.)
00021 // Implementation
00022 
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026 
00028 
00029 // Helper routines and data structures, used to implement
00030 // sending/receiving RTP/RTCP over a TCP socket:
00031 
00032 static Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00033                               int socketNum, unsigned char streamChannelId);
00034 
00035 // Reading RTP-over-TCP is implemented using two levels of hash tables.
00036 // The top-level hash table maps TCP socket numbers to a
00037 // "SocketDescriptor" that contains a hash table for each of the
00038 // sub-channels that are reading from this socket.
00039 
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041   _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042   if (ourTables == NULL) return NULL;
00043 
00044   if (ourTables->socketTable == NULL) {
00045     // Create a new socket number -> SocketDescriptor mapping table:
00046     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047   }
00048   return (HashTable*)(ourTables->socketTable);
00049 }
00050 
00051 class SocketDescriptor {
00052 public:
00053   SocketDescriptor(UsageEnvironment& env, int socketNum);
00054   virtual ~SocketDescriptor();
00055 
00056   void registerRTPInterface(unsigned char streamChannelId,
00057                             RTPInterface* rtpInterface);
00058   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059   void deregisterRTPInterface(unsigned char streamChannelId);
00060 
00061   void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062     fServerRequestAlternativeByteHandler = handler;
00063     fServerRequestAlternativeByteHandlerClientData = clientData;
00064   }
00065 
00066 private:
00067   static void tcpReadHandler(SocketDescriptor*, int mask);
00068   void tcpReadHandler1(int mask);
00069 
00070 private:
00071   UsageEnvironment& fEnv;
00072   int fOurSocketNum;
00073   HashTable* fSubChannelHashTable;
00074   ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075   void* fServerRequestAlternativeByteHandlerClientData;
00076   u_int8_t fStreamChannelId, fSizeByte1;
00077   enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079 
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081   HashTable* table = socketHashTable(env, createIfNotFound);
00082   if (table == NULL) return NULL;
00083 
00084   char const* key = (char const*)(long)sockNum;
00085   SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086   if (socketDescriptor == NULL && createIfNotFound) {
00087     socketDescriptor = new SocketDescriptor(env, sockNum);
00088     table->Add((char const*)(long)(sockNum), socketDescriptor);
00089   }
00090 
00091   return socketDescriptor;
00092 }
00093 
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095   char const* key = (char const*)(long)sockNum;
00096   HashTable* table = socketHashTable(env);
00097   table->Remove(key);
00098 
00099   if (table->IsEmpty()) {
00100     // We can also delete the table (to reclaim space):
00101     _Tables* ourTables = _Tables::getOurTables(env);
00102     delete table;
00103     ourTables->socketTable = NULL;
00104     ourTables->reclaimIfPossible();
00105   }
00106 }
00107 
00108 
00110 
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112   : fOwner(owner), fGS(gs),
00113     fTCPStreams(NULL),
00114     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
00118   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
00119   // even if the socket was previously reported (e.g., by "select()") as having data available.
00120   // (This can supposedly happen if the UDP checksum fails, for example.)
00121   makeSocketNonBlocking(fGS->socketNum());
00122   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124 
00125 RTPInterface::~RTPInterface() {
00126   delete fTCPStreams;
00127 }
00128 
00129 void RTPInterface::setStreamSocket(int sockNum,
00130                                    unsigned char streamChannelId) {
00131   fGS->removeAllDestinations();
00132   addStreamSocket(sockNum, streamChannelId);
00133 }
00134 
00135 void RTPInterface::addStreamSocket(int sockNum,
00136                                    unsigned char streamChannelId) {
00137   if (sockNum < 0) return;
00138 
00139   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140        streams = streams->fNext) {
00141     if (streams->fStreamSocketNum == sockNum
00142         && streams->fStreamChannelId == streamChannelId) {
00143       return; // we already have it
00144     }
00145   }
00146 
00147   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148 
00149   // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP:
00150   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
00151   socketDescriptor->registerRTPInterface(streamChannelId, this);
00152 }
00153 
00154 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00155   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00156   if (socketDescriptor != NULL) {
00157     socketDescriptor->deregisterRTPInterface(streamChannelId);
00158         // Note: This may delete "socketDescriptor",
00159         // if no more interfaces are using this socket
00160   }
00161 }
00162 
00163 void RTPInterface::removeStreamSocket(int sockNum,
00164                                       unsigned char streamChannelId) {
00165   for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00166        streamsPtr = &((*streamsPtr)->fNext)) {
00167     if ((*streamsPtr)->fStreamSocketNum == sockNum
00168         && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00169       deregisterSocket(envir(), sockNum, streamChannelId);
00170 
00171       // Then remove the record pointed to by *streamsPtr :
00172       tcpStreamRecord* next = (*streamsPtr)->fNext;
00173       (*streamsPtr)->fNext = NULL;
00174       delete (*streamsPtr);
00175       *streamsPtr = next;
00176       return;
00177     }
00178   }
00179 }
00180 
00181 void RTPInterface
00182 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00183   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00184 
00185   if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00186 }
00187 
00188 
00189 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00190   Boolean success = True; // we'll return False instead if any of the sends fail
00191 
00192   // Normal case: Send as a UDP packet:
00193   if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00194 
00195   // Also, send over each of our TCP sockets:
00196   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00197        streams = streams->fNext) {
00198     if (!sendRTPOverTCP(packet, packetSize,
00199                         streams->fStreamSocketNum, streams->fStreamChannelId)) {
00200       success = False;
00201     }
00202   }
00203 
00204   return success;
00205 }
00206 
00207 void RTPInterface
00208 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00209   // Normal case: Arrange to read UDP packets:
00210   envir().taskScheduler().
00211     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00212 
00213   // Also, receive RTP over TCP, on each of our TCP connections:
00214   fReadHandlerProc = handlerProc;
00215   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00216        streams = streams->fNext) {
00217     // Get a socket descriptor for "streams->fStreamSocketNum":
00218     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00219 
00220     // Tell it about our subChannel:
00221     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00222   }
00223 }
00224 
00225 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00226                                  unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00227   packetReadWasIncomplete = False; // by default
00228   Boolean readSuccess;
00229   if (fNextTCPReadStreamSocketNum < 0) {
00230     // Normal case: read from the (datagram) 'groupsock':
00231     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00232   } else {
00233     // Read from the TCP connection:
00234     bytesRead = 0;
00235     unsigned totBytesToRead = fNextTCPReadSize;
00236     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00237     unsigned curBytesToRead = totBytesToRead;
00238     int curBytesRead;
00239     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00240                                       &buffer[bytesRead], curBytesToRead,
00241                                       fromAddress)) > 0) {
00242       bytesRead += curBytesRead;
00243       if (bytesRead >= totBytesToRead) break;
00244       curBytesToRead -= curBytesRead;
00245     }
00246     fNextTCPReadSize -= bytesRead;
00247     if (fNextTCPReadSize > 0) {
00248       packetReadWasIncomplete = True;
00249       return True;
00250     } else if (curBytesRead < 0) {
00251       bytesRead = 0;
00252       readSuccess = False;
00253     } else {
00254       readSuccess = True;
00255     }
00256     fNextTCPReadStreamSocketNum = -1; // default, for next time
00257   }
00258 
00259   if (readSuccess && fAuxReadHandlerFunc != NULL) {
00260     // Also pass the newly-read packet data to our auxilliary handler:
00261     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00262   }
00263   return readSuccess;
00264 }
00265 
00266 void RTPInterface::stopNetworkReading() {
00267   // Normal case
00268   envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00269 
00270   // Also turn off read handling on each of our TCP connections:
00271   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00272        streams = streams->fNext) {
00273     deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00274   }
00275 }
00276 
00277 
00279 
00280 Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00281                     int socketNum, unsigned char streamChannelId) {
00282 #ifdef DEBUG
00283   fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00284           packetSize, streamChannelId, socketNum); fflush(stderr);
00285 #endif
00286   // Send RTP over TCP, using the encoding defined in
00287   // RFC 2326, section 10.12:
00288   do {
00289     char const dollar = '$';
00290     if (send(socketNum, &dollar, 1, 0) != 1) break;
00291     if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00292 
00293     char netPacketSize[2];
00294     netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00295     netPacketSize[1] = (char) (packetSize&0xFF);
00296     if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00297 
00298     if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00299 
00300 #ifdef DEBUG
00301     fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00302 #endif
00303 
00304     return True;
00305   } while (0);
00306 
00307 #ifdef DEBUG
00308   fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00309 #endif
00310   return False;
00311 }
00312 
00313 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00314   :fEnv(env), fOurSocketNum(socketNum),
00315     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00316    fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00317    fTCPReadingState(AWAITING_DOLLAR) {
00318 }
00319 
00320 SocketDescriptor::~SocketDescriptor() {
00321   delete fSubChannelHashTable;
00322 }
00323 
00324 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00325                                             RTPInterface* rtpInterface) {
00326   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00327   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00328                             rtpInterface);
00329 
00330   if (isFirstRegistration) {
00331     // Arrange to handle reads on this TCP socket:
00332     TaskScheduler::BackgroundHandlerProc* handler
00333       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00334     fEnv.taskScheduler().
00335       turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00336   }
00337 }
00338 
00339 RTPInterface* SocketDescriptor
00340 ::lookupRTPInterface(unsigned char streamChannelId) {
00341   char const* lookupArg = (char const*)(long)streamChannelId;
00342   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00343 }
00344 
00345 void SocketDescriptor
00346 ::deregisterRTPInterface(unsigned char streamChannelId) {
00347   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00348 
00349   if (fSubChannelHashTable->IsEmpty()) {
00350     // No more interfaces are using us, so it's curtains for us now
00351     fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00352     removeSocketDescription(fEnv, fOurSocketNum);
00353     delete this;
00354   }
00355 }
00356 
00357 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00358   socketDescriptor->tcpReadHandler1(mask);
00359 }
00360 
00361 void SocketDescriptor::tcpReadHandler1(int mask) {
00362   // We expect the following data over the TCP channel:
00363   //   optional RTSP command or response bytes (before the first '$' character)
00364   //   a '$' character
00365   //   a 1-byte channel id
00366   //   a 2-byte packet size (in network byte order)
00367   //   the packet data.
00368   // However, because the socket is being read asynchronously, this data might arrive in pieces.
00369   
00370   u_int8_t c;
00371   struct sockaddr_in fromAddress;
00372   if (fTCPReadingState != AWAITING_PACKET_DATA) {
00373     int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00374     if (result != 1) { // error reading TCP socket, or no more data available
00375       if (result < 0) { // error
00376         fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); // stops further calls to us
00377       }
00378       return;
00379     }
00380   }
00381   
00382   switch (fTCPReadingState) {
00383     case AWAITING_DOLLAR: {
00384       if (c == '$') {
00385         fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00386       } else {
00387         // This character is part of a RTSP request or command, which is handled separately:
00388         if (fServerRequestAlternativeByteHandler != NULL) {
00389           (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00390         }
00391       }
00392       break;
00393     }
00394     case AWAITING_STREAM_CHANNEL_ID: {
00395       // The byte that we read is the stream channel id.
00396       if (lookupRTPInterface(c) != NULL) { // sanity check
00397         fStreamChannelId = c;
00398         fTCPReadingState = AWAITING_SIZE1;
00399       } else {
00400         // This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover:
00401         fTCPReadingState = AWAITING_DOLLAR;
00402       }
00403       break;
00404     }
00405     case AWAITING_SIZE1: {
00406       // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
00407       fSizeByte1 = c;
00408       fTCPReadingState = AWAITING_SIZE2;
00409       break;
00410     }
00411     case AWAITING_SIZE2: {
00412       // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
00413       unsigned short size = (fSizeByte1<<8)|c;
00414       
00415       // Record the information about the packet data that will be read next:
00416       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00417       if (rtpInterface != NULL) {
00418         rtpInterface->fNextTCPReadSize = size;
00419         rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00420         rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00421       }
00422       fTCPReadingState = AWAITING_PACKET_DATA;
00423       break;
00424     }
00425     case AWAITING_PACKET_DATA: {
00426       // Call the appropriate read handler to get the packet data from the TCP stream:
00427       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00428       if (rtpInterface != NULL) {
00429         if (rtpInterface->fNextTCPReadSize == 0) {
00430           // We've already read all the data for this packet.
00431           fTCPReadingState = AWAITING_DOLLAR;
00432           break;
00433         }
00434         if (rtpInterface->fReadHandlerProc != NULL) {
00435 #ifdef DEBUG
00436           fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00437 #endif
00438           rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00439         }
00440       }
00441       return;
00442     }
00443   }
00444 }
00445 
00446 
00448 
00449 tcpStreamRecord
00450 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00451                   tcpStreamRecord* next)
00452   : fNext(next),
00453     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00454 }
00455 
00456 tcpStreamRecord::~tcpStreamRecord() {
00457   delete fNext;
00458 }
00459 

Generated on Thu May 17 07:11:47 2012 for live by  doxygen 1.5.2