Sirikata
libspace/plugins/redis/RedisObjectSegmentation.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  RedisObjectSegmentation.hpp
00003  *
00004  *  Copyright (c) 2011, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_
00034 #define _SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_
00035 
00036 #include <sirikata/space/ObjectSegmentation.hpp>
00037 #include <hiredis/async.h>
00038 
00039 #include <boost/multi_index_container.hpp>
00040 #include <boost/multi_index/member.hpp>
00041 #include <boost/multi_index/ordered_index.hpp>
00042 #include <boost/multi_index/hashed_index.hpp>
00043 
00044 namespace Sirikata {
00045 
00046 class RedisObjectSegmentation : public ObjectSegmentation {
      // ObjectSegmentation subclass that talks to a Redis server through the
      // hiredis asynchronous API (see mRedisContext below). It keeps a local
      // UUID -> OSegEntry map (mOSeg), an externally supplied OSegCache, and a
      // set of per-object timeouts (mTimeouts) used to refresh keys in Redis.
      //
      // NOTE(review): this header uses boost::recursive_mutex,
      // boost::lock_guard, boost::asio::posix::stream_descriptor and
      // std::tr1::unordered_map without including their headers directly;
      // presumably they arrive transitively via ObjectSegmentation.hpp --
      // confirm.
00047 public:
          // Parameters:
          //   con, o_strand, cseg, cache - context, strand, coordinate
          //       segmentation and cache objects; cseg and cache presumably
          //       end up in mCSeg / mCache -- confirm in the .cpp.
          //   redis_host, redis_port     - address of the Redis server.
          //   redis_prefix               - presumably a prefix applied to keys
          //       written to Redis -- confirm.
          //   redis_ttl                  - presumably the expiry applied to
          //       keys (see mRedisKeyTTL) -- confirm.
          //   redis_has_transactions     - presumably whether the server
          //       supports transactions -- confirm.
          // NOTE(review): redis_port is uint32 here but the mRedisPort member
          // is uint16; if one initializes the other the value is narrowed --
          // confirm the constructor handles out-of-range ports.
00048     RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix, Duration redis_ttl, bool redis_has_transactions);
00049     ~RedisObjectSegmentation();
00050 
          // Virtual interface, presumably overriding ObjectSegmentation (the
          // base class is not visible in this file).
00051     virtual void start();
00052     virtual void stop();
00053 
          // Both lookups return an OSegEntry for obj_id. Judging by the names,
          // cacheLookup consults only local/cached data while lookup may go to
          // Redis -- confirm in the .cpp.
00054     virtual OSegEntry cacheLookup(const UUID& obj_id);
00055     virtual OSegEntry lookup(const UUID& obj_id);
00056 
          // Object registration / removal. The trailing unnamed bool on
          // addMigratedObject is part of the signature but unnamed here.
00057     virtual void addNewObject(const UUID& obj_id, float radius);
00058     virtual void addMigratedObject(const UUID& obj_id, float radius, ServerID idServerAckTo, bool);
00059     virtual void removeObject(const UUID& obj_id);
00060 
          // Migration support.
00061     virtual bool clearToMigrate(const UUID& obj_id);
00062     virtual void migrateObject(const UUID& obj_id, const OSegEntry& new_server_id);
00063 
          // Handlers for OSeg protocol messages.
00064     virtual void handleMigrateMessageAck(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg);
00065     virtual void handleUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg);
00066 
00067 
00068     // Redis event handlers
          // Presumably public for the same reason as the helper handlers
          // below (invoked from C callbacks registered with hiredis) --
          // confirm.
00069     void disconnected();
00070     void addRead();
00071     void delRead();
00072     void addWrite();
00073     void delWrite();
00074     void cleanup();
00075 
00076     // Helper handlers, public since redis needs C functions as callbacks, which
00077     // then invoke these to complete operations.
          // finishReadObject receives the raw reply for obj_id as data_str;
          // failReadObject is its failure counterpart. The finishWrite*
          // methods take the extra value (add status / server to ack) that
          // each write path needs.
00078     void finishReadObject(const UUID& obj_id, const String& data_str);
00079     void failReadObject(const UUID& obj_id);
00080     void finishWriteNewObject(const UUID& obj_id, OSegWriteListener::OSegAddNewStatus);
00081     void finishWriteMigratedObject(const UUID& obj_id, ServerID ackTo);
00082 
00083 private:
          // Connection management for mRedisContext.
00084     void connect();
00085     void ensureConnected();
00086 
00087     // If the appropriate flag is set, starts and stops read/write operations
          // (The flags are presumably mReading / mWriting -- confirm.)
00088     void startRead();
00089     void startWrite();
00090 
          // Completion handlers taking a boost::system::error_code; presumably
          // used for asynchronous waits on mRedisFD -- confirm.
00091     void readHandler(const boost::system::error_code& ec);
00092     void writeHandler(const boost::system::error_code& ec);
00093 
00094     // Schedule an object to be refreshed in .5 TTL to keep its key alive
00095     void scheduleObjectRefresh(const UUID& obj_id);
00096     void startTimeoutHandler();
00097     void processExpiredObjects();
00098     void refreshObjectTimeout(const UUID& obj_id);
00099 
          // Collaborators passed in by pointer; ownership is not visible from
          // this header.
00100     CoordinateSegmentation* mCSeg;
00101     OSegCache* mCache;
00102 
          // Local table of OSegEntry values keyed by object UUID.
00103     typedef std::tr1::unordered_map<UUID, OSegEntry, UUID::Hasher> OSegMap;
00104     OSegMap mOSeg;
00105 
          // Redis connection settings (see the constructor parameters).
00106     String mRedisHost;
00107     uint16 mRedisPort;
00108     String mRedisPrefix;
00109     Duration mRedisKeyTTL;
00110     bool mRedisHasTransactions;
00111 
          // hiredis asynchronous connection state.
00112     redisAsyncContext* mRedisContext;
00113     boost::asio::posix::stream_descriptor* mRedisFD; // Wrapped hiredis file descriptor
00114     bool mReading, mWriting;
00115 
00116     // Read and write events can happen concurrently and can generate callbacks
00117     // for either type (i.e. when we get a notice that the redis file descriptor
00118     // can read and call redisAsyncHandleRead, that can invoke both read and
00119     // write callbacks) and as far as I can tell hiredis isn't thread safe at
00120     // all. To protect hiredis, we need to add locks in some methods
00121     // (e.g. lookup) around just the redis command even though all the other
00122     // data is safe since these commands are known to come from only one strand
          // The mutex type is recursive, so the same thread may lock it again
          // while already holding it.
00123     typedef boost::recursive_mutex Mutex;
00124     typedef boost::lock_guard<Mutex> Lock;
00125     Mutex mMutex;
00126 
00127 
00128     // Track objects that need timeouts refreshed in redis
          // Each record pairs an object id with the Time stored in 'expires'.
00129     struct ObjectTimeout {
00130         ObjectTimeout(const UUID& id, Time _expires)
00131          : objid(id),
00132            expires(_expires)
00133         {}
00134         UUID objid;
00135         Time expires;
00136     };
          // Tags naming the two indices of the container below.
00137     struct objid_tag {};
00138     struct expires_tag {};
          // Two views over the same ObjectTimeout records:
          //  - objid_tag:   hashed, unique on objid (at most one record per
          //                 object, lookup by id).
          //  - expires_tag: ordered, non-unique on expires (iteration in
          //                 expiry order; several objects may share a time).
00139     typedef boost::multi_index_container<
00140         ObjectTimeout,
00141         boost::multi_index::indexed_by<
00142             boost::multi_index::hashed_unique< boost::multi_index::tag<objid_tag>, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,UUID,objid), UUID::Hasher >,
00143             boost::multi_index::ordered_non_unique< boost::multi_index::tag<expires_tag>, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,Time,expires) >
00144             >
00145         > ObjectTimeouts;
00146     typedef ObjectTimeouts::index<objid_tag>::type ObjectTimeoutsByID;
00147     typedef ObjectTimeouts::index<expires_tag>::type ObjectTimeoutsByExpiration;
00148     ObjectTimeouts mTimeouts;
          // Timer presumably driving processExpiredObjects() -- confirm.
00149     Network::IOTimerPtr mExpiryTimer;
00150 };
00151 
00152 } // namespace Sirikata
00153 
00154 #endif //_SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_