Sirikata
|
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_ 00006 #define _SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_ 00007 00008 #include "LibproxProximityBase.hpp" 00009 #include <prox/manual/QueryHandler.hpp> 00010 #include <prox/base/LocationUpdateListener.hpp> 00011 #include <prox/base/AggregateListener.hpp> 00012 #include <sirikata/core/queue/ThreadSafeQueue.hpp> 00013 #include <sirikata/pintoloc/ManualReplicatedClient.hpp> 00014 #include "ManualReplicatedRequestManager.hpp" 00015 #include <boost/tr1/tuple.hpp> 00016 #include <sirikata/pintoloc/CopyableLocUpdate.hpp> 00017 00018 namespace Sirikata { 00019 00024 class LibproxManualProximity : 00025 public LibproxProximityBase, 00026 Prox::AggregateListener<ObjectProxSimulationTraits>, 00027 Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::ManualQuery<ObjectProxSimulationTraits> >, 00028 Pinto::Manual::ReplicatedClientWithID<ServerID>::Parent 00029 { 00030 private: 00031 typedef Prox::ManualQueryHandler<ObjectProxSimulationTraits> ProxQueryHandler; 00032 typedef std::tr1::shared_ptr<ProxQueryHandler> ProxQueryHandlerPtr; 00033 typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator; 00034 typedef Prox::ManualQuery<ObjectProxSimulationTraits> ProxQuery; 00035 typedef Prox::QueryEvent<ObjectProxSimulationTraits> ProxQueryEvent; 00036 typedef Prox::LocationServiceCache<ObjectProxSimulationTraits> ProxLocCache; 00037 00038 typedef std::pair<OHDP::NodeID, Sirikata::Protocol::Object::ObjectMessage*> OHResult; 00039 public: 00040 LibproxManualProximity(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr); 00041 ~LibproxManualProximity(); 00042 00043 // MAIN Thread: 00044 00045 // Service Interface overrides 00046 virtual void start(); 00047 virtual void stop(); 00048 00049 // Objects 00050 virtual void addQuery(UUID obj, SolidAngle sa, uint32 max_results); 00051 virtual void addQuery(UUID obj, const String& params); 00052 virtual void removeQuery(UUID obj); 00053 00054 // PollingService Interface 00055 virtual void poll(); 00056 00057 // PintoServerQuerierListener Interface 00058 virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update); 00059 virtual void onPintoServerLocUpdate(const LocUpdate& update); 00060 00061 // LocationServiceListener Interface - Used for deciding when to switch 00062 // objects between static/dynamic, get raw loc updates from other 00063 // servers so they can be applied to the correct replicated indexes 00064 virtual void localObjectRemoved(const UUID& uuid, bool agg); 00065 virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval); 00066 virtual void replicaObjectRemoved(const UUID& uuid); 00067 virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval); 00068 virtual void onLocationUpdateFromServer(const ServerID sid, const Sirikata::Protocol::Loc::LocationUpdate& update); 00069 00070 // MessageRecipient Interface 00071 virtual void receiveMessage(Message* msg); 00072 00073 // MigrationDataClient Interface 00074 virtual std::string migrationClientTag(); 00075 virtual std::string generateMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server); 00076 virtual void receiveMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server, const std::string& data); 00077 00078 // ObjectHostSessionListener Interface 00079 virtual void onObjectHostSession(const OHDP::NodeID& id, ObjectHostSessionPtr oh_sess); 00080 virtual void onObjectHostSessionEnded(const OHDP::NodeID& id); 00081 00082 00083 // PROX Thread: 00084 00085 // AggregateListener Interface 00086 virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid); 00087 virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00088 virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00089 virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00090 virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid); 00091 virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren); 00092 00093 // QueryEventListener Interface 00094 void queryHasEvents(ProxQuery* query); 00095 00096 private: 00097 typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> ObjectIDSet; 00098 typedef std::tr1::unordered_set<OHDP::NodeID, OHDP::NodeID::Hasher> OHQuerierSet; 00099 00100 struct ProxQueryHandlerData; 00101 typedef Pinto::Manual::ReplicatedClientWithID<ServerID> ReplicatedClient; 00102 typedef ReplicatedClient::Parent ReplicatedClientParent; 00103 typedef std::tr1::shared_ptr<ReplicatedClient> ReplicatedClientPtr; 00104 00105 // MAIN Thread: 00106 00107 00108 // MAIN Thread Server-to-server queries (including top-level 00109 // pinto): 00110 00111 // ReplicatedClientWithID Interface Part I 00112 virtual void sendReplicatedClientProxMessage(ReplicatedClient* client, const ServerID& evt_src_server, const String& msg); 00113 00114 // First handling stage of ProximityResults, getting them into 00115 // Loc. See handleUpdateServerQueryResultsToReplicatedTrees for 00116 // second half 00117 void handleUpdateServerQueryResultsToLocService(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results); 00118 00119 // MAIN Thread OH queries: 00120 00121 virtual int32 objectHostQueries() const; 00122 virtual int32 serverQueries() const; 00123 00124 // ObjectHost message management 00125 void handleObjectHostSubstream(int success, OHDPSST::Stream::Ptr substream, SeqNoPtr seqNo); 00126 00127 // Server queries management 00128 void updateServerQuery(ServerID sid, const String& raw_query); 00129 void updateServerQueryResults(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results); 00130 00131 std::deque<Message*> mServerResultsToSend; // actually both 00132 // results + commands 00133 std::deque<OHResult> mOHResultsToSend; 00134 00135 // PROX Thread: 00136 00137 void tickQueryHandlers(); 00138 00139 // PROX Thread -- Server-to-server and top-level pinto 00140 00141 // ReplicatedClientWithID Interface Part II (only invoked through prox 00142 // strand because the methods that invoke these are only called 00143 // from this class in the prox strand) 00144 virtual void onCreatedReplicatedIndex(ReplicatedClient* client, const ServerID& evt_src_server, ProxIndexID proxid, ReplicatedLocationServiceCachePtr loccache, ServerID sid, bool dynamic_objects); 00145 virtual void onDestroyedReplicatedIndex(ReplicatedClient* client, const ServerID& evt_src_server, ProxIndexID proxid); 00146 00147 // Server events, some from the main thread 00148 // Override for forced disconnections 00149 virtual void handleForcedDisconnection(ServerID server); 00150 00151 // Real handlers for PintoServerQuerierListener and 00152 // ReplicatedClientWithID -- these generate/destroy LocCaches and 00153 // Query Handlers 00154 void handleOnPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update); 00155 void handleOnPintoServerLocUpdate(const CopyableLocUpdate& update); 00156 00157 void handleUpdateServerQuery(ServerID sid, const String& raw_query); 00158 void handleUpdateServerQueryResultsToReplicatedTrees(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results); 00159 void handleUpdateServerQueryResultsToRetryRequests(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results); 00160 00161 // Helpers for un/registering a server query 00162 void registerServerQuery(const ServerID& querier); 00163 void unregisterServerQuery(const ServerID& querier); 00164 00165 SeqNoPtr getOrCreateSeqNoInfo(const ServerID server_id); 00166 void eraseSeqNoInfo(const ServerID server_id); 00167 00168 // PROX Thread -- OH queries 00169 00170 // Real handler for OH requests, in the prox thread 00171 void handleObjectHostProxMessage(const OHDP::NodeID& id, const String& data, SeqNoPtr seqNo); 00172 // Real handler for OH disconnects 00173 void handleObjectHostSessionEnded(const OHDP::NodeID& id); 00174 void destroyQuery(const OHDP::NodeID& id); 00175 00176 // Helpers for un/registering a query against an entire Server 00177 void registerOHQueryWithServerHandlers(const OHDP::NodeID& querier, ServerID queried_node); 00178 void unregisterOHQueryWithServerHandlers(const OHDP::NodeID& querier, ServerID queried_node); 00179 // Helpers for un/registering a query against a single index 00180 void registerOHQueryWithServerIndexHandler(const OHDP::NodeID& querier, ServerID queried_node, ProxIndexID queried_index); 00181 void unregisterOHQueryWithServerIndexHandler(const OHDP::NodeID& querier, ServerID queried_node, ProxIndexID queried_index); 00182 00183 // Decides whether a query handler should handle a particular object. 00184 bool handlerShouldHandleObject(bool is_static_handler, bool is_global_handler, const ObjectReference& obj_id, bool is_local, bool is_aggregate, const TimedMotionVector3f& pos, const BoundingSphere3f& region, float maxSize); 00185 // The real handler for moving objects between static/dynamic 00186 void handleCheckObjectClassForHandlers(const ObjectReference& objid, bool is_static, ProxQueryHandlerData handlers[NUM_OBJECT_CLASSES]); 00187 virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static); 00188 00189 SeqNoPtr getSeqNoInfo(const OHDP::NodeID& node); 00190 void eraseSeqNoInfo(const OHDP::NodeID& node); 00191 00192 00193 00194 // Command handlers 00195 virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00196 virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00197 bool parseHandlerName(const String& name, ProxQueryHandler** handler_out); 00198 virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00199 virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00200 virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00201 00202 00203 00204 // The layout is complicated and nested here because we have many 00205 // layers -- queriers, space servers, indices, etc. First, we'll 00206 // start with how the replicated data and query handlers are 00207 // managed. 00208 00209 // ReplicateClients for nodes will manage the basic replication of 00210 // remote trees -- they are the "session" with the remote node, 00211 // manage the cut(s), and notify us when indices are 00212 // added/destroyed on a remote node. 00213 // No new types here, just ReplicatedClientPtr 00214 00215 // On top of the replicated clients we need to build query 00216 // processors for OH queries. Each server needs a replicated 00217 // client, and within that we may have many replicated indices and 00218 // handlers: 00219 // First, for any given replicated index, we need to provide a 00220 // query handler for it, hold on to the loccache for it, and track 00221 // its basic properties. Note that this only tracks info about the 00222 // handler, not about the registered queries. 00223 struct ReplicatedIndexQueryHandler { 00224 ReplicatedIndexQueryHandler(ReplicatedLocationServiceCachePtr loccache_, ProxQueryHandlerPtr handler_) 00225 : loccache(loccache_), handler(handler_) {} 00226 ReplicatedIndexQueryHandler() 00227 {} 00228 00229 // Prereqs and properties: 00230 ReplicatedLocationServiceCachePtr loccache; 00231 00232 // The handler and its data: 00233 ProxQueryHandlerPtr handler; 00234 // Additions and removals that need to be processed on the 00235 // next tick. These need to be handled carefully since they 00236 // can be due to swapping between handlers. If they are 00237 // processed in the wrong order we could end up generating 00238 // [addition, removal] instead of [removal, addition] for 00239 // queriers. 00240 ObjectIDSet additions; 00241 ObjectIDSet removals; 00242 }; 00243 // Next, we collect these for each server. However, here we start 00244 // tracking registered queries as well. We keep track of which 00245 // queries got to requesting data from this node (i.e. they 00246 // refined to get to this server) so on the first or new 00247 // replicated trees we can actually register queries for them. 00248 typedef std::tr1::unordered_map<ProxIndexID, ReplicatedIndexQueryHandler> ReplicatedIndexQueryHandlerMap; 00249 struct ReplicatedServerData { 00250 // The client that interacts with the other server to manage 00251 // replicated data, and lets us know about new indices. 00252 ReplicatedClientPtr client; 00253 // Handlers for individual replicated indices 00254 ReplicatedIndexQueryHandlerMap handlers; 00255 // Queriers that have refined to this tree. 00256 OHQuerierSet queriers; 00257 }; 00258 // Finally, use a map of these, one for each server, to actually 00259 // instantiate all of this 00260 typedef std::tr1::unordered_map<ServerID, ReplicatedServerData> ReplicatedServerDataMap; 00261 ReplicatedServerDataMap mReplicatedServerDataMap; 00262 00263 // Maintain two maps for ProxIndexIDs. We actually get 2 00264 // ProxIndexIDs for each replicated tree: 1 from the origin server 00265 // and one from the query handler we generate on top of 00266 // that. Since we might get the same ID from multiple origin 00267 // servers, those are not unique. We always want to report our 00268 // locally generated version to clients. However, that also means 00269 // we need to be able to map between the two/get back to the 00270 // original data. 00271 // Maintain a map of *locally generated* ProxIndexIDs to the 00272 // source server + remote index ID responsible for them. If we only 00273 // have the local ProxIndexID, this lets us traverse the above data 00274 // structure 00275 typedef std::pair<ServerID, ProxIndexID> NodeProxIndexID; 00276 typedef std::tr1::unordered_map<ProxIndexID, NodeProxIndexID> IndexToServerMap; 00277 IndexToServerMap mLocalToRemoteIndexMap; 00278 00279 // And a map from aggregator (libprox handler) -> (ServerID, ProxIndexID) for when we 00280 // get aggregate callbacks. This ProxIndexID is the *origin 00281 // server's* identifier since we use it to get back to the 00282 // replicated data. 00283 typedef std::tr1::unordered_map<ProxAggregator*, NodeProxIndexID> InverseReplicatedIndexQueryHandlerMap; 00284 InverseReplicatedIndexQueryHandlerMap mAggregatorToIndexMap; 00285 00286 // Now, with all the data replicated and query handlers setup, we 00287 // can move on to the management of the queries themselves. 00288 00289 // We need a record of each individual query based on which tree its in, 00290 // i.e. based on the space server and index ID. We could just have a map of 00291 // <OHID, SSID, IndexID> -> ProxQuery. However, for destroying complete 00292 // queries, we need the set of all individual queries associated with an OH 00293 // query, and for disconnecting space servers we ultimately need to clear 00294 // out all individual queries with an associate space server. So instead, we 00295 // build the equivalent but more indirect 00296 // OHID -> (SSID -> (IndexID -> ProxQuery)). 00297 typedef std::tr1::unordered_map<ProxIndexID, ProxQuery*> IndexToQueryMap; 00298 typedef std::tr1::unordered_map<ServerID, IndexToQueryMap> ServerToIndexToQueryMap; 00299 typedef std::tr1::unordered_map<OHDP::NodeID, ServerToIndexToQueryMap, OHDP::NodeID::Hasher> OHQueryToServerToIndexToQueryMap; 00300 OHQueryToServerToIndexToQueryMap mOHRemoteQueries; 00301 // We also need to keep track of the inverted index so when we get results 00302 // from the query process and are given a ProxQuery*, we can get back to the 00303 // data. 00304 typedef std::tr1::tuple<OHDP::NodeID, ServerID, ProxIndexID> OHRemoteQueryID; 00305 typedef std::tr1::unordered_map<ProxQuery*, OHRemoteQueryID> InvertedOHRemoteQueryMap; 00306 InvertedOHRemoteQueryMap mInvertedOHRemoteQueries; 00307 00308 // A single query actually consists of many subqueries against 00309 // individual nodes, e.g. a query from OH1 consists of queries 00310 // against top-level pinto and ss1, ss2, ss4. 00311 // We maintain a map of these, one per registered OH query 00312 typedef std::tr1::unordered_map<OHDP::NodeID, ProxQuery*, OHDP::NodeID::Hasher> OHQueryMap; 00313 // And finally, we need the reverse index -- given an individual 00314 // query we get in a callback, which OH query does it belong to, 00315 // and which space server is it registered with? 00316 typedef std::tr1::unordered_map<ProxQuery*, OHDP::NodeID> InvertedOHQueryMap; 00317 00318 // These track objects on this server and respond to OH queries. 00319 OHQueryMap mOHQueries[NUM_OBJECT_CLASSES]; 00320 InvertedOHQueryMap mInvertedOHQueries; 00321 // And their results 00322 Sirikata::ThreadSafeQueue<OHResult> mOHResults; 00323 typedef std::tr1::unordered_map<OHDP::NodeID, SeqNoPtr, OHDP::NodeID::Hasher> OHSeqNoInfoMap; 00324 OHSeqNoInfoMap mOHSeqNos; 00325 // For object hosts *only* we track requests that failed we we can try to 00326 // reapply them if new data comes in. This masks failures that occur when 00327 // we're still waiting for a request to another server (or top-level pinto) 00328 // to be fulfilled and results to be returned. 00329 ManualReplicatedRequestManager mOHRequestsManager; 00330 00331 00332 // Server-to-Server queries 00333 typedef std::tr1::unordered_map<ServerID, ProxQuery*> ServerQueryMap; 00334 typedef std::tr1::unordered_map<ProxQuery*, ServerID> InvertedServerQueryMap; 00335 ServerQueryMap mServerQueries[NUM_OBJECT_CLASSES]; 00336 InvertedServerQueryMap mInvertedServerQueries; 00337 Sirikata::ThreadSafeQueue<Message*> mServerResults; // server query results + commands 00338 typedef std::tr1::unordered_map<ServerID, SeqNoPtr> ServerSeqNoInfoMap; 00339 ServerSeqNoInfoMap mServerSeqNos; 00340 00341 00342 // And these are the actual data structures for local queries -- 00343 // handling both server queries (their entire query) and OH 00344 // queries (when they get into this this tree). 00345 00346 struct ProxQueryHandlerData { 00347 ProxQueryHandler* handler; 00348 // Additions and removals that need to be processed on the 00349 // next tick. These need to be handled carefully since they 00350 // can be due to swapping between handlers. If they are 00351 // processed in the wrong order we could end up generating 00352 // [addition, removal] instead of [removal, addition] for 00353 // queriers. 00354 ObjectIDSet additions; 00355 ObjectIDSet removals; 00356 }; 00357 00358 ProxQueryHandlerData mLocalQueryHandler[NUM_OBJECT_CLASSES]; 00359 PollerService mLocalHandlerPoller; 00360 }; // class LibproxManualProximity 00361 00362 } // namespace Sirikata 00363 00364 #endif //_SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_