Sirikata
|
00001 /* Sirikata Transfer -- Content Transfer management system 00002 * DiskCacheLayer.hpp 00003 * 00004 * Copyright (c) 2008, Patrick Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 /* Created on: Jan 1, 2009 */ 00033 00034 #ifndef SIRIKATA_DiskCacheLayer_HPP__ 00035 #define SIRIKATA_DiskCacheLayer_HPP__ 00036 00037 #include <sys/stat.h> 00038 #include <sys/types.h> 00039 00040 #include <sirikata/core/transfer/CacheLayer.hpp> 00041 #include <sirikata/core/transfer/CacheMap.hpp> 00042 #include <sirikata/core/queue/ThreadSafeQueue.hpp> 00043 #include <sirikata/core/util/Thread.hpp> 00044 00045 namespace Sirikata { 00046 namespace Transfer { 00047 00048 // should really be a config option. 00049 //#define NUM_WORKER_THREADS 10 00050 00051 00053 class SIRIKATA_EXPORT DiskCacheLayer : public CacheLayer { 00054 public: 00055 struct CacheData : public CacheEntry { 00056 RangeList mRanges; 00057 bool wholeFile() const { 00058 return mRanges.empty(); 00059 } 00060 bool contains(const Range &range) const { 00061 if (wholeFile()) { 00062 return true; 00063 } 00064 return range.isContainedBy(mRanges); 00065 } 00066 }; 00067 00068 private: 00069 00070 struct DiskRequest; 00071 ThreadSafeQueue<std::tr1::shared_ptr<DiskRequest> > mRequestQueue; // must be initialized before the thread. 00072 Thread *mWorkerThread; 00073 00074 CacheMap mFiles; 00075 00076 00077 std::string mPrefix; // directory or prefix name with trailing slash. 00078 00079 struct DiskRequest { 00080 enum Operation {OPREAD, OPWRITE, OPDELETE, OPEXIT} op; 00081 00082 DiskRequest(Operation op, const Fingerprint &id, const Range &myRange) 00083 :op(op), fileId(id), toRead(myRange) {} 00084 00085 Fingerprint fileId; 00086 Range toRead; 00087 TransferCallback finished; 00088 DenseDataPtr data; // if NULL, read data. 00089 00090 }; 00091 00092 boost::mutex destroyLock; 00093 boost::condition_variable destroyCV; 00094 bool mCleaningUp; // do not delete any files. 00095 00096 public: 00097 void workerThread(); // defined in DiskCache.cpp 00098 void unserialize(); // defined in DiskCache.cpp 00099 00100 void readDataFromDisk(const Fingerprint &fileId, 00101 const Range &requestedRange, 00102 const TransferCallback&callback) { 00103 std::tr1::shared_ptr<DiskRequest> req ( 00104 new DiskRequest(DiskRequest::OPREAD, fileId, requestedRange)); 00105 req->finished = callback; 00106 00107 mRequestQueue.push(req); 00108 } 00109 00110 void serializeRanges(const RangeList &list, std::string &out) { 00111 std::ostringstream outs; 00112 00113 for (RangeList::const_iterator liter = list.begin(); liter != list.end(); ++liter) { 00114 cache_ssize_type len = (cache_ssize_type)((*liter).length()); 00115 if ((*liter).goesToEndOfFile()) { 00116 len = -len; 00117 } 00118 outs << (*liter).startbyte() << " " << len << "; "; 00119 } 00120 out = outs.str(); 00121 } 00122 00123 void unserializeRanges(RangeList &rlist, std::istream &iranges) { 00124 while (iranges.good()) { 00125 Range::base_type start = 0; 00126 cache_ssize_type length = 0; 00127 bool toEndOfFile = false; 00128 iranges >> start >> length; 00129 00130 if (length == 0) { 00131 continue; 00132 } 00133 if (length < 0) { 00134 length = -length; 00135 toEndOfFile = true; 00136 } 00137 00138 Range toAdd (start, length, LENGTH, toEndOfFile); 00139 00140 toAdd.addToList(toAdd, rlist); 00141 00142 char c; 00143 iranges >> c; 00144 } 00145 } 00146 00147 protected: 00148 virtual void populateCache(const Fingerprint& fileId, const DenseDataPtr &data) { 00149 std::tr1::shared_ptr<DiskRequest> req ( 00150 new DiskRequest(DiskRequest::OPWRITE, fileId, *data)); 00151 req->data = data; 00152 00153 mRequestQueue.push(req); 00154 00155 CacheLayer::populateParentCaches(req->fileId, data); 00156 } 00157 00158 virtual void destroyCacheEntry(const Fingerprint &fileId, CacheEntry *cacheLayerData, cache_usize_type releaseSize) { 00159 if (!mCleaningUp) { 00160 // don't want to erase the disk cache when exiting the program. 00161 std::string fileName = fileId.convertToHexString(); 00162 std::tr1::shared_ptr<DiskRequest> req 00163 (new DiskRequest(DiskRequest::OPDELETE, fileId, Range(true))); 00164 mRequestQueue.push(req); 00165 } 00166 CacheData *toDelete = static_cast<CacheData*>(cacheLayerData); 00167 delete toDelete; 00168 } 00169 00170 public: 00171 00172 DiskCacheLayer(CachePolicy *policy, const std::string &prefix, CacheLayer *tryNext); 00173 00174 virtual ~DiskCacheLayer() { 00175 std::tr1::shared_ptr<DiskRequest> req 00176 (new DiskRequest(DiskRequest::OPEXIT, Fingerprint(), Range(true))); 00177 boost::unique_lock<boost::mutex> sleep_cv(destroyLock); 00178 mRequestQueue.push(req); 00179 destroyCV.wait(sleep_cv); // we know the thread has terminated. 00180 00181 mCleaningUp = true; // don't allow destroyCacheEntry to delete files. 00182 delete mWorkerThread; 00183 } 00184 00185 virtual void purgeFromCache(const Fingerprint &fileId) { 00186 CacheMap::write_iterator iter(mFiles); 00187 if (iter.find(fileId)) { 00188 iter.erase(); 00189 } 00190 CacheLayer::purgeFromCache(fileId); 00191 } 00192 00193 virtual void getData(const Fingerprint &fileId, 00194 const Range &requestedRange, 00195 const TransferCallback&callback) { 00196 bool haveRange = false; 00197 { 00198 CacheMap::read_iterator iter(mFiles); 00199 00200 if (iter.find(fileId)) { 00201 const CacheData *rlist = static_cast<const CacheData*>(*iter); 00202 haveRange = rlist->contains(requestedRange); 00203 } 00204 if (haveRange) { 00205 iter.use(); // or is it more proper to use() after reading from disk? 00206 } 00207 } 00208 if (haveRange) { 00209 readDataFromDisk(fileId, requestedRange, callback); 00210 } else { 00211 CacheLayer::getData(fileId, requestedRange, callback); 00212 } 00213 } 00214 }; 00215 00216 } 00217 } 00218 00219 00220 #endif /* SIRIKATA_DiskCache_HPP__ */