Sirikata
|
00001 /* Sirikata 00002 * Forwarder.hpp 00003 * 00004 * Copyright (c) 2010, Daniel Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_FORWARDER_HPP_ 00034 #define _SIRIKATA_FORWARDER_HPP_ 00035 00036 #include <sirikata/core/util/Platform.hpp> 00037 #include <sirikata/space/SpaceContext.hpp> 00038 #include <sirikata/space/ServerMessage.hpp> 00039 #include <sirikata/space/SpaceNetwork.hpp> 00040 00041 #include <sirikata/core/queue/Queue.hpp> 00042 #include <sirikata/core/queue/FairQueue.hpp> 00043 00044 #include "OSegLookupQueue.hpp" 00045 00046 #include "ServerMessageQueue.hpp" 00047 #include "ServerMessageReceiver.hpp" 00048 00049 #include <sirikata/core/odp/SST.hpp> 00050 00051 #include "ForwarderServiceQueue.hpp" 00052 00053 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00054 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp> 00055 00056 namespace Sirikata 00057 { 00058 class Object; 00059 class ObjectSegmentation; 00060 class CoordinateSegmentation; 00061 class SpaceNetwork; 00062 namespace Trace { 00063 class Trace; 00064 } 00065 class ObjectConnection; 00066 class OSegLookupQueue; 00067 class ForwarderServiceQueue; 00068 class ODPFlowScheduler; 00069 class LocationService; 00070 class LocalForwarder; 00071 00072 namespace ODP { 00073 class DelegateService; 00074 } 00075 00076 class Forwarder : public ServerMessageDispatcher, 00077 public ServerMessageRouter, 00078 public MessageRecipient, 00079 public ServerMessageQueue::Sender, 00080 public ServerMessageReceiver::Listener, 00081 private ForwarderServiceQueue::Listener, 00082 public Service 00083 { 00084 private: 00085 SpaceContext* mContext; 00086 ForwarderServiceQueue *mOutgoingMessages; 00087 ServerMessageQueue* mServerMessageQueue; 00088 ServerMessageReceiver* mServerMessageReceiver; 00089 00090 LocalForwarder* mLocalForwarder; 00091 OSegLookupQueue* mOSegLookups; //this maps the object ids to a list of 00092 //messages that are being looked up in oseg. 00093 00094 // We maintain a pointer to this Server's DelegateODPService because the 00095 // forwarder is the one that actually intercepts messages 00096 ODP::DelegateService* mDelegateODPService; 00097 // And here we also maintain the core SST datagram layer for the server 00098 SST::BaseDatagramLayer<SpaceObjectReference>::Ptr mSSTDatagramLayer; 00099 00100 00101 // Object connections, identified by a separate unique ID to handle fast migrations 00102 uint64 mUniqueConnIDs; // Connection ID generator 00103 struct UniqueObjConn 00104 { 00105 uint64 id; 00106 ObjectConnection* conn; 00107 }; 00108 typedef std::tr1::unordered_map<UUID, UniqueObjConn, UUID::Hasher> ObjectConnectionMap; 00109 ObjectConnectionMap mObjectConnections; 00110 OSegLookupQueue::LookupCallback mNullServerIDOSegCallback; 00111 typedef std::vector<ServerID> ListServersUpdate; 00112 typedef std::tr1::unordered_map<UUID,ListServersUpdate, UUID::Hasher> ObjectServerUpdateMap; 00113 ObjectServerUpdateMap mServersToUpdate; // Map of object id -> servers which should be notified of new result 00114 00115 00116 // ServerMessageRouter data 00117 ForwarderServiceQueue::ServiceID mServiceIDSource; 00118 typedef std::map<String, ForwarderServiceQueue::ServiceID> ServiceMap; 00119 ServiceMap mServiceIDMap; 00120 00121 // Per-Service ServerMessage Router's 00122 Router<Message*>* mOSegCacheUpdateRouter; 00123 Router<Message*>* mForwarderWeightRouter; 00124 typedef std::tr1::unordered_map<ServerID, ODPFlowScheduler*> ODPRouterMap; 00125 boost::recursive_mutex mODPRouterMapMutex; 00126 ODPRouterMap mODPRouters; 00127 Poller mServerWeightPoller; // For updating ServerMessageQueue, remote 00128 // ServerMessageReceiver with per-server weights 00129 00130 // Note: This is kinda stupid, but we need to protect this thread safe queue 00131 // with another lock because we don't have a sized thread safe queue with 00132 // notification. 00133 boost::mutex mReceivedMessagesMutex; 00134 Sirikata::SizedThreadSafeQueue<Message*> mReceivedMessages; 00135 00136 Poller mTimeSeriesPoller; 00137 Time mLastStatsTime; 00138 const String mTimeSeriesForwardedPerSecondName; 00139 AtomicValue<uint32> mForwardedPerSecond; 00140 const String mTimeSeriesDroppedPerSecondName; 00141 AtomicValue<uint32> mDroppedPerSecond; 00142 00143 // -- Boiler plate stuff - initialization, destruction, methods to satisfy interfaces 00144 public: 00145 Forwarder(SpaceContext* ctx); 00146 ~Forwarder(); 00147 00148 void initialize(ObjectSegmentation* oseg, ServerMessageQueue* smq, ServerMessageReceiver* smr, LocationService* loc); 00149 void setODPService(ODP::DelegateService* odp); 00150 00151 void setLocalForwarder(LocalForwarder* lf) { mLocalForwarder = lf; } 00152 00153 // Service Implementation 00154 void start(); 00155 void stop(); 00156 00157 private: 00158 void reportStats(); 00159 00160 // Init method: adds an odp routing service to the ForwarderServiceQueue and 00161 // sets up the callback used to create new ODP input queues. 00162 void addODPServerMessageService(LocationService* loc); 00163 // Allocates a new ODPFlowScheduler, invoked by ForwarderServiceQueue when a 00164 // new server connection is made. This creates it and gets it setup so the 00165 // Forwarder can get weight updates sent to the remote endpoint. 00166 ODPFlowScheduler* createODPFlowScheduler(LocationService* loc, ServerID remote_server, uint32 max_size); 00167 00168 // Invoked periodically by an (internal) poller to update server fair queue 00169 // weights. Updates local ServerMessageQueue and sends messages to remote 00170 // ServerMessageReceivers. 00171 void updateServerWeights(); 00172 00173 // -- Public routing interface 00174 public: 00175 virtual Router<Message*>* createServerMessageService(const String& name); 00176 00177 // Used only by Server. Called from networking thready to try to forward 00178 // quickly (avoiding going through OSeg Lookup Queue) by checking OSeg 00179 // cache. 00180 WARN_UNUSED 00181 bool tryCacheForward(Sirikata::Protocol::Object::ObjectMessage* msg); 00182 00183 // -- Real routing interface + implementation 00184 00185 00186 // --- Inputs 00187 public: 00188 // Received from OH networking, needs forwarding decision. Forwards or 00189 // drops -- ownership is given to Forwarder either way 00190 void routeObjectHostMessage(Sirikata::Protocol::Object::ObjectMessage* obj_msg); 00191 private: 00192 // Received from other space server, needs forwarding decision 00193 void receiveMessage(Message* msg); 00194 00195 void receiveObjectRoutingMessage(Message* msg); 00196 void receiveWeightUpdateMessage(Message* msg); 00197 00198 private: 00199 // --- Worker Methods - do the real forwarding decision making and work 00200 00205 WARN_UNUSED 00206 bool forward(Sirikata::Protocol::Object::ObjectMessage* msg, ServerID forwardFrom = NullServerID); 00207 00208 // This version is provided if you already know which server the message should be sent to 00209 void routeObjectMessageToServerNoReturn(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& dest_serv, OSegLookupQueue::ResolvedFrom resolved_from, ServerID forwardFrom = NullServerID); 00210 WARN_UNUSED 00211 bool routeObjectMessageToServer(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& dest_serv, OSegLookupQueue::ResolvedFrom resolved_from, ServerID forwardFrom = NullServerID); 00212 00213 // Dispatches a message destined for the space server itself 00214 void dispatchMessage(Sirikata::Protocol::Object::ObjectMessage* msg) const; 00215 00216 // Handles the case where OSeg told us we have the object. Post this to the 00217 // main strand. 00218 void handleObjectMessageLoop(Sirikata::Protocol::Object::ObjectMessage* msg) const; 00219 00220 // ServerMessageQueue::Sender Interface 00221 virtual Message* serverMessagePull(ServerID dest); 00222 virtual bool serverMessageEmpty(ServerID dest); 00223 // ServerMessageReceiver::Listener Interface 00224 virtual void serverConnectionReceived(ServerID sid); 00225 virtual void serverMessageReceived(Message* msg); 00226 00227 void scheduleProcessReceivedServerMessages(); 00228 void processReceivedServerMessages(); 00229 00230 // ForwarderServiceQueue::Listener Interface (passed on to ServerMessageQueue) 00231 virtual void forwarderServiceMessageReady(ServerID dest_server); 00232 00233 00234 // -- Object Connection Management used by Server 00235 public: 00236 void addObjectConnection(const UUID& dest_obj, ObjectConnection* conn); 00237 void enableObjectConnection(const UUID& dest_obj); 00238 ObjectConnection* removeObjectConnection(const UUID& dest_obj); 00239 //private: FIXME these should not be public 00240 public: 00241 ObjectConnection* getObjectConnection(const UUID& dest_obj); 00242 ObjectConnection* getObjectConnection(const UUID& dest_obj, uint64& uniqueconnid ); 00243 }; // class Forwarder 00244 00245 } //end namespace Sirikata 00246 00247 00248 #endif //_SIRIKATA_FORWARDER_HPP_