// Sirikata
00001 /* Sirikata 00002 * ForwarderServiceQueue.hpp 00003 * 00004 * Copyright (c) 2010, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00031 */ 00032 00033 #ifndef _SIRIKATA_FORWARDER_SERVICE_QUEUE_HPP_ 00034 #define _SIRIKATA_FORWARDER_SERVICE_QUEUE_HPP_ 00035 00036 #include <sirikata/core/util/Platform.hpp> 00037 #include <sirikata/core/queue/FairQueue.hpp> 00038 #include <sirikata/space/ServerMessage.hpp> 00039 #include <boost/thread.hpp> 00040 00041 namespace Sirikata { 00042 00048 class ForwarderServiceQueue { 00049 public: 00050 typedef uint32 ServiceID; 00051 typedef AbstractQueue<Message*> MessageQueue; 00052 // Functor for creating a new input message queue. Must take 2 parameters. 00053 // The first is the ServerID this queue is being allocated for . The second 00054 // is a uint32 parameter specifying its maximum size. This will be invoked 00055 // once for each (server, service) pair. 00056 typedef std::tr1::function<MessageQueue*(ServerID,uint32)> MessageQueueCreator; 00057 00058 class Listener { 00059 public: 00060 virtual ~Listener() {} 00061 virtual void forwarderServiceMessageReady(ServerID dest_server) = 0; 00062 }; 00063 00064 ForwarderServiceQueue(ServerID this_server, uint32 size, Listener* listener); 00065 ~ForwarderServiceQueue(); 00066 00067 void addService(ServiceID svc, MessageQueueCreator creator = 0); 00068 00069 Message* front(ServerID sid); 00070 Message* pop(ServerID sid); 00071 bool empty(ServerID sid); 00072 private: 00073 friend class Forwarder; 00074 friend class ForwarderServerMessageRouter; 00075 friend class ODPFlowScheduler; 00076 00077 typedef FairQueue<Message, ServiceID, MessageQueue> OutgoingFairQueue; 00078 typedef std::tr1::unordered_map<ServerID, OutgoingFairQueue*> ServerQueueMap; 00079 typedef std::tr1::unordered_map<ServiceID, MessageQueueCreator> MessageQueueCreatorMap; 00080 00081 ServerID mThisServer; 00082 MessageQueueCreatorMap mQueueCreators; 00083 ServerQueueMap mQueues; 00084 uint32 mQueueSize; 00085 Listener* mListener; 00086 boost::mutex mMutex; 00087 00088 // Normal push 00089 QueueEnum::PushResult push(ServiceID svc, Message* msg); 00090 
// Use prePush to indicate you need to push to the queue. This has to be 00091 // here because we need to keep organized with the 00092 // Forwarder/ODPFlowSchedulers, which need us to get them allocated before 00093 // they can be pushed to. 00094 void prePush(ServerID sid); 00095 // Notifies the queue that a push has been successfully performed for the 00096 // given service on the given server. 00097 void notifyPushFront(ServerID sid, ServiceID svc); 00098 00099 uint32 size(ServerID sid, ServiceID svc); 00100 00101 // Utilities 00102 00103 // Gets the FairQueue over services for the specified server. 00104 OutgoingFairQueue* getServerFairQueue(ServerID sid); 00105 // This is just a sanity check -- verifies ofq has an input queue for svc_id 00106 // and returns ofq. 00107 OutgoingFairQueue* checkServiceQueue(OutgoingFairQueue* ofq, ServiceID svc_id); 00108 }; 00109 00110 } // namespace Sirikata 00111 00112 #endif //_SIRIKATA_FORWARDER_SERVICE_QUEUE_HPP_