Sirikata
|
#include <CassandraStorage.hpp>
Classes | |
struct | BucketRenewTimeout |
struct | StorageAction |
Public Member Functions | |
CassandraStorage (ObjectHostContext *ctx, const String &host, int port, const Duration &lease_duration) | |
~CassandraStorage () | |
virtual void | start () |
Service Interface. | |
virtual void | stop () |
virtual void | leaseBucket (const Bucket &bucket) |
Trigger a lease on a bucket. | |
virtual void | releaseBucket (const Bucket &bucket) |
Release whatever lease or lock was acquired by the corresponding call to leaseBucket. | |
virtual void | beginTransaction (const Bucket &bucket) |
Begin a transaction. | |
virtual void | commitTransaction (const Bucket &bucket, const CommitCallback &cb=0, const String ×tamp="current") |
Completes a transaction and requests that it be written to Flushes all outstanding events (writes and removes) from pending queue. | |
virtual bool | erase (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String ×tamp="current") |
virtual bool | write (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String ×tamp="current") |
Queues writes to the item named by entryName:itemName. | |
virtual bool | read (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String ×tamp="current") |
virtual bool | rangeRead (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String ×tamp="current") |
virtual bool | rangeErase (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String ×tamp="current") |
virtual bool | compare (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String ×tamp="current") |
Add a comparison operation to the transaction. | |
virtual bool | count (const Bucket &bucket, const Key &start, const Key &finish, const CountCallback &cb=0, const String ×tamp="current") |
Private Types | |
typedef std::vector< String > | Keys |
typedef org::apache::cassandra::Column | Column |
typedef std::vector< Column > | Columns |
typedef org::apache::cassandra::SliceRange | SliceRange |
typedef std::vector< SliceRange > | SliceRanges |
typedef org::apache::cassandra::ColumnParent | ColumnParent |
typedef org::apache::cassandra::SlicePredicate | SlicePredicate |
typedef std::tr1::tuple < String, String, String, Columns, Keys > | batchTuple |
typedef std::vector < StorageAction > | Transaction |
typedef std::tr1::unordered_map < Bucket, Transaction *, Bucket::Hasher > | BucketTransactions |
typedef std::set< String > | LeaseRequestSet |
typedef std::tr1::unordered_set < Bucket, Bucket::Hasher > | LeaseSet |
Private Member Functions | |
void | initDB () |
Transaction * | getTransaction (const Bucket &bucket, bool *is_new=NULL) |
void | executeCommit (const Bucket &bucket, Transaction *trans, CommitCallback cb, const String ×tamp) |
void | executeCount (const Bucket &bucket, ColumnParent &parent, SlicePredicate &predicate, CountCallback cb, const String ×tamp) |
void | completeCommit (Transaction *trans, CommitCallback cb, Result success, ReadSet *rs) |
void | completeCount (CountCallback cb, Result success, int32 count) |
Result | CassandraCommit (CassandraDBPtr db, const Bucket &bucket, Columns *columns, Keys *eraseKeys, Keys *readKeys, SliceRanges *readRanges, ReadSet *compares, SliceRanges *eraseRanges, ReadSet *rs, const String ×tamp) |
String | getLeaseBucketName (const Bucket &bucket) |
LeaseRequestSet | readLeaseRequests (const Bucket &bucket) |
Result | acquireLease (const Bucket &bucket) |
void | renewLease (const Bucket &bucket) |
void | releaseLease (const Bucket &bucket) |
void | processRenewals () |
Private Attributes | |
ObjectHostContext * | mContext |
BucketTransactions | mTransactions |
String | mDBHost |
int | mDBPort |
CassandraDBPtr | mDB |
Network::IOService * | mIOService |
Network::IOWork * | mWork |
Thread * | mThread |
const String | mClientID |
const Duration | mLeaseDuration |
LeaseSet | mLeases |
std::queue< BucketRenewTimeout > | mRenewTimes |
Network::IOTimerPtr | mRenewTimer |
typedef std::tr1::tuple<String, String, String, Columns, Keys > Sirikata::OH::CassandraStorage::batchTuple [private] |
typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> Sirikata::OH::CassandraStorage::BucketTransactions [private] |
typedef org::apache::cassandra::Column Sirikata::OH::CassandraStorage::Column [private] |
typedef org::apache::cassandra::ColumnParent Sirikata::OH::CassandraStorage::ColumnParent [private] |
typedef std::vector<Column> Sirikata::OH::CassandraStorage::Columns [private] |
typedef std::vector<String> Sirikata::OH::CassandraStorage::Keys [private] |
typedef std::set<String> Sirikata::OH::CassandraStorage::LeaseRequestSet [private] |
typedef std::tr1::unordered_set<Bucket, Bucket::Hasher> Sirikata::OH::CassandraStorage::LeaseSet [private] |
typedef org::apache::cassandra::SlicePredicate Sirikata::OH::CassandraStorage::SlicePredicate [private] |
typedef org::apache::cassandra::SliceRange Sirikata::OH::CassandraStorage::SliceRange [private] |
typedef std::vector<SliceRange> Sirikata::OH::CassandraStorage::SliceRanges [private] |
typedef std::vector<StorageAction> Sirikata::OH::CassandraStorage::Transaction [private] |
Sirikata::OH::CassandraStorage::CassandraStorage | ( | ObjectHostContext * | ctx, |
const String & | host, | ||
int | port, | ||
const Duration & | lease_duration | ||
) |
Sirikata::OH::CassandraStorage::~CassandraStorage | ( | ) |
Storage::Result Sirikata::OH::CassandraStorage::acquireLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::Logging::detailed, Sirikata::Logging::error, getLeaseBucketName(), LEASES_CF_NAME, Sirikata::OH::Storage::LOCK_ERROR, mClientID, mDB, mLeaseDuration, mLeases, mRenewTimes, Sirikata::Timer::now(), readLeaseRequests(), SILOG, Sirikata::Timer::sleep(), and Sirikata::OH::Storage::SUCCESS.
Referenced by executeCommit().
void Sirikata::OH::CassandraStorage::beginTransaction | ( | const Bucket & | bucket | ) | [virtual] |
Storage::Result Sirikata::OH::CassandraStorage::CassandraCommit | ( | CassandraDBPtr | db, |
const Bucket & | bucket, | ||
Columns * | columns, | ||
Keys * | eraseKeys, | ||
Keys * | readKeys, | ||
SliceRanges * | readRanges, | ||
ReadSet * | compares, | ||
SliceRanges * | eraseRanges, | ||
ReadSet * | rs, | ||
const String & | timestamp | ||
) | [private] |
References CF_NAME, Sirikata::Logging::fatal, mDB, Sirikata::UUID::rawHexData(), SILOG, Sirikata::OH::Storage::SUCCESS, and Sirikata::OH::Storage::TRANSACTION_ERROR.
Referenced by executeCommit().
void Sirikata::OH::CassandraStorage::commitTransaction | ( | const Bucket & | bucket, |
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Completes a transaction and requests that it be written to Flushes all outstanding events (writes and removes) from pending queue.
Resets pending queue as well.
Implements Sirikata::OH::Storage.
References completeCommit(), executeCommit(), getTransaction(), mIOService, mTransactions, Sirikata::Network::IOService::post(), and Sirikata::OH::Storage::SUCCESS.
Referenced by compare(), erase(), rangeErase(), rangeRead(), read(), and write().
bool Sirikata::OH::CassandraStorage::compare | ( | const Bucket & | bucket, |
const Key & | key, | ||
const String & | value, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Add a comparison operation to the transaction.
The given key is read and compared to the value. If the comparison is false, it causes the entire transaction to fail.
{Key} | key the key to compare to |
{String} | value What should be written into that item |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::Compare, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::type, and Sirikata::OH::CassandraStorage::StorageAction::value.
void Sirikata::OH::CassandraStorage::completeCommit | ( | Transaction * | trans, |
CommitCallback | cb, | ||
Result | success, | ||
ReadSet * | rs | ||
) | [private] |
Referenced by commitTransaction(), and executeCommit().
void Sirikata::OH::CassandraStorage::completeCount | ( | CountCallback | cb, |
Result | success, | ||
int32 | count | ||
) | [private] |
Referenced by executeCount().
bool Sirikata::OH::CassandraStorage::count | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CountCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | from the start key of range of keys to count |
{Key} | finish the end key of range of keys to count |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References CF_NAME, executeCount(), mIOService, Sirikata::Network::IOService::post(), and start().
Referenced by executeCount().
bool Sirikata::OH::CassandraStorage::erase | ( | const Bucket & | bucket, |
const Key & | key, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | key the key to erase |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Queues the item to be removed from the backend. Does not actually delete until the flush operation is called.
Erasing a non-existant key does not cause an error -- it's just treated as a noop. If you care that a key is actually erase, use a compare + delete combination.
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::Erase, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, and Sirikata::OH::CassandraStorage::StorageAction::type.
void Sirikata::OH::CassandraStorage::executeCommit | ( | const Bucket & | bucket, |
Transaction * | trans, | ||
CommitCallback | cb, | ||
const String & | timestamp | ||
) | [private] |
References acquireLease(), CassandraCommit(), completeCommit(), Sirikata::Context::mainStrand, mContext, mDB, Sirikata::Network::IOStrand::post(), and Sirikata::OH::Storage::SUCCESS.
Referenced by commitTransaction().
void Sirikata::OH::CassandraStorage::executeCount | ( | const Bucket & | bucket, |
ColumnParent & | parent, | ||
SlicePredicate & | predicate, | ||
CountCallback | cb, | ||
const String & | timestamp | ||
) | [private] |
String Sirikata::OH::CassandraStorage::getLeaseBucketName | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::UUID::rawHexData().
Referenced by acquireLease(), readLeaseRequests(), releaseLease(), and renewLease().
CassandraStorage::Transaction * Sirikata::OH::CassandraStorage::getTransaction | ( | const Bucket & | bucket, |
bool * | is_new = NULL |
||
) | [private] |
References mTransactions.
Referenced by beginTransaction(), commitTransaction(), compare(), erase(), rangeErase(), rangeRead(), read(), and write().
void Sirikata::OH::CassandraStorage::initDB | ( | ) | [private] |
void Sirikata::OH::CassandraStorage::leaseBucket | ( | const Bucket & | bucket | ) | [virtual] |
Trigger a lease on a bucket.
This is just a signal to the storage layer that it should try to make sure it can commit transactions for the bucket. Depending on the implementation and options, this could mean anything from doing nothing (single process, local operation) to a full lease or lock (distributed storage).
Implements Sirikata::OH::Storage.
void Sirikata::OH::CassandraStorage::processRenewals | ( | ) | [private] |
References mRenewTimer, mRenewTimes, Sirikata::Timer::now(), and renewLease().
Referenced by start().
bool Sirikata::OH::CassandraStorage::rangeErase | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | from the start key of range of keys to erase |
{Key} | finish the end key of range of keys to erase |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::EraseRange, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::keyEnd, start(), and Sirikata::OH::CassandraStorage::StorageAction::type.
bool Sirikata::OH::CassandraStorage::rangeRead | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | from the start key of range of keys to read |
{Key} | finish the end key of range of keys to read |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::keyEnd, Sirikata::OH::CassandraStorage::StorageAction::ReadRange, start(), and Sirikata::OH::CassandraStorage::StorageAction::type.
bool Sirikata::OH::CassandraStorage::read | ( | const Bucket & | bucket, |
const Key & | key, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | key the key to read |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::Read, and Sirikata::OH::CassandraStorage::StorageAction::type.
CassandraStorage::LeaseRequestSet Sirikata::OH::CassandraStorage::readLeaseRequests | ( | const Bucket & | bucket | ) | [private] |
References getLeaseBucketName(), LEASES_CF_NAME, mClientID, and mDB.
Referenced by acquireLease().
void Sirikata::OH::CassandraStorage::releaseBucket | ( | const Bucket & | bucket | ) | [virtual] |
Release whatever lease or lock was acquired by the corresponding call to leaseBucket.
Implements Sirikata::OH::Storage.
References Sirikata::Logging::detailed, mIOService, Sirikata::Network::IOService::post(), releaseLease(), and SILOG.
void Sirikata::OH::CassandraStorage::releaseLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::Logging::detailed, Sirikata::Logging::error, getLeaseBucketName(), LEASES_CF_NAME, mClientID, mDB, mLeases, and SILOG.
Referenced by releaseBucket().
void Sirikata::OH::CassandraStorage::renewLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::Logging::detailed, Sirikata::Logging::error, getLeaseBucketName(), LEASES_CF_NAME, mClientID, mDB, mLeaseDuration, mRenewTimes, Sirikata::Timer::now(), and SILOG.
Referenced by processRenewals().
void Sirikata::OH::CassandraStorage::start | ( | ) | [virtual] |
Service Interface.
Reimplemented from Sirikata::OH::Storage.
References Sirikata::Network::IOTimer::create(), initDB(), mIOService, mRenewTimer, mThread, mWork, Sirikata::Network::IOService::post(), processRenewals(), and Sirikata::Network::IOService::runNoReturn().
Referenced by count(), rangeErase(), and rangeRead().
void Sirikata::OH::CassandraStorage::stop | ( | ) | [virtual] |
Reimplemented from Sirikata::OH::Storage.
References Sirikata::Thread::join(), mIOService, mRenewTimer, mThread, mTransactions, and mWork.
bool Sirikata::OH::CassandraStorage::write | ( | const Bucket & | bucket, |
const Key & | key, | ||
const String & | value, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Queues writes to the item named by entryName:itemName.
Writes will not be committed until flush command. Note, if issue this command multiple times with the same entryName:itemName, will only process the last when calling flush.
{Key} | key the key to erase |
{String} | value What should be written into that item |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::type, Sirikata::OH::CassandraStorage::StorageAction::value, and Sirikata::OH::CassandraStorage::StorageAction::Write.
const String Sirikata::OH::CassandraStorage::mClientID [private] |
Referenced by acquireLease(), readLeaseRequests(), releaseLease(), and renewLease().
Referenced by executeCommit(), and executeCount().
CassandraDBPtr Sirikata::OH::CassandraStorage::mDB [private] |
Referenced by acquireLease(), CassandraCommit(), executeCommit(), executeCount(), initDB(), readLeaseRequests(), releaseLease(), and renewLease().
String Sirikata::OH::CassandraStorage::mDBHost [private] |
Referenced by initDB().
int Sirikata::OH::CassandraStorage::mDBPort [private] |
Referenced by initDB().
Referenced by commitTransaction(), count(), releaseBucket(), start(), and stop().
const Duration Sirikata::OH::CassandraStorage::mLeaseDuration [private] |
Referenced by acquireLease(), and renewLease().
Referenced by acquireLease(), and releaseLease().
Referenced by processRenewals(), start(), and stop().
std::queue<BucketRenewTimeout> Sirikata::OH::CassandraStorage::mRenewTimes [private] |
Referenced by acquireLease(), processRenewals(), and renewLease().
Thread* Sirikata::OH::CassandraStorage::mThread [private] |
Referenced by commitTransaction(), getTransaction(), and stop().