Sirikata
space/src/CSFQODPFlowScheduler.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  CSFQODPFlowScheduler.hpp
00003  *
00004  *  Copyright (c) 2010, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _CSFQ_ODP_FLOW_SCHEDULER_HPP_
00034 #define _CSFQ_ODP_FLOW_SCHEDULER_HPP_
00035 
00036 #include "ODPFlowScheduler.hpp"
00037 #include <sirikata/core/queue/Queue.hpp>
00038 #include "RateEstimator.hpp"
00039 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00040 
00041 //#define CSFQODP_DEBUG
00042 
00043 namespace Sirikata {
00044 
00045 class LocationService;
00046 
00050 class CSFQODPFlowScheduler : public ODPFlowScheduler {
00051 public:
00052     CSFQODPFlowScheduler(SpaceContext* ctx, ForwarderServiceQueue* parent, ServerID sid, uint32 serv_id, uint32 max_size, LocationService* loc);
00053     virtual ~CSFQODPFlowScheduler();
00054 
00055     // Interface: AbstractQueue<Message*>
00056     virtual const Type& front() const;
00057     virtual Type& front();
00058     virtual Type pop();
00059     virtual bool empty() const;
00060     virtual uint32 size() const { return mQueue.getResourceMonitor().filledSize(); }
00061 
00062     // ODP push interface
00063     virtual bool push(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry&, const OSegEntry&);
00064     // Get the sum of the weights of active queues.
00065     virtual float totalActiveWeight();
00066     // Get the total used weight of active queues.  If all flows are saturating,
00067     // this should equal totalActiveWeights, otherwise it will be smaller.
00068     virtual float totalSenderUsedWeight();
00069     // Get the total used weight of active queues.  If all flows are saturating,
00070     // this should equal totalActiveWeights, otherwise it will be smaller.
00071     virtual float totalReceiverUsedWeight();
00072 private:
00073 
00074     enum {
00075         SENDER = 0,
00076         RECEIVER = 1,
00077         NUM_DOWNSTREAM = 2
00078     };
00079 
00080     struct ObjectPair {
00081         ObjectPair(const UUID& s, const UUID& d)
00082          : source(s), dest(d)
00083         {}
00084 
00085         bool operator<(const ObjectPair& rhs) const {
00086             return (source < rhs.source || (source == rhs.source && dest < rhs.dest));
00087         }
00088 
00089         bool operator==(const ObjectPair& rhs) const {
00090             return (source == rhs.source && dest == rhs.dest);
00091         }
00092 
00093         class Hasher {
00094         public:
00095             size_t operator() (const ObjectPair& op) const {
00096                 return *(uint32*)op.source.getArray().data() ^ *(uint32*)op.dest.getArray().data();
00097             }
00098         };
00099 
00100         UUID source;
00101         UUID dest;
00102     };
00103 
00104     struct FlowInfo {
00105         FlowInfo(double w, const Time& start)
00106          : rate(0.0, start),
00107            weight(w)
00108 #ifdef CSFQODP_DEBUG
00109            ,
00110            arrived(0),
00111            accepted(0)
00112 #endif
00113         {
00114             for(int i = 0; i < NUM_DOWNSTREAM; i++)
00115                 usedWeight[i] = w;
00116         }
00117 
00118         RateEstimator rate;
00119         double weight;
00120         double usedWeight[NUM_DOWNSTREAM];
00121 #ifdef CSFQODP_DEBUG
00122         uint64 arrived;
00123         uint64 accepted;
00124 #endif
00125     };
00126 
00127     struct QueuedMessage {
00128         QueuedMessage()
00129          : msg(NULL),
00130            _size(0)
00131         {}
00132 
00133         QueuedMessage(Message* m, int32 s)
00134          : msg(m),
00135            _size(s)
00136         {}
00137 
00138         int32 size() const { return _size; }
00139 
00140         Message* msg;
00141         int32 _size;
00142     };
00143 
00144     FlowInfo* getFlow(const ObjectPair& new_packet_pair, const OSegEntry&src_info, const OSegEntry&dst_info, const Time& t);
00145     void removeFlow(const ObjectPair& packet_pair);
00146     int flowCount() const;
00147     float normalizedFlowWeight(float unnorm_weight);
00148 
00149     void estimateAlpha(int32 packet_size, Time& arrival_time, double label, bool dropped);
00150     bool queueExceedsLowWaterMark() const { return true; } // Not necessary in our implementation
00151     double minCongestedAlpha() const { return mCapacityRate.get() / std::max(1, flowCount()); }
00152 
00153     // Helper to get the region we compute weight over
00154     BoundingBox3f getObjectWeightRegion(const UUID& objid, const OSegEntry& sid) const;
00155 
00156 
00157     boost::mutex mPushMutex;
00158 
00159     // Note: unfortunately we need to mark these as mutable because a)
00160     // SizedThreadSafeQueue doesn't have methods marked properly as const and b)
00161     // ThreadSafeQueue doesn't provide a front() method.
00162     mutable QueuedMessage mQueueBuffer;
00163     mutable Sirikata::SizedThreadSafeQueue<QueuedMessage> mQueue;
00164     mutable Sirikata::AtomicValue<bool> mNeedsNotification;
00165     // Used to collect information for weight computation
00166     LocationService* mLoc;
00167 
00168     // CSFQ Summary Information
00169     SimpleRateEstimator mArrivalRate;
00170     double mSumEstimatedArrivalRates;
00171     SimpleRateEstimator mAcceptedRate;
00172     SimpleRateEstimator mCapacityRate; // Actual capacity observed (no dummy packets)
00173 
00174     // CSFQ Control Information
00175     double mAlpha;
00176     double mAlphaWindowed;
00177     bool mCongested;
00178     Time mCongestionStartTime;
00179     Duration mCongestionWindow;
00180     int mKAlphaReductionsLeft;
00181 
00182     // Per Flow Information
00183     typedef std::tr1::unordered_map<ObjectPair, FlowInfo, ObjectPair::Hasher> FlowMap;
00184     FlowMap mFlows;
00185     // Flow Summary Information
00186     double mTotalActiveWeight;
00187     double mTotalUsedWeight[NUM_DOWNSTREAM];
00188 }; // class CSFQODPFlowScheduler
00189 
00190 } // namespace Sirikata
00191 
00192 #endif //_CSFQ_ODP_FLOW_SCHEDULER_HPP_