Sirikata
libcore/include/sirikata/core/transfer/DiskCacheLayer.hpp
Go to the documentation of this file.
00001 /*  Sirikata Transfer -- Content Transfer management system
00002  *  DiskCacheLayer.hpp
00003  *
00004  *  Copyright (c) 2008, Patrick Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 /*  Created on: Jan 1, 2009 */
00033 
00034 #ifndef SIRIKATA_DiskCacheLayer_HPP__
00035 #define SIRIKATA_DiskCacheLayer_HPP__
00036 
00037 #include <sys/stat.h>
00038 #include <sys/types.h>
00039 
00040 #include <sirikata/core/transfer/CacheLayer.hpp>
00041 #include <sirikata/core/transfer/CacheMap.hpp>
00042 #include <sirikata/core/queue/ThreadSafeQueue.hpp>
00043 #include <sirikata/core/util/Thread.hpp>
00044 
00045 namespace Sirikata {
00046 namespace Transfer {
00047 
00048 // should really be a config option.
00049 //#define NUM_WORKER_THREADS 10
00050 
00051 
00053 class SIRIKATA_EXPORT DiskCacheLayer : public CacheLayer {
00054 public:
00055     struct CacheData : public CacheEntry {
00056         RangeList mRanges;
00057         bool wholeFile() const {
00058             return mRanges.empty();
00059         }
00060         bool contains(const Range &range) const {
00061             if (wholeFile()) {
00062                 return true;
00063             }
00064             return range.isContainedBy(mRanges);
00065         }
00066     };
00067 
00068 private:
00069 
00070     struct DiskRequest;
00071     ThreadSafeQueue<std::tr1::shared_ptr<DiskRequest> > mRequestQueue; // must be initialized before the thread.
00072     Thread *mWorkerThread;
00073 
00074     CacheMap mFiles;
00075 
00076 
00077     std::string mPrefix; // directory or prefix name with trailing slash.
00078 
00079     struct DiskRequest {
00080         enum Operation {OPREAD, OPWRITE, OPDELETE, OPEXIT} op;
00081 
00082         DiskRequest(Operation op, const Fingerprint &id, const Range &myRange)
00083             :op(op), fileId(id), toRead(myRange) {}
00084 
00085         Fingerprint fileId;
00086         Range toRead;
00087         TransferCallback finished;
00088         DenseDataPtr data; // if NULL, read data.
00089 
00090     };
00091 
00092     boost::mutex destroyLock;
00093     boost::condition_variable destroyCV;
00094     bool mCleaningUp; // do not delete any files.
00095 
00096 public:
00097     void workerThread(); // defined in DiskCache.cpp
00098     void unserialize(); // defined in DiskCache.cpp
00099 
00100     void readDataFromDisk(const Fingerprint &fileId,
00101             const Range &requestedRange,
00102             const TransferCallback&callback) {
00103         std::tr1::shared_ptr<DiskRequest> req (
00104                 new DiskRequest(DiskRequest::OPREAD, fileId, requestedRange));
00105         req->finished = callback;
00106 
00107         mRequestQueue.push(req);
00108     }
00109 
00110     void serializeRanges(const RangeList &list, std::string &out) {
00111         std::ostringstream outs;
00112 
00113         for (RangeList::const_iterator liter = list.begin(); liter != list.end(); ++liter) {
00114             cache_ssize_type len = (cache_ssize_type)((*liter).length());
00115             if ((*liter).goesToEndOfFile()) {
00116                 len = -len;
00117             }
00118             outs << (*liter).startbyte() << " " << len << "; ";
00119         }
00120         out = outs.str();
00121     }
00122 
00123     void unserializeRanges(RangeList &rlist, std::istream &iranges) {
00124         while (iranges.good()) {
00125             Range::base_type start = 0;
00126             cache_ssize_type length = 0;
00127             bool toEndOfFile = false;
00128             iranges >> start >> length;
00129 
00130             if (length == 0) {
00131                 continue;
00132             }
00133             if (length < 0) {
00134                 length = -length;
00135                 toEndOfFile = true;
00136             }
00137 
00138             Range toAdd (start, length, LENGTH, toEndOfFile);
00139 
00140             toAdd.addToList(toAdd, rlist);
00141 
00142             char c;
00143             iranges >> c;
00144         }
00145     }
00146 
00147 protected:
00148     virtual void populateCache(const Fingerprint& fileId, const DenseDataPtr &data) {
00149         std::tr1::shared_ptr<DiskRequest> req (
00150                     new DiskRequest(DiskRequest::OPWRITE, fileId, *data));
00151         req->data = data;
00152 
00153         mRequestQueue.push(req);
00154 
00155         CacheLayer::populateParentCaches(req->fileId, data);
00156     }
00157 
00158     virtual void destroyCacheEntry(const Fingerprint &fileId, CacheEntry *cacheLayerData, cache_usize_type releaseSize) {
00159         if (!mCleaningUp) {
00160             // don't want to erase the disk cache when exiting the program.
00161             std::string fileName = fileId.convertToHexString();
00162             std::tr1::shared_ptr<DiskRequest> req
00163                             (new DiskRequest(DiskRequest::OPDELETE, fileId, Range(true)));
00164                         mRequestQueue.push(req);
00165         }
00166         CacheData *toDelete = static_cast<CacheData*>(cacheLayerData);
00167         delete toDelete;
00168     }
00169 
00170 public:
00171 
00172     DiskCacheLayer(CachePolicy *policy, const std::string &prefix, CacheLayer *tryNext);
00173 
00174     virtual ~DiskCacheLayer() {
00175         std::tr1::shared_ptr<DiskRequest> req
00176             (new DiskRequest(DiskRequest::OPEXIT, Fingerprint(), Range(true)));
00177         boost::unique_lock<boost::mutex> sleep_cv(destroyLock);
00178         mRequestQueue.push(req);
00179         destroyCV.wait(sleep_cv); // we know the thread has terminated.
00180 
00181         mCleaningUp = true; // don't allow destroyCacheEntry to delete files.
00182         delete mWorkerThread;
00183     }
00184 
00185     virtual void purgeFromCache(const Fingerprint &fileId) {
00186         CacheMap::write_iterator iter(mFiles);
00187         if (iter.find(fileId)) {
00188             iter.erase();
00189         }
00190         CacheLayer::purgeFromCache(fileId);
00191     }
00192 
00193     virtual void getData(const Fingerprint &fileId,
00194             const Range &requestedRange,
00195             const TransferCallback&callback) {
00196         bool haveRange = false;
00197         {
00198             CacheMap::read_iterator iter(mFiles);
00199 
00200             if (iter.find(fileId)) {
00201                 const CacheData *rlist = static_cast<const CacheData*>(*iter);
00202                 haveRange = rlist->contains(requestedRange);
00203             }
00204             if (haveRange) {
00205                 iter.use(); // or is it more proper to use() after reading from disk?
00206             }
00207         }
00208         if (haveRange) {
00209             readDataFromDisk(fileId, requestedRange, callback);
00210         } else {
00211             CacheLayer::getData(fileId, requestedRange, callback);
00212         }
00213     }
00214 };
00215 
00216 }
00217 }
00218 
00219 
00220 #endif /* SIRIKATA_DiskCacheLayer_HPP__ */