Sirikata
libcore/include/sirikata/core/queue/FairQueue.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  FairQueue.hpp
00003  *
00004  *  Copyright (c) 2009, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _FAIR_MESSAGE_QUEUE_HPP_
00034 #define _FAIR_MESSAGE_QUEUE_HPP_
00035 
00036 #include "Queue.hpp"
00037 #include <sirikata/core/util/Time.hpp>
00038 
00039 namespace Sirikata {
00040 
00044 template <class Message,class Key,class TQueue> class FairQueue {
00045 private:
00046     typedef TQueue MessageQueue;
00047 
00048     struct QueueInfo {
00049     private:
00050         QueueInfo()
00051          : key(),
00052            messageQueue(NULL),
00053            weight(1.f),
00054            weight_inv(1.f),
00055            nextFinishMessage(NULL),
00056            nextFinishStartTime(Time::null()),
00057            nextFinishTime(Time::null()),
00058            enabled(true)
00059         {
00060         }
00061     public:
00062         QueueInfo(Key _key, TQueue* queue, float w)
00063          : key(_key),
00064            messageQueue(queue),
00065            weight(w),
00066            weight_inv( w == 0.f ? 0.f : (1.f / w) ),
00067            nextFinishMessage(NULL),
00068            nextFinishStartTime(Time::null()),
00069            nextFinishTime(Time::null()),
00070            enabled(true)
00071         {}
00072 
00073         ~QueueInfo() {
00074             delete messageQueue;
00075         }
00076 
00077         Key key;
00078         TQueue* messageQueue;
00079         float weight;
00080         float weight_inv;
00081         Message* nextFinishMessage; // Need to verify this matches when we pop it off
00082         Time nextFinishStartTime; // The time the next message to finish started at, used to recompute if front() changed
00083         Time nextFinishTime;
00084         bool enabled;
00085     };
00086 
00087     typedef std::map<Key, QueueInfo*> QueueInfoByKey; // NOTE: this could be unordered, but must be unique associative container
00088     typedef std::multimap<Time, QueueInfo*> QueueInfoByFinishTime; // NOTE: this must be ordered multiple associative container
00089 
00090     typedef typename QueueInfoByKey::iterator ByKeyIterator;
00091     typedef typename QueueInfoByKey::const_iterator ConstByKeyIterator;
00092 
00093     typedef typename QueueInfoByFinishTime::iterator ByTimeIterator;
00094     typedef typename QueueInfoByFinishTime::const_iterator ConstByTimeIterator;
00095 
00096     typedef std::set<Key> KeySet;
00097     typedef std::set<QueueInfo*> QueueInfoSet;
00098 public:
00099     FairQueue()
00100      :zero_time(Duration::zero()),
00101       min_tx_time(Duration::microseconds(1)),
00102       default_tx_time(Duration::seconds((float)1000)),
00103       mCurrentVirtualTime(Time::null()),
00104       mQueuesByKey(),
00105       mQueuesByTime(),
00106       mFrontQueue(NULL)
00107     {
00108         warn_count = 0;
00109     }
00110 
00111     ~FairQueue() {
00112         while(!mQueuesByKey.empty())
00113             removeQueue( mQueuesByKey.begin()->first );
00114     }
00115 
00116     void addQueue(MessageQueue *mq, Key key, float weight) {
00117         QueueInfo* queue_info = new QueueInfo(key, mq, weight);
00118         mQueuesByKey[key] = queue_info;
00119         computeNextFinishTime(queue_info);
00120         mFrontQueue = NULL; // Force recomputation of front
00121     }
00122 
00123     void setQueueWeight(Key key, float weight) {
00124         ConstByKeyIterator it = mQueuesByKey.find(key);
00125         if (it != mQueuesByKey.end()) {
00126             QueueInfo* qi = it->second;
00127             float old_weight = qi->weight;
00128             qi->weight = weight;
00129             qi->weight_inv = (weight == 0.f ? 0.f : (1.f/weight));
00130             // FIXME should we update the finish time here, or just wait until the next packet?
00131             // Updating here requires either starting from current time or keeping track of
00132             // the last dequeued time
00133             //updateNextFinishTime(qi);
00134 
00135             // Currently, we just special case queues going from zero
00136             // to non-zero weight so they don't get stuck.  Computing
00137             // based on the current virtual time is pretty much always
00138             // better than waiting the default maximum amount of time
00139             // for the current head packet to pass through.
00140             if (old_weight == 0.0) {
00141                 removeFromTimeIndex(qi);
00142                 computeNextFinishTime(qi);
00143             }
00144         }
00145     }
00146 
00147     float getQueueWeight(Key key) const {
00148         ConstByKeyIterator it = mQueuesByKey.find(key);
00149         if (it != mQueuesByKey.end())
00150             return it->second->weight;
00151         return 0.f;
00152     }
00153 
00154     bool removeQueue(Key key) {
00155         // Find the queue
00156         ByKeyIterator it = mQueuesByKey.find(key);
00157         bool havequeue = (it != mQueuesByKey.end());
00158         if (!havequeue) return false;
00159 
00160         QueueInfo* qi = it->second;
00161 
00162         // Remove from the time index
00163         removeFromTimeIndex(qi);
00164 
00165         // If its the front queue, reset it
00166         if (mFrontQueue == qi)
00167             mFrontQueue = NULL;
00168 
00169         // Clean up queue and main entry
00170         mQueuesByKey.erase(it);
00171         delete qi;
00172 
00173         return true;
00174     }
00175 
00176     // NOTE: Enabling and disabling only affects the computation of front() and
00177     // pop() by setting a flag. Otherwise disabled queues are treated just like
00178     // enabled queues, just ignored when looking for the next item.  Both
00179     // operations, under certain conditions, need to reset the previously
00180     // computed front queue because they can affect this computation by adding
00181     // or removing options.
00182     void enableQueue(Key key) {
00183         ByKeyIterator it = mQueuesByKey.find(key);
00184         if (it == mQueuesByKey.end())
00185             return;
00186         QueueInfo* qi = it->second;
00187         qi->enabled = true;
00188         // Enabling a queue *might* affect the choice of the front queue if
00189         //  a. another queue is currently selected as the front
00190         //  b. the enabled queue is non-empty
00191         //  c. the next finish time of the enabled queue is less than that of
00192         //     the currently selected front item
00193         if (mFrontQueue != NULL && // a
00194             !qi->messageQueue->empty() && // b
00195             qi->nextFinishTime < mFrontQueue->nextFinishTime) // c
00196             mFrontQueue = NULL;
00197     }
00198 
00199     void disableQueue(Key key) {
00200         ByKeyIterator it = mQueuesByKey.find(key);
00201         assert(it != mQueuesByKey.end());
00202         QueueInfo* qi = it->second;
00203         qi->enabled = false;
00204 
00205         // Disabling a queue will only affect the choice of front queue if the
00206         // one disabled *was* the front queue.
00207         if (mFrontQueue == qi)
00208             mFrontQueue = NULL;
00209     }
00210 
00211     bool hasQueue(Key key) const{
00212         return ( mQueuesByKey.find(key) != mQueuesByKey.end() );
00213     }
00214 
00215     uint32 numQueues() const {
00216         return (uint32)mQueuesByKey.size();
00217     }
00218 
00219     QueueEnum::PushResult push(Key key, Message *msg) {
00220         ByKeyIterator qi_it = mQueuesByKey.find(key);
00221         assert( qi_it != mQueuesByKey.end() );
00222 
00223         QueueInfo* queue_info = qi_it->second;
00224         bool wasEmpty = queue_info->messageQueue->empty() ||
00225             queue_info->nextFinishMessage == NULL;
00226 
00227         QueueEnum::PushResult pushResult = queue_info->messageQueue->push(msg);
00228 
00229         if (wasEmpty)
00230             computeNextFinishTime(queue_info);
00231 
00232         return pushResult;
00233     }
00234 
00235     // An alternative to push(key,msg) for input queue types that may not allow
00236     // manual pushing of data, e.g. a network queue.  This instead allows the
00237     // user to notify the queue that data was pushed onto the queue.  Note that
00238     // the name is notifyPushFront, not notifyPush: it is only necessary to call
00239     // this when the item at the front of the queue has changed (either by going
00240     // from empty -> one or more elements, or because the "queue" got
00241     // rearranged).  However, it is safe to call this method when any new
00242     // elements are pushed.  In this case of the network example, we only need
00243     // to know when data becomes available when none was left in the network
00244     // buffer, but it is safe to call notifyPushFront on every data received
00245     // callback.
00246     void notifyPushFront(Key key) {
00247         ByKeyIterator qi_it = mQueuesByKey.find(key);
00248         assert( qi_it != mQueuesByKey.end() );
00249 
00250         // We just need to (re)compute the next finish time.
00251         QueueInfo* queue_info = qi_it->second;
00252         removeFromTimeIndex(queue_info);
00253         computeNextFinishTime(queue_info);
00254 
00255         // Reevaluate front queue
00256         mFrontQueue = NULL;
00257     }
00258 
00259     // Returns the next message to deliver
00260     // \returns the next message, or NULL if the queue is empty
00261     Message* front(Key* keyAtFront) {
00262         Message* result = NULL;
00263 
00264         if (mFrontQueue == NULL) {
00265             Time vftime(Time::null());
00266             nextMessage(&result, &vftime, &mFrontQueue);
00267         }
00268         else { // Otherwise, just fill in the information we need from the marked queue
00269             assert(!mFrontQueue->messageQueue->empty());
00270             result = mFrontQueue->nextFinishMessage;
00271             assert(result == mFrontQueue->messageQueue->front());
00272         }
00273 
00274         if (result != NULL) {
00275             *keyAtFront = mFrontQueue->key;
00276             assert(mFrontQueue->enabled);
00277             return result;
00278         }
00279 
00280         return NULL;
00281     }
00282 
00283     // Returns the next message to deliver
00284     // \returns the next message, or NULL if the queue is empty
00285     Message* pop(Key* keyAtFront = NULL) {
00286         Message* result = NULL;
00287         Time vftime(Time::null());
00288 
00289         // If we haven't marked any queue as holding the front item, do so now
00290         if (mFrontQueue == NULL)
00291             nextMessage(&result, &vftime, &mFrontQueue);
00292         else { // Otherwise, just fill in the information we need from the marked queue
00293             assert(!mFrontQueue->messageQueue->empty());
00294             result = mFrontQueue->nextFinishMessage;
00295             assert(result == mFrontQueue->messageQueue->front());
00296             vftime = mFrontQueue->nextFinishTime;
00297         }
00298 
00299         if (result != NULL) {
00300             // Note: we may have skipped a msg using the predicate, so we use max here to make sure
00301             // the virtual time increases monotonically.
00302             mCurrentVirtualTime = std::max(vftime, mCurrentVirtualTime);
00303 
00304             assert(mFrontQueue != NULL);
00305             assert(mFrontQueue->enabled);
00306 
00307             if (keyAtFront != NULL)
00308                 *keyAtFront = mFrontQueue->key;
00309 
00310             Message* popped_val = mFrontQueue->messageQueue->pop();
00311             assert(popped_val == mFrontQueue->nextFinishMessage);
00312             assert(popped_val == result);
00313 
00314             // Remove from queue time list
00315             removeFromTimeIndex(mFrontQueue);
00316             // Update finish time and add back to time index if necessary
00317             computeNextFinishTime(mFrontQueue, vftime);
00318 
00319             // Unmark the queue as being in front
00320             mFrontQueue = NULL;
00321         }
00322 
00323         return result;
00324     }
00325 
00326     bool empty() const {
00327         // Queues won't be in mQueuesByTime unless they have something in them
00328         // This allows us to efficiently answer false if we know we have pending
00329         // items
00330         return mQueuesByTime.empty();
00331     }
00332 
00333     // Returns the total amount of space that can be allocated for the destination
00334     uint32 maxSize(Key key) const {
00335         // FIXME we could go through fewer using the ByTime index
00336         ConstByKeyIterator it = mQueuesByKey.find(key);
00337         if (it == mQueuesByKey.end()) return 0;
00338         return it->second->messageQueue->maxSize();
00339     }
00340 
00341     // Returns the total amount of space currently used for the destination
00342     uint32 size(Key key) const {
00343         ConstByKeyIterator it = mQueuesByKey.find(key);
00344         if (it == mQueuesByKey.end()) return 0;
00345         return it->second->messageQueue->size();
00346     }
00347 
00348     // FIXME we really shouldn't have to expose this
00349     float avg_weight() const {
00350         if (mQueuesByKey.size() == 0) return 1.f;
00351         float w_sum = 0.f;
00352         for(ConstByKeyIterator it = mQueuesByKey.begin(); it != mQueuesByKey.end(); it++)
00353             w_sum += it->second->weight;
00354         return w_sum / mQueuesByKey.size();
00355     }
00356 
00357     // Key iteration support
00358     class const_iterator {
00359       public:
00360         Key operator*() const {
00361             return internal_it->first;
00362         }
00363 
00364         void operator++() {
00365             ++internal_it;
00366         }
00367         void operator++(int) {
00368             internal_it++;
00369         }
00370 
00371         bool operator==(const const_iterator& rhs) const {
00372             return internal_it == rhs.internal_it;
00373         }
00374         bool operator!=(const const_iterator& rhs) const {
00375             return internal_it != rhs.internal_it;
00376         }
00377       private:
00378         friend class FairQueue;
00379 
00380         const_iterator(const typename QueueInfoByKey::const_iterator& it)
00381                 : internal_it(it)
00382         {
00383         }
00384 
00385         const_iterator();
00386 
00387         typename QueueInfoByKey::const_iterator internal_it;
00388     };
00389 
00390     const_iterator keyBegin() const {
00391         return const_iterator(mQueuesByKey.begin());
00392     }
00393     const_iterator keyEnd() const {
00394         return const_iterator(mQueuesByKey.end());
00395     }
00396 
00397 protected:
00398     // Retrieves the next message to deliver, along with its virtual finish time
00399     // for transmission. Returns null if the queue is empty.
00400     void nextMessage(Message** result_out, Time* vftime_out, QueueInfo** min_queue_info_out) {
00401         *result_out = NULL;
00402 
00403         // If there's nothing in the queue, there is no next message
00404         if (mQueuesByTime.empty())
00405             return;
00406 
00407         // Loop through until we find one that has data and can be handled.
00408         bool advance = true; // Indicates whether the loop needs to advance, set to false when an unexpected front has already advanced to the next item in order to remove the current item
00409         for(ByTimeIterator it = mQueuesByTime.begin(); it != mQueuesByTime.end(); advance ? it++ : it) {
00410             QueueInfo* min_queue_info = it->second;
00411 
00412             advance = true;
00413 
00414             // First, check if this queue is even enabled
00415             if (!min_queue_info->enabled)
00416                 continue;
00417 
00418             // These just assert that this queue is just sane.
00419             assert(min_queue_info->nextFinishMessage != NULL);
00420             assert(min_queue_info->nextFinishMessage == min_queue_info->messageQueue->front());
00421 
00422             *min_queue_info_out = min_queue_info;
00423             *vftime_out = min_queue_info->nextFinishTime;
00424             *result_out = min_queue_info->nextFinishMessage;
00425             break;
00426         }
00427     }
00428 
00429     // Finds and removes this queue from the time index (mQueuesByTime).
00430     void removeFromTimeIndex(QueueInfo* qi) {
00431         std::pair<ByTimeIterator, ByTimeIterator> eq_range = mQueuesByTime.equal_range(qi->nextFinishTime);
00432         ByTimeIterator start_q = eq_range.first;
00433         ByTimeIterator end_q = eq_range.second;
00434 
00435         for(ByTimeIterator it = start_q; it != end_q; it++) {
00436             if (it->second == qi) {
00437                 mQueuesByTime.erase(it);
00438                 return;
00439             }
00440         }
00441     }
00442 
00443     // Computes the next finish time for this queue and, if it has one, inserts it into the time index
00444     ByTimeIterator computeNextFinishTime(QueueInfo* qi, const Time& last_finish_time) {
00445         if ( qi->messageQueue->empty() ) {
00446             qi->nextFinishMessage = NULL;
00447             return mQueuesByTime.end();
00448         }
00449 
00450         // If we don't restrict to strict queues, front() may return NULL even though the queue is not empty.
00451         // For example, if the input queue is a FairQueue itself, nothing may be able to send due to the
00452         // canSend predicate.
00453         Message* front_msg = qi->messageQueue->front();
00454         if ( front_msg == NULL ) {
00455             qi->nextFinishMessage = NULL;
00456             return mQueuesByTime.end();
00457         }
00458 
00459         qi->nextFinishMessage = front_msg;
00460         qi->nextFinishTime = finishTime( front_msg->size(), qi, last_finish_time);
00461         qi->nextFinishStartTime = last_finish_time;
00462 
00463         ByTimeIterator new_it = mQueuesByTime.insert( typename QueueInfoByFinishTime::value_type(qi->nextFinishTime, qi) );
00464 
00465         return new_it;
00466     }
00467 
00468     void computeNextFinishTime(QueueInfo* qi) {
00469         computeNextFinishTime(qi, mCurrentVirtualTime);
00470     }
00471 
00474     Time finishTime(uint32 size, QueueInfo* qi, const Time& last_finish_time) const {
00475         if (qi->weight == 0) {
00476             if (!(warn_count++))
00477                 SILOG(fairqueue,fatal,"Encountered 0 weight.");
00478             return last_finish_time + default_tx_time;
00479         }
00480 
00481         Duration transmitTime = Duration::seconds( size * qi->weight_inv );
00482         if (transmitTime == zero_time) {
00483             SILOG(fairqueue,fatal,"Encountered 0 duration transmission");
00484             transmitTime = min_tx_time; // just make sure we take *some* time
00485         }
00486         return last_finish_time + transmitTime;
00487     }
00488 
00489 protected:
00490     const Duration zero_time;
00491     const Duration min_tx_time;
00492     const Duration default_tx_time;
00493     mutable uint32 warn_count;
00494 
00495     uint32 mRate;
00496     Time mCurrentVirtualTime;
00497     // FIXME if I could get the templates to work, using multi_index_container instead of 2 containers would be preferable
00498     QueueInfoByKey mQueuesByKey;
00499     QueueInfoByFinishTime mQueuesByTime;
00500     QueueInfo* mFrontQueue; // Queue holding the front item
00501 }; // class FairQueue
00502 
00503 } // namespace Sirikata
00504 
00505 #endif //_FAIR_MESSAGE_QUEUE_HPP_