Sirikata
|
00001 /* Sirikata 00002 * SessionManager.hpp 00003 * 00004 * Copyright (c) 2010, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_LIBOH_SESSION_MANAGER_HPP_ 00034 #define _SIRIKATA_LIBOH_SESSION_MANAGER_HPP_ 00035 00036 #include <sirikata/oh/ObjectHostContext.hpp> 00037 #include <sirikata/core/service/Service.hpp> 00038 #include <sirikata/oh/SpaceNodeConnection.hpp> 00039 #include <sirikata/core/odp/SST.hpp> 00040 #include <sirikata/core/util/MotionVector.hpp> 00041 #include <sirikata/core/util/MotionQuaternion.hpp> 00042 #include <sirikata/core/util/SpaceObjectReference.hpp> 00043 #include <sirikata/core/util/Platform.hpp> 00044 #include <sirikata/core/ohdp/DelegateService.hpp> 00045 #include <sirikata/core/sync/TimeSyncClient.hpp> 00046 #include <sirikata/core/network/Address4.hpp> 00047 00048 #include <sirikata/oh/DisconnectCodes.hpp> 00049 #include <sirikata/oh/SpaceNodeSession.hpp> 00050 00051 namespace Sirikata { 00052 00053 class ServerIDMap; 00054 00055 namespace Protocol { 00056 namespace Session { 00057 class Container; 00058 } 00059 } 00060 00070 class SIRIKATA_OH_EXPORT SessionManager 00071 : public PollingService, 00072 public OHDP::DelegateService, 00073 public SpaceNodeSessionManager 00074 { 00075 public: 00076 00077 struct ConnectionInfo { 00078 ServerID server; 00079 TimedMotionVector3f loc; 00080 TimedMotionQuaternion orient; 00081 BoundingSphere3f bounds; 00082 String mesh; 00083 String physics; 00084 String query; 00085 }; 00086 00087 enum ConnectionEvent { 00088 Connected, 00089 Migrated, 00090 Disconnected 00091 }; 00092 00093 // Callback indicating that a connection to the server was made 00094 // and it is available for sessions 00095 typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, const ConnectionInfo&)> ConnectedCallback; 00096 // Callback indicating that a connection is being migrated to a new server. This occurs as soon 00097 // as the object host starts the transition and no additional notification is given since, for all 00098 // intents and purposes this is the point at which the transition happens 00099 typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, ServerID)> MigratedCallback; 00100 typedef std::tr1::function<void(const SpaceObjectReference&, ConnectionEvent after)> StreamCreatedCallback; 00101 typedef std::tr1::function<void(const SpaceObjectReference&, Disconnect::Code)> DisconnectedCallback; 00102 00103 typedef std::tr1::function<void(const Sirikata::Protocol::Object::ObjectMessage&)> ObjectMessageCallback; 00104 00105 // Notifies the ObjectHost class of a new object connection: void(object, connectedTo) 00106 typedef std::tr1::function<void(const SpaceObjectReference&,ServerID)> ObjectConnectedCallback; 00107 // Notifies the ObjectHost class of a migrated object: 00108 // void(object, migratedFrom, migratedTo) 00109 typedef std::tr1::function<void(const SpaceObjectReference&,ServerID,ServerID)> ObjectMigratedCallback; 00110 // Returns a message to the object host for handling. 00111 typedef std::tr1::function<void(const SpaceObjectReference&, Sirikata::Protocol::Object::ObjectMessage*)> ObjectMessageHandlerCallback; 00112 // Notifies the ObjectHost of object connection that was closed, including a 00113 // reason. 00114 typedef std::tr1::function<void(const SpaceObjectReference&, Disconnect::Code)> ObjectDisconnectedCallback; 00115 00116 // SST stream related typedefs 00117 typedef SST::Stream<SpaceObjectReference> SSTStream; 00118 typedef SSTStream::Ptr SSTStreamPtr; 00119 typedef SSTStream::EndpointType SSTEndpoint; 00120 typedef OHDPSST::Stream OHSSTStream; 00121 typedef OHSSTStream::Ptr OHSSTStreamPtr; 00122 typedef OHDPSST::Endpoint OHSSTEndpoint; 00123 00124 00125 SessionManager(ObjectHostContext* ctx, const SpaceID& space, ServerIDMap* sidmap, ObjectConnectedCallback, ObjectMigratedCallback, ObjectMessageHandlerCallback, ObjectDisconnectedCallback); 00126 ~SessionManager(); 00127 00128 // NOTE: The public interface is only safe to access from the main strand. 00129 00133 bool connect( 00134 const SpaceObjectReference& sporef_objid, 00135 const TimedMotionVector3f& init_loc, 00136 const TimedMotionQuaternion& init_orient, 00137 const BoundingSphere3f& init_bounds, 00138 const String& init_mesh, const String& init_phy, 00139 const String& init_query, const String& init_zernike, 00140 ConnectedCallback connect_cb, MigratedCallback migrate_cb, 00141 StreamCreatedCallback stream_cb, DisconnectedCallback disconnected_cb 00142 ); 00144 void disconnect(const SpaceObjectReference& id); 00145 00149 Duration serverTimeOffset() const; 00153 Duration clientTimeOffset() const; 00154 00155 // Private version of send that doesn't verify src UUID, allows us to masquerade for session purposes 00156 // The allow_connecting parameter allows you to use a connection over which the object is still opening 00157 // a connection. This is safe since it can only be used by this class (since this is private), so it will 00158 // only be used to deal with session management. 00159 // If dest_server is NullServerID, then getConnectedServer is used to determine where to send the packet. 00160 // This is used to possibly exchange data between the main and IO strands, so it acquires locks. 00161 // 00162 // The allow_connecting flag should only be used internally and allows 00163 // sending packets over a still-connecting session. This is only used to 00164 // allow this to act as an OHDP::Service while still in the connecting phase 00165 // (no callback from SpaceNodeConnection yet) so we can build OHDP::SST 00166 // streams as part of the connection process. 00167 bool send(const SpaceObjectReference& sporef_objid, const ObjectMessagePort src_port, const UUID& dest, const ObjectMessagePort dest_port, const std::string& payload, ServerID dest_server = NullServerID); 00168 00169 SSTStreamPtr getSpaceStream(const ObjectReference& objectID); 00170 00171 // Service Implementation 00172 virtual void start(); 00173 virtual void stop(); 00174 // PollingService Implementation 00175 virtual void poll(); 00176 00177 private: 00178 // Implementation Note: mIOStrand is a bit misleading. All the "real" IO is isolated to that strand -- 00179 // reads and writes to the actual sockets are handled in mIOStrand. But that is all that is handled 00180 // there. Since creating/connecting/disconnecting/destroying SpaceNodeConnections is cheap and relatively 00181 // rare, we keep these in the main strand, allowing us to leave the SpaceNodeConnection map without a lock. 00182 // Note that the SpaceNodeConnections may themselves be accessed from multiple threads. 00183 // 00184 // The data exchange between the strands happens in two places. When sending, it occurs in the connections 00185 // queue, which is thread safe. When receiving, it occurs by posting a handler for the parsed message 00186 // to the main thread. 00187 // 00188 // Note that this means the majority of this class is executed in the main strand. Only reading and writing 00189 // are separated out, which allows us to ensure the network will be serviced as fast as possible, but 00190 // doesn't help if our limiting factor is the speed at which this input/output can be handled. 00191 // 00192 // Note also that this class does *not* handle multithreaded input -- currently all access of public 00193 // methods should be performed from the main strand. 00194 00195 struct ConnectingInfo; 00196 00197 // Schedules received server messages for handling 00198 void scheduleHandleServerMessages(SpaceNodeConnection* conn); 00199 void handleServerMessages(Liveness::Token alive, SpaceNodeConnection* conn); 00200 // Starting point for handling of all messages from the server -- either handled as a special case, such as 00201 // for session management, or dispatched to the object 00202 void handleServerMessage(ObjectMessage* msg, ServerID sid); 00203 00204 // Handles session messages received from the server -- connection replies, migration requests, etc. 00205 void handleSessionMessage(Sirikata::Protocol::Object::ObjectMessage* msg, ServerID from_server); 00206 // Handlers for specific parts of session messages 00207 void handleSessionMessageConnectResponseSuccess(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg); 00208 void handleSessionMessageConnectResponseRedirect(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg); 00209 void handleSessionMessageConnectResponseError(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg); 00210 void handleSessionMessageInitMigration(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg); 00211 00212 00213 // This gets invoked when the connection really is ready -- after 00214 // successful response and we have time sync info. It does some 00215 // additional setup work (sst stream) and then invokes the real callback 00216 void handleObjectFullyConnected(const SpaceID& space, const ObjectReference& obj, ServerID server, const ConnectingInfo& ci, ConnectedCallback real_cb); 00217 // This gets invoked after full migration occurs. It does additional setup 00218 // work (new sst stream to new space server) and invokes the real callback. 00219 void handleObjectFullyMigrated(const SpaceID& space, const ObjectReference& obj, ServerID server, MigratedCallback real_cb); 00220 00221 void retryOpenConnection(const SpaceObjectReference& sporef_uuid,ServerID sid); 00222 00223 // Utility for sending the connection ack 00224 void sendConnectSuccessAck(const SpaceObjectReference& sporef, ServerID connected_to); 00225 00226 // Utility for sending the disconnection request. Session seqno is passed 00227 // explicitly instead of looking it up via the sporef so that this can be 00228 // used to request disconnections for sessions we don't have a record for 00229 // (e.g. because they were cleared out due to object cleanup and then we got 00230 // the connection success response back). 00231 void sendDisconnectMessage(const SpaceObjectReference& sporef, ServerID connected_to, uint64 session_seqno); 00232 00233 // Utility method which keeps trying to resend a message 00234 void sendRetryingMessage(const SpaceObjectReference& sporef_src, const ObjectMessagePort src_port, const UUID& dest, const ObjectMessagePort dest_port, const std::string& payload, ServerID dest_server, Network::IOStrand* strand, const Duration& rate); 00235 00238 // Get an existing space connection or initiate a new one at random 00239 // which can be used for bootstrapping connections 00240 void getAnySpaceConnection(SpaceNodeConnection::GotSpaceConnectionCallback cb); 00241 // Get the connection to the specified space node 00242 void getSpaceConnection(ServerID sid, SpaceNodeConnection::GotSpaceConnectionCallback cb); 00243 // If we have an existing connection or request, get it. Helper 00244 // for getSpaceConnection and setupRandomSpaceConnection 00245 bool getExistingSpaceConnection(ServerID sid, SpaceNodeConnection::GotSpaceConnectionCallback cb); 00246 00247 // Set up a space connection to the given server 00248 void setupSpaceConnection(ServerID server, SpaceNodeConnection::GotSpaceConnectionCallback cb); 00249 void finishSetupSpaceConnection(ServerID server, ServerID resolved_server, Address4 addr); 00250 // And a random server 00251 void setupRandomSpaceConnection(ServerID resolved_server, Address4 addr, SpaceNodeConnection::GotSpaceConnectionCallback cb); 00252 // Helper for setting up an in-progress connection 00253 void registerSpaceNodeConnection(ServerID server, SpaceNodeConnection::GotSpaceConnectionCallback cb); 00254 00255 00256 // Handle a connection event, i.e. the socket either successfully connected or failed 00257 void handleSpaceConnection(const Sirikata::Network::Stream::ConnectionStatus status, 00258 const std::string&reason, 00259 ServerID sid); 00260 // Handle a session event, i.e. the SST stream conected. 00261 void handleSpaceSession(ServerID sid, SpaceNodeConnection* conn); 00262 00263 00266 // Final callback in session initiation -- we have all the info and now just 00267 // have to return it to the object. is_retry indicates whether this is 00268 // retrying (reuse the existing request ID) or new (generate a new session 00269 // ID/seqno) 00270 void openConnectionStartSession(const SpaceObjectReference& sporef_uuid, SpaceNodeConnection* conn, bool is_retry); 00271 00272 // Timeout handler for initial session message -- checks if the connection 00273 // succeeded and, if necessary, retries. Requires a seqno so the identical 00274 // request can be identified if it was retransmitted because it took too 00275 // long to get a response but was received 00276 void checkConnectedAndRetry(const SpaceObjectReference& sporef_uuid, ServerID connTo); 00277 00278 00281 // Start the migration process for the object to the given server. 00282 void migrate(const SpaceObjectReference& sporef_obj_id, ServerID sid); 00283 00284 // Callback that indicates we have a connection to the new server and can now start the migration to it. 00285 void openConnectionStartMigration(const SpaceObjectReference& sporef_uuid, ServerID sid, SpaceNodeConnection* conn); 00286 00287 00291 // OHDP::DelegateService dependencies 00292 OHDP::DelegatePort* createDelegateOHDPPort(OHDP::DelegateService*, const OHDP::Endpoint& ept); 00293 bool delegateOHDPPortSend(const OHDP::Endpoint& source_ep, const OHDP::Endpoint& dest_ep, MemoryReference payload); 00294 00295 void timeSyncUpdated(); 00296 00297 OptionSet* mStreamOptions; 00298 00299 // THREAD SAFE 00300 // These may be accessed safely by any thread 00301 00302 ObjectHostContext* mContext; 00303 SpaceID mSpace; 00304 00305 Network::IOStrand* mIOStrand; 00306 00307 ServerIDMap* mServerIDMap; 00308 00309 TimeProfiler::Stage* mHandleReadProfiler; 00310 TimeProfiler::Stage* mHandleMessageProfiler; 00311 00312 Sirikata::SerializationCheck mSerialization; 00313 00314 // Callbacks for parent ObjectHost 00315 ObjectConnectedCallback mObjectConnectedCallback; 00316 ObjectMigratedCallback mObjectMigratedCallback; 00317 ObjectMessageHandlerCallback mObjectMessageHandlerCallback; 00318 ObjectDisconnectedCallback mObjectDisconnectedCallback; 00319 00320 // Only main strand accesses and manipulates the map, although other strand 00321 // may access the SpaceNodeConnection*'s. 00322 typedef std::tr1::unordered_map<ServerID, SpaceNodeConnection*> ServerConnectionMap; 00323 ServerConnectionMap mConnections; 00324 ServerConnectionMap mConnectingConnections; 00325 00326 // Info associated with opening connections 00327 struct ConnectingInfo { 00328 TimedMotionVector3f loc; 00329 TimedMotionQuaternion orient; 00330 BoundingSphere3f bounds; 00331 String mesh; 00332 String physics; 00333 String query; 00334 String zernike; 00335 }; 00336 typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, ServerID, const ConnectingInfo& ci)> InternalConnectedCallback; 00337 00338 // Objects connections, maintains object connections and mapping 00339 class ObjectConnections { 00340 public: 00341 ObjectConnections(SessionManager* _parent); 00342 00343 // Add the object, completely disconnected, to the index 00344 void add( 00345 const SpaceObjectReference& sporef_objid, ConnectingInfo ci, 00346 InternalConnectedCallback connect_cb, MigratedCallback migrate_cb, 00347 StreamCreatedCallback stream_created_cb, DisconnectedCallback disconnected_cb 00348 ); 00349 00350 bool exists(const SpaceObjectReference& sporef_objid); 00351 00352 void clearSeqno(const SpaceObjectReference& sporef_objid); 00353 // Get the seqno for an outstanding request 00354 uint64 getSeqno(const SpaceObjectReference& sporef_objid); 00355 // Get a new seqno, and store it 00356 uint64 updateSeqno(const SpaceObjectReference& sporef_objid); 00357 // Indicates if the seqno we have right now is non-zero, for rare 00358 // situations where seqno is reset but other info isn't yet, so it's the 00359 // only way to tell if a request failed. 00360 bool validSeqno(const SpaceObjectReference& sporef_objid); 00361 00362 // Mark the object as connecting to the given server 00363 ConnectingInfo& connectingTo(const SpaceObjectReference& obj, ServerID connecting_to); 00364 00365 // Start a migration to a new server, return the MigratedCallback for the object 00366 void startMigration(const SpaceObjectReference& objid, ServerID migrating_to); 00367 00368 WARN_UNUSED 00369 InternalConnectedCallback& getConnectCallback(const SpaceObjectReference& sporef_objid); 00370 00371 // Marks as connected and returns the server connected to. do_cb 00372 // specifies whether the callback should be invoked or deferred 00373 ServerID handleConnectSuccess(const SpaceObjectReference& sporef_obj, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const BoundingSphere3f& bnds, const String& mesh, const String& phy, bool do_cb); 00374 00375 void handleConnectError(const SpaceObjectReference& sporef_objid); 00376 00377 void handleConnectStream(const SpaceObjectReference& sporef_objid, ConnectionEvent after, bool do_cb); 00378 00379 void remove(const SpaceObjectReference& obj); 00380 00381 // Handle a disconnection triggered by the loss of the underlying 00382 // network connection, i.e. because the TCPSST connection was lost 00383 // rather than the space server closing an individual session. 00384 void handleUnderlyingDisconnect(ServerID sid, const String& reason); 00385 00386 // Handle a graceful disconnection, notifying other objects 00387 void gracefulDisconnect(const SpaceObjectReference& sporef); 00388 00389 void disconnectWithCode(const SpaceObjectReference& sporef, const SpaceObjectReference& connectedAs, Disconnect::Code code); 00390 // Lookup the server the object is connected to. With allow_connecting, allows using 00391 // the server currently being connected to, not just one where a session has been 00392 // established 00393 ServerID getConnectedServer(const SpaceObjectReference& sporef_obj_id, bool allow_connecting = false); 00394 00395 ServerID getConnectingToServer(const SpaceObjectReference& sporef_obj_id); 00396 00397 ServerID getMigratingToServer(const SpaceObjectReference& sporef_obj_id); 00398 00399 //UUID getInternalID(const ObjectReference& space_objid) const; 00400 00401 // We have to defer some callbacks sometimes for time 00402 // synchronization. This invokes them, allowing the connection process 00403 // to continue. 00404 void invokeDeferredCallbacks(); 00405 00406 private: 00407 SessionManager* parent; 00408 00409 // Source for seqnos for Session messages 00410 uint64 mSeqnoSource; 00411 00412 struct ObjectInfo { 00413 ObjectInfo(); 00414 00415 ConnectingInfo connectingInfo; 00416 00417 // This is essentially a unique session ID for this 00418 // object. Note that this shouldn't change as long as the 00419 // same HostedObject is requesting a session *with the 00420 // same space server*. So, it will remain the same for all 00421 // session requests while a HostedObject remains alive and 00422 // no migrations occur. This allows us to disambiguate 00423 // replies from the space server which may come from 00424 // different requests while the ObjectHost remains 00425 // connected to the space server. 00426 uint64 seqno; 00427 00428 // Server currently being connected to 00429 ServerID connectingTo; 00430 // Server currently connected to 00431 ServerID connectedTo; 00432 // Server we're trying to migrate to 00433 ServerID migratingTo; 00434 00435 SpaceObjectReference connectedAs; 00436 00437 InternalConnectedCallback connectedCB; 00438 MigratedCallback migratedCB; 00439 StreamCreatedCallback streamCreatedCB; 00440 DisconnectedCallback disconnectedCB; 00441 }; 00442 typedef std::tr1::unordered_map<ServerID, std::vector<SpaceObjectReference> > ObjectServerMap; 00443 ObjectServerMap mObjectServerMap; 00444 typedef std::tr1::unordered_map<SpaceObjectReference, ObjectInfo, SpaceObjectReference::Hasher> ObjectInfoMap; 00445 ObjectInfoMap mObjectInfo; 00446 00447 // A reverse index allows us to lookup an objects internal ID 00448 //typedef std::tr1::unordered_map<SpaceObjectReference, UUID, ObjectReference::Hasher> InternalIDMap; 00449 //InternalIDMap mInternalIDs; 00450 00451 typedef std::tr1::function<void()> DeferredCallback; 00452 typedef std::vector<DeferredCallback> DeferredCallbackList; 00453 DeferredCallbackList mDeferredCallbacks; 00454 }; 00455 ObjectConnections mObjectConnections; 00456 friend class ObjectConnections; 00457 00458 TimeSyncClient* mTimeSyncClient; 00459 00460 bool mShuttingDown; 00461 00462 void spaceConnectCallback(int err, SSTStreamPtr s, SpaceObjectReference obj, ConnectionEvent after); 00463 std::map<ObjectReference, SSTStreamPtr> mObjectToSpaceStreams; 00464 00465 #ifdef PROFILE_OH_PACKET_RTT 00466 // Track outstanding packets for computing RTTs 00467 typedef std::tr1::unordered_map<uint64, Time> OutstandingPacketMap; 00468 OutstandingPacketMap mOutstandingPackets; 00469 uint8 mClearOutstandingCount; 00470 // And stats 00471 Duration mLatencySum; 00472 uint32 mLatencyCount; 00473 // And some helpers for reporting 00474 const String mTimeSeriesOHRTT; 00475 #endif 00476 }; // class SessionManager 00477 00478 } // namespace Sirikata 00479 00480 00481 #endif //_SIRIKATA_LIBOH_SESSION_MANAGER_HPP_