Sirikata
libspace/plugins/standard/AlwaysLocationUpdatePolicy.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  AlwaysLocationUpdatePolicy.hpp
00003  *
00004  *  Copyright (c) 2009, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _ALWAYS_LOCATION_UPDATE_POLICY_HPP_
00034 #define _ALWAYS_LOCATION_UPDATE_POLICY_HPP_
00035 
00036 #include <sirikata/space/LocationService.hpp>
00037 #include <sirikata/core/options/CommonOptions.hpp>
00038 
00039 #include "Protocol_Loc.pbj.hpp"
00040 
00041 #define ALWAYS_POLICY_OPTIONS      "always_location_update_policy"
00042 #define LOC_MAX_PER_RESULT         "loc.max-per-result"
00043 
00044 namespace Sirikata {
00045 
00046 void InitAlwaysLocationUpdatePolicyOptions();
00047 
00051 class AlwaysLocationUpdatePolicy : public LocationUpdatePolicy {
00052 public:
00053     AlwaysLocationUpdatePolicy(SpaceContext* ctx, const String& args);
00054     virtual ~AlwaysLocationUpdatePolicy();
00055 
00056     virtual void start();
00057     virtual void stop();
00058 
00059     virtual void subscribe(ServerID remote, const UUID& uuid, SeqNoPtr seqno);
00060     virtual void subscribe(ServerID remote, const UUID& uuid, ProxIndexID index_id, SeqNoPtr seqno);
00061     virtual void unsubscribe(ServerID remote, const UUID& uuid);
00062     virtual void unsubscribe(ServerID remote, const UUID& uuid, ProxIndexID index_id);
00063     virtual void unsubscribe(ServerID remote);
00064 
00065     virtual void subscribe(const OHDP::NodeID& remote, const UUID& uuid);
00066     virtual void subscribe(const OHDP::NodeID& remote, const UUID& uuid, ProxIndexID index_id);
00067     virtual void unsubscribe(const OHDP::NodeID& remote, const UUID& uuid);
00068     virtual void unsubscribe(const OHDP::NodeID& remote, const UUID& uuid, ProxIndexID index_id);
00069     virtual void unsubscribe(const OHDP::NodeID& remote);
00070 
00071     virtual void subscribe(const UUID& remote, const UUID& uuid);
00072     virtual void subscribe(const UUID& remote, const UUID& uuid, ProxIndexID index_id);
00073     virtual void unsubscribe(const UUID& remote, const UUID& uuid);
00074     virtual void unsubscribe(const UUID& remote, const UUID& uuid, ProxIndexID index_id);
00075     virtual void unsubscribe(const UUID& remote);
00076 
00077     virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics);
00078     virtual void localObjectRemoved(const UUID& uuid, bool agg);
00079     virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval);
00080     virtual void localOrientationUpdated(const UUID& uuid, bool agg, const TimedMotionQuaternion& newval);
00081     virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval);
00082     virtual void localMeshUpdated(const UUID& uuid, bool agg, const String& newval);
00083     virtual void localPhysicsUpdated(const UUID& uuid, bool agg, const String& newval);
00084 
00085     virtual void replicaObjectAdded(const UUID& uuid, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, const AggregateBoundingInfo& bounds, const String& mesh, const String& physics);
00086     virtual void replicaObjectRemoved(const UUID& uuid);
00087     virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval);
00088     virtual void replicaOrientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval);
00089     virtual void replicaBoundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval);
00090     virtual void replicaMeshUpdated(const UUID& uuid, const String& newval);
00091     virtual void replicaPhysicsUpdated(const UUID& uuid, const String& newval);
00092 
00093     virtual void service();
00094 
00095 private:
00096     void reportStats();
00097 
00098 
00099     struct UpdateInfo {
00100         uint64 epoch;
00101         TimedMotionVector3f location;
00102         TimedMotionQuaternion orientation;
00103         AggregateBoundingInfo bounds;
00104         String mesh;
00105         String physics;
00106     };
00107 
00108     typedef std::set<UUID> UUIDSet;
00109     typedef std::set<ProxIndexID> ProxIndexSet;
00110     typedef std::map<UUID, ProxIndexSet> ObjectIndexesMap;
00111 
00112     struct SubscriberInfo {
00113         SubscriberInfo(SeqNoPtr seq_number_ptr )
00114             : seqnoPtr(seq_number_ptr)
00115         {}
00116         SeqNoPtr seqnoPtr;
00117         // Indexes this subscriber is observing each object in. This acts both
00118         // as a set of objects that this subscriber is observing (the keys) and
00119         // the list of indexes each object is being observed in.
00120         //
00121         // This needs to persist permanently between subscribe/unsubscribe calls
00122         // so we can specify them with each update we create. We keep them
00123         // separately from outstandingUpdates so we can clear out that data as
00124         // we use it up but keep this around.
00125         //
00126         // TODO(ewencp) we might be able to figure out some way to keep this
00127         // only as aggregate info, e.g. that an object with UUID has n
00128         // subscribers in index i, and always report i as long as n > 0. But
00129         // then clients may get updates for indices they aren't replicating and
00130         // we need some way to deal with orphan updates vs. extra updates that
00131         // aren't real orphans because we used aggregate data.
00132         ObjectIndexesMap objectIndexes;
00133         // Information about each object that we need to create and send an
00134         // update about
00135         std::map<UUID, UpdateInfo> outstandingUpdates;
00136 
00137         // Indicates that there are no subscriptions for this object left,
00138         // allowing us to clear out its entry
00139         bool noSubscriptionsLeft() const {
00140             return objectIndexes.empty();
00141         }
00142     };
00143     typedef std::tr1::shared_ptr<SubscriberInfo> SubscriberInfoPtr;
00144 
00145     // Sometimes a subscriber may stall or hang, leaving the underlying
00146     // connection open but not handling loc update substreams. In this
00147     // case, we can end up generating a ton of update streams that fail
00148     // and eat up a bunch of our processor just looping for
00149     // retries. With objects moving, we could get arbitarily many
00150     // outstanding updates since once the substream request is started
00151     // it frees up the spot in the outstandingUpdates map above,
00152     // allowing more for the same object to be sent. To protect against
00153     // this, we track how many loc update messages are outstanding and
00154     // stall updates while we're waiting for them to return (or fail!).
00155     static long numOutstandingMessages(const SubscriberInfoPtr& sub_info) {
00156         return sub_info.use_count()-1;
00157     }
00158 
00159     template<typename SubscriberType>
00160     struct SubscriberIndex {
00161         AlwaysLocationUpdatePolicy* parent;
00162         AtomicValue<uint32>& sent_count;
00163         typedef std::set<SubscriberType> SubscriberSet;
00164         typedef std::tr1::shared_ptr<SubscriberInfo> SubscriberInfoPtr;
00165         // Forward index: Subscriber -> Objects + Updates
00166         typedef std::map<SubscriberType, SubscriberInfoPtr> SubscriberMap;
00167         SubscriberMap mSubscriptions;
00168         // Reverse index: Objects -> Subscribers
00169         typedef std::map<UUID, SubscriberSet*> ObjectSubscribersMap;
00170         ObjectSubscribersMap mObjectSubscribers;
00171 
00172         SubscriberIndex(AlwaysLocationUpdatePolicy* p, AtomicValue<uint32>& _sent_count)
00173          : parent(p),
00174            sent_count(_sent_count)
00175         {
00176         }
00177 
00178         ~SubscriberIndex() {
00179             for(typename SubscriberMap::iterator sub_it = mSubscriptions.begin(); sub_it != mSubscriptions.end(); sub_it++)
00180                 sub_it->second.reset();
00181             mSubscriptions.clear();
00182 
00183             for(typename ObjectSubscribersMap::iterator sub_it = mObjectSubscribers.begin(); sub_it != mObjectSubscribers.end(); sub_it++)
00184                 delete sub_it->second;
00185             mObjectSubscribers.clear();
00186         }
00187 
00188         void subscribe(const SubscriberType& remote, const UUID& uuid, SeqNoPtr seqnoPtr) {
00189             subscribe(remote, uuid, (ProxIndexID*)NULL, seqnoPtr);
00190         }
00191         void subscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID index_id, SeqNoPtr seqnoPtr) {
00192             subscribe(remote, uuid, &index_id, seqnoPtr);
00193         }
00194 
00195         void subscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID* index_id, SeqNoPtr seqnoPtr) {
00196             // Make sure we have a record of this subscriber
00197             typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote);
00198             if (sub_it == mSubscriptions.end()) {
00199                 SubscriberInfoPtr sub_info(new SubscriberInfo(seqnoPtr));
00200                 mSubscriptions.insert(typename SubscriberMap::value_type(remote,sub_info));
00201 
00202                 sub_it = mSubscriptions.find(remote);
00203             }
00204 
00205             // Add object to server's subscription list, tracking which index
00206             // it's in
00207             typename ObjectIndexesMap::iterator indexes_it = sub_it->second->objectIndexes.find(uuid);
00208             if (indexes_it == sub_it->second->objectIndexes.end()) {
00209                 sub_it->second->objectIndexes[uuid] = ProxIndexSet();
00210             }
00211             else {
00212                 // If we already have an entry for this subscriber then either
00213                 // subscribing w/o indices (must have empty list of indices) or
00214                 // w/ indices (if we already have an entry, the set must be
00215                 // non-empty).
00216                 assert((index_id == NULL && sub_it->second->objectIndexes[uuid].empty()) ||
00217                     (index_id != NULL && !sub_it->second->objectIndexes[uuid].empty()));
00218             }
00219             // If we are using indices, add this to the list
00220             if (index_id != NULL)
00221                 sub_it->second->objectIndexes[uuid].insert(*index_id);
00222 
00223             // Add server to object's subscribers list
00224             typename ObjectSubscribersMap::iterator obj_sub_it = mObjectSubscribers.find(uuid);
00225             if (obj_sub_it == mObjectSubscribers.end()) {
00226                 mObjectSubscribers[uuid] = new SubscriberSet();
00227                 obj_sub_it = mObjectSubscribers.find(uuid);
00228             }
00229             SubscriberSet* obj_subs = obj_sub_it->second;
00230             obj_subs->insert(remote);
00231 
00232             // Force an update. This is necessary because the subscription comes
00233             // in asynchronously from Proximity, so its possible the data sent
00234             // with the origin subscription is out of date by the time this
00235             // subscription occurs. Forcing an extra update handles this case.
00236             propertyUpdatedForSubscriber(uuid, parent->mLocService, remote, NULL);
00237         }
00238 
00239         void unsubscribe(const SubscriberType& remote, const UUID& uuid) {
00240             unsubscribe(remote, uuid, (ProxIndexID*)NULL);
00241         }
00242         void unsubscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID index_id) {
00243             unsubscribe(remote, uuid, &index_id);
00244         }
00245         void unsubscribe(const SubscriberType& remote, const UUID& uuid, ProxIndexID* index_id) {
00246             // Remove object from server's list
00247             typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote);
00248             if (sub_it != mSubscriptions.end()) {
00249                 typename ObjectIndexesMap::iterator indexes_it = sub_it->second->objectIndexes.find(uuid);
00250                 if (indexes_it != sub_it->second->objectIndexes.end()) {
00251                     if (index_id != NULL) {
00252                         // If we're using indexes, erase the index and
00253                         // completely remove the object as being tracked if we
00254                         // hit no indices marked as still tracking
00255                         indexes_it->second.erase(*index_id);
00256                         if (indexes_it->second.empty())
00257                             sub_it->second->objectIndexes.erase(indexes_it);
00258                     }
00259                     else {
00260                         // Otherwise, we have one implicit index we're
00261                         // tracking. This call is enough to remove it since we
00262                         // can only have 1 subscription to it
00263                         sub_it->second->objectIndexes.erase(indexes_it);
00264                     }
00265                 }
00266             }
00267 
00268             // Remove server from object's list
00269             typename ObjectSubscribersMap::iterator obj_it = mObjectSubscribers.find(uuid);
00270             if (obj_it != mObjectSubscribers.end()) {
00271                 SubscriberSet* subs = obj_it->second;
00272                 subs->erase(remote);
00273             }
00274         }
00275 
00276         void unsubscribe(const SubscriberType& remote) {
00277             typename SubscriberMap::iterator sub_it = mSubscriptions.find(remote);
00278             if (sub_it == mSubscriptions.end())
00279                 return;
00280 
00281             SubscriberInfoPtr subs = sub_it->second;
00282             // We just need to clear out this subscriber's objectIndexes
00283             // (marking no more object subscriptions, whether we were tracking
00284             // indices or not). We don't use individual unsubscription calls
00285             // because they require the correct type of call -- with or without
00286             // indices. Instead just do the second half of what they would do
00287             // manually -- remove references to the objects subscribed to from
00288             // mObjectSubscribers' SubscriberSets
00289             for(typename ObjectIndexesMap::iterator obj_ind_it = subs->objectIndexes.begin(); obj_ind_it != subs->objectIndexes.end(); obj_ind_it++) {
00290                 UUID uuid = obj_ind_it->first;
00291                 // Remove server from object's list
00292                 typename ObjectSubscribersMap::iterator obj_it = mObjectSubscribers.find(uuid);
00293                 if (obj_it != mObjectSubscribers.end()) {
00294                     SubscriberSet* subs = obj_it->second;
00295                     subs->erase(remote);
00296                 }
00297             }
00298             // And then actually clear out the list of subscriptions
00299             subs->objectIndexes.clear();
00300 
00301             // Just drop any outstanding updates we have left. They
00302             // are useless if we're using tree replication since we
00303             // just destroyed the information about indices the
00304             // updates apply to. Even in basic queries, this doesn't
00305             // matter because the querier will just be left with stale
00306             // data, which they would have been anyway.
00307             mSubscriptions.erase(sub_it);
00308         }
00309 
00310         typedef std::tr1::function<void(UpdateInfo&)> UpdateFunctor;
00311         // Generic version of an update - adds updates per-subscriber as
00312         // necessary and calls the UpdateFunctor to trigger the particular
00313         // update to values.
00314         void propertyUpdated(const UUID& uuid, LocationService* locservice, UpdateFunctor fup) {
00315             // Add the update to each subscribed object
00316             typename ObjectSubscribersMap::iterator obj_sub_it = mObjectSubscribers.find(uuid);
00317             if (obj_sub_it == mObjectSubscribers.end()) return;
00318 
00319             SubscriberSet* object_subscribers = obj_sub_it->second;
00320 
00321             for(typename SubscriberSet::iterator subscriber_it = object_subscribers->begin(); subscriber_it != object_subscribers->end(); subscriber_it++) {
00322                 propertyUpdatedForSubscriber(uuid, locservice, *subscriber_it, fup);
00323             }
00324         }
00325 
00326         // Update of location information for individual subscriber. Note that
00327         // this should only be used in special cases -- mainly to handle when a
00328         // new subscriber is added.  Otherwise its just a utility for the normal
00329         // update method above.
00330         void propertyUpdatedForSubscriber(const UUID& uuid, LocationService* locservice, SubscriberType sub, UpdateFunctor fup) {
00331             if (mSubscriptions.find(sub) == mSubscriptions.end()) return; // XXX FIXME
00332             assert(mSubscriptions.find(sub) != mSubscriptions.end());
00333             std::tr1::shared_ptr<SubscriberInfo> sub_info = mSubscriptions[sub];
00334             if (sub_info->objectIndexes.find(uuid) == sub_info->objectIndexes.end()) return; // XXX FIXME
00335             assert(sub_info->objectIndexes.find(uuid) != sub_info->objectIndexes.end());
00336 
00337             if (sub_info->outstandingUpdates.find(uuid) == sub_info->outstandingUpdates.end()) {
00338                 UpdateInfo new_ui;
00339                 new_ui.epoch = locservice->epoch(uuid);
00340                 new_ui.location = locservice->location(uuid);
00341                 new_ui.bounds = locservice->bounds(uuid);
00342                 new_ui.mesh = locservice->mesh(uuid);
00343                 new_ui.orientation = locservice->orientation(uuid);
00344                 new_ui.physics = locservice->physics(uuid);
00345                 sub_info->outstandingUpdates[uuid] = new_ui;
00346             }
00347             else
00348                 UpdateInfo& ui = sub_info->outstandingUpdates[uuid];
00349 
00350 
00351             UpdateInfo& ui = sub_info->outstandingUpdates[uuid];
00352             if (fup)
00353                 fup(ui);
00354         }
00355 
00356         static void setUILocation(UpdateInfo& ui, const TimedMotionVector3f& newval) {ui.location = newval; }
00357         static void setUIOrientation(UpdateInfo& ui, const TimedMotionQuaternion& newval) { ui.orientation = newval; }
00358         static void setUIBounds(UpdateInfo& ui, const AggregateBoundingInfo& newval) { ui.bounds = newval; }
00359         static void setUIMesh(UpdateInfo& ui, const String& newval) {ui.mesh = newval;}
00360         static void setUIPhysics(UpdateInfo& ui, const String& newval) {ui.physics = newval;}
00361 
00362         void locationUpdated(const UUID& uuid, const TimedMotionVector3f& newval, LocationService* locservice) {
00363             propertyUpdated(
00364                 uuid, locservice,
00365                 std::tr1::bind(&setUILocation, std::tr1::placeholders::_1, newval)
00366             );
00367         }
00368 
00369         void orientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval, LocationService* locservice) {
00370             propertyUpdated(
00371                 uuid, locservice,
00372                 std::tr1::bind(&setUIOrientation, std::tr1::placeholders::_1, newval)
00373             );
00374         }
00375 
00376         void boundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval, LocationService* locservice) {
00377             propertyUpdated(
00378                 uuid, locservice,
00379                 std::tr1::bind(&setUIBounds, std::tr1::placeholders::_1, newval)
00380             );
00381         }
00382 
00383         void meshUpdated(const UUID& uuid, const String& newval, LocationService* locservice) {
00384             propertyUpdated(
00385                 uuid, locservice,
00386                 std::tr1::bind(&setUIMesh, std::tr1::placeholders::_1, newval)
00387             );
00388         }
00389 
00390         void physicsUpdated(const UUID& uuid, const String& newval, LocationService* locservice) {
00391             propertyUpdated(
00392                 uuid, locservice,
00393                 std::tr1::bind(&setUIPhysics, std::tr1::placeholders::_1, newval)
00394             );
00395         }
00396 
00397 
00398         void service() {
00399             uint32 max_updates = GetOptionValue<uint32>(ALWAYS_POLICY_OPTIONS, LOC_MAX_PER_RESULT);
00400             const uint32 outstanding_message_hard_limit = 64;
00401             const uint32 outstanding_message_soft_limit = 25;
00402 
00403             std::list<SubscriberType> to_delete;
00404 
00405             for(typename SubscriberMap::iterator server_it = mSubscriptions.begin(); server_it != mSubscriptions.end(); server_it++) {
00406                 SubscriberType sid = server_it->first;
00407                 std::tr1::shared_ptr<SubscriberInfo> sub_info = server_it->second;
00408 
00409                 // We can end up with leftover updates after a subscriber has
00410                 // already disconnected. We need to ignore them if we're not
00411                 // even going to be able to send the messages.
00412                 if (!parent->validSubscriber(sid)) {
00413                     sub_info->outstandingUpdates.clear();
00414                     if (sub_info->noSubscriptionsLeft()) {
00415                         sub_info.reset();
00416                         to_delete.push_back(sid);
00417                     }
00418                     continue;
00419                 }
00420 
00421                 Sirikata::Protocol::Loc::BulkLocationUpdate bulk_update;
00422 
00423                 bool send_failed = false;
00424                 std::map<UUID, UpdateInfo>::iterator last_shipped = sub_info->outstandingUpdates.begin();
00425                 for(std::map<UUID, UpdateInfo>::iterator up_it = sub_info->outstandingUpdates.begin();
00426                     numOutstandingMessages(sub_info) < outstanding_message_soft_limit && up_it != sub_info->outstandingUpdates.end();
00427                     up_it++)
00428                 {
00429                     Sirikata::Protocol::Loc::ILocationUpdate update = bulk_update.add_update();
00430                     update.set_object(up_it->first);
00431 
00432                     //write and update sequence number
00433                     update.set_seqno( (*(sub_info->seqnoPtr)) ++ );
00434 
00435                     if (parent->isSelfSubscriber(sid, up_it->first))
00436                         update.set_epoch(up_it->second.epoch);
00437 
00438                     // If we're tracking indexes (tree replication), add the
00439                     // list in
00440                     typename ObjectIndexesMap::iterator obj_ind_it = sub_info->objectIndexes.find(up_it->first);
00441                     if (obj_ind_it != sub_info->objectIndexes.end()) {
00442                         for (typename ProxIndexSet::iterator prox_idx_it = obj_ind_it->second.begin(); prox_idx_it != obj_ind_it->second.end(); prox_idx_it++)
00443                             update.add_index_id((uint32)*prox_idx_it);
00444                     }
00445 
00446                     Sirikata::Protocol::ITimedMotionVector location = update.mutable_location();
00447                     location.set_t(up_it->second.location.updateTime());
00448                     location.set_position(up_it->second.location.position());
00449 
00450                     location.set_velocity(up_it->second.location.velocity());
00451 
00452                     Sirikata::Protocol::ITimedMotionQuaternion orientation = update.mutable_orientation();
00453                     orientation.set_t(up_it->second.orientation.updateTime());
00454                     orientation.set_position(up_it->second.orientation.position());
00455                     orientation.set_velocity(up_it->second.orientation.velocity());
00456 
00457                     Sirikata::Protocol::IAggregateBoundingInfo msg_bounds = update.mutable_aggregate_bounds();
00458                     msg_bounds.set_center_offset(up_it->second.bounds.centerOffset);
00459                     msg_bounds.set_center_bounds_radius(up_it->second.bounds.centerBoundsRadius);
00460                     msg_bounds.set_max_object_size(up_it->second.bounds.maxObjectRadius);
00461 
00462                     update.set_mesh(up_it->second.mesh);
00463                     update.set_physics(up_it->second.physics);
00464 
00465                     // If we hit the limit for this update, try to send it out
00466                     if (bulk_update.update_size() > (int32)max_updates) {
00467                         bool sent = parent->trySend(sid, bulk_update, sub_info);
00468                         if (!sent) {
00469                             send_failed = true;
00470                             break;
00471                         }
00472                         else {
00473                             bulk_update = Sirikata::Protocol::Loc::BulkLocationUpdate(); // clear it out
00474                             last_shipped = up_it;
00475                             sent_count++;
00476                         }
00477                     }
00478                 }
00479 
00480                 // Try to send the last few if necessary/possible
00481                 if (numOutstandingMessages(sub_info) < outstanding_message_hard_limit && !send_failed && bulk_update.update_size() > 0) {
00482                     bool sent = parent->trySend(sid, bulk_update, sub_info);
00483                     if (sent) {
00484                         last_shipped = sub_info->outstandingUpdates.end();
00485                         sent_count++;
00486                     }
00487                 }
00488 
00489                 // Finally clear out any entries successfully sent out
00490                 sub_info->outstandingUpdates.erase( sub_info->outstandingUpdates.begin(), last_shipped);
00491 
00492                 if (sub_info->noSubscriptionsLeft() && sub_info->outstandingUpdates.empty()) {
00493                     sub_info.reset();
00494                     to_delete.push_back(sid);
00495                 }
00496             }
00497 
00498             for(typename std::list<SubscriberType>::iterator it = to_delete.begin(); it != to_delete.end(); it++)
00499                 mSubscriptions.erase(*it);
00500         }
00501 
00502     };
00503 
00504     void tryCreateChildStream(const UUID& dest, ODPSST::Stream::Ptr parent_stream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount);
00505     void objectLocSubstreamCallback(int x, ODPSST::Stream::Ptr substream, const UUID& dest, ODPSST::Stream::Ptr parent_substream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount);
00506     void tryCreateChildStream(const OHDP::NodeID& dest, OHDPSST::Stream::Ptr parent_stream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount);
00507     void ohLocSubstreamCallback(int x, OHDPSST::Stream::Ptr substream, const OHDP::NodeID& dest, OHDPSST::Stream::Ptr parent_substream, std::string* msg, int count, const SubscriberInfoPtr&numOutstandingMessageCount);
00508 
00509     bool validSubscriber(const UUID& dest);
00510     bool validSubscriber(const OHDP::NodeID& dest);
00511     bool validSubscriber(const ServerID& dest);
00512 
00513     bool isSelfSubscriber(const UUID& sid, const UUID& observed);
00514     bool isSelfSubscriber(const OHDP::NodeID& sid, const UUID& observed);
00515     bool isSelfSubscriber(const ServerID& sid, const UUID& observed);
00516 
00517     bool trySend(const UUID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount);
00518     bool trySend(const OHDP::NodeID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount);
00519     bool trySend(const ServerID& dest, const Sirikata::Protocol::Loc::BulkLocationUpdate& blu, const SubscriberInfoPtr& numOutstandingMessageCount);
00520     Poller mStatsPoller;
00521     Time mLastStatsTime;
00522     const String mTimeSeriesServerUpdatesName;
00523     AtomicValue<uint32> mServerUpdatesPerSecond;
00524     const String mTimeSeriesOHUpdatesName;
00525     AtomicValue<uint32> mOHUpdatesPerSecond;
00526     const String mTimeSeriesObjectUpdatesName;
00527     AtomicValue<uint32> mObjectUpdatesPerSecond;
00528 
00529     typedef SubscriberIndex<ServerID> ServerSubscriberIndex;
00530     ServerSubscriberIndex mServerSubscriptions;
00531 
00532     typedef SubscriberIndex<OHDP::NodeID> OHSubscriberIndex;
00533     OHSubscriberIndex mOHSubscriptions;
00534 
00535     typedef SubscriberIndex<UUID> ObjectSubscriberIndex;
00536     ObjectSubscriberIndex mObjectSubscriptions;
00537 }; // class AlwaysLocationUpdatePolicy
00538 
00539 } // namespace Sirikata
00540 
00541 #endif //_ALWAYS_LOCATION_UPDATE_POLICY_HPP_