Sirikata
libspace/include/sirikata/space/AggregateManager.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  AggregateManager.hpp
00003  *
00004  *  Copyright (c) 2010, Tahir Azim.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 
00034 #ifndef _AGGREGATE_MANAGER_HPP
00035 #define _AGGREGATE_MANAGER_HPP
00036 
00037 #include <sirikata/space/Platform.hpp>
00038 #include <sirikata/core/util/UUID.hpp>
00039 #include <sirikata/core/transfer/TransferData.hpp>
00040 #include <sirikata/core/transfer/RemoteFileMetadata.hpp>
00041 #include <sirikata/core/transfer/TransferPool.hpp>
00042 #include <sirikata/core/transfer/TransferMediator.hpp>
00043 
00044 #include <sirikata/space/LocationService.hpp>
00045 
00046 
00047 #include <sirikata/mesh/Meshdata.hpp>
00048 #include <sirikata/mesh/ModelsSystem.hpp>
00049 #include <sirikata/mesh/MeshSimplifier.hpp>
00050 #include <sirikata/mesh/Filter.hpp>
00051 
00052 #include <sirikata/core/transfer/HttpManager.hpp>
00053 
00054 #include <sirikata/core/command/Command.hpp>
00055 
00056 #include <boost/thread/locks.hpp>
00057 
00058 namespace Sirikata {
00059 
      /**
       * LocationServiceListener subclass that holds the state declared in this
       * header: cached per-object location info, AggregateObject bookkeeping,
       * thread/IO handles for mesh generation and for CDN upload, an in-memory
       * mesh store, and statistics counters.
       *
       * NOTE(review): no member function of the class itself is defined in this
       * header, so their run-time behaviour has to be confirmed in the
       * implementation file.
       */
00060 class SIRIKATA_SPACE_EXPORT AggregateManager : public LocationServiceListener {
00061 private:
00062 
        // Compile-time cap on generation threads; it sizes the fixed per-thread
        // arrays declared below.
00063   enum{MAX_NUM_GENERATION_THREADS=16};
        // Number of generation threads. NOTE(review): presumably taken from the
        // constructor's n_gen_threads argument and expected to be
        // <= MAX_NUM_GENERATION_THREADS -- confirm in the implementation.
00064   uint16 mNumGenerationThreads;
        // Per-generation-thread handles: thread, IO service, strand and IO work.
        // NOTE(review): raw pointers; presumably entry i of each array belongs to
        // generation thread i and is created/destroyed in the .cpp -- confirm.
00065   Thread* mAggregationThreads[MAX_NUM_GENERATION_THREADS];
00066   Network::IOService* mAggregationServices[MAX_NUM_GENERATION_THREADS];
00067   Network::IOStrand* mAggregationStrands[MAX_NUM_GENERATION_THREADS];
00068   Network::IOWork* mIOWorks[MAX_NUM_GENERATION_THREADS];
00069 
00070   typedef struct LocationInfo {
00071   private:
00072       boost::shared_mutex mutex;
00073 
00074     Vector3f mCurrentPosition;
00075     AggregateBoundingInfo mBounds;
00076     Quaternion mCurrentOrientation;
00077     String mMesh;
00078 
00079   public:
00080     LocationInfo(Vector3f curPos, AggregateBoundingInfo bnds,
00081                  Quaternion curOrient, String msh) :
00082       mCurrentPosition(curPos), mBounds(bnds),
00083       mCurrentOrientation(curOrient), mMesh(msh)
00084     {
00085     }
00086 
00087       Vector3f currentPosition() {
00088           boost::shared_lock<boost::shared_mutex> read_lock(mutex);
00089           return mCurrentPosition;
00090       }
00091       AggregateBoundingInfo bounds() {
00092           boost::shared_lock<boost::shared_mutex> read_lock(mutex);
00093           return mBounds;
00094       }
00095       Quaternion currentOrientation() {
00096           boost::shared_lock<boost::shared_mutex> read_lock(mutex);
00097           return mCurrentOrientation;
00098       }
00099       String mesh() {
00100           boost::shared_lock<boost::shared_mutex> read_lock(mutex);
00101           return mMesh;
00102       }
00103 
00104       void currentPosition(Vector3f v) {
00105           boost::unique_lock<boost::shared_mutex> write_lock(mutex);
00106           mCurrentPosition = v;
00107       }
00108       void bounds(AggregateBoundingInfo v) {
00109           boost::unique_lock<boost::shared_mutex> write_lock(mutex);
00110           mBounds = v;
00111       }
00112       void currentOrientation(Quaternion v) {
00113           boost::unique_lock<boost::shared_mutex> write_lock(mutex);
00114           mCurrentOrientation = v;
00115       }
00116       void mesh(const String& v) {
00117           boost::unique_lock<boost::shared_mutex> write_lock(mutex);
00118           mMesh = v;
00119       }
00120   } LocationInfo;
00121   std::tr1::shared_ptr<LocationInfo> getCachedLocInfo(const UUID& uuid) ; // LocationInfo for `uuid`.
        // NOTE(review): defined out of line; presumably a lookup in
        // mLocationServiceCache that yields an empty pointer when `uuid` is
        // unknown -- confirm in the implementation.
00122 
00123   class LocationServiceCache {
00124     public:
00125       LocationServiceCache() { }
00126 
00127       std::tr1::shared_ptr<LocationInfo> getLocationInfo(const UUID& uuid) {
00128         if (mLocMap.find(uuid) == mLocMap.end()){
00129           return std::tr1::shared_ptr<LocationInfo>();
00130         }
00131 
00132         return mLocMap[uuid];
00133       }
00134 
00135       void insertLocationInfo(const UUID& uuid, std::tr1::shared_ptr<LocationInfo> locinfo) {
00136         mLocMap[uuid] = locinfo;
00137       }
00138 
00139       void removeLocationInfo(const UUID& uuid) {
00140         mLocMap.erase(uuid);
00141       }
00142 
00143     private:
00144       std::tr1::unordered_map<UUID, std::tr1::shared_ptr<LocationInfo> , UUID::Hasher> mLocMap;
00145   };
00146 
        // UUID -> LocationInfo table (see LocationServiceCache above).
00147   LocationServiceCache mLocationServiceCache;
        // NOTE(review): LocationServiceCache does no locking itself; presumably
        // this mutex is what guards mLocationServiceCache -- confirm.
00148   boost::mutex mLocCacheMutex;
        // NOTE(review): presumably the `loc` constructor argument, not owned by
        // this class -- confirm.
00149   LocationService* mLoc;
00150 
00151   //Part of the LocationServiceListener interface.
        // The local* callbacks carry an extra `agg` flag that the replica*
        // callbacks do not have.
        // NOTE(review): presumably `agg` marks objects that are themselves
        // aggregates -- confirm against LocationServiceListener.
        // These overrides are private in this class; presumably they are only
        // invoked through the LocationServiceListener base interface.
00152   virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient,
00153                               const AggregateBoundingInfo& bounds, const String& mesh, const String& physics,
00154                                 const String& zernike);
00155   virtual void localObjectRemoved(const UUID& uuid, bool agg) ;
00156   virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval);
00157   virtual void localOrientationUpdated(const UUID& uuid, bool agg, const TimedMotionQuaternion& newval);
00158   virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval) ;
00159   virtual void localMeshUpdated(const UUID& uuid, bool agg, const String& newval) ;
00160   virtual void replicaObjectAdded(const UUID& uuid, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient,
00161       const AggregateBoundingInfo& bounds, const String& mesh, const String& physics, const String& zernike);
00162   virtual void replicaObjectRemoved(const UUID& uuid);
00163   virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval);
00164   virtual void replicaOrientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval);
00165   virtual void replicaBoundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval);
00166   virtual void replicaMeshUpdated(const UUID& uuid, const String& newval);
00167 
00168 
        // Mesh-processing helpers.
        // NOTE(review): presumably mModelsSystemMutex serializes access to
        // mModelsSystem -- confirm in the implementation.
