Sirikata
|
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_ 00006 #define _SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_ 00007 00008 #include <sirikata/space/Proximity.hpp> 00009 #include "CBRLocationServiceCache.hpp" 00010 #include <prox/base/QueryEvent.hpp> 00011 #include <sirikata/space/PintoServerQuerier.hpp> 00012 00013 #include <boost/multi_index_container.hpp> 00014 #include <boost/multi_index/member.hpp> 00015 #include <boost/multi_index/ordered_index.hpp> 00016 00017 #include <sirikata/core/util/InstanceMethodNotReentrant.hpp> 00018 00019 namespace Sirikata { 00020 00024 class LibproxProximityBase : public Proximity, PintoServerQuerierListener { 00025 public: 00026 LibproxProximityBase(SpaceContext* ctx, LocationService* locservice, CoordinateSegmentation* cseg, SpaceNetwork* net, AggregateManager* aggmgr); 00027 ~LibproxProximityBase(); 00028 00029 // Service Interface overrides 00030 virtual void start(); 00031 virtual void stop(); 00032 00033 // LocationServiceListener Interface - used here only to track 00034 // object sizes for top-level pinto 00035 virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics, const String& zernike); 00036 virtual void localObjectRemoved(const UUID& uuid, bool agg); 00037 virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval); 00038 00039 protected: 00040 typedef Prox::QueryEvent<ObjectProxSimulationTraits> QueryEvent; 00041 typedef std::deque<QueryEvent> QueryEventList; 00042 00043 // Helper types & methods 00044 enum ObjectClass { 00045 OBJECT_CLASS_STATIC = 0, 00046 OBJECT_CLASS_DYNAMIC = 1, 00047 NUM_OBJECT_CLASSES = 2 00048 }; 00049 static const std::string& ObjectClassToString(ObjectClass c); 00050 static BoundingBox3f aggregateBBoxes(const BoundingBoxList& bboxes); 00051 static bool velocityIsStatic(const Vector3f& vel); 00052 00053 // Coalesces events, turning them effectively into one giant event (although 00054 // split across enough events that no event is too large). This gets rid of 00055 // any intermediate additions/removals that occurred as the cut was 00056 // refined/unrefined. The old results are destroyed and the new ones are 00057 // placed back in the query event queue. 00058 // 00059 // per_event indicates how many additions/removals to put in each 00060 // event. Since they are no longer forced to be together to be atomic, we 00061 // can pack them however we like. 00062 void coalesceEvents(QueryEventList& evts, uint32 per_event); 00063 00064 // BOTH Threads: These are read-only or lock protected. 00065 00066 struct Stats { 00067 Stats() 00068 : objectSentBytes(0), 00069 objectSentMessages(0), 00070 objectReceivedBytes(0), 00071 objectReceivedMessages(0), 00072 objectHostSentBytes(0), 00073 objectHostSentMessages(0), 00074 objectHostReceivedBytes(0), 00075 objectHostReceivedMessages(0), 00076 spaceSentBytes(0), 00077 spaceSentMessages(0), 00078 spaceReceivedBytes(0), 00079 spaceReceivedMessages(0) 00080 {} 00081 00082 // Total number of bytes sent to objects 00083 AtomicValue<uint32> objectSentBytes; 00084 // Total messages sent to objects (calls to sendObjectResult) 00085 AtomicValue<uint32> objectSentMessages; 00086 // Total number of bytes received from objects 00087 AtomicValue<uint32> objectReceivedBytes; 00088 // Total messages received to objects 00089 AtomicValue<uint32> objectReceivedMessages; 00090 00091 // Total number of bytes sent to object hosts 00092 AtomicValue<uint32> objectHostSentBytes; 00093 // Total messages sent to objects hosts (calls to sendObjectHostResult) 00094 AtomicValue<uint32> objectHostSentMessages; 00095 // Total number of bytes received to object hosts 00096 AtomicValue<uint32> objectHostReceivedBytes; 00097 // Total messages received from object hosts 00098 AtomicValue<uint32> objectHostReceivedMessages; 00099 00100 // Total number of bytes sent to other space servers 00101 AtomicValue<uint32> spaceSentBytes; 00102 // Total messages sent to other space servers 00103 AtomicValue<uint32> spaceSentMessages; 00104 // Total number of bytes received from other space servers 00105 AtomicValue<uint32> spaceReceivedBytes; 00106 // Total messages received from other space servers 00107 AtomicValue<uint32> spaceReceivedMessages; 00108 }; 00109 Stats mStats; 00110 00111 // To support a static/dynamic split but also support mixing them for 00112 // comparison purposes track which we are doing and, for most places, use a 00113 // simple index to control whether they point to different query handlers or 00114 // the same one. 00115 bool mSeparateDynamicObjects; 00116 int mNumQueryHandlers; 00117 // When using separate trees, how long to wait after an object becomes 00118 // static to move it into the static tree. This keeps us from moving things 00119 // in and out of trees frequently because of short stops (e.g. and avatar 00120 // stops for a few seconds while walking). 00121 Duration mMoveToStaticDelay; 00122 00123 00124 // Top level Pinto + server interactions. The base class takes 00125 // care of querying top level Pinto and tracking which servers 00126 // have connections + need queries, but the implementations need 00127 // to take it from there (removing items from 00128 // mNeedServerQueryUpdate and processing them). Note that unlike the data 00129 // below, this is only accessed on the main thread. 00130 PintoServerQuerier* mServerQuerier; 00131 00132 00133 00134 // MAIN Thread: Utility methods that should only be called from the main 00135 // strand 00136 00137 // PintoServerQuerierListener Interface 00138 virtual void onPintoServerResult(const Sirikata::Protocol::Prox::ProximityUpdate& update) = 0; 00139 virtual void onPintoServerLocUpdate(const LocUpdate& update) = 0; 00140 00141 // SpaceNetworkConnectionListener Interface 00142 virtual void onSpaceNetworkConnected(ServerID sid); 00143 virtual void onSpaceNetworkDisconnected(ServerID sid); 00144 00145 // CoordinateSegmentation::Listener Interface 00146 virtual void updatedSegmentation(CoordinateSegmentation* cseg, const std::vector<SegmentationInfo>& new_seg); 00147 00148 // Object sizes -- shouldn't be called by subclasses, these are 00149 // used to update the server querier. Just make sure you call this 00150 // class's implementation of LocationServiceListener methods. 00151 void updateObjectSize(const UUID& obj, float rad); 00152 void removeObjectSize(const UUID& obj); 00153 00154 private: // Shouldn't be needed by subclasses, only used for server querier 00155 // Track object sizes and the maximum of all of them. 00156 typedef std::tr1::unordered_map<UUID, float32, UUID::Hasher> ObjectSizeMap; 00157 ObjectSizeMap mObjectSizes; 00158 protected: 00159 float32 mMaxObject; // Only exposed for reporting to commands 00160 00161 // Server-to-server messages 00162 private: 00163 // Now private to ensure we get correct accounting, send via the helper 00164 // method below 00165 Router<Message*>* mProxServerMessageService; 00166 protected: 00167 // Helpers to track traffic stats 00168 bool sendServerMessage(Message* msg); 00169 void serverMessageReceived(Message* msg); 00170 00171 // Server-to-Object, Server-to-ObjectHost streams 00172 00173 // ProxStreamInfo manages *most* of the state for sending data to 00174 // a client. This data is managed by the main thread, where 00175 // messaging is performed. See SeqNoInfo for how sequence numbers 00176 // are stored -- they need to be accessed in the Prox thread so 00177 // they are managed separately. 00178 template<typename EndpointType, typename StreamType> 00179 struct ProxStreamInfo { 00180 public: 00181 typedef std::tr1::shared_ptr<StreamType> StreamTypePtr; 00182 typedef std::tr1::shared_ptr<ProxStreamInfo> Ptr; 00183 typedef std::tr1::weak_ptr<ProxStreamInfo> WPtr; 00184 00185 // Start a fresh ProxStreamInfo, which will require requesting 00186 // a new substream 00187 ProxStreamInfo() 00188 : iostream_requested(false), writing(false) {} 00189 // Start a ProxStreamInfo on an existing stream. 00190 ProxStreamInfo(StreamTypePtr strm) 00191 : iostream(strm), iostream_requested(true), writing(false) {} 00192 00193 void disable() { 00194 if (iostream) 00195 iostream->close(false); 00196 } 00197 00198 // Setup reading of frames from the stream. ProxStreamInfo 00199 // takes care of queueing up messages until complete frames 00200 // are available, giving just a callback per message. 00201 typedef std::tr1::function<void(String&)> FrameReceivedCallback; 00202 void readFramesFromStream(Ptr prox_stream, FrameReceivedCallback cb); 00203 00204 // The actual stream we send on 00205 StreamTypePtr iostream; 00206 // Whether we've requested the iostream 00207 bool iostream_requested; 00208 00209 // Outstanding data to be sent. FIXME efficiency 00210 std::queue<std::string> outstanding; 00211 // If writing is currently in progress 00212 bool writing; 00213 // Stored callback for writing 00214 std::tr1::function<void()> writecb; 00215 00216 // Stored callback for reading frames 00217 FrameReceivedCallback read_frame_cb; 00218 // Backlog of data, i.e. incomplete frame 00219 String partial_frame; 00220 00221 // Defined safely in cpp since these are only used from 00222 // LibproxProximityBase 00223 00224 // Handle reads from the underlying stream, decoding frames 00225 // and invoking the read callback 00226 static void handleRead(WPtr w_prox_stream, uint8* data, int size); 00227 00228 // The driver for getting data to the OH, initially triggered by sendObjectResults 00229 static void writeSomeObjectResults(Context* ctx, WPtr prox_stream); 00230 // Helper for setting up the initial proximity stream. Retries automatically 00231 // until successful. 00232 static void requestProxSubstream(LibproxProximityBase* parent, Context* ctx, const EndpointType& oref, Ptr prox_stream); 00233 // Helper that handles callbacks about prox stream setup 00234 static void proxSubstreamCallback(LibproxProximityBase* parent, Context* ctx, int x, const EndpointType& oref, StreamTypePtr parent_stream, StreamTypePtr substream, Ptr prox_stream_info); 00235 }; 00236 00237 typedef ODPSST::Stream::Ptr ProxObjectStreamPtr; 00238 typedef ProxStreamInfo<ObjectReference, ODPSST::Stream> ProxObjectStreamInfo; 00239 typedef std::tr1::shared_ptr<ProxObjectStreamInfo> ProxObjectStreamInfoPtr; 00240 typedef OHDPSST::Stream::Ptr ProxObjectHostStreamPtr; 00241 typedef ProxStreamInfo<OHDP::NodeID, OHDPSST::Stream> ProxObjectHostStreamInfo; 00242 typedef std::tr1::shared_ptr<ProxObjectHostStreamInfo> ProxObjectHostStreamInfoPtr; 00243 00244 // Utility for implementations. Start listening on the stream and 00245 // read each Network::Frame, emitting a callback for each. 00246 void readFramesFromObjectStream(const ObjectReference& oref, ProxObjectStreamInfo::FrameReceivedCallback cb); 00247 void readFramesFromObjectHostStream(const OHDP::NodeID& node, ProxObjectHostStreamInfo::FrameReceivedCallback cb); 00248 // Wrappers for FrameReceivedCallbacks that track stats 00249 void readObjectStreamFrame(String& payload, ProxObjectStreamInfo::FrameReceivedCallback cb); 00250 void readObjectHostStreamFrame(String& payload, ProxObjectStreamInfo::FrameReceivedCallback cb); 00251 00252 // Utility for poll. Queues a message for delivery, encoding it and putting 00253 // it on the send stream. If necessary, starts send processing on the stream. 00254 void sendObjectResult(Sirikata::Protocol::Object::ObjectMessage*); 00255 void sendObjectHostResult(const OHDP::NodeID& node, Sirikata::Protocol::Object::ObjectMessage*); 00256 00257 // Helpers that are protocol-specific 00258 bool validSession(const ObjectReference& oref) const; 00259 bool validSession(const OHDP::NodeID& node) const; 00260 ProxObjectStreamPtr getBaseStream(const ObjectReference& oref) const; 00261 ProxObjectHostStreamPtr getBaseStream(const OHDP::NodeID& node) const; 00262 // Use these to setup ProxStreamInfo's when the client initiates 00263 // the stream that will be used to communicate with it. 00264 void addObjectProxStreamInfo(ODPSST::Stream::Ptr); 00265 void addObjectHostProxStreamInfo(OHDPSST::Stream::Ptr); 00266 00267 // Handle various events in the main thread that are triggered in the prox thread 00268 void handleAddObjectLocSubscription(const UUID& subscriber, const UUID& observed); 00269 void handleAddObjectLocSubscriptionWithID(const UUID& subscriber, const UUID& observed, ProxIndexID index_id); 00270 void handleRemoveObjectLocSubscription(const UUID& subscriber, const UUID& observed); 00271 void handleRemoveObjectLocSubscriptionWithID(const UUID& subscriber, const UUID& observed, ProxIndexID index_id); 00272 void handleRemoveAllObjectLocSubscription(const UUID& subscriber); 00273 void handleAddOHLocSubscription(const OHDP::NodeID& subscriber, const UUID& observed); 00274 void handleAddOHLocSubscriptionWithID(const OHDP::NodeID& subscriber, const UUID& observed, ProxIndexID index_id); 00275 void handleRemoveOHLocSubscription(const OHDP::NodeID& subscriber, const UUID& observed); 00276 void handleRemoveOHLocSubscriptionWithID(const OHDP::NodeID& subscriber, const UUID& observed, ProxIndexID index_id); 00277 void handleRemoveAllOHLocSubscription(const OHDP::NodeID& subscriber); 00278 void handleAddServerLocSubscription(const ServerID& subscriber, const UUID& observed, SeqNoPtr seqPtr); 00279 void handleAddServerLocSubscriptionWithID(const ServerID& subscriber, const UUID& observed, ProxIndexID index_id, SeqNoPtr seqPtr); 00280 void handleRemoveServerLocSubscription(const ServerID& subscriber, const UUID& observed); 00281 void handleRemoveServerLocSubscriptionWithID(const ServerID& subscriber, const UUID& observed, ProxIndexID index_id); 00282 void handleRemoveAllServerLocSubscription(const ServerID& subscriber); 00283 00284 // Takes care of switching objects between static/dynamic 00285 void checkObjectClass(bool is_local, const UUID& objid, const TimedMotionVector3f& newval); 00286 00287 00288 typedef std::tr1::unordered_map<UUID, ProxObjectStreamInfoPtr, UUID::Hasher> ObjectProxStreamMap; 00289 ObjectProxStreamMap mObjectProxStreams; 00290 00291 typedef std::tr1::unordered_map<OHDP::NodeID, ProxObjectHostStreamInfoPtr, OHDP::NodeID::Hasher> ObjectHostProxStreamMap; 00292 ObjectHostProxStreamMap mObjectHostProxStreams; 00293 00294 00295 00296 00297 00298 00299 // PROX Thread - Should only be accessed in methods used by the 00300 // prox thread 00301 00302 CBRLocationServiceCache* mLocCache; 00303 00304 // Track objects that have become static and, after a delay, need to be 00305 // moved between trees. We track them by ID (to cancel due to movement or 00306 // disconnect) and time (to process them efficiently as their timeouts 00307 // expire). 00308 struct StaticObjectTimeout { 00309 StaticObjectTimeout(ObjectReference id, Time _expires, bool l) 00310 : objid(id), 00311 expires(_expires), 00312 local(l) 00313 {} 00314 ObjectReference objid; 00315 Time expires; 00316 bool local; 00317 }; 00318 // Tags used by ObjectInfoSet 00319 struct objid_tag {}; 00320 struct expires_tag {}; 00321 typedef boost::multi_index_container< 00322 StaticObjectTimeout, 00323 boost::multi_index::indexed_by< 00324 boost::multi_index::ordered_unique< boost::multi_index::tag<objid_tag>, BOOST_MULTI_INDEX_MEMBER(StaticObjectTimeout,ObjectReference,objid) >, 00325 boost::multi_index::ordered_non_unique< boost::multi_index::tag<expires_tag>, BOOST_MULTI_INDEX_MEMBER(StaticObjectTimeout,Time,expires) > 00326 > 00327 > StaticObjectTimeouts; 00328 typedef StaticObjectTimeouts::index<objid_tag>::type StaticObjectsByID; 00329 typedef StaticObjectTimeouts::index<expires_tag>::type StaticObjectsByExpiration; 00330 StaticObjectTimeouts mStaticObjectTimeouts; 00331 00332 // All queryHasEvents calls are going to not be reentrant unless you're very 00333 // careful, so the base class provides this so it's easy to verify it. 00334 InstanceMethodNotReentrant mQueryHasEventsNotRentrant; 00335 00336 // Prox thread handlers for connection events. They perform some 00337 // basic maintenance (putting server into set that needs update, 00338 // removing from that set, etc) and then Implementations can 00339 // override these to perform additional operations, but they'll 00340 // get other events as a result even if they don't -- new servers 00341 // will appear in the set that need to be queried and 00342 // handleForcedServerDisconnection will be invoked. 00343 virtual void handleConnectedServer(ServerID sid); 00344 virtual void handleDisconnectedServer(ServerID sid); 00345 00346 void removeStaticObjectTimeout(const ObjectReference& objid); 00347 virtual void trySwapHandlers(bool is_local, const ObjectReference& objid, bool is_static) = 0; 00348 void handleCheckObjectClass(bool is_local, const ObjectReference& objid, const TimedMotionVector3f& newval); 00349 void processExpiredStaticObjectTimeouts(); 00350 00351 // Query-Type-Agnostic AggregateListener Interface -- manages adding to Loc 00352 // and passing to AggregateManager, but you need to delegate to these 00353 // yourself since the AggregateListener interface depends on the type of 00354 // query/query handler being used. 00355 virtual void aggregateCreated(const ObjectReference& objid); 00356 virtual void aggregateChildAdded(const ObjectReference& objid, const ObjectReference& child, const Vector3f& pos, const AggregateBoundingInfo& bnds); 00357 virtual void aggregateChildRemoved(const ObjectReference& objid, const ObjectReference& child, const Vector3f& pos, const AggregateBoundingInfo& bnds); 00358 virtual void aggregateBoundsUpdated(const ObjectReference& objid, const Vector3f& pos, const AggregateBoundingInfo& bnds); 00359 virtual void aggregateDestroyed(const ObjectReference& objid); 00360 virtual void aggregateObserved(const ObjectReference& objid, uint32 nobservers, uint32 nchildren); 00361 // Helper for updating aggregates 00362 void updateAggregateLoc(const ObjectReference& objid, const Vector3f& pos, const AggregateBoundingInfo& bnds); 00363 00364 // Command handlers 00365 virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0; 00366 virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0; 00367 virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0; 00368 virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) = 0; 00369 virtual void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00370 00371 }; // class LibproxProximityBase 00372 00373 } // namespace Sirikata 00374 00375 #endif //_SIRIKATA_LIBPROX_PROXIMITY_BASE_HPP_