Sirikata
liboh/plugins/manual_query/ServerQueryHandler.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2012 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_
00006 #define _SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_
00007 
00008 #include <sirikata/oh/ObjectHostContext.hpp>
00009 #include <sirikata/oh/SpaceNodeSession.hpp>
00010 #include <sirikata/pintoloc/ManualReplicatedClient.hpp>
00011 #include <sirikata/oh/OHSpaceTimeSynced.hpp>
00012 
00013 namespace Sirikata {
00014 namespace OH {
00015 namespace Manual {
00016 
00017 class ManualObjectQueryProcessor;
00018 
00025 class ServerQueryHandler :
00026         public SpaceNodeSessionListener,
00027         OrphanLocUpdateManager::Listener,
00028         Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>::Parent
00029 {
00030 public:
00031     ServerQueryHandler(ObjectHostContext* ctx, ManualObjectQueryProcessor* parent, Network::IOStrandPtr strand);
00032 
00033     // Service Interface
00034     virtual void start();
00035     virtual void stop();
00036 
00037 
00038     // Helper that marks a server with another connected object and may register
00039     // a query
00040     void incrementServerQuery(OHDP::SpaceNodeID node);
00041     // Helper that tries to remove the server query pointed at by the
00042     // iterator, checking if it is referenced at all yet and sending a
00043     // message to kill the query
00044     void decrementServerQuery(OHDP::SpaceNodeID node);
00045 
00046 
00047     // ObjectQueryHandler callbacks - handle notifications about local queries
00048     // in the tree so we know how to move the cut on the space server up or down.
00049     void queriersAreObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid);
00050     void queriersStoppedObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid);
00051     void replicatedNodeRemoved(const OHDP::SpaceNodeID& snid, ProxIndexID indexid, const ObjectReference& objid);
00052 
00053 private:
00054     ObjectHostContext* mContext;
00055     ManualObjectQueryProcessor* mParent;
00056     Network::IOStrandPtr mStrand;
00057 
00058     // Queries we've registered with servers so that we can resolve
00059     // object queries. Most of the work is performed by the ReplicatedClient,
00060     // and this just interacts with it and manages communication.
00061     struct ServerQueryState {
00062         ServerQueryState(ServerQueryHandler* parent_, const OHDP::SpaceNodeID& id_, ObjectHostContext* ctx_, Network::IOStrandPtr strand_, OHDPSST::Stream::Ptr base)
00063          : nconnected(0),
00064            sync(new OHSpaceTimeSynced(ctx_->objectHost, id_.space())),
00065            client(ctx_, strand_, parent_, sync, id_.toString(), id_),
00066            base_stream(base),
00067            prox_stream(),
00068            prox_stream_requested(false),
00069            outstanding(),
00070            writing(false)
00071         {
00072         }
00073         virtual ~ServerQueryState() {
00074             // DO NOT delete sync. See note about ownership below.
00075         }
00076 
00077         bool canRemove() const { return nconnected == 0; }
00078 
00079         int32 nconnected;
00080         // This sync is stored here and passed into client. client owns it, we
00081         // just use to to translate loc updates. Ideally we'd eventually make
00082         // ManualReplicatedClient work only with abstract values (i.e. Prox
00083         // updates would also be translated here) so we could just be the only
00084         // users of it.
00085         OHSpaceTimeSynced* sync;
00086         Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID> client;
00087         OHDPSST::Stream::Ptr base_stream;
00088         OHDPSST::Stream::Ptr prox_stream;
00089         // Whether we've requested the prox_stream
00090         bool prox_stream_requested;
00091         // Outstanding data to be sent
00092         std::queue<String> outstanding;
00093         // Whether we're in the process of sending messages
00094         bool writing;
00095     };
00096     typedef std::tr1::shared_ptr<ServerQueryState> ServerQueryStatePtr;
00097     typedef std::tr1::unordered_map<OHDP::SpaceNodeID, ServerQueryStatePtr, OHDP::SpaceNodeID::Hasher> ServerQueryMap;
00098     ServerQueryMap mServerQueries;
00099 
00100     // SpaceNodeSessionListener Interface
00101     virtual void onSpaceNodeSession(const OHDP::SpaceNodeID& id, OHDPSST::Stream::Ptr sn_stream);
00102     virtual void onSpaceNodeSessionEnded(const OHDP::SpaceNodeID& id);
00103 
00104     // ReplicatedClientWithID Interface
00105     virtual void onCreatedReplicatedIndex(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, ProxIndexID proxid, ReplicatedLocationServiceCachePtr loccache, ServerID sid, bool dynamic_objects);
00106     virtual void onDestroyedReplicatedIndex(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, ProxIndexID proxid);
00107     virtual void sendReplicatedClientProxMessage(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, const String& msg);
00108 
00109 
00110     // Proximity
00111 
00112     // Send a message to prox, triggering new stream as necessary
00113     void sendProxMessage(ServerQueryMap::iterator serv_it, const String& msg);
00114     // Utility that triggers writing some more prox data. As long as more is
00115     // available, it'll keep looping.
00116     void writeSomeProxData(ServerQueryStatePtr data);
00117     // Callback from creating proximity substream
00118     void handleCreatedProxSubstream(const OHDP::SpaceNodeID& snid, int success, OHDPSST::Stream::Ptr prox_stream);
00119     // Data read callback for prox substreams -- translate to proximity events
00120     void handleProximitySubstreamRead(const OHDP::SpaceNodeID& snid, OHDPSST::Stream::Ptr prox_stream, String* prevdata, uint8* buffer, int length);
00121     // Handle decode proximity message
00122     void handleProximityMessage(const OHDP::SpaceNodeID& snid, const String& payload);
00123 
00124     // Location
00125     // Handlers for substreams for space-managed updates
00126     void handleLocationSubstream(const OHDP::SpaceNodeID& snid, int err, OHDPSST::Stream::Ptr s);
00127     // Handlers for substream read events for space-managed updates
00128     void handleLocationSubstreamRead(const OHDP::SpaceNodeID& snid, OHDPSST::Stream::Ptr s, std::stringstream* prevdata, uint8* buffer, int length);
00129     bool handleLocationMessage(const OHDP::SpaceNodeID& snid, const std::string& payload);
00130 
00131 };
00132 
00133 } // namespace Manual
00134 } // namespace OH
00135 } // namespace Sirikata
00136 
00137 #endif //_SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_