/* Sirikata Transfer -- Content Transfer mediation
 * TransferMediator.hpp
 *
 * Copyright (c) 2010, Jeff Terrace
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *  * Neither the name of Sirikata nor the names of its contributors may
 *    be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031 */ 00032 /* Created on: Jan 15, 2010 */ 00033 00034 #ifndef SIRIKATA_TransferMediator_HPP__ 00035 #define SIRIKATA_TransferMediator_HPP__ 00036 00037 #include <sirikata/core/transfer/TransferPool.hpp> 00038 #include <boost/multi_index_container.hpp> 00039 #include <boost/multi_index/ordered_index.hpp> 00040 #include <boost/multi_index/identity.hpp> 00041 #include <boost/multi_index/member.hpp> 00042 #include <boost/multi_index/mem_fun.hpp> 00043 #include <boost/multi_index/hashed_index.hpp> 00044 #include <boost/lambda/lambda.hpp> 00045 #include <sirikata/core/network/IOService.hpp> 00046 #include <sirikata/core/network/Asio.hpp> 00047 #include <sirikata/core/util/Thread.hpp> 00048 #include <sirikata/core/util/Singleton.hpp> 00049 00050 #include <sirikata/core/command/Commander.hpp> 00051 00052 namespace Sirikata { 00053 namespace Transfer { 00054 00055 using namespace boost::multi_index; 00056 00057 /* 00058 * Mediates requests for name lookups and chunk downloads 00059 */ 00060 class SIRIKATA_EXPORT TransferMediator 00061 : public AutoSingleton<TransferMediator> { 00062 00063 /* 00064 * Used to aggregate requests from different clients. If multiple clients request 00065 * the same file, this object keeps track of the original separate requests so that 00066 * each client's callback can be called when the request finishes. It also uses the 00067 * PriorityAggregation interface to aggregate multiple client priorities into a single 00068 * aggregated priority. 00069 */ 00070 class AggregateRequest { 00071 public: 00072 //Stores the aggregated priority 00073 Priority mPriority; 00074 // Whether we've started processing this request. 
