Sirikata
|
00001 // Copyright (c) 2012 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_ 00006 #define _SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_ 00007 00008 #include <sirikata/oh/ObjectHostContext.hpp> 00009 #include <sirikata/oh/SpaceNodeSession.hpp> 00010 #include <sirikata/pintoloc/ManualReplicatedClient.hpp> 00011 #include <sirikata/oh/OHSpaceTimeSynced.hpp> 00012 00013 namespace Sirikata { 00014 namespace OH { 00015 namespace Manual { 00016 00017 class ManualObjectQueryProcessor; 00018 00025 class ServerQueryHandler : 00026 public SpaceNodeSessionListener, 00027 OrphanLocUpdateManager::Listener, 00028 Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>::Parent 00029 { 00030 public: 00031 ServerQueryHandler(ObjectHostContext* ctx, ManualObjectQueryProcessor* parent, Network::IOStrandPtr strand); 00032 00033 // Service Interface 00034 virtual void start(); 00035 virtual void stop(); 00036 00037 00038 // Helper that marks a server with another connected object and may register 00039 // a query 00040 void incrementServerQuery(OHDP::SpaceNodeID node); 00041 // Helper that tries to remove the server query pointed at by the 00042 // iterator, checking if it is referenced at all yet and sending a 00043 // message to kill the query 00044 void decrementServerQuery(OHDP::SpaceNodeID node); 00045 00046 00047 // ObjectQueryHandler callbacks - handle notifications about local queries 00048 // in the tree so we know how to move the cut on the space server up or down. 00049 void queriersAreObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid); 00050 void queriersStoppedObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid); 00051 void replicatedNodeRemoved(const OHDP::SpaceNodeID& snid, ProxIndexID indexid, const ObjectReference& objid); 00052 00053 private: 00054 ObjectHostContext* mContext; 00055 ManualObjectQueryProcessor* mParent; 00056 Network::IOStrandPtr mStrand; 00057 00058 // Queries we've registered with servers so that we can resolve 00059 // object queries. Most of the work is performed by the ReplicatedClient, 00060 // and this just interacts with it and manages communication. 00061 struct ServerQueryState { 00062 ServerQueryState(ServerQueryHandler* parent_, const OHDP::SpaceNodeID& id_, ObjectHostContext* ctx_, Network::IOStrandPtr strand_, OHDPSST::Stream::Ptr base) 00063 : nconnected(0), 00064 sync(new OHSpaceTimeSynced(ctx_->objectHost, id_.space())), 00065 client(ctx_, strand_, parent_, sync, id_.toString(), id_), 00066 base_stream(base), 00067 prox_stream(), 00068 prox_stream_requested(false), 00069 outstanding(), 00070 writing(false) 00071 { 00072 } 00073 virtual ~ServerQueryState() { 00074 // DO NOT delete sync. See note about ownership below. 00075 } 00076 00077 bool canRemove() const { return nconnected == 0; } 00078 00079 int32 nconnected; 00080 // This sync is stored here and passed into client. client owns it, we 00081 // just use to to translate loc updates. Ideally we'd eventually make 00082 // ManualReplicatedClient work only with abstract values (i.e. Prox 00083 // updates would also be translated here) so we could just be the only 00084 // users of it. 00085 OHSpaceTimeSynced* sync; 00086 Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID> client; 00087 OHDPSST::Stream::Ptr base_stream; 00088 OHDPSST::Stream::Ptr prox_stream; 00089 // Whether we've requested the prox_stream 00090 bool prox_stream_requested; 00091 // Outstanding data to be sent 00092 std::queue<String> outstanding; 00093 // Whether we're in the process of sending messages 00094 bool writing; 00095 }; 00096 typedef std::tr1::shared_ptr<ServerQueryState> ServerQueryStatePtr; 00097 typedef std::tr1::unordered_map<OHDP::SpaceNodeID, ServerQueryStatePtr, OHDP::SpaceNodeID::Hasher> ServerQueryMap; 00098 ServerQueryMap mServerQueries; 00099 00100 // SpaceNodeSessionListener Interface 00101 virtual void onSpaceNodeSession(const OHDP::SpaceNodeID& id, OHDPSST::Stream::Ptr sn_stream); 00102 virtual void onSpaceNodeSessionEnded(const OHDP::SpaceNodeID& id); 00103 00104 // ReplicatedClientWithID Interface 00105 virtual void onCreatedReplicatedIndex(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, ProxIndexID proxid, ReplicatedLocationServiceCachePtr loccache, ServerID sid, bool dynamic_objects); 00106 virtual void onDestroyedReplicatedIndex(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, ProxIndexID proxid); 00107 virtual void sendReplicatedClientProxMessage(Pinto::Manual::ReplicatedClientWithID<OHDP::SpaceNodeID>* client, const OHDP::SpaceNodeID& snid, const String& msg); 00108 00109 00110 // Proximity 00111 00112 // Send a message to prox, triggering new stream as necessary 00113 void sendProxMessage(ServerQueryMap::iterator serv_it, const String& msg); 00114 // Utility that triggers writing some more prox data. As long as more is 00115 // available, it'll keep looping. 00116 void writeSomeProxData(ServerQueryStatePtr data); 00117 // Callback from creating proximity substream 00118 void handleCreatedProxSubstream(const OHDP::SpaceNodeID& snid, int success, OHDPSST::Stream::Ptr prox_stream); 00119 // Data read callback for prox substreams -- translate to proximity events 00120 void handleProximitySubstreamRead(const OHDP::SpaceNodeID& snid, OHDPSST::Stream::Ptr prox_stream, String* prevdata, uint8* buffer, int length); 00121 // Handle decode proximity message 00122 void handleProximityMessage(const OHDP::SpaceNodeID& snid, const String& payload); 00123 00124 // Location 00125 // Handlers for substreams for space-managed updates 00126 void handleLocationSubstream(const OHDP::SpaceNodeID& snid, int err, OHDPSST::Stream::Ptr s); 00127 // Handlers for substream read events for space-managed updates 00128 void handleLocationSubstreamRead(const OHDP::SpaceNodeID& snid, OHDPSST::Stream::Ptr s, std::stringstream* prevdata, uint8* buffer, int length); 00129 bool handleLocationMessage(const OHDP::SpaceNodeID& snid, const std::string& payload); 00130 00131 }; 00132 00133 } // namespace Manual 00134 } // namespace OH 00135 } // namespace Sirikata 00136 00137 #endif //_SIRIKATA_OH_MQ_SERVER_QUERY_HANDLER_HPP_