Sirikata
|
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_ 00006 #define _SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_ 00007 00008 #include <sirikata/core/transfer/TransferPool.hpp> 00009 #include <sirikata/core/transfer/MaxPriorityAggregation.hpp> 00010 00011 namespace Sirikata { 00012 namespace Transfer { 00013 00021 class AggregatedTransferPool : public TransferPool { 00022 public: 00023 virtual ~AggregatedTransferPool() { 00024 boost::unique_lock<boost::mutex> lock(mMutex); 00025 for(RequestDataMap::iterator it = mRequestData.begin(); it != mRequestData.end(); it++) { 00026 setRequestDeletion(it->second.aggregateRequest); 00027 mDeltaQueue.push(it->second.aggregateRequest); 00028 } 00029 } 00030 00031 //Puts a request into the pool 00032 virtual void addRequest(TransferRequestPtr req) { 00033 if (!req) { 00034 mDeltaQueue.push(req); 00035 return; 00036 } 00037 00038 boost::unique_lock<boost::mutex> lock(mMutex); 00039 00040 RequestDataMap::iterator it = mRequestData.find(req->getIdentifier()); 00041 00042 if (it == mRequestData.end()) { 00043 // Original addition. Create real request, add it 00044 RequestData data; 00045 data.inputRequests.push_back(req); 00046 00047 // To create the real request and be able to forward callbacks 00048 //properly, we need to down cast to figure out the type. 00049 MetadataRequestPtr metadata_req = std::tr1::dynamic_pointer_cast<MetadataRequest>(req); 00050 if (metadata_req) { 00051 data.aggregateRequest = TransferRequestPtr( 00052 new MetadataRequest( 00053 metadata_req->getURI(), metadata_req->getPriority(), 00054 std::tr1::bind(&AggregatedTransferPool::handleMetadata, this, req->getIdentifier(), _1, _2) 00055 ) 00056 ); 00057 } 00058 ChunkRequestPtr chunk_req = std::tr1::dynamic_pointer_cast<ChunkRequest>(req); 00059 if (chunk_req) { 00060 data.aggregateRequest = TransferRequestPtr( 00061 new ChunkRequest( 00062 chunk_req->getURI(), chunk_req->getMetadata(), chunk_req->getChunk(), chunk_req->getPriority(), 00063 std::tr1::bind(&AggregatedTransferPool::handleChunk, this, req->getIdentifier(), _1, _2) 00064 ) 00065 ); 00066 } 00067 DirectChunkRequestPtr direct_chunk_req = std::tr1::dynamic_pointer_cast<DirectChunkRequest>(req); 00068 if (direct_chunk_req) { 00069 data.aggregateRequest = TransferRequestPtr( 00070 new DirectChunkRequest( 00071 direct_chunk_req->getChunk(), direct_chunk_req->getPriority(), 00072 std::tr1::bind(&AggregatedTransferPool::handleDirectChunk, this, req->getIdentifier(), _1, _2) 00073 ) 00074 ); 00075 } 00076 UploadRequestPtr upload_req = std::tr1::dynamic_pointer_cast<UploadRequest>(req); 00077 if (upload_req) { 00078 data.aggregateRequest = TransferRequestPtr( 00079 new UploadRequest( 00080 upload_req->oauth(), upload_req->files(), upload_req->path(), upload_req->params(), upload_req->getPriority(), 00081 std::tr1::bind(&AggregatedTransferPool::handleUpload, this, req->getIdentifier(), _1, _2) 00082 ) 00083 ); 00084 } 00085 00086 if (!data.aggregateRequest) { 00087 SILOG(transfer, error, "Failed to add request to AggregatedTransferPool: unhandled request type."); 00088 return; 00089 } 00090 00091 mRequestData[req->getIdentifier()] = data; 00092 it = mRequestData.find(req->getIdentifier()); 00093 } 00094 else { 00095 // Add to existing aggregate. 00096 it->second.inputRequests.push_back(req); 00097 } 00098 00099 // Whether this is new or not, we can just set its basic properties, and 00100 // we'll definitely need to recompute the 00101 setRequestClientID(it->second.aggregateRequest); 00102 setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests)); 00103 00104 mDeltaQueue.push(it->second.aggregateRequest); 00105 } 00106 00107 //Updates priority of a request in the pool 00108 virtual void updatePriority(TransferRequestPtr req, Priority p) { 00109 boost::unique_lock<boost::mutex> lock(mMutex); 00110 00111 RequestDataMap::iterator it = mRequestData.find(req->getIdentifier()); 00112 // We want 00113 //assert(it != mRequestData.end()); 00114 // but threading and the fact that callbacks are posted across 00115 // strands means we might end up cleaning out a request and 00116 // then very soon after get an updatePriority request, and 00117 // then the callback is made which would make the request by 00118 // the client invalid. 00119 if (it == mRequestData.end()) return; 00120 00121 // Update priority of individual request 00122 setRequestPriority(req, p); 00123 // Update aggregate priority 00124 setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests)); 00125 mDeltaQueue.push(it->second.aggregateRequest); 00126 } 00127 00128 //Updates priority of a request in the pool 00129 inline void deleteRequest(TransferRequestPtr req) { 00130 boost::unique_lock<boost::mutex> lock(mMutex); 00131 00132 RequestDataMap::iterator it = mRequestData.find(req->getIdentifier()); 00133 // We want 00134 //assert(it != mRequestData.end()); 00135 // but can't assume it, see note in updatePriority 00136 if (it == mRequestData.end()) return; 00137 00138 // Remove from the list of input requests 00139 for(TransferRequestList::iterator in_it = it->second.inputRequests.begin(); 00140 in_it != it->second.inputRequests.end(); 00141 in_it++) { 00142 // Strict equality on the shared ptrs, the "unique" ids on 00143 // TransferRequests are not actually unique 00144 if (*in_it == req) { 00145 it->second.inputRequests.erase(in_it); 00146 break; 00147 } 00148 } 00149 00150 // If we've run out of requests, forward the deletion on via the 00151 // aggregate and clean up. 00152 if (it->second.inputRequests.empty()) { 00153 setRequestDeletion(it->second.aggregateRequest); 00154 mDeltaQueue.push(it->second.aggregateRequest); 00155 mRequestData.erase(it); 00156 } 00157 else { 00158 // Otherwise, update priority 00159 setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests)); 00160 mDeltaQueue.push(it->second.aggregateRequest); 00161 } 00162 00163 } 00164 00165 private: 00166 // Friend in TransferMediator so it can construct, call getRequest 00167 friend class TransferMediator; 00168 00169 AggregatedTransferPool(const std::string &clientID) 00170 : TransferPool(clientID) 00171 { 00172 // FIXME should be customizable 00173 mAggregationAlgorithm = new MaxPriorityAggregation(); 00174 } 00175 00176 //Returns an item from the pool. Blocks if pool is empty. 00177 inline std::tr1::shared_ptr<TransferRequest> getRequest() { 00178 std::tr1::shared_ptr<TransferRequest> retval; 00179 mDeltaQueue.blockingPop(retval); 00180 return retval; 00181 } 00182 00183 00184 // Handle metadata callback, sending data to callbacks 00185 void handleMetadata(const String input_identifier, MetadataRequestPtr req, RemoteFileMetadataPtr response) { 00186 // Since callbacks may manipulate this TransferPool, grab the data 00187 // needed for callbacks, unlock, then invoke them 00188 TransferRequestList inputRequests; 00189 { 00190 boost::unique_lock<boost::mutex> lock(mMutex); 00191 00192 // Need to search by input identifier because the unique id of req is 00193 // different than the one we index by. 00194 RequestDataMap::iterator it = mRequestData.find(input_identifier); 00195 // We'd like to do 00196 //assert(it != mRequestData.end()); 00197 // but because of threading that could be incorrect -- the 00198 // TransferMediator could finish and post the callback, 00199 // then we might have another request for the same data 00200 // come in, causing us to re-request it. We think its a 00201 // priority update, but TransferMediator now adds it as an 00202 // additional request. Instead, we can just ignore the 00203 // callback if we don't have data for it -- we've already 00204 // invoked the callback that caused the problem. 00205 if (it == mRequestData.end()) return; 00206 00207 inputRequests.swap(it->second.inputRequests); 00208 mRequestData.erase(it); 00209 } 00210 00211 // Just forward the callback and clear out the data. Because the request 00212 // isn't guaranteed to have the response, we need to 00213 for(TransferRequestList::iterator in_it = inputRequests.begin(); 00214 in_it != inputRequests.end(); 00215 in_it++) { 00216 MetadataRequestPtr metadata_req = std::tr1::dynamic_pointer_cast<MetadataRequest>(*in_it); 00217 metadata_req->notifyCaller(metadata_req, req, response); 00218 } 00219 } 00220 void handleChunk(const String input_identifier, ChunkRequestPtr req, DenseDataPtr response) { 00221 // Since callbacks may manipulate this TransferPool, grab the data 00222 // needed for callbacks, unlock, then invoke them 00223 TransferRequestList inputRequests; 00224 { 00225 boost::unique_lock<boost::mutex> lock(mMutex); 00226 00227 // Need to search by input identifier because the unique id of req is 00228 // different than the one we index by. 00229 RequestDataMap::iterator it = mRequestData.find(input_identifier); 00230 // We'd like to do 00231 //assert(it != mRequestData.end()); 00232 // but because of threading that could be incorrect -- the 00233 // TransferMediator could finish and post the callback, 00234 // then we might have another request for the same data 00235 // come in, causing us to re-request it. We think its a 00236 // priority update, but TransferMediator now adds it as an 00237 // additional request. Instead, we can just ignore the 00238 // callback if we don't have data for it -- we've already 00239 // invoked the callback that caused the problem. 00240 if (it == mRequestData.end()) return; 00241 00242 inputRequests.swap(it->second.inputRequests); 00243 mRequestData.erase(it); 00244 } 00245 00246 // Just forward the callback and clear out the data. Because the request 00247 // isn't guaranteed to have the response, we need to 00248 for(TransferRequestList::iterator in_it = inputRequests.begin(); 00249 in_it != inputRequests.end(); 00250 in_it++) { 00251 ChunkRequestPtr chunk_req = std::tr1::dynamic_pointer_cast<ChunkRequest>(*in_it); 00252 chunk_req->notifyCaller(chunk_req, req, response); 00253 } 00254 } 00255 00256 void handleDirectChunk(const String input_identifier, DirectChunkRequestPtr req, DenseDataPtr response) { 00257 TransferRequestList inputRequests; 00258 { 00259 boost::unique_lock<boost::mutex> lock(mMutex); 00260 RequestDataMap::iterator it = mRequestData.find(input_identifier); 00261 if (it == mRequestData.end()) return; 00262 inputRequests.swap(it->second.inputRequests); 00263 mRequestData.erase(it); 00264 } 00265 00266 for(TransferRequestList::iterator in_it = inputRequests.begin(); 00267 in_it != inputRequests.end(); 00268 in_it++) { 00269 DirectChunkRequestPtr direct_chunk_req = std::tr1::dynamic_pointer_cast<DirectChunkRequest>(*in_it); 00270 direct_chunk_req->notifyCaller(direct_chunk_req, req, response); 00271 } 00272 } 00273 00274 void handleUpload(const String input_identifier, UploadRequestPtr req, const Transfer::URI& response) { 00275 TransferRequestList inputRequests; 00276 { 00277 boost::unique_lock<boost::mutex> lock(mMutex); 00278 RequestDataMap::iterator it = mRequestData.find(input_identifier); 00279 if (it == mRequestData.end()) return; 00280 inputRequests.swap(it->second.inputRequests); 00281 mRequestData.erase(it); 00282 } 00283 00284 for(TransferRequestList::iterator in_it = inputRequests.begin(); 00285 in_it != inputRequests.end(); 00286 in_it++) { 00287 UploadRequestPtr upload_req = std::tr1::dynamic_pointer_cast<UploadRequest>(*in_it); 00288 // notifyCaller copies info into upload_req, so response doesn't 00289 // need to be passed through 00290 upload_req->notifyCaller(upload_req, req); 00291 } 00292 } 00293 00294 // Aggregation implementation. FIXME This should be customizable 00295 PriorityAggregationAlgorithm* mAggregationAlgorithm; 00296 00297 // For each unique request ID, we need to maintain the original requests and 00298 // the request 00299 boost::mutex mMutex; // Protection for mRequestData 00300 typedef std::vector<TransferRequestPtr> TransferRequestList; 00301 struct RequestData { 00302 TransferRequestList inputRequests; 00303 TransferRequestPtr aggregateRequest; 00304 }; 00305 // UniqueID -> RequestData 00306 typedef std::tr1::unordered_map<String, RequestData> RequestDataMap; 00307 RequestDataMap mRequestData; 00308 00309 // Requests to the TransferMediator 00310 ThreadSafeQueue<TransferRequestPtr> mDeltaQueue; 00311 }; 00312 00313 } // namespace Transfer 00314 } // namespace Sirikata 00315 00316 #endif //_SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_