Sirikata
|
/*  Sirikata
 *  AggregateManager.hpp
 *
 *  Copyright (c) 2010, Tahir Azim.
 *  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are
 *  met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *  * Neither the name of Sirikata nor the names of its contributors may
 *    be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031 */ 00032 00033 00034 #ifndef _AGGREGATE_MANAGER_HPP 00035 #define _AGGREGATE_MANAGER_HPP 00036 00037 #include <sirikata/space/Platform.hpp> 00038 #include <sirikata/core/util/UUID.hpp> 00039 #include <sirikata/core/transfer/TransferData.hpp> 00040 #include <sirikata/core/transfer/RemoteFileMetadata.hpp> 00041 #include <sirikata/core/transfer/TransferPool.hpp> 00042 #include <sirikata/core/transfer/TransferMediator.hpp> 00043 00044 #include <sirikata/space/LocationService.hpp> 00045 00046 00047 #include <sirikata/mesh/Meshdata.hpp> 00048 #include <sirikata/mesh/ModelsSystem.hpp> 00049 #include <sirikata/mesh/MeshSimplifier.hpp> 00050 #include <sirikata/mesh/Filter.hpp> 00051 00052 #include <sirikata/core/transfer/HttpManager.hpp> 00053 00054 #include <sirikata/core/command/Command.hpp> 00055 00056 #include <boost/thread/locks.hpp> 00057 00058 namespace Sirikata { 00059 00060 class SIRIKATA_SPACE_EXPORT AggregateManager : public LocationServiceListener { 00061 private: 00062 00063 enum{MAX_NUM_GENERATION_THREADS=16}; 00064 uint16 mNumGenerationThreads; 00065 Thread* mAggregationThreads[MAX_NUM_GENERATION_THREADS]; 00066 Network::IOService* mAggregationServices[MAX_NUM_GENERATION_THREADS]; 00067 Network::IOStrand* mAggregationStrands[MAX_NUM_GENERATION_THREADS]; 00068 Network::IOWork* mIOWorks[MAX_NUM_GENERATION_THREADS]; 00069 00070 typedef struct LocationInfo { 00071 private: 00072 boost::shared_mutex mutex; 00073 00074 Vector3f mCurrentPosition; 00075 AggregateBoundingInfo mBounds; 00076 Quaternion mCurrentOrientation; 00077 String mMesh; 00078 00079 public: 00080 LocationInfo(Vector3f curPos, AggregateBoundingInfo bnds, 00081 Quaternion curOrient, String msh) : 00082 mCurrentPosition(curPos), mBounds(bnds), 00083 mCurrentOrientation(curOrient), mMesh(msh) 00084 { 00085 } 00086 00087 Vector3f currentPosition() { 00088 boost::shared_lock<boost::shared_mutex> read_lock(mutex); 00089 return mCurrentPosition; 00090 } 00091 AggregateBoundingInfo bounds() { 00092 
