Sirikata
libcore/include/sirikata/core/transfer/AggregatedTransferPool.hpp
// Copyright (c) 2011 Sirikata Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can
// be found in the LICENSE file.

#ifndef _SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_
#define _SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_

#include <sirikata/core/transfer/TransferPool.hpp>
#include <sirikata/core/transfer/MaxPriorityAggregation.hpp>

namespace Sirikata {
namespace Transfer {

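/** AggregatedTransferPool collapses multiple client requests for the same
 *  resource into a single aggregate request forwarded to the TransferMediator.
 *  The aggregate's priority is recomputed from all input requests by a
 *  PriorityAggregationAlgorithm, and responses are fanned back out to every
 *  input request's callback.
 */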
class AggregatedTransferPool : public TransferPool {
public:
    virtual ~AggregatedTransferPool() {
        // Mark any outstanding aggregate requests for deletion so the
        // TransferMediator can clean them up.
        boost::unique_lock<boost::mutex> lock(mMutex);
        for(RequestDataMap::iterator it = mRequestData.begin(); it != mRequestData.end(); it++) {
            setRequestDeletion(it->second.aggregateRequest);
            mDeltaQueue.push(it->second.aggregateRequest);
        }
    }

    //Puts a request into the pool
    virtual void addRequest(TransferRequestPtr req) {
        // Null requests are passed straight through to the mediator.
        if (!req) {
            mDeltaQueue.push(req);
            return;
        }

        boost::unique_lock<boost::mutex> lock(mMutex);

        RequestDataMap::iterator it = mRequestData.find(req->getIdentifier());

        if (it == mRequestData.end()) {
            // Original addition. Create the real request and add it.
            RequestData data;
            data.inputRequests.push_back(req);

            // To create the real request and forward callbacks properly, we
            // need to downcast to figure out the concrete request type.
            MetadataRequestPtr metadata_req = std::tr1::dynamic_pointer_cast<MetadataRequest>(req);
            if (metadata_req) {
                data.aggregateRequest = TransferRequestPtr(
                    new MetadataRequest(
                        metadata_req->getURI(), metadata_req->getPriority(),
                        std::tr1::bind(&AggregatedTransferPool::handleMetadata, this, req->getIdentifier(), _1, _2)
                    )
                );
            }
            ChunkRequestPtr chunk_req = std::tr1::dynamic_pointer_cast<ChunkRequest>(req);
            if (chunk_req) {
                data.aggregateRequest = TransferRequestPtr(
                    new ChunkRequest(
                        chunk_req->getURI(), chunk_req->getMetadata(), chunk_req->getChunk(), chunk_req->getPriority(),
                        std::tr1::bind(&AggregatedTransferPool::handleChunk, this, req->getIdentifier(), _1, _2)
                    )
                );
            }
            DirectChunkRequestPtr direct_chunk_req = std::tr1::dynamic_pointer_cast<DirectChunkRequest>(req);
            if (direct_chunk_req) {
                data.aggregateRequest = TransferRequestPtr(
                    new DirectChunkRequest(
                        direct_chunk_req->getChunk(), direct_chunk_req->getPriority(),
                        std::tr1::bind(&AggregatedTransferPool::handleDirectChunk, this, req->getIdentifier(), _1, _2)
                    )
                );
            }
            UploadRequestPtr upload_req = std::tr1::dynamic_pointer_cast<UploadRequest>(req);
            if (upload_req) {
                data.aggregateRequest = TransferRequestPtr(
                    new UploadRequest(
                        upload_req->oauth(), upload_req->files(), upload_req->path(), upload_req->params(), upload_req->getPriority(),
                        std::tr1::bind(&AggregatedTransferPool::handleUpload, this, req->getIdentifier(), _1, _2)
                    )
                );
            }

            if (!data.aggregateRequest) {
                SILOG(transfer, error, "Failed to add request to AggregatedTransferPool: unhandled request type.");
                return;
            }

            mRequestData[req->getIdentifier()] = data;
            it = mRequestData.find(req->getIdentifier());
        }
        else {
            // Add to existing aggregate.
            it->second.inputRequests.push_back(req);
        }

        // Whether this is new or not, we can just set its basic properties,
        // and we'll definitely need to recompute the aggregate priority.
        setRequestClientID(it->second.aggregateRequest);
        setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests));

        mDeltaQueue.push(it->second.aggregateRequest);
    }
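
    // Illustrative example (not part of the original header): if two clients
    // call addRequest() for the same resource with priorities 0.3 and 0.8,
    // only one aggregate request is queued for the TransferMediator, and with
    // the default MaxPriorityAggregation -- which, as its name suggests,
    // takes the maximum of the input priorities -- the aggregate's priority
    // becomes max(0.3, 0.8) = 0.8. Deleting one of the two inputs leaves the
    // aggregate alive at the remaining request's priority.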

    //Updates priority of a request in the pool
    virtual void updatePriority(TransferRequestPtr req, Priority p) {
        boost::unique_lock<boost::mutex> lock(mMutex);

        RequestDataMap::iterator it = mRequestData.find(req->getIdentifier());
        // We would like to
        //assert(it != mRequestData.end());
        // but threading and the fact that callbacks are posted across
        // strands mean we might clean out a request and very soon after
        // receive an updatePriority call for it, before the client has
        // processed the callback that invalidates its request. Just ignore
        // the update in that case.
        if (it == mRequestData.end()) return;

        // Update priority of individual request
        setRequestPriority(req, p);
        // Update aggregate priority
        setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests));
        mDeltaQueue.push(it->second.aggregateRequest);
    }

    //Deletes a request from the pool
    inline void deleteRequest(TransferRequestPtr req) {
        boost::unique_lock<boost::mutex> lock(mMutex);

        RequestDataMap::iterator it = mRequestData.find(req->getIdentifier());
        // We would like to
        //assert(it != mRequestData.end());
        // but can't assume it, see the note in updatePriority.
        if (it == mRequestData.end()) return;

        // Remove from the list of input requests
        for(TransferRequestList::iterator in_it = it->second.inputRequests.begin();
            in_it != it->second.inputRequests.end();
            in_it++) {
            // Compare the shared_ptrs directly; the "unique" ids on
            // TransferRequests are not actually unique.
            if (*in_it == req) {
                it->second.inputRequests.erase(in_it);
                break;
            }
        }

        // If we've run out of input requests, forward the deletion on via the
        // aggregate and clean up.
        if (it->second.inputRequests.empty()) {
            setRequestDeletion(it->second.aggregateRequest);
            mDeltaQueue.push(it->second.aggregateRequest);
            mRequestData.erase(it);
        }
        else {
            // Otherwise, just update the aggregate priority.
            setRequestPriority(it->second.aggregateRequest, mAggregationAlgorithm->aggregate(it->second.inputRequests));
            mDeltaQueue.push(it->second.aggregateRequest);
        }
    }

private:
    // Friend TransferMediator so it can construct this pool and call getRequest
    friend class TransferMediator;

    AggregatedTransferPool(const std::string &clientID)
     : TransferPool(clientID)
    {
        // FIXME should be customizable
        mAggregationAlgorithm = new MaxPriorityAggregation();
    }

    //Returns an item from the pool. Blocks if pool is empty.
    inline std::tr1::shared_ptr<TransferRequest> getRequest() {
        std::tr1::shared_ptr<TransferRequest> retval;
        mDeltaQueue.blockingPop(retval);
        return retval;
    }


    // Handle metadata callback, forwarding the response to each input request's callback
    void handleMetadata(const String input_identifier, MetadataRequestPtr req, RemoteFileMetadataPtr response) {
        // Since callbacks may manipulate this TransferPool, grab the data
        // needed for callbacks, unlock, then invoke them
        TransferRequestList inputRequests;
        {
            boost::unique_lock<boost::mutex> lock(mMutex);

            // Need to search by input identifier because the unique id of req is
            // different than the one we index by.
            RequestDataMap::iterator it = mRequestData.find(input_identifier);
            // We'd like to do
            //assert(it != mRequestData.end());
            // but because of threading that could be incorrect -- the
            // TransferMediator could finish and post the callback,
            // then we might have another request for the same data
            // come in, causing us to re-request it. We think it's a
            // priority update, but the TransferMediator now adds it as an
            // additional request. Instead, we can just ignore the
            // callback if we don't have data for it -- we've already
            // invoked the callback that caused the problem.
            if (it == mRequestData.end()) return;

            inputRequests.swap(it->second.inputRequests);
            mRequestData.erase(it);
        }

        // Just forward the callback and clear out the data. Because the input
        // request isn't guaranteed to hold the response, we pass the aggregate
        // request and the response through explicitly.
        for(TransferRequestList::iterator in_it = inputRequests.begin();
            in_it != inputRequests.end();
            in_it++) {
            MetadataRequestPtr metadata_req = std::tr1::dynamic_pointer_cast<MetadataRequest>(*in_it);
            metadata_req->notifyCaller(metadata_req, req, response);
        }
    }

    // Handle chunk callback, forwarding the response to each input request's callback
    void handleChunk(const String input_identifier, ChunkRequestPtr req, DenseDataPtr response) {
        // Since callbacks may manipulate this TransferPool, grab the data
        // needed for callbacks, unlock, then invoke them
        TransferRequestList inputRequests;
        {
            boost::unique_lock<boost::mutex> lock(mMutex);

            // Need to search by input identifier because the unique id of req is
            // different than the one we index by.
            RequestDataMap::iterator it = mRequestData.find(input_identifier);
            // We'd like to
            //assert(it != mRequestData.end());
            // but, as in handleMetadata, threading makes that incorrect.
            // Just ignore the callback if we don't have data for it.
            if (it == mRequestData.end()) return;

            inputRequests.swap(it->second.inputRequests);
            mRequestData.erase(it);
        }

        // Just forward the callback and clear out the data. Because the input
        // request isn't guaranteed to hold the response, we pass the aggregate
        // request and the response through explicitly.
        for(TransferRequestList::iterator in_it = inputRequests.begin();
            in_it != inputRequests.end();
            in_it++) {
            ChunkRequestPtr chunk_req = std::tr1::dynamic_pointer_cast<ChunkRequest>(*in_it);
            chunk_req->notifyCaller(chunk_req, req, response);
        }
    }

    void handleDirectChunk(const String input_identifier, DirectChunkRequestPtr req, DenseDataPtr response) {
        TransferRequestList inputRequests;
        {
            boost::unique_lock<boost::mutex> lock(mMutex);
            RequestDataMap::iterator it = mRequestData.find(input_identifier);
            if (it == mRequestData.end()) return;
            inputRequests.swap(it->second.inputRequests);
            mRequestData.erase(it);
        }

        for(TransferRequestList::iterator in_it = inputRequests.begin();
            in_it != inputRequests.end();
            in_it++) {
            DirectChunkRequestPtr direct_chunk_req = std::tr1::dynamic_pointer_cast<DirectChunkRequest>(*in_it);
            direct_chunk_req->notifyCaller(direct_chunk_req, req, response);
        }
    }

    void handleUpload(const String input_identifier, UploadRequestPtr req, const Transfer::URI& response) {
        TransferRequestList inputRequests;
        {
            boost::unique_lock<boost::mutex> lock(mMutex);
            RequestDataMap::iterator it = mRequestData.find(input_identifier);
            if (it == mRequestData.end()) return;
            inputRequests.swap(it->second.inputRequests);
            mRequestData.erase(it);
        }

        for(TransferRequestList::iterator in_it = inputRequests.begin();
            in_it != inputRequests.end();
            in_it++) {
            UploadRequestPtr upload_req = std::tr1::dynamic_pointer_cast<UploadRequest>(*in_it);
            // notifyCaller copies info into upload_req, so response doesn't
            // need to be passed through
            upload_req->notifyCaller(upload_req, req);
        }
    }

    // Aggregation implementation. FIXME This should be customizable
    PriorityAggregationAlgorithm* mAggregationAlgorithm;

    // For each unique request ID, we need to maintain the original input
    // requests and the aggregate request forwarded to the TransferMediator.
    boost::mutex mMutex; // Protection for mRequestData
    typedef std::vector<TransferRequestPtr> TransferRequestList;
    struct RequestData {
        TransferRequestList inputRequests;
        TransferRequestPtr aggregateRequest;
    };
    // UniqueID -> RequestData
    typedef std::tr1::unordered_map<String, RequestData> RequestDataMap;
    RequestDataMap mRequestData;

    // Requests to the TransferMediator
    ThreadSafeQueue<TransferRequestPtr> mDeltaQueue;
};
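
// Illustrative usage sketch (not from the original header). Construction is
// private and friended to TransferMediator, so clients are expected to obtain
// a pool through the mediator; the registerClient call below is an assumed
// factory API shown only for orientation, and "my-client",
// "meerkat:///example.mesh", and metadataCallback are placeholders:
//
//   TransferMediator& mediator = TransferMediator::getSingleton();
//   std::tr1::shared_ptr<TransferPool> pool =
//       mediator.registerClient<AggregatedTransferPool>("my-client");
//   pool->addRequest(TransferRequestPtr(new MetadataRequest(
//       Transfer::URI("meerkat:///example.mesh"), 1.0, metadataCallback)));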

} // namespace Transfer
} // namespace Sirikata

#endif //_SIRIKATA_CORE_TRANSFER_AGGREGATED_TRANSFER_POOL_HPP_