Sirikata
space/src/ODPFlowScheduler.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  ODPFlowScheduler.hpp
00003  *
00004  *  Copyright (c) 2010, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _ODP_FLOW_SCHEDULER_HPP_
00034 #define _ODP_FLOW_SCHEDULER_HPP_
00035 
00036 #include <sirikata/space/SpaceContext.hpp>
00037 #include <sirikata/core/queue/AbstractQueue.hpp>
00038 #include "ForwarderServiceQueue.hpp"
00039 #include <sirikata/core/util/RegionWeightCalculator.hpp>
00040 #include <sirikata/core/options/CommonOptions.hpp>
00041 #include <sirikata/space/ObjectSegmentation.hpp>
00042 
00043 namespace Sirikata {
00044 
00062 class ODPFlowScheduler : public AbstractQueue<Message*> {
00063 public:
00064     typedef AbstractQueue<Message*>::Type Type;
00065 
00066     ODPFlowScheduler(SpaceContext* ctx, ForwarderServiceQueue* parent, ServerID sid, uint32 serv_id)
00067      : mContext(ctx),
00068        mParent(parent),
00069        mDestServer(sid),
00070        mServiceID(serv_id),
00071        mSenderTotalWeight(0.0),
00072        mSenderCapacity(0.0),
00073        mReceiverTotalWeight(0.0),
00074        mReceiverCapacity(0.0)
00075     {
00076         mWeightCalculator =
00077             RegionWeightCalculatorFactory::getSingleton().getConstructor(GetOptionValue<String>(OPT_REGION_WEIGHT))(GetOptionValue<String>(OPT_REGION_WEIGHT_ARGS))
00078             ;
00079     }
00080 
00081     virtual ~ODPFlowScheduler() {
00082         delete mWeightCalculator;
00083     }
00084 
00085     // Interface: AbstractQueue<Message*>
00086     virtual QueueEnum::PushResult push(const Type& msg) { assert(false); return QueueEnum::PushExceededMaximumSize; }
00087     virtual const Type& front() const = 0;
00088     virtual Type& front() = 0;
00089     virtual Type pop() = 0;
00090     virtual bool empty() const = 0;
00091     virtual uint32 size() const = 0;
00092 
00093     // ODP push interface. Note: Must be thread safe!
00094     virtual bool push(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& sourceObjectData, const OSegEntry& dstObjectData) = 0;
00095 
00096     // Get the sum of the weights of active queues.
00097     virtual float totalActiveWeight() = 0;
00098     // Get the total used weight of active queues.  If all flows are saturating,
00099     // this should equal totalActiveWeights, otherwise it will be smaller.  This
00100     // version computes used weights from the perspective of the downstream send
00101     // scheduler, i.e. ServerMessageQueue, using its total weight and capacity.
00102     virtual float totalSenderUsedWeight() = 0;
00103     // Get the total used weight of active queues.  If all flows are saturating,
00104     // this should equal totalActiveWeights, otherwise it will be smaller.  This
00105     // version computes used weights from the perspective of the downstream recv
00106     // scheduler, i.e. ServerMessageReceiver, using its total weight and capacity.
00107     virtual float totalReceiverUsedWeight() = 0;
00108 
00109     // Updates statistics from the downstream send scheduler.  These are just
00110     // stored and are used to compute rate information for this scheduler and is
00111     // input for stats which are sent back to the sender.
00112     void updateSenderStats(double total_weight, double capacity) {
00113         mSenderTotalWeight = total_weight;
00114         mSenderCapacity = capacity;
00115     }
00116     // Updates statistics from the downstream receive scheduler.  These are just
00117     // stored and are used to compute rate information for this scheduler and is
00118     // input for stats which are sent back to the receiver.
00119     void updateReceiverStats(double total_weight, double capacity) {
00120         mReceiverTotalWeight = total_weight;
00121         mReceiverCapacity = capacity;
00122     }
00123 protected:
00124     // Should be called by implementations when an ODP message is successfully added.
00125     void notifyPushFront() {
00126         mParent->notifyPushFront(mDestServer, mServiceID);
00127     }
00128 
00129     Message* createMessageFromODP(Sirikata::Protocol::Object::ObjectMessage* obj_msg, ServerID dest_serv) {
00130         Message* svr_obj_msg = new Message(
00131             mContext->id(),
00132             SERVER_PORT_OBJECT_MESSAGE_ROUTING,
00133             dest_serv,
00134             SERVER_PORT_OBJECT_MESSAGE_ROUTING,
00135             obj_msg
00136         );
00137         return svr_obj_msg;
00138     }
00139 
00140     SpaceContext* mContext;
00141     ForwarderServiceQueue* mParent;
00142     ServerID mDestServer;
00143     uint32 mServiceID;
00144 
00145 
00146     double mSenderTotalWeight; // Total input weight for this pair (sum of used weights)
00147     double mSenderCapacity; // Capacity of sender
00148     double mReceiverTotalWeight; // Total input weight to receiver (sum of used weights)
00149     double mReceiverCapacity; // Capacity of receiver
00150 
00151     RegionWeightCalculator* mWeightCalculator;
00152 }; // class ODPFlowScheduler
00153 
00154 } // namespace Sirikata
00155 
00156 #endif //_ODP_FLOW_SCHEDULER_HPP_