Sirikata
|
00001 /* Sirikata 00002 * SQLiteStorage.hpp 00003 * 00004 * Copyright (c) 2011, Stanford University 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef __SIRIKATA_OH_STORAGE_SQLITE_HPP__ 00034 #define __SIRIKATA_OH_STORAGE_SQLITE_HPP__ 00035 00036 #include <sirikata/oh/Storage.hpp> 00037 #include <sirikata/sqlite/SQLite.hpp> 00038 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp> 00039 00040 namespace Sirikata { 00041 namespace OH { 00042 00043 class FileStorageEvent; 00044 00045 class SQLiteStorage : public Storage 00046 { 00047 public: 00048 SQLiteStorage(ObjectHostContext* ctx, const String& dbpath, const Duration& lease_duration); 00049 ~SQLiteStorage(); 00050 00051 virtual void start(); 00052 virtual void stop(); 00053 00054 virtual void leaseBucket(const Bucket& bucket); 00055 virtual void releaseBucket(const Bucket& bucket); 00056 00057 virtual void beginTransaction(const Bucket& bucket); 00058 00059 virtual void commitTransaction(const Bucket& bucket, const CommitCallback& cb = 0, const String& timestamp="current"); 00060 virtual bool erase(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current"); 00061 virtual bool write(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current"); 00062 virtual bool read(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current"); 00063 virtual bool compare(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current"); 00064 virtual bool rangeRead(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current"); 00065 virtual bool rangeErase(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current"); 00066 virtual bool count(const Bucket& bucket, const Key& start, const Key& finish, const CountCallback& cb = 0, const String& timestamp="current"); 00067 00068 private: 00069 // StorageActions are individual actions to take, i.e. read, write, 00070 // erase. We queue them up in a list and eventually fire them off in a 00071 // transaction. 00072 struct StorageAction { 00073 enum Type { 00074 Read, 00075 ReadRange, 00076 Compare, 00077 Write, 00078 Erase, 00079 EraseRange, 00080 Error 00081 }; 00082 00083 StorageAction(); 00084 StorageAction(const StorageAction& rhs); 00085 ~StorageAction(); 00086 00087 StorageAction& operator=(const StorageAction& rhs); 00088 00089 // Executes this action. Assumes the owning SQLiteStorage has setup the transaction. 00090 Result execute(SQLiteDBPtr db, const Bucket& bucket, ReadSet* rs); 00091 00092 // Executes this action, retrying the given number of times if there's a 00093 // temporary failure to lock the database. Assumes the owning 00094 // SQLiteStorage has setup the transaction. 00095 Result executeWithRetry(SQLiteDBPtr db, const Bucket& bucket, ReadSet* rs, int32 retries, const Duration& retry_wait); 00096 00097 // Bucket is implicit, passed into execute 00098 Type type; 00099 Key key; 00100 Key keyEnd; // Only relevant for *Range and Count 00101 String* value; 00102 }; 00103 00104 typedef std::vector<StorageAction> Transaction; 00105 typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> BucketTransactions; 00106 00107 // We keep a queue of transactions and trigger handlers, which can process 00108 // more than one at a time, on the storage IOService 00109 struct TransactionData { 00110 TransactionData() 00111 : bucket(), trans(NULL), cb() 00112 {} 00113 TransactionData(const Bucket& b, Transaction* t, CommitCallback c) 00114 : bucket(b), trans(t), cb(c) 00115 {} 00116 00117 Bucket bucket; 00118 Transaction* trans; 00119 CommitCallback cb; 00120 }; 00121 typedef ThreadSafeQueueWithNotification<TransactionData> TransactionQueue; 00122 00123 // Helper that checks and logs errors, then returns bool indicating 00124 // success/failure 00125 static bool checkSQLiteError(SQLiteDBPtr db, int rc, const String& msg); 00126 00127 // Initializes the database. This is separate from the main initialization 00128 // function because we need to make sure it executes in the right thread so 00129 // all sqlite requests on the db ptr come from the same thread. 00130 void initDB(); 00131 00132 // Gets the current transaction or creates one. Also can return whether the 00133 // transaction was just created, e.g. to tell whether an operation is an 00134 // implicit transaction. 00135 Transaction* getTransaction(const Bucket& bucket, bool* is_new = NULL); 00136 00137 // Indirection to get on mIOService 00138 void postProcessTransactions(); 00139 // Process transactions. Runs until queue is empty and is triggered anytime 00140 // the queue goes from empty to non-empty. 00141 void processTransactions(); 00142 00143 // Tries to execute a commit *assuming it is within a SQL 00144 // transaction*. Returns whether it was successful, allowing for 00145 // rollback/retrying. 00146 Result executeCommit(const Bucket& bucket, Transaction* trans, CommitCallback cb, ReadSet** read_set_out); 00147 00148 void executeCount(const String value_count, const Key& start, const Key& finish, CountCallback cb); 00149 00150 // A few helper methods that wrap sql operations. 00151 bool sqlBeginTransaction(); 00152 bool sqlCommit(); 00153 bool sqlRollback(); 00154 00155 00156 // Helpers for leases: 00157 // Get the current lease string, which includes our client ID and 00158 // an expiration time based on the current time 00159 String getLeaseString(); 00160 // Parse a lease string read from the DB into the client ID 00161 // (owner) and expiration time. 00162 void parseLeaseString(const String& ls, String* client_out, Time* expiration_out); 00163 00164 // Acquire a lease (or update if it's already valid) for the given 00165 // bucket. This is part of a transaction -- the first part to 00166 // ensure the transaction is valid 00167 Result acquireLease(const Bucket& bucket); 00168 // Renew a lease that we already have. Verifies we still hold the 00169 // lease, then renews it. This is an entire transaction. 00170 void renewLease(const Bucket& bucket); 00171 // Release the lease if we own it. 00172 void releaseLease(const Bucket& bucket); 00173 00174 // Process renewals at front of queue that need updating. 00175 void processRenewals(); 00176 00177 ObjectHostContext* mContext; 00178 BucketTransactions mTransactions; 00179 String mDBFilename; 00180 SQLiteDBPtr mDB; 00181 00182 // FIXME because we don't have proper multithreaded support in cppoh, we 00183 // need to allocate our own thread dedicated to IO 00184 Network::IOService* mIOService; 00185 Network::IOWork* mWork; 00186 Thread* mThread; 00187 00188 // A unique client ID for leases. These should not include '-' as 00189 // those are used to separate the client ID and timestamp 00190 const String mSQLClientID; 00191 const Duration mLeaseDuration; 00192 00193 TransactionQueue mTransactionQueue; 00194 // Maximum transactions to combine into a single transaction in the 00195 // underlying database. TODO(ewencp) this should probably be dynamic, should 00196 // increase/decrease based on success/failure and avoid latency getting too 00197 // hight. Right now we just have a reasonable, but small, number. 00198 uint32 mMaxCoalescedTransactions; 00199 00200 // Amount of time to sleep between retries. Shouldn't be too big or you can 00201 // back up all storage, but should be long enough that transient errors such 00202 // as waiting for other threads to unlock are likely to be resolved. 00203 const Duration mRetrySleepDuration; 00204 // Number of times to retry a normal operation (user transaction requests) 00205 // and lease operations (acquiring/releasing locks). The latter should be 00206 // more aggressive about retrying, whereas the former can rely on 00207 // application-level retries when transient errors are detected. 00208 const int32 mNormalOpRetries; 00209 const int32 mLeaseOpRetries; 00210 00211 struct BucketRenewTimeout { 00212 BucketRenewTimeout(const Bucket& _b, Time _t) 00213 : bucket(_b), t(_t) 00214 {} 00215 const Bucket bucket; 00216 const Time t; 00217 }; 00218 std::queue<BucketRenewTimeout> mRenewTimes; 00219 Network::IOTimerPtr mRenewTimer; 00220 }; 00221 00222 }//end namespace OH 00223 }//end namespace Sirikata 00224 00225 #endif //__SIRIKATA_OH_STORAGE_SQLITE_HPP__