Sirikata
|
00001 // Copyright (c) 2012 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_ 00006 #define _SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_ 00007 00008 #include "ObjectQueryHandlerBase.hpp" 00009 #include <sirikata/pintoloc/ProxSimulationTraits.hpp> 00010 #include <sirikata/pintoloc/ReplicatedLocationUpdateListener.hpp> 00011 #include <sirikata/pintoloc/ReplicatedLocationServiceCache.hpp> 00012 #include <prox/geom/QueryHandler.hpp> 00013 #include <prox/base/AggregateListener.hpp> 00014 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp> 00015 #include <sirikata/core/service/PollerService.hpp> 00016 #include <sirikata/oh/HostedObject.hpp> 00017 #include <sirikata/core/prox/Defs.hpp> 00018 #include <sirikata/core/util/InstanceMethodNotReentrant.hpp> 00019 00020 namespace Sirikata { 00021 00022 00023 namespace Protocol { 00024 namespace Prox { 00025 class ProximityUpdate; 00026 } 00027 } 00028 00029 namespace OH { 00030 namespace Manual { 00031 00032 class ObjectQueryHandler : 00033 public ObjectQueryHandlerBase, 00034 Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::Query<ObjectProxSimulationTraits> >, 00035 Prox::AggregateListener<ObjectProxSimulationTraits>, 00036 public ReplicatedLocationUpdateListener 00037 { 00038 private: 00039 typedef Prox::QueryHandler<ObjectProxSimulationTraits> ProxQueryHandler; 00040 typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator; 00041 public: 00042 // MAIN Thread: All public interface is expected to be called only from the main thread. 00043 typedef Prox::Query<ObjectProxSimulationTraits> Query; 00044 typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent; 00045 00046 ObjectQueryHandler(ObjectHostContext* ctx, ManualObjectQueryProcessor* parent, const OHDP::SpaceNodeID& space, Network::IOStrandPtr prox_strand); 00047 ~ObjectQueryHandler(); 00048 00049 // MAIN Thread: 00050 00051 // Service Interface overrides 00052 virtual void start(); 00053 virtual void stop(); 00054 00055 00056 // Index/Tree replication events from server queries 00057 void createdReplicatedIndex(ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects); 00058 void removedReplicatedIndex(ProxIndexID iid); 00059 00060 // Query settings. These pass in the HostedObject so they don't 00061 // have to rely on the ReplicatedLocationServiceCache for implicit 00062 // settings since it's possible the ReplicatedLocationServiceCache hasn't 00063 // received (or might not receive!) information about the object 00064 // yet. Note that we use SpaceObjectReference here so we can 00065 // perform lookups in HostedObject, even though everything in this 00066 // space is specific to a single SpaceID. 00067 // 00068 // There is no addQuery, you always use updateQuery and it registers the 00069 // query if necessary 00070 void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& params); 00071 void removeQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef); 00072 // (Potential) Querier Lifetimes 00073 void presenceConnected(const ObjectReference& sporef); 00074 void presenceDisconnected(const ObjectReference& sporef); 00075 00076 // ReplicatedLocationUpdateListener Interface 00077 // We override this for two reasons. First, a few callbacks are used to 00078 // update query parameters (location & bounds). Second, we use these updates 00079 // to generate LocUpdates for queriers that are subscribed to the object. 00080 virtual void onObjectAdded(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00081 virtual void onObjectRemoved(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00082 virtual void onParentUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00083 virtual void onEpochUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00084 virtual void onLocationUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00085 virtual void onOrientationUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00086 virtual void onBoundsUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00087 virtual void onMeshUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00088 virtual void onPhysicsUpdated(ReplicatedLocationServiceCache* loccache, const ObjectReference& obj); 00089 00090 // PROX Thread: 00091 00092 void handleCreatedReplicatedIndex(Liveness::Token alive, ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects); 00093 void handleRemovedReplicatedIndex(Liveness::Token alive, ProxIndexID iid); 00094 00095 // QueryEventListener Interface 00096 void queryHasEvents(Query* query); 00097 00098 // AggregateListener Interface 00099 virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid); 00100 virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00101 virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00102 virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00103 virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid); 00104 virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren); 00105 00106 ProxQueryHandler* getQueryHandler(const String& handler_name); 00107 00108 void commandListInfo(const OHDP::SpaceNodeID& snid, Command::Result& result); 00109 void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00110 void commandListQueriers(const OHDP::SpaceNodeID& snid, Command::Result& result); 00111 void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00112 00113 private: 00114 00115 // MAIN Thread: These are utility methods which should only be called from the main thread. 00116 int32 objectQueries() const; 00117 00118 // Update queries based on current state. 00119 void handleDeliverEvents(); 00120 00121 // Update subscribed queriers that the given object was updated. This 00122 // currently takes the brute force approach of updating all properties and 00123 // uses the most up-to-date info, even if that's actually newer than the 00124 // update that triggered this. 00125 void handleNotifySubscribersLocUpdate(Liveness::Token alive, ReplicatedLocationServiceCache* loccache, const ObjectReference& oref); 00126 00127 // Object queries 00128 void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, SolidAngle sa, uint32 max_results); 00129 void updateQuery(const ObjectReference& obj, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, SolidAngle sa, uint32 max_results); 00130 00131 00132 // PROX Thread: These are utility methods which should only be called from the prox thread. 00133 00134 // Reusable utility for registering the query with a particular 00135 // index. Useful since we may need to register in different conditions -- 00136 // new query, new index, index becomes relevant to query, etc. 00137 void registerObjectQueryWithIndex(const ObjectReference& object, ProxIndexID index_id, ProxQueryHandler* handler, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results); 00138 // Utility for registering query with all indices for a ServerID. This just 00139 // calls registerObjectQueryWithIndex for each index on the server. 00140 void registerObjectQueryWithServer(const ObjectReference& object, ServerID sid, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results); 00141 void unregisterObjectQueryWithIndex(const ObjectReference& object, ProxIndexID index_id); 00142 void unregisterObjectQueryWithServer(const ObjectReference& object, ServerID sid); 00143 00144 // Events on queries/objects 00145 void handleUpdateObjectQuery(Liveness::Token alive, const ObjectReference& object, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results); 00146 void handleRemoveObjectQuery(Liveness::Token alive, const ObjectReference& object, bool notify_main_thread); 00147 void handleDisconnectedObject(Liveness::Token alive, const ObjectReference& object); 00148 00149 // Generate query events based on results collected from query handlers 00150 void generateObjectQueryEvents(Query* query); 00151 00152 typedef std::set<ObjectReference> ObjectSet; 00153 typedef std::tr1::unordered_map<ProxIndexID, Query*> IndexQueryMap; 00154 typedef std::tr1::unordered_set<ServerID> ServerIDSet; 00155 // A single object query may have queries registered against many query 00156 // handlers for different replicated trees. 00157 struct ObjectQueryData { 00158 // We need to store the query parameters because a query may get 00159 // registered when no trees have been replicated yet. 00160 TimedMotionVector3f loc; 00161 BoundingSphere3f bounds; 00162 SolidAngle angle; 00163 uint32 max_results; 00164 // And we also keep track of all the individual queries against 00165 // replicated indices. 00166 IndexQueryMap queries; 00167 // And which entire Servers they are subscribed to. 00168 ServerIDSet servers; 00169 }; 00170 typedef std::tr1::shared_ptr<ObjectQueryData> ObjectQueryDataPtr; 00171 typedef std::tr1::unordered_map<ObjectReference, ObjectQueryDataPtr, ObjectReference::Hasher> ObjectQueryMap; 00172 // Individual libprox queries are keyed by both the object who registered a 00173 // query and the replicated index the individual query operates on 00174 typedef std::pair<ObjectReference, ProxIndexID> ObjectIndexQueryKey; 00175 typedef std::tr1::unordered_map<Query*, ObjectIndexQueryKey> InvertedObjectQueryMap; 00176 00177 // We replicate many object indices, so we need to generate many query 00178 // handlers, one for each replicated index. We'll track them by their unique 00179 // index IDs. Since the query handler can't track all the info we might care 00180 // about, we put it in a struct with a bit more metadata 00181 struct ReplicatedIndexQueryHandler { 00182 ReplicatedIndexQueryHandler(ProxQueryHandler* handler_, ReplicatedLocationServiceCachePtr loccache_, ServerID from_, bool dynamic_) 00183 : handler(handler_), loccache(loccache_), from(from_), dynamic(dynamic_) {} 00184 ReplicatedIndexQueryHandler() 00185 : handler(NULL), loccache(), from(NullServerID), dynamic(true) {} 00186 00187 ProxQueryHandler* handler; 00188 ReplicatedLocationServiceCachePtr loccache; 00189 // The Server this tree was replicated from. This may not be unique. 00190 ServerID from; 00191 // Whether this tree includes dynamic objects 00192 bool dynamic; 00193 }; 00194 typedef std::tr1::unordered_map<ProxIndexID, ReplicatedIndexQueryHandler> ReplicatedIndexQueryHandlerMap; 00195 // Sort of the inverse of the above: aggregator (libprox handler) -> 00196 // ProxIndexID lets us get back to the data 00197 typedef std::tr1::unordered_map<ProxAggregator*, ProxIndexID> InverseReplicatedIndexQueryHandlerMap; 00198 00199 typedef std::tr1::shared_ptr<ObjectSet> ObjectSetPtr; 00200 00201 00202 // MAIN Thread - Should only be accessed in methods used by the main thread 00203 00204 // We track subscriptions in the main thread since loc events need to be 00205 // executed in the main thread. This let's us just create one update and 00206 // reuse it across all subscribers. 00207 // Set of subscribers 00208 typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> SubscriberSet; 00209 typedef std::tr1::shared_ptr<SubscriberSet> SubscriberSetPtr; 00210 // Map of object -> subscribers to that object 00211 typedef std::tr1::unordered_map<ObjectReference, SubscriberSetPtr, ObjectReference::Hasher> SubscribersMap; 00212 SubscribersMap mSubscribers; 00213 00214 00215 // PROX Thread - Should only be accessed in methods used by the prox thread 00216 00217 void tickQueryHandler(); 00218 00219 // All queryHasEvents calls are going to not be reentrant unless you're very 00220 // careful, so the base class provides this so it's easy to verify it. 00221 InstanceMethodNotReentrant mQueryHasEventsNotRentrant; 00222 00223 // These track all objects being reported to this server and 00224 // answer queries for objects connected to this server. 00225 ObjectQueryMap mObjectQueries; 00226 InvertedObjectQueryMap mInvertedObjectQueries; 00227 ReplicatedIndexQueryHandlerMap mObjectQueryHandlers; 00228 InverseReplicatedIndexQueryHandlerMap mInverseObjectQueryHandlers; 00229 bool mObjectDistance; // Using distance queries 00230 PollerService mObjectHandlerPoller; 00231 00232 // Threads: Thread-safe data used for exchange between threads 00233 struct ProximityResultInfo { 00234 ProximityResultInfo(const ObjectReference& q, Sirikata::Protocol::Prox::ProximityUpdate* res) 00235 : querier(q), results(res) 00236 {} 00237 00238 ObjectReference querier; 00239 Sirikata::Protocol::Prox::ProximityUpdate* results; 00240 }; 00241 Sirikata::ThreadSafeQueueWithNotification<ProximityResultInfo> mObjectResults; // object query results that need to be sent 00242 00243 }; //class ObjectQueryHandler 00244 00245 } // namespace Manual 00246 } // namespace OH 00247 } // namespace Sirikata 00248 00249 #endif //_SIRIKATA_OH_MQ_OBJECT_QUERY_HANDLER_HPP_