Sirikata
space/src/Server.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  Server.hpp
00003  *
00004  *  Copyright (c) 2010, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 
00034 #ifndef _SIRIKATA_SERVER_HPP_
00035 #define _SIRIKATA_SERVER_HPP_
00036 
00037 #include <sirikata/core/util/Platform.hpp>
00038 #include <sirikata/space/SpaceContext.hpp>
00039 
00040 #include <sirikata/space/ObjectHostConnectionManager.hpp>
00041 #include <sirikata/core/service/Service.hpp>
00042 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00043 
00044 #include <sirikata/core/util/MotionVector.hpp>
00045 #include <sirikata/core/util/AggregateBoundingInfo.hpp>
00046 
00047 #include "Protocol_Session.pbj.hpp"
00048 #include "Protocol_Migration.pbj.hpp"
00049 
00050 #include <sirikata/space/ObjectSegmentation.hpp>
00051 
00052 #include <sirikata/core/odp/DelegateService.hpp>
00053 #include <sirikata/core/ohdp/DelegateService.hpp>
00054 
00055 #include <sirikata/core/sync/TimeSyncServer.hpp>
00056 
00057 #include <sirikata/core/command/Commander.hpp>
00058 
00059 namespace Sirikata
00060 {
00061 class Authenticator;
00062 
00063 class Forwarder;
00064 class LocalForwarder;
00065 
00066 class LocationService;
00067 class Proximity;
00068 class MigrationMonitor;
00069 
00070 class CoordinateSegmentation;
00071 class ObjectSegmentation;
00072 
00073 class ObjectConnection;
00074 class ObjectHostConnectionManager;
00075 
00076 class ObjectHostSessionManager;
00077 class ObjectSessionManager;
00078 
00085 class Server :
00086         public MessageRecipient, public Service,
00087         public OSegWriteListener,
00088         public ODP::DelegateService, public OHDP::DelegateService,
00089         ObjectHostConnectionManager::Listener
00090 {
00091 public:
00092     Server(SpaceContext* ctx, Authenticator* auth, Forwarder* forwarder, LocationService* loc_service, CoordinateSegmentation* cseg, Proximity* prox, ObjectSegmentation* oseg, Address4 oh_listen_addr, ObjectHostSessionManager* oh_sess_mgr, ObjectSessionManager* obj_sess_mgr);
00093     ~Server();
00094 
00095     virtual void receiveMessage(Message* msg);
00096 private:
00097     // Service Implementation
00098     void start();
00099     void stop();
00100 
00101     // OSegWriteListener Interface
00102     virtual void osegMigrationAcknowledged(const UUID& id);
00103     virtual void osegAddNewFinished(const UUID& id, OSegAddNewStatus status);
00104 
00105 
00106     // ODP::DelegateService dependencies
00107     ODP::DelegatePort* createDelegateODPPort(ODP::DelegateService*, const SpaceObjectReference& sor, ODP::PortID port);
00108     bool delegateODPPortSend(const ODP::Endpoint& source_ep, const ODP::Endpoint& dest_ep, MemoryReference payload);
00109 
00110     // OHDP::DelegateService dependencies
00111     OHDP::DelegatePort* createDelegateOHDPPort(OHDP::DelegateService*, const OHDP::Endpoint& ept);
00112     bool delegateOHDPPortSend(const OHDP::Endpoint& source_ep, const OHDP::Endpoint& dest_ep, MemoryReference payload);
00113 
00114     // ObjectHostConnectionManager::Listener Interface:
00115 
00116     virtual void onObjectHostConnected(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id, OHDPSST::Stream::Ptr stream);
00117     // Callback which handles messages from object hosts -- mostly just does sanity checking
00118     // before using the forwarder to do routing.  Operates in the
00119     // network strand to allow for fast forwarding, see
00120     // handleObjectHostMessageRouting for continuation in main strand
00121     virtual bool onObjectHostMessageReceived(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id, Sirikata::Protocol::Object::ObjectMessage*);
00122     // Disconnection events, forwarded to
00123     // handleObjectHostConnectionClosed in main strand
00124     virtual void onObjectHostDisconnected(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id);
00125 
00126 
00127     // Handle a migration event generated by the MigrationMonitor
00128     void handleMigrationEvent(const UUID& objid);
00129 
00130     // Starts the process of trying to send migration messages, or continues one if it's already running.
00131     void startSendMigrationMessages();
00132 
00133     // Try to send outstanding migration messages.  This chains automatically until the queue is emptied.
00134     void trySendMigrationMessages();
00135 
00136 
00137     // Send a session message directly to the object via the OH connection manager, bypassing any restrictions on
00138     // the current state of the connection.  Keeps retrying until the message gets through.
00139     void sendSessionMessageWithRetry(const ObjectHostConnectionID& conn, Sirikata::Protocol::Object::ObjectMessage* msg, const Duration& retry_rate);
00140 
00141 
00142     // Checks if an object is connected to this server
00143     bool isObjectConnected(const UUID& object_id) const;
00144     // Checks if an object is current connecting to this server (post authentication)
00145     bool isObjectConnecting(const UUID& object_id) const;
00146 
00147 
00148     // Handle an object host closing its connection
00149     void handleObjectHostConnectionClosed(const ObjectHostConnectionID& conn_id);
00150     // Schedule main thread to handle oh message routing
00151     void scheduleObjectHostMessageRouting();
00152     void handleObjectHostMessageRouting();
00153     // Perform forwarding for a message on the front of mRouteObjectMessage from the object host which
00154     // couldn't be forwarded directly by the networking code
00155     // (i.e. needs routing to another node)
00156     bool handleSingleObjectHostMessageRouting();
00157 
00158     // Handle Session messages from an object
00159     void handleSessionMessage(const ObjectHostConnectionID& oh_conn_id, Sirikata::Protocol::Object::ObjectMessage* msg);
00160     // Handle Connect message from object
00161     void handleConnect(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, const Sirikata::Protocol::Session::Connect& connect_msg, uint64 seqno);
00162     void handleConnectAuthResponse(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, const Sirikata::Protocol::Session::Connect& connect_msg, uint64 seqno, bool authenticated);
00163 
00164     void sendConnectSuccess(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, uint64 session_request_seqno);
00165     void sendConnectError(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, uint64 session_request_seqno);
00166 
00167     // Handle connection ack message from object
00168     void handleConnectAck(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, uint64 session_request_seqno);
00169 
00170     // Handle Migrate message from object
00171     void handleMigrate(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, const Sirikata::Protocol::Session::Connect& migrate_msg, uint64 seqno);
00172 
00173     // Performs actual migration after all the necessary information is available.
00174     void handleMigration(const UUID& obj_id);
00175 
00176     // Handle a disconnection.
00177     void handleDisconnect(UUID obj_id, ObjectConnection* conn, uint64 session_request_seqno);
00178 
00179     //finally deletes any object connections to obj_id
00180     void killObjectConnection(const UUID& obj_id);
00181 
00182     void finishAddObject(const UUID& obj_id, OSegAddNewStatus status);
00183 
00184     bool checkAlreadyMigrating(const UUID& obj_id);
00185     void processAlreadyMigrating(const UUID& obj_id);
00186 
00187     void newStream(int err, SST::Stream<SpaceObjectReference>::Ptr s);
00188 
00189 
00190     // Commander commands
00191     void commandObjectsCount(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00192     void commandObjectsList(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00193     void commandObjectsDisconnect(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00194 
00195     SpaceContext* mContext;
00196 
00197     TimeSyncServer* mTimeSyncServer;
00198 
00199     Authenticator* mAuthenticator;
00200     LocationService* mLocationService;
00201     CoordinateSegmentation* mCSeg;
00202     Proximity* mProximity;
00203     ObjectSegmentation* mOSeg;
00204     LocalForwarder* mLocalForwarder;
00205     Forwarder* mForwarder;
00206     MigrationMonitor* mMigrationMonitor;
00207     ObjectHostSessionManager* mOHSessionManager;
00208     ObjectSessionManager* mObjectSessionManager;
00209 
00210     Router<Message*>* mMigrateServerMessageService;
00211 
00212     bool mMigrationSendRunning; // Indicates whether an event chain for sending outstanding migration messages is running.
00213                                 // Note that ideally this could be replaced by just using our own internal queue
00214                                 // to hold messages since migration messsages aren't going to get stale
00215 
00216     bool mShutdownRequested;
00217 
00218     ObjectHostConnectionManager* mObjectHostConnectionManager;
00219 
00220     typedef std::tr1::unordered_map<UUID, ObjectConnection*, UUID::Hasher> ObjectConnectionMap;
00221 
00222     ObjectConnectionMap mObjects; // NOTE: only Forwarder and LocalForwarder
00223                                   // should actually use the connection, this is
00224                                   // only still a map to handle migrations
00225                                   // properly
00226     // Information to be able to respond to a migration request *from
00227     // the object*.
00228     typedef ObjectConnectionMap MigrationRequestMap;
00229     MigrationRequestMap mObjectsAwaitingMigration;
00230 
00231 
00232     typedef std::tr1::unordered_map<UUID, Sirikata::Protocol::Migration::MigrationMessage*, UUID::Hasher> ObjectMigrationMap;
00233     ObjectMigrationMap mObjectMigrations;
00234 
00235     //std::map<UUID,ObjectConnection*>
00236     struct MigratingObjectConnectionsData
00237     {
00238       ObjectConnection* obj_conner;
00239       int milliseconds;
00240       ServerID migratingTo;
00241       TimedMotionVector3f loc;
00242         AggregateBoundingInfo bnds;
00243       uint64 uniqueConnId;
00244       bool serviceConnection;
00245     };
00246 
00247       typedef std::queue<Message*> MigrateMessageQueue;
00248       // Outstanding MigrateMessages, which get objects to other servers.
00249       MigrateMessageQueue mMigrateMessages;
00250 
00251     //    ObjectConnectionMap mMigratingConnections;//bftm add
00252     typedef std::map<UUID,MigratingObjectConnectionsData> MigConnectionsMap;
00253     MigConnectionsMap mMigratingConnections;//bftm add
00254     Timer mMigrationTimer;
00255 
00256     struct StoredConnection
00257     {
00258       ObjectHostConnectionID    conn_id;
00259       Sirikata::Protocol::Session::Connect             conn_msg;
00260         // Sequence number from session request so we can uniquely
00261         // identify the request
00262         uint64 session_seqno;
00263     };
00264 
00265     typedef std::map<UUID, StoredConnection> StoredConnectionMap;
00266     StoredConnectionMap  mStoredConnectionData;
00267     struct ConnectionIDObjectMessagePair{
00268         ObjectHostConnectionID conn_id;
00269         Sirikata::Protocol::Object::ObjectMessage* obj_msg;
00270         ConnectionIDObjectMessagePair(ObjectHostConnectionID conn_id, Sirikata::Protocol::Object::ObjectMessage*msg) {
00271             this->conn_id=conn_id;
00272             this->obj_msg=msg;
00273         }
00274         size_t size() const{
00275             return 1;
00276         }
00277     };
00278 
00279     // FIXME Another place where needing a size queue and notifications causes
00280     // double locking...
00281     boost::mutex mRouteObjectMessageMutex;
00282     Sirikata::SizedThreadSafeQueue<ConnectionIDObjectMessagePair>mRouteObjectMessage;
00283 
00284     // TimeSeries identifiers. Must include the ServerID for uniqueness, so we
00285     // cache them so TimeSeries reports are fast
00286     String mTimeSeriesObjects;
00287 
00288 }; // class Server
00289 
00290 } // namespace Sirikata
00291 
00292 #endif //_SIRIKATA_SERVER_HPP_