Sirikata
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes
Sirikata::OH::CassandraStorage Class Reference

#include <CassandraStorage.hpp>

Inheritance diagram for Sirikata::OH::CassandraStorage:
Collaboration diagram for Sirikata::OH::CassandraStorage:

List of all members.

Classes

struct  BucketRenewTimeout
struct  StorageAction

Public Member Functions

 CassandraStorage (ObjectHostContext *ctx, const String &host, int port, const Duration &lease_duration)
 ~CassandraStorage ()
virtual void start ()
 Service Interface.
virtual void stop ()
virtual void leaseBucket (const Bucket &bucket)
 Trigger a lease on a bucket.
virtual void releaseBucket (const Bucket &bucket)
 Release whatever lease or lock was acquired by the corresponding call to leaseBucket.
virtual void beginTransaction (const Bucket &bucket)
 Begin a transaction.
virtual void commitTransaction (const Bucket &bucket, const CommitCallback &cb=0, const String &timestamp="current")
 Completes a transaction and requests that it be written to Flushes all outstanding events (writes and removes) from pending queue.
virtual bool erase (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String &timestamp="current")
virtual bool write (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String &timestamp="current")
 Queues writes to the item named by entryName:itemName.
virtual bool read (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String &timestamp="current")
virtual bool rangeRead (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String &timestamp="current")
virtual bool rangeErase (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String &timestamp="current")
virtual bool compare (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String &timestamp="current")
 Add a comparison operation to the transaction.
virtual bool count (const Bucket &bucket, const Key &start, const Key &finish, const CountCallback &cb=0, const String &timestamp="current")

Private Types

typedef std::vector< String > Keys
typedef
org::apache::cassandra::Column 
Column
typedef std::vector< ColumnColumns
typedef
org::apache::cassandra::SliceRange 
SliceRange
typedef std::vector< SliceRangeSliceRanges
typedef
org::apache::cassandra::ColumnParent 
ColumnParent
typedef
org::apache::cassandra::SlicePredicate 
SlicePredicate
typedef std::tr1::tuple
< String, String, String,
Columns, Keys
batchTuple
typedef std::vector
< StorageAction
Transaction
typedef
std::tr1::unordered_map
< Bucket, Transaction
*, Bucket::Hasher
BucketTransactions
typedef std::set< String > LeaseRequestSet
typedef
std::tr1::unordered_set
< Bucket, Bucket::Hasher
LeaseSet

Private Member Functions

void initDB ()
TransactiongetTransaction (const Bucket &bucket, bool *is_new=NULL)
void executeCommit (const Bucket &bucket, Transaction *trans, CommitCallback cb, const String &timestamp)
void executeCount (const Bucket &bucket, ColumnParent &parent, SlicePredicate &predicate, CountCallback cb, const String &timestamp)
void completeCommit (Transaction *trans, CommitCallback cb, Result success, ReadSet *rs)
void completeCount (CountCallback cb, Result success, int32 count)
Result CassandraCommit (CassandraDBPtr db, const Bucket &bucket, Columns *columns, Keys *eraseKeys, Keys *readKeys, SliceRanges *readRanges, ReadSet *compares, SliceRanges *eraseRanges, ReadSet *rs, const String &timestamp)
String getLeaseBucketName (const Bucket &bucket)
LeaseRequestSet readLeaseRequests (const Bucket &bucket)
Result acquireLease (const Bucket &bucket)
void renewLease (const Bucket &bucket)
void releaseLease (const Bucket &bucket)
void processRenewals ()

Private Attributes

ObjectHostContextmContext
BucketTransactions mTransactions
String mDBHost
int mDBPort
CassandraDBPtr mDB
Network::IOServicemIOService
Network::IOWorkmWork
ThreadmThread
const String mClientID
const Duration mLeaseDuration
LeaseSet mLeases
std::queue< BucketRenewTimeoutmRenewTimes
Network::IOTimerPtr mRenewTimer

Member Typedef Documentation

typedef std::tr1::tuple<String, String, String, Columns, Keys > Sirikata::OH::CassandraStorage::batchTuple [private]
typedef org::apache::cassandra::Column Sirikata::OH::CassandraStorage::Column [private]
typedef org::apache::cassandra::ColumnParent Sirikata::OH::CassandraStorage::ColumnParent [private]
typedef std::vector<Column> Sirikata::OH::CassandraStorage::Columns [private]
typedef std::vector<String> Sirikata::OH::CassandraStorage::Keys [private]
typedef std::set<String> Sirikata::OH::CassandraStorage::LeaseRequestSet [private]
typedef std::tr1::unordered_set<Bucket, Bucket::Hasher> Sirikata::OH::CassandraStorage::LeaseSet [private]
typedef org::apache::cassandra::SlicePredicate Sirikata::OH::CassandraStorage::SlicePredicate [private]
typedef org::apache::cassandra::SliceRange Sirikata::OH::CassandraStorage::SliceRange [private]

