Sirikata
libspace/plugins/craq/craq_hybrid/craq_gets/asyncConnectionGet.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  asyncConnectionGet.hpp
00003  *
00004  *  Copyright (c) 2010, Behram Mistree
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef __CRAQ_ASYNC_CONNECTION_GET_HPP__
00034 #define __CRAQ_ASYNC_CONNECTION_GET_HPP__
00035 
00036 #include <sirikata/core/util/Platform.hpp>
00037 #include <boost/asio.hpp>
00038 #include "../asyncCraqUtil.hpp"
00039 #include <sirikata/space/SpaceContext.hpp>
00040 #include <sirikata/core/network/IOStrandImpl.hpp>
00041 #include <sirikata/core/network/Asio.hpp>
00042 #include "../../CraqObjectSegmentation.hpp"
00043 #include "../asyncCraqScheduler.hpp"
00044 #include <sirikata/space/OSegLookupTraceToken.hpp>
00045 
00046 
00047 namespace Sirikata
00048 {
00049 
00050 static const int MAX_TIME_BETWEEN_RESULTS = 50000;
00051 
00052 
00053 class AsyncConnectionGet
00054 {
00055 private:
00056   struct IndividualQueryData
00057   {
00058     IndividualQueryData():currentlySettingTo(CraqEntry::null()){}
00059     enum GetOrSet {GET,SET};
00060     GetOrSet gs;
00061     CraqDataKey currentlySearchingFor;
00062     CraqEntry currentlySettingTo;
00063     bool is_tracking;
00064     int tracking_number;
00065     uint64 time_admitted; //in milliseconds what time was when lookup was requested.
00066     Sirikata::Network::DeadlineTimer* deadline_timer;
00067     OSegLookupTraceToken* traceToken;
00068   };
00069 
00070 
00071 public:
00072 
00073   enum ConnectionState {READY, NEED_NEW_SOCKET,PROCESSING}; //we'll probably be always processing or need new socket.  (minus the initial connection registration.
00074 
00075   void initialize(Sirikata::Network::TCPSocket* socket,     boost::asio::ip::tcp::resolver::iterator );
00076 
00077   AsyncConnectionGet::ConnectionState ready(); //tells the querier whether I'm processing a message or available for more information.
00078 
00079 
00080   void get(const CraqDataKey& dataToGet, OSegLookupTraceToken* traceToken);
00081   void getBound(const CraqObjectID& obj_dataToGet, OSegLookupTraceToken* traceToken);
00082 
00083 
00084   ~AsyncConnectionGet();
00085     AsyncConnectionGet(SpaceContext* con, Network::IOStrand* str, Network::IOStrand* error_strand, Network::IOStrand* result_strand, AsyncCraqScheduler* master, CraqObjectSegmentation* oseg, const std::tr1::function<void()> &readyStateChangedCb );
00086 
00087   int numStillProcessing();
00088   void printOutstanding();
00089 
00090   int runReQuery(); //re-query everything remaining in outstanding results.
00091   void printStatisticsTimesTaken();
00092 
00093   int getRespCount();
00094 
00096   void setProcessing();
00097   void stop();
00098 
00099 private:
00100 
00101   int mAllResponseCount;
00102   std::vector<double> mTimesTaken;
00103 
00104   int mTimesBetweenResults;
00105   bool mHandlerState;
00106 
00107 
00108 
00109   Sirikata::Network::TCPSocket* mSocket;
00110 
00111 
00112   void outputLargeOutstanding();
00113 
00114     typedef std::tr1::unordered_multimap<std::string, IndividualQueryData*> MultiOutstandingQueries;   //the string represents the obj id of the data.
00115   MultiOutstandingQueries allOutstandingQueries;  //we can be getting and setting so we need this to be a multimap
00116 
00117   volatile ConnectionState mReady;
00118 
00119   bool getQuery(const CraqDataKey& dataToGet);
00120 
00121 
00122   void queryTimedOutCallbackGet(const boost::system::error_code& e, const std::string&searchFor);
00123   void queryTimedOutCallbackGetPrint(const boost::system::error_code& e, const std::string&searchFor);
00124 
00125   //this function is responsible for elegantly killing connections and telling the controlling asyncCraq that that's what it's doing.
00126   void killSequence();
00127 
00128 
00129   void processValueNotFound(std::string dataKey); //takes in
00130   void processValueFound(std::string dataKey, const CraqEntry& sID);
00131   void processStoredValue(std::string dataKey);
00132 
00133 
00134   bool parseValueNotFound(std::string response, std::string& dataKey);
00135   bool parseValueValue(std::string response, std::string& dataKey,CraqEntry& sID);
00136   bool parseStoredValue(const std::string& response, std::string& dataKey);
00137 
00138   bool processEntireResponse(std::string response);
00139 
00140   bool checkStored(std::string& response);
00141   bool checkValue(std::string& response);
00142   bool checkNotFound(std::string& response);
00143   bool checkError(std::string& response);
00144 
00145 
00146   size_t smallestIndex(std::vector<size_t> sizeVec);
00147 
00148 
00149   std::string mPrevReadFrag;
00150 
00151 
00152   //***********handlers**************
00153 
00154   //connect_handler
00155   void connect_handler(const boost::system::error_code& error);
00156 
00157   void generic_read_stored_not_found_error_handler ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff);
00158   void set_generic_stored_not_found_error_handler();
00159 
00160 
00161   //set handler
00162   void read_handler_set      ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff);
00163   void write_some_handler_set( const boost::system::error_code& error, std::size_t bytes_transferred);
00164   //get handler
00165   void write_some_handler_get(  const boost::system::error_code& error, std::size_t bytes_transferred);
00166   void read_handler_get      (  const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff);
00167 
00168 
00169   void clear_all_deadline_timers();
00170 
00171   //***strand and context
00172   SpaceContext* ctx;
00173   Network::IOStrand* mStrand;
00174   Network::IOStrand* mPostErrorsStrand;
00175   Network::IOStrand* mResultStrand;
00176   AsyncCraqScheduler* mSchedulerMaster;
00177   CraqObjectSegmentation* mOSeg;
00178   double getTime;
00179   int numGets;
00180   bool mReceivedStopRequest;
00181   std::tr1::function<void()> mReadyStateChangedCallback;
00182 };
00183 
00184 }
00185 
00186 #endif