00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "StreamReplicator.hh"
00022
00024
00025 class StreamReplica: public FramedSource {
00026 protected:
00027 friend class StreamReplicator;
00028 StreamReplica(StreamReplicator& ourReplicator);
00029 virtual ~StreamReplica();
00030
00031 private:
00032 virtual void doGetNextFrame();
00033 virtual void doStopGettingFrames();
00034
00035 private:
00036 static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica);
00037
00038 private:
00039 StreamReplicator& fOurReplicator;
00040 int fFrameIndex;
00041
00042
00043 StreamReplica* fNext;
00044 };
00045
00046
00048
00049 StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
00050 return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
00051 }
00052
00053 StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
00054 : Medium(env),
00055 fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False),
00056 fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0),
00057 fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
00058 }
00059
00060 StreamReplicator::~StreamReplicator() {
00061 Medium::close(fInputSource);
00062 }
00063
00064 FramedSource* StreamReplicator::createStreamReplica() {
00065 ++fNumReplicas;
00066 return new StreamReplica(*this);
00067 }
00068
00069 void StreamReplicator::getNextFrame(StreamReplica* replica) {
00070 if (fInputSourceHasClosed) {
00071 FramedSource::handleClosure(replica);
00072 return;
00073 }
00074
00075 if (replica->fFrameIndex == -1) {
00076
00077 replica->fFrameIndex = fFrameIndex;
00078 ++fNumActiveReplicas;
00079 }
00080
00081 if (fMasterReplica == NULL) {
00082
00083
00084 fMasterReplica = replica;
00085
00086
00087 if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
00088 afterGettingFrame, this, onSourceClosure, this);
00089 } else if (replica->fFrameIndex != fFrameIndex) {
00090
00091 replica->fNext = fReplicasAwaitingNextFrame;
00092 fReplicasAwaitingNextFrame = replica;
00093 } else {
00094
00095 replica->fNext = fReplicasAwaitingCurrentFrame;
00096 fReplicasAwaitingCurrentFrame = replica;
00097
00098 if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) {
00099
00100 deliverReceivedFrame();
00101 }
00102 }
00103 }
00104
00105 void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) {
00106
00107 if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n");
00108 --fNumActiveReplicas;
00109
00110
00111 if (replicaBeingDeactivated == fMasterReplica) {
00112
00113 if (fReplicasAwaitingCurrentFrame == NULL) {
00114
00115 fMasterReplica = NULL;
00116 } else {
00117
00118 fMasterReplica = fReplicasAwaitingCurrentFrame;
00119 fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext;
00120 fMasterReplica->fNext = NULL;
00121 }
00122
00123
00124 if (fInputSource != NULL) {
00125 if (fInputSource->isCurrentlyAwaitingData()) {
00126
00127
00128 fInputSource->stopGettingFrames();
00129
00130 if (fMasterReplica != NULL) {
00131 fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
00132 afterGettingFrame, this, onSourceClosure, this);
00133 }
00134 } else {
00135
00136 if (fMasterReplica != NULL) {
00137 StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated);
00138 } else {
00139
00140
00141 }
00142 }
00143 }
00144 } else {
00145
00146 if (fReplicasAwaitingCurrentFrame != NULL) {
00147 if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) {
00148 fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext;
00149 replicaBeingDeactivated->fNext = NULL;
00150 }
00151 else {
00152 for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) {
00153 if (r1->fNext == replicaBeingDeactivated) {
00154 r1->fNext = replicaBeingDeactivated->fNext;
00155 replicaBeingDeactivated->fNext = NULL;
00156 break;
00157 }
00158 }
00159 }
00160 }
00161 if (fReplicasAwaitingNextFrame != NULL) {
00162 if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) {
00163 fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext;
00164 replicaBeingDeactivated->fNext = NULL;
00165 }
00166 else {
00167 for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) {
00168 if (r2->fNext == replicaBeingDeactivated) {
00169 r2->fNext = replicaBeingDeactivated->fNext;
00170 replicaBeingDeactivated->fNext = NULL;
00171 break;
00172 }
00173 }
00174 }
00175 }
00176 }
00177 }
00178
00179 void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
00180
00181 if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n");
00182 --fNumReplicas;
00183
00184
00185 if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) {
00186 delete this;
00187 return;
00188 }
00189
00190
00191 if (replicaBeingRemoved->fFrameIndex != -1) {
00192 deactivateStreamReplica(replicaBeingRemoved);
00193 }
00194 }
00195
00196 void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
00197 struct timeval presentationTime, unsigned durationInMicroseconds) {
00198 ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
00199 }
00200
00201 void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
00202 struct timeval presentationTime, unsigned durationInMicroseconds) {
00203
00204
00205 fMasterReplica->fFrameSize = frameSize;
00206 fMasterReplica->fNumTruncatedBytes = numTruncatedBytes;
00207 fMasterReplica->fPresentationTime = presentationTime;
00208 fMasterReplica->fDurationInMicroseconds = durationInMicroseconds;
00209
00210 deliverReceivedFrame();
00211 }
00212
00213 void StreamReplicator::onSourceClosure(void* clientData) {
00214 ((StreamReplicator*)clientData)->onSourceClosure();
00215 }
00216
00217 void StreamReplicator::onSourceClosure() {
00218 fInputSourceHasClosed = True;
00219
00220
00221 StreamReplica* replica;
00222 while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
00223 fReplicasAwaitingCurrentFrame = replica->fNext;
00224 replica->fNext = NULL;
00225 FramedSource::handleClosure(replica);
00226 }
00227 while ((replica = fReplicasAwaitingNextFrame) != NULL) {
00228 fReplicasAwaitingNextFrame = replica->fNext;
00229 replica->fNext = NULL;
00230 FramedSource::handleClosure(replica);
00231 }
00232 if ((replica = fMasterReplica) != NULL) {
00233 fMasterReplica = NULL;
00234 FramedSource::handleClosure(replica);
00235 }
00236 }
00237
// Delivers the frame held by the "master replica" to every replica queued on
// "fReplicasAwaitingCurrentFrame".  Then, once every active replica other than the master
// has been served, completes delivery to the master itself, flips the frame index, and
// (if some replica is already waiting for the next frame) starts reading the next frame.
// NOTE(review): "FramedSource::afterGetting()" presumably runs the replica's downstream
// callback, which could call back into this object - hence the exact statement order
// below (unlink first, update state, notify last).  Confirm before reordering anything.
void StreamReplicator::deliverReceivedFrame() {
  StreamReplica* replica;
  while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
    // Unlink the replica from the head of the list before doing anything else with it:
    fReplicasAwaitingCurrentFrame = replica->fNext;
    replica->fNext = NULL;

    // Sanity check: there must be a master replica to copy from.
    // NOTE(review): this only logs; the copy below still dereferences "fMasterReplica".
    if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n");
    StreamReplica::copyReceivedFrame(replica, fMasterReplica);
    replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle 0 <-> 1: this replica now wants the next frame
    ++fNumDeliveriesMadeSoFar;

    // Sanity check: deliveries made so far (which exclude the master) must remain fewer than the number of active replicas:
    if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas);

    // Complete delivery to this replica:
    FramedSource::afterGetting(replica);
  }

  // Have all active replicas, other than the master, now received this frame?
  if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) {
    // Yes.  Finish with the current frame: retire the master, and flip to the next frame:
    replica = fMasterReplica;
    fMasterReplica = NULL;
    replica->fFrameIndex = 1 - replica->fFrameIndex;
    fFrameIndex = 1 - fFrameIndex;
    fNumDeliveriesMadeSoFar = 0;

    if (fReplicasAwaitingNextFrame != NULL) {
      // A replica has already asked for the next frame.  Make the first one the new master:
      fMasterReplica = fReplicasAwaitingNextFrame;
      fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext;
      fMasterReplica->fNext = NULL;

      // Start reading the next frame into the new master's buffer:
      if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
                                                           afterGettingFrame, this, onSourceClosure, this);
    }

    // What was the "next frame" is now the "current frame", so the remaining replicas that
    // were awaiting the next frame become those awaiting the current one.
    // Sanity check: the "current frame" list must be empty at this point.
    if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n");
    fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame;
    fReplicasAwaitingNextFrame = NULL;

    // Finally, complete delivery to the old master (done last, after all state has been updated):
    FramedSource::afterGetting(replica);
  }
}
00288
00289
00291
00292 StreamReplica::StreamReplica(StreamReplicator& ourReplicator)
00293 : FramedSource(ourReplicator.envir()),
00294 fOurReplicator(ourReplicator), fFrameIndex(-1), fNext(NULL) {
00295 }
00296
00297 StreamReplica::~StreamReplica() {
00298 fOurReplicator.removeStreamReplica(this);
00299 }
00300
00301 void StreamReplica::doGetNextFrame() {
00302 fOurReplicator.getNextFrame(this);
00303 }
00304
00305 void StreamReplica::doStopGettingFrames() {
00306 fFrameIndex = -1;
00307 fOurReplicator.deactivateStreamReplica(this);
00308 }
00309
00310 void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) {
00311
00312 unsigned numNewBytesToTruncate
00313 = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0;
00314 toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate;
00315 toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate;
00316
00317 memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize);
00318 toReplica->fPresentationTime = fromReplica->fPresentationTime;
00319 toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds;
00320 }