Sirikata
libcore/include/sirikata/core/transfer/TransferMediator.hpp
Go to the documentation of this file.
00001 /*  Sirikata Transfer -- Content Transfer mediation
00002  *  TransferMediator.hpp
00003  *
00004  *  Copyright (c) 2010, Jeff Terrace
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 /*  Created on: Jan 15, 2010 */
00033 
00034 #ifndef SIRIKATA_TransferMediator_HPP__
00035 #define SIRIKATA_TransferMediator_HPP__
00036 
00037 #include <sirikata/core/transfer/TransferPool.hpp>
00038 #include <boost/multi_index_container.hpp>
00039 #include <boost/multi_index/ordered_index.hpp>
00040 #include <boost/multi_index/identity.hpp>
00041 #include <boost/multi_index/member.hpp>
00042 #include <boost/multi_index/mem_fun.hpp>
00043 #include <boost/multi_index/hashed_index.hpp>
00044 #include <boost/lambda/lambda.hpp>
00045 #include <sirikata/core/network/IOService.hpp>
00046 #include <sirikata/core/network/Asio.hpp>
00047 #include <sirikata/core/util/Thread.hpp>
00048 #include <sirikata/core/util/Singleton.hpp>
00049 
00050 #include <sirikata/core/command/Commander.hpp>
00051 
00052 namespace Sirikata {
00053 namespace Transfer {
00054 
00055 using namespace boost::multi_index;
00056 
00057 /*
00058  * Mediates requests for name lookups and chunk downloads
00059  */
00060 class SIRIKATA_EXPORT TransferMediator
00061     : public AutoSingleton<TransferMediator> {
00062 
00063     /*
00064      * Used to aggregate requests from different clients. If multiple clients request
00065      * the same file, this object keeps track of the original separate requests so that
00066      * each client's callback can be called when the request finishes. It also uses the
00067      * PriorityAggregation interface to aggregate multiple client priorities into a single
00068      * aggregated priority.
00069      */
00070     class AggregateRequest {
00071     public:
00072         //Stores the aggregated priority
00073         Priority mPriority;
00074             // Whether we've started processing this request.
00075             bool mExecuting;
00076     private:
00077         //Maps each client's string ID to the original TransferRequest object
00078         std::map<std::string, std::tr1::shared_ptr<TransferRequest> > mTransferReqs;
00079 
00080         //Aggregated request unique identifier
00081         const std::string mIdentifier;
00082 
00083         //Updates the aggregated priority from each client's priority when needed
00084         void updateAggregatePriority();
00085 
00086     public:
00087         //Returns a map from each client ID to their original TransferRequest object
00088         const std::map<std::string, std::tr1::shared_ptr<TransferRequest> > & getTransferRequests() const;
00089 
00090         //Since there is overlap between requests here, this returns a single TransferRequest from the list of clients
00091         std::tr1::shared_ptr<TransferRequest> getSingleRequest();
00092 
00093         //Adds an additional client's request
00094         void setClientPriority(std::tr1::shared_ptr<TransferRequest> req);
00095 
00096         //Removes a client request from this aggregate request
00097         void removeClient(std::string clientID);
00098 
00099         //Returns a unique identifier for this aggregated request
00100         const std::string& getIdentifier() const;
00101 
00102         //Returns the aggregated priority value
00103         Priority getPriority() const;
00104 
00105         //Pass in the first client's request
00106         AggregateRequest(std::tr1::shared_ptr<TransferRequest> req);
00107     };
00108 
00109     //lock this to access mAggregatedList
00110     boost::mutex mAggMutex;
00111 
00112     //tags used to index AggregateList (see boost::multi_index)
00113     struct tagID{};
00114     struct tagPriority{};
00115 
00116     /*
00117      * This multi_index_container allows the efficient retrieval of an AggregateRequest
00118      * either by its identifier or sorted by its priority
00119      */
00120     typedef multi_index_container<
00121         std::tr1::shared_ptr<AggregateRequest>,
00122         indexed_by<
00123             hashed_unique<tag<tagID>, const_mem_fun<AggregateRequest,const std::string &,&AggregateRequest::getIdentifier> >,
00124             ordered_non_unique<tag<tagPriority>,
00125             member<AggregateRequest,Priority,&AggregateRequest::mPriority>,
00126             std::greater<Priority> >
00127         >
00128     > AggregateList;
00129     AggregateList mAggregateList;
00130 
00131     //access iterators for AggregateList for convenience (see boost::multi_index)
00132     typedef AggregateList::index<tagID>::type AggregateListByID;
00133     typedef AggregateList::index<tagPriority>::type AggregateListByPriority;
00134 
00135     /*
00136      * Used to process the queue of requests coming from a single client.
00137      * There is one PoolWorker for each client, and this makes it so that the client's
00138      * thread doesn't have to block when inserting a request. Instead, the PoolWorkers
00139      * block against each other when inserting the request into the aggregated list.
00140      */
00141     class PoolWorker {
00142     private:
00143         //The TransferPool associated with this worker
00144         std::tr1::shared_ptr<TransferPool> mTransferPool;
00145 
00146         //The worker's thread
00147         Thread * mWorkerThread;
00148 
00149         //Set to true when worker should shut down
00150         bool mCleanup;
00151 
00152         //Runs the worker thread
00153         void run();
00154 
00155     public:
00156         //Initialize with the associated TransferPool
00157         PoolWorker(std::tr1::shared_ptr<TransferPool> transferPool);
00158 
00159         //Returns the TransferPool this was initialized with
00160         std::tr1::shared_ptr<TransferPool> getTransferPool() const;
00161 
00162         //Returns the worker's thread
00163         Thread * getThread() const;
00164 
00165         //Signal to shut down
00166         void cleanup();
00167     };
00168 
00169     friend class PoolWorker;
00170     //Helper for compatibility with compilers where TransferPool declaring
00171     //TransferMediator as a friend does not give PoolWorker access
00172     static inline std::tr1::shared_ptr<TransferRequest> getRequest(std::tr1::shared_ptr<TransferPool> pool) {
00173         return pool->getRequest();
00174     }
00175 
00176     //Maps a client ID string to the PoolWorker class
00177     typedef std::map<std::string, std::tr1::shared_ptr<PoolWorker> > PoolType;
00178     //Stores the list of pools
00179     PoolType mPools;
00180     //lock this to access mPools
00181     boost::shared_mutex mPoolMutex;
00182 
00183     //Set to true to signal shutdown
00184     bool mCleanup;
00185     //Number of outstanding requests
00186     uint32 mNumOutstanding;
00187 
00188     //TransferMediator's worker thread
00189     Thread* mThread;
00190 
00191     // Algorithm used to aggregate priorities of requests
00192     PriorityAggregationAlgorithm* mAggregationAlgorithm;
00193 
00194     //Main thread that handles the input pools
00195     void mediatorThread();
00196 
00197     //Callback for when an executed request finishes
00198     void execute_finished(std::tr1::shared_ptr<TransferRequest> req, std::string id);
00199 
00200     //Check our internal queue to see what request to process next
00201     void checkQueue();
00202 
00203     void registerPool(TransferPoolPtr pool);
00204 
00205 
00206     void commandListRequests(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00207 public:
00208     static TransferMediator& getSingleton();
00209     static void destroy();
00210 
00211     TransferMediator();
00212     ~TransferMediator();
00213 
00214 
00219     void registerContext(Context* ctx);
00220 
00226     template<typename TPoolType>
00227     std::tr1::shared_ptr<TPoolType> registerClient(const std::string& clientID) {
00228         std::tr1::shared_ptr<TPoolType> ret(new TPoolType(clientID));
00229         registerPool(ret);
00230         return ret;
00231     }
00232 
00233     //Call when system should be shut down
00234     void cleanup();
00235 };
00236 
00237 }
00238 }
00239 
00240 #endif /* SIRIKATA_TransferMediator_HPP__ */