Sirikata
|
00001 /* Sirikata 00002 * FairQueue.hpp 00003 * 00004 * Copyright (c) 2009, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _FAIR_MESSAGE_QUEUE_HPP_ 00034 #define _FAIR_MESSAGE_QUEUE_HPP_ 00035 00036 #include "Queue.hpp" 00037 #include <sirikata/core/util/Time.hpp> 00038 00039 namespace Sirikata { 00040 00044 template <class Message,class Key,class TQueue> class FairQueue { 00045 private: 00046 typedef TQueue MessageQueue; 00047 00048 struct QueueInfo { 00049 private: 00050 QueueInfo() 00051 : key(), 00052 messageQueue(NULL), 00053 weight(1.f), 00054 weight_inv(1.f), 00055 nextFinishMessage(NULL), 00056 nextFinishStartTime(Time::null()), 00057 nextFinishTime(Time::null()), 00058 enabled(true) 00059 { 00060 } 00061 public: 00062 QueueInfo(Key _key, TQueue* queue, float w) 00063 : key(_key), 00064 messageQueue(queue), 00065 weight(w), 00066 weight_inv( w == 0.f ? 0.f : (1.f / w) ), 00067 nextFinishMessage(NULL), 00068 nextFinishStartTime(Time::null()), 00069 nextFinishTime(Time::null()), 00070 enabled(true) 00071 {} 00072 00073 ~QueueInfo() { 00074 delete messageQueue; 00075 } 00076 00077 Key key; 00078 TQueue* messageQueue; 00079 float weight; 00080 float weight_inv; 00081 Message* nextFinishMessage; // Need to verify this matches when we pop it off 00082 Time nextFinishStartTime; // The time the next message to finish started at, used to recompute if front() changed 00083 Time nextFinishTime; 00084 bool enabled; 00085 }; 00086 00087 typedef std::map<Key, QueueInfo*> QueueInfoByKey; // NOTE: this could be unordered, but must be unique associative container 00088 typedef std::multimap<Time, QueueInfo*> QueueInfoByFinishTime; // NOTE: this must be ordered multiple associative container 00089 00090 typedef typename QueueInfoByKey::iterator ByKeyIterator; 00091 typedef typename QueueInfoByKey::const_iterator ConstByKeyIterator; 00092 00093 typedef typename QueueInfoByFinishTime::iterator ByTimeIterator; 00094 typedef typename QueueInfoByFinishTime::const_iterator ConstByTimeIterator; 00095 00096 typedef std::set<Key> KeySet; 00097 typedef std::set<QueueInfo*> QueueInfoSet; 00098 public: 00099 FairQueue() 00100 :zero_time(Duration::zero()), 00101 min_tx_time(Duration::microseconds(1)), 00102 default_tx_time(Duration::seconds((float)1000)), 00103 mCurrentVirtualTime(Time::null()), 00104 mQueuesByKey(), 00105 mQueuesByTime(), 00106 mFrontQueue(NULL) 00107 { 00108 warn_count = 0; 00109 } 00110 00111 ~FairQueue() { 00112 while(!mQueuesByKey.empty()) 00113 removeQueue( mQueuesByKey.begin()->first ); 00114 } 00115 00116 void addQueue(MessageQueue *mq, Key key, float weight) { 00117 QueueInfo* queue_info = new QueueInfo(key, mq, weight); 00118 mQueuesByKey[key] = queue_info; 00119 computeNextFinishTime(queue_info); 00120 mFrontQueue = NULL; // Force recomputation of front 00121 } 00122 00123 void setQueueWeight(Key key, float weight) { 00124 ConstByKeyIterator it = mQueuesByKey.find(key); 00125 if (it != mQueuesByKey.end()) { 00126 QueueInfo* qi = it->second; 00127 float old_weight = qi->weight; 00128 qi->weight = weight; 00129 qi->weight_inv = (weight == 0.f ? 0.f : (1.f/weight)); 00130 // FIXME should we update the finish time here, or just wait until the next packet? 00131 // Updating here requires either starting from current time or keeping track of 00132 // the last dequeued time 00133 //updateNextFinishTime(qi); 00134 00135 // Currently, we just special case queues going from zero 00136 // to non-zero weight so they don't get stuck. Computing 00137 // based on the current virtual time is pretty much always 00138 // better than waiting the default maximum amount of time 00139 // for the current head packet to pass through. 00140 if (old_weight == 0.0) { 00141 removeFromTimeIndex(qi); 00142 computeNextFinishTime(qi); 00143 } 00144 } 00145 } 00146 00147 float getQueueWeight(Key key) const { 00148 ConstByKeyIterator it = mQueuesByKey.find(key); 00149 if (it != mQueuesByKey.end()) 00150 return it->second->weight; 00151 return 0.f; 00152 } 00153 00154 bool removeQueue(Key key) { 00155 // Find the queue 00156 ByKeyIterator it = mQueuesByKey.find(key); 00157 bool havequeue = (it != mQueuesByKey.end()); 00158 if (!havequeue) return false; 00159 00160 QueueInfo* qi = it->second; 00161 00162 // Remove from the time index 00163 removeFromTimeIndex(qi); 00164 00165 // If its the front queue, reset it 00166 if (mFrontQueue == qi) 00167 mFrontQueue = NULL; 00168 00169 // Clean up queue and main entry 00170 mQueuesByKey.erase(it); 00171 delete qi; 00172 00173 return true; 00174 } 00175 00176 // NOTE: Enabling and disabling only affects the computation of front() and 00177 // pop() by setting a flag. Otherwise disabled queues are treated just like 00178 // enabled queues, just ignored when looking for the next item. Both 00179 // operations, under certain conditions, need to reset the previously 00180 // computed front queue because they can affect this computation by adding 00181 // or removing options. 00182 void enableQueue(Key key) { 00183 ByKeyIterator it = mQueuesByKey.find(key); 00184 if (it == mQueuesByKey.end()) 00185 return; 00186 QueueInfo* qi = it->second; 00187 qi->enabled = true; 00188 // Enabling a queue *might* affect the choice of the front queue if 00189 // a. another queue is currently selected as the front 00190 // b. the enabled queue is non-empty 00191 // c. the next finish time of the enabled queue is less than that of 00192 // the currently selected front item 00193 if (mFrontQueue != NULL && // a 00194 !qi->messageQueue->empty() && // b 00195 qi->nextFinishTime < mFrontQueue->nextFinishTime) // c 00196 mFrontQueue = NULL; 00197 } 00198 00199 void disableQueue(Key key) { 00200 ByKeyIterator it = mQueuesByKey.find(key); 00201 assert(it != mQueuesByKey.end()); 00202 QueueInfo* qi = it->second; 00203 qi->enabled = false; 00204 00205 // Disabling a queue will only affect the choice of front queue if the 00206 // one disabled *was* the front queue. 00207 if (mFrontQueue == qi) 00208 mFrontQueue = NULL; 00209 } 00210 00211 bool hasQueue(Key key) const{ 00212 return ( mQueuesByKey.find(key) != mQueuesByKey.end() ); 00213 } 00214 00215 uint32 numQueues() const { 00216 return (uint32)mQueuesByKey.size(); 00217 } 00218 00219 QueueEnum::PushResult push(Key key, Message *msg) { 00220 ByKeyIterator qi_it = mQueuesByKey.find(key); 00221 assert( qi_it != mQueuesByKey.end() ); 00222 00223 QueueInfo* queue_info = qi_it->second; 00224 bool wasEmpty = queue_info->messageQueue->empty() || 00225 queue_info->nextFinishMessage == NULL; 00226 00227 QueueEnum::PushResult pushResult = queue_info->messageQueue->push(msg); 00228 00229 if (wasEmpty) 00230 computeNextFinishTime(queue_info); 00231 00232 return pushResult; 00233 } 00234 00235 // An alternative to push(key,msg) for input queue types that may not allow 00236 // manual pushing of data, e.g. a network queue. This instead allows the 00237 // user to notify the queue that data was pushed onto the queue. Note that 00238 // the name is notifyPushFront, not notifyPush: it is only necessary to call 00239 // this when the item at the front of the queue has changed (either by going 00240 // from empty -> one or more elements, or because the "queue" got 00241 // rearranged). However, it is safe to call this method when any new 00242 // elements are pushed. In this case of the network example, we only need 00243 // to know when data becomes available when none was left in the network 00244 // buffer, but it is safe to call notifyPushFront on every data received 00245 // callback. 00246 void notifyPushFront(Key key) { 00247 ByKeyIterator qi_it = mQueuesByKey.find(key); 00248 assert( qi_it != mQueuesByKey.end() ); 00249 00250 // We just need to (re)compute the next finish time. 00251 QueueInfo* queue_info = qi_it->second; 00252 removeFromTimeIndex(queue_info); 00253 computeNextFinishTime(queue_info); 00254 00255 // Reevaluate front queue 00256 mFrontQueue = NULL; 00257 } 00258 00259 // Returns the next message to deliver 00260 // \returns the next message, or NULL if the queue is empty 00261 Message* front(Key* keyAtFront) { 00262 Message* result = NULL; 00263 00264 if (mFrontQueue == NULL) { 00265 Time vftime(Time::null()); 00266 nextMessage(&result, &vftime, &mFrontQueue); 00267 } 00268 else { // Otherwise, just fill in the information we need from the marked queue 00269 assert(!mFrontQueue->messageQueue->empty()); 00270 result = mFrontQueue->nextFinishMessage; 00271 assert(result == mFrontQueue->messageQueue->front()); 00272 } 00273 00274 if (result != NULL) { 00275 *keyAtFront = mFrontQueue->key; 00276 assert(mFrontQueue->enabled); 00277 return result; 00278 } 00279 00280 return NULL; 00281 } 00282 00283 // Returns the next message to deliver 00284 // \returns the next message, or NULL if the queue is empty 00285 Message* pop(Key* keyAtFront = NULL) { 00286 Message* result = NULL; 00287 Time vftime(Time::null()); 00288 00289 // If we haven't marked any queue as holding the front item, do so now 00290 if (mFrontQueue == NULL) 00291 nextMessage(&result, &vftime, &mFrontQueue); 00292 else { // Otherwise, just fill in the information we need from the marked queue 00293 assert(!mFrontQueue->messageQueue->empty()); 00294 result = mFrontQueue->nextFinishMessage; 00295 assert(result == mFrontQueue->messageQueue->front()); 00296 vftime = mFrontQueue->nextFinishTime; 00297 } 00298 00299 if (result != NULL) { 00300 // Note: we may have skipped a msg using the predicate, so we use max here to make sure 00301 // the virtual time increases monotonically. 00302 mCurrentVirtualTime = std::max(vftime, mCurrentVirtualTime); 00303 00304 assert(mFrontQueue != NULL); 00305 assert(mFrontQueue->enabled); 00306 00307 if (keyAtFront != NULL) 00308 *keyAtFront = mFrontQueue->key; 00309 00310 Message* popped_val = mFrontQueue->messageQueue->pop(); 00311 assert(popped_val == mFrontQueue->nextFinishMessage); 00312 assert(popped_val == result); 00313 00314 // Remove from queue time list 00315 removeFromTimeIndex(mFrontQueue); 00316 // Update finish time and add back to time index if necessary 00317 computeNextFinishTime(mFrontQueue, vftime); 00318 00319 // Unmark the queue as being in front 00320 mFrontQueue = NULL; 00321 } 00322 00323 return result; 00324 } 00325 00326 bool empty() const { 00327 // Queues won't be in mQueuesByTime unless they have something in them 00328 // This allows us to efficiently answer false if we know we have pending 00329 // items 00330 return mQueuesByTime.empty(); 00331 } 00332 00333 // Returns the total amount of space that can be allocated for the destination 00334 uint32 maxSize(Key key) const { 00335 // FIXME we could go through fewer using the ByTime index 00336 ConstByKeyIterator it = mQueuesByKey.find(key); 00337 if (it == mQueuesByKey.end()) return 0; 00338 return it->second->messageQueue->maxSize(); 00339 } 00340 00341 // Returns the total amount of space currently used for the destination 00342 uint32 size(Key key) const { 00343 ConstByKeyIterator it = mQueuesByKey.find(key); 00344 if (it == mQueuesByKey.end()) return 0; 00345 return it->second->messageQueue->size(); 00346 } 00347 00348 // FIXME we really shouldn't have to expose this 00349 float avg_weight() const { 00350 if (mQueuesByKey.size() == 0) return 1.f; 00351 float w_sum = 0.f; 00352 for(ConstByKeyIterator it = mQueuesByKey.begin(); it != mQueuesByKey.end(); it++) 00353 w_sum += it->second->weight; 00354 return w_sum / mQueuesByKey.size(); 00355 } 00356 00357 // Key iteration support 00358 class const_iterator { 00359 public: 00360 Key operator*() const { 00361 return internal_it->first; 00362 } 00363 00364 void operator++() { 00365 ++internal_it; 00366 } 00367 void operator++(int) { 00368 internal_it++; 00369 } 00370 00371 bool operator==(const const_iterator& rhs) const { 00372 return internal_it == rhs.internal_it; 00373 } 00374 bool operator!=(const const_iterator& rhs) const { 00375 return internal_it != rhs.internal_it; 00376 } 00377 private: 00378 friend class FairQueue; 00379 00380 const_iterator(const typename QueueInfoByKey::const_iterator& it) 00381 : internal_it(it) 00382 { 00383 } 00384 00385 const_iterator(); 00386 00387 typename QueueInfoByKey::const_iterator internal_it; 00388 }; 00389 00390 const_iterator keyBegin() const { 00391 return const_iterator(mQueuesByKey.begin()); 00392 } 00393 const_iterator keyEnd() const { 00394 return const_iterator(mQueuesByKey.end()); 00395 } 00396 00397 protected: 00398 // Retrieves the next message to deliver, along with its virtual finish time 00399 // for transmission. Returns null if the queue is empty. 00400 void nextMessage(Message** result_out, Time* vftime_out, QueueInfo** min_queue_info_out) { 00401 *result_out = NULL; 00402 00403 // If there's nothing in the queue, there is no next message 00404 if (mQueuesByTime.empty()) 00405 return; 00406 00407 // Loop through until we find one that has data and can be handled. 00408 bool advance = true; // Indicates whether the loop needs to advance, set to false when an unexpected front has already advanced to the next item in order to remove the current item 00409 for(ByTimeIterator it = mQueuesByTime.begin(); it != mQueuesByTime.end(); advance ? it++ : it) { 00410 QueueInfo* min_queue_info = it->second; 00411 00412 advance = true; 00413 00414 // First, check if this queue is even enabled 00415 if (!min_queue_info->enabled) 00416 continue; 00417 00418 // These just assert that this queue is just sane. 00419 assert(min_queue_info->nextFinishMessage != NULL); 00420 assert(min_queue_info->nextFinishMessage == min_queue_info->messageQueue->front()); 00421 00422 *min_queue_info_out = min_queue_info; 00423 *vftime_out = min_queue_info->nextFinishTime; 00424 *result_out = min_queue_info->nextFinishMessage; 00425 break; 00426 } 00427 } 00428 00429 // Finds and removes this queue from the time index (mQueuesByTime). 00430 void removeFromTimeIndex(QueueInfo* qi) { 00431 std::pair<ByTimeIterator, ByTimeIterator> eq_range = mQueuesByTime.equal_range(qi->nextFinishTime); 00432 ByTimeIterator start_q = eq_range.first; 00433 ByTimeIterator end_q = eq_range.second; 00434 00435 for(ByTimeIterator it = start_q; it != end_q; it++) { 00436 if (it->second == qi) { 00437 mQueuesByTime.erase(it); 00438 return; 00439 } 00440 } 00441 } 00442 00443 // Computes the next finish time for this queue and, if it has one, inserts it into the time index 00444 ByTimeIterator computeNextFinishTime(QueueInfo* qi, const Time& last_finish_time) { 00445 if ( qi->messageQueue->empty() ) { 00446 qi->nextFinishMessage = NULL; 00447 return mQueuesByTime.end(); 00448 } 00449 00450 // If we don't restrict to strict queues, front() may return NULL even though the queue is not empty. 00451 // For example, if the input queue is a FairQueue itself, nothing may be able to send due to the 00452 // canSend predicate. 00453 Message* front_msg = qi->messageQueue->front(); 00454 if ( front_msg == NULL ) { 00455 qi->nextFinishMessage = NULL; 00456 return mQueuesByTime.end(); 00457 } 00458 00459 qi->nextFinishMessage = front_msg; 00460 qi->nextFinishTime = finishTime( front_msg->size(), qi, last_finish_time); 00461 qi->nextFinishStartTime = last_finish_time; 00462 00463 ByTimeIterator new_it = mQueuesByTime.insert( typename QueueInfoByFinishTime::value_type(qi->nextFinishTime, qi) ); 00464 00465 return new_it; 00466 } 00467 00468 void computeNextFinishTime(QueueInfo* qi) { 00469 computeNextFinishTime(qi, mCurrentVirtualTime); 00470 } 00471 00474 Time finishTime(uint32 size, QueueInfo* qi, const Time& last_finish_time) const { 00475 if (qi->weight == 0) { 00476 if (!(warn_count++)) 00477 SILOG(fairqueue,fatal,"Encountered 0 weight."); 00478 return last_finish_time + default_tx_time; 00479 } 00480 00481 Duration transmitTime = Duration::seconds( size * qi->weight_inv ); 00482 if (transmitTime == zero_time) { 00483 SILOG(fairqueue,fatal,"Encountered 0 duration transmission"); 00484 transmitTime = min_tx_time; // just make sure we take *some* time 00485 } 00486 return last_finish_time + transmitTime; 00487 } 00488 00489 protected: 00490 const Duration zero_time; 00491 const Duration min_tx_time; 00492 const Duration default_tx_time; 00493 mutable uint32 warn_count; 00494 00495 uint32 mRate; 00496 Time mCurrentVirtualTime; 00497 // FIXME if I could get the templates to work, using multi_index_container instead of 2 containers would be preferable 00498 QueueInfoByKey mQueuesByKey; 00499 QueueInfoByFinishTime mQueuesByTime; 00500 QueueInfo* mFrontQueue; // Queue holding the front item 00501 }; // class FairQueue 00502 00503 } // namespace Sirikata 00504 00505 #endif //_FAIR_MESSAGE_QUEUE_HPP_