Sirikata
|
00001 /* Sirikata 00002 * CassandraStorage.hpp 00003 * 00004 * Copyright (c) 2011, Stanford University 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef __SIRIKATA_OH_STORAGE_CASSANDRA_HPP__ 00034 #define __SIRIKATA_OH_STORAGE_CASSANDRA_HPP__ 00035 00036 #include <sirikata/oh/Storage.hpp> 00037 #include <sirikata/cassandra/Cassandra.hpp> 00038 00039 namespace Sirikata { 00040 namespace OH { 00041 00042 class FileStorageEvent; 00043 00044 class CassandraStorage : public Storage 00045 { 00046 public: 00047 CassandraStorage(ObjectHostContext* ctx, const String& host, int port, const Duration& lease_duration); 00048 ~CassandraStorage(); 00049 00050 virtual void start(); 00051 virtual void stop(); 00052 00053 virtual void leaseBucket(const Bucket& bucket); 00054 virtual void releaseBucket(const Bucket& bucket); 00055 00056 virtual void beginTransaction(const Bucket& bucket); 00057 virtual void commitTransaction(const Bucket& bucket, const CommitCallback& cb = 0, const String& timestamp="current"); 00058 virtual bool erase(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current"); 00059 virtual bool write(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current"); 00060 virtual bool read(const Bucket& bucket, const Key& key, const CommitCallback& cb = 0, const String& timestamp="current"); 00061 virtual bool rangeRead(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current"); 00062 virtual bool rangeErase(const Bucket& bucket, const Key& start, const Key& finish, const CommitCallback& cb = 0, const String& timestamp="current"); 00063 virtual bool compare(const Bucket& bucket, const Key& key, const String& value, const CommitCallback& cb = 0, const String& timestamp="current"); 00064 virtual bool count(const Bucket& bucket, const Key& start, const Key& finish, const CountCallback& cb = 0, const String& timestamp="current"); 00065 00066 00067 private: 00068 00069 typedef std::vector<String> Keys; 00070 typedef org::apache::cassandra::Column Column; 00071 typedef std::vector<Column> Columns; 00072 typedef org::apache::cassandra::SliceRange SliceRange; 00073 typedef std::vector<SliceRange> SliceRanges; 00074 typedef org::apache::cassandra::ColumnParent ColumnParent; 00075 typedef org::apache::cassandra::SlicePredicate SlicePredicate; 00076 00077 typedef std::tr1::tuple<String, //column family 00078 String, //row key 00079 String, //super column name 00080 Columns, //columns to write 00081 Keys //keys to erase 00082 > batchTuple; 00083 00084 // StorageActions are individual actions to take, i.e. read, write, 00085 // erase. We queue them up in a list and eventually fire them off in a 00086 // transaction. 00087 struct StorageAction { 00088 enum Type { 00089 Read, 00090 ReadRange, 00091 Compare, 00092 Write, 00093 Erase, 00094 EraseRange, 00095 Error 00096 }; 00097 00098 StorageAction(); 00099 StorageAction(const StorageAction& rhs); 00100 ~StorageAction(); 00101 00102 StorageAction& operator=(const StorageAction& rhs); 00103 00104 // Executes this action: push action to lists and wait for commitment 00105 void execute(const Bucket& bucket, Columns* columns, Keys* eraseKeys, Keys* readKeys, SliceRanges* readRanges, ReadSet* compares, SliceRanges* eraseRanges, const String& timestamp); 00106 00107 // Bucket is implicit, passed into execute 00108 Type type; 00109 Key key; 00110 Key keyEnd; 00111 String* value; 00112 }; 00113 00114 typedef std::vector<StorageAction> Transaction; 00115 typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> BucketTransactions; 00116 00117 // Initializes the database. 00118 void initDB(); 00119 00120 // Gets the current transaction or creates one. Also can return whether the 00121 // transaction was just created, e.g. to tell whether an operation is an 00122 // implicit transaction. 00123 Transaction* getTransaction(const Bucket& bucket, bool* is_new = NULL); 00124 00125 // Executes a commit. Runs in a separate thread, so the transaction is 00126 // passed in directly 00127 void executeCommit(const Bucket& bucket, Transaction* trans, CommitCallback cb, const String& timestamp); 00128 00129 void executeCount(const Bucket& bucket, ColumnParent& parent, SlicePredicate& predicate, CountCallback cb, const String& timestamp); 00130 00131 // Complete a commit back in the main thread, cleaning it up and dispatching the callback 00132 void completeCommit(Transaction* trans, CommitCallback cb, Result success, ReadSet* rs); 00133 void completeCount(CountCallback cb, Result success, int32 count); 00134 00135 // Call libcassandra methods to commit transcation 00136 Result CassandraCommit(CassandraDBPtr db, const Bucket& bucket, Columns* columns, Keys* eraseKeys, Keys* readKeys, SliceRanges* readRanges, ReadSet* compares, SliceRanges* eraseRanges, ReadSet* rs, const String& timestamp); 00137 00138 00139 // Helpers for leases: 00140 // The LeaseRequestSet is a client's view of all the lease requests. It's 00141 // just a set of client IDs that were found as keys in the request row for 00142 // the bucket. 00143 typedef std::set<String> LeaseRequestSet; 00144 // Get the 'bucket' (Cassandra row) name for lease negotiation for this 00145 // object 00146 String getLeaseBucketName(const Bucket& bucket); 00147 // Read all requests for leases against the given bucket. Returns the other 00148 // clients we see requesting a lease (removes our own ID). 00149 LeaseRequestSet readLeaseRequests(const Bucket& bucket); 00150 00151 // Acquire a lease (or update if it's already valid) for the given 00152 // bucket. This is part of a transaction -- the first part to 00153 // ensure the transaction is valid 00154 Result acquireLease(const Bucket& bucket); 00155 // Renew a lease that we already have. Verifies we still hold the 00156 // lease, then renews it. This is an entire transaction. 00157 void renewLease(const Bucket& bucket); 00158 // Release the lease if we own it. 00159 void releaseLease(const Bucket& bucket); 00160 00161 // Process renewals at front of queue that need updating. 00162 void processRenewals(); 00163 00164 00165 00166 ObjectHostContext* mContext; 00167 BucketTransactions mTransactions; 00168 String mDBHost; //host name of Cassandra server 00169 int mDBPort; 00170 CassandraDBPtr mDB; 00171 00172 Network::IOService* mIOService; 00173 Network::IOWork* mWork; 00174 Thread* mThread; 00175 00176 // A unique client ID for leases. These should not include '-' as 00177 // those are used to separate the client ID and timestamp 00178 const String mClientID; 00179 const Duration mLeaseDuration; 00180 // Track which objects we have active leases on 00181 typedef std::tr1::unordered_set<Bucket, Bucket::Hasher> LeaseSet; 00182 LeaseSet mLeases; 00183 00184 struct BucketRenewTimeout { 00185 BucketRenewTimeout(const Bucket& _b, Time _t) 00186 : bucket(_b), t(_t) 00187 {} 00188 const Bucket bucket; 00189 const Time t; 00190 }; 00191 std::queue<BucketRenewTimeout> mRenewTimes; 00192 Network::IOTimerPtr mRenewTimer; 00193 }; 00194 00195 }//end namespace OH 00196 }//end namespace Sirikata 00197 00198 #endif //__SIRIKATA_OH_STORAGE_Cassandra_HPP__