Sirikata
|
00001 /* Sirikata 00002 * RedisObjectSegmentation.hpp 00003 * 00004 * Copyright (c) 2011, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_ 00034 #define _SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_ 00035 00036 #include <sirikata/space/ObjectSegmentation.hpp> 00037 #include <hiredis/async.h> 00038 00039 #include <boost/multi_index_container.hpp> 00040 #include <boost/multi_index/member.hpp> 00041 #include <boost/multi_index/ordered_index.hpp> 00042 #include <boost/multi_index/hashed_index.hpp> 00043 00044 namespace Sirikata { 00045 00046 class RedisObjectSegmentation : public ObjectSegmentation { 00047 public: 00048 RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix, Duration redis_ttl, bool redis_has_transactions); 00049 ~RedisObjectSegmentation(); 00050 00051 virtual void start(); 00052 virtual void stop(); 00053 00054 virtual OSegEntry cacheLookup(const UUID& obj_id); 00055 virtual OSegEntry lookup(const UUID& obj_id); 00056 00057 virtual void addNewObject(const UUID& obj_id, float radius); 00058 virtual void addMigratedObject(const UUID& obj_id, float radius, ServerID idServerAckTo, bool); 00059 virtual void removeObject(const UUID& obj_id); 00060 00061 virtual bool clearToMigrate(const UUID& obj_id); 00062 virtual void migrateObject(const UUID& obj_id, const OSegEntry& new_server_id); 00063 00064 virtual void handleMigrateMessageAck(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg); 00065 virtual void handleUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg); 00066 00067 00068 // Redis event handlers 00069 void disconnected(); 00070 void addRead(); 00071 void delRead(); 00072 void addWrite(); 00073 void delWrite(); 00074 void cleanup(); 00075 00076 // Helper handlers, public since redis needs C functions as callbacks, which 00077 // then invoke these to complete operations. 00078 void finishReadObject(const UUID& obj_id, const String& data_str); 00079 void failReadObject(const UUID& obj_id); 00080 void finishWriteNewObject(const UUID& obj_id, OSegWriteListener::OSegAddNewStatus); 00081 void finishWriteMigratedObject(const UUID& obj_id, ServerID ackTo); 00082 00083 private: 00084 void connect(); 00085 void ensureConnected(); 00086 00087 // If the appropriate flag is set, starts and stops read/write operations 00088 void startRead(); 00089 void startWrite(); 00090 00091 void readHandler(const boost::system::error_code& ec); 00092 void writeHandler(const boost::system::error_code& ec); 00093 00094 // Schedule an object to be refreshed in .5 TTL to keep it's key alive 00095 void scheduleObjectRefresh(const UUID& obj_id); 00096 void startTimeoutHandler(); 00097 void processExpiredObjects(); 00098 void refreshObjectTimeout(const UUID& obj_id); 00099 00100 CoordinateSegmentation* mCSeg; 00101 OSegCache* mCache; 00102 00103 typedef std::tr1::unordered_map<UUID, OSegEntry, UUID::Hasher> OSegMap; 00104 OSegMap mOSeg; 00105 00106 String mRedisHost; 00107 uint16 mRedisPort; 00108 String mRedisPrefix; 00109 Duration mRedisKeyTTL; 00110 bool mRedisHasTransactions; 00111 00112 redisAsyncContext* mRedisContext; 00113 boost::asio::posix::stream_descriptor* mRedisFD; // Wrapped hiredis file descriptor 00114 bool mReading, mWriting; 00115 00116 // Read and write events can happen concurrently and can generate callbacks 00117 // for either type (i.e. when we get a notice that the redis file descriptor 00118 // can read and call redisAsyncHandleRead, that can invoke both read and 00119 // write callbacks) and as far as I can tell hiredis isn't thread safe at 00120 // all. To protect hiredis, we need to add locks in some methods 00121 // (e.g. lookup) around just the redis command even though all the other 00122 // data is safe since these commands are known to come from only one strand 00123 typedef boost::recursive_mutex Mutex; 00124 typedef boost::lock_guard<Mutex> Lock; 00125 Mutex mMutex; 00126 00127 00128 // Track objects that need timeouts refreshed in redis 00129 struct ObjectTimeout { 00130 ObjectTimeout(const UUID& id, Time _expires) 00131 : objid(id), 00132 expires(_expires) 00133 {} 00134 UUID objid; 00135 Time expires; 00136 }; 00137 struct objid_tag {}; 00138 struct expires_tag {}; 00139 typedef boost::multi_index_container< 00140 ObjectTimeout, 00141 boost::multi_index::indexed_by< 00142 boost::multi_index::hashed_unique< boost::multi_index::tag<objid_tag>, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,UUID,objid), UUID::Hasher >, 00143 boost::multi_index::ordered_non_unique< boost::multi_index::tag<expires_tag>, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,Time,expires) > 00144 > 00145 > ObjectTimeouts; 00146 typedef ObjectTimeouts::index<objid_tag>::type ObjectTimeoutsByID; 00147 typedef ObjectTimeouts::index<expires_tag>::type ObjectTimeoutsByExpiration; 00148 ObjectTimeouts mTimeouts; 00149 Network::IOTimerPtr mExpiryTimer; 00150 }; 00151 00152 } // namespace Sirikata 00153 00154 #endif //_SIRIKATA_REDIS_OBJECT_SEGMENTATION_HPP_