liveMedia/StreamReplicator.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2012 Live Networks, Inc.  All rights reserved.
00018 // An class that can be used to create (possibly multiple) 'replicas' of an incoming stream.
00019 // Implementation.
00020 
00021 #include "StreamReplicator.hh"
00022 
00024 
00025 class StreamReplica: public FramedSource {
00026 protected:
00027   friend class StreamReplicator;
00028   StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()"
00029   virtual ~StreamReplica();
00030 
00031 private: // redefined virtual functions:
00032   virtual void doGetNextFrame();
00033   virtual void doStopGettingFrames();
00034 
00035 private:
00036   static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica);
00037 
00038 private:
00039   StreamReplicator& fOurReplicator;
00040   int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing
00041 
00042   // Replicas that are currently awaiting data are kept in a (singly-linked) list:
00043   StreamReplica* fNext;
00044 };
00045 
00046 
00048 
00049 StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
00050   return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
00051 }
00052 
00053 StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
00054   : Medium(env),
00055     fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False),
00056     fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0),
00057     fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
00058 }
00059 
00060 StreamReplicator::~StreamReplicator() {
00061   Medium::close(fInputSource);
00062 }
00063 
00064 FramedSource* StreamReplicator::createStreamReplica() {
00065   ++fNumReplicas;
00066   return new StreamReplica(*this);
00067 }
00068 
00069 void StreamReplicator::getNextFrame(StreamReplica* replica) {
00070   if (fInputSourceHasClosed) { // handle closure instead
00071     FramedSource::handleClosure(replica);
00072     return;
00073   }
00074 
00075   if (replica->fFrameIndex == -1) {
00076     // This replica had stopped playing (or had just been created), but is now actively reading.  Note this:
00077     replica->fFrameIndex = fFrameIndex;
00078     ++fNumActiveReplicas;
00079   }
00080 
00081   if (fMasterReplica == NULL) {
00082     // This is the first replica to request the next unread frame.  Make it the 'master' replica - meaning that we read the frame
00083     // into its buffer, and then copy from this into the other replicas' buffers.
00084     fMasterReplica = replica;
00085 
00086     // Arrange to read the next frame into this replica's buffer:
00087     if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
00088                                                          afterGettingFrame, this, onSourceClosure, this);
00089   } else if (replica->fFrameIndex != fFrameIndex) {
00090     // This replica is already asking for the next frame (because it has already received the current frame).  Enqueue it:
00091     replica->fNext = fReplicasAwaitingNextFrame;
00092     fReplicasAwaitingNextFrame = replica;
00093   } else {
00094     // This replica is asking for the current frame.  Enqueue it:
00095     replica->fNext = fReplicasAwaitingCurrentFrame;
00096     fReplicasAwaitingCurrentFrame = replica;
00097 
00098     if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) {
00099       // The current frame has already arrived, so deliver it to this replica now:
00100       deliverReceivedFrame();
00101     }
00102   }
00103 }
00104 
00105 void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) {
00106   // Assert: fNumActiveReplicas > 0
00107   if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n"); // should not happen
00108   --fNumActiveReplicas;
00109 
00110   // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame:
00111   if (replicaBeingDeactivated == fMasterReplica) {
00112     // We need to replace the 'master replica', if we can:
00113     if (fReplicasAwaitingCurrentFrame == NULL) {
00114       // There's currently no replacement 'master replica'
00115       fMasterReplica = NULL;
00116     } else {
00117       // There's another replica that we can use as a replacement 'master replica':
00118       fMasterReplica = fReplicasAwaitingCurrentFrame;
00119       fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext;
00120       fMasterReplica->fNext = NULL;
00121     }
00122 
00123     // Check whether the read into the old master replica's buffer is still pending, or has completed:
00124     if (fInputSource != NULL) {
00125       if (fInputSource->isCurrentlyAwaitingData()) {
00126         // We have a pending read into the old master replica's buffer.
00127         // We need to stop it, and retry the read with a new master (if available)
00128         fInputSource->stopGettingFrames();
00129 
00130         if (fMasterReplica != NULL) {
00131           fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
00132                                      afterGettingFrame, this, onSourceClosure, this);
00133         }
00134       } else {
00135         // The read into the old master replica's buffer has already completed.  Copy the data to the new master replica (if any):
00136         if (fMasterReplica != NULL) {
00137           StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated);
00138         } else {
00139           // We don't have a new master replica, so we can't copy the received frame to any new replica that might ask for it.
00140           // Fortunately this should be a very rare occurrence.
00141         }
00142       }
00143     }
00144   } else {
00145     // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues:
00146     if (fReplicasAwaitingCurrentFrame != NULL) {
00147       if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) {
00148         fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext;
00149         replicaBeingDeactivated->fNext = NULL;
00150       }
00151       else {
00152         for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) {
00153           if (r1->fNext == replicaBeingDeactivated) {
00154             r1->fNext = replicaBeingDeactivated->fNext;
00155             replicaBeingDeactivated->fNext = NULL;
00156             break;
00157           }
00158         }
00159       }
00160     }
00161     if (fReplicasAwaitingNextFrame != NULL) {
00162       if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) {
00163         fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext;
00164         replicaBeingDeactivated->fNext = NULL;
00165       }
00166       else {
00167         for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) {
00168           if (r2->fNext == replicaBeingDeactivated) {
00169             r2->fNext = replicaBeingDeactivated->fNext;
00170             replicaBeingDeactivated->fNext = NULL;
00171             break;
00172           }
00173         }
00174       }
00175     }
00176   }
00177 }
00178 
00179 void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
00180   // Assert: fNumReplicas > 0
00181   if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen
00182   --fNumReplicas;
00183 
00184   // If this was the last replica, then delete ourselves (if we were set up to do so):
00185   if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) {
00186     delete this;
00187     return;
00188   }
00189 
00190   // Now handle the replica that's being removed the same way that we would if it were merely being deactivated:
00191   if (replicaBeingRemoved->fFrameIndex != -1) { // i.e., we haven't already done this
00192     deactivateStreamReplica(replicaBeingRemoved);
00193   }
00194 }
00195 
00196 void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
00197                                          struct timeval presentationTime, unsigned durationInMicroseconds) {
00198   ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
00199 }
00200 
00201 void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
00202                                          struct timeval presentationTime, unsigned durationInMicroseconds) {
00203   // The frame was read into our master replica's buffer.  Update the master replica's state, but don't complete delivery to it
00204   // just yet.  We do that later, after we're sure that we've delivered it to all other replicas.
00205   fMasterReplica->fFrameSize = frameSize;
00206   fMasterReplica->fNumTruncatedBytes = numTruncatedBytes;
00207   fMasterReplica->fPresentationTime = presentationTime;
00208   fMasterReplica->fDurationInMicroseconds = durationInMicroseconds;
00209 
00210   deliverReceivedFrame();
00211 }
00212 
00213 void StreamReplicator::onSourceClosure(void* clientData) {
00214   ((StreamReplicator*)clientData)->onSourceClosure();
00215 }
00216 
00217 void StreamReplicator::onSourceClosure() {
00218   fInputSourceHasClosed = True;
00219 
00220   // Signal the closure to each replica that is currently awaiting a frame:
00221   StreamReplica* replica;
00222   while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
00223     fReplicasAwaitingCurrentFrame = replica->fNext;
00224     replica->fNext = NULL;
00225     FramedSource::handleClosure(replica);
00226   }
00227   while ((replica = fReplicasAwaitingNextFrame) != NULL) {
00228     fReplicasAwaitingNextFrame = replica->fNext;
00229     replica->fNext = NULL;
00230     FramedSource::handleClosure(replica);
00231   }
00232   if ((replica = fMasterReplica) != NULL) {
00233     fMasterReplica = NULL;
00234     FramedSource::handleClosure(replica);
00235   }
00236 }
00237 
00238 void StreamReplicator::deliverReceivedFrame() {
00239   // The 'master replica' has received its copy of the current frame.
00240   // Copy it (and complete delivery) to any other replica that has requested this frame.
00241   // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself.
00242   StreamReplica* replica;
00243   while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
00244     fReplicasAwaitingCurrentFrame = replica->fNext;
00245     replica->fNext = NULL;
00246     
00247     // Assert: fMasterReplica != NULL
00248     if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"); // shouldn't happen
00249     StreamReplica::copyReceivedFrame(replica, fMasterReplica);
00250     replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
00251     ++fNumDeliveriesMadeSoFar;
00252 
00253     // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to
00254     if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen
00255 
00256     // Complete delivery to this replica:
00257     FramedSource::afterGetting(replica);
00258   }
00259 
00260   if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) {
00261     // No more requests for this frame are expected, so complete delivery to the 'master replica':
00262     replica = fMasterReplica;
00263     fMasterReplica = NULL;
00264     replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
00265     fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame
00266     fNumDeliveriesMadeSoFar = 0; // reset for the next frame
00267 
00268     if (fReplicasAwaitingNextFrame != NULL) {
00269       // One of the other replicas has already requested the next frame, so make it the next 'master replica':
00270       fMasterReplica = fReplicasAwaitingNextFrame;
00271       fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext;
00272       fMasterReplica->fNext = NULL;
00273 
00274       // Arrange to read the next frame into this replica's buffer:
00275       if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
00276                                                            afterGettingFrame, this, onSourceClosure, this);
00277     }      
00278 
00279     // Move any other replicas that had already requested the next frame to the 'requesting current frame' list:
00280     // Assert: fReplicasAwaitingCurrentFrame == NULL;
00281     if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"); // should not happen
00282     fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame;
00283     fReplicasAwaitingNextFrame = NULL;
00284     
00285     FramedSource::afterGetting(replica);
00286   }
00287 }
00288 
00289 
00291 
00292 StreamReplica::StreamReplica(StreamReplicator& ourReplicator)
00293   : FramedSource(ourReplicator.envir()),
00294     fOurReplicator(ourReplicator), fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) {
00295 }
00296 
00297 StreamReplica::~StreamReplica() {
00298   fOurReplicator.removeStreamReplica(this);
00299 }
00300 
00301 void StreamReplica::doGetNextFrame() {
00302   fOurReplicator.getNextFrame(this);
00303 }
00304 
00305 void StreamReplica::doStopGettingFrames() {
00306   fFrameIndex = -1; // When we start reading again, this will tell the replicator that we were previously inactive.
00307   fOurReplicator.deactivateStreamReplica(this);
00308 }
00309 
00310 void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) {
00311   // First, figure out how much data to copy.  ("toReplica" might have a smaller buffer than "fromReplica".)
00312   unsigned numNewBytesToTruncate
00313     = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0;
00314   toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate;
00315   toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate;
00316 
00317   memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize);
00318   toReplica->fPresentationTime = fromReplica->fPresentationTime;
00319   toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds;
00320 }

Generated on Thu May 17 07:11:47 2012 for live by  doxygen 1.5.2