Sirikata
|
00001 /* Sirikata 00002 * ODPFlowScheduler.hpp 00003 * 00004 * Copyright (c) 2010, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _ODP_FLOW_SCHEDULER_HPP_ 00034 #define _ODP_FLOW_SCHEDULER_HPP_ 00035 00036 #include <sirikata/space/SpaceContext.hpp> 00037 #include <sirikata/core/queue/AbstractQueue.hpp> 00038 #include "ForwarderServiceQueue.hpp" 00039 #include <sirikata/core/util/RegionWeightCalculator.hpp> 00040 #include <sirikata/core/options/CommonOptions.hpp> 00041 #include <sirikata/space/ObjectSegmentation.hpp> 00042 00043 namespace Sirikata { 00044 00062 class ODPFlowScheduler : public AbstractQueue<Message*> { 00063 public: 00064 typedef AbstractQueue<Message*>::Type Type; 00065 00066 ODPFlowScheduler(SpaceContext* ctx, ForwarderServiceQueue* parent, ServerID sid, uint32 serv_id) 00067 : mContext(ctx), 00068 mParent(parent), 00069 mDestServer(sid), 00070 mServiceID(serv_id), 00071 mSenderTotalWeight(0.0), 00072 mSenderCapacity(0.0), 00073 mReceiverTotalWeight(0.0), 00074 mReceiverCapacity(0.0) 00075 { 00076 mWeightCalculator = 00077 RegionWeightCalculatorFactory::getSingleton().getConstructor(GetOptionValue<String>(OPT_REGION_WEIGHT))(GetOptionValue<String>(OPT_REGION_WEIGHT_ARGS)) 00078 ; 00079 } 00080 00081 virtual ~ODPFlowScheduler() { 00082 delete mWeightCalculator; 00083 } 00084 00085 // Interface: AbstractQueue<Message*> 00086 virtual QueueEnum::PushResult push(const Type& msg) { assert(false); return QueueEnum::PushExceededMaximumSize; } 00087 virtual const Type& front() const = 0; 00088 virtual Type& front() = 0; 00089 virtual Type pop() = 0; 00090 virtual bool empty() const = 0; 00091 virtual uint32 size() const = 0; 00092 00093 // ODP push interface. Note: Must be thread safe! 00094 virtual bool push(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& sourceObjectData, const OSegEntry& dstObjectData) = 0; 00095 00096 // Get the sum of the weights of active queues. 00097 virtual float totalActiveWeight() = 0; 00098 // Get the total used weight of active queues. If all flows are saturating, 00099 // this should equal totalActiveWeights, otherwise it will be smaller. This 00100 // version computes used weights from the perspective of the downstream send 00101 // scheduler, i.e. ServerMessageQueue, using its total weight and capacity. 00102 virtual float totalSenderUsedWeight() = 0; 00103 // Get the total used weight of active queues. If all flows are saturating, 00104 // this should equal totalActiveWeights, otherwise it will be smaller. This 00105 // version computes used weights from the perspective of the downstream recv 00106 // scheduler, i.e. ServerMessageReceiver, using its total weight and capacity. 00107 virtual float totalReceiverUsedWeight() = 0; 00108 00109 // Updates statistics from the downstream send scheduler. These are just 00110 // stored and are used to compute rate information for this scheduler and is 00111 // input for stats which are sent back to the sender. 00112 void updateSenderStats(double total_weight, double capacity) { 00113 mSenderTotalWeight = total_weight; 00114 mSenderCapacity = capacity; 00115 } 00116 // Updates statistics from the downstream receive scheduler. These are just 00117 // stored and are used to compute rate information for this scheduler and is 00118 // input for stats which are sent back to the receiver. 00119 void updateReceiverStats(double total_weight, double capacity) { 00120 mReceiverTotalWeight = total_weight; 00121 mReceiverCapacity = capacity; 00122 } 00123 protected: 00124 // Should be called by implementations when an ODP message is successfully added. 00125 void notifyPushFront() { 00126 mParent->notifyPushFront(mDestServer, mServiceID); 00127 } 00128 00129 Message* createMessageFromODP(Sirikata::Protocol::Object::ObjectMessage* obj_msg, ServerID dest_serv) { 00130 Message* svr_obj_msg = new Message( 00131 mContext->id(), 00132 SERVER_PORT_OBJECT_MESSAGE_ROUTING, 00133 dest_serv, 00134 SERVER_PORT_OBJECT_MESSAGE_ROUTING, 00135 obj_msg 00136 ); 00137 return svr_obj_msg; 00138 } 00139 00140 SpaceContext* mContext; 00141 ForwarderServiceQueue* mParent; 00142 ServerID mDestServer; 00143 uint32 mServiceID; 00144 00145 00146 double mSenderTotalWeight; // Total input weight for this pair (sum of used weights) 00147 double mSenderCapacity; // Capacity of sender 00148 double mReceiverTotalWeight; // Total input weight to receiver (sum of used weights) 00149 double mReceiverCapacity; // Capacity of receiver 00150 00151 RegionWeightCalculator* mWeightCalculator; 00152 }; // class ODPFlowScheduler 00153 00154 } // namespace Sirikata 00155 00156 #endif //_ODP_FLOW_SCHEDULER_HPP_