Sirikata
libspace/plugins/craq/craq_hybrid/craq_sets/asyncConnectionSet.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  asyncConnectionSet.hpp
00003  *
00004  *  Copyright (c) 2010, Behram Mistree
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef __CRAQ_ASYNC_CONNECTION_SET_HPP__
00034 #define __CRAQ_ASYNC_CONNECTION_SET_HPP__
00035 
00036 #include <sirikata/core/util/Platform.hpp>
00037 #include <boost/asio.hpp>
00038 #include "../asyncCraqUtil.hpp"
00039 #include <sirikata/space/SpaceContext.hpp>
00040 #include <sirikata/core/network/IOStrandImpl.hpp>
00041 #include "../asyncCraqScheduler.hpp"
00042 #include <sirikata/core/network/Asio.hpp>
00043 
00044 
00045 //#define ASYNC_CONNECTION_DEBUG
00046 
00047 namespace Sirikata
00048 {
00049 
00050 class CraqObjectSegmentation;
00051 
00052 class AsyncConnectionSet
00053 {
00054 
00055 public:
00056 
00057   enum ConnectionState {READY, NEED_NEW_SOCKET,PROCESSING}; //we'll probably be always processing or need new socket.  (minus the initial connection registration.
00058 
00059   void initialize(Sirikata::Network::TCPSocket* socket,     boost::asio::ip::tcp::resolver::iterator );
00060 
00061   AsyncConnectionSet::ConnectionState ready(); //tells the querier whether I'm processing a message or available for more information.
00062 
00063   void setBound(const CraqObjectID& obj_dataToGet, const CraqEntry& dataToSetTo, const bool&  track, const int& trackNum);
00064   void set(const CraqDataKey& dataToSet, const CraqEntry& dataToSetTo, const bool&  track, const int& trackNum);
00065 
00066   ~AsyncConnectionSet();
00067   AsyncConnectionSet(SpaceContext* con, Network::IOStrand* str, Network::IOStrand* error_strand, Network::IOStrand* result_strand, AsyncCraqScheduler* master, CraqObjectSegmentation* oseg, const std::tr1::function<void()>& readySetChanged);
00068 
00069   int numStillProcessing();
00070   void stop();
00072   void setProcessing();
00073 
00074 private:
00075   Sirikata::Network::TCPSocket* mSocket;
00076   struct IndividualQueryData
00077   {
00078     IndividualQueryData():currentlySettingTo(CraqEntry::null()){}
00079     enum GetOrSet {GET,SET};
00080     GetOrSet gs;
00081     CraqDataKey currentlySearchingFor;
00082     CraqEntry currentlySettingTo;
00083     bool is_tracking;
00084     int tracking_number;
00085     uint64 time_admitted; //in milliseconds what time was when lookup was requested.
00086     Sirikata::Network::DeadlineTimer* deadline_timer;
00087   };
00088   typedef std::multimap<std::string, IndividualQueryData*> MultiOutstandingQueries;   //the string represents the obj id of the data.
00089   MultiOutstandingQueries allOutstandingQueries;  //we can be getting and setting so we need this to be a multimap
00090 
00091   SpaceContext*                      ctx;
00092   Network::IOStrand*                      mStrand;
00093   Network::IOStrand*                 mErrorStrand;
00094   Network::IOStrand*               mResultsStrand;
00095   AsyncCraqScheduler*   mSchedulerMaster;
00096   CraqObjectSegmentation*              mOSeg;
00097 
00098   volatile ConnectionState mReady;
00099   //this function is responsible for elegantly killing connections and telling the controlling asyncCraq that that's what it's doing.
00100   void killSequence();
00101 
00102 
00103   void set_generic_read_result_handler();
00104   void set_generic_read_error_handler();
00105 
00106 
00107   bool processEntireResponse(std::string response);
00108 
00109   void processStoredValue(std::string dataKey);
00110   bool parseStoredValue(const std::string& response, std::string& dataKey);
00111 
00112 
00113   bool checkStored(std::string& response);
00114   bool checkError(std::string& response);
00115 
00116 
00117   size_t smallestIndex(std::vector<size_t> sizeVec);
00118 
00119 
00120   std::string mPrevReadFrag;
00121   bool mReceivedStopRequest;
00122   std::tr1::function<void()> mReadyStateChangedCallback;
00123   void clear_all_deadline_timers();
00124   //***********handlers**************
00125   //timeout callback handler
00126   void queryTimedOutCallbackSet(const boost::system::error_code& e, IndividualQueryData* iqd);
00127 
00128   //connect_handler
00129   void connect_handler(const boost::system::error_code& error);
00130 
00131   void generic_read_stored_not_found_error_handler ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff);
00132   void set_generic_stored_not_found_error_handler();
00133 
00134 
00135   //set handler
00136   void read_handler_set      ( const boost::system::error_code& error, std::size_t bytes_transferred, boost::asio::streambuf* sBuff);
00137   void write_some_handler_set( const boost::system::error_code& error, std::size_t bytes_transferred);
00138 };
00139 
00140 
00141 
00142 }//end namespace
00143 
00144 #endif