Sirikata
|
00001 /* Sirikata 00002 * CSFQODPFlowScheduler.hpp 00003 * 00004 * Copyright (c) 2010, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _CSFQ_ODP_FLOW_SCHEDULER_HPP_ 00034 #define _CSFQ_ODP_FLOW_SCHEDULER_HPP_ 00035 00036 #include "ODPFlowScheduler.hpp" 00037 #include <sirikata/core/queue/Queue.hpp> 00038 #include "RateEstimator.hpp" 00039 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00040 00041 //#define CSFQODP_DEBUG 00042 00043 namespace Sirikata { 00044 00045 class LocationService; 00046 00050 class CSFQODPFlowScheduler : public ODPFlowScheduler { 00051 public: 00052 CSFQODPFlowScheduler(SpaceContext* ctx, ForwarderServiceQueue* parent, ServerID sid, uint32 serv_id, uint32 max_size, LocationService* loc); 00053 virtual ~CSFQODPFlowScheduler(); 00054 00055 // Interface: AbstractQueue<Message*> 00056 virtual const Type& front() const; 00057 virtual Type& front(); 00058 virtual Type pop(); 00059 virtual bool empty() const; 00060 virtual uint32 size() const { return mQueue.getResourceMonitor().filledSize(); } 00061 00062 // ODP push interface 00063 virtual bool push(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry&, const OSegEntry&); 00064 // Get the sum of the weights of active queues. 00065 virtual float totalActiveWeight(); 00066 // Get the total used weight of active queues. If all flows are saturating, 00067 // this should equal totalActiveWeights, otherwise it will be smaller. 00068 virtual float totalSenderUsedWeight(); 00069 // Get the total used weight of active queues. If all flows are saturating, 00070 // this should equal totalActiveWeights, otherwise it will be smaller. 00071 virtual float totalReceiverUsedWeight(); 00072 private: 00073 00074 enum { 00075 SENDER = 0, 00076 RECEIVER = 1, 00077 NUM_DOWNSTREAM = 2 00078 }; 00079 00080 struct ObjectPair { 00081 ObjectPair(const UUID& s, const UUID& d) 00082 : source(s), dest(d) 00083 {} 00084 00085 bool operator<(const ObjectPair& rhs) const { 00086 return (source < rhs.source || (source == rhs.source && dest < rhs.dest)); 00087 } 00088 00089 bool operator==(const ObjectPair& rhs) const { 00090 return (source == rhs.source && dest == rhs.dest); 00091 } 00092 00093 class Hasher { 00094 public: 00095 size_t operator() (const ObjectPair& op) const { 00096 return *(uint32*)op.source.getArray().data() ^ *(uint32*)op.dest.getArray().data(); 00097 } 00098 }; 00099 00100 UUID source; 00101 UUID dest; 00102 }; 00103 00104 struct FlowInfo { 00105 FlowInfo(double w, const Time& start) 00106 : rate(0.0, start), 00107 weight(w) 00108 #ifdef CSFQODP_DEBUG 00109 , 00110 arrived(0), 00111 accepted(0) 00112 #endif 00113 { 00114 for(int i = 0; i < NUM_DOWNSTREAM; i++) 00115 usedWeight[i] = w; 00116 } 00117 00118 RateEstimator rate; 00119 double weight; 00120 double usedWeight[NUM_DOWNSTREAM]; 00121 #ifdef CSFQODP_DEBUG 00122 uint64 arrived; 00123 uint64 accepted; 00124 #endif 00125 }; 00126 00127 struct QueuedMessage { 00128 QueuedMessage() 00129 : msg(NULL), 00130 _size(0) 00131 {} 00132 00133 QueuedMessage(Message* m, int32 s) 00134 : msg(m), 00135 _size(s) 00136 {} 00137 00138 int32 size() const { return _size; } 00139 00140 Message* msg; 00141 int32 _size; 00142 }; 00143 00144 FlowInfo* getFlow(const ObjectPair& new_packet_pair, const OSegEntry&src_info, const OSegEntry&dst_info, const Time& t); 00145 void removeFlow(const ObjectPair& packet_pair); 00146 int flowCount() const; 00147 float normalizedFlowWeight(float unnorm_weight); 00148 00149 void estimateAlpha(int32 packet_size, Time& arrival_time, double label, bool dropped); 00150 bool queueExceedsLowWaterMark() const { return true; } // Not necessary in our implementation 00151 double minCongestedAlpha() const { return mCapacityRate.get() / std::max(1, flowCount()); } 00152 00153 // Helper to get the region we compute weight over 00154 BoundingBox3f getObjectWeightRegion(const UUID& objid, const OSegEntry& sid) const; 00155 00156 00157 boost::mutex mPushMutex; 00158 00159 // Note: unfortunately we need to mark these as mutable because a) 00160 // SizedThreadSafeQueue doesn't have methods marked properly as const and b) 00161 // ThreadSafeQueue doesn't provide a front() method. 00162 mutable QueuedMessage mQueueBuffer; 00163 mutable Sirikata::SizedThreadSafeQueue<QueuedMessage> mQueue; 00164 mutable Sirikata::AtomicValue<bool> mNeedsNotification; 00165 // Used to collect information for weight computation 00166 LocationService* mLoc; 00167 00168 // CSFQ Summary Information 00169 SimpleRateEstimator mArrivalRate; 00170 double mSumEstimatedArrivalRates; 00171 SimpleRateEstimator mAcceptedRate; 00172 SimpleRateEstimator mCapacityRate; // Actual capacity observed (no dummy packets) 00173 00174 // CSFQ Control Information 00175 double mAlpha; 00176 double mAlphaWindowed; 00177 bool mCongested; 00178 Time mCongestionStartTime; 00179 Duration mCongestionWindow; 00180 int mKAlphaReductionsLeft; 00181 00182 // Per Flow Information 00183 typedef std::tr1::unordered_map<ObjectPair, FlowInfo, ObjectPair::Hasher> FlowMap; 00184 FlowMap mFlows; 00185 // Flow Summary Information 00186 double mTotalActiveWeight; 00187 double mTotalUsedWeight[NUM_DOWNSTREAM]; 00188 }; // class CSFQODPFlowScheduler 00189 00190 } // namespace Sirikata 00191 00192 #endif //_CSFQ_ODP_FLOW_SCHEDULER_HPP_