00169   boost::mutex mModelsSystemMutex;
00170   ModelsSystem* mModelsSystem;
        // Simplifier held by value.
00171   Sirikata::Mesh::MeshSimplifier mMeshSimplifier;
        // NOTE(review): going by the name, a Mesh::Filter that re-centers a mesh;
        // raw pointer, ownership not visible here -- confirm.
00172   Sirikata::Mesh::Filter* mCenteringFilter;
00173 
00174   struct AggregateObject;
00175   typedef std::tr1::shared_ptr<AggregateObject> AggregateObjectPtr;
00176   struct AggregateObject{
00177     UUID mUUID;
00178     std::set<UUID> mParentUUIDs;
00179   private:
00180       boost::mutex mChildrenMutex;
00181       std::vector< std::tr1::shared_ptr<struct AggregateObject>  > mChildren;
00182   public:
00183       std::vector<AggregateObjectPtr> getChildrenCopy(){
00184           boost::mutex::scoped_lock lock(mChildrenMutex);
00185           return mChildren;
00186       }
00187       std::size_t childrenSize() {
00188           boost::mutex::scoped_lock lock(mChildrenMutex);
00189           return mChildren.size();
00190       }
00191       void addChild(AggregateObjectPtr child) {
00192           boost::mutex::scoped_lock lock(mChildrenMutex);
00193           mChildren.push_back(child);
00194       }
00195       bool hasChild(const UUID& child) {
00196           boost::mutex::scoped_lock lock(mChildrenMutex);
00197           for (uint32 i=0; i < mChildren.size(); i++)
00198               if (mChildren[i]->mUUID == child) return true;
00199           return false;
00200       }
00201       void removeChild(const UUID& child) {
00202           boost::mutex::scoped_lock lock(mChildrenMutex);
00203           for (uint32 i=0; i < mChildren.size(); i++) {
00204               if (mChildren[i]->mUUID == child) {
00205                   mChildren.erase(mChildren.begin() + i);
00206                   return;
00207               }
00208           }
00209       }
00210     // Whether this is actually a leaf object (i.e. added implicitly as
00211     // AggregateObject when added as a child of a true aggregate).
00212     bool leaf;
00213     Time mLastGenerateTime;
00214     bool generatedLastRound;
00215     Mesh::MeshdataPtr mMeshdata;
00216 
00217     AggregateObject(const UUID& uuid, const UUID& parentUUID, bool is_leaf) :
00218       mUUID(uuid),
00219       leaf(is_leaf),
00220       mLastGenerateTime(Time::null()),
00221       mTreeLevel(0),  mNumObservers(0),
00222       mNumFailedGenerationAttempts(0),
00223       cdnBaseName(),
00224       refreshTTL(Time::null())
00225     {
00226       mParentUUIDs.insert(parentUUID);
00227       mMeshdata = Mesh::MeshdataPtr();
00228       generatedLastRound = false;
00229       mDistance = 0.01;
00230     }
00231 
00232     uint16 mTreeLevel;
00233     uint32 mNumObservers;
00234     uint32 mNumFailedGenerationAttempts;
00235     double mDistance;  //MINIMUM distance at which this object could be part of a cut
00236     std::vector<UUID> mLeaves;
00237 
00238     // The basename returned by the CDN. This points at the entire asset
00239     // rather than the particular mesh filename. Should include a version
00240     // number. Used for refreshing TTLs.
00241     String cdnBaseName;
00242     // Time at which we should try to refresh the TTL, should be set
00243     // a bit less than the actual timeout.
00244     Time refreshTTL;
00245 
00246   };
00247 
00248 
00249   //Lists of all aggregate objects and dirty aggregate objects.
        // Both maps are keyed by object UUID.
        // NOTE(review): presumably mAggregateObjectsMutex guards them -- confirm
        // in the implementation.
