// Sirikata
00001 /* Sirikata 00002 * DistributedCoordinateSegmentation.hpp 00003 * 00004 * Copyright (c) 2009, Tahir Azim 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00031 */ 00032 00033 #ifndef _SIRIKATA_DISTRIBUTED_COORDINATE_SEGMENTATION_HPP_ 00034 #define _SIRIKATA_DISTRIBUTED_COORDINATE_SEGMENTATION_HPP_ 00035 00036 #include <boost/unordered_map.hpp> 00037 #include <boost/shared_ptr.hpp> 00038 #include <boost/asio.hpp> 00039 #include <boost/tokenizer.hpp> 00040 #include <boost/bind.hpp> 00041 00042 00043 #include <sirikata/core/util/AtomicTypes.hpp> 00044 #include <sirikata/core/network/Message.hpp> 00045 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00046 #include <sirikata/core/service/PollingService.hpp> 00047 #include <sirikata/space/SegmentedRegion.hpp> 00048 #include <sirikata/core/network/Address4.hpp> 00049 #include <sirikata/core/options/CommonOptions.hpp> 00050 #include <sirikata/core/network/Message.hpp> 00051 #include <sirikata/core/util/Hash.hpp> 00052 #include "WorldPopulationBSPTree.hpp" 00053 #include "CSegContext.hpp" 00054 #include "LoadBalancer.hpp" 00055 00056 #include "Protocol_CSeg.pbj.hpp" 00057 00058 typedef boost::asio::ip::tcp tcp; 00059 00060 00061 00062 namespace Sirikata { 00063 00064 00065 00066 class ServerIDMap; 00067 00068 typedef struct SegmentationChangeListener { 00069 char host[255]; 00070 uint16 port; 00071 } SegmentationChangeListener; 00072 00073 typedef boost::shared_ptr<tcp::socket> SocketPtr; 00074 typedef struct SocketContainer { 00075 SocketContainer() { 00076 mSocket = SocketPtr(); 00077 } 00078 00079 SocketContainer(SocketPtr socket) { 00080 mSocket = socket; 00081 } 00082 00083 size_t size() { 00084 return 1; 00085 } 00086 00087 SocketPtr socket() { 00088 return mSocket; 00089 } 00090 00091 SocketPtr mSocket; 00092 00093 } SocketContainer; 00094 00095 typedef std::tr1::function< void(std::map<ServerID, SocketContainer>) > ResponseCompletionFunction; 00096 00097 typedef boost::shared_ptr<Sirikata::SizedThreadSafeQueue<SocketContainer> > SocketQueuePtr; 00098 00100 class DistributedCoordinateSegmentation : public PollingService { 00101 public: 00102 
DistributedCoordinateSegmentation(CSegContext* ctx, const BoundingBox3f& region, const Vector3ui32& perdim, int, ServerIDMap * ); 00103 virtual ~DistributedCoordinateSegmentation(); 00104 00105 virtual BoundingBoxList serverRegionCached(const ServerID& server); 00106 void getServerRegionUncached(const ServerID& server, 00107 boost::shared_ptr<tcp::socket> socket); 00108 virtual BoundingBox3f region() ; 00109 virtual uint32 numServers() ; 00110 00111 virtual void poll(); 00112 virtual void stop(); 00113 00114 void handleSelfLookup(ServerID my_sid, Address4 my_addr); 00115 00116 private: 00117 void service(); 00118 00119 CSegContext* mContext; 00120 00121 SegmentedRegion mTopLevelRegion; 00122 Time mLastUpdateTime; 00123 00124 LoadBalancer mLoadBalancer; 00125 00126 std::vector<SegmentationChangeListener> mSpacePeers; 00127 00128 00129 boost::asio::io_service mIOService; //creates an io service 00130 boost::asio::io_service mLLIOService; //creates an io service 00131 00132 boost::shared_ptr<tcp::acceptor> mAcceptor; 00133 boost::shared_ptr<tcp::socket> mSocket; 00134 00135 00136 boost::shared_ptr<tcp::acceptor> mLLTreeAcceptor; 00137 boost::shared_ptr<tcp::socket> mLLTreeAcceptorSocket; 00138 00139 std::map<String, SegmentedRegion*> mHigherLevelTrees; 00140 std::map<String, SegmentedRegion*> mLowerLevelTrees; 00141 00142 int mAvailableCSEGServers; 00143 int mUpperTreeCSEGServers; 00144 00145 void csegChangeMessage(Sirikata::Protocol::CSeg::ChangeMessage* ccMsg); 00146 void handleLoadReport(boost::shared_ptr<tcp::socket>, Sirikata::Protocol::CSeg::LoadReportMessage* message); 00147 void notifySpaceServersOfChange(const std::vector<SegmentationInfo> segInfoVector); 00148 00149 /* Start listening for and accepting incoming connections. 
*/ 00150 void startAccepting(); 00151 void startAcceptingLLRequests(); 00152 00153 /* Handlers for incoming connections */ 00154 void accept_handler(); 00155 void acceptLLTreeRequestHandler(); 00156 00157 /* Functions to do the initial kd-tree partitioning of the virtual world into regions, and 00158 divide the kd-tree into upper and lower trees. 00159 */ 00160 void generateHierarchicalTrees(SegmentedRegion* region, int depth, int& numLLTreesSoFar); 00161 void subdivideTopLevelRegion(SegmentedRegion* region, 00162 Vector3ui32 perdim, 00163 int& numServersAssigned); 00164 00165 00166 00167 /* Functions to contact another CSEG server, create sockets to them and/or forward calls to them. 00168 These should go away later when calls to CSEG no longer remain recursive. */ 00169 00170 void callLowerLevelCSEGServer(boost::shared_ptr<tcp::socket> socket, ServerID, const Vector3f& searchVec, 00171 const BoundingBox3f& boundingBox, 00172 BoundingBox3f& returningBBox); 00173 void callLowerLevelCSEGServersForServerRegions(boost::shared_ptr<tcp::socket> socket, 00174 ServerID server_id, BoundingBoxList&); 00175 void sendLoadReportToLowerLevelCSEGServer(boost::shared_ptr<tcp::socket> socket, ServerID, 00176 const BoundingBox3f& boundingBox, 00177 Sirikata::Protocol::CSeg::LoadReportMessage* message); 00178 void callLowerLevelCSEGServersForLookupBoundingBoxes(boost::shared_ptr<tcp::socket> socket, 00179 const BoundingBox3f& bbox, 00180 const std::map<ServerID, std::vector<SegmentedRegion*> >&, 00181 std::vector<ServerID>& ); 00182 00183 00184 /* Functions run in separate threads to listen for incoming packets */ 00185 void ioServicingLoop(); 00186 void llIOServicingLoop(); 00187 00188 /* Functions to send buffers to all CSEG or all space servers */ 00189 void sendToAllCSEGServers( Sirikata::Protocol::CSeg::CSegMessage& ); 00190 void sendOnAllSockets(Sirikata::Protocol::CSeg::CSegMessage csegMessage, std::map< ServerID, SocketContainer > socketList); 00191 00192 00193 void 
sendToAllSpaceServers( Sirikata::Protocol::CSeg::CSegMessage& ); 00194 00195 uint32 getAvailableServerIndex(); 00196 00197 void getRandomLeafParentSibling(SegmentedRegion** randomLeaf, 00198 SegmentedRegion** sibling, 00199 SegmentedRegion** parent); 00200 00201 00202 00203 00204 /* Functions to read from a socket. */ 00205 00206 void asyncRead(boost::shared_ptr<tcp::socket> socket, 00207 uint8* asyncBufferArray, 00208 const boost::system::error_code& ec, 00209 std::size_t bytes_transferred); 00210 void asyncLLRead(boost::shared_ptr<tcp::socket> socket, 00211 uint8* asyncBufferArray, 00212 const boost::system::error_code& ec, 00213 std::size_t bytes_transferred); 00214 00215 00216 void writeCSEGMessage(boost::shared_ptr<tcp::socket> socket, Sirikata::Protocol::CSeg::CSegMessage& csegMessage); 00217 00218 void readCSEGMessage(boost::shared_ptr<tcp::socket> socket, 00219 Sirikata::Protocol::CSeg::CSegMessage& csegMessage, 00220 uint8* bufferSoFar, 00221 uint32 bufferSoFarSize 00222 ); 00223 00224 void readCSEGMessage(boost::shared_ptr<tcp::socket> socket, 00225 Sirikata::Protocol::CSeg::CSegMessage& csegMessage); 00226 00227 /* Functions to serialize the whole CSEG tree. */ 00228 void serializeBSPTree(SerializedBSPTree* serializedBSPTree); 00229 void traverseAndStoreTree(SegmentedRegion* region, uint32& idx, 00230 SerializedBSPTree* serializedTree); 00231 00232 00233 00234 ServerIDMap * mSidMap; 00235 00236 00237 /* Key value maps for fast lookup of bounding boxes managed by space servers. 
*/ 00238 std::map<ServerID, BoundingBoxList > mWholeTreeServerRegionMap; 00239 std::map<ServerID, BoundingBoxList > mLowerTreeServerRegionMap; 00240 00241 boost::shared_mutex mCSEGReadWriteMutex; 00242 00243 boost::shared_mutex mSocketsToCSEGServersMutex; 00244 std::map<ServerID, SocketQueuePtr > mLeasedSocketsToCSEGServers; 00245 00246 friend class LoadBalancer; 00247 00248 00249 SocketContainer getSocketToCSEGServer(ServerID server_id); 00250 00251 00252 void doSocketCreation(ServerID server_id, 00253 std::vector<ServerID> server_id_List, 00254 std::map<ServerID, SocketContainer>* socketMapPtr, 00255 ResponseCompletionFunction func, 00256 ServerID resolved_id, 00257 Address4 addy 00258 ); 00259 00260 void createSocketContainers(std::vector<ServerID> server_id_List, 00261 std::map<ServerID, SocketContainer>* socketMap, 00262 ResponseCompletionFunction func 00263 ); 00264 00265 bool handleLookup(Vector3f vector, 00266 boost::shared_ptr<tcp::socket> socket); 00267 00268 void lookupOnSocket(boost::shared_ptr<tcp::socket> clientSocket, const Vector3f searchVec, 00269 const BoundingBox3f boundingBox, std::map<ServerID, SocketContainer> socketList); 00270 00271 00272 void writeLookupResponse(boost::shared_ptr<tcp::socket> socket, BoundingBox3f& bbox, ServerID sid) ; 00273 00274 00275 00276 00277 void requestServerRegionsOnSockets(boost::shared_ptr<tcp::socket> socket, ServerID server_id, 00278 BoundingBoxList bbList, 00279 std::map<ServerID, SocketContainer> socketList); 00280 00281 void writeServerRegionResponse(boost::shared_ptr<tcp::socket> socket, 00282 BoundingBoxList boundingBoxlist); 00283 00284 bool handleLookupBBox(const BoundingBox3f& bbox, boost::shared_ptr<tcp::socket> socket); 00285 00286 void lookupBBoxOnSocket(boost::shared_ptr<tcp::socket> clientSocket, 00287 const BoundingBox3f boundingBox, std::vector<ServerID> server_ids, 00288 std::map<ServerID, std::vector<SegmentedRegion*> > otherCSEGServers, 00289 std::map<ServerID, SocketContainer> socketList); 
00290 00291 void writeLookupBBoxResponse(boost::shared_ptr<tcp::socket> clientSocket, std::vector<ServerID>) ; 00292 00293 void sendLoadReportOnSocket(boost::shared_ptr<tcp::socket> clientSocket, BoundingBox3f boundingBox, Sirikata::Protocol::CSeg::LoadReportMessage message, std::map< ServerID, SocketContainer > socketList); 00294 00295 00296 }; // class CoordinateSegmentation 00297 00298 } // namespace Sirikata 00299 00300 #endif