Sirikata
|
00001 /* Sirikata 00002 * CraqObjectSegmentation.hpp 00003 * 00004 * Copyright (c) 2010, Behram Mistree 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_DHT_OBJECT_SEGMENTATION_HPP_ 00034 #define _SIRIKATA_DHT_OBJECT_SEGMENTATION_HPP_ 00035 00036 namespace Sirikata { 00037 class CraqObjectSegmentation; 00038 } 00039 00040 #include <sirikata/core/trace/Trace.hpp> 00041 #include <sirikata/space/ServerMessage.hpp> 00042 #include <sirikata/space/ObjectSegmentation.hpp> 00043 #include "craq_oseg/asyncCraq.hpp" 00044 #include "craq_oseg/asyncUtil.hpp" 00045 #include "craq_oseg/asyncConnection.hpp" 00046 #include <sirikata/space/CoordinateSegmentation.hpp> 00047 #include <string.h> 00048 #include <vector> 00049 00050 #include <sirikata/space/OSegLookupTraceToken.hpp> 00051 #include "craq_hybrid/asyncCraqHybrid.hpp" 00052 #include "craq_hybrid/asyncCraqUtil.hpp" 00053 #include <boost/thread/mutex.hpp> 00054 00055 //#define CRAQ_DEBUG 00056 #define CRAQ_CACHE 00057 00058 00059 namespace Sirikata 00060 { 00061 00062 struct TransLookup 00063 { 00064 CraqEntry sID; 00065 00066 int timeAdmitted; 00067 TransLookup():sID(CraqEntry::null()){} 00068 }; 00069 00070 00071 static const int CRAQ_NOT_FOUND_SIT_OUT = 500; //that's ms 00072 00073 class CraqObjectSegmentation : public ObjectSegmentation 00074 { 00075 private: 00076 typedef std::tr1::unordered_map<UUID, CraqEntry, UUID::Hasher> ObjectSet; 00077 00078 CoordinateSegmentation* mCSeg; //will be used in lookup call 00079 00080 double checkOwnTimeDur; 00081 int checkOwnTimeCount; 00082 00083 00084 //debugging: 00085 00086 char myUniquePrefixKey; //should just be one character long. 00087 00088 //for logging 00089 int numCacheHits; 00090 int numOnThisServer; 00091 int numLookups; 00092 int numCraqLookups; 00093 int numTimeElapsedCacheEviction; 00094 int numMigrationNotCompleteYet; 00095 int numAlreadyLookingUp; 00096 int numServices; 00097 int numLookingUpDebug; 00098 Duration lastTimerDur; 00099 //end for logging. 00100 00101 std::map<std::string, UUID > mapDataKeyToUUID; 00102 typedef std::tr1::unordered_map<UUID,TransLookup,UUID::Hasher> InTransitMap; 00103 InTransitMap mInTransitOrLookup;//These are the objects that are in transit from this server to another. When we receive an acknowledge message from the oseg that these objects are being sent to, then we remove that object's id from being in transit, then we 00104 boost::mutex inTransOrLookup_m; 00105 00106 00107 struct TrackedSetResultsData 00108 { 00109 Sirikata::Protocol::OSeg::MigrateMessageAcknowledge* migAckMsg; 00110 Duration dur; 00111 }; 00112 00113 typedef std::tr1::unordered_map<int, TrackedSetResultsData> TrackedMessageMap; 00114 TrackedMessageMap trackingMessages; 00115 00116 ObjectSet mReceivingObjects; //this is a vector of objects that have been pushed to this server, but whose migration isn't complete yet, becase we don't have an ack from CRAQ that they've been stored yet. 00117 boost::mutex receivingObjects_m; 00118 00119 00120 //what to do when craq can't find the object 00121 void notFoundFunction(CraqOperationResult* nf); //this function tells us what to do with all the ids that just weren't found in craq. 00122 00123 struct NotFoundData 00124 { 00125 Duration dur; 00126 UUID obj_id; 00127 OSegLookupTraceToken* traceToken; 00128 }; 00129 typedef std::queue<NotFoundData*> NfDataQ; 00130 NfDataQ mNfData; 00131 void checkNotFoundData(); 00132 //end what to do when craq can't find the object 00133 00134 00135 //for lookups and sets respectively 00136 AsyncCraqHybrid craqDhtGet1; 00137 AsyncCraqHybrid craqDhtGet2; 00138 AsyncCraqHybrid craqDhtSet; 00139 00140 00141 AtomicValue<int> mAtomicTrackID; 00142 // int mAtomicTrackID; 00143 // boost::mutex atomic_track_id_m; 00144 int getUniqueTrackID(); 00145 00146 00147 Network::IOStrand* postingStrand; 00148 Network::IOStrand* mStrand; 00149 00150 void convert_obj_id_to_dht_key(const UUID& obj_id, CraqDataKey& returner) const; 00151 00152 ObjectSet mObjects; //a list of the objects that are currently being hosted on the space server associated with this oseg. 00153 bool checkOwn(const UUID& obj_id, float*radius); 00154 bool checkMigratingFromNotCompleteYet(const UUID& obj_id,float*radius); 00155 00156 void removeFromInTransOrLookup(const UUID& obj_id); 00157 void removeFromReceivingObjects(const UUID& obj_id); 00158 00159 //for message addition. when add an object, send a message to the server that you can now finish adding it to forwarder, loc services, etc. 00160 struct TrackedSetResultsDataAdded 00161 { 00162 Sirikata::Protocol::OSeg::AddedObjectMessage* msgAdded; 00163 Duration dur; 00164 }; 00165 typedef std::tr1::unordered_map<int, TrackedSetResultsDataAdded> TrackedMessageMapAdded; 00166 TrackedMessageMapAdded trackedAddMessages; // so that can't query for object until it's registered. 00167 Sirikata::Protocol::OSeg::AddedObjectMessage* generateAddedMessage(const UUID& obj_id, float radius); 00168 //end message addition. 00169 00170 00171 00172 //building for the cache 00173 CraqEntry satisfiesCache(const UUID& obj_id); 00174 OSegCache* mCraqCache; 00175 //end building for the cache 00176 00177 void beginCraqLookup(const UUID& obj_id, OSegLookupTraceToken* traceToken); 00178 void callOsegLookupCompleted(const UUID& obj_id, const CraqEntry& sID, OSegLookupTraceToken* traceToken); 00179 00180 bool shouldLog(); 00181 00182 SpaceContext* ctx; 00183 00184 public: 00185 CraqObjectSegmentation (SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, char unique); 00186 00187 00188 virtual ~CraqObjectSegmentation(); 00189 virtual OSegEntry lookup(const UUID& obj_id); 00190 virtual OSegEntry cacheLookup(const UUID& obj_id); 00191 virtual void migrateObject(const UUID& obj_id, const OSegEntry& new_server_id); 00192 virtual void addNewObject(const UUID& obj_id, float radius); 00193 virtual void addMigratedObject(const UUID& obj_id, float radius, ServerID idServerAckTo, bool); 00194 virtual void removeObject(const UUID& obj_id); 00195 virtual bool clearToMigrate(const UUID& obj_id); 00196 virtual void craqGetResult(CraqOperationResult* cor); 00197 virtual void craqSetResult(CraqOperationResult* cor); 00198 virtual void stop(); 00199 00200 virtual int getPushback(); 00201 00202 AtomicValue<int> mOSegQueueLen;//(0); 00203 00204 Sirikata::Protocol::OSeg::MigrateMessageAcknowledge* generateAcknowledgeMessage(const UUID &obj_id, float radius, ServerID serverToAckTo); 00205 00206 virtual void handleMigrateMessageAck(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg); 00207 virtual void handleUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg); 00208 00209 void processMigrateMessageAcknowledge(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg); 00210 void processUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg); 00211 00212 }; 00213 } 00214 #endif