00250   boost::mutex mAggregateObjectsMutex;
00251   typedef std::tr1::unordered_map<UUID, AggregateObjectPtr, UUID::Hasher > AggregateObjectsMap;
00252   AggregateObjectsMap mAggregateObjects;
00253   Time mAggregateGenerationStartTime;
00254   std::tr1::unordered_map<UUID, AggregateObjectPtr, UUID::Hasher> mDirtyAggregateObjects;
00255 
        // One entry per generation-thread slot, with a mutex at the same index.
        // Within a slot, objects sit in deques bucketed under a float key.
        // NOTE(review): presumably that key is the generation priority; which end
        // of the map is served first is not visible here -- confirm.
00256   boost::mutex mObjectsByPriorityLocks[MAX_NUM_GENERATION_THREADS];
00257   std::map<float, std::deque<AggregateObjectPtr > > mObjectsByPriority[MAX_NUM_GENERATION_THREADS];
00258 
00259   //Variables related to downloading and in-memory caching meshes
        // mMeshStore maps a String key to a Mesh::MeshdataPtr.
        // NOTE(review): presumably mMeshStoreMutex guards mMeshStore -- confirm.
00260   boost::mutex mMeshStoreMutex;
00261   std::tr1::unordered_map<String, Mesh::MeshdataPtr> mMeshStore;
00262   std::tr1::shared_ptr<Transfer::TransferPool> mTransferPool;
00263   Transfer::TransferMediator *mTransferMediator;
        // NOTE(review): defined out of line; presumably stores `mdptr` in
        // mMeshStore under `meshName` -- confirm.