Constructor & Destructor Documentation

Sirikata::OH::CassandraStorage::CassandraStorage ( ObjectHostContext ctx,
const String &  host,
int  port,
const Duration lease_duration 
)
Sirikata::OH::CassandraStorage::~CassandraStorage ( )

Member Function Documentation

Storage::Result Sirikata::OH::CassandraStorage::acquireLease ( const Bucket bucket) [private]
void Sirikata::OH::CassandraStorage::beginTransaction ( const Bucket bucket) [virtual]

Begin a transaction.

Implements Sirikata::OH::Storage.

References getTransaction().

Storage::Result Sirikata::OH::CassandraStorage::CassandraCommit ( CassandraDBPtr  db,
const Bucket bucket,
Columns columns,
Keys eraseKeys,
Keys readKeys,
SliceRanges readRanges,
ReadSet compares,
SliceRanges eraseRanges,
ReadSet rs,
const String &  timestamp 
) [private]
void Sirikata::OH::CassandraStorage::commitTransaction ( const Bucket bucket,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]

Completes a transaction and requests that it be written to Flushes all outstanding events (writes and removes) from pending queue.

Resets pending queue as well.

Implements Sirikata::OH::Storage.

References completeCommit(), executeCommit(), getTransaction(), mIOService, mTransactions, Sirikata::Network::IOService::post(), and Sirikata::OH::Storage::SUCCESS.

Referenced by compare(), erase(), rangeErase(), rangeRead(), read(), and write().

bool Sirikata::OH::CassandraStorage::compare ( const Bucket bucket,
const Key key,
const String &  value,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]

Add a comparison operation to the transaction.

The given key is read and compared to the value. If the comparison is false, it causes the entire transaction to fail.

Parameters:
{Key}key the key to compare to
{String}value What should be written into that item
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the read is queued, false otherwise. Does not indicated success of actual read operation.

Implements Sirikata::OH::Storage.

References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::Compare, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::type, and Sirikata::OH::CassandraStorage::StorageAction::value.

void Sirikata::OH::CassandraStorage::completeCommit ( Transaction trans,
CommitCallback  cb,
Result  success,
ReadSet rs 
) [private]

Referenced by commitTransaction(), and executeCommit().

void Sirikata::OH::CassandraStorage::completeCount ( CountCallback  cb,
Result  success,
int32  count 
) [private]

Referenced by executeCount().

bool Sirikata::OH::CassandraStorage::count ( const Bucket bucket,
const Key start,
const Key finish,
const CountCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]
Parameters:
{Key}from the start key of range of keys to count
{Key}finish the end key of range of keys to count
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if operation is successful, false otherwise

Implements Sirikata::OH::Storage.

References CF_NAME, executeCount(), mIOService, Sirikata::Network::IOService::post(), and start().

Referenced by executeCount().

bool Sirikata::OH::CassandraStorage::erase ( const Bucket bucket,
const Key key,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]
Parameters:
{Key}key the key to erase
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the erase is queued, false otherwise. Does not indicated success of actual erase operation.

Queues the item to be removed from the backend. Does not actually delete until the flush operation is called.

Erasing a non-existant key does not cause an error -- it's just treated as a noop. If you care that a key is actually erase, use a compare + delete combination.

Implements Sirikata::OH::Storage.

References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::Erase, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, and Sirikata::OH::CassandraStorage::StorageAction::type.

void Sirikata::OH::CassandraStorage::executeCommit ( const Bucket bucket,
Transaction trans,
CommitCallback  cb,
const String &  timestamp 
) [private]
void Sirikata::OH::CassandraStorage::executeCount ( const Bucket bucket,
ColumnParent parent,
SlicePredicate predicate,
CountCallback  cb,
const String &  timestamp 
) [private]
String Sirikata::OH::CassandraStorage::getLeaseBucketName ( const Bucket bucket) [private]
CassandraStorage::Transaction * Sirikata::OH::CassandraStorage::getTransaction ( const Bucket bucket,
bool *  is_new = NULL 
) [private]
void Sirikata::OH::CassandraStorage::initDB ( ) [private]

