Sirikata
liboh/plugins/manual_query/ObjectQueryHandler.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2012 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_
00006 #define _SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_
00007 
00008 #include "ObjectQueryHandlerBase.hpp"
00009 #include <sirikata/pintoloc/ProxSimulationTraits.hpp>
00010 #include <sirikata/pintoloc/ReplicatedLocationUpdateListener.hpp>
00011 #include <sirikata/pintoloc/ReplicatedLocationServiceCache.hpp>
00012 #include <prox/geom/QueryHandler.hpp>
00013 #include <prox/base/AggregateListener.hpp>
00014 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp>
00015 #include <sirikata/core/service/PollerService.hpp>
00016 #include <sirikata/oh/HostedObject.hpp>
00017 #include <sirikata/core/prox/Defs.hpp>
00018 #include <sirikata/core/util/InstanceMethodNotReentrant.hpp>
00019 
00020 namespace Sirikata {
00021 
00022 
00023 namespace Protocol {
00024 namespace Prox {
00025 class ProximityUpdate;
00026 }
00027 }
00028 
00029 namespace OH {
00030 namespace Manual {
00031 
00032 class ObjectQueryHandler :
00033         public ObjectQueryHandlerBase,
00034         Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::Query<ObjectProxSimulationTraits> >,
00035         Prox::AggregateListener<ObjectProxSimulationTraits>,
00036         public ReplicatedLocationUpdateListener
00037 {
00038 private:
00039     typedef Prox::QueryHandler<ObjectProxSimulationTraits> ProxQueryHandler;
00040     typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator;
00041 public:
00042     // MAIN Thread: All public interface is expected to be called only from the main thread.
00043     typedef Prox::Query<ObjectProxSimulationTraits> Query;
00044     typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent;
00045 
00046     ObjectQueryHandler(ObjectHostContext* ctx, ManualObjectQueryProcessor* parent, const OHDP::SpaceNodeID& space, Network::IOStrandPtr prox_strand);
00047     ~ObjectQueryHandler();
00048 
00049     // MAIN Thread:
00050 
00051     // Service Interface overrides
00052     virtual void start();
00053     virtual void stop();
00054 
00055 
00056     // Index/Tree replication events from server queries
00057     void createdReplicatedIndex(ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects);
00058     void removedReplicatedIndex(ProxIndexID iid);
00059 
00060     // Query settings. These pass in the HostedObject so they don't
00061     // have to rely on the ReplicatedLocationServiceCache for implicit
00062     // settings since it's possible the ReplicatedLocationServiceCache hasn't
00063     // received (or might not receive!) information about the object
00064     // yet. Note that we use SpaceObjectReference here so we can
00065     // perform lookups in HostedObject, even though everything in this
00066     // space is specific to a single SpaceID.
00067     //
00068     // There is no addQuery, you always use updateQuery and it registers the
00069     // query if necessary
00070     void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& params);
00071     void removeQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef);
00072     // (Potential) Querier Lifetimes
00073     void presenceConnected(const ObjectReference& sporef);
00074     void presenceDisconnected(const ObjectReference& sporef);
00075 
00076     // ReplicatedLocationUpdateListener Interface
00077     // We override this for two reasons. First, a few callbacks are used to
00078     // update query parameters (location & bounds). Second, we use these updates
00079     // to generate LocUpdates for queriers that are subscribed to the object.
00080     virtual void onObjectAdded(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00081     virtual void onObjectRemoved(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00082     virtual void onParentUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00083     virtual void onEpochUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00084     virtual void onLocationUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00085     virtual void onOrientationUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00086     virtual void onBoundsUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00087     virtual void onMeshUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00088     virtual void onPhysicsUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj);
00089 
00090     // PROX Thread:
00091 
00092     void handleCreatedReplicatedIndex(Liveness::Token alive, ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects);
00093     void handleRemovedReplicatedIndex(Liveness::Token alive, ProxIndexID iid);
00094 
00095     // QueryEventListener Interface
00096     void queryHasEvents(Query* query);
00097 
00098     // AggregateListener Interface
00099     virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid);
00100     virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00101     virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00102     virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00103     virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid);
00104     virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren);
00105 
00106     ProxQueryHandler* getQueryHandler(const String& handler_name);
00107 
00108     void commandListInfo(const OHDP::SpaceNodeID& snid, Command::Result& result);
00109     void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00110     void commandListQueriers(const OHDP::SpaceNodeID& snid, Command::Result& result);
00111     void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00112 
00113 private:
00114 
00115     // MAIN Thread: These are utility methods which should only be called from the main thread.
00116     int32 objectQueries() const;
00117 
00118     // Update queries based on current state.
00119     void handleDeliverEvents();
00120 
00121     // Update subscribed queriers that the given object was updated. This
00122     // currently takes the brute force approach of updating all properties and
00123     // uses the most up-to-date info, even if that's actually newer than the
00124     // update that triggered this.
00125     void handleNotifySubscribersLocUpdate(Liveness::Token alive, ReplicatedLocationServiceCache* loccache, const ObjectReference& oref);
00126 
00127     // Object queries
00128     void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, SolidAngle sa, uint32 max_results);
00129     void updateQuery(const ObjectReference& obj, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, SolidAngle sa, uint32 max_results);
00130 
00131 
00132     // PROX Thread: These are utility methods which should only be called from the prox thread.
00133 
00134     // Reusable utility for registering the query with a particular
00135     // index. Useful since we may need to register in different conditions --
00136     // new query, new index, index becomes relevant to query, etc.
00137     void registerObjectQueryWithIndex(const ObjectReference& object, ProxIndexID index_id, ProxQueryHandler* handler, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results);
00138     // Utility for registering query with all indices for a ServerID. This just
00139     // calls registerObjectQueryWithIndex for each index on the server.
00140     void registerObjectQueryWithServer(const ObjectReference& object, ServerID sid, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results);
00141     void unregisterObjectQueryWithIndex(const ObjectReference& object, ProxIndexID index_id);
00142     void unregisterObjectQueryWithServer(const ObjectReference& object, ServerID sid);
00143 
00144     // Events on queries/objects
00145     void handleUpdateObjectQuery(Liveness::Token alive, const ObjectReference& object, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results);
00146     void handleRemoveObjectQuery(Liveness::Token alive, const ObjectReference& object, bool notify_main_thread);
00147     void handleDisconnectedObject(Liveness::Token alive, const ObjectReference& object);
00148 
00149     // Generate query events based on results collected from query handlers
00150     void generateObjectQueryEvents(Query* query);
00151 
00152     typedef std::set<ObjectReference> ObjectSet;
00153     typedef std::tr1::unordered_map<ProxIndexID, Query*> IndexQueryMap;
00154     typedef std::tr1::unordered_set<ServerID> ServerIDSet;
00155     // A single object query may have queries registered against many query
00156     // handlers for different replicated trees.
00157     struct ObjectQueryData {
00158         // We need to store the query parameters because a query may get
00159         // registered when no trees have been replicated yet.
00160         TimedMotionVector3f loc;
00161         BoundingSphere3f bounds;
00162         SolidAngle angle;
00163         uint32 max_results;
00164         // And we also keep track of all the individual queries against
00165         // replicated indices.
00166         IndexQueryMap queries;
00167         // And which entire Servers they are subscribed to.
00168         ServerIDSet servers;
00169     };
00170     typedef std::tr1::shared_ptr<ObjectQueryData> ObjectQueryDataPtr;
00171     typedef std::tr1::unordered_map<ObjectReference, ObjectQueryDataPtr, ObjectReference::Hasher> ObjectQueryMap;
00172     // Individual libprox queries are keyed by both the object who registered a
00173     // query and the replicated index the individual query operates on
00174     typedef std::pair<ObjectReference, ProxIndexID> ObjectIndexQueryKey;
00175     typedef std::tr1::unordered_map<Query*, ObjectIndexQueryKey> InvertedObjectQueryMap;
00176 
00177     // We replicate many object indices, so we need to generate many query
00178     // handlers, one for each replicated index. We'll track them by their unique
00179     // index IDs. Since the query handler can't track all the info we might care
00180     // about, we put it in a struct with a bit more metadata
00181     struct ReplicatedIndexQueryHandler {
00182         ReplicatedIndexQueryHandler(ProxQueryHandler* handler_, ReplicatedLocationServiceCachePtr loccache_, ServerID from_, bool dynamic_)
00183          : handler(handler_), loccache(loccache_), from(from_), dynamic(dynamic_) {}
00184         ReplicatedIndexQueryHandler()
00185          : handler(NULL), loccache(), from(NullServerID), dynamic(true) {}
00186 
00187         ProxQueryHandler* handler;
00188         ReplicatedLocationServiceCachePtr loccache;
00189         // The Server this tree was replicated from. This may not be unique.
00190         ServerID from;
00191         // Whether this tree includes dynamic objects
00192         bool dynamic;
00193     };
00194     typedef std::tr1::unordered_map<ProxIndexID, ReplicatedIndexQueryHandler> ReplicatedIndexQueryHandlerMap;
00195     // Sort of the inverse of the above: aggregator (libprox handler) ->
00196     // ProxIndexID lets us get back to the data
00197     typedef std::tr1::unordered_map<ProxAggregator*, ProxIndexID> InverseReplicatedIndexQueryHandlerMap;
00198 
00199     typedef std::tr1::shared_ptr<ObjectSet> ObjectSetPtr;
00200 
00201 
00202     // MAIN Thread - Should only be accessed in methods used by the main thread
00203 
00204     // We track subscriptions in the main thread since loc events need to be
00205     // executed in the main thread. This let's us just create one update and
00206     // reuse it across all subscribers.
00207     // Set of subscribers
00208     typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> SubscriberSet;
00209     typedef std::tr1::shared_ptr<SubscriberSet> SubscriberSetPtr;
00210     // Map of object -> subscribers to that object
00211     typedef std::tr1::unordered_map<ObjectReference, SubscriberSetPtr, ObjectReference::Hasher> SubscribersMap;
00212     SubscribersMap mSubscribers;
00213 
00214 
00215     // PROX Thread - Should only be accessed in methods used by the prox thread
00216 
00217     void tickQueryHandler();
00218 
00219     // All queryHasEvents calls are going to not be reentrant unless you're very
00220     // careful, so the base class provides this so it's easy to verify it.
00221     InstanceMethodNotReentrant mQueryHasEventsNotRentrant;
00222 
00223     // These track all objects being reported to this server and
00224     // answer queries for objects connected to this server.
00225     ObjectQueryMap mObjectQueries;
00226     InvertedObjectQueryMap mInvertedObjectQueries;
00227     ReplicatedIndexQueryHandlerMap mObjectQueryHandlers;
00228     InverseReplicatedIndexQueryHandlerMap mInverseObjectQueryHandlers;
00229     bool mObjectDistance; // Using distance queries
00230     PollerService mObjectHandlerPoller;
00231 
00232     // Threads: Thread-safe data used for exchange between threads
00233     struct ProximityResultInfo {
00234         ProximityResultInfo(const ObjectReference& q, Sirikata::Protocol::Prox::ProximityUpdate* res)
00235          : querier(q), results(res)
00236         {}
00237 
00238         ObjectReference querier;
00239         Sirikata::Protocol::Prox::ProximityUpdate* results;
00240     };
00241     Sirikata::ThreadSafeQueueWithNotification<ProximityResultInfo> mObjectResults; // object query results that need to be sent
00242 
00243 }; //class ObjectQueryHandler
00244 
00245 } // namespace Manual
00246 } // namespace OH
00247 } // namespace Sirikata
00248 
00249 #endif //_SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_