Sirikata
liboh/include/sirikata/oh/SessionManager.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  SessionManager.hpp
00003  *
00004  *  Copyright (c) 2010, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_LIBOH_SESSION_MANAGER_HPP_
00034 #define _SIRIKATA_LIBOH_SESSION_MANAGER_HPP_
00035 
00036 #include <sirikata/oh/ObjectHostContext.hpp>
00037 #include <sirikata/core/service/Service.hpp>
00038 #include <sirikata/oh/SpaceNodeConnection.hpp>
00039 #include <sirikata/core/odp/SST.hpp>
00040 #include <sirikata/core/util/MotionVector.hpp>
00041 #include <sirikata/core/util/MotionQuaternion.hpp>
00042 #include <sirikata/core/util/SpaceObjectReference.hpp>
00043 #include <sirikata/core/util/Platform.hpp>
00044 #include <sirikata/core/ohdp/DelegateService.hpp>
00045 #include <sirikata/core/sync/TimeSyncClient.hpp>
00046 #include <sirikata/core/network/Address4.hpp>
00047 
00048 #include <sirikata/oh/DisconnectCodes.hpp>
00049 #include <sirikata/oh/SpaceNodeSession.hpp>
00050 
00051 namespace Sirikata {
00052 
// Forward declarations. Within this header ServerIDMap is only used through
// pointers (constructor argument and the mServerIDMap member), so the full
// definition is not needed here.
00053 class ServerIDMap;
00054 
// Protocol::Session::Container is only taken by reference in the private
// handleSessionMessage* helpers below, so a forward declaration is sufficient.
00055 namespace Protocol {
00056 namespace Session {
00057 class Container;
00058 }
00059 }
00060 
// SessionManager handles the sessions between objects on this object host and
// the space servers for the space passed to the constructor (stored in mSpace).
// It is a PollingService, an OHDP::DelegateService and a
// SpaceNodeSessionManager. Objects request sessions with connect() and end
// them with disconnect(); results are reported through the callbacks passed to
// connect() and through the ObjectHost callbacks passed to the constructor.
// NOTE: per the comments below, the public interface is only safe to use from
// the main strand.
00070 class SIRIKATA_OH_EXPORT SessionManager
00071     : public PollingService,
00072       public OHDP::DelegateService,
00073       public SpaceNodeSessionManager
00074 {
00075   public:
00076 
          // Properties handed to the ConnectedCallback when a connection is
          // made. Presumably `server` is the space server the object ended up
          // connected to and the remaining fields are the object's properties
          // as accepted for the session -- confirm against the implementation.
00077     struct ConnectionInfo {
00078         ServerID server;
00079         TimedMotionVector3f loc;
00080         TimedMotionQuaternion orient;
00081         BoundingSphere3f bounds;
00082         String mesh;
00083         String physics;
00084         String query;
00085     };
00086 
          // Kinds of connection events. Used as the `after` argument of
          // StreamCreatedCallback, spaceConnectCallback and
          // ObjectConnections::handleConnectStream.
00087     enum ConnectionEvent {
00088         Connected,
00089         Migrated,
00090         Disconnected
00091     };
00092 
00093     // Callback indicating that a connection to the server was made
00094     // and it is available for sessions
00095     typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, const ConnectionInfo&)> ConnectedCallback;
00096     // Callback indicating that a connection is being migrated to a new server.  This occurs as soon
00097     // as the object host starts the transition and no additional notification is given since, for all
00098     // intents and purposes this is the point at which the transition happens
00099     typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, ServerID)> MigratedCallback;
          // Callback receiving the object and the ConnectionEvent `after`.
          // Presumably invoked once the object's stream to the space has been
          // created following that event -- confirm against the implementation.
00100     typedef std::tr1::function<void(const SpaceObjectReference&, ConnectionEvent after)> StreamCreatedCallback;
          // Callback receiving the object and a Disconnect::Code describing the
          // disconnection.
