Sirikata
libspace/plugins/craq/CraqObjectSegmentation.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  CraqObjectSegmentation.hpp
00003  *
00004  *  Copyright (c) 2010, Behram Mistree
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_DHT_OBJECT_SEGMENTATION_HPP_
00034 #define _SIRIKATA_DHT_OBJECT_SEGMENTATION_HPP_
00035 
00036 namespace Sirikata {
00037 class CraqObjectSegmentation;
00038 }
00039 
00040 #include <sirikata/core/trace/Trace.hpp>
00041 #include <sirikata/space/ServerMessage.hpp>
00042 #include <sirikata/space/ObjectSegmentation.hpp>
00043 #include "craq_oseg/asyncCraq.hpp"
00044 #include "craq_oseg/asyncUtil.hpp"
00045 #include "craq_oseg/asyncConnection.hpp"
00046 #include <sirikata/space/CoordinateSegmentation.hpp>
00047 #include <string.h>
00048 #include <vector>
00049 
00050 #include <sirikata/space/OSegLookupTraceToken.hpp>
00051 #include "craq_hybrid/asyncCraqHybrid.hpp"
00052 #include "craq_hybrid/asyncCraqUtil.hpp"
00053 #include <boost/thread/mutex.hpp>
00054 
00055 //#define CRAQ_DEBUG
00056 #define CRAQ_CACHE
00057 
00058 
00059 namespace Sirikata
00060 {
00061 
00062   struct TransLookup
00063   {
00064     CraqEntry sID;
00065 
00066     int timeAdmitted;
00067     TransLookup():sID(CraqEntry::null()){}
00068   };
00069 
00070 
00071   static const int CRAQ_NOT_FOUND_SIT_OUT   =  500; //that's ms
00072 
00073 class CraqObjectSegmentation : public ObjectSegmentation
00074   {
00075   private:
00076       typedef std::tr1::unordered_map<UUID, CraqEntry, UUID::Hasher> ObjectSet;
00077 
00078     CoordinateSegmentation* mCSeg; //will be used in lookup call
00079 
00080     double checkOwnTimeDur;
00081     int checkOwnTimeCount;
00082 
00083 
00084     //debugging:
00085 
00086     char myUniquePrefixKey; //should just be one character long.
00087 
00088     //for logging
00089     int numCacheHits;
00090     int numOnThisServer;
00091     int numLookups;
00092     int numCraqLookups;
00093     int numTimeElapsedCacheEviction;
00094     int numMigrationNotCompleteYet;
00095     int numAlreadyLookingUp;
00096     int numServices;
00097     int numLookingUpDebug;
00098     Duration lastTimerDur;
00099     //end for logging.
00100 
00101     std::map<std::string, UUID > mapDataKeyToUUID;
00102       typedef std::tr1::unordered_map<UUID,TransLookup,UUID::Hasher> InTransitMap;
00103       InTransitMap mInTransitOrLookup;//These are the objects that are in transit from this server to another.  When we receive an acknowledge message from the oseg that these objects are being sent to, then we remove that object's id from being in transit, then we
00104     boost::mutex inTransOrLookup_m;
00105 
00106 
00107     struct TrackedSetResultsData
00108     {
00109       Sirikata::Protocol::OSeg::MigrateMessageAcknowledge* migAckMsg;
00110       Duration dur;
00111     };
00112 
00113       typedef std::tr1::unordered_map<int, TrackedSetResultsData> TrackedMessageMap;
00114     TrackedMessageMap trackingMessages;
00115 
00116       ObjectSet mReceivingObjects; //this is a vector of objects that have been pushed to this server, but whose migration isn't complete yet, becase we don't have an ack from CRAQ that they've been stored yet.
00117     boost::mutex receivingObjects_m;
00118 
00119 
00120     //what to do when craq can't find the object
00121     void notFoundFunction(CraqOperationResult* nf); //this function tells us what to do with all the ids that just weren't found in craq.
00122 
00123     struct NotFoundData
00124     {
00125       Duration dur;
00126       UUID obj_id;
00127       OSegLookupTraceToken* traceToken;
00128     };
00129     typedef std::queue<NotFoundData*> NfDataQ;
00130     NfDataQ mNfData;
00131     void checkNotFoundData();
00132     //end what to do when craq can't find the object
00133 
00134 
00135     //for lookups and sets respectively
00136       AsyncCraqHybrid craqDhtGet1;
00137       AsyncCraqHybrid craqDhtGet2;
00138       AsyncCraqHybrid craqDhtSet;
00139 
00140 
00141       AtomicValue<int> mAtomicTrackID;
00142     // int mAtomicTrackID;
00143     // boost::mutex atomic_track_id_m;
00144     int getUniqueTrackID();
00145 
00146 
00147     Network::IOStrand* postingStrand;
00148     Network::IOStrand* mStrand;
00149 
00150     void convert_obj_id_to_dht_key(const UUID& obj_id, CraqDataKey& returner) const;
00151 
00152       ObjectSet mObjects; //a list of the objects that are currently being hosted on the space server associated with this oseg.
00153     bool checkOwn(const UUID& obj_id, float*radius);
00154     bool checkMigratingFromNotCompleteYet(const UUID& obj_id,float*radius);
00155 
00156     void removeFromInTransOrLookup(const UUID& obj_id);
00157     void removeFromReceivingObjects(const UUID& obj_id);
00158 
00159     //for message addition. when add an object, send a message to the server that you can now finish adding it to forwarder, loc services, etc.
00160     struct TrackedSetResultsDataAdded
00161     {
00162       Sirikata::Protocol::OSeg::AddedObjectMessage* msgAdded;
00163       Duration dur;
00164     };
00165       typedef std::tr1::unordered_map<int, TrackedSetResultsDataAdded> TrackedMessageMapAdded;
00166     TrackedMessageMapAdded trackedAddMessages; // so that can't query for object until it's registered.
00167     Sirikata::Protocol::OSeg::AddedObjectMessage* generateAddedMessage(const UUID& obj_id, float radius);
00168     //end message addition.
00169 
00170 
00171 
00172     //building for the cache
00173     CraqEntry satisfiesCache(const UUID& obj_id);
00174     OSegCache* mCraqCache;
00175     //end building for the cache
00176 
00177     void beginCraqLookup(const UUID& obj_id, OSegLookupTraceToken* traceToken);
00178     void callOsegLookupCompleted(const UUID& obj_id, const CraqEntry& sID, OSegLookupTraceToken* traceToken);
00179 
00180       bool shouldLog();
00181 
00182     SpaceContext* ctx;
00183 
00184   public:
00185       CraqObjectSegmentation (SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, char unique);
00186 
00187 
00188       virtual ~CraqObjectSegmentation();
00189       virtual OSegEntry lookup(const UUID& obj_id);
00190       virtual OSegEntry cacheLookup(const UUID& obj_id);
00191       virtual void migrateObject(const UUID& obj_id, const OSegEntry& new_server_id);
00192       virtual void addNewObject(const UUID& obj_id, float radius);
00193       virtual void addMigratedObject(const UUID& obj_id, float radius, ServerID idServerAckTo, bool);
00194       virtual void removeObject(const UUID& obj_id);
00195       virtual bool clearToMigrate(const UUID& obj_id);
00196       virtual void craqGetResult(CraqOperationResult* cor);
00197       virtual void craqSetResult(CraqOperationResult* cor);
00198       virtual void stop();
00199 
00200       virtual int getPushback();
00201 
00202       AtomicValue<int> mOSegQueueLen;//(0);
00203 
00204       Sirikata::Protocol::OSeg::MigrateMessageAcknowledge* generateAcknowledgeMessage(const UUID &obj_id, float radius, ServerID serverToAckTo);
00205 
00206       virtual void handleMigrateMessageAck(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg);
00207       virtual void handleUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg);
00208 
00209       void processMigrateMessageAcknowledge(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg);
00210       void processUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg);
00211 
00212   };
00213 }
00214 #endif