Sirikata
cseg/src/DistributedCoordinateSegmentation.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  DistributedCoordinateSegmentation.hpp
00003  *
00004  *  Copyright (c) 2009, Tahir Azim
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_DISTRIBUTED_COORDINATE_SEGMENTATION_HPP_
00034 #define _SIRIKATA_DISTRIBUTED_COORDINATE_SEGMENTATION_HPP_
00035 
00036 #include <boost/unordered_map.hpp>
00037 #include <boost/shared_ptr.hpp>
00038 #include <boost/asio.hpp>
00039 #include <boost/tokenizer.hpp>
00040 #include <boost/bind.hpp>
00041 
00042 
00043 #include <sirikata/core/util/AtomicTypes.hpp>
00044 #include <sirikata/core/network/Message.hpp>
00045 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00046 #include <sirikata/core/service/PollingService.hpp>
00047 #include <sirikata/space/SegmentedRegion.hpp>
00048 #include <sirikata/core/network/Address4.hpp>
00049 #include <sirikata/core/options/CommonOptions.hpp>
00050 #include <sirikata/core/network/Message.hpp>
00051 #include <sirikata/core/util/Hash.hpp>
00052 #include "WorldPopulationBSPTree.hpp"
00053 #include "CSegContext.hpp"
00054 #include "LoadBalancer.hpp"
00055 
00056 #include "Protocol_CSeg.pbj.hpp"
00057 
00058 typedef boost::asio::ip::tcp tcp;  // Global-scope shorthand for the Boost.Asio TCP protocol class; used below as tcp::socket and tcp::acceptor.
00059 
00060 
00061 
00062 namespace Sirikata {
00063 
00064 
00065 
00066 class ServerIDMap;  // Forward declaration: this header only refers to ServerIDMap through pointers.
00067 
00068 typedef struct SegmentationChangeListener {
00069   char host[255];
00070   uint16 port;
00071 } SegmentationChangeListener;
00072 
00073 typedef boost::shared_ptr<tcp::socket> SocketPtr;
00074 typedef struct SocketContainer {
00075   SocketContainer() {
00076     mSocket = SocketPtr();
00077   }
00078 
00079   SocketContainer(SocketPtr socket) {
00080     mSocket = socket;
00081   }
00082 
00083   size_t size() {
00084     return 1;
00085   }
00086 
00087   SocketPtr socket() {
00088     return mSocket;
00089   }
00090 
00091   SocketPtr mSocket;
00092 
00093 } SocketContainer;
00094 
00095 typedef std::tr1::function< void(std::map<ServerID, SocketContainer>) > ResponseCompletionFunction;
00096 
00097 typedef boost::shared_ptr<Sirikata::SizedThreadSafeQueue<SocketContainer> > SocketQueuePtr;
00098 
00100 class DistributedCoordinateSegmentation : public PollingService {
00101 public:
00102     DistributedCoordinateSegmentation(CSegContext* ctx, const BoundingBox3f& region, const Vector3ui32& perdim, int, ServerIDMap * );
00103     virtual ~DistributedCoordinateSegmentation();
00104 
00105     virtual BoundingBoxList serverRegionCached(const ServerID& server);
00106     void getServerRegionUncached(const ServerID& server,
00107                           boost::shared_ptr<tcp::socket> socket);
00108     virtual BoundingBox3f region() ;
00109     virtual uint32 numServers() ;
00110 
00111     virtual void poll();
00112     virtual void stop();
00113 
00114     void handleSelfLookup(ServerID my_sid, Address4 my_addr);
00115 
00116 private:
00117     void service();
00118 
00119     CSegContext* mContext;
00120 
00121     SegmentedRegion mTopLevelRegion;
00122     Time mLastUpdateTime;
00123 
00124     LoadBalancer mLoadBalancer;
00125 
00126     std::vector<SegmentationChangeListener> mSpacePeers;
00127 
00128 
00129     boost::asio::io_service mIOService;  //creates an io service
00130     boost::asio::io_service mLLIOService;  //creates an io service
00131 
00132     boost::shared_ptr<tcp::acceptor> mAcceptor;
00133     boost::shared_ptr<tcp::socket> mSocket;
00134 
00135 
00136     boost::shared_ptr<tcp::acceptor> mLLTreeAcceptor;
00137     boost::shared_ptr<tcp::socket> mLLTreeAcceptorSocket;
00138 
00139     std::map<String, SegmentedRegion*> mHigherLevelTrees;
00140     std::map<String, SegmentedRegion*> mLowerLevelTrees;
00141 
00142     int mAvailableCSEGServers;
00143     int mUpperTreeCSEGServers;
00144 
00145     void csegChangeMessage(Sirikata::Protocol::CSeg::ChangeMessage* ccMsg);
00146     void handleLoadReport(boost::shared_ptr<tcp::socket>, Sirikata::Protocol::CSeg::LoadReportMessage* message);
00147     void notifySpaceServersOfChange(const std::vector<SegmentationInfo> segInfoVector);
00148 
00149     /* Start listening for and accepting incoming connections.  */
00150     void startAccepting();
00151     void startAcceptingLLRequests();
00152 
00153     /* Handlers for incoming connections */
00154     void accept_handler();
00155     void acceptLLTreeRequestHandler();
00156 
00157    /* Functions to do the initial kd-tree partitioning of the virtual world into regions, and
00158       divide the kd-tree into upper and lower trees.
00159    */
00160     void generateHierarchicalTrees(SegmentedRegion* region, int depth, int& numLLTreesSoFar);
00161     void subdivideTopLevelRegion(SegmentedRegion* region,
00162                  Vector3ui32 perdim,
00163                  int& numServersAssigned);
00164 
00165 
00166 
00167     /* Functions to contact another CSEG server, create sockets to them and/or forward calls to them.
00168        These should go away later when calls to CSEG no longer remain recursive. */
00169 
00170     void callLowerLevelCSEGServer(boost::shared_ptr<tcp::socket> socket, ServerID, const Vector3f& searchVec,
00171                                       const BoundingBox3f& boundingBox,
00172                                       BoundingBox3f& returningBBox);
00173   void callLowerLevelCSEGServersForServerRegions(boost::shared_ptr<tcp::socket> socket,
00174                                                  ServerID server_id, BoundingBoxList&);
00175   void sendLoadReportToLowerLevelCSEGServer(boost::shared_ptr<tcp::socket> socket, ServerID,
00176                                               const BoundingBox3f& boundingBox,
00177                                               Sirikata::Protocol::CSeg::LoadReportMessage* message);
00178     void callLowerLevelCSEGServersForLookupBoundingBoxes(boost::shared_ptr<tcp::socket> socket,
00179                                                          const BoundingBox3f& bbox,
00180                                                          const std::map<ServerID, std::vector<SegmentedRegion*> >&,
00181                                                          std::vector<ServerID>& );
00182 
00183 
00184     /* Functions run in separate threads to listen for incoming packets */
00185     void ioServicingLoop();
00186     void llIOServicingLoop();
00187 
00188     /* Functions to send buffers to all CSEG or all space servers  */
00189     void sendToAllCSEGServers( Sirikata::Protocol::CSeg::CSegMessage&  );
00190     void sendOnAllSockets(Sirikata::Protocol::CSeg::CSegMessage csegMessage, std::map< ServerID, SocketContainer > socketList);
00191 
00192 
00193     void sendToAllSpaceServers( Sirikata::Protocol::CSeg::CSegMessage& );
00194 
00195     uint32 getAvailableServerIndex();
00196 
00197     void getRandomLeafParentSibling(SegmentedRegion** randomLeaf,
00198                     SegmentedRegion** sibling,
00199                     SegmentedRegion** parent);
00200 
00201 
00202 
00203 
00204     /* Functions to read from a socket.  */
00205 
00206     void asyncRead(boost::shared_ptr<tcp::socket> socket,
00207            uint8* asyncBufferArray,
00208            const boost::system::error_code& ec,
00209            std::size_t bytes_transferred);
00210      void asyncLLRead(boost::shared_ptr<tcp::socket> socket,
00211               uint8* asyncBufferArray,
00212               const boost::system::error_code& ec,
00213               std::size_t bytes_transferred);
00214 
00215 
00216     void writeCSEGMessage(boost::shared_ptr<tcp::socket> socket, Sirikata::Protocol::CSeg::CSegMessage& csegMessage);
00217 
00218     void readCSEGMessage(boost::shared_ptr<tcp::socket> socket,
00219                           Sirikata::Protocol::CSeg::CSegMessage& csegMessage,
00220                           uint8* bufferSoFar,
00221                           uint32 bufferSoFarSize
00222                          );
00223 
00224     void readCSEGMessage(boost::shared_ptr<tcp::socket> socket,
00225                          Sirikata::Protocol::CSeg::CSegMessage& csegMessage);
00226 
00227     /* Functions to serialize the whole CSEG tree. */
00228     void serializeBSPTree(SerializedBSPTree* serializedBSPTree);
00229     void traverseAndStoreTree(SegmentedRegion* region, uint32& idx,
00230                   SerializedBSPTree* serializedTree);
00231 
00232 
00233 
00234     ServerIDMap *  mSidMap;
00235 
00236 
00237     /* Key value maps for fast lookup of bounding boxes managed by space servers. */
00238     std::map<ServerID, BoundingBoxList > mWholeTreeServerRegionMap;
00239     std::map<ServerID, BoundingBoxList > mLowerTreeServerRegionMap;
00240 
00241     boost::shared_mutex mCSEGReadWriteMutex;
00242 
00243     boost::shared_mutex mSocketsToCSEGServersMutex;
00244     std::map<ServerID, SocketQueuePtr > mLeasedSocketsToCSEGServers;
00245 
00246     friend class LoadBalancer;
00247 
00248 
00249     SocketContainer getSocketToCSEGServer(ServerID server_id);
00250 
00251 
00252   void doSocketCreation(ServerID server_id,
00253                         std::vector<ServerID> server_id_List,
00254                         std::map<ServerID, SocketContainer>* socketMapPtr,
00255                         ResponseCompletionFunction func,
00256                         ServerID resolved_id,
00257                         Address4 addy
00258                         );
00259 
00260   void createSocketContainers(std::vector<ServerID> server_id_List,
00261                               std::map<ServerID, SocketContainer>* socketMap,
00262                               ResponseCompletionFunction func
00263                               );
00264 
00265   bool handleLookup(Vector3f vector,
00266                     boost::shared_ptr<tcp::socket> socket);
00267 
00268   void lookupOnSocket(boost::shared_ptr<tcp::socket> clientSocket, const Vector3f searchVec,
00269                                                        const BoundingBox3f boundingBox, std::map<ServerID, SocketContainer> socketList);
00270 
00271 
00272   void writeLookupResponse(boost::shared_ptr<tcp::socket> socket, BoundingBox3f& bbox, ServerID sid) ;
00273 
00274 
00275 
00276 
00277   void requestServerRegionsOnSockets(boost::shared_ptr<tcp::socket> socket, ServerID server_id,
00278                                      BoundingBoxList bbList,
00279                                      std::map<ServerID, SocketContainer> socketList);
00280 
00281   void writeServerRegionResponse(boost::shared_ptr<tcp::socket> socket,
00282                                  BoundingBoxList boundingBoxlist);
00283 
00284   bool handleLookupBBox(const BoundingBox3f& bbox, boost::shared_ptr<tcp::socket> socket);
00285 
00286   void lookupBBoxOnSocket(boost::shared_ptr<tcp::socket> clientSocket,
00287                const BoundingBox3f boundingBox, std::vector<ServerID> server_ids,
00288                std::map<ServerID, std::vector<SegmentedRegion*> > otherCSEGServers,
00289                std::map<ServerID, SocketContainer> socketList);
00290 
00291   void writeLookupBBoxResponse(boost::shared_ptr<tcp::socket> clientSocket, std::vector<ServerID>) ;
00292 
00293   void sendLoadReportOnSocket(boost::shared_ptr<tcp::socket> clientSocket, BoundingBox3f boundingBox, Sirikata::Protocol::CSeg::LoadReportMessage message, std::map< ServerID, SocketContainer > socketList);
00294 
00295 
00296 }; // class CoordinateSegmentation
00297 
00298 } // namespace Sirikata
00299 
00300 #endif