Sirikata
liboh/plugins/sqlite/SQLiteStorage.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  SQLiteStorage.hpp
00003  *
00004  *  Copyright (c) 2011, Stanford University
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef __SIRIKATA_OH_STORAGE_SQLITE_HPP__
00034 #define __SIRIKATA_OH_STORAGE_SQLITE_HPP__
00035 
00036 #include <sirikata/oh/Storage.hpp>
00037 #include <sirikata/sqlite/SQLite.hpp>
00038 #include <sirikata/core/queue/ThreadSafeQueueWithNotification.hpp>
00039 
00040 namespace Sirikata {
00041 namespace OH {
00042 
00043 class FileStorageEvent;
00044 
00045 class SQLiteStorage : public Storage
00046 {
      // Storage subclass that keeps a SQLite database handle (mDB) and a
      // database filename (mDBFilename). Requests are recorded as per-bucket
      // lists of StorageActions (mTransactions), queued as TransactionData on
      // mTransactionQueue, and processed on a dedicated IO service/thread
      // (mIOService / mThread). Buckets are additionally guarded by leases
      // identified by mSQLClientID and bounded by mLeaseDuration.
      //
      // NOTE(review): this header uses std::vector, std::queue and
      // std::tr1::unordered_map without including them directly; presumably
      // they arrive transitively through the Sirikata headers -- confirm.
00047 public:
          // ctx: object host context (kept in mContext, presumably).
          // dbpath: presumably the path of the SQLite database file -- confirm
          //   against the constructor definition.
          // lease_duration: presumably the value stored in mLeaseDuration.
00048     SQLiteStorage(ObjectHostContext* ctx, const String& dbpath, const Duration& lease_duration);
00049     ~SQLiteStorage();
00050 
          // Lifecycle hooks. Their exact behavior is defined in the .cpp, which
          // is not visible from this header.
00051     virtual void start();
00052     virtual void stop();
00053 
          // Lease management entry points for a bucket. See the private lease
          // helpers (acquireLease / renewLease / releaseLease) further down.
00054     virtual void leaseBucket(const Bucket& bucket);
00055     virtual void releaseBucket(const Bucket& bucket);
00056 
          // Starts an explicit transaction on the given bucket; subsequent
          // operations on that bucket are presumably accumulated until
          // commitTransaction() is called -- confirm in the implementation.
00057     virtual void beginTransaction(const Bucket& bucket);
00058 
          // Operations on a bucket. Each takes an optional callback (default 0,
          // i.e. none) and a timestamp string that defaults to "current". The
          // meaning of the bool return value is not visible here; see
          // sirikata/oh/Storage.hpp. Note that count() reports through a
          // CountCallback while all other operations use a CommitCallback.
00059     virtual void commitTransaction(const Bucket& bucket, const CommitCallback& cb = 0, const String& timestamp="current");
00060     virtual bool erase(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current");
00061     virtual bool write(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current");
00062     virtual bool read(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current");
00063     virtual bool compare(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current");
00064     virtual bool rangeRead(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current");
00065     virtual bool rangeErase(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current");
00066     virtual bool count(const Bucket& bucket, const Key& start, const Key& finish, const CountCallback& cb = 0, const String& timestamp="current");
00067 
00068 private:
00069     // StorageActions are individual actions to take, i.e. read, write,
00070     // erase. We queue them up in a list and eventually fire them off in a
00071     // transaction.
00072     struct StorageAction {
              // Kind of operation this action performs. There is one
              // enumerator per public data operation except count(), plus
              // Error.
00073         enum Type {
00074             Read,
00075             ReadRange,
00076             Compare,
00077             Write,
00078             Erase,
00079             EraseRange,
00080             Error
00081         };
00082 
              // Copy construction, assignment and destruction are all
              // user-declared. NOTE(review): presumably because `value` is an
              // owned heap-allocated String that needs deep copying -- confirm
              // in the .cpp.
00083         StorageAction();
00084         StorageAction(const StorageAction& rhs);
00085         ~StorageAction();
00086 
00087         StorageAction& operator=(const StorageAction& rhs);
00088 
00089         // Executes this action. Assumes the owning SQLiteStorage has setup the transaction.
              // db: database handle to run against; bucket: bucket the action
              // applies to; rs: read set, presumably filled in by read-type
              // actions -- confirm.
00090         Result execute(SQLiteDBPtr db, const Bucket& bucket, ReadSet* rs);
00091 
00092         // Executes this action, retrying the given number of times if there's a
00093         // temporary failure to lock the database. Assumes the owning
00094         // SQLiteStorage has setup the transaction.
              // retries: maximum number of retries; retry_wait: presumably the
              // delay between attempts (cf. mRetrySleepDuration).
00095         Result executeWithRetry(SQLiteDBPtr db, const Bucket& bucket, ReadSet* rs, int32 retries, const Duration& retry_wait);
00096 
00097         // Bucket is implicit, passed into execute
00098         Type type;
00099         Key key;
              // NOTE(review): the comment below mentions "Count", but Type has
              // no Count enumerator; counting appears to go through
              // SQLiteStorage::executeCount instead -- confirm.
00100         Key keyEnd; // Only relevant for *Range and Count
              // Raw pointer to the value payload; which action types use it and
              // who owns it is not visible in this header.
00101         String* value;
00102     };
00103 
          // A Transaction is an ordered list of actions; BucketTransactions
          // maps each bucket to its Transaction (held by raw pointer).
00104     typedef std::vector<StorageAction> Transaction;
00105     typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> BucketTransactions;
00106 
00107     // We keep a queue of transactions and trigger handlers, which can process
00108     // more than one at a time, on the storage IOService
00109     struct TransactionData {
              // Default: empty bucket, no transaction (NULL), empty callback.
00110         TransactionData()
00111          : bucket(), trans(NULL), cb()
00112         {}
              // Bundles a bucket, its transaction pointer and the callback.
00113         TransactionData(const Bucket& b, Transaction* t, CommitCallback c)
00114          : bucket(b), trans(t), cb(c)
00115         {}
00116 
00117         Bucket bucket;
              // Raw pointer; ownership is not expressed by the type and is not
              // visible from this header.
00118         Transaction* trans;
00119         CommitCallback cb;
00120     };
00121     typedef ThreadSafeQueueWithNotification<TransactionData> TransactionQueue;
00122 
00123     // Helper that checks and logs errors, then returns bool indicating
00124     // success/failure
00125     static bool checkSQLiteError(SQLiteDBPtr db, int rc, const String& msg);
00126 
00127     // Initializes the database. This is separate from the main initialization
00128     // function because we need to make sure it executes in the right thread so
00129     // all sqlite requests on the db ptr come from the same thread.
00130     void initDB();
00131 
00132     // Gets the current transaction or creates one. Also can return whether the
00133     // transaction was just created, e.g. to tell whether an operation is an
00134     // implicit transaction.
          // is_new: optional out-parameter (may be NULL, the default).
00135     Transaction* getTransaction(const Bucket& bucket, bool* is_new = NULL);
00136 
00137     // Indirection to get on mIOService
00138     void postProcessTransactions();
00139     // Process transactions. Runs until queue is empty and is triggered anytime
00140     // the queue goes from empty to non-empty.
00141     void processTransactions();
00142 
00143     // Tries to execute a commit *assuming it is within a SQL
00144     // transaction*. Returns whether it was successful, allowing for
00145     // rollback/retrying.
          // read_set_out: out-parameter receiving a ReadSet pointer; who frees
          // it is not visible from this header.
00146     Result executeCommit(const Bucket& bucket, Transaction* trans, CommitCallback cb, ReadSet** read_set_out);
00147 
          // Helper behind count(). NOTE(review): value_count is taken by const
          // value (a copy per call) rather than by const reference; this may be
          // deliberate if the call is posted to another thread -- confirm.
00148     void executeCount(const String value_count, const Key& start, const Key& finish, CountCallback cb);
00149 
00150     // A few helper methods that wrap sql operations.
00151     bool sqlBeginTransaction();
00152     bool sqlCommit();
00153     bool sqlRollback();
00154 
00155 
00156     // Helpers for leases:
00157     // Get the current lease string, which includes our client ID and
00158     // an expiration time based on the current time
00159     String getLeaseString();
00160     // Parse a lease string read from the DB into the client ID
00161     // (owner) and expiration time.
00162     void parseLeaseString(const String& ls, String* client_out, Time* expiration_out);
00163 
00164     // Acquire a lease (or update if it's already valid) for the given
00165     // bucket. This is part of a transaction -- the first part to
00166     // ensure the transaction is valid
00167     Result acquireLease(const Bucket& bucket);
00168     // Renew a lease that we already have. Verifies we still hold the
00169     // lease, then renews it. This is an entire transaction.
00170     void renewLease(const Bucket& bucket);
00171     // Release the lease if we own it.
00172     void releaseLease(const Bucket& bucket);
00173 
00174     // Process renewals at front of queue that need updating.
00175     void processRenewals();
00176 
          // Context passed to the constructor, the per-bucket transactions
          // being built up, and the database filename/handle.
00177     ObjectHostContext* mContext;
00178     BucketTransactions mTransactions;
00179     String mDBFilename;
00180     SQLiteDBPtr mDB;
00181 
00182     // FIXME because we don't have proper multithreaded support in cppoh, we
00183     // need to allocate our own thread dedicated to IO
00184     Network::IOService* mIOService;
00185     Network::IOWork* mWork;
00186     Thread* mThread;
00187 
00188     // A unique client ID for leases. These should not include '-' as
00189     // those are used to separate the client ID and timestamp
00190     const String mSQLClientID;
00191     const Duration mLeaseDuration;
00192 
00193     TransactionQueue mTransactionQueue;
00194     // Maximum transactions to combine into a single transaction in the
00195     // underlying database. TODO(ewencp) this should probably be dynamic, should
00196     // increase/decrease based on success/failure and avoid latency getting too
00197     // high. Right now we just have a reasonable, but small, number.
00198     uint32 mMaxCoalescedTransactions;
00199 
00200     // Amount of time to sleep between retries. Shouldn't be too big or you can
00201     // back up all storage, but should be long enough that transient errors such
00202     // as waiting for other threads to unlock are likely to be resolved.
00203     const Duration mRetrySleepDuration;
00204     // Number of times to retry a normal operation (user transaction requests)
00205     // and lease operations (acquiring/releasing locks). The latter should be
00206     // more aggressive about retrying, whereas the former can rely on
00207     // application-level retries when transient errors are detected.
00208     const int32 mNormalOpRetries;
00209     const int32 mLeaseOpRetries;
00210 
          // Pairs a bucket with a time; entries are queued in mRenewTimes and
          // consumed from the front by processRenewals(). Both members are
          // const, so an entry cannot be modified after construction.
00211     struct BucketRenewTimeout {
00212         BucketRenewTimeout(const Bucket& _b, Time _t)
00213          : bucket(_b), t(_t)
00214         {}
00215         const Bucket bucket;
00216         const Time t;
00217     };
          // Pending lease renewals and the timer presumably used to schedule
          // processRenewals() -- confirm in the .cpp.
00218     std::queue<BucketRenewTimeout> mRenewTimes;
00219     Network::IOTimerPtr mRenewTimer;
00220 };
00221 
00222 }//end namespace OH
00223 }//end namespace Sirikata
00224 
00225 #endif //__SIRIKATA_OH_STORAGE_SQLITE_HPP__