Sirikata
|
00001 /* Sirikata 00002 * Server.hpp 00003 * 00004 * Copyright (c) 2010, Daniel Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 00034 #ifndef _SIRIKATA_SERVER_HPP_ 00035 #define _SIRIKATA_SERVER_HPP_ 00036 00037 #include <sirikata/core/util/Platform.hpp> 00038 #include <sirikata/space/SpaceContext.hpp> 00039 00040 #include <sirikata/space/ObjectHostConnectionManager.hpp> 00041 #include <sirikata/core/service/Service.hpp> 00042 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00043 00044 #include <sirikata/core/util/MotionVector.hpp> 00045 #include <sirikata/core/util/AggregateBoundingInfo.hpp> 00046 00047 #include "Protocol_Session.pbj.hpp" 00048 #include "Protocol_Migration.pbj.hpp" 00049 00050 #include <sirikata/space/ObjectSegmentation.hpp> 00051 00052 #include <sirikata/core/odp/DelegateService.hpp> 00053 #include <sirikata/core/ohdp/DelegateService.hpp> 00054 00055 #include <sirikata/core/sync/TimeSyncServer.hpp> 00056 00057 #include <sirikata/core/command/Commander.hpp> 00058 00059 namespace Sirikata 00060 { 00061 class Authenticator; 00062 00063 class Forwarder; 00064 class LocalForwarder; 00065 00066 class LocationService; 00067 class Proximity; 00068 class MigrationMonitor; 00069 00070 class CoordinateSegmentation; 00071 class ObjectSegmentation; 00072 00073 class ObjectConnection; 00074 class ObjectHostConnectionManager; 00075 00076 class ObjectHostSessionManager; 00077 class ObjectSessionManager; 00078 00085 class Server : 00086 public MessageRecipient, public Service, 00087 public OSegWriteListener, 00088 public ODP::DelegateService, public OHDP::DelegateService, 00089 ObjectHostConnectionManager::Listener 00090 { 00091 public: 00092 Server(SpaceContext* ctx, Authenticator* auth, Forwarder* forwarder, LocationService* loc_service, CoordinateSegmentation* cseg, Proximity* prox, ObjectSegmentation* oseg, Address4 oh_listen_addr, ObjectHostSessionManager* oh_sess_mgr, ObjectSessionManager* obj_sess_mgr); 00093 ~Server(); 00094 00095 virtual void receiveMessage(Message* msg); 00096 private: 00097 // Service Implementation 00098 void start(); 00099 void stop(); 00100 00101 // OSegWriteListener Interface 00102 virtual void osegMigrationAcknowledged(const UUID& id); 00103 virtual void osegAddNewFinished(const UUID& id, OSegAddNewStatus status); 00104 00105 00106 // ODP::DelegateService dependencies 00107 ODP::DelegatePort* createDelegateODPPort(ODP::DelegateService*, const SpaceObjectReference& sor, ODP::PortID port); 00108 bool delegateODPPortSend(const ODP::Endpoint& source_ep, const ODP::Endpoint& dest_ep, MemoryReference payload); 00109 00110 // OHDP::DelegateService dependencies 00111 OHDP::DelegatePort* createDelegateOHDPPort(OHDP::DelegateService*, const OHDP::Endpoint& ept); 00112 bool delegateOHDPPortSend(const OHDP::Endpoint& source_ep, const OHDP::Endpoint& dest_ep, MemoryReference payload); 00113 00114 // ObjectHostConnectionManager::Listener Interface: 00115 00116 virtual void onObjectHostConnected(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id, OHDPSST::Stream::Ptr stream); 00117 // Callback which handles messages from object hosts -- mostly just does sanity checking 00118 // before using the forwarder to do routing. Operates in the 00119 // network strand to allow for fast forwarding, see 00120 // handleObjectHostMessageRouting for continuation in main strand 00121 virtual bool onObjectHostMessageReceived(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id, Sirikata::Protocol::Object::ObjectMessage*); 00122 // Disconnection events, forwarded to 00123 // handleObjectHostConnectionClosed in main strand 00124 virtual void onObjectHostDisconnected(const ObjectHostConnectionID& conn_id, const ShortObjectHostConnectionID short_conn_id); 00125 00126 00127 // Handle a migration event generated by the MigrationMonitor 00128 void handleMigrationEvent(const UUID& objid); 00129 00130 // Starts the process of trying to send migration messages, or continues one if it's already running. 00131 void startSendMigrationMessages(); 00132 00133 // Try to send outstanding migration messages. This chains automatically until the queue is emptied. 00134 void trySendMigrationMessages(); 00135 00136 00137 // Send a session message directly to the object via the OH connection manager, bypassing any restrictions on 00138 // the current state of the connection. Keeps retrying until the message gets through. 00139 void sendSessionMessageWithRetry(const ObjectHostConnectionID& conn, Sirikata::Protocol::Object::ObjectMessage* msg, const Duration& retry_rate); 00140 00141 00142 // Checks if an object is connected to this server 00143 bool isObjectConnected(const UUID& object_id) const; 00144 // Checks if an object is current connecting to this server (post authentication) 00145 bool isObjectConnecting(const UUID& object_id) const; 00146 00147 00148 // Handle an object host closing its connection 00149 void handleObjectHostConnectionClosed(const ObjectHostConnectionID& conn_id); 00150 // Schedule main thread to handle oh message routing 00151 void scheduleObjectHostMessageRouting(); 00152 void handleObjectHostMessageRouting(); 00153 // Perform forwarding for a message on the front of mRouteObjectMessage from the object host which 00154 // couldn't be forwarded directly by the networking code 00155 // (i.e. needs routing to another node) 00156 bool handleSingleObjectHostMessageRouting(); 00157 00158 // Handle Session messages from an object 00159 void handleSessionMessage(const ObjectHostConnectionID& oh_conn_id, Sirikata::Protocol::Object::ObjectMessage* msg); 00160 // Handle Connect message from object 00161 void handleConnect(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, const Sirikata::Protocol::Session::Connect& connect_msg, uint64 seqno); 00162 void handleConnectAuthResponse(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, const Sirikata::Protocol::Session::Connect& connect_msg, uint64 seqno, bool authenticated); 00163 00164 void sendConnectSuccess(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, uint64 session_request_seqno); 00165 void sendConnectError(const ObjectHostConnectionID& oh_conn_id, const UUID& obj_id, uint64 session_request_seqno); 00166 00167 // Handle connection ack message from object 00168 void handleConnectAck(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, uint64 session_request_seqno); 00169 00170 // Handle Migrate message from object 00171 void handleMigrate(const ObjectHostConnectionID& oh_conn_id, const Sirikata::Protocol::Object::ObjectMessage& container, const Sirikata::Protocol::Session::Connect& migrate_msg, uint64 seqno); 00172 00173 // Performs actual migration after all the necessary information is available. 00174 void handleMigration(const UUID& obj_id); 00175 00176 // Handle a disconnection. 00177 void handleDisconnect(UUID obj_id, ObjectConnection* conn, uint64 session_request_seqno); 00178 00179 //finally deletes any object connections to obj_id 00180 void killObjectConnection(const UUID& obj_id); 00181 00182 void finishAddObject(const UUID& obj_id, OSegAddNewStatus status); 00183 00184 bool checkAlreadyMigrating(const UUID& obj_id); 00185 void processAlreadyMigrating(const UUID& obj_id); 00186 00187 void newStream(int err, SST::Stream<SpaceObjectReference>::Ptr s); 00188 00189 00190 // Commander commands 00191 void commandObjectsCount(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00192 void commandObjectsList(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00193 void commandObjectsDisconnect(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00194 00195 SpaceContext* mContext; 00196 00197 TimeSyncServer* mTimeSyncServer; 00198 00199 Authenticator* mAuthenticator; 00200 LocationService* mLocationService; 00201 CoordinateSegmentation* mCSeg; 00202 Proximity* mProximity; 00203 ObjectSegmentation* mOSeg; 00204 LocalForwarder* mLocalForwarder; 00205 Forwarder* mForwarder; 00206 MigrationMonitor* mMigrationMonitor; 00207 ObjectHostSessionManager* mOHSessionManager; 00208 ObjectSessionManager* mObjectSessionManager; 00209 00210 Router<Message*>* mMigrateServerMessageService; 00211 00212 bool mMigrationSendRunning; // Indicates whether an event chain for sending outstanding migration messages is running. 00213 // Note that ideally this could be replaced by just using our own internal queue 00214 // to hold messages since migration messsages aren't going to get stale 00215 00216 bool mShutdownRequested; 00217 00218 ObjectHostConnectionManager* mObjectHostConnectionManager; 00219 00220 typedef std::tr1::unordered_map<UUID, ObjectConnection*, UUID::Hasher> ObjectConnectionMap; 00221 00222 ObjectConnectionMap mObjects; // NOTE: only Forwarder and LocalForwarder 00223 // should actually use the connection, this is 00224 // only still a map to handle migrations 00225 // properly 00226 // Information to be able to respond to a migration request *from 00227 // the object*. 00228 typedef ObjectConnectionMap MigrationRequestMap; 00229 MigrationRequestMap mObjectsAwaitingMigration; 00230 00231 00232 typedef std::tr1::unordered_map<UUID, Sirikata::Protocol::Migration::MigrationMessage*, UUID::Hasher> ObjectMigrationMap; 00233 ObjectMigrationMap mObjectMigrations; 00234 00235 //std::map<UUID,ObjectConnection*> 00236 struct MigratingObjectConnectionsData 00237 { 00238 ObjectConnection* obj_conner; 00239 int milliseconds; 00240 ServerID migratingTo; 00241 TimedMotionVector3f loc; 00242 AggregateBoundingInfo bnds; 00243 uint64 uniqueConnId; 00244 bool serviceConnection; 00245 }; 00246 00247 typedef std::queue<Message*> MigrateMessageQueue; 00248 // Outstanding MigrateMessages, which get objects to other servers. 00249 MigrateMessageQueue mMigrateMessages; 00250 00251 // ObjectConnectionMap mMigratingConnections;//bftm add 00252 typedef std::map<UUID,MigratingObjectConnectionsData> MigConnectionsMap; 00253 MigConnectionsMap mMigratingConnections;//bftm add 00254 Timer mMigrationTimer; 00255 00256 struct StoredConnection 00257 { 00258 ObjectHostConnectionID conn_id; 00259 Sirikata::Protocol::Session::Connect conn_msg; 00260 // Sequence number from session request so we can uniquely 00261 // identify the request 00262 uint64 session_seqno; 00263 }; 00264 00265 typedef std::map<UUID, StoredConnection> StoredConnectionMap; 00266 StoredConnectionMap mStoredConnectionData; 00267 struct ConnectionIDObjectMessagePair{ 00268 ObjectHostConnectionID conn_id; 00269 Sirikata::Protocol::Object::ObjectMessage* obj_msg; 00270 ConnectionIDObjectMessagePair(ObjectHostConnectionID conn_id, Sirikata::Protocol::Object::ObjectMessage*msg) { 00271 this->conn_id=conn_id; 00272 this->obj_msg=msg; 00273 } 00274 size_t size() const{ 00275 return 1; 00276 } 00277 }; 00278 00279 // FIXME Another place where needing a size queue and notifications causes 00280 // double locking... 00281 boost::mutex mRouteObjectMessageMutex; 00282 Sirikata::SizedThreadSafeQueue<ConnectionIDObjectMessagePair>mRouteObjectMessage; 00283 00284 // TimeSeries identifiers. Must include the ServerID for uniqueness, so we 00285 // cache them so TimeSeries reports are fast 00286 String mTimeSeriesObjects; 00287 00288 }; // class Server 00289 00290 } // namespace Sirikata 00291 00292 #endif //_SIRIKATA_SERVER_HPP_