00101     typedef std::tr1::function<void(const SpaceObjectReference&, Disconnect::Code)> DisconnectedCallback;
00102 
          // Callback receiving a (const) object message.
00103     typedef std::tr1::function<void(const Sirikata::Protocol::Object::ObjectMessage&)> ObjectMessageCallback;
00104 
00105     // Notifies the ObjectHost class of a new object connection: void(object, connectedTo)
00106     typedef std::tr1::function<void(const SpaceObjectReference&,ServerID)> ObjectConnectedCallback;
00107     // Notifies the ObjectHost class of a migrated object:
00108     // void(object, migratedFrom, migratedTo)
00109     typedef std::tr1::function<void(const SpaceObjectReference&,ServerID,ServerID)> ObjectMigratedCallback;
00110     // Returns a message to the object host for handling.
00111     typedef std::tr1::function<void(const SpaceObjectReference&, Sirikata::Protocol::Object::ObjectMessage*)> ObjectMessageHandlerCallback;
00112     // Notifies the ObjectHost of object connection that was closed, including a
00113     // reason.
00114     typedef std::tr1::function<void(const SpaceObjectReference&, Disconnect::Code)> ObjectDisconnectedCallback;
00115 
00116     // SST stream related typedefs
00117     typedef SST::Stream<SpaceObjectReference> SSTStream;
00118     typedef SSTStream::Ptr SSTStreamPtr;
00119     typedef SSTStream::EndpointType SSTEndpoint;
00120     typedef OHDPSST::Stream OHSSTStream;
00121     typedef OHSSTStream::Ptr OHSSTStreamPtr;
00122     typedef OHDPSST::Endpoint OHSSTEndpoint;
00123 
00124 
          // Constructs a manager for `space`. The four trailing arguments are the
          // ObjectHost notification callbacks, in order: object connected, object
          // migrated, object message handler, object disconnected (see the
          // typedefs above and the m*Callback members below).
00125     SessionManager(ObjectHostContext* ctx, const SpaceID& space, ServerIDMap* sidmap, ObjectConnectedCallback, ObjectMigratedCallback, ObjectMessageHandlerCallback, ObjectDisconnectedCallback);
00126     ~SessionManager();
00127 
00128     // NOTE: The public interface is only safe to access from the main strand.
00129 
          // Requests a session for the object `sporef_objid` with the given
          // initial location, orientation, bounds, mesh, physics, query and
          // zernike values. The four callbacks report connection, migration,
          // stream creation and disconnection for this object.
          // NOTE(review): the meaning of the bool return value is not visible in
          // this header -- confirm against the implementation.
00133     bool connect(
00134         const SpaceObjectReference& sporef_objid,
00135         const TimedMotionVector3f& init_loc,
00136         const TimedMotionQuaternion& init_orient,
00137         const BoundingSphere3f& init_bounds,
00138         const String& init_mesh, const String& init_phy,
00139         const String& init_query, const String& init_zernike,
00140         ConnectedCallback connect_cb, MigratedCallback migrate_cb,
00141         StreamCreatedCallback stream_cb, DisconnectedCallback disconnected_cb
00142     );
          // Requests disconnection of the object `id`. Declaration only; the
          // exact semantics live in the implementation.
00144     void disconnect(const SpaceObjectReference& id);
00145 
          // Time offsets, returned as Durations. Presumably derived from the time
          // sync client (mTimeSyncClient); the direction/sign of each offset is
          // not visible here -- confirm against the implementation.