00264   void addToInMemoryCache(const String& meshName, const Mesh::MeshdataPtr mdptr);
00265 
00266   //CDN upload-related variables
        // NOTE(review): presumably these mirror the constructor arguments
        // (oauth, username, local_path, local_url_prefix, skip_gen, skip_upload)
        // -- confirm in the implementation.
00267   Transfer::OAuthParamsPtr mOAuth;
00268   const String mCDNUsername;
        // NOTE(review): presumably the lifetime requested for uploaded models,
        // tied to the TTL refresh fields on AggregateObject -- confirm.
00269   Duration mModelTTL;
        // NOTE(review): presumably the poller that drives sendKeepAlives() --
        // confirm.
00270   Poller* mCDNKeepAlivePoller;
00271   const String mLocalPath;
00272   const String mLocalURLPrefix;
00273   bool mSkipGenerate;
00274   bool mSkipUpload;
00275 
00276   //CDN upload threads' variables
        // Compile-time cap on upload threads; it sizes the arrays below.
00277   enum{MAX_NUM_UPLOAD_THREADS = 16};
        // NOTE(review): presumably taken from the constructor's n_upload_threads
        // argument and expected to be <= MAX_NUM_UPLOAD_THREADS -- confirm.
00278   uint16 mNumUploadThreads;
        // Per-upload-thread handles: thread, IO service, strand and IO work.
00279   Thread* mUploadThreads[MAX_NUM_UPLOAD_THREADS];
00280   Network::IOService* mUploadServices[MAX_NUM_UPLOAD_THREADS];
00281   Network::IOStrand* mUploadStrands[MAX_NUM_UPLOAD_THREADS];
00282   Network::IOWork* mUploadWorks[MAX_NUM_UPLOAD_THREADS];
        // NOTE(review): presumably the entry point of upload thread `i`, with `i`
        // indexing the arrays above -- confirm.
00283   void uploadThreadMain(uint8 i);
00284 
00285   // Stats.
00286   // Raw number of aggregate updates that could cause regeneration,
00287   // e.g. add/remove children, child moved, child mesh changed, etc.
00288   AtomicValue<uint32> mRawAggregateUpdates;
00289   // Number of aggregate generation requests we've queued up, even if they
00290   // haven't finished.
00291   AtomicValue<uint32> mAggregatesQueued;
00292   // Number of aggregates that were actually generated, i.e. the mesh was saved
00293   // and uploading scheduled.
00294   AtomicValue<uint32> mAggregatesGenerated;
00295   // Number of aggregates which, even after retries, failed to generate
00296   AtomicValue<uint32> mAggregatesFailedToGenerate;
00297   // Number of aggregates successfully uploaded to the CDN
00298   AtomicValue<uint32> mAggregatesUploaded;
00299   // Number of aggregate uploads which failed
00300   AtomicValue<uint32> mAggregatesFailedToUpload;
00301   // Some stats need locks
        // The cumulative fields below are plain members, not AtomicValue.
        // NOTE(review): presumably mStatsMutex is what protects them -- confirm.
