Sirikata
|
00001 /* Sirikata 00002 * asyncConnectionSet.hpp 00003 * 00004 * Copyright (c) 2010, Behram Mistree 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef __CRAQ_ASYNC_CONNECTION_SET_HPP__ 00034 #define __CRAQ_ASYNC_CONNECTION_SET_HPP__ 00035 00036 #include <sirikata/core/util/Platform.hpp> 00037 #include <boost/asio.hpp> 00038 #include "../asyncCraqUtil.hpp" 00039 #include <sirikata/space/SpaceContext.hpp> 00040 #include <sirikata/core/network/IOStrandImpl.hpp> 00041 #include "../asyncCraqScheduler.hpp" 00042 #include <sirikata/core/network/Asio.hpp> 00043 00044 00045 //#define ASYNC_CONNECTION_DEBUG 00046 00047 namespace Sirikata 00048 { 00049 00050 class CraqObjectSegmentation; 00051 00052 class AsyncConnectionSet 00053 { 00054 00055 public: 00056 00057 enum ConnectionState {READY, NEED_NEW_SOCKET,PROCESSING}; //we'll probably be always processing or need new socket. (minus the initial connection registration. 00058 00059 void initialize(Sirikata::Network::TCPSocket* socket, boost::asio::ip::tcp::resolver::iterator ); 00060 00061 AsyncConnectionSet::ConnectionState ready(); //tells the querier whether I'm processing a message or available for more information. 00062 00063 void setBound(const CraqObjectID& obj_dataToGet, const CraqEntry& dataToSetTo, const bool& track, const int& trackNum); 00064 void set(const CraqDataKey& dataToSet, const CraqEntry& dataToSetTo, const bool& track, const int& trackNum); 00065 00066 ~AsyncConnectionSet(); 00067 AsyncConnectionSet(SpaceContext* con, Network::IOStrand* str, Network::IOStrand* error_strand, Network::IOStrand* result_strand, AsyncCraqScheduler* master, CraqObjectSegmentation* oseg, const std::tr1::function<void()>& readySetChanged); 00068 00069 int numStillProcessing(); 00070 void stop(); 00072 void setProcessing(); 00073 00074 private: 00075 Sirikata::Network::TCPSocket* mSocket; 00076 struct IndividualQueryData 00077 { 00078 IndividualQueryData():currentlySettingTo(CraqEntry::null()){} 00079 enum GetOrSet {GET,SET}; 00080 GetOrSet gs; 00081 CraqDataKey currentlySearchingFor; 00082 CraqEntry currentlySettingTo; 00083 bool is_tracking; 00084 int tracking_number; 00085 uint64 time_admitted; //in milliseconds what time was when lookup was requested. 00086 Sirikata::Network::DeadlineTimer* deadline_timer; 00087 }; 00088 typedef std::multimap<std::string, IndividualQueryData*> MultiOutstandingQueries; //the string represents the obj id of the data. 00089 MultiOutstandingQueries allOutstandingQueries; //we can be getting and setting so we need this to be a multimap 00090 00091 SpaceContext* ctx; 00092 Network::IOStrand* mStrand; 00093 Network::IOStrand* mErrorStrand; 00094 Network::IOStrand* mResultsStrand; 00095 AsyncCraqScheduler* mSchedulerMaster; 00096 CraqObjectSegmentation* mOSeg; 00097 00098 volatile ConnectionState mReady; 00099 //this function is responsible for elegantly killing connections and telling the controlling asyncCraq that that's what it's doing. 00100 void killSequence(); 00101 00102 00103 void set_generic_read_result_handler(); 00104 void set_generic_read_error_handler(); 00105 00106 00107 bool processEntireResponse(std::string response); 00108 00109 void processStoredValue(std::string dataKey); 00110 bool parseStoredValue(const std::string& response, std::string& dataKey); 00111 00112 00113 bool checkStored(std::string& response); 00114 bool checkError(std::string& response); 00115 00116 00117 size_t smallestIndex(std::vector<size_t> sizeVec); 00118 00119 00120 std::string mPrevReadFrag; 00121 bool mReceivedStopRequest; 00122 std::tr1::function<void()> mReadyStateChangedCallback; 00123 void clear_all_deadline_timers(); 00124 //***********handlers************** 00125 //timeout callback handler 00126 void queryTimedOutCallbackSet(const boost::system::error_code& e, IndividualQueryData* iqd); 00127 00128 //connect_handler 00129 void connect_handler(const boost::system::error_code& error); 00130 00131 void generic_read_stored_not_found_error_handler ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff); 00132 void set_generic_stored_not_found_error_handler(); 00133 00134 00135 //set handler 00136 void read_handler_set ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff); 00137 void write_some_handler_set( const boost::system::error_code& error, std::size_t bytes_transferred); 00138 }; 00139 00140 00141 00142 }//end namespace 00143 00144 #endif