00149     Duration serverTimeOffset() const;
00153     Duration clientTimeOffset() const;
00154 
00155     // Private version of send that doesn't verify src UUID, allows us to masquerade for session purposes
00156     // The allow_connecting parameter allows you to use a connection over which the object is still opening
00157     // a connection.  This is safe since it can only be used by this class (since this is private), so it will
00158     // only be used to deal with session management.
00159     // If dest_server is NullServerID, then getConnectedServer is used to determine where to send the packet.
00160     // This is used to possibly exchange data between the main and IO strands, so it acquires locks.
00161     //
00162     // The allow_connecting flag should only be used internally and allows
00163     // sending packets over a still-connecting session. This is only used to
00164     // allow this to act as an OHDP::Service while still in the connecting phase
00165     // (no callback from SpaceNodeConnection yet) so we can build OHDP::SST
00166     // streams as part of the connection process.
          //
          // NOTE(review): the comment above appears to be out of date with the
          // declaration. send() is declared in the public section, and its
          // signature has no allow_connecting parameter (only
          // ObjectConnections::getConnectedServer takes one). Confirm the
          // intended visibility and update the text or the declaration.
00167     bool send(const SpaceObjectReference& sporef_objid, const ObjectMessagePort src_port, const UUID& dest, const ObjectMessagePort dest_port, const std::string& payload, ServerID dest_server = NullServerID);
00168 
          // Returns an SST stream pointer for the given object. Presumably looked
          // up in mObjectToSpaceStreams; behaviour for unknown objects is not
          // visible here -- confirm against the implementation.
00169     SSTStreamPtr getSpaceStream(const ObjectReference& objectID);
00170 
00171     // Service Implementation
00172     virtual void start();
00173     virtual void stop();
00174     // PollingService Implementation
00175     virtual void poll();
00176 
00177 private:
00178     // Implementation Note: mIOStrand is a bit misleading. All the "real" IO is isolated to that strand --
00179     // reads and writes to the actual sockets are handled in mIOStrand. But that is all that is handled
00180     // there. Since creating/connecting/disconnecting/destroying SpaceNodeConnections is cheap and relatively
00181     // rare, we keep these in the main strand, allowing us to leave the SpaceNodeConnection map without a lock.
00182     // Note that the SpaceNodeConnections may themselves be accessed from multiple threads.
00183     //
00184     // The data exchange between the strands happens in two places. When sending, it occurs in the connections
00185     // queue, which is thread safe.  When receiving, it occurs by posting a handler for the parsed message
00186     // to the main thread.
00187     //
00188     // Note that this means the majority of this class is executed in the main strand. Only reading and writing
00189     // are separated out, which allows us to ensure the network will be serviced as fast as possible, but
00190     // doesn't help if our limiting factor is the speed at which this input/output can be handled.
00191     //
00192     // Note also that this class does *not* handle multithreaded input -- currently all access of public
00193     // methods should be performed from the main strand.
00194 
          // Defined further below; declared early because
          // handleObjectFullyConnected takes it by reference.
00195     struct ConnectingInfo;
00196 
00197     // Schedules received server messages for handling
00198     void scheduleHandleServerMessages(SpaceNodeConnection* conn);
00199     void handleServerMessages(Liveness::Token alive, SpaceNodeConnection* conn);
00200     // Starting point for handling of all messages from the server -- either handled as a special case, such as
00201     // for session management, or dispatched to the object
00202     void handleServerMessage(ObjectMessage* msg, ServerID sid);
00203 
00204     // Handles session messages received from the server -- connection replies, migration requests, etc.
00205     void handleSessionMessage(Sirikata::Protocol::Object::ObjectMessage* msg, ServerID from_server);
00206     // Handlers for specific parts of session messages
00207     void handleSessionMessageConnectResponseSuccess(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg);
00208     void handleSessionMessageConnectResponseRedirect(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg);
00209     void handleSessionMessageConnectResponseError(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg);
00210     void handleSessionMessageInitMigration(ServerID from_server, const SpaceObjectReference& sporef_obj, Sirikata::Protocol::Session::Container& session_msg);
00211 
00212 
00213     // This gets invoked when the connection really is ready -- after
00214     // successful response and we have time sync info. It does some
00215     // additional setup work (sst stream) and then invokes the real callback
00216     void handleObjectFullyConnected(const SpaceID& space, const ObjectReference& obj, ServerID server, const ConnectingInfo& ci, ConnectedCallback real_cb);
00217     // This gets invoked after full migration occurs. It does additional setup
00218     // work (new sst stream to new space server) and invokes the real callback.
00219     void handleObjectFullyMigrated(const SpaceID& space, const ObjectReference& obj, ServerID server, MigratedCallback real_cb);
00220 
          // Retries opening the connection for the object to server `sid`.
          // Declaration only; retry policy lives in the implementation.
00221     void retryOpenConnection(const SpaceObjectReference& sporef_uuid,ServerID sid);
00222 
00223     // Utility for sending the connection ack
00224     void sendConnectSuccessAck(const SpaceObjectReference& sporef, ServerID connected_to);
00225 
00226     // Utility for sending the disconnection request. Session seqno is passed
00227     // explicitly instead of looking it up via the sporef so that this can be
00228     // used to request disconnections for sessions we don't have a record for
00229     // (e.g. because they were cleared out due to object cleanup and then we got
00230     // the connection success response back).
00231     void sendDisconnectMessage(const SpaceObjectReference& sporef, ServerID connected_to, uint64 session_seqno);
00232 
00233     // Utility method which keeps trying to resend a message
00234     void sendRetryingMessage(const SpaceObjectReference& sporef_src, const ObjectMessagePort src_port, const UUID& dest, const ObjectMessagePort dest_port, const std::string& payload, ServerID dest_server, Network::IOStrand* strand, const Duration& rate);
00235 
00238     // Get an existing space connection or initiate a new one at random
00239     // which can be used for bootstrapping connections
00240     void getAnySpaceConnection(SpaceNodeConnection::GotSpaceConnectionCallback cb);
00241     // Get the connection to the specified space node
00242     void getSpaceConnection(ServerID sid, SpaceNodeConnection::GotSpaceConnectionCallback cb);
00243     // If we have an existing connection or request, get it. Helper
00244     // for getSpaceConnection and setupRandomSpaceConnection
00245     bool getExistingSpaceConnection(ServerID sid, SpaceNodeConnection::GotSpaceConnectionCallback cb);
00246 
00247     // Set up a space connection to the given server
00248     void setupSpaceConnection(ServerID server, SpaceNodeConnection::GotSpaceConnectionCallback cb);
00249     void finishSetupSpaceConnection(ServerID server, ServerID resolved_server, Address4 addr);
00250     // And a random server
00251     void setupRandomSpaceConnection(ServerID resolved_server, Address4 addr, SpaceNodeConnection::GotSpaceConnectionCallback cb);
00252     // Helper for setting up an in-progress connection
00253     void registerSpaceNodeConnection(ServerID server, SpaceNodeConnection::GotSpaceConnectionCallback cb);
00254 
00255 
00256     // Handle a connection event, i.e. the socket either successfully connected or failed
00257     void handleSpaceConnection(const Sirikata::Network::Stream::ConnectionStatus status,
00258                                const std::string&reason,
00259                                ServerID sid);
00260     // Handle a session event, i.e. the SST stream connected.
00261     void handleSpaceSession(ServerID sid, SpaceNodeConnection* conn);
00262 
00263 
00266     // Final callback in session initiation -- we have all the info and now just
00267     // have to return it to the object. is_retry indicates whether this is
00268     // retrying (reuse the existing request ID) or new (generate a new session
00269     // ID/seqno)
00270     void openConnectionStartSession(const SpaceObjectReference& sporef_uuid, SpaceNodeConnection* conn, bool is_retry);
00271 
00272     // Timeout handler for initial session message -- checks if the connection
00273     // succeeded and, if necessary, retries. Requires a seqno so the identical
00274     // request can be identified if it was retransmitted because it took too
00275     // long to get a response but was received
          // NOTE(review): the signature below takes no seqno argument; presumably
          // the seqno is looked up via ObjectConnections::getSeqno -- confirm
          // against the implementation.
