Sirikata
libspace/plugins/prox/LibproxProximity.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  LibproxProximity.hpp
00003  *
00004  *  Copyright (c) 2009, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_LIBPROX_PROXIMITY_HPP_
00034 #define _SIRIKATA_LIBPROX_PROXIMITY_HPP_
00035 
00036 #include "LibproxProximityBase.hpp"
00037 #include <prox/geom/QueryHandler.hpp>
00038 #include <prox/base/LocationUpdateListener.hpp>
00039 #include <prox/base/AggregateListener.hpp>
00040 
00041 #include <sirikata/core/network/SSTImpl.hpp>
00042 #include <sirikata/core/queue/ThreadSafeQueue.hpp>
00043 
00044 namespace Sirikata {
00045 
00046 class ProximityInputEvent;
00047 class ProximityOutputEvent;
00048 
00049 class LibproxProximity :
00050         public LibproxProximityBase,
00051         Prox::QueryEventListener<ObjectProxSimulationTraits, Prox::Query<ObjectProxSimulationTraits> >,
00052         Prox::AggregateListener<ObjectProxSimulationTraits>
00053 {
00054 private:
00055     typedef Prox::QueryHandler<ObjectProxSimulationTraits> ProxQueryHandler;
00056     typedef Prox::Aggregator<ObjectProxSimulationTraits> ProxAggregator;
00057 public:
00058     // MAIN Thread: All public interface is expected to be called only from the main thread.
00059     typedef Prox::Query<ObjectProxSimulationTraits> Query;
00060     typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent;
00061 
00062     LibproxProximity(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr);
00063     ~LibproxProximity();
00064 
00065     // MAIN Thread:
00066 
00067     // Service Interface overrides
00068     virtual void start();
00069 
00070     // ObjectSessionListener Interface
00071     virtual void newSession(ObjectSession* session);
00072     virtual void sessionClosed(ObjectSession* session);
00073 
00074     // Objects
00075     virtual void addQuery(UUID obj, SolidAngle sa, uint32 max_results);
00076     virtual void addQuery(UUID obj, const String& params);
00077     virtual void removeQuery(UUID obj);
00078 
00079     // PintoServerQuerierListener Interface
00080     virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update);
00081     virtual void onPintoServerLocUpdate(const LocUpdate& update);
00082 
00083     // LocationServiceListener Interface
00084     virtual void localObjectRemoved(const UUID& uuid, bool agg);
00085     virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval);
00086     virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval);
00087     virtual void replicaObjectRemoved(const UUID& uuid);
00088     virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval);
00089 
00090     // MessageRecipient Interface
00091     virtual void receiveMessage(Message* msg);
00092 
00093     // MigrationDataClient Interface
00094     virtual std::string migrationClientTag();
00095     virtual std::string generateMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server);
00096     virtual void receiveMigrationData(const UUID& obj, ServerID source_server, ServerID dest_server, const std::string& data);
00097 
00098 
00099     // PROX Thread:
00100 
00101     // AggregateListener Interface
00102     virtual void aggregateCreated(ProxAggregator* handler, const ObjectReference& objid);
00103     virtual void aggregateChildAdded(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00104     virtual void aggregateChildRemoved(ProxAggregator* handler, const ObjectReference& objid, const ObjectReference& child, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00105     virtual void aggregateBoundsUpdated(ProxAggregator* handler, const ObjectReference& objid, const Vector3f& bnds_center, const float32 bnds_center_radius, const float32 max_obj_size);
00106     virtual void aggregateDestroyed(ProxAggregator* handler, const ObjectReference& objid);
00107     virtual void aggregateObserved(ProxAggregator* handler, const ObjectReference& objid, uint32 nobservers, uint32 nchildren);
00108 
00109     // QueryEventListener Interface
00110     void queryHasEvents(Query* query);
00111 
00112 
00113 private:
00114     struct ProxQueryHandlerData;
00115     typedef std::tr1::unordered_set<ServerID> ServerSet;
00116 
00117     void handleObjectProximityMessage(const UUID& objid, void* buffer, uint32 length);
00118 
00119     // BOTH Threads - uses thread safe data
00120 
00121     // PintoServerQuerier management
00122     // Utility -- setup all known servers for a server query update
00123     void addAllServersForUpdate();
00124     // Get/add servers for sending and update of our aggregate query to
00125     void getServersForAggregateQueryUpdate(ServerSet* servers_out);
00126     void addServerForAggregateQueryUpdate(ServerID sid);
00127     // Initiate updates to aggregate queries and stats over all objects, used to
00128     // trigger updated requests to top-level pinto and other servers
00129     void updateAggregateQuery();
00130     // Number of servers we have active queries to
00131     uint32 numServersQueried();
00132 
00133 
00134     // MAIN Thread: These are utility methods which should only be called from the main thread.
00135     virtual int32 objectQueries() const;
00136     virtual int32 serverQueries() const;
00137 
00138     // Update queries based on current state.
00139     void poll();
00140 
00141     // Server queries requests, generated by receiving messages
00142     void updateQuery(ServerID sid, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& sa, uint32 max_results);
00143     void removeQuery(ServerID sid);
00144 
00145     // Object queries
00146     void updateQuery(UUID obj, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, SolidAngle sa, uint32 max_results);
00147 
00148     // Send a query add/update request to any servers we've marked as needing an
00149     // update
00150     void sendQueryRequests();
00151 
00152 
00153     // PROX Thread: These are utility methods which should only be called from the prox thread.
00154 
00155     // Handle various query events from the main thread
00156     void handleUpdateServerQuery(const ServerID& server, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results);
00157     void handleRemoveServerQuery(const ServerID& server);
00158 
00159     // Override for forced disconnections
00160     virtual void handleConnectedServer(ServerID server);
00161     virtual void handleDisconnectedServer(ServerID server);
00162 
00163     void handleUpdateObjectQuery(const UUID& object, const TimedMotionVector3f& loc, const BoundingSphere3f& bounds, const SolidAngle& angle, uint32 max_results, SeqNoPtr seqno);
00164     void handleRemoveObjectQuery(const UUID& object, bool notify_main_thread);
00165     void handleDisconnectedObject(const UUID& object);
00166 
00167     // Generate query events based on results collected from query handlers
00168     void generateServerQueryEvents(Query* query);
00169     void generateObjectQueryEvents(Query* query, bool do_first=false);
00170 
00171     // Decides whether a query handler should handle a particular object.
00172     bool handlerShouldHandleObject(bool is_static_handler, bool is_global_handler, const ObjectReference& obj_id, bool local, bool aggregate, const TimedMotionVector3f& pos, const BoundingSphere3f& region, float maxSize);
00173     // The real handler for moving objects between static/dynamic
00174     void handleCheckObjectClassForHandlers(const ObjectReference& objid, bool is_static, ProxQueryHandlerData handlers[NUM_OBJECT_CLASSES]);
00175     virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static);
00176 
00183     SeqNoPtr getOrCreateSeqNoInfo(const ServerID server_id);
00184     void eraseSeqNoInfo(const ServerID server_id);
00185     SeqNoPtr getSeqNoInfo(const UUID& obj_id);
00186     void eraseSeqNoInfo(const UUID& obj_id);
00187 
00188     typedef std::set<UUID> ObjectSet;
00189     typedef std::tr1::unordered_map<ServerID, Query*> ServerQueryMap;
00190     typedef std::tr1::unordered_map<Query*, ServerID> InvertedServerQueryMap;
00191     typedef std::tr1::unordered_map<UUID, Query*, UUID::Hasher> ObjectQueryMap;
00192     typedef std::tr1::unordered_set<Query*> FirstIterationObjectSet;
00193     typedef std::tr1::unordered_map<Query*, UUID> InvertedObjectQueryMap;
00194 
00195     typedef std::tr1::shared_ptr<ObjectSet> ObjectSetPtr;
00196     typedef std::tr1::unordered_map<ServerID, ObjectSetPtr> ServerQueryResultSet;
00197 
00198 
00199     // BOTH Threads - thread-safe data
00200 
00201     boost::mutex mServerSetMutex;
00202     // This tracks the servers we currently have subscriptions with
00203     ServerSet mServersQueried;
00204     // And this indicates whether we need to send new requests
00205     // out to other servers
00206     ServerSet mNeedServerQueryUpdate;
00207 
00208 
00209 
00210     // MAIN Thread - Should only be accessed in methods used by the main thread
00211 
00212     // The distance to use when doing range queries instead of solid angle queries.
00213     // FIXME we should have this configurable somehow
00214     float32 mDistanceQueryDistance;
00215 
00216     // Tracks object query angles for quick access in the main thread
00217     // NOTE: It really sucks that we're duplicating this information
00218     // but we'd have to provide a safe query map and query angle accessor
00219     // to avoid this.
00220     typedef std::map<UUID, SolidAngle> ObjectQueryAngleMap;
00221     ObjectQueryAngleMap mObjectQueryAngles;
00222     typedef std::map<UUID, uint32> ObjectQueryMaxCountMap;
00223     ObjectQueryMaxCountMap mObjectQueryMaxCounts;
00224 
00225 
00226     // Aggregate query info. Aggregate object stats are managed by
00227     // LibproxProximityBase.
00228     // Since objects can be moving, we just recompute periodically and update
00229     // other servers. We only track the value so we can avoid sending updates
00230     // when it doesn't change
00231     AggregateBoundingInfo mLastAggregateQuerierBounds;
00232     // This tracks the minimum object query size, which is used
00233     // as the angle for queries to other servers.
00234     SolidAngle mMinObjectQueryAngle;
00235     // And similarly, this tracks the maximum max_count query parameters as
00236     // conservative estimate of number of results needed from other servers.
00237     uint32 mMaxMaxCount;
00238 
00239 
00240     std::deque<Message*> mServerResultsToSend; // server query results waiting to be sent
00241     std::deque<Sirikata::Protocol::Object::ObjectMessage*> mObjectResultsToSend; // object query results waiting to be sent
00242 
00243 
00244 
00245     // PROX Thread - Should only be accessed in methods used by the prox thread
00246 
00247     void tickQueryHandler(ProxQueryHandlerData qh[NUM_OBJECT_CLASSES]);
00248     void rebuildHandlerType(ProxQueryHandlerData* handler, ObjectClass objtype);
00249     void rebuildHandler(ObjectClass objtype);
00250 
00251     void recomputeAggregateQueryBounds();
00252 
00253     // Command handlers
00254     virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00255     virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00256     bool parseHandlerName(const String& name, ProxQueryHandlerData** handlers_out, ObjectClass* class_out);
00257     virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00258     virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00259     virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00260 
00261     typedef std::tr1::unordered_set<ObjectReference, ObjectReference::Hasher> ObjectIDSet;
00262     struct ProxQueryHandlerData {
00263         ProxQueryHandler* handler;
00264         // Additions and removals that need to be processed on the
00265         // next tick. These need to be handled carefully since they
00266         // can be due to swapping between handlers. If they are
00267         // processed in the wrong order we could end up generating
00268         // [addition, removal] instead of [removal, addition] for
00269         // queriers.
00270         ObjectIDSet additions;
00271         ObjectIDSet removals;
00272     };
00273     // These track local objects and answer queries from other
00274     // servers.
00275     ServerQueryMap mServerQueries[NUM_OBJECT_CLASSES];
00276     InvertedServerQueryMap mInvertedServerQueries;
00277     ProxQueryHandlerData mServerQueryHandler[NUM_OBJECT_CLASSES];
00278     bool mServerDistance; // Using distance queries
00279     // Results from queries to other servers, so we know what we need to remove
00280     // on forceful disconnection
00281     ServerQueryResultSet mServerQueryResults;
00282     PollerService mServerHandlerPoller;
00283     PollerService mServerQueryBoundsPoller;
00284 
00285     // These track all objects being reported to this server and
00286     // answer queries for objects connected to this server.
00287     ObjectQueryMap mObjectQueries[NUM_OBJECT_CLASSES];
00288     InvertedObjectQueryMap mInvertedObjectQueries;
00289     FirstIterationObjectSet mObjectQueriesFirstIteration;
00290     ProxQueryHandlerData mObjectQueryHandler[NUM_OBJECT_CLASSES];
00291     bool mObjectDistance; // Using distance queries
00292     PollerService mObjectHandlerPoller;
00293 
00294     // Pollers that trigger rebuilding of query data structures
00295     PollerService mStaticRebuilderPoller;
00296     PollerService mDynamicRebuilderPoller;
00297 
00298     // Track SeqNo info for each querier
00299     typedef std::tr1::unordered_map<ServerID, SeqNoPtr> ServerSeqNoInfoMap;
00300     ServerSeqNoInfoMap mServerSeqNos;
00301     typedef std::tr1::unordered_map<UUID, SeqNoPtr, UUID::Hasher> ObjectSeqNoInfoMap;
00302     ObjectSeqNoInfoMap mObjectSeqNos;
00303 
00304 
00305     // Threads: Thread-safe data used for exchange between threads
00306     Sirikata::ThreadSafeQueue<Message*> mServerResults; // server query results that need to be sent
00307     Sirikata::ThreadSafeQueue<Sirikata::Protocol::Object::ObjectMessage*> mObjectResults; // object query results that need to be sent
00308 
00309 }; //class LibproxProximity
00310 
00311 } // namespace Sirikata
00312 
00313 #endif //_SIRIKATA_LIBPROX_PROXIMITY_HPP_