00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "DelayQueue.hh"
00022 #include "GroupsockHelper.hh"
00023
00024 static const int MILLION = 1000000;
00025
00027
00028 int Timeval::operator>=(const Timeval& arg2) const {
00029 return seconds() > arg2.seconds()
00030 || (seconds() == arg2.seconds()
00031 && useconds() >= arg2.useconds());
00032 }
00033
00034 void Timeval::operator+=(const DelayInterval& arg2) {
00035 secs() += arg2.seconds(); usecs() += arg2.useconds();
00036 if (useconds() >= MILLION) {
00037 usecs() -= MILLION;
00038 ++secs();
00039 }
00040 }
00041
00042 void Timeval::operator-=(const DelayInterval& arg2) {
00043 secs() -= arg2.seconds(); usecs() -= arg2.useconds();
00044 if ((int)useconds() < 0) {
00045 usecs() += MILLION;
00046 --secs();
00047 }
00048 if ((int)seconds() < 0)
00049 secs() = usecs() = 0;
00050
00051 }
00052
00053 DelayInterval operator-(const Timeval& arg1, const Timeval& arg2) {
00054 time_base_seconds secs = arg1.seconds() - arg2.seconds();
00055 time_base_seconds usecs = arg1.useconds() - arg2.useconds();
00056
00057 if ((int)usecs < 0) {
00058 usecs += MILLION;
00059 --secs;
00060 }
00061 if ((int)secs < 0)
00062 return DELAY_ZERO;
00063 else
00064 return DelayInterval(secs, usecs);
00065 }
00066
00067
00069
00070 DelayInterval operator*(short arg1, const DelayInterval& arg2) {
00071 time_base_seconds result_seconds = arg1*arg2.seconds();
00072 time_base_seconds result_useconds = arg1*arg2.useconds();
00073
00074 time_base_seconds carry = result_useconds/MILLION;
00075 result_useconds -= carry*MILLION;
00076 result_seconds += carry;
00077
00078 return DelayInterval(result_seconds, result_useconds);
00079 }
00080
00081 #ifndef INT_MAX
00082 #define INT_MAX 0x7FFFFFFF
00083 #endif
00084 const DelayInterval DELAY_ZERO(0, 0);
00085 const DelayInterval DELAY_SECOND(1, 0);
00086 const DelayInterval ETERNITY(INT_MAX, MILLION-1);
00087
00088
00089
00091
00092 intptr_t DelayQueueEntry::tokenCounter = 0;
00093
00094 DelayQueueEntry::DelayQueueEntry(DelayInterval delay)
00095 : fDeltaTimeRemaining(delay) {
00096 fNext = fPrev = this;
00097 fToken = ++tokenCounter;
00098 }
00099
00100 DelayQueueEntry::~DelayQueueEntry() {
00101 }
00102
00103 void DelayQueueEntry::handleTimeout() {
00104 delete this;
00105 }
00106
00107
00109
00110 DelayQueue::DelayQueue()
00111 : DelayQueueEntry(ETERNITY) {
00112 fLastSyncTime = TimeNow();
00113 }
00114
00115 DelayQueue::~DelayQueue() {
00116 while (fNext != this) {
00117 DelayQueueEntry* entryToRemove = fNext;
00118 removeEntry(entryToRemove);
00119 delete entryToRemove;
00120 }
00121 }
00122
00123 void DelayQueue::addEntry(DelayQueueEntry* newEntry) {
00124 synchronize();
00125
00126 DelayQueueEntry* cur = head();
00127 while (newEntry->fDeltaTimeRemaining >= cur->fDeltaTimeRemaining) {
00128 newEntry->fDeltaTimeRemaining -= cur->fDeltaTimeRemaining;
00129 cur = cur->fNext;
00130 }
00131
00132 cur->fDeltaTimeRemaining -= newEntry->fDeltaTimeRemaining;
00133
00134
00135 newEntry->fNext = cur;
00136 newEntry->fPrev = cur->fPrev;
00137 cur->fPrev = newEntry->fPrev->fNext = newEntry;
00138 }
00139
00140 void DelayQueue::updateEntry(DelayQueueEntry* entry, DelayInterval newDelay) {
00141 if (entry == NULL) return;
00142
00143 removeEntry(entry);
00144 entry->fDeltaTimeRemaining = newDelay;
00145 addEntry(entry);
00146 }
00147
00148 void DelayQueue::updateEntry(intptr_t tokenToFind, DelayInterval newDelay) {
00149 DelayQueueEntry* entry = findEntryByToken(tokenToFind);
00150 updateEntry(entry, newDelay);
00151 }
00152
00153 void DelayQueue::removeEntry(DelayQueueEntry* entry) {
00154 if (entry == NULL || entry->fNext == NULL) return;
00155
00156 entry->fNext->fDeltaTimeRemaining += entry->fDeltaTimeRemaining;
00157 entry->fPrev->fNext = entry->fNext;
00158 entry->fNext->fPrev = entry->fPrev;
00159 entry->fNext = entry->fPrev = NULL;
00160
00161 }
00162
00163 DelayQueueEntry* DelayQueue::removeEntry(intptr_t tokenToFind) {
00164 DelayQueueEntry* entry = findEntryByToken(tokenToFind);
00165 removeEntry(entry);
00166 return entry;
00167 }
00168
00169 DelayInterval const& DelayQueue::timeToNextAlarm() {
00170 if (head()->fDeltaTimeRemaining == DELAY_ZERO) return DELAY_ZERO;
00171
00172 synchronize();
00173 return head()->fDeltaTimeRemaining;
00174 }
00175
00176 void DelayQueue::handleAlarm() {
00177 if (head()->fDeltaTimeRemaining != DELAY_ZERO) synchronize();
00178
00179 if (head()->fDeltaTimeRemaining == DELAY_ZERO) {
00180
00181 DelayQueueEntry* toRemove = head();
00182 removeEntry(toRemove);
00183
00184 toRemove->handleTimeout();
00185 }
00186 }
00187
00188 DelayQueueEntry* DelayQueue::findEntryByToken(intptr_t tokenToFind) {
00189 DelayQueueEntry* cur = head();
00190 while (cur != this) {
00191 if (cur->token() == tokenToFind) return cur;
00192 cur = cur->fNext;
00193 }
00194
00195 return NULL;
00196 }
00197
00198 void DelayQueue::synchronize() {
00199
00200 EventTime timeNow = TimeNow();
00201 if (timeNow < fLastSyncTime) {
00202
00203 fLastSyncTime = timeNow;
00204 return;
00205 }
00206 DelayInterval timeSinceLastSync = timeNow - fLastSyncTime;
00207 fLastSyncTime = timeNow;
00208
00209
00210 DelayQueueEntry* curEntry = head();
00211 while (timeSinceLastSync >= curEntry->fDeltaTimeRemaining) {
00212 timeSinceLastSync -= curEntry->fDeltaTimeRemaining;
00213 curEntry->fDeltaTimeRemaining = DELAY_ZERO;
00214 curEntry = curEntry->fNext;
00215 }
00216 curEntry->fDeltaTimeRemaining -= timeSinceLastSync;
00217 }
00218
00219
00221
00222 EventTime TimeNow() {
00223 struct timeval tvNow;
00224
00225 gettimeofday(&tvNow, NULL);
00226
00227 return EventTime(tvNow.tv_sec, tvNow.tv_usec);
00228 }
00229
00230 const EventTime THE_END_OF_TIME(INT_MAX);