Sirikata
libspace/plugins/prox/LibproxManualProximity.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_
00006 #define _SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_
00007 
00008 #include "LibproxProximityBase.hpp"
00009 #include <prox/manual/QueryHandler.hpp>
00010 #include <prox/base/LocationUpdateListener.hpp>
00011 #include <prox/base/AggregateListener.hpp>
00012 #include <sirikata/core/queue/ThreadSafeQueue.hpp>
00013 #include <sirikata/pintoloc/ManualReplicatedClient.hpp>
00014 #include "ManualReplicatedRequestManager.hpp"
00015 #include <boost/tr1/tuple.hpp>
00016 #include <sirikata/pintoloc/CopyableLocUpdate.hpp>
00017 
00018 namespace Sirikata {
00019 
00024 class LibproxManualProximity :
00025         public LibproxProximityBase,
00026         Prox::AggregateListener<ObjectProxSimulationTraits>,
00027         Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::ManualQuery<ObjectProxSimulationTraits> >,
00028         Pinto::Manual::ReplicatedClientWithID<ServerID>::Parent
00029 {
00030 private:
00031     typedef Prox::ManualQueryHandler<ObjectProxSimulationTraits> ProxQueryHandler;
00032     typedef std::tr1::shared_ptr<ProxQueryHandler> ProxQueryHandlerPtr;
00033     typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator;
00034     typedef Prox::ManualQuery<ObjectProxSimulationTraits> ProxQuery;
00035     typedef Prox::QueryEvent<ObjectProxSimulationTraits> ProxQueryEvent;
00036     typedef Prox::LocationServiceCache<ObjectProxSimulationTraits> ProxLocCache;
00037 
00038     typedef std::pair<OHDP::NodeID, Sirikata::Protocol::Object::ObjectMessage*> OHResult;
00039 public:
00040     LibproxManualProximity(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr);
00041     ~LibproxManualProximity();
00042 
00043     // MAIN Thread:
00044 
00045     // Service Interface overrides
00046     virtual void start();
00047     virtual void stop();
00048 
00049     // Objects
00050     virtual void addQuery(UUID obj, SolidAngle sa, uint32 max_results);
00051     virtual void addQuery(UUID obj, const String& params);
00052     virtual void removeQuery(UUID obj);
00053 
00054     // PollingService Interface
00055     virtual void poll();
00056 
00057     // PintoServerQuerierListener Interface
00058     virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update);
00059     virtual void onPintoServerLocUpdate(const LocUpdate& update);
00060 
00061     // LocationServiceListener Interface - Used for deciding when to switch
00062     // objects between static/dynamic, get raw loc updates from other
00063     // servers so they can be applied to the correct replicated indexes
00064     virtual void localObjectRemoved(const UUID& uuid, bool agg);
00065     virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval);
00066     virtual void replicaObjectRemoved(const UUID& uuid);
00067     virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval);
00068     virtual void onLocationUpdateFromServer(const ServerID sid, const Sirikata::Protocol::Loc::LocationUpdate& update);
00069 
00070     // MessageRecipient Interface
00071     virtual void receiveMessage(Message* msg);
00072 
00073     // MigrationDataClient Interface
00074     virtual std::string migrationClientTag();
00075     virtual std::string generateMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server);
00076     virtual void receiveMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server, const std::string& data);
00077 
00078     // ObjectHostSessionListener Interface
00079     virtual void onObjectHostSession(const OHDP::NodeID& id, ObjectHostSessionPtr oh_sess);
00080     virtual void onObjectHostSessionEnded(const OHDP::NodeID& id);
00081 
00082 
00083     // PROX Thread:
00084 
00085     // AggregateListener Interface
00086     virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid);
00087     virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00088     virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00089     virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00090     virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid);
00091     virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren);
00092 
00093     // QueryEventListener Interface
00094     void queryHasEvents(ProxQuery* query);
00095 
00096 private:
00097     typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> ObjectIDSet;
00098     typedef std::tr1::unordered_set<OHDP::NodeID, OHDP::NodeID::Hasher> OHQuerierSet;
00099 
00100     struct ProxQueryHandlerData;
00101     typedef Pinto::Manual::ReplicatedClientWithID<ServerID> ReplicatedClient;
00102     typedef ReplicatedClient::Parent ReplicatedClientParent;
00103     typedef std::tr1::shared_ptr<ReplicatedClient> ReplicatedClientPtr;
00104 
00105     // MAIN Thread:
00106 
00107 
00108     // MAIN Thread  Server-to-server queries (including top-level
00109     // pinto):
00110 
00111     // ReplicatedClientWithID Interface Part I
00112     virtual void sendReplicatedClientProxMessage(ReplicatedClient* client, const ServerID& evt_src_server, const String& msg);
00113 
00114     // First handling stage of ProximityResults, getting them into
00115     // Loc. See handleUpdateServerQueryResultsToReplicatedTrees for
00116     // second half
00117     void handleUpdateServerQueryResultsToLocService(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results);
00118 
00119     // MAIN Thread  OH queries:
00120 
00121     virtual int32 objectHostQueries() const;
00122     virtual int32 serverQueries() const;
00123 
00124     // ObjectHost message management
00125     void handleObjectHostSubstream(int success, OHDPSST::Stream::Ptr substream, SeqNoPtr seqNo);
00126 
00127     // Server queries management
00128     void updateServerQuery(ServerID sid, const String& raw_query);
00129     void updateServerQueryResults(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results);
00130 
00131     std::deque<Message*> mServerResultsToSend; // actually both
00132                                                // results + commands
00133     std::deque<OHResult> mOHResultsToSend;
00134 
00135     // PROX Thread:
00136 
00137     void tickQueryHandlers();
00138 
00139     // PROX Thread -- Server-to-server and top-level pinto
00140 
00141     // ReplicatedClientWithID Interface Part II (only invoked through prox
00142     // strand because the methods that invoke these are only called
00143     // from this class in the prox strand)
00144     virtual void onCreatedReplicatedIndex(ReplicatedClient* client, const ServerID& evt_src_server, ProxIndexID proxid, ReplicatedLocationServiceCachePtr loccache, ServerID sid, bool dynamic_objects);
00145     virtual void onDestroyedReplicatedIndex(ReplicatedClient* client, const ServerID& evt_src_server, ProxIndexID proxid);
00146 
00147     // Server events, some from the main thread
00148     // Override for forced disconnections
00149     virtual void handleForcedDisconnection(ServerID server);
00150 
00151     // Real handlers for PintoServerQuerierListener and
00152     // ReplicatedClientWithID -- these generate/destroy LocCaches and
00153     // Query Handlers
00154     void handleOnPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update);
00155     void handleOnPintoServerLocUpdate(const CopyableLocUpdate& update);
00156 
00157     void handleUpdateServerQuery(ServerID sid, const String& raw_query);
00158     void handleUpdateServerQueryResultsToReplicatedTrees(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results);
00159     void handleUpdateServerQueryResultsToRetryRequests(ServerID sid, const Sirikata::Protocol::Prox::ProximityResults& results);
00160 
00161     // Helpers for un/registering a server query
00162     void registerServerQuery(const ServerID& querier);
00163     void unregisterServerQuery(const ServerID& querier);
00164 
00165     SeqNoPtr getOrCreateSeqNoInfo(const ServerID server_id);
00166     void eraseSeqNoInfo(const ServerID server_id);
00167 
00168     // PROX Thread -- OH queries
00169 
00170     // Real handler for OH requests, in the prox thread
00171     void handleObjectHostProxMessage(const OHDP::NodeID& id, const String& data, SeqNoPtr seqNo);
00172     // Real handler for OH disconnects
00173     void handleObjectHostSessionEnded(const OHDP::NodeID& id);
00174     void destroyQuery(const OHDP::NodeID& id);
00175 
00176     // Helpers for un/registering a query against an entire Server
00177     void registerOHQueryWithServerHandlers(const OHDP::NodeID& querier, ServerID queried_node);
00178     void unregisterOHQueryWithServerHandlers(const OHDP::NodeID& querier, ServerID queried_node);
00179     // Helpers for un/registering a query against a single index
00180     void registerOHQueryWithServerIndexHandler(const OHDP::NodeID& querier, ServerID queried_node, ProxIndexID queried_index);
00181     void unregisterOHQueryWithServerIndexHandler(const OHDP::NodeID& querier, ServerID queried_node, ProxIndexID queried_index);
00182 
00183     // Decides whether a query handler should handle a particular object.
00184     bool handlerShouldHandleObject(bool is_static_handler, bool is_global_handler, const ObjectReference& obj_id, bool is_local, bool is_aggregate, const TimedMotionVector3f& pos, const BoundingSphere3f& region, float maxSize);
00185     // The real handler for moving objects between static/dynamic
00186     void handleCheckObjectClassForHandlers(const ObjectReference& objid, bool is_static, ProxQueryHandlerData handlers[NUM_OBJECT_CLASSES]);
00187     virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static);
00188 
00189     SeqNoPtr getSeqNoInfo(const OHDP::NodeID& node);
00190     void eraseSeqNoInfo(const OHDP::NodeID& node);
00191 
00192 
00193 
00194     // Command handlers
00195     virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00196     virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00197     bool parseHandlerName(const String& name, ProxQueryHandler** handler_out);
00198     virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00199     virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00200     virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00201 
00202 
00203 
00204     // The layout is complicated and nested here because we have many
00205     // layers -- queriers, space servers, indices, etc. First, we'll
00206     // start with how the replicated data and query handlers are
00207     // managed.
00208 
00209     // ReplicateClients for nodes will manage the basic replication of
00210     // remote trees -- they are the "session" with the remote node,
00211     // manage the cut(s), and notify us when indices are
00212     // added/destroyed on a remote node.
00213     // No new types here, just ReplicatedClientPtr
00214 
00215     // On top of the replicated clients we need to build query
00216     // processors for OH queries. Each server needs a replicated
00217     // client, and within that we may have many replicated indices and
00218     // handlers:
00219     // First, for any given replicated index, we need to provide a
00220     // query handler for it, hold on to the loccache for it, and track
00221     // its basic properties. Note that this only tracks info about the
00222     // handler, not about the registered queries.
00223     struct ReplicatedIndexQueryHandler {
00224         ReplicatedIndexQueryHandler(ReplicatedLocationServiceCachePtr loccache_, ProxQueryHandlerPtr handler_)
00225          : loccache(loccache_), handler(handler_) {}
00226         ReplicatedIndexQueryHandler()
00227         {}
00228 
00229         // Prereqs and properties:
00230         ReplicatedLocationServiceCachePtr loccache;
00231 
00232         // The handler and its data:
00233         ProxQueryHandlerPtr handler;
00234         // Additions and removals that need to be processed on the
00235         // next tick. These need to be handled carefully since they
00236         // can be due to swapping between handlers. If they are
00237         // processed in the wrong order we could end up generating
00238         // [addition, removal] instead of [removal, addition] for
00239         // queriers.
00240         ObjectIDSet additions;
00241         ObjectIDSet removals;
00242     };
00243     // Next, we collect these for each server. However, here we start
00244     // tracking registered queries as well. We keep track of which
00245     // queries got to requesting data from this node (i.e. they
00246     // refined to get to this server) so on the first or new
00247     // replicated trees we can actually register queries for them.
00248     typedef std::tr1::unordered_map<ProxIndexID, ReplicatedIndexQueryHandler> ReplicatedIndexQueryHandlerMap;
00249     struct ReplicatedServerData {
00250         // The client that interacts with the other server to manage
00251         // replicated data, and lets us know about new indices.
00252         ReplicatedClientPtr client;
00253         // Handlers for individual replicated indices
00254         ReplicatedIndexQueryHandlerMap handlers;
00255         // Queriers that have refined to this tree.
00256         OHQuerierSet queriers;
00257     };
00258     // Finally, use a map of these, one for each server, to actually
00259     // instantiate all of this
00260     typedef std::tr1::unordered_map<ServerID, ReplicatedServerData> ReplicatedServerDataMap;
00261     ReplicatedServerDataMap mReplicatedServerDataMap;
00262 
00263     // Maintain two maps for ProxIndexIDs. We actually get 2
00264     // ProxIndexIDs for each replicated tree: 1 from the origin server
00265     // and one from the query handler we generate on top of
00266     // that. Since we might get the same ID from multiple origin
00267     // servers, those are not unique. We always want to report our
00268     // locally generated version to clients. However, that also means
00269     // we need to be able to map between the two/get back to the
00270     // original data.
00271     // Maintain a map of *locally generated* ProxIndexIDs to the
00272     // source server + remote index ID responsible for them. If we only
00273     // have the local ProxIndexID, this lets us traverse the above data
00274     // structure
00275     typedef std::pair<ServerID, ProxIndexID> NodeProxIndexID;
00276     typedef std::tr1::unordered_map<ProxIndexID, NodeProxIndexID> IndexToServerMap;
00277     IndexToServerMap mLocalToRemoteIndexMap;
00278 
00279     // And a map from aggregator (libprox handler) -> (ServerID, ProxIndexID) for when we
00280     // get aggregate callbacks. This ProxIndexID is the *origin
00281     // server's* identifier since we use it to get back to the
00282     // replicated data.
00283     typedef std::tr1::unordered_map<ProxAggregator*, NodeProxIndexID> InverseReplicatedIndexQueryHandlerMap;
00284     InverseReplicatedIndexQueryHandlerMap mAggregatorToIndexMap;
00285 
00286     // Now, with all the data replicated and query handlers setup, we
00287     // can move on to the management of the queries themselves.
00288 
00289     // We need a record of each individual query based on which tree its in,
00290     // i.e. based on the space server and index ID. We could just have a map of
00291     // <OHID, SSID, IndexID> -> ProxQuery. However, for destroying complete
00292     // queries, we need the set of all individual queries associated with an OH
00293     // query, and for disconnecting space servers we ultimately need to clear
00294     // out all individual queries with an associate space server. So instead, we
00295     // build the equivalent but more indirect
00296     // OHID -> (SSID -> (IndexID -> ProxQuery)).
00297     typedef std::tr1::unordered_map<ProxIndexID, ProxQuery*> IndexToQueryMap;
00298     typedef std::tr1::unordered_map<ServerID, IndexToQueryMap> ServerToIndexToQueryMap;
00299     typedef std::tr1::unordered_map<OHDP::NodeID, ServerToIndexToQueryMap, OHDP::NodeID::Hasher> OHQueryToServerToIndexToQueryMap;
00300     OHQueryToServerToIndexToQueryMap mOHRemoteQueries;
00301     // We also need to keep track of the inverted index so when we get results
00302     // from the query process and are given a ProxQuery*, we can get back to the
00303     // data.
00304     typedef std::tr1::tuple<OHDP::NodeID, ServerID, ProxIndexID> OHRemoteQueryID;
00305     typedef std::tr1::unordered_map<ProxQuery*, OHRemoteQueryID> InvertedOHRemoteQueryMap;
00306     InvertedOHRemoteQueryMap mInvertedOHRemoteQueries;
00307 
00308     // A single query actually consists of many subqueries against
00309     // individual nodes, e.g. a query from OH1 consists of queries
00310     // against top-level pinto and ss1, ss2, ss4.
00311     // We maintain a map of these, one per registered OH query
00312     typedef std::tr1::unordered_map<OHDP::NodeID, ProxQuery*, OHDP::NodeID::Hasher> OHQueryMap;
00313     // And finally, we need the reverse index -- given an individual
00314     // query we get in a callback, which OH query does it belong to,
00315     // and which space server is it registered with?
00316     typedef std::tr1::unordered_map<ProxQuery*, OHDP::NodeID> InvertedOHQueryMap;
00317 
00318     // These track objects on this server and respond to OH queries.
00319     OHQueryMap mOHQueries[NUM_OBJECT_CLASSES];
00320     InvertedOHQueryMap mInvertedOHQueries;
00321     // And their results
00322     Sirikata::ThreadSafeQueue<OHResult> mOHResults;
00323     typedef std::tr1::unordered_map<OHDP::NodeID, SeqNoPtr, OHDP::NodeID::Hasher> OHSeqNoInfoMap;
00324     OHSeqNoInfoMap mOHSeqNos;
00325     // For object hosts *only* we track requests that failed we we can try to
00326     // reapply them if new data comes in. This masks failures that occur when
00327     // we're still waiting for a request to another server (or top-level pinto)
00328     // to be fulfilled and results to be returned.
00329     ManualReplicatedRequestManager mOHRequestsManager;
00330 
00331 
00332     // Server-to-Server queries
00333     typedef std::tr1::unordered_map<ServerID, ProxQuery*> ServerQueryMap;
00334     typedef std::tr1::unordered_map<ProxQuery*, ServerID> InvertedServerQueryMap;
00335     ServerQueryMap mServerQueries[NUM_OBJECT_CLASSES];
00336     InvertedServerQueryMap mInvertedServerQueries;
00337     Sirikata::ThreadSafeQueue<Message*> mServerResults; // server query results + commands
00338     typedef std::tr1::unordered_map<ServerID, SeqNoPtr> ServerSeqNoInfoMap;
00339     ServerSeqNoInfoMap mServerSeqNos;
00340 
00341 
00342     // And these are the actual data structures for local queries --
00343     // handling both server queries (their entire query) and OH
00344     // queries (when they get into this this tree).
00345 
00346     struct ProxQueryHandlerData {
00347         ProxQueryHandler* handler;
00348         // Additions and removals that need to be processed on the
00349         // next tick. These need to be handled carefully since they
00350         // can be due to swapping between handlers. If they are
00351         // processed in the wrong order we could end up generating
00352         // [addition, removal] instead of [removal, addition] for
00353         // queriers.
00354         ObjectIDSet additions;
00355         ObjectIDSet removals;
00356     };
00357 
00358     ProxQueryHandlerData mLocalQueryHandler[NUM_OBJECT_CLASSES];
00359     PollerService mLocalHandlerPoller;
00360 }; // class LibproxManualProximity
00361 
00362 } // namespace Sirikata
00363 
00364 #endif //_SIRIKATA_LIBPROX_MANUAL_PROXIMITY_HPP_