00276     void checkConnectedAndRetry(const SpaceObjectReference& sporef_uuid, ServerID connTo);
00277 
00278 
00281     // Start the migration process for the object to the given server.
00282     void migrate(const SpaceObjectReference& sporef_obj_id, ServerID sid);
00283 
00284     // Callback that indicates we have a connection to the new server and can now start the migration to it.
00285     void openConnectionStartMigration(const SpaceObjectReference& sporef_uuid, ServerID sid, SpaceNodeConnection* conn);
00286 
00287 
00291     // OHDP::DelegateService dependencies
00292     OHDP::DelegatePort* createDelegateOHDPPort(OHDP::DelegateService*, const OHDP::Endpoint& ept);
00293     bool delegateOHDPPortSend(const OHDP::Endpoint& source_ep, const OHDP::Endpoint& dest_ep, MemoryReference payload);
00294 
          // Presumably invoked when new time synchronization data is available
          // (see mTimeSyncClient) -- confirm against the implementation.
00295     void timeSyncUpdated();
00296 
00297     OptionSet* mStreamOptions;
00298 
00299     // THREAD SAFE
00300     // These may be accessed safely by any thread
00301 
00302     ObjectHostContext* mContext;
00303     SpaceID mSpace;
00304 
00305     Network::IOStrand* mIOStrand;
00306 
00307     ServerIDMap* mServerIDMap;
00308 
00309     TimeProfiler::Stage* mHandleReadProfiler;
00310     TimeProfiler::Stage* mHandleMessageProfiler;
00311 
00312     Sirikata::SerializationCheck mSerialization;
00313 
00314     // Callbacks for parent ObjectHost
00315     ObjectConnectedCallback mObjectConnectedCallback;
00316     ObjectMigratedCallback mObjectMigratedCallback;
00317     ObjectMessageHandlerCallback mObjectMessageHandlerCallback;
00318     ObjectDisconnectedCallback mObjectDisconnectedCallback;
00319 
00320     // Only main strand accesses and manipulates the map, although other strand
00321     // may access the SpaceNodeConnection*'s.
00322     typedef std::tr1::unordered_map<ServerID, SpaceNodeConnection*> ServerConnectionMap;
00323     ServerConnectionMap mConnections;
00324     ServerConnectionMap mConnectingConnections;
00325 
00326     // Info associated with opening connections
00327     struct ConnectingInfo {
00328         TimedMotionVector3f loc;
00329         TimedMotionQuaternion orient;
00330         BoundingSphere3f bounds;
00331         String mesh;
00332         String physics;
00333         String query;
00334         String zernike;
00335     };
          // Internal counterpart of ConnectedCallback: receives the server and the
          // ConnectingInfo instead of a ConnectionInfo.
00336     typedef std::tr1::function<void(const SpaceID&, const ObjectReference&, ServerID, const ConnectingInfo& ci)> InternalConnectedCallback;
00337 
00338     // Object connections: maintains per-object connection state (mObjectInfo)
          // and the mapping from servers to objects (mObjectServerMap)