00302   boost::mutex mStatsMutex;
00303   // Cumulative time spent generating aggregates (across all threads,
00304   // so it could be, e.g., 4x wall clock time). Doesn't include failed
00305   // calls to generateAggregateMeshAsync
00306   Duration mAggregateCumulativeGenerationTime;
00307   // And uploading them. Doesn't include failed uploads. This includes
00308   // the time spent serializing the mesh.
00309   Duration mAggregateCumulativeUploadTime;
00310   // And their size after being serialized.
        // NOTE(review): units are not stated here; presumably bytes -- confirm.
00311   uint64 mAggregateCumulativeDataSize;
00312 
00313   //Various utility functions
        // NOTE(review): all defined out of line; the notes below are read off the
        // signatures and names only.
        // findChild / removeChild take the child list `v` explicitly; presumably
        // they search it for (and erase) the entry whose UUID is `uuid`.
00314   bool findChild(std::vector<AggregateObjectPtr>& v, const UUID& uuid) ;
00315   void removeChild(std::vector<AggregateObjectPtr>& v, const UUID& uuid) ;
        // Presumably the internal counterpart of the public
        // removeChild(uuid, child_uuid) -- confirm how the two differ.
00316   void iRemoveChild(const UUID& uuid, const UUID& child_uuid);
        // The parameter is named with the member prefix (mIndividualObjects)
        // although it is an argument, not a data member.
00317   void getLeaves(const std::vector<UUID>& mIndividualObjects);
        // Takes both UUIDs by value.
00318   void addLeavesUpTree(UUID leaf_uuid, UUID uuid);
00319   bool isAggregate(const UUID& uuid);
00320 
00321 
00322 
00323   //Functions related to generating and updating aggregates.
00324   void updateChildrenTreeLevel(const UUID& uuid, uint16 treeLevel);
00325   void addDirtyAggregates(UUID uuid);
00326   void queueDirtyAggregates(Time postTime);
        // NOTE(review): `i` is presumably the generation-thread index used to
        // pick the per-thread queue -- confirm.
00327   void generateMeshesFromQueue(uint8 i);
00328 
00329 
        // Outcome codes for a generation attempt.
        // NOTE(review): presumably these are the values returned by
        // generateAggregateMeshAsync (declared below as returning uint32) --
        // confirm.
00330   enum {
00331       GEN_SUCCESS=1,
00332       CHILDREN_NOT_YET_GEN=2,
00333       NEWER_REQUEST=3, // newer timestamped request for aggregation
00334       MISSING_LOC_INFO=4,
00335       MISSING_CHILD_LOC_INFO=5,
00336       MISSING_CHILD_MESHES=6
00337   };
        // `generateSiblings` defaults to true.
00338   uint32 generateAggregateMeshAsync(const UUID uuid, Time postTime, bool generateSiblings = true);
        // NOTE(review): presumably the entry point of generation thread `i` --
        // confirm.
00339   void aggregationThreadMain(uint8 i);
00340   void updateAggregateLocMesh(UUID uuid, String mesh);
00341 
00342 
00343   //Functions related to uploading aggregates
        // `textureSet` is a String -> String map passed by value; `retryAttempt`
        // and `uploadStartTime` are threaded through to handleUploadFinished,
        // which takes the same trailing arguments.
00344   void uploadAggregateMesh(Mesh::MeshdataPtr agg_mesh, AggregateObjectPtr aggObject,
00345       std::tr1::unordered_map<String, String> textureSet, uint32 retryAttempt, Time uploadStartTime);
00346   // Helper that handles the upload callback and sets flags to let the request
00347   // from the aggregation thread continue
00348   void handleUploadFinished(Transfer::UploadRequestPtr request, const Transfer::URI& path, Mesh::MeshdataPtr agg_mesh,
00349                 AggregateObjectPtr aggObject, std::tr1::unordered_map<String, String> textureSet,
00350                             uint32 retryAttempt, const Time& uploadStartTime);
00351   // Look for any aggregates that need a keep-alive sent to the CDN
00352   // and try to send them.
00353   void sendKeepAlives();
        // NOTE(review): presumably the HTTP completion handler for a keep-alive
        // sent on behalf of `objid` -- confirm.
