Sirikata
|
00001 /* Sirikata 00002 * AlwaysLocationUpdatePolicy.hpp 00003 * 00004 * Copyright (c) 2009, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _ALWAYS_LOCATION_UPDATE_POLICY_HPP_ 00034 #define _ALWAYS_LOCATION_UPDATE_POLICY_HPP_ 00035 00036 #include <sirikata/space/LocationService.hpp> 00037 #include <sirikata/core/options/CommonOptions.hpp> 00038 00039 #include "Protocol_Loc.pbj.hpp" 00040 00041 #define ALWAYS_POLICY_OPTIONS "always_location_update_policy" 00042 #define LOC_MAX_PER_RESULT "loc.max-per-result" 00043 00044 namespace Sirikata { 00045 00046 void InitAlwaysLocationUpdatePolicyOptions(); 00047 00051 class AlwaysLocationUpdatePolicy : public LocationUpdatePolicy { 00052 public: 00053 AlwaysLocationUpdatePolicy(SpaceContext* ctx, const String& args); 00054 virtual ~AlwaysLocationUpdatePolicy(); 00055 00056 virtual void start(); 00057 virtual void stop(); 00058 00059 virtual void subscribe(ServerID remote, const UUID& uuid, SeqNoPtr seqno); 00060 virtual void subscribe(ServerID remote, const UUID& uuid, ProxIndexID index_id, SeqNoPtr seqno); 00061 virtual void unsubscribe(ServerID remote, const UUID& uuid); 00062 virtual void unsubscribe(ServerID remote, const UUID& uuid, ProxIndexID index_id); 00063 virtual void unsubscribe(ServerID remote); 00064 00065 virtual void subscribe(const OHDP::NodeID& remote, const UUID& uuid); 00066 virtual void subscribe(const OHDP::NodeID& remote, const UUID& uuid, ProxIndexID index_id); 00067 virtual void unsubscribe(const OHDP::NodeID& remote, const UUID& uuid); 00068 virtual void unsubscribe(const OHDP::NodeID& remote, const UUID& uuid, ProxIndexID index_id); 00069 virtual void unsubscribe(const OHDP::NodeID& remote); 00070 00071 virtual void subscribe(const UUID& remote, const UUID& uuid); 00072 virtual void subscribe(const UUID& remote, const UUID& uuid, ProxIndexID index_id); 00073 virtual void unsubscribe(const UUID& remote, const UUID& uuid); 00074 virtual void unsubscribe(const UUID& remote, const UUID& uuid, ProxIndexID index_id); 00075 virtual void unsubscribe(const UUID& remote); 00076 00077 virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics); 00078 virtual void localObjectRemoved(const UUID& uuid, bool agg); 00079 virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval); 00080 virtual void localOrientationUpdated(const UUID& uuid, bool agg, const TimedMotionQuaternion& newval); 00081 virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval); 00082 virtual void localMeshUpdated(const UUID& uuid, bool agg, const String& newval); 00083 virtual void localPhysicsUpdated(const UUID& uuid, bool agg, const String& newval); 00084 00085 virtual void replicaObjectAdded(const UUID& uuid, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics); 00086 virtual void replicaObjectRemoved(const UUID& uuid); 00087 virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval); 00088 virtual void replicaOrientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval); 00089 virtual void replicaBoundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval); 00090 virtual void replicaMeshUpdated(const UUID& uuid, const String& newval); 00091 virtual void replicaPhysicsUpdated(const UUID& uuid, const String& newval); 00092 00093 virtual void service(); 00094 00095 private: 00096 void reportStats(); 00097 00098 00099 struct UpdateInfo { 00100 uint64 epoch; 00101 TimedMotionVector3f location; 00102 TimedMotionQuaternion orientation; 00103 AggregateBoundingInfo bounds; 00104 String mesh; 00105 String physics; 00106 }; 00107 00108 typedef std::set<UUID> UUIDSet; 00109 typedef std::set<ProxIndexID> ProxIndexSet; 00110 typedef std::map<UUID, ProxIndexSet> ObjectIndexesMap; 00111 00112 struct SubscriberInfo { 00113 SubscriberInfo(SeqNoPtr seq_number_ptr ) 00114 : seqnoPtr(seq_number_ptr) 00115 {} 00116 SeqNoPtr seqnoPtr; 00117 // Indexes this subscriber is observing each object in. This acts both 00118 // as a set of objects that this subscriber is observing (the keys) and 00119 // the list of indexes each object is being observed in. 00120 // 00121 // This needs to persist permanently between subscribe/unsubscribe calls 00122 // so we can specify them with each update we create. We keep them 00123 // separately from outstandingUpdates so we can clear out that data as 00124 // we use it up but keep this around. 00125 // 00126 // TODO(ewencp) we might be able to figure out some way to keep this 00127 // only as aggregate info, e.g. that an object with UUID has n 00128 // subscribers in index i, and always report i as long as n > 0. But 00129 // then clients may get updates for indices they aren't replicating and 00130 // we need some way to deal with orphan updates vs. extra updates that 00131 // aren't real orphans because we used aggregate data. 00132 ObjectIndexesMap objectIndexes; 00133 // Information about each object that we need to create and send an 00134 // update about 00135 std::map<UUID, UpdateInfo> outstandingUpdates; 00136 00137 // Indicates that there are no subscriptions for this object left, 00138 // allowing us to clear out its entry 00139 bool noSubscriptionsLeft() const { 00140 return objectIndexes.empty(); 00141 } 00142 }; 00143 typedef std::tr1::shared_ptr<SubscriberInfo> SubscriberInfoPtr; 00144 00145 // Sometimes a subscriber may stall or hang, leaving the underlying 00146 // connection open but not handling loc update substreams. In this 00147 // case, we can end up generating a ton of update streams that fail 00148 // and eat up a bunch of our processor just looping for 00149 // retries. With objects moving, we could get arbitarily many 00150 // outstanding updates since once the substream request is started 00151 // it frees up the spot in the outstandingUpdates map above, 00152 // allowing more for the same object to be sent. To protect against 00153 // this, we track how many loc update messages are outstanding and 00154 // stall updates while we're waiting for them to return (or fail!). 00155 static long numOutstandingMessages(const SubscriberInfoPtr& sub_info) { 00156 return sub_info.use_count()-1; 00157 } 00158 00159 template<typename SubscriberType> 00160 struct SubscriberIndex { 00161 AlwaysLocationUpdatePolicy* parent; 00162 AtomicValue<uint32>& sent_count; 00163 typedef std::set<SubscriberType> SubscriberSet; 00164 typedef std::tr1::shared_ptr<SubscriberInfo> SubscriberInfoPtr; 00165 // Forward index: Subscriber -> Objects + Updates 00166 typedef std::map<SubscriberType, SubscriberInfoPtr> SubscriberMap; 00167 SubscriberMap mSubscriptions; 00168 // Reverse index: Objects -> Subscribers 00169 typedef std::map<UUID, SubscriberSet*> ObjectSubscribersMap; 00170 ObjectSubscribersMap mObjectSubscribers; 00171 00172 SubscriberIndex(AlwaysLocationUpdatePolicy* p, AtomicValue<uint32>& _sent_count) 00173 : parent(p), 00174 sent_count(_sent_count) 00175 { 00176 } 00177 00178 ~SubscriberIndex() { 00179 for(typename SubscriberMap::iterator sub_it = mSubscriptions.begin(); sub_it != mSubscriptions.end(); sub_it++) 00180 sub_it->second.reset(); 00181 mSubscriptions.clear(); 00182 00183 for(typename ObjectSubscribersMap::iterator sub_it = mObjectSubscribers.begin(); sub_it != mObjectSubscribers.end(); sub_it++) 00184 delete sub_it->second; 00185 mObjectSubscribers.clear(); 00186 } 00187 00188 void subscribe(const SubscriberType& remote, const UUID& uuid, SeqNoPtr seqnoPtr) { 00189 subscribe(remote, uuid, (ProxIndexID*)NULL, seqnoPtr); 00190 } 00191 void subscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID index_id, SeqNoPtr seqnoPtr) { 00192 subscribe(remote, uuid, &index_id, seqnoPtr); 00193 } 00194 00195 void subscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID* index_id, SeqNoPtr seqnoPtr) { 00196 // Make sure we have a record of this subscriber 00197 typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote); 00198 if (sub_it == mSubscriptions.end()) { 00199 SubscriberInfoPtr sub_info(new SubscriberInfo(seqnoPtr)); 00200 mSubscriptions.insert(typename SubscriberMap::value_type(remote,sub_info)); 00201 00202 sub_it = mSubscriptions.find(remote); 00203 } 00204 00205 // Add object to server's subscription list, tracking which index 00206 // it's in 00207 typename ObjectIndexesMap::iterator indexes_it = sub_it->second->objectIndexes.find(uuid); 00208 if (indexes_it == sub_it->second->objectIndexes.end()) { 00209 sub_it->second->objectIndexes[uuid] = ProxIndexSet(); 00210 } 00211 else { 00212 // If we already have an entry for this subscriber then either 00213 // subscribing w/o indices (must have empty list of indices) or 00214 // w/ indices (if we already have an entry, the set must be 00215 // non-empty). 00216 assert((index_id == NULL && sub_it->second->objectIndexes[uuid].empty()) || 00217 (index_id != NULL && !sub_it->second->objectIndexes[uuid].empty())); 00218 } 00219 // If we are using indices, add this to the list 00220 if (index_id != NULL) 00221 sub_it->second->objectIndexes[uuid].insert(*index_id); 00222 00223 // Add server to object's subscribers list 00224 typename ObjectSubscribersMap::iterator obj_sub_it = mObjectSubscribers.find(uuid); 00225 if (obj_sub_it == mObjectSubscribers.end()) { 00226 mObjectSubscribers[uuid] = new SubscriberSet(); 00227 obj_sub_it = mObjectSubscribers.find(uuid); 00228 } 00229 SubscriberSet* obj_subs = obj_sub_it->second; 00230 obj_subs->insert(remote); 00231 00232 // Force an update. This is necessary because the subscription comes 00233 // in asynchronously from Proximity, so its possible the data sent 00234 // with the origin subscription is out of date by the time this 00235 // subscription occurs. Forcing an extra update handles this case. 00236 propertyUpdatedForSubscriber(uuid, parent->mLocService, remote, NULL); 00237 } 00238 00239 void unsubscribe(const SubscriberType& remote, const UUID& uuid) { 00240 unsubscribe(remote, uuid, (ProxIndexID*)NULL); 00241 } 00242 void unsubscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID index_id) { 00243 unsubscribe(remote, uuid, &index_id); 00244 } 00245 void unsubscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID* index_id) { 00246 // Remove object from server's list 00247 typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote); 00248 if (sub_it != mSubscriptions.end()) { 00249 typename ObjectIndexesMap::iterator indexes_it = sub_it->second->objectIndexes.find(uuid); 00250 if (indexes_it != sub_it->second->objectIndexes.end()) { 00251 if (index_id != NULL) { 00252 // If we're using indexes, erase the index and 00253 // completely remove the object as being tracked if we 00254 // hit no indices marked as still tracking 00255 indexes_it->second.erase(*index_id); 00256 if (indexes_it->second.empty()) 00257 sub_it->second->objectIndexes.erase(indexes_it); 00258 } 00259 else { 00260 // Otherwise, we have one implicit index we're 00261 // tracking. This call is enough to remove it since we 00262 // can only have 1 subscription to it 00263 sub_it->second->objectIndexes.erase(indexes_it); 00264 } 00265 } 00266 } 00267 00268 // Remove server from object's list 00269 typename ObjectSubscribersMap::iterator obj_it = mObjectSubscribers.find(uuid); 00270 if (obj_it != mObjectSubscribers.end()) { 00271 SubscriberSet* subs = obj_it->second; 00272 subs->erase(remote); 00273 } 00274 } 00275 00276 void unsubscribe(const SubscriberType& remote) { 00277 typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote); 00278 if (sub_it == mSubscriptions.end()) 00279 return; 00280 00281 SubscriberInfoPtr subs = sub_it->second; 00282 // We just need to clear out this subscriber's objectIndexes 00283 // (marking no more object subscriptions, whether we were tracking 00284 // indices or not). We don't use individual unsubscription calls 00285 // because they require the correct type of call -- with or without 00286 // indices. Instead just do the second half of what they would do 00287 // manually -- remove references to the objects subscribed to from 00288 // mObjectSubscribers' SubscriberSets 00289 for(typename ObjectIndexesMap::iterator obj_ind_it = subs->objectIndexes.begin(); obj_ind_it != subs->objectIndexes.end(); obj_ind_it++) { 00290 UUID uuid = obj_ind_it->first; 00291 // Remove server from object's list 00292 typename ObjectSubscribersMap::iterator obj_it = mObjectSubscribers.find(uuid); 00293 if (obj_it != mObjectSubscribers.end()) { 00294 SubscriberSet* subs = obj_it->second; 00295 subs->erase(remote); 00296 } 00297 } 00298 // And then actually clear out the list of subscriptions 00299 subs->objectIndexes.clear(); 00300 00301 // Just drop any outstanding updates we have left. They 00302 // are useless if we're using tree replication since we 00303 // just destroyed the information about indices the 00304 // updates apply to. Even in basic queries, this doesn't 00305 // matter because the querier will just be left with stale 00306 // data, which they would have been anyway. 00307 mSubscriptions.erase(sub_it); 00308 } 00309 00310 typedef std::tr1::function<void(UpdateInfo&)> UpdateFunctor; 00311 // Generic version of an update - adds updates per-subscriber as 00312 // necessary and calls the UpdateFunctor to trigger the particular 00313 // update to values. 00314 void propertyUpdated(const UUID& uuid, LocationService* locservice, UpdateFunctor fup) { 00315 // Add the update to each subscribed object 00316 typename ObjectSubscribersMap::iterator obj_sub_it = mObjectSubscribers.find(uuid); 00317 if (obj_sub_it == mObjectSubscribers.end()) return; 00318 00319 SubscriberSet* object_subscribers = obj_sub_it->second; 00320 00321 for(typename SubscriberSet::iterator subscriber_it = object_subscribers->begin(); subscriber_it != object_subscribers->end(); subscriber_it++) { 00322 propertyUpdatedForSubscriber(uuid, locservice, *subscriber_it, fup); 00323 } 00324 } 00325 00326 // Update of location information for individual subscriber. Note that 00327 // this should only be used in special cases -- mainly to handle when a 00328 // new subscriber is added. Otherwise its just a utility for the normal 00329 // update method above. 00330 void propertyUpdatedForSubscriber(const UUID& uuid, LocationService* locservice, SubscriberType sub, UpdateFunctor fup) { 00331 if (mSubscriptions.find(sub) == mSubscriptions.end()) return; // XXX FIXME 00332 assert(mSubscriptions.find(sub) != mSubscriptions.end()); 00333 std::tr1::shared_ptr<SubscriberInfo> sub_info = mSubscriptions[sub]; 00334 if (sub_info->objectIndexes.find(uuid) == sub_info->objectIndexes.end()) return; // XXX FIXME 00335 assert(sub_info->objectIndexes.find(uuid) != sub_info->objectIndexes.end()); 00336 00337 if (sub_info->outstandingUpdates.find(uuid) == sub_info->outstandingUpdates.end()) { 00338 UpdateInfo new_ui; 00339 new_ui.epoch = locservice->epoch(uuid); 00340 new_ui.location = locservice->location(uuid); 00341 new_ui.bounds = locservice->bounds(uuid); 00342 new_ui.mesh = locservice->mesh(uuid); 00343 new_ui.orientation = locservice->orientation(uuid); 00344 new_ui.physics = locservice->physics(uuid); 00345 sub_info->outstandingUpdates[uuid] = new_ui; 00346 } 00347 else 00348 UpdateInfo& ui = sub_info->outstandingUpdates[uuid]; 00349 00350 00351 UpdateInfo& ui = sub_info->outstandingUpdates[uuid]; 00352 if (fup) 00353 fup(ui); 00354 } 00355 00356 static void setUILocation(UpdateInfo& ui, const TimedMotionVector3f& newval) {ui.location = newval; } 00357 static void setUIOrientation(UpdateInfo& ui, const TimedMotionQuaternion& newval) { ui.orientation = newval; } 00358 static void setUIBounds(UpdateInfo& ui, const AggregateBoundingInfo& newval) { ui.bounds = newval; } 00359 static void setUIMesh(UpdateInfo& ui, const String& newval) {ui.mesh = newval;} 00360 static void setUIPhysics(UpdateInfo& ui, const String& newval) {ui.physics = newval;} 00361 00362 void locationUpdated(const UUID& uuid, const TimedMotionVector3f& newval, LocationService* locservice) { 00363 propertyUpdated( 00364 uuid, locservice, 00365 std::tr1::bind(&setUILocation, std::tr1::placeholders::_1, newval) 00366 ); 00367 } 00368 00369 void orientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval, LocationService* locservice) { 00370 propertyUpdated( 00371 uuid, locservice, 00372 std::tr1::bind(&setUIOrientation, std::tr1::placeholders::_1, newval) 00373 ); 00374 } 00375 00376 void boundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval, LocationService* locservice) { 00377 propertyUpdated( 00378 uuid, locservice, 00379 std::tr1::bind(&setUIBounds, std::tr1::placeholders::_1, newval) 00380 ); 00381 } 00382 00383 void meshUpdated(const UUID& uuid, const String& newval, LocationService* locservice) { 00384 propertyUpdated( 00385 uuid, locservice, 00386 std::tr1::bind(&setUIMesh, std::tr1::placeholders::_1, newval) 00387 ); 00388 } 00389 00390 void physicsUpdated(const UUID& uuid, const String& newval, LocationService* locservice) { 00391 propertyUpdated( 00392 uuid, locservice, 00393 std::tr1::bind(&setUIPhysics, std::tr1::placeholders::_1, newval) 00394 ); 00395 } 00396 00397 00398 void service() { 00399 uint32 max_updates = GetOptionValue<uint32>(ALWAYS_POLICY_OPTIONS, LOC_MAX_PER_RESULT); 00400 const uint32 outstanding_message_hard_limit = 64; 00401 const uint32 outstanding_message_soft_limit = 25; 00402 00403 std::list<SubscriberType> to_delete; 00404 00405 for(typename SubscriberMap::iterator server_it = mSubscriptions.begin(); server_it != mSubscriptions.end(); server_it++) { 00406 SubscriberType sid = server_it->first; 00407 std::tr1::shared_ptr<SubscriberInfo> sub_info = server_it->second; 00408 00409 // We can end up with leftover updates after a subscriber has 00410 // already disconnected. We need to ignore them if we're not 00411 // even going to be able to send the messages. 00412 if (!parent->validSubscriber(sid)) { 00413 sub_info->outstandingUpdates.clear(); 00414 if (sub_info->noSubscriptionsLeft()) { 00415 sub_info.reset(); 00416 to_delete.push_back(sid); 00417 } 00418 continue; 00419 } 00420 00421 Sirikata::Protocol::Loc::BulkLocationUpdate bulk_update; 00422 00423 bool send_failed = false; 00424 std::map<UUID, UpdateInfo>::iterator last_shipped = sub_info->outstandingUpdates.begin(); 00425 for(std::map<UUID, UpdateInfo>::iterator up_it = sub_info->outstandingUpdates.begin(); 00426 numOutstandingMessages(sub_info) < outstanding_message_soft_limit && up_it != sub_info->outstandingUpdates.end(); 00427 up_it++) 00428 { 00429 Sirikata::Protocol::Loc::ILocationUpdate update = bulk_update.add_update(); 00430 update.set_object(up_it->first); 00431 00432 //write and update sequence number 00433 update.set_seqno( (*(sub_info->seqnoPtr)) ++ ); 00434 00435 if (parent->isSelfSubscriber(sid, up_it->first)) 00436 update.set_epoch(up_it->second.epoch); 00437 00438 // If we're tracking indexes (tree replication), add the 00439 // list in 00440 typename ObjectIndexesMap::iterator obj_ind_it = sub_info->objectIndexes.find(up_it->first); 00441 if (obj_ind_it != sub_info->objectIndexes.end()) { 00442 for (typename ProxIndexSet::iterator prox_idx_it = obj_ind_it->second.begin(); prox_idx_it != obj_ind_it->second.end(); prox_idx_it++) 00443 update.add_index_id((uint32)*prox_idx_it); 00444 } 00445 00446 Sirikata::Protocol::ITimedMotionVector location = update.mutable_location(); 00447 location.set_t(up_it->second.location.updateTime()); 00448 location.set_position(up_it->second.location.position()); 00449 00450 location.set_velocity(up_it->second.location.velocity()); 00451 00452 Sirikata::Protocol::ITimedMotionQuaternion orientation = update.mutable_orientation(); 00453 orientation.set_t(up_it->second.orientation.updateTime()); 00454 orientation.set_position(up_it->second.orientation.position()); 00455 orientation.set_velocity(up_it->second.orientation.velocity()); 00456 00457 Sirikata::Protocol::IAggregateBoundingInfo msg_bounds = update.mutable_aggregate_bounds(); 00458 msg_bounds.set_center_offset(up_it->second.bounds.centerOffset); 00459 msg_bounds.set_center_bounds_radius(up_it->second.bounds.centerBoundsRadius); 00460 msg_bounds.set_max_object_size(up_it->second.bounds.maxObjectRadius); 00461 00462 update.set_mesh(up_it->second.mesh); 00463 update.set_physics(up_it->second.physics); 00464 00465 // If we hit the limit for this update, try to send it out 00466 if (bulk_update.update_size() > (int32)max_updates) { 00467 bool sent = parent->trySend(sid, bulk_update, sub_info); 00468 if (!sent) { 00469 send_failed = true; 00470 break; 00471 } 00472 else { 00473 bulk_update = Sirikata::Protocol::Loc::BulkLocationUpdate(); // clear it out 00474 last_shipped = up_it; 00475 sent_count++; 00476 } 00477 } 00478 } 00479 00480 // Try to send the last few if necessary/possible 00481 if (numOutstandingMessages(sub_info) < outstanding_message_hard_limit && !send_failed && bulk_update.update_size() > 0) { 00482 bool sent = parent->trySend(sid, bulk_update, sub_info); 00483 if (sent) { 00484 last_shipped = sub_info->outstandingUpdates.end(); 00485 sent_count++; 00486 } 00487 } 00488 00489 // Finally clear out any entries successfully sent out 00490 sub_info->outstandingUpdates.erase( sub_info->outstandingUpdates.begin(), last_shipped); 00491 00492 if (sub_info->noSubscriptionsLeft() && sub_info->outstandingUpdates.empty()) { 00493 sub_info.reset(); 00494 to_delete.push_back(sid); 00495 } 00496 } 00497 00498 for(typename std::list<SubscriberType>::iterator it = to_delete.begin(); it != to_delete.end(); it++) 00499 mSubscriptions.erase(*it); 00500 } 00501 00502 }; 00503 00504 void tryCreateChildStream(const UUID& dest, ODPSST::Stream::Ptr parent_stream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount); 00505 void objectLocSubstreamCallback(int x, ODPSST::Stream::Ptr substream, const UUID& dest, ODPSST::Stream::Ptr parent_substream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount); 00506 void tryCreateChildStream(const OHDP::NodeID& dest, OHDPSST::Stream::Ptr parent_stream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount); 00507 void ohLocSubstreamCallback(int x, OHDPSST::Stream::Ptr substream, const OHDP::NodeID& dest, OHDPSST::Stream::Ptr parent_substream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount); 00508 00509 bool validSubscriber(const UUID& dest); 00510 bool validSubscriber(const OHDP::NodeID& dest); 00511 bool validSubscriber(const ServerID& dest); 00512 00513 bool isSelfSubscriber(const UUID& sid, const UUID& observed); 00514 bool isSelfSubscriber(const OHDP::NodeID& sid, const UUID& observed); 00515 bool isSelfSubscriber(const ServerID& sid, const UUID& observed); 00516 00517 bool trySend(const UUID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount); 00518 bool trySend(const OHDP::NodeID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount); 00519 bool trySend(const ServerID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount); 00520 Poller mStatsPoller; 00521 Time mLastStatsTime; 00522 const String mTimeSeriesServerUpdatesName; 00523 AtomicValue<uint32> mServerUpdatesPerSecond; 00524 const String mTimeSeriesOHUpdatesName; 00525 AtomicValue<uint32> mOHUpdatesPerSecond; 00526 const String mTimeSeriesObjectUpdatesName; 00527 AtomicValue<uint32> mObjectUpdatesPerSecond; 00528 00529 typedef SubscriberIndex<ServerID> ServerSubscriberIndex; 00530 ServerSubscriberIndex mServerSubscriptions; 00531 00532 typedef SubscriberIndex<OHDP::NodeID> OHSubscriberIndex; 00533 OHSubscriberIndex mOHSubscriptions; 00534 00535 typedef SubscriberIndex<UUID> ObjectSubscriberIndex; 00536 ObjectSubscriberIndex mObjectSubscriptions; 00537 }; // class AlwaysLocationUpdatePolicy 00538 00539 } // namespace Sirikata 00540 00541 #endif //_ALWAYS_LOCATION_UPDATE_POLICY_HPP_