Sirikata
|
00001 /* Sirikata 00002 * LibproxProximity.hpp 00003 * 00004 * Copyright (c) 2009, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_LIBPROX_PROXIMITY_HPP_ 00034 #define _SIRIKATA_LIBPROX_PROXIMITY_HPP_ 00035 00036 #include "LibproxProximityBase.hpp" 00037 #include <prox/geom/QueryHandler.hpp> 00038 #include <prox/base/LocationUpdateListener.hpp> 00039 #include <prox/base/AggregateListener.hpp> 00040 00041 #include <sirikata/core/network/SSTImpl.hpp> 00042 #include <sirikata/core/queue/ThreadSafeQueue.hpp> 00043 00044 namespace Sirikata { 00045 00046 class ProximityInputEvent; 00047 class ProximityOutputEvent; 00048 00049 class LibproxProximity : 00050 public LibproxProximityBase, 00051 Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::Query<ObjectProxSimulationTraits> >, 00052 Prox::AggregateListener<ObjectProxSimulationTraits> 00053 { 00054 private: 00055 typedef Prox::QueryHandler<ObjectProxSimulationTraits> ProxQueryHandler; 00056 typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator; 00057 public: 00058 // MAIN Thread: All public interface is expected to be called only from the main thread. 00059 typedef Prox::Query<ObjectProxSimulationTraits> Query; 00060 typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent; 00061 00062 LibproxProximity(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr); 00063 ~LibproxProximity(); 00064 00065 // MAIN Thread: 00066 00067 // Service Interface overrides 00068 virtual void start(); 00069 00070 // ObjectSessionListener Interface 00071 virtual void newSession(ObjectSession* session); 00072 virtual void sessionClosed(ObjectSession* session); 00073 00074 // Objects 00075 virtual void addQuery(UUID obj, SolidAngle sa, uint32 max_results); 00076 virtual void addQuery(UUID obj, const String& params); 00077 virtual void removeQuery(UUID obj); 00078 00079 // PintoServerQuerierListener Interface 00080 virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update); 00081 virtual void onPintoServerLocUpdate(const LocUpdate& update); 00082 00083 // LocationServiceListener Interface 00084 virtual void localObjectRemoved(const UUID& uuid, bool agg); 00085 virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval); 00086 virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval); 00087 virtual void replicaObjectRemoved(const UUID& uuid); 00088 virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval); 00089 00090 // MessageRecipient Interface 00091 virtual void receiveMessage(Message* msg); 00092 00093 // MigrationDataClient Interface 00094 virtual std::string migrationClientTag(); 00095 virtual std::string generateMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server); 00096 virtual void receiveMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server, const std::string& data); 00097 00098 00099 // PROX Thread: 00100 00101 // AggregateListener Interface 00102 virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid); 00103 virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00104 virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00105 virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size); 00106 virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid); 00107 virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren); 00108 00109 // QueryEventListener Interface 00110 void queryHasEvents(Query* query); 00111 00112 00113 private: 00114 struct ProxQueryHandlerData; 00115 typedef std::tr1::unordered_set<ServerID> ServerSet; 00116 00117 void handleObjectProximityMessage(const UUID& objid, void* buffer, uint32 length); 00118 00119 // BOTH Threads - uses thread safe data 00120 00121 // PintoServerQuerier management 00122 // Utility -- setup all known servers for a server query update 00123 void addAllServersForUpdate(); 00124 // Get/add servers for sending and update of our aggregate query to 00125 void getServersForAggregateQueryUpdate(ServerSet* servers_out); 00126 void addServerForAggregateQueryUpdate(ServerID sid); 00127 // Initiate updates to aggregate queries and stats over all objects, used to 00128 // trigger updated requests to top-level pinto and other servers 00129 void updateAggregateQuery(); 00130 // Number of servers we have active queries to 00131 uint32 numServersQueried(); 00132 00133 00134 // MAIN Thread: These are utility methods which should only be called from the main thread. 00135 virtual int32 objectQueries() const; 00136 virtual int32 serverQueries() const; 00137 00138 // Update queries based on current state. 00139 void poll(); 00140 00141 // Server queries requests, generated by receiving messages 00142 void updateQuery(ServerID sid, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& sa, uint32 max_results); 00143 void removeQuery(ServerID sid); 00144 00145 // Object queries 00146 void updateQuery(UUID obj, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, SolidAngle sa, uint32 max_results); 00147 00148 // Send a query add/update request to any servers we've marked as needing an 00149 // update 00150 void sendQueryRequests(); 00151 00152 00153 // PROX Thread: These are utility methods which should only be called from the prox thread. 00154 00155 // Handle various query events from the main thread 00156 void handleUpdateServerQuery(const ServerID& server, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results); 00157 void handleRemoveServerQuery(const ServerID& server); 00158 00159 // Override for forced disconnections 00160 virtual void handleConnectedServer(ServerID server); 00161 virtual void handleDisconnectedServer(ServerID server); 00162 00163 void handleUpdateObjectQuery(const UUID& object, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results, SeqNoPtr seqno); 00164 void handleRemoveObjectQuery(const UUID& object, bool notify_main_thread); 00165 void handleDisconnectedObject(const UUID& object); 00166 00167 // Generate query events based on results collected from query handlers 00168 void generateServerQueryEvents(Query* query); 00169 void generateObjectQueryEvents(Query* query, bool do_first=false); 00170 00171 // Decides whether a query handler should handle a particular object. 00172 bool handlerShouldHandleObject(bool is_static_handler, bool is_global_handler, const ObjectReference& obj_id, bool local, bool aggregate, const TimedMotionVector3f& pos, const BoundingSphere3f& region, float maxSize); 00173 // The real handler for moving objects between static/dynamic 00174 void handleCheckObjectClassForHandlers(const ObjectReference& objid, bool is_static, ProxQueryHandlerData handlers[NUM_OBJECT_CLASSES]); 00175 virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static); 00176 00183 SeqNoPtr getOrCreateSeqNoInfo(const ServerID server_id); 00184 void eraseSeqNoInfo(const ServerID server_id); 00185 SeqNoPtr getSeqNoInfo(const UUID& obj_id); 00186 void eraseSeqNoInfo(const UUID& obj_id); 00187 00188 typedef std::set<UUID> ObjectSet; 00189 typedef std::tr1::unordered_map<ServerID, Query*> ServerQueryMap; 00190 typedef std::tr1::unordered_map<Query*, ServerID> InvertedServerQueryMap; 00191 typedef std::tr1::unordered_map<UUID, Query*, UUID::Hasher> ObjectQueryMap; 00192 typedef std::tr1::unordered_set<Query*> FirstIterationObjectSet; 00193 typedef std::tr1::unordered_map<Query*, UUID> InvertedObjectQueryMap; 00194 00195 typedef std::tr1::shared_ptr<ObjectSet> ObjectSetPtr; 00196 typedef std::tr1::unordered_map<ServerID, ObjectSetPtr> ServerQueryResultSet; 00197 00198 00199 // BOTH Threads - thread-safe data 00200 00201 boost::mutex mServerSetMutex; 00202 // This tracks the servers we currently have subscriptions with 00203 ServerSet mServersQueried; 00204 // And this indicates whether we need to send new requests 00205 // out to other servers 00206 ServerSet mNeedServerQueryUpdate; 00207 00208 00209 00210 // MAIN Thread - Should only be accessed in methods used by the main thread 00211 00212 // The distance to use when doing range queries instead of solid angle queries. 00213 // FIXME we should have this configurable somehow 00214 float32 mDistanceQueryDistance; 00215 00216 // Tracks object query angles for quick access in the main thread 00217 // NOTE: It really sucks that we're duplicating this information 00218 // but we'd have to provide a safe query map and query angle accessor 00219 // to avoid this. 00220 typedef std::map<UUID, SolidAngle> ObjectQueryAngleMap; 00221 ObjectQueryAngleMap mObjectQueryAngles; 00222 typedef std::map<UUID, uint32> ObjectQueryMaxCountMap; 00223 ObjectQueryMaxCountMap mObjectQueryMaxCounts; 00224 00225 00226 // Aggregate query info. Aggregate object stats are managed by 00227 // LibproxProximityBase. 00228 // Since objects can be moving, we just recompute periodically and update 00229 // other servers. We only track the value so we can avoid sending updates 00230 // when it doesn't change 00231 AggregateBoundingInfo mLastAggregateQuerierBounds; 00232 // This tracks the minimum object query size, which is used 00233 // as the angle for queries to other servers. 00234 SolidAngle mMinObjectQueryAngle; 00235 // And similarly, this tracks the maximum max_count query parameters as 00236 // conservative estimate of number of results needed from other servers. 00237 uint32 mMaxMaxCount; 00238 00239 00240 std::deque<Message*> mServerResultsToSend; // server query results waiting to be sent 00241 std::deque<Sirikata::Protocol::Object::ObjectMessage*> mObjectResultsToSend; // object query results waiting to be sent 00242 00243 00244 00245 // PROX Thread - Should only be accessed in methods used by the prox thread 00246 00247 void tickQueryHandler(ProxQueryHandlerData qh[NUM_OBJECT_CLASSES]); 00248 void rebuildHandlerType(ProxQueryHandlerData* handler, ObjectClass objtype); 00249 void rebuildHandler(ObjectClass objtype); 00250 00251 void recomputeAggregateQueryBounds(); 00252 00253 // Command handlers 00254 virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00255 virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00256 bool parseHandlerName(const String& name, ProxQueryHandlerData** handlers_out, ObjectClass* class_out); 00257 virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00258 virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00259 virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00260 00261 typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> ObjectIDSet; 00262 struct ProxQueryHandlerData { 00263 ProxQueryHandler* handler; 00264 // Additions and removals that need to be processed on the 00265 // next tick. These need to be handled carefully since they 00266 // can be due to swapping between handlers. If they are 00267 // processed in the wrong order we could end up generating 00268 // [addition, removal] instead of [removal, addition] for 00269 // queriers. 00270 ObjectIDSet additions; 00271 ObjectIDSet removals; 00272 }; 00273 // These track local objects and answer queries from other 00274 // servers. 00275 ServerQueryMap mServerQueries[NUM_OBJECT_CLASSES]; 00276 InvertedServerQueryMap mInvertedServerQueries; 00277 ProxQueryHandlerData mServerQueryHandler[NUM_OBJECT_CLASSES]; 00278 bool mServerDistance; // Using distance queries 00279 // Results from queries to other servers, so we know what we need to remove 00280 // on forceful disconnection 00281 ServerQueryResultSet mServerQueryResults; 00282 PollerService mServerHandlerPoller; 00283 PollerService mServerQueryBoundsPoller; 00284 00285 // These track all objects being reported to this server and 00286 // answer queries for objects connected to this server. 00287 ObjectQueryMap mObjectQueries[NUM_OBJECT_CLASSES]; 00288 InvertedObjectQueryMap mInvertedObjectQueries; 00289 FirstIterationObjectSet mObjectQueriesFirstIteration; 00290 ProxQueryHandlerData mObjectQueryHandler[NUM_OBJECT_CLASSES]; 00291 bool mObjectDistance; // Using distance queries 00292 PollerService mObjectHandlerPoller; 00293 00294 // Pollers that trigger rebuilding of query data structures 00295 PollerService mStaticRebuilderPoller; 00296 PollerService mDynamicRebuilderPoller; 00297 00298 // Track SeqNo info for each querier 00299 typedef std::tr1::unordered_map<ServerID, SeqNoPtr> ServerSeqNoInfoMap; 00300 ServerSeqNoInfoMap mServerSeqNos; 00301 typedef std::tr1::unordered_map<UUID, SeqNoPtr, UUID::Hasher> ObjectSeqNoInfoMap; 00302 ObjectSeqNoInfoMap mObjectSeqNos; 00303 00304 00305 // Threads: Thread-safe data used for exchange between threads 00306 Sirikata::ThreadSafeQueue<Message*> mServerResults; // server query results that need to be sent 00307 Sirikata::ThreadSafeQueue<Sirikata::Protocol::Object::ObjectMessage*> mObjectResults; // object query results that need to be sent 00308 00309 }; //class LibproxProximity 00310 00311 } // namespace Sirikata 00312 00313 #endif //_SIRIKATA_LIBPROX_PROXIMITY_HPP_