Sirikata
libspace/plugins/prox/LibproxProximityBase.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_
00006 #define _SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_
00007 
00008 #include <sirikata/space/Proximity.hpp>
00009 #include "CBRLocationServiceCache.hpp"
00010 #include <prox/base/QueryEvent.hpp>
00011 #include <sirikata/space/PintoServerQuerier.hpp>
00012 
00013 #include <boost/multi_index_container.hpp>
00014 #include <boost/multi_index/member.hpp>
00015 #include <boost/multi_index/ordered_index.hpp>
00016 
00017 #include <sirikata/core/util/InstanceMethodNotReentrant.hpp>
00018 
00019 namespace Sirikata {
00020 
00024 class LibproxProximityBase : public Proximity, PintoServerQuerierListener {
00025 public:
00026     LibproxProximityBase(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr);
00027     ~LibproxProximityBase();
00028 
00029     // Service Interface overrides
00030     virtual void start();
00031     virtual void stop();
00032 
00033     // LocationServiceListener Interface - used here only to track
00034     // object sizes for top-level pinto
00035     virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics, const String& zernike);
00036     virtual void localObjectRemoved(const UUID& uuid, bool agg);
00037     virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval);
00038 
00039 protected:
00040     typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent;
00041     typedef std::deque<QueryEvent> QueryEventList;
00042 
00043     // Helper types & methods
00044     enum ObjectClass {
00045         OBJECT_CLASS_STATIC = 0,
00046         OBJECT_CLASS_DYNAMIC = 1,
00047         NUM_OBJECT_CLASSES = 2
00048     };
00049     static const std::string& ObjectClassToString(ObjectClass c);
00050     static BoundingBox3f aggregateBBoxes(const BoundingBoxList& bboxes);
00051     static bool velocityIsStatic(const Vector3f& vel);
00052 
00053     // Coalesces events, turning them effectively into one giant event (although
00054     // split across enough events that no event is too large). This gets rid of
00055     // any intermediate additions/removals that occurred as the cut was
00056     // refined/unrefined. The old results are destroyed and the new ones are
00057     // placed back in the query event queue.
00058     //
00059     // per_event indicates how many additions/removals to put in each
00060     // event. Since they are no longer forced to be together to be atomic, we
00061     // can pack them however we like.
00062     void coalesceEvents(QueryEventList& evts, uint32 per_event);
00063 
00064     // BOTH Threads: These are read-only or lock protected.
00065 
00066     struct Stats {
00067         Stats()
00068          : objectSentBytes(0),
00069            objectSentMessages(0),
00070            objectReceivedBytes(0),
00071            objectReceivedMessages(0),
00072            objectHostSentBytes(0),
00073            objectHostSentMessages(0),
00074            objectHostReceivedBytes(0),
00075            objectHostReceivedMessages(0),
00076            spaceSentBytes(0),
00077            spaceSentMessages(0),
00078            spaceReceivedBytes(0),
00079            spaceReceivedMessages(0)
00080         {}
00081 
00082         // Total number of bytes sent to objects
00083         AtomicValue<uint32> objectSentBytes;
00084         // Total messages sent to objects (calls to sendObjectResult)
00085         AtomicValue<uint32> objectSentMessages;
00086         // Total number of bytes received from objects
00087         AtomicValue<uint32> objectReceivedBytes;
00088         // Total messages received to objects
00089         AtomicValue<uint32> objectReceivedMessages;
00090 
00091         // Total number of bytes sent to object hosts
00092         AtomicValue<uint32> objectHostSentBytes;
00093         // Total messages sent to objects hosts (calls to sendObjectHostResult)
00094         AtomicValue<uint32> objectHostSentMessages;
00095         // Total number of bytes received to object hosts
00096         AtomicValue<uint32> objectHostReceivedBytes;
00097         // Total messages received from object hosts
00098         AtomicValue<uint32> objectHostReceivedMessages;
00099 
00100         // Total number of bytes sent to other space servers
00101         AtomicValue<uint32> spaceSentBytes;
00102         // Total messages sent to other space servers
00103         AtomicValue<uint32> spaceSentMessages;
00104         // Total number of bytes received from other space servers
00105         AtomicValue<uint32> spaceReceivedBytes;
00106         // Total messages received from other space servers
00107         AtomicValue<uint32> spaceReceivedMessages;
00108     };
00109     Stats mStats;
00110 
00111     // To support a static/dynamic split but also support mixing them for
00112     // comparison purposes track which we are doing and, for most places, use a
00113     // simple index to control whether they point to different query handlers or
00114     // the same one.
00115     bool mSeparateDynamicObjects;
00116     int mNumQueryHandlers;
00117     // When using separate trees, how long to wait after an object becomes
00118     // static to move it into the static tree. This keeps us from moving things
00119     // in and out of trees frequently because of short stops (e.g. and avatar
00120     // stops for a few seconds while walking).
00121     Duration mMoveToStaticDelay;
00122 
00123 
00124     // Top level Pinto + server interactions. The base class takes
00125     // care of querying top level Pinto and tracking which servers
00126     // have connections + need queries, but the implementations need
00127     // to take it from there (removing items from
00128     // mNeedServerQueryUpdate and processing them). Note that unlike the data
00129     // below, this is only accessed on the main thread.
00130     PintoServerQuerier* mServerQuerier;
00131 
00132 
00133 
00134     // MAIN Thread: Utility methods that should only be called from the main
00135     // strand
00136 
00137     // PintoServerQuerierListener Interface
00138     virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update) = 0;
00139     virtual void onPintoServerLocUpdate(const LocUpdate& update) = 0;
00140 
00141     // SpaceNetworkConnectionListener Interface
00142     virtual void onSpaceNetworkConnected(ServerID sid);
00143     virtual void onSpaceNetworkDisconnected(ServerID sid);
00144 
00145     // CoordinateSegmentation::Listener Interface
00146     virtual void updatedSegmentation(CoordinateSegmentation* cseg, const std::vector<SegmentationInfo>& new_seg);
00147 
00148     // Object sizes -- shouldn't be called by subclasses, these are
00149     // used to update the server querier. Just make sure you call this
00150     // class's implementation of LocationServiceListener methods.
00151     void updateObjectSize(const UUID& obj, float rad);
00152     void removeObjectSize(const UUID& obj);
00153 
00154 private: // Shouldn't be needed by subclasses, only used for server querier
00155     // Track object sizes and the maximum of all of them.
00156     typedef std::tr1::unordered_map<UUID, float32, UUID::Hasher> ObjectSizeMap;
00157     ObjectSizeMap mObjectSizes;
00158 protected:
00159     float32 mMaxObject; // Only exposed for reporting to commands
00160 
00161     // Server-to-server messages
00162 private:
00163     // Now private to ensure we get correct accounting, send via the helper
00164     // method below
00165     Router<Message*>* mProxServerMessageService;
00166 protected:
00167     // Helpers to track traffic stats
00168     bool sendServerMessage(Message* msg);
00169     void serverMessageReceived(Message* msg);
00170 
00171     // Server-to-Object, Server-to-ObjectHost streams
00172 
00173     // ProxStreamInfo manages *most* of the state for sending data to
00174     // a client. This data is managed by the main thread, where
00175     // messaging is performed. See SeqNoInfo for how sequence numbers
00176     // are stored -- they need to be accessed in the Prox thread so
00177     // they are managed separately.
00178     template<typename EndpointType, typename StreamType>
00179     struct ProxStreamInfo {
00180     public:
00181         typedef std::tr1::shared_ptr<StreamType> StreamTypePtr;
00182         typedef std::tr1::shared_ptr<ProxStreamInfo> Ptr;
00183         typedef std::tr1::weak_ptr<ProxStreamInfo> WPtr;
00184 
00185         // Start a fresh ProxStreamInfo, which will require requesting
00186         // a new substream
00187         ProxStreamInfo()
00188          : iostream_requested(false), writing(false) {}
00189         // Start a ProxStreamInfo on an existing stream.
00190         ProxStreamInfo(StreamTypePtr strm)
00191          : iostream(strm), iostream_requested(true), writing(false) {}
00192 
00193         void disable() {
00194             if (iostream)
00195                 iostream->close(false);
00196         }
00197 
00198         // Setup reading of frames from the stream. ProxStreamInfo
00199         // takes care of queueing up messages until complete frames
00200         // are available, giving just a callback per message.
00201         typedef std::tr1::function<void(String&)> FrameReceivedCallback;
00202         void readFramesFromStream(Ptr prox_stream, FrameReceivedCallback cb);
00203 
00204         // The actual stream we send on
00205         StreamTypePtr iostream;
00206         // Whether we've requested the iostream
00207         bool iostream_requested;
00208 
00209         // Outstanding data to be sent. FIXME efficiency
00210         std::queue<std::string> outstanding;
00211         // If writing is currently in progress
00212         bool writing;
00213         // Stored callback for writing
00214         std::tr1::function<void()> writecb;
00215 
00216         // Stored callback for reading frames
00217         FrameReceivedCallback read_frame_cb;
00218         // Backlog of data, i.e. incomplete frame
00219         String partial_frame;
00220 
00221         // Defined safely in cpp since these are only used from
00222         // LibproxProximityBase
00223 
00224         // Handle reads from the underlying stream, decoding frames
00225         // and invoking the read callback
00226         static void handleRead(WPtr w_prox_stream, uint8* data, int size);
00227 
00228         // The driver for getting data to the OH, initially triggered by sendObjectResults
00229         static void writeSomeObjectResults(Context* ctx, WPtr prox_stream);
00230         // Helper for setting up the initial proximity stream. Retries automatically
00231         // until successful.
00232         static void requestProxSubstream(LibproxProximityBase* parent, Context* ctx, const EndpointType& oref, Ptr prox_stream);
00233         // Helper that handles callbacks about prox stream setup
00234         static void proxSubstreamCallback(LibproxProximityBase* parent, Context* ctx, int x, const EndpointType& oref, StreamTypePtr parent_stream, StreamTypePtr substream, Ptr prox_stream_info);
00235     };
00236 
00237     typedef ODPSST::Stream::Ptr ProxObjectStreamPtr;
00238     typedef ProxStreamInfo<ObjectReference, ODPSST::Stream> ProxObjectStreamInfo;
00239     typedef std::tr1::shared_ptr<ProxObjectStreamInfo> ProxObjectStreamInfoPtr;
00240     typedef OHDPSST::Stream::Ptr ProxObjectHostStreamPtr;
00241     typedef ProxStreamInfo<OHDP::NodeID, OHDPSST::Stream> ProxObjectHostStreamInfo;
00242     typedef std::tr1::shared_ptr<ProxObjectHostStreamInfo> ProxObjectHostStreamInfoPtr;
00243 
00244     // Utility for implementations. Start listening on the stream and
00245     // read each Network::Frame, emitting a callback for each.
00246     void readFramesFromObjectStream(const ObjectReference& oref, ProxObjectStreamInfo::FrameReceivedCallback cb);
00247     void readFramesFromObjectHostStream(const OHDP::NodeID& node, ProxObjectHostStreamInfo::FrameReceivedCallback cb);
00248     // Wrappers for FrameReceivedCallbacks that track stats
00249     void readObjectStreamFrame(String& payload, ProxObjectStreamInfo::FrameReceivedCallback cb);
00250     void readObjectHostStreamFrame(String& payload, ProxObjectStreamInfo::FrameReceivedCallback cb);
00251 
00252     // Utility for poll.  Queues a message for delivery, encoding it and putting
00253     // it on the send stream.  If necessary, starts send processing on the stream.
00254     void sendObjectResult(Sirikata::Protocol::Object::ObjectMessage*);
00255     void sendObjectHostResult(const OHDP::NodeID& node, Sirikata::Protocol::Object::ObjectMessage*);
00256 
00257     // Helpers that are protocol-specific
00258     bool validSession(const ObjectReference& oref) const;
00259     bool validSession(const OHDP::NodeID& node) const;
00260     ProxObjectStreamPtr getBaseStream(const ObjectReference& oref) const;
00261     ProxObjectHostStreamPtr getBaseStream(const OHDP::NodeID& node) const;
00262     // Use these to setup ProxStreamInfo's when the client initiates
00263     // the stream that will be used to communicate with it.
00264     void addObjectProxStreamInfo(ODPSST::Stream::Ptr);
00265     void addObjectHostProxStreamInfo(OHDPSST::Stream::Ptr);
00266 
00267     // Handle various events in the main thread that are triggered in the prox thread
00268     void handleAddObjectLocSubscription(const UUID& subscriber, const UUID& observed);
00269     void handleAddObjectLocSubscriptionWithID(const UUID& subscriber, const UUID& observed, ProxIndexID index_id);
00270     void handleRemoveObjectLocSubscription(const UUID& subscriber, const UUID& observed);
00271     void handleRemoveObjectLocSubscriptionWithID(const UUID& subscriber, const UUID& observed, ProxIndexID index_id);
00272     void handleRemoveAllObjectLocSubscription(const UUID& subscriber);
00273     void handleAddOHLocSubscription(const OHDP::NodeID& subscriber, const UUID& observed);
00274     void handleAddOHLocSubscriptionWithID(const OHDP::NodeID& subscriber, const UUID& observed, ProxIndexID index_id);
00275     void handleRemoveOHLocSubscription(const OHDP::NodeID& subscriber, const UUID& observed);
00276     void handleRemoveOHLocSubscriptionWithID(const OHDP::NodeID& subscriber, const UUID& observed, ProxIndexID index_id);
00277     void handleRemoveAllOHLocSubscription(const OHDP::NodeID& subscriber);
00278     void handleAddServerLocSubscription(const ServerID& subscriber, const UUID& observed, SeqNoPtr seqPtr);
00279     void handleAddServerLocSubscriptionWithID(const ServerID& subscriber, const UUID& observed, ProxIndexID index_id, SeqNoPtr seqPtr);
00280     void handleRemoveServerLocSubscription(const ServerID& subscriber, const UUID& observed);
00281     void handleRemoveServerLocSubscriptionWithID(const ServerID& subscriber, const UUID& observed, ProxIndexID index_id);
00282     void handleRemoveAllServerLocSubscription(const ServerID& subscriber);
00283 
00284     // Takes care of switching objects between static/dynamic
00285     void checkObjectClass(bool is_local, const UUID& objid, const TimedMotionVector3f& newval);
00286 
00287 
00288     typedef std::tr1::unordered_map<UUID, ProxObjectStreamInfoPtr, UUID::Hasher> ObjectProxStreamMap;
00289     ObjectProxStreamMap mObjectProxStreams;
00290 
00291     typedef std::tr1::unordered_map<OHDP::NodeID, ProxObjectHostStreamInfoPtr, OHDP::NodeID::Hasher> ObjectHostProxStreamMap;
00292     ObjectHostProxStreamMap mObjectHostProxStreams;
00293 
00294 
00295 
00296 
00297 
00298 
00299     // PROX Thread - Should only be accessed in methods used by the
00300     // prox thread
00301 
00302     CBRLocationServiceCache* mLocCache;
00303 
00304     // Track objects that have become static and, after a delay, need to be
00305     // moved between trees. We track them by ID (to cancel due to movement or
00306     // disconnect) and time (to process them efficiently as their timeouts
00307     // expire).
00308     struct StaticObjectTimeout {
00309         StaticObjectTimeout(ObjectReference id, Time _expires, bool l)
00310          : objid(id),
00311            expires(_expires),
00312            local(l)
00313         {}
00314         ObjectReference objid;
00315         Time expires;
00316         bool local;
00317     };
00318     // Tags used by ObjectInfoSet
00319     struct objid_tag {};
00320     struct expires_tag {};
00321     typedef boost::multi_index_container<
00322         StaticObjectTimeout,
00323         boost::multi_index::indexed_by<
00324             boost::multi_index::ordered_unique< boost::multi_index::tag<objid_tag>, BOOST_MULTI_INDEX_MEMBER(StaticObjectTimeout,ObjectReference,objid) >,
00325             boost::multi_index::ordered_non_unique< boost::multi_index::tag<expires_tag>, BOOST_MULTI_INDEX_MEMBER(StaticObjectTimeout,Time,expires) >
00326             >
00327         > StaticObjectTimeouts;
00328     typedef StaticObjectTimeouts::index<objid_tag>::type StaticObjectsByID;
00329     typedef StaticObjectTimeouts::index<expires_tag>::type StaticObjectsByExpiration;
00330     StaticObjectTimeouts mStaticObjectTimeouts;
00331 
00332     // All queryHasEvents calls are going to not be reentrant unless you're very
00333     // careful, so the base class provides this so it's easy to verify it.
00334     InstanceMethodNotReentrant mQueryHasEventsNotRentrant;
00335 
00336     // Prox thread handlers for connection events. They perform some
00337     // basic maintenance (putting server into set that needs update,
00338     // removing from that set, etc) and then Implementations can
00339     // override these to perform additional operations, but they'll
00340     // get other events as a result even if they don't -- new servers
00341     // will appear in the set that need to be queried and
00342     // handleForcedServerDisconnection will be invoked.
00343     virtual void handleConnectedServer(ServerID sid);
00344     virtual void handleDisconnectedServer(ServerID sid);
00345 
00346     void removeStaticObjectTimeout(const ObjectReference& objid);
00347     virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static) = 0;
00348     void handleCheckObjectClass(bool is_local, const ObjectReference& objid, const TimedMotionVector3f& newval);
00349     void processExpiredStaticObjectTimeouts();
00350 
00351     // Query-Type-Agnostic AggregateListener Interface -- manages adding to Loc
00352     // and passing to AggregateManager, but you need to delegate to these
00353     // yourself since the AggregateListener interface depends on the type of
00354     // query/query handler being used.
00355     virtual void aggregateCreated(const ObjectReference& objid);
00356     virtual void aggregateChildAdded(const ObjectReference& objid, const ObjectReference& child, const Vector3f& pos, const AggregateBoundingInfo& bnds);
00357     virtual void aggregateChildRemoved(const ObjectReference& objid, const ObjectReference& child, const Vector3f& pos, const AggregateBoundingInfo& bnds);
00358     virtual void aggregateBoundsUpdated(const ObjectReference& objid, const Vector3f& pos, const AggregateBoundingInfo& bnds);
00359     virtual void aggregateDestroyed(const ObjectReference& objid);
00360     virtual void aggregateObserved(const ObjectReference& objid, uint32 nobservers, uint32 nchildren);
00361     // Helper for updating aggregates
00362     void updateAggregateLoc(const ObjectReference& objid, const Vector3f& pos, const AggregateBoundingInfo& bnds);
00363 
00364     // Command handlers
00365     virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0;
00366     virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0;
00367     virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0;
00368     virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0;
00369     virtual void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00370 
00371 }; // class LibproxProximityBase
00372 
00373 } // namespace Sirikata
00374 
00375 #endif //_SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_