References mDB, mDBHost, and mDBPort.

Referenced by start().

void Sirikata::OH::CassandraStorage::leaseBucket ( const Bucket bucket) [virtual]

Trigger a lease on a bucket.

This is just a signal to the storage layer that it should try to make sure it can commit transactions for the bucket. Depending on the implementation and options, this could mean anything from doing nothing (single process, local operation) to a full lease or lock (distributed storage).

Implements Sirikata::OH::Storage.

void Sirikata::OH::CassandraStorage::processRenewals ( ) [private]
bool Sirikata::OH::CassandraStorage::rangeErase ( const Bucket bucket,
const Key start,
const Key finish,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]
Parameters:
{Key}from the start key of range of keys to erase
{Key}finish the end key of range of keys to erase
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the read is queued, false otherwise. Does not indicated success of actual erase operation.

Implements Sirikata::OH::Storage.

References commitTransaction(), Sirikata::OH::CassandraStorage::StorageAction::EraseRange, getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::keyEnd, start(), and Sirikata::OH::CassandraStorage::StorageAction::type.

bool Sirikata::OH::CassandraStorage::rangeRead ( const Bucket bucket,
const Key start,
const Key finish,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]
Parameters:
{Key}from the start key of range of keys to read
{Key}finish the end key of range of keys to read
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the read is queued, false otherwise. Does not indicated success of actual read operation.

Implements Sirikata::OH::Storage.

References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::keyEnd, Sirikata::OH::CassandraStorage::StorageAction::ReadRange, start(), and Sirikata::OH::CassandraStorage::StorageAction::type.

bool Sirikata::OH::CassandraStorage::read ( const Bucket bucket,
const Key key,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]
Parameters:
{Key}key the key to read
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the read is queued, false otherwise. Does not indicated success of actual read operation.

Implements Sirikata::OH::Storage.

References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::Read, and Sirikata::OH::CassandraStorage::StorageAction::type.

CassandraStorage::LeaseRequestSet Sirikata::OH::CassandraStorage::readLeaseRequests ( const Bucket bucket) [private]

References getLeaseBucketName(), LEASES_CF_NAME, mClientID, and mDB.

Referenced by acquireLease().

void Sirikata::OH::CassandraStorage::releaseBucket ( const Bucket bucket) [virtual]

Release whatever lease or lock was acquired by the corresponding call to leaseBucket.

Implements Sirikata::OH::Storage.

References Sirikata::Logging::detailed, mIOService, Sirikata::Network::IOService::post(), releaseLease(), and SILOG.

void Sirikata::OH::CassandraStorage::releaseLease ( const Bucket bucket) [private]
void Sirikata::OH::CassandraStorage::renewLease ( const Bucket bucket) [private]
void Sirikata::OH::CassandraStorage::start ( ) [virtual]
void Sirikata::OH::CassandraStorage::stop ( ) [virtual]
bool Sirikata::OH::CassandraStorage::write ( const Bucket bucket,
const Key key,
const String &  value,
const CommitCallback cb = 0,
const String &  timestamp = "current" 
) [virtual]

Queues writes to the item named by entryName:itemName.

Writes will not be committed until flush command. Note, if issue this command multiple times with the same entryName:itemName, will only process the last when calling flush.

Parameters:
{Key}key the key to erase
{String}value What should be written into that item
{CommitCallback}cb optional commit callback which is invoked if this is a single operation transaction.
{String}timestamp the timestamp of the operation
Returns:
{bool} true if the write is queued, false otherwise. Does not indicated success of actual write operation.

Implements Sirikata::OH::Storage.

References commitTransaction(), getTransaction(), Sirikata::OH::CassandraStorage::StorageAction::key, Sirikata::OH::CassandraStorage::StorageAction::type, Sirikata::OH::CassandraStorage::StorageAction::value, and Sirikata::OH::CassandraStorage::StorageAction::Write.


Member Data Documentation

Referenced by executeCommit(), and executeCount().

CassandraDBPtr Sirikata::OH::CassandraStorage::mDB [private]

Referenced by initDB().

Referenced by initDB().

Referenced by acquireLease(), and renewLease().

Referenced by acquireLease(), and releaseLease().

Referenced by processRenewals(), start(), and stop().

Referenced by start(), and stop().

Referenced by start(), and stop().


The documentation for this class was generated from the following files: