Sirikata
|
00001 /* Sirikata 00002 * asyncConnectionGet.hpp 00003 * 00004 * Copyright (c) 2010, Behram Mistree 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef __CRAQ_ASYNC_CONNECTION_GET_HPP__ 00034 #define __CRAQ_ASYNC_CONNECTION_GET_HPP__ 00035 00036 #include <sirikata/core/util/Platform.hpp> 00037 #include <boost/asio.hpp> 00038 #include "../asyncCraqUtil.hpp" 00039 #include <sirikata/space/SpaceContext.hpp> 00040 #include <sirikata/core/network/IOStrandImpl.hpp> 00041 #include <sirikata/core/network/Asio.hpp> 00042 #include "../../CraqObjectSegmentation.hpp" 00043 #include "../asyncCraqScheduler.hpp" 00044 #include <sirikata/space/OSegLookupTraceToken.hpp> 00045 00046 00047 namespace Sirikata 00048 { 00049 00050 static const int MAX_TIME_BETWEEN_RESULTS = 50000; 00051 00052 00053 class AsyncConnectionGet 00054 { 00055 private: 00056 struct IndividualQueryData 00057 { 00058 IndividualQueryData():currentlySettingTo(CraqEntry::null()){} 00059 enum GetOrSet {GET,SET}; 00060 GetOrSet gs; 00061 CraqDataKey currentlySearchingFor; 00062 CraqEntry currentlySettingTo; 00063 bool is_tracking; 00064 int tracking_number; 00065 uint64 time_admitted; //in milliseconds what time was when lookup was requested. 00066 Sirikata::Network::DeadlineTimer* deadline_timer; 00067 OSegLookupTraceToken* traceToken; 00068 }; 00069 00070 00071 public: 00072 00073 enum ConnectionState {READY, NEED_NEW_SOCKET,PROCESSING}; //we'll probably be always processing or need new socket. (minus the initial connection registration. 00074 00075 void initialize(Sirikata::Network::TCPSocket* socket, boost::asio::ip::tcp::resolver::iterator ); 00076 00077 AsyncConnectionGet::ConnectionState ready(); //tells the querier whether I'm processing a message or available for more information. 00078 00079 00080 void get(const CraqDataKey& dataToGet, OSegLookupTraceToken* traceToken); 00081 void getBound(const CraqObjectID& obj_dataToGet, OSegLookupTraceToken* traceToken); 00082 00083 00084 ~AsyncConnectionGet(); 00085 AsyncConnectionGet(SpaceContext* con, Network::IOStrand* str, Network::IOStrand* error_strand, Network::IOStrand* result_strand, AsyncCraqScheduler* master, CraqObjectSegmentation* oseg, const std::tr1::function<void()> &readyStateChangedCb ); 00086 00087 int numStillProcessing(); 00088 void printOutstanding(); 00089 00090 int runReQuery(); //re-query everything remaining in outstanding results. 00091 void printStatisticsTimesTaken(); 00092 00093 int getRespCount(); 00094 00096 void setProcessing(); 00097 void stop(); 00098 00099 private: 00100 00101 int mAllResponseCount; 00102 std::vector<double> mTimesTaken; 00103 00104 int mTimesBetweenResults; 00105 bool mHandlerState; 00106 00107 00108 00109 Sirikata::Network::TCPSocket* mSocket; 00110 00111 00112 void outputLargeOutstanding(); 00113 00114 typedef std::tr1::unordered_multimap<std::string, IndividualQueryData*> MultiOutstandingQueries; //the string represents the obj id of the data. 00115 MultiOutstandingQueries allOutstandingQueries; //we can be getting and setting so we need this to be a multimap 00116 00117 volatile ConnectionState mReady; 00118 00119 bool getQuery(const CraqDataKey& dataToGet); 00120 00121 00122 void queryTimedOutCallbackGet(const boost::system::error_code& e, const std::string&searchFor); 00123 void queryTimedOutCallbackGetPrint(const boost::system::error_code& e, const std::string&searchFor); 00124 00125 //this function is responsible for elegantly killing connections and telling the controlling asyncCraq that that's what it's doing. 00126 void killSequence(); 00127 00128 00129 void processValueNotFound(std::string dataKey); //takes in 00130 void processValueFound(std::string dataKey, const CraqEntry& sID); 00131 void processStoredValue(std::string dataKey); 00132 00133 00134 bool parseValueNotFound(std::string response, std::string& dataKey); 00135 bool parseValueValue(std::string response, std::string& dataKey,CraqEntry& sID); 00136 bool parseStoredValue(const std::string& response, std::string& dataKey); 00137 00138 bool processEntireResponse(std::string response); 00139 00140 bool checkStored(std::string& response); 00141 bool checkValue(std::string& response); 00142 bool checkNotFound(std::string& response); 00143 bool checkError(std::string& response); 00144 00145 00146 size_t smallestIndex(std::vector<size_t> sizeVec); 00147 00148 00149 std::string mPrevReadFrag; 00150 00151 00152 //***********handlers************** 00153 00154 //connect_handler 00155 void connect_handler(const boost::system::error_code& error); 00156 00157 void generic_read_stored_not_found_error_handler ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff); 00158 void set_generic_stored_not_found_error_handler(); 00159 00160 00161 //set handler 00162 void read_handler_set ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff); 00163 void write_some_handler_set( const boost::system::error_code& error, std::size_t bytes_transferred); 00164 //get handler 00165 void write_some_handler_get( const boost::system::error_code& error, std::size_t bytes_transferred); 00166 void read_handler_get ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff); 00167 00168 00169 void clear_all_deadline_timers(); 00170 00171 //***strand and context 00172 SpaceContext* ctx; 00173 Network::IOStrand* mStrand; 00174 Network::IOStrand* mPostErrorsStrand; 00175 Network::IOStrand* mResultStrand; 00176 AsyncCraqScheduler* mSchedulerMaster; 00177 CraqObjectSegmentation* mOSeg; 00178 double getTime; 00179 int numGets; 00180 bool mReceivedStopRequest; 00181 std::tr1::function<void()> mReadyStateChangedCallback; 00182 }; 00183 00184 } 00185 00186 #endif