boost::shared_lock<boost::shared_mutex> read_lock(mutex); 00093 return mBounds; 00094 } 00095 Quaternion currentOrientation() { 00096 boost::shared_lock<boost::shared_mutex> read_lock(mutex); 00097 return mCurrentOrientation; 00098 } 00099 String mesh() { 00100 boost::shared_lock<boost::shared_mutex> read_lock(mutex); 00101 return mMesh; 00102 } 00103 00104 void currentPosition(Vector3f v) { 00105 boost::unique_lock<boost::shared_mutex> write_lock(mutex); 00106 mCurrentPosition = v; 00107 } 00108 void bounds(AggregateBoundingInfo v) { 00109 boost::unique_lock<boost::shared_mutex> write_lock(mutex); 00110 mBounds = v; 00111 } 00112 void currentOrientation(Quaternion v) { 00113 boost::unique_lock<boost::shared_mutex> write_lock(mutex); 00114 mCurrentOrientation = v; 00115 } 00116 void mesh(const String& v) { 00117 boost::unique_lock<boost::shared_mutex> write_lock(mutex); 00118 mMesh = v; 00119 } 00120 } LocationInfo; 00121 std::tr1::shared_ptr<LocationInfo> getCachedLocInfo(const UUID& uuid) ; 00122 00123 class LocationServiceCache { 00124 public: 00125 LocationServiceCache() { } 00126 00127 std::tr1::shared_ptr<LocationInfo> getLocationInfo(const UUID& uuid) { 00128 if (mLocMap.find(uuid) == mLocMap.end()){ 00129 return std::tr1::shared_ptr<LocationInfo>(); 00130 } 00131 00132 return mLocMap[uuid]; 00133 } 00134 00135 void insertLocationInfo(const UUID& uuid, std::tr1::shared_ptr<LocationInfo> locinfo) { 00136 mLocMap[uuid] = locinfo; 00137 } 00138 00139 void removeLocationInfo(const UUID& uuid) { 00140 mLocMap.erase(uuid); 00141 } 00142 00143 private: 00144 std::tr1::unordered_map<UUID, std::tr1::shared_ptr<LocationInfo> , UUID::Hasher> mLocMap; 00145 }; 00146 00147 LocationServiceCache mLocationServiceCache; 00148 boost::mutex mLocCacheMutex; 00149 LocationService* mLoc; 00150 00151 //Part of the LocationServiceListener interface. 
00152 virtual void localObjectAdded(const UUID& uuid, bool agg, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, 00153 const AggregateBoundingInfo& bounds, const String& mesh, const String& physics, 00154 const String& zernike); 00155 virtual void localObjectRemoved(const UUID& uuid, bool agg) ; 00156 virtual void localLocationUpdated(const UUID& uuid, bool agg, const TimedMotionVector3f& newval); 00157 virtual void localOrientationUpdated(const UUID& uuid, bool agg, const TimedMotionQuaternion& newval); 00158 virtual void localBoundsUpdated(const UUID& uuid, bool agg, const AggregateBoundingInfo& newval) ; 00159 virtual void localMeshUpdated(const UUID& uuid, bool agg, const String& newval) ; 00160 virtual void replicaObjectAdded(const UUID& uuid, const TimedMotionVector3f& loc, const TimedMotionQuaternion& orient, 00161 const AggregateBoundingInfo& bounds, const String& mesh, const String& physics, const String& zernike); 00162 virtual void replicaObjectRemoved(const UUID& uuid); 00163 virtual void replicaLocationUpdated(const UUID& uuid, const TimedMotionVector3f& newval); 00164 virtual void replicaOrientationUpdated(const UUID& uuid, const TimedMotionQuaternion& newval); 00165 virtual void replicaBoundsUpdated(const UUID& uuid, const AggregateBoundingInfo& newval); 00166 virtual void replicaMeshUpdated(const UUID& uuid, const String& newval); 00167 00168 00169 boost::mutex mModelsSystemMutex; 00170 ModelsSystem* mModelsSystem; 00171 Sirikata::Mesh::MeshSimplifier mMeshSimplifier; 00172 Sirikata::Mesh::Filter* mCenteringFilter; 00173 00174 struct AggregateObject; 00175 typedef std::tr1::shared_ptr<AggregateObject> AggregateObjectPtr; 00176 struct AggregateObject{ 00177 UUID mUUID; 00178 std::set<UUID> mParentUUIDs; 00179 private: 00180 boost::mutex mChildrenMutex; 00181 std::vector< std::tr1::shared_ptr<struct AggregateObject> > mChildren; 00182 public: 00183 std::vector<AggregateObjectPtr> getChildrenCopy(){ 00184 boost::mutex::scoped_lock 
lock(mChildrenMutex); 00185 return mChildren; 00186 } 00187 std::size_t childrenSize() { 00188 boost::mutex::scoped_lock lock(mChildrenMutex); 00189 return mChildren.size(); 00190 } 00191 void addChild(AggregateObjectPtr child) { 00192 boost::mutex::scoped_lock lock(mChildrenMutex); 00193 mChildren.push_back(child); 00194 } 00195 bool hasChild(const UUID& child) { 00196 boost::mutex::scoped_lock lock(mChildrenMutex); 00197 for (uint32 i=0; i < mChildren.size(); i++) 00198 if (mChildren[i]->mUUID == child) return true; 00199 return false; 00200 } 00201 void removeChild(const UUID& child) { 00202 boost::mutex::scoped_lock lock(mChildrenMutex); 00203 for (uint32 i=0; i < mChildren.size(); i++) { 00204 if (mChildren[i]->mUUID == child) { 00205 mChildren.erase(mChildren.begin() + i); 00206 return; 00207 } 00208 } 00209 } 00210 // Whether this is actually a leaf object (i.e. added implicitly as 00211 // AggregateObject when added as a child of a true aggregate). 00212 bool leaf; 00213 Time mLastGenerateTime; 00214 bool generatedLastRound; 00215 Mesh::MeshdataPtr mMeshdata; 00216 00217 AggregateObject(const UUID& uuid, const UUID& parentUUID, bool is_leaf) : 00218 mUUID(uuid), 00219 leaf(is_leaf), 00220 mLastGenerateTime(Time::null()), 00221 mTreeLevel(0), mNumObservers(0), 00222 mNumFailedGenerationAttempts(0), 00223 cdnBaseName(), 00224 refreshTTL(Time::null()) 00225 { 00226 mParentUUIDs.insert(parentUUID); 00227 mMeshdata = Mesh::MeshdataPtr(); 00228 generatedLastRound = false; 00229 mDistance = 0.01; 00230 } 00231 00232 uint16 mTreeLevel; 00233 uint32 mNumObservers; 00234 uint32 mNumFailedGenerationAttempts; 00235 double mDistance; //MINIMUM distance at which this object could be part of a cut 00236 std::vector<UUID> mLeaves; 00237 00238 // The basename returned by the CDN. This points at the entire asset 00239 // rather than the particular mesh filename. Should include a version 00240 // number. Used for refreshing TTLs. 
00241 String cdnBaseName; 00242 // Time at which we should try to refresh the TTL, should be set 00243 // a bit less than the actual timeout. 00244 Time refreshTTL; 00245 00246 }; 00247 00248 00249 //Lists of all aggregate objects and dirty aggregate objects. 00250 boost::mutex mAggregateObjectsMutex; 00251 typedef std::tr1::unordered_map<UUID, AggregateObjectPtr, UUID::Hasher > AggregateObjectsMap; 00252 AggregateObjectsMap mAggregateObjects; 00253 Time mAggregateGenerationStartTime; 00254 std::tr1::unordered_map<UUID, AggregateObjectPtr, UUID::Hasher> mDirtyAggregateObjects; 00255 00256 boost::mutex mObjectsByPriorityLocks[MAX_NUM_GENERATION_THREADS]; 00257 std::map<float, std::deque<AggregateObjectPtr > > mObjectsByPriority[MAX_NUM_GENERATION_THREADS]; 00258 00259 //Variables related to downloading and in-memory caching meshes 00260 boost::mutex mMeshStoreMutex; 00261 std::tr1::unordered_map<String, Mesh::MeshdataPtr> mMeshStore; 00262 std::tr1::shared_ptr<Transfer::TransferPool> mTransferPool; 00263 Transfer::TransferMediator *mTransferMediator; 00264 void addToInMemoryCache(const String& meshName, const Mesh::MeshdataPtr mdptr); 00265 00266 //CDN upload-related variables 00267 Transfer::OAuthParamsPtr mOAuth; 00268 const String mCDNUsername; 00269 Duration mModelTTL; 00270 Poller* mCDNKeepAlivePoller; 00271 const String mLocalPath; 00272 const String mLocalURLPrefix; 00273 bool mSkipGenerate; 00274 bool mSkipUpload; 00275 00276 //CDN upload threads' variables 00277 enum{MAX_NUM_UPLOAD_THREADS = 16}; 00278 uint16 mNumUploadThreads; 00279 Thread* mUploadThreads[MAX_NUM_UPLOAD_THREADS]; 00280 Network::IOService* mUploadServices[MAX_NUM_UPLOAD_THREADS]; 00281 Network::IOStrand* mUploadStrands[MAX_NUM_UPLOAD_THREADS]; 00282 Network::IOWork* mUploadWorks[MAX_NUM_UPLOAD_THREADS]; 00283 void uploadThreadMain(uint8 i); 00284 00285 // Stats. 00286 // Raw number of aggregate updates that could cause regeneration, 00287 // e.g. 
add/remove children, child moved, child mesh changed, etc. 00288 AtomicValue<uint32> mRawAggregateUpdates; 00289 // Number of aggregate generation requests we've queued up, even if they 00290 // haven't finished. 00291 AtomicValue<uint32> mAggregatesQueued; 00292 // Number of aggregates that were actually generated, i.e. the mesh was saved 00293 // and uploading scheduled. 00294 AtomicValue<uint32> mAggregatesGenerated; 00295 // Number of aggregates which, even after retries, failed to generate 00296 AtomicValue<uint32> mAggregatesFailedToGenerate; 00297 // Number of aggregates successfully uploaded to the CDN 00298 AtomicValue<uint32> mAggregatesUploaded; 00299 // Number of aggregate uploads which failed 00300 AtomicValue<uint32> mAggregatesFailedToUpload; 00301 // Some stats need locks 00302 boost::mutex mStatsMutex; 00303 // Cumulative time spent generating aggregates (across all threads, 00304 // so it could be, e.g., 4x wall clock time). Doesn't include failed 00305 // calls to generateAggregateMeshAsync 00306 Duration mAggregateCumulativeGenerationTime; 00307 // And uploading them. Doesn't include failed uploads. This includes 00308 // the time spent serializing the mesh. 00309 Duration mAggregateCumulativeUploadTime; 00310 // And their size after being serialized. 00311 uint64 mAggregateCumulativeDataSize; 00312 00313 //Various utility functions 00314 bool findChild(std::vector<AggregateObjectPtr>& v, const UUID& uuid) ; 00315 void removeChild(std::vector<AggregateObjectPtr>& v, const UUID& uuid) ; 00316 void iRemoveChild(const UUID& uuid, const UUID& child_uuid); 00317 void getLeaves(const std::vector<UUID>& mIndividualObjects); 00318 void addLeavesUpTree(UUID leaf_uuid, UUID uuid); 00319 bool isAggregate(const UUID& uuid); 00320 00321 00322 00323 //Function related to generating and updating aggregates. 
00324 void updateChildrenTreeLevel(const UUID& uuid, uint16 treeLevel); 00325 void addDirtyAggregates(UUID uuid); 00326 void queueDirtyAggregates(Time postTime); 00327 void generateMeshesFromQueue(uint8 i); 00328 00329 00330 enum { 00331 GEN_SUCCESS=1, 00332 CHILDREN_NOT_YET_GEN=2, 00333 NEWER_REQUEST=3, // newer timestamped request for aggregation 00334 MISSING_LOC_INFO=4, 00335 MISSING_CHILD_LOC_INFO=5, 00336 MISSING_CHILD_MESHES=6 00337 }; 00338 uint32 generateAggregateMeshAsync(const UUID uuid, Time postTime, bool generateSiblings = true); 00339 void aggregationThreadMain(uint8 i); 00340 void updateAggregateLocMesh(UUID uuid, String mesh); 00341 00342 00343 //Functions related to uploading aggregates 00344 void uploadAggregateMesh(Mesh::MeshdataPtr agg_mesh, AggregateObjectPtr aggObject, 00345 std::tr1::unordered_map<String, String> textureSet, uint32 retryAttempt, Time uploadStartTime); 00346 // Helper that handles the upload callback and sets flags to let the request 00347 // from the aggregation thread to continue 00348 void handleUploadFinished(Transfer::UploadRequestPtr request, const Transfer::URI& path, Mesh::MeshdataPtr agg_mesh, 00349 AggregateObjectPtr aggObject, std::tr1::unordered_map<String, String> textureSet, 00350 uint32 retryAttempt, const Time& uploadStartTime); 00351 // Look for any aggregates that need a keep-alive sent to the CDN 00352 // and try to send them. 00353 void sendKeepAlives(); 00354 void handleKeepAliveResponse(const UUID& objid, 00355 std::tr1::shared_ptr<Transfer::HttpManager::HttpResponse> response, 00356 Transfer::HttpManager::ERR_TYPE error, const boost::system::error_code& boost_error); 00357 00358 00359 00360 // Helper for cleaning out parent state from child, or deleting it if it is an 00361 // abandoned leaf object (non-aggregate). Returns true if the object was 00362 // removed. 
00363 bool cleanUpChild(const UUID& parent_uuid, const UUID& child_id); 00364 void removeStaleLeaves(); 00365 00366 // Command handlers 00367 void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00368 public: 00369 00394 AggregateManager( LocationService* loc, Transfer::OAuthParamsPtr oauth, const String& username, const String& local_path, const String& local_url_prefix, uint16 n_gen_threads, uint16 n_upload_threads, bool skip_gen, bool skip_upload); 00395 00396 ~AggregateManager(); 00397 00398 void addAggregate(const UUID& uuid); 00399 00400 void removeAggregate(const UUID& uuid); 00401 00402 void addChild(const UUID& uuid, const UUID& child_uuid) ; 00403 00404 void removeChild(const UUID& uuid, const UUID& child_uuid); 00405 00406 void aggregateObserved(const UUID& objid, uint32 nobservers, uint32 nchildren); 00407 00408 // This version requires locking to get at the AggregateObjectPtr 00409 // for the object. This isn't safe if you already hold that lock. 00410 void generateAggregateMesh(const UUID& uuid, const Duration& delayFor = Duration::milliseconds(1.0f) ); 00411 // This version doesn't require a lock. 00412 void generateAggregateMesh(const UUID& uuid, AggregateObjectPtr aggObject, const Duration& delayFor = Duration::milliseconds(1.0f) ); 00413 00414 void metadataFinished(Time t, const UUID uuid, const UUID child_uuid, std::string meshName,uint8 attemptNo, 00415 std::tr1::shared_ptr<Transfer::MetadataRequest> request, 00416 std::tr1::shared_ptr<Transfer::RemoteFileMetadata> response) ; 00417 00418 void chunkFinished(Time t, const UUID uuid, const UUID child_uuid, std::string meshName, std::tr1::shared_ptr<Transfer::ChunkRequest> request, 00419 std::tr1::shared_ptr<const Transfer::DenseData> response); 00420 00421 00422 }; 00423 00424 } 00425 00426 #endif