Sirikata
|
#include <SQLiteStorage.hpp>
Classes | |
struct | BucketRenewTimeout |
struct | StorageAction |
struct | TransactionData |
Public Member Functions | |
SQLiteStorage (ObjectHostContext *ctx, const String &dbpath, const Duration &lease_duration) | |
~SQLiteStorage () | |
virtual void | start () |
Service Interface. | |
virtual void | stop () |
virtual void | leaseBucket (const Bucket &bucket) |
Trigger a lease on a bucket. | |
virtual void | releaseBucket (const Bucket &bucket) |
Release whatever lease or lock was acquired by the corresponding call to leaseBucket. | |
virtual void | beginTransaction (const Bucket &bucket) |
Begin a transaction. | |
virtual void | commitTransaction (const Bucket &bucket, const CommitCallback &cb=0, const String &timestamp="current") |
Completes a transaction and requests that it be written to storage. Flushes all outstanding events (writes and removes) from the pending queue. | |
virtual bool | erase (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String &timestamp="current") |
virtual bool | write (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String &timestamp="current") |
Queues writes to the item named by entryName:itemName. | |
virtual bool | read (const Bucket &bucket, const Key &key, const CommitCallback &cb=0, const String &timestamp="current") |
virtual bool | compare (const Bucket &bucket, const Key &key, const String &value, const CommitCallback &cb=0, const String &timestamp="current") |
Add a comparison operation to the transaction. | |
virtual bool | rangeRead (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String &timestamp="current") |
virtual bool | rangeErase (const Bucket &bucket, const Key &start, const Key &finish, const CommitCallback &cb=0, const String &timestamp="current") |
virtual bool | count (const Bucket &bucket, const Key &start, const Key &finish, const CountCallback &cb=0, const String &timestamp="current") |
Private Types | |
typedef std::vector < StorageAction > | Transaction |
typedef std::tr1::unordered_map < Bucket, Transaction *, Bucket::Hasher > | BucketTransactions |
typedef ThreadSafeQueueWithNotification < TransactionData > | TransactionQueue |
Private Member Functions | |
void | initDB () |
Transaction * | getTransaction (const Bucket &bucket, bool *is_new=NULL) |
void | postProcessTransactions () |
void | processTransactions () |
Result | executeCommit (const Bucket &bucket, Transaction *trans, CommitCallback cb, ReadSet **read_set_out) |
void | executeCount (const String value_count, const Key &start, const Key &finish, CountCallback cb) |
bool | sqlBeginTransaction () |
bool | sqlCommit () |
bool | sqlRollback () |
String | getLeaseString () |
void | parseLeaseString (const String &ls, String *client_out, Time *expiration_out) |
Result | acquireLease (const Bucket &bucket) |
void | renewLease (const Bucket &bucket) |
void | releaseLease (const Bucket &bucket) |
void | processRenewals () |
Static Private Member Functions | |
static bool | checkSQLiteError (SQLiteDBPtr db, int rc, const String &msg) |
Private Attributes | |
ObjectHostContext * | mContext |
BucketTransactions | mTransactions |
String | mDBFilename |
SQLiteDBPtr | mDB |
Network::IOService * | mIOService |
Network::IOWork * | mWork |
Thread * | mThread |
const String | mSQLClientID |
const Duration | mLeaseDuration |
TransactionQueue | mTransactionQueue |
uint32 | mMaxCoalescedTransactions |
const Duration | mRetrySleepDuration |
const int32 | mNormalOpRetries |
const int32 | mLeaseOpRetries |
std::queue< BucketRenewTimeout > | mRenewTimes |
Network::IOTimerPtr | mRenewTimer |
typedef std::tr1::unordered_map<Bucket, Transaction*, Bucket::Hasher> Sirikata::OH::SQLiteStorage::BucketTransactions [private] |
typedef std::vector<StorageAction> Sirikata::OH::SQLiteStorage::Transaction [private] |
typedef ThreadSafeQueueWithNotification<TransactionData> Sirikata::OH::SQLiteStorage::TransactionQueue [private] |
Sirikata::OH::SQLiteStorage::SQLiteStorage | ( | ObjectHostContext * | ctx, |
const String & | dbpath, | ||
const Duration & | lease_duration | ||
) |
Sirikata::OH::SQLiteStorage::~SQLiteStorage | ( | ) |
Storage::Result Sirikata::OH::SQLiteStorage::acquireLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::OH::SQLiteStorage::StorageAction::executeWithRetry(), getLeaseString(), Sirikata::OH::SQLiteStorage::StorageAction::key, LEASE_KEY, Sirikata::OH::Storage::LOCK_ERROR, mContext, mDB, mLeaseDuration, mLeaseOpRetries, mRenewTimer, mRenewTimes, mRetrySleepDuration, mSQLClientID, Sirikata::Timer::now(), parseLeaseString(), Sirikata::OH::SQLiteStorage::StorageAction::Read, Sirikata::Context::realTime(), Sirikata::OH::Storage::SUCCESS, Sirikata::OH::Storage::TRANSACTION_ERROR, Sirikata::OH::SQLiteStorage::StorageAction::type, Sirikata::OH::SQLiteStorage::StorageAction::value, and Sirikata::OH::SQLiteStorage::StorageAction::Write.
Referenced by executeCommit().
void Sirikata::OH::SQLiteStorage::beginTransaction | ( | const Bucket & | bucket | ) | [virtual] |
bool Sirikata::OH::SQLiteStorage::checkSQLiteError | ( | SQLiteDBPtr | db, |
int | rc, | ||
const String & | msg | ||
) | [static, private] |
References Sirikata::SQLite::check_sql_error(), Sirikata::Logging::error, and SILOG.
Referenced by Sirikata::OH::SQLiteStorage::StorageAction::execute(), executeCount(), initDB(), sqlBeginTransaction(), sqlCommit(), and sqlRollback().
void Sirikata::OH::SQLiteStorage::commitTransaction | ( | const Bucket & | bucket, |
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Completes a transaction and requests that it be written to storage. Flushes all outstanding events (writes and removes) from the pending queue.
Resets pending queue as well.
Implements Sirikata::OH::Storage.
References getTransaction(), mTransactionQueue, mTransactions, Sirikata::ThreadSafeQueueWithNotification< T >::push(), and Sirikata::OH::Storage::SUCCESS.
Referenced by compare(), erase(), rangeErase(), rangeRead(), read(), and write().
bool Sirikata::OH::SQLiteStorage::compare | ( | const Bucket & | bucket, |
const Key & | key, | ||
const String & | value, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Add a comparison operation to the transaction.
The given key is read and compared to the value. If the comparison is false, it causes the entire transaction to fail.
{Key} | key the key to compare to |
{String} | value the value the item stored at the key is compared against |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::Compare, getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, Sirikata::OH::SQLiteStorage::StorageAction::type, and Sirikata::OH::SQLiteStorage::StorageAction::value.
bool Sirikata::OH::SQLiteStorage::count | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CountCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | start the start key of range of keys to count |
{Key} | finish the end key of range of keys to count |
{CountCallback} | cb optional count callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References executeCount(), mIOService, Sirikata::Network::IOService::post(), Sirikata::UUID::rawHexData(), and TABLE_NAME.
Referenced by executeCount().
bool Sirikata::OH::SQLiteStorage::erase | ( | const Bucket & | bucket, |
const Key & | key, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | key the key to erase |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Queues the item to be removed from the backend. Does not actually delete until the flush operation is called.
Erasing a non-existent key does not cause an error -- it's just treated as a noop. If you care that a key is actually erased, use a compare + delete combination.
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::Erase, getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, and Sirikata::OH::SQLiteStorage::StorageAction::type.
Storage::Result Sirikata::OH::SQLiteStorage::executeCommit | ( | const Bucket & | bucket, |
Transaction * | trans, | ||
CommitCallback | cb, | ||
ReadSet ** | read_set_out | ||
) | [private] |
References acquireLease(), mDB, mNormalOpRetries, mRetrySleepDuration, and Sirikata::OH::Storage::SUCCESS.
Referenced by processTransactions().
void Sirikata::OH::SQLiteStorage::executeCount | ( | const String | value_count, |
const Key & | start, | ||
const Key & | finish, | ||
CountCallback | cb | ||
) | [private] |
References checkSQLiteError(), count(), Sirikata::Context::mainStrand, mContext, mDB, Sirikata::Network::IOStrand::post(), Sirikata::OH::Storage::SUCCESS, and Sirikata::OH::Storage::TRANSACTION_ERROR.
Referenced by count().
String Sirikata::OH::SQLiteStorage::getLeaseString | ( | ) | [private] |
References mContext, mLeaseDuration, mSQLClientID, Sirikata::Time::raw(), and Sirikata::Context::realTime().
Referenced by acquireLease(), and renewLease().
SQLiteStorage::Transaction * Sirikata::OH::SQLiteStorage::getTransaction | ( | const Bucket & | bucket, |
bool * | is_new = NULL |
||
) | [private] |
References mTransactions.
Referenced by beginTransaction(), commitTransaction(), compare(), erase(), rangeErase(), rangeRead(), read(), and write().
void Sirikata::OH::SQLiteStorage::initDB | ( | ) | [private] |
References checkSQLiteError(), Sirikata::SQLite::getSingleton(), mDB, mDBFilename, Sirikata::SQLite::open(), and TABLE_NAME.
Referenced by start().
void Sirikata::OH::SQLiteStorage::leaseBucket | ( | const Bucket & | bucket | ) | [virtual] |
Trigger a lease on a bucket.
This is just a signal to the storage layer that it should try to make sure it can commit transactions for the bucket. Depending on the implementation and options, this could mean anything from doing nothing (single process, local operation) to a full lease or lock (distributed storage).
Implements Sirikata::OH::Storage.
void Sirikata::OH::SQLiteStorage::parseLeaseString | ( | const String & | ls, |
String * | client_out, | ||
Time * | expiration_out | ||
) | [private] |
References Sirikata::Time::null().
Referenced by acquireLease(), releaseLease(), and renewLease().
void Sirikata::OH::SQLiteStorage::postProcessTransactions | ( | ) | [private] |
References mIOService, Sirikata::Network::IOService::post(), and processTransactions().
void Sirikata::OH::SQLiteStorage::processRenewals | ( | ) | [private] |
References mRenewTimer, mRenewTimes, Sirikata::Timer::now(), and renewLease().
Referenced by start().
void Sirikata::OH::SQLiteStorage::processTransactions | ( | ) | [private] |
References Sirikata::OH::SQLiteStorage::TransactionData::bucket, Sirikata::OH::SQLiteStorage::TransactionData::cb, Sirikata::ThreadSafeQueueWithNotification< T >::empty(), executeCommit(), Sirikata::OH::Storage::LOCK_ERROR, Sirikata::Context::mainStrand, mContext, mMaxCoalescedTransactions, mTransactionQueue, Sirikata::ThreadSafeQueueWithNotification< T >::pop(), Sirikata::Network::IOStrand::post(), sqlBeginTransaction(), sqlCommit(), sqlRollback(), Sirikata::OH::Storage::SUCCESS, and Sirikata::OH::SQLiteStorage::TransactionData::trans.
Referenced by postProcessTransactions().
bool Sirikata::OH::SQLiteStorage::rangeErase | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | start the start key of range of keys to erase |
{Key} | finish the end key of range of keys to erase |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::EraseRange, getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, Sirikata::OH::SQLiteStorage::StorageAction::keyEnd, start(), and Sirikata::OH::SQLiteStorage::StorageAction::type.
bool Sirikata::OH::SQLiteStorage::rangeRead | ( | const Bucket & | bucket, |
const Key & | start, | ||
const Key & | finish, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | start the start key of range of keys to read |
{Key} | finish the end key of range of keys to read |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, Sirikata::OH::SQLiteStorage::StorageAction::keyEnd, Sirikata::OH::SQLiteStorage::StorageAction::ReadRange, start(), and Sirikata::OH::SQLiteStorage::StorageAction::type.
bool Sirikata::OH::SQLiteStorage::read | ( | const Bucket & | bucket, |
const Key & | key, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
{Key} | key the key to read |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, Sirikata::OH::SQLiteStorage::StorageAction::Read, and Sirikata::OH::SQLiteStorage::StorageAction::type.
void Sirikata::OH::SQLiteStorage::releaseBucket | ( | const Bucket & | bucket | ) | [virtual] |
Release whatever lease or lock was acquired by the corresponding call to leaseBucket.
Implements Sirikata::OH::Storage.
References mIOService, Sirikata::Network::IOService::post(), and releaseLease().
void Sirikata::OH::SQLiteStorage::releaseLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::OH::SQLiteStorage::StorageAction::Erase, Sirikata::Logging::error, Sirikata::OH::SQLiteStorage::StorageAction::executeWithRetry(), Sirikata::OH::SQLiteStorage::StorageAction::key, LEASE_KEY, Sirikata::OH::Storage::LOCK_ERROR, mDB, mLeaseOpRetries, mRetrySleepDuration, mSQLClientID, parseLeaseString(), Sirikata::OH::SQLiteStorage::StorageAction::Read, SILOG, sqlBeginTransaction(), sqlCommit(), sqlRollback(), Sirikata::OH::Storage::SUCCESS, and Sirikata::OH::SQLiteStorage::StorageAction::type.
Referenced by releaseBucket().
void Sirikata::OH::SQLiteStorage::renewLease | ( | const Bucket & | bucket | ) | [private] |
References Sirikata::OH::SQLiteStorage::StorageAction::executeWithRetry(), getLeaseString(), Sirikata::OH::SQLiteStorage::StorageAction::key, LEASE_KEY, Sirikata::OH::Storage::LOCK_ERROR, mDB, mLeaseDuration, mLeaseOpRetries, mRenewTimes, mRetrySleepDuration, mSQLClientID, Sirikata::Timer::now(), parseLeaseString(), Sirikata::OH::SQLiteStorage::StorageAction::Read, sqlBeginTransaction(), sqlCommit(), sqlRollback(), Sirikata::OH::Storage::SUCCESS, Sirikata::OH::SQLiteStorage::StorageAction::type, Sirikata::OH::SQLiteStorage::StorageAction::value, and Sirikata::OH::SQLiteStorage::StorageAction::Write.
Referenced by processRenewals().
bool Sirikata::OH::SQLiteStorage::sqlBeginTransaction | ( | ) | [private] |
References checkSQLiteError(), and mDB.
Referenced by processTransactions(), releaseLease(), and renewLease().
bool Sirikata::OH::SQLiteStorage::sqlCommit | ( | ) | [private] |
References checkSQLiteError(), and mDB.
Referenced by processTransactions(), releaseLease(), and renewLease().
bool Sirikata::OH::SQLiteStorage::sqlRollback | ( | ) | [private] |
References checkSQLiteError(), and mDB.
Referenced by processTransactions(), releaseLease(), and renewLease().
void Sirikata::OH::SQLiteStorage::start | ( | ) | [virtual] |
Service Interface.
Reimplemented from Sirikata::OH::Storage.
References Sirikata::Network::IOTimer::create(), initDB(), mIOService, mRenewTimer, mThread, mWork, Sirikata::Network::IOService::post(), processRenewals(), and Sirikata::Network::IOService::runNoReturn().
Referenced by rangeErase(), and rangeRead().
void Sirikata::OH::SQLiteStorage::stop | ( | ) | [virtual] |
Reimplemented from Sirikata::OH::Storage.
References Sirikata::Thread::join(), mIOService, mRenewTimer, mThread, mTransactions, and mWork.
bool Sirikata::OH::SQLiteStorage::write | ( | const Bucket & | bucket, |
const Key & | key, | ||
const String & | value, | ||
const CommitCallback & | cb = 0 , |
||
const String & | timestamp = "current" |
||
) | [virtual] |
Queues writes to the item named by entryName:itemName.
Writes will not be committed until the flush command is issued. Note: if you issue this command multiple times with the same entryName:itemName, only the last one will be processed when calling flush.
{Key} | key the key to write |
{String} | value What should be written into that item |
{CommitCallback} | cb optional commit callback which is invoked if this is a single operation transaction. |
{String} | timestamp the timestamp of the operation |
Implements Sirikata::OH::Storage.
References commitTransaction(), getTransaction(), Sirikata::OH::SQLiteStorage::StorageAction::key, Sirikata::OH::SQLiteStorage::StorageAction::type, Sirikata::OH::SQLiteStorage::StorageAction::value, and Sirikata::OH::SQLiteStorage::StorageAction::Write.
Referenced by acquireLease(), executeCount(), getLeaseString(), and processTransactions().
SQLiteDBPtr Sirikata::OH::SQLiteStorage::mDB [private] |
Referenced by acquireLease(), executeCommit(), executeCount(), initDB(), releaseLease(), renewLease(), sqlBeginTransaction(), sqlCommit(), and sqlRollback().
String Sirikata::OH::SQLiteStorage::mDBFilename [private] |
Referenced by initDB().
Network::IOService* Sirikata::OH::SQLiteStorage::mIOService [private] |
Referenced by count(), postProcessTransactions(), releaseBucket(), start(), and stop().
const Duration Sirikata::OH::SQLiteStorage::mLeaseDuration [private] |
Referenced by acquireLease(), getLeaseString(), and renewLease().
const int32 Sirikata::OH::SQLiteStorage::mLeaseOpRetries [private] |
Referenced by acquireLease(), releaseLease(), and renewLease().
uint32 Sirikata::OH::SQLiteStorage::mMaxCoalescedTransactions [private] |
Referenced by processTransactions().
const int32 Sirikata::OH::SQLiteStorage::mNormalOpRetries [private] |
Referenced by executeCommit().
Network::IOTimerPtr Sirikata::OH::SQLiteStorage::mRenewTimer [private] |
Referenced by acquireLease(), processRenewals(), start(), and stop().
std::queue<BucketRenewTimeout> Sirikata::OH::SQLiteStorage::mRenewTimes [private] |
Referenced by acquireLease(), processRenewals(), and renewLease().
const Duration Sirikata::OH::SQLiteStorage::mRetrySleepDuration [private] |
Referenced by acquireLease(), executeCommit(), releaseLease(), and renewLease().
const String Sirikata::OH::SQLiteStorage::mSQLClientID [private] |
Referenced by acquireLease(), getLeaseString(), releaseLease(), and renewLease().
Thread* Sirikata::OH::SQLiteStorage::mThread [private] |
TransactionQueue Sirikata::OH::SQLiteStorage::mTransactionQueue [private] |
Referenced by commitTransaction(), and processTransactions().
BucketTransactions Sirikata::OH::SQLiteStorage::mTransactions [private] |
Referenced by commitTransaction(), getTransaction(), and stop().