00354   void handleKeepAliveResponse(const UUID& objid,
00355              std::tr1::shared_ptr<Transfer::HttpManager::HttpResponse> response,
00356              Transfer::HttpManager::ERR_TYPE error, const boost::system::error_code& boost_error);
00357 
00358 
00359 
00360   // Helper for cleaning out parent state from child, or deleting it if it is an
00361   // abandoned leaf object (non-aggregate). Returns true if the object was
00362   // removed.
00363   bool cleanUpChild(const UUID& parent_uuid, const UUID& child_id);
        // NOTE(review): defined out of line; going by the name, drops leaf
        // records that are no longer needed -- confirm the exact criterion.
00364   void removeStaleLeaves();
00365 
00366   // Command handlers
        // NOTE(review): presumably reports the statistics counters declared in
        // this class through the Command interface -- confirm.
00367   void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00368 public:
00369 
        // Constructor arguments: location service, CDN OAuth parameters and
        // username, local path and URL prefix, generation and upload thread
        // counts, and flags to skip generation / skip upload.
        // NOTE(review): this listing appears to omit the documentation comment
        // that precedes the constructor in the original header; consult it for
        // the authoritative parameter descriptions.
00394   AggregateManager( LocationService* loc, Transfer::OAuthParamsPtr oauth, const String& username, const String& local_path, const String& local_url_prefix, uint16 n_gen_threads, uint16 n_upload_threads, bool skip_gen, bool skip_upload);
00395 
00396   ~AggregateManager();
00397 
        // NOTE(review): presumably register / unregister the aggregate `uuid`
        // with this manager -- confirm.
00398   void addAggregate(const UUID& uuid);
00399 
00400   void removeAggregate(const UUID& uuid);
00401 
        // NOTE(review): presumably attach / detach `child_uuid` under the
        // aggregate `uuid` -- confirm.
00402   void addChild(const UUID& uuid, const UUID& child_uuid) ;
00403 
00404   void removeChild(const UUID& uuid, const UUID& child_uuid);
00405 
        // Reports observer and child counts for `objid`.
        // NOTE(review): how the counts are used is not visible here -- confirm.
00406   void aggregateObserved(const UUID& objid, uint32 nobservers, uint32 nchildren);
00407 
00408   // This version requires locking to get at the AggregateObjectPtr
00409   // for the object. This isn't safe if you already hold that lock.
        // `delayFor` defaults to one millisecond.
00410   void generateAggregateMesh(const UUID& uuid, const Duration& delayFor = Duration::milliseconds(1.0f) );
00411   // This version doesn't require a lock.
        // `delayFor` defaults to one millisecond.
00412   void generateAggregateMesh(const UUID& uuid, AggregateObjectPtr aggObject, const Duration& delayFor = Duration::milliseconds(1.0f) );
00413 
        // NOTE(review): presumably completion callbacks for the metadata and
        // chunk transfer requests issued while fetching `meshName` for
        // `child_uuid` of aggregate `uuid` -- confirm.
00414   void metadataFinished(Time t, const UUID uuid, const UUID child_uuid, std::string meshName,uint8 attemptNo,
00415                         std::tr1::shared_ptr<Transfer::MetadataRequest> request,
00416                         std::tr1::shared_ptr<Transfer::RemoteFileMetadata> response)  ;
00417 
00418   void chunkFinished(Time t, const UUID uuid, const UUID child_uuid, std::string meshName, std::tr1::shared_ptr<Transfer::ChunkRequest> request,
00419                       std::tr1::shared_ptr<const Transfer::DenseData> response);
00421 
00422 };
00423 
00424 }
00425 
00426 #endif