00075 bool mExecuting; 00076 private: 00077 //Maps each client's string ID to the original TransferRequest object 00078 std::map<std::string, std::tr1::shared_ptr<TransferRequest> > mTransferReqs; 00079 00080 //Aggregated request unique identifier 00081 const std::string mIdentifier; 00082 00083 //Updates the aggregated priority from each client's priority when needed 00084 void updateAggregatePriority(); 00085 00086 public: 00087 //Returns a map from each client ID to their original TransferRequest object 00088 const std::map<std::string, std::tr1::shared_ptr<TransferRequest> > & getTransferRequests() const; 00089 00090 //Since there is overlap between requests here, this returns a single TransferRequest from the list of clients 00091 std::tr1::shared_ptr<TransferRequest> getSingleRequest(); 00092 00093 //Adds an additional client's request 00094 void setClientPriority(std::tr1::shared_ptr<TransferRequest> req); 00095 00096 //Removes a client request from this aggregate request 00097 void removeClient(std::string clientID); 00098 00099 //Returns a unique identifier for this aggregated request 00100 const std::string& getIdentifier() const; 00101 00102 //Returns the aggregated priority value 00103 Priority getPriority() const; 00104 00105 //Pass in the first client's request 00106 AggregateRequest(std::tr1::shared_ptr<TransferRequest> req); 00107 }; 00108 00109 //lock this to access mAggregatedList 00110 boost::mutex mAggMutex; 00111 00112 //tags used to index AggregateList (see boost::multi_index) 00113 struct tagID{}; 00114 struct tagPriority{}; 00115 00116 /* 00117 * This multi_index_container allows the efficient retrieval of an AggregateRequest 00118 * either by its identifier or sorted by its priority 00119 */ 00120 typedef multi_index_container< 00121 std::tr1::shared_ptr<AggregateRequest>, 00122 indexed_by< 00123 hashed_unique<tag<tagID>, const_mem_fun<AggregateRequest,const std::string &,&AggregateRequest::getIdentifier> >, 00124 
ordered_non_unique<tag<tagPriority>, 00125 member<AggregateRequest,Priority,&AggregateRequest::mPriority>, 00126 std::greater<Priority> > 00127 > 00128 > AggregateList; 00129 AggregateList mAggregateList; 00130 00131 //access iterators for AggregateList for convenience (see boost::multi_index) 00132 typedef AggregateList::index<tagID>::type AggregateListByID; 00133 typedef AggregateList::index<tagPriority>::type AggregateListByPriority; 00134 00135 /* 00136 * Used to process the queue of requests coming from a single client. 00137 * There is one PoolWorker for each client, and this makes it so that the client's 00138 * thread doesn't have to block when inserting a request. Instead, the PoolWorkers 00139 * block against each other when inserting the request into the aggregated list. 00140 */ 00141 class PoolWorker { 00142 private: 00143 //The TransferPool associated with this worker 00144 std::tr1::shared_ptr<TransferPool> mTransferPool; 00145 00146 //The worker's thread 00147 Thread * mWorkerThread; 00148 00149 //Set to true when worker should shut down 00150 bool mCleanup; 00151 00152 //Runs the worker thread 00153 void run(); 00154 00155 public: 00156 //Initialize with the associated TransferPool 00157 PoolWorker(std::tr1::shared_ptr<TransferPool> transferPool); 00158 00159 //Returns the TransferPool this was initialized with 00160 std::tr1::shared_ptr<TransferPool> getTransferPool() const; 00161 00162 //Returns the worker's thread 00163 Thread * getThread() const; 00164 00165 //Signal to shut down 00166 void cleanup(); 00167 }; 00168 00169 friend class PoolWorker; 00170 //Helper for compatibility with compilers where TransferPool declaring 00171 //TransferMediator as a friend does not give PoolWorker access 00172 static inline std::tr1::shared_ptr<TransferRequest> getRequest(std::tr1::shared_ptr<TransferPool> pool) { 00173 return pool->getRequest(); 00174 } 00175 00176 //Maps a client ID string to the PoolWorker class 00177 typedef std::map<std::string, 
std::tr1::shared_ptr<PoolWorker> > PoolType; 00178 //Stores the list of pools 00179 PoolType mPools; 00180 //lock this to access mPools 00181 boost::shared_mutex mPoolMutex; 00182 00183 //Set to true to signal shutdown 00184 bool mCleanup; 00185 //Number of outstanding requests 00186 uint32 mNumOutstanding; 00187 00188 //TransferMediator's worker thread 00189 Thread* mThread; 00190 00191 // Algorithm used to aggregate priorities of requests 00192 PriorityAggregationAlgorithm* mAggregationAlgorithm; 00193 00194 //Main thread that handles the input pools 00195 void mediatorThread(); 00196 00197 //Callback for when an executed request finishes 00198 void execute_finished(std::tr1::shared_ptr<TransferRequest> req, std::string id); 00199 00200 //Check our internal queue to see what request to process next 00201 void checkQueue(); 00202 00203 void registerPool(TransferPoolPtr pool); 00204 00205 00206 void commandListRequests(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00207 public: 00208 static TransferMediator& getSingleton(); 00209 static void destroy(); 00210 00211 TransferMediator(); 00212 ~TransferMediator(); 00213 00214 00219 void registerContext(Context* ctx); 00220 00226 template<typename TPoolType> 00227 std::tr1::shared_ptr<TPoolType> registerClient(const std::string& clientID) { 00228 std::tr1::shared_ptr<TPoolType> ret(new TPoolType(clientID)); 00229 registerPool(ret); 00230 return ret; 00231 } 00232 00233 //Call when system should be shut down 00234 void cleanup(); 00235 }; 00236 00237 } 00238 } 00239 00240 #endif /* SIRIKATA_TransferMediator_HPP__ */