Sirikata
liboh/plugins/cassandra/CassandraStorage.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  CassandraStorage.hpp
00003  *
00004  *  Copyright (c) 2011, Stanford University
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef __SIRIKATA_OH_STORAGE_CASSANDRA_HPP__
00034 #define __SIRIKATA_OH_STORAGE_CASSANDRA_HPP__
00035 
00036 #include <sirikata/oh/Storage.hpp>
00037 #include <sirikata/cassandra/Cassandra.hpp>
00038 
00039 namespace Sirikata {
00040 namespace OH {
00041 
00042 class FileStorageEvent;
00043 
00044 class CassandraStorage : public Storage
00045 {
00046 public:
00047     CassandraStorage(ObjectHostContext* ctx, const String& host, int port, const Duration& lease_duration);
00048     ~CassandraStorage();
00049 
00050     virtual void start();
00051     virtual void stop();
00052 
00053     virtual void leaseBucket(const Bucket& bucket);
00054     virtual void releaseBucket(const Bucket& bucket);
00055 
00056     virtual void beginTransaction(const Bucket& bucket);
00057     virtual void commitTransaction(const Bucket& bucket, const CommitCallback& cb = 0, const String& timestamp="current");
00058     virtual bool erase(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current");
00059     virtual bool write(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current");
00060     virtual bool read(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current");
00061     virtual bool rangeRead(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current");
00062     virtual bool rangeErase(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current");
00063     virtual bool compare(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current");
00064     virtual bool count(const Bucket& bucket, const Key& start, const Key& finish, const CountCallback& cb = 0, const String& timestamp="current");
00065 
00066 
00067 private:
00068 
00069     typedef std::vector<String> Keys;
00070     typedef org::apache::cassandra::Column Column;
00071     typedef std::vector<Column> Columns;
00072     typedef org::apache::cassandra::SliceRange SliceRange;
00073     typedef std::vector<SliceRange> SliceRanges;
00074     typedef org::apache::cassandra::ColumnParent ColumnParent;
00075     typedef org::apache::cassandra::SlicePredicate SlicePredicate;
00076 
00077     typedef std::tr1::tuple<String,   //column family
00078                             String,   //row key
00079                             String,   //super column name
00080                             Columns,  //columns to write
00081                             Keys      //keys to erase
00082                           > batchTuple;
00083 
00084     // StorageActions are individual actions to take, i.e. read, write,
00085     // erase. We queue them up in a list and eventually fire them off in a
00086     // transaction.
00087     struct StorageAction {
00088         enum Type {
00089             Read,
00090             ReadRange,
00091             Compare,
00092             Write,
00093             Erase,
00094             EraseRange,
00095             Error
00096         };
00097 
00098         StorageAction();
00099         StorageAction(const StorageAction& rhs);
00100         ~StorageAction();
00101 
00102         StorageAction& operator=(const StorageAction& rhs);
00103 
00104         // Executes this action: push action to lists and wait for commitment
00105         void execute(const Bucket& bucket, Columns* columns, Keys* eraseKeys, Keys* readKeys, SliceRanges* readRanges, ReadSet* compares, SliceRanges* eraseRanges, const String& timestamp);
00106 
00107         // Bucket is implicit, passed into execute
00108         Type type;
00109         Key key;
00110         Key keyEnd;
00111         String* value;
00112     };
00113 
00114     typedef std::vector<StorageAction> Transaction;
00115     typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> BucketTransactions;
00116 
00117     // Initializes the database.
00118     void initDB();
00119 
00120     // Gets the current transaction or creates one. Also can return whether the
00121     // transaction was just created, e.g. to tell whether an operation is an
00122     // implicit transaction.
00123     Transaction* getTransaction(const Bucket& bucket, bool* is_new = NULL);
00124 
00125     // Executes a commit. Runs in a separate thread, so the transaction is
00126     // passed in directly
00127     void executeCommit(const Bucket& bucket, Transaction* trans, CommitCallback cb, const String& timestamp);
00128 
00129     void executeCount(const Bucket& bucket, ColumnParent& parent, SlicePredicate& predicate, CountCallback cb, const String& timestamp);
00130 
00131     // Complete a commit back in the main thread, cleaning it up and dispatching the callback
00132     void completeCommit(Transaction* trans, CommitCallback cb, Result success, ReadSet* rs);
00133     void completeCount(CountCallback cb, Result success, int32 count);
00134 
00135     // Call libcassandra methods to commit transcation
00136     Result CassandraCommit(CassandraDBPtr db, const Bucket& bucket, Columns* columns, Keys* eraseKeys, Keys* readKeys, SliceRanges* readRanges, ReadSet* compares, SliceRanges* eraseRanges, ReadSet* rs, const String& timestamp);
00137 
00138 
00139     // Helpers for leases:
00140     // The LeaseRequestSet is a client's view of all the lease requests. It's
00141     // just a set of client IDs that were found as keys in the request row for
00142     // the bucket.
00143     typedef std::set<String> LeaseRequestSet;
00144     // Get the 'bucket' (Cassandra row) name for lease negotiation for this
00145     // object
00146     String getLeaseBucketName(const Bucket& bucket);
00147     // Read all requests for leases against the given bucket. Returns the other
00148     // clients we see requesting a lease (removes our own ID).
00149     LeaseRequestSet readLeaseRequests(const Bucket& bucket);
00150 
00151     // Acquire a lease (or update if it's already valid) for the given
00152     // bucket. This is part of a transaction -- the first part to
00153     // ensure the transaction is valid
00154     Result acquireLease(const Bucket& bucket);
00155     // Renew a lease that we already have. Verifies we still hold the
00156     // lease, then renews it. This is an entire transaction.
00157     void renewLease(const Bucket& bucket);
00158     // Release the lease if we own it.
00159     void releaseLease(const Bucket& bucket);
00160 
00161     // Process renewals at front of queue that need updating.
00162     void processRenewals();
00163 
00164 
00165 
00166     ObjectHostContext* mContext;
00167     BucketTransactions mTransactions;
00168     String mDBHost;              //host name of Cassandra server
00169     int mDBPort;
00170     CassandraDBPtr mDB;
00171 
00172     Network::IOService* mIOService;
00173     Network::IOWork* mWork;
00174     Thread* mThread;
00175 
00176     // A unique client ID for leases. These should not include '-' as
00177     // those are used to separate the client ID and timestamp
00178     const String mClientID;
00179     const Duration mLeaseDuration;
00180     // Track which objects we have active leases on
00181     typedef std::tr1::unordered_set<Bucket, Bucket::Hasher> LeaseSet;
00182     LeaseSet mLeases;
00183 
00184     struct BucketRenewTimeout {
00185         BucketRenewTimeout(const Bucket& _b, Time _t)
00186          : bucket(_b), t(_t)
00187         {}
00188         const Bucket bucket;
00189         const Time t;
00190     };
00191     std::queue<BucketRenewTimeout> mRenewTimes;
00192     Network::IOTimerPtr mRenewTimer;
00193 };
00194 
00195 }//end namespace OH
00196 }//end namespace Sirikata
00197 
#endif //__SIRIKATA_OH_STORAGE_CASSANDRA_HPP__