00339     class ObjectConnections {
00340     public:
00341         ObjectConnections(SessionManager* _parent);
00342 
00343         // Add the object, completely disconnected, to the index
00344         void add(
00345             const SpaceObjectReference& sporef_objid, ConnectingInfo ci,
00346             InternalConnectedCallback connect_cb, MigratedCallback migrate_cb,
00347             StreamCreatedCallback stream_created_cb, DisconnectedCallback disconnected_cb
00348         );
00349 
              // Whether the object is known to this index (presumably: present in
              // mObjectInfo -- confirm against the implementation).
00350         bool exists(const SpaceObjectReference& sporef_objid);
00351 
00352         void clearSeqno(const SpaceObjectReference& sporef_objid);
00353         // Get the seqno for an outstanding request
00354         uint64 getSeqno(const SpaceObjectReference& sporef_objid);
00355         // Get a new seqno, and store it
00356         uint64 updateSeqno(const SpaceObjectReference& sporef_objid);
00357         // Indicates if the seqno we have right now is non-zero, for rare
00358         // situations where seqno is reset but other info isn't yet, so it's the
00359         // only way to tell if a request failed.
00360         bool validSeqno(const SpaceObjectReference& sporef_objid);
00361 
00362         // Mark the object as connecting to the given server
00363         ConnectingInfo& connectingTo(const SpaceObjectReference& obj, ServerID connecting_to);
00364 
00365         // Start a migration to a new server.
              // NOTE(review): this comment previously said the MigratedCallback is
              // returned, but the declared return type is void.
00366         void startMigration(const SpaceObjectReference& objid, ServerID migrating_to);
00367 
              // Returns a reference to the object's stored connect callback
              // (presumably ObjectInfo::connectedCB). WARN_UNUSED marks the result
              // as one that should not be ignored.
00368         WARN_UNUSED
00369         InternalConnectedCallback& getConnectCallback(const SpaceObjectReference& sporef_objid);
00370 
00371         // Marks as connected and returns the server connected to. do_cb
00372         // specifies whether the callback should be invoked or deferred
00373         ServerID handleConnectSuccess(const SpaceObjectReference& sporef_obj, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const BoundingSphere3f& bnds, const String& mesh, const String& phy, bool do_cb);
00374 
              // Handles a failed connection attempt for the object. Declaration
              // only; the state changes made are not visible here.
00375         void handleConnectError(const SpaceObjectReference& sporef_objid);
00376 
              // Handles creation of the object's stream after the event `after`.
              // Presumably do_cb selects between invoking the StreamCreatedCallback
              // immediately and deferring it, as in handleConnectSuccess -- confirm.
00377         void handleConnectStream(const SpaceObjectReference& sporef_objid, ConnectionEvent after, bool do_cb);
00378 
              // Removes the object from the index (counterpart of add()).
00379         void remove(const SpaceObjectReference& obj);
00380 
00381         // Handle a disconnection triggered by the loss of the underlying
00382         // network connection, i.e. because the TCPSST connection was lost
00383         // rather than the space server closing an individual session.
00384         void handleUnderlyingDisconnect(ServerID sid, const String& reason);
00385 
00386         // Handle a graceful disconnection, notifying other objects
00387         void gracefulDisconnect(const SpaceObjectReference& sporef);
00388 
              // Disconnects the object and reports the given Disconnect::Code.
              // `connectedAs` is passed separately from `sporef` (see
              // ObjectInfo::connectedAs); how the two differ is not visible here.
00389         void disconnectWithCode(const SpaceObjectReference& sporef, const SpaceObjectReference& connectedAs, Disconnect::Code code);
00390         // Lookup the server the object is connected to.  With allow_connecting, allows using
00391         // the server currently being connected to, not just one where a session has been
00392         // established
00393         ServerID getConnectedServer(const SpaceObjectReference& sporef_obj_id, bool allow_connecting = false);
00394 
              // Server the object is currently connecting to (see
              // ObjectInfo::connectingTo).
00395         ServerID getConnectingToServer(const SpaceObjectReference& sporef_obj_id);
00396 
              // Server the object is currently migrating to (see
              // ObjectInfo::migratingTo).
00397         ServerID getMigratingToServer(const SpaceObjectReference& sporef_obj_id);
00398 
00399         //UUID getInternalID(const ObjectReference& space_objid) const;
00400 
00401         // We have to defer some callbacks sometimes for time
00402         // synchronization. This invokes them, allowing the connection process
00403         // to continue.
00404         void invokeDeferredCallbacks();
00405 
00406     private:
              // Owning SessionManager, supplied to the constructor.
00407         SessionManager* parent;
00408 
00409         // Source for seqnos for Session messages
00410         uint64 mSeqnoSource;
00411 
              // Per-object session state, stored in mObjectInfo.
00412         struct ObjectInfo {
00413             ObjectInfo();
00414 
00415             ConnectingInfo connectingInfo;
00416 
00417             // This is essentially a unique session ID for this
00418             // object. Note that this shouldn't change as long as the
00419             // same HostedObject is requesting a session *with the
00420             // same space server*. So, it will remain the same for all
00421             // session requests while a HostedObject remains alive and
00422             // no migrations occur.  This allows us to disambiguate
00423             // replies from the space server which may come from
00424             // different requests while the ObjectHost remains
00425             // connected to the space server.
00426             uint64 seqno;
00427 
00428             // Server currently being connected to
00429             ServerID connectingTo;
00430             // Server currently connected to
00431             ServerID connectedTo;
00432             // Server we're trying to migrate to
00433             ServerID migratingTo;
00434 
00435             SpaceObjectReference connectedAs;
00436 
                  // Callbacks registered for this object via add().
00437             InternalConnectedCallback connectedCB;
00438             MigratedCallback migratedCB;
00439         StreamCreatedCallback streamCreatedCB;
00440         DisconnectedCallback disconnectedCB;
00441         };
              // Server -> objects index.
00442         typedef std::tr1::unordered_map<ServerID, std::vector<SpaceObjectReference> > ObjectServerMap;
00443         ObjectServerMap mObjectServerMap;
              // Object -> session state index.
00444         typedef std::tr1::unordered_map<SpaceObjectReference, ObjectInfo, SpaceObjectReference::Hasher> ObjectInfoMap;
00445         ObjectInfoMap mObjectInfo;
00446 
00447         // A reverse index allows us to look up an object's internal ID
00448         //typedef std::tr1::unordered_map<SpaceObjectReference, UUID, ObjectReference::Hasher> InternalIDMap;
00449         //InternalIDMap mInternalIDs;
00450 
              // Callbacks queued until invokeDeferredCallbacks() runs.
00451         typedef std::tr1::function<void()> DeferredCallback;
00452         typedef std::vector<DeferredCallback> DeferredCallbackList;
00453         DeferredCallbackList mDeferredCallbacks;
00454     };
00455     ObjectConnections mObjectConnections;
00456     friend class ObjectConnections;
00457 
00458     TimeSyncClient* mTimeSyncClient;
00459 
          // Presumably set once shutdown has begun (see stop()) -- confirm against
          // the implementation.
00460     bool mShuttingDown;
00461 
          // Callback for the outcome of creating an object's SST stream to the
          // space: `err` is an error indicator, `s` the stream, `after` the event
          // that triggered the stream creation. Exact error convention is not
          // visible here.
00462     void spaceConnectCallback(int err, SSTStreamPtr s, SpaceObjectReference obj, ConnectionEvent after);
          // Per-object SST streams, keyed by ObjectReference.
00463     std::map<ObjectReference, SSTStreamPtr> mObjectToSpaceStreams;
00464 
00465 #ifdef PROFILE_OH_PACKET_RTT
00466     // Track outstanding packets for computing RTTs
00467     typedef std::tr1::unordered_map<uint64, Time> OutstandingPacketMap;
00468     OutstandingPacketMap mOutstandingPackets;
00469     uint8 mClearOutstandingCount;
00470     // And stats
00471     Duration mLatencySum;
00472     uint32 mLatencyCount;
00473     // And some helpers for reporting
00474     const String mTimeSeriesOHRTT;
00475 #endif
00476 }; // class SessionManager
00477 
00478 } // namespace Sirikata
00479 
00480 
00481 #endif //_SIRIKATA_LIBOH_SESSION_MANAGER_HPP_