Sirikata
|
An ODPFlowScheduler acts as a filter and queue for ODP messages for a single server. More...
#include <ODPFlowScheduler.hpp>
Public Types | |
typedef AbstractQueue< Message * > ::Type | Type |
Public Member Functions | |
ODPFlowScheduler (SpaceContext *ctx, ForwarderServiceQueue *parent, ServerID sid, uint32 serv_id) | |
virtual | ~ODPFlowScheduler () |
virtual QueueEnum::PushResult | push (const Type &msg) |
virtual const Type & | front () const =0 |
virtual Type & | front ()=0 |
virtual Type | pop ()=0 |
virtual bool | empty () const =0 |
virtual uint32 | size () const =0 |
virtual bool | push (Sirikata::Protocol::Object::ObjectMessage *msg, const OSegEntry &sourceObjectData, const OSegEntry &dstObjectData)=0 |
virtual float | totalActiveWeight ()=0 |
virtual float | totalSenderUsedWeight ()=0 |
virtual float | totalReceiverUsedWeight ()=0 |
void | updateSenderStats (double total_weight, double capacity) |
void | updateReceiverStats (double total_weight, double capacity) |
Protected Member Functions | |
void | notifyPushFront () |
Message * | createMessageFromODP (Sirikata::Protocol::Object::ObjectMessage *obj_msg, ServerID dest_serv) |
Protected Attributes | |
SpaceContext * | mContext |
ForwarderServiceQueue * | mParent |
ServerID | mDestServer |
uint32 | mServiceID |
double | mSenderTotalWeight |
double | mSenderCapacity |
double | mReceiverTotalWeight |
double | mReceiverCapacity |
RegionWeightCalculator * | mWeightCalculator |
An ODPFlowScheduler acts as a filter and queue for ODP messages for a single server.
It has 2 primary roles. First, it acts as an ODP input queue for ForwarderServiceQueue; i.e. queues ODP messages, converts them to server messages, and serves them up for ForwarderServiceQueue. Therefore it implements AbstractQueue<Message*>, but uses the ForwarderService::notifyPush back channel, and forces failure if somebody tries to push(Message*) to it.
Second, it collects statistics about active flows and makes that data available to the rest of the system. This data is used in 2 ways: first, on the send side it is used to set the weights on the ServerMessageQueue to schedule between destination servers: i.e. its really just used to create a hierarchical fair queue. It is also used by the receive side: the Forwarder has a control channel which forwards these weights to the destination, which uses them to determine the total weight of active flows for each source server, allowing it to properly balance the input rates.
typedef AbstractQueue<Message*>::Type Sirikata::ODPFlowScheduler::Type |
Reimplemented from Sirikata::AbstractQueue< Message * >.
Sirikata::ODPFlowScheduler::ODPFlowScheduler | ( | SpaceContext * | ctx, |
ForwarderServiceQueue * | parent, | ||
ServerID | sid, | ||
uint32 | serv_id | ||
) | [inline] |
virtual Sirikata::ODPFlowScheduler::~ODPFlowScheduler | ( | ) | [inline, virtual] |
References mWeightCalculator.
Message* Sirikata::ODPFlowScheduler::createMessageFromODP | ( | Sirikata::Protocol::Object::ObjectMessage * | obj_msg, |
ServerID | dest_serv | ||
) | [inline, protected] |
References Sirikata::SpaceContext::id(), mContext, and SERVER_PORT_OBJECT_MESSAGE_ROUTING.
Referenced by Sirikata::RegionODPFlowScheduler::push(), and Sirikata::CSFQODPFlowScheduler::push().
virtual bool Sirikata::ODPFlowScheduler::empty | ( | ) | const [pure virtual] |
Implements Sirikata::AbstractQueue< Message * >.
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
virtual const Type& Sirikata::ODPFlowScheduler::front | ( | ) | const [pure virtual] |
Implements Sirikata::AbstractQueue< Message * >.
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
virtual Type& Sirikata::ODPFlowScheduler::front | ( | ) | [pure virtual] |
Implements Sirikata::AbstractQueue< Message * >.
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
void Sirikata::ODPFlowScheduler::notifyPushFront | ( | ) | [inline, protected] |
References mDestServer, mParent, mServiceID, and Sirikata::ForwarderServiceQueue::notifyPushFront().
Referenced by Sirikata::RegionODPFlowScheduler::push(), and Sirikata::CSFQODPFlowScheduler::push().
virtual Type Sirikata::ODPFlowScheduler::pop | ( | ) | [pure virtual] |
Reimplemented from Sirikata::AbstractQueue< Message * >.
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
virtual QueueEnum::PushResult Sirikata::ODPFlowScheduler::push | ( | const Type & | msg | ) | [inline, virtual] |
References Sirikata::QueueEnum::PushExceededMaximumSize.
Referenced by Sirikata::Forwarder::routeObjectMessageToServer().
virtual bool Sirikata::ODPFlowScheduler::push | ( | Sirikata::Protocol::Object::ObjectMessage * | msg, |
const OSegEntry & | sourceObjectData, | ||
const OSegEntry & | dstObjectData | ||
) | [pure virtual] |
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
virtual uint32 Sirikata::ODPFlowScheduler::size | ( | ) | const [pure virtual] |
Implements Sirikata::AbstractQueue< Message * >.
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
virtual float Sirikata::ODPFlowScheduler::totalActiveWeight | ( | ) | [pure virtual] |
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
Referenced by Sirikata::Forwarder::updateServerWeights().
virtual float Sirikata::ODPFlowScheduler::totalReceiverUsedWeight | ( | ) | [pure virtual] |
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
Referenced by Sirikata::Forwarder::updateServerWeights().
virtual float Sirikata::ODPFlowScheduler::totalSenderUsedWeight | ( | ) | [pure virtual] |
Implemented in Sirikata::CSFQODPFlowScheduler, and Sirikata::RegionODPFlowScheduler.
Referenced by Sirikata::Forwarder::updateServerWeights().
void Sirikata::ODPFlowScheduler::updateReceiverStats | ( | double | total_weight, |
double | capacity | ||
) | [inline] |
References mReceiverCapacity, and mReceiverTotalWeight.
Referenced by Sirikata::Forwarder::receiveWeightUpdateMessage().
void Sirikata::ODPFlowScheduler::updateSenderStats | ( | double | total_weight, |
double | capacity | ||
) | [inline] |
References mSenderCapacity, and mSenderTotalWeight.
Referenced by Sirikata::Forwarder::receiveWeightUpdateMessage().
SpaceContext* Sirikata::ODPFlowScheduler::mContext [protected] |
Referenced by createMessageFromODP(), Sirikata::CSFQODPFlowScheduler::getObjectWeightRegion(), Sirikata::CSFQODPFlowScheduler::pop(), Sirikata::CSFQODPFlowScheduler::push(), Sirikata::RegionODPFlowScheduler::totalActiveWeight(), Sirikata::RegionODPFlowScheduler::totalReceiverUsedWeight(), and Sirikata::RegionODPFlowScheduler::totalSenderUsedWeight().
ServerID Sirikata::ODPFlowScheduler::mDestServer [protected] |
ForwarderServiceQueue* Sirikata::ODPFlowScheduler::mParent [protected] |
Referenced by notifyPushFront().
double Sirikata::ODPFlowScheduler::mReceiverCapacity [protected] |
double Sirikata::ODPFlowScheduler::mReceiverTotalWeight [protected] |
double Sirikata::ODPFlowScheduler::mSenderCapacity [protected] |
double Sirikata::ODPFlowScheduler::mSenderTotalWeight [protected] |
uint32 Sirikata::ODPFlowScheduler::mServiceID [protected] |
Referenced by notifyPushFront().