Sirikata
space/src/Forwarder.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  Forwarder.hpp
00003  *
00004  *  Copyright (c) 2010, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_FORWARDER_HPP_
00034 #define _SIRIKATA_FORWARDER_HPP_
00035 
00036 #include <sirikata/core/util/Platform.hpp>
00037 #include <sirikata/space/SpaceContext.hpp>
00038 #include <sirikata/space/ServerMessage.hpp>
00039 #include <sirikata/space/SpaceNetwork.hpp>
00040 
00041 #include <sirikata/core/queue/Queue.hpp>
00042 #include <sirikata/core/queue/FairQueue.hpp>
00043 
00044 #include "OSegLookupQueue.hpp"
00045 
00046 #include "ServerMessageQueue.hpp"
00047 #include "ServerMessageReceiver.hpp"
00048 
00049 #include <sirikata/core/odp/SST.hpp>
00050 
00051 #include "ForwarderServiceQueue.hpp"
00052 
00053 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00054 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp>
00055 
00056 namespace Sirikata
00057 {
00058   class Object;
00059   class ObjectSegmentation;
00060   class CoordinateSegmentation;
00061   class SpaceNetwork;
00062 namespace Trace {
00063   class Trace;
00064 }
00065   class ObjectConnection;
00066   class OSegLookupQueue;
00067 class ForwarderServiceQueue;
00068 class ODPFlowScheduler;
00069 class LocationService;
00070 class LocalForwarder;
00071 
00072 namespace ODP {
00073 class DelegateService;
00074 }
00075 
00076 class Forwarder : public ServerMessageDispatcher,
00077             public ServerMessageRouter,
00078                     public MessageRecipient,
00079                     public ServerMessageQueue::Sender,
00080                   public ServerMessageReceiver::Listener,
00081                   private ForwarderServiceQueue::Listener,
00082                   public Service
00083 {
00084 private:
00085     SpaceContext* mContext;
00086     ForwarderServiceQueue *mOutgoingMessages;
00087     ServerMessageQueue* mServerMessageQueue;
00088     ServerMessageReceiver* mServerMessageReceiver;
00089 
00090     LocalForwarder* mLocalForwarder;
00091     OSegLookupQueue* mOSegLookups; //this maps the object ids to a list of
00092                                    //messages that are being looked up in oseg.
00093 
00094     // We maintain a pointer to this Server's DelegateODPService because the
00095     // forwarder is the one that actually intercepts messages
00096     ODP::DelegateService* mDelegateODPService;
00097     // And here we also maintain the core SST datagram layer for the server
00098     SST::BaseDatagramLayer<SpaceObjectReference>::Ptr mSSTDatagramLayer;
00099 
00100 
00101     // Object connections, identified by a separate unique ID to handle fast migrations
00102     uint64 mUniqueConnIDs; // Connection ID generator
00103     struct UniqueObjConn
00104     {
00105       uint64 id;
00106       ObjectConnection* conn;
00107     };
00108     typedef std::tr1::unordered_map<UUID, UniqueObjConn, UUID::Hasher> ObjectConnectionMap;
00109     ObjectConnectionMap mObjectConnections;
00110     OSegLookupQueue::LookupCallback mNullServerIDOSegCallback;
00111     typedef std::vector<ServerID> ListServersUpdate;
00112     typedef std::tr1::unordered_map<UUID,ListServersUpdate, UUID::Hasher> ObjectServerUpdateMap;
00113     ObjectServerUpdateMap mServersToUpdate; // Map of object id -> servers which should be notified of new result
00114 
00115 
00116     // ServerMessageRouter data
00117     ForwarderServiceQueue::ServiceID mServiceIDSource;
00118     typedef std::map<String, ForwarderServiceQueue::ServiceID> ServiceMap;
00119     ServiceMap mServiceIDMap;
00120 
00121     // Per-Service ServerMessage Router's
00122     Router<Message*>* mOSegCacheUpdateRouter;
00123     Router<Message*>* mForwarderWeightRouter;
00124     typedef std::tr1::unordered_map<ServerID, ODPFlowScheduler*> ODPRouterMap;
00125     boost::recursive_mutex mODPRouterMapMutex;
00126     ODPRouterMap mODPRouters;
00127     Poller mServerWeightPoller; // For updating ServerMessageQueue, remote
00128                                 // ServerMessageReceiver with per-server weights
00129 
00130     // Note: This is kinda stupid, but we need to protect this thread safe queue
00131     // with another lock because we don't have a sized thread safe queue with
00132     // notification.
00133     boost::mutex mReceivedMessagesMutex;
00134     Sirikata::SizedThreadSafeQueue<Message*> mReceivedMessages;
00135 
00136     Poller mTimeSeriesPoller;
00137     Time mLastStatsTime;
00138     const String mTimeSeriesForwardedPerSecondName;
00139     AtomicValue<uint32> mForwardedPerSecond;
00140     const String mTimeSeriesDroppedPerSecondName;
00141     AtomicValue<uint32> mDroppedPerSecond;
00142 
00143     // -- Boiler plate stuff - initialization, destruction, methods to satisfy interfaces
00144   public:
00145       Forwarder(SpaceContext* ctx);
00146       ~Forwarder();
00147 
00148     void initialize(ObjectSegmentation* oseg, ServerMessageQueue* smq, ServerMessageReceiver* smr, LocationService* loc);
00149     void setODPService(ODP::DelegateService* odp);
00150 
00151     void setLocalForwarder(LocalForwarder* lf) { mLocalForwarder = lf; }
00152 
00153     // Service Implementation
00154     void start();
00155     void stop();
00156 
00157   private:
00158     void reportStats();
00159 
00160     // Init method: adds an odp routing service to the ForwarderServiceQueue and
00161     // sets up the callback used to create new ODP input queues.
00162     void addODPServerMessageService(LocationService* loc);
00163     // Allocates a new ODPFlowScheduler, invoked by ForwarderServiceQueue when a
00164     // new server connection is made.  This creates it and gets it setup so the
00165     // Forwarder can get weight updates sent to the remote endpoint.
00166     ODPFlowScheduler* createODPFlowScheduler(LocationService* loc, ServerID remote_server, uint32 max_size);
00167 
00168     // Invoked periodically by an (internal) poller to update server fair queue
00169     // weights. Updates local ServerMessageQueue and sends messages to remote
00170     // ServerMessageReceivers.
00171     void updateServerWeights();
00172 
00173     // -- Public routing interface
00174   public:
00175     virtual Router<Message*>* createServerMessageService(const String& name);
00176 
00177     // Used only by Server.  Called from networking thready to try to forward
00178     // quickly (avoiding going through OSeg Lookup Queue) by checking OSeg
00179     // cache.
00180     WARN_UNUSED
00181     bool tryCacheForward(Sirikata::Protocol::Object::ObjectMessage* msg);
00182 
00183     // -- Real routing interface + implementation
00184 
00185 
00186     // --- Inputs
00187   public:
00188     // Received from OH networking, needs forwarding decision.  Forwards or
00189     // drops -- ownership is given to Forwarder either way
00190     void routeObjectHostMessage(Sirikata::Protocol::Object::ObjectMessage* obj_msg);
00191   private:
00192     // Received from other space server, needs forwarding decision
00193     void receiveMessage(Message* msg);
00194 
00195     void receiveObjectRoutingMessage(Message* msg);
00196     void receiveWeightUpdateMessage(Message* msg);
00197 
00198   private:
00199     // --- Worker Methods - do the real forwarding decision making and work
00200 
00205     WARN_UNUSED
00206     bool forward(Sirikata::Protocol::Object::ObjectMessage* msg, ServerID forwardFrom = NullServerID);
00207 
00208     // This version is provided if you already know which server the message should be sent to
00209     void routeObjectMessageToServerNoReturn(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& dest_serv, OSegLookupQueue::ResolvedFrom resolved_from, ServerID forwardFrom = NullServerID);
00210     WARN_UNUSED
00211     bool routeObjectMessageToServer(Sirikata::Protocol::Object::ObjectMessage* msg, const OSegEntry& dest_serv, OSegLookupQueue::ResolvedFrom resolved_from, ServerID forwardFrom = NullServerID);
00212 
00213     // Dispatches a message destined for the space server itself
00214     void dispatchMessage(Sirikata::Protocol::Object::ObjectMessage* msg) const;
00215 
00216     // Handles the case where OSeg told us we have the object. Post this to the
00217     // main strand.
00218     void handleObjectMessageLoop(Sirikata::Protocol::Object::ObjectMessage* msg) const;
00219 
00220     // ServerMessageQueue::Sender Interface
00221     virtual Message* serverMessagePull(ServerID dest);
00222     virtual bool serverMessageEmpty(ServerID dest);
00223     // ServerMessageReceiver::Listener Interface
00224     virtual void serverConnectionReceived(ServerID sid);
00225     virtual void serverMessageReceived(Message* msg);
00226 
00227     void scheduleProcessReceivedServerMessages();
00228     void processReceivedServerMessages();
00229 
00230     // ForwarderServiceQueue::Listener Interface (passed on to ServerMessageQueue)
00231     virtual void forwarderServiceMessageReady(ServerID dest_server);
00232 
00233 
00234     // -- Object Connection Management used by Server
00235   public:
00236     void addObjectConnection(const UUID& dest_obj, ObjectConnection* conn);
00237     void enableObjectConnection(const UUID& dest_obj);
00238     ObjectConnection* removeObjectConnection(const UUID& dest_obj);
00239   //private: FIXME these should not be public
00240   public:
00241     ObjectConnection* getObjectConnection(const UUID& dest_obj);
00242     ObjectConnection* getObjectConnection(const UUID& dest_obj, uint64& uniqueconnid );
00243 }; // class Forwarder
00244 
00245 } //end namespace Sirikata
00246 
00247 
00248 #endif //_SIRIKATA_FORWARDER_HPP_