// Sirikata
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_ 00006 #define _SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_ 00007 00008 #include <sirikata/oh/ObjectQueryProcessor.hpp> 00009 #include <sirikata/oh/ObjectNodeSession.hpp> 00010 00011 #include <sirikata/pintoloc/ReplicatedLocationServiceCache.hpp> 00012 #include "ServerQueryHandler.hpp" 00013 #include "ObjectQueryHandler.hpp" 00014 00015 namespace Sirikata { 00016 namespace OH { 00017 namespace Manual { 00018 00024 class ManualObjectQueryProcessor : 00025 public ObjectQueryProcessor, 00026 public ObjectNodeSessionListener 00027 { 00028 public: 00029 static ManualObjectQueryProcessor* create(ObjectHostContext* ctx, const String& args); 00030 00031 ManualObjectQueryProcessor(ObjectHostContext* ctx); 00032 virtual ~ManualObjectQueryProcessor(); 00033 00034 virtual void start(); 00035 virtual void stop(); 00036 00037 // ObjectNodeSessionListener Interface 00038 virtual void onObjectNodeSession(const SpaceID& space, const ObjectReference& oref, const OHDP::NodeID& id); 00039 00040 00041 // ObjectQueryProcessor Overrides 00042 virtual void presenceConnected(HostedObjectPtr ho, const SpaceObjectReference& sporef); 00043 virtual void presenceConnectedStream(HostedObjectPtr ho, const SpaceObjectReference& sporef, SSTStreamPtr strm); 00044 virtual void presenceDisconnected(HostedObjectPtr ho, const SpaceObjectReference& sporef); 00045 virtual String connectRequest(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& query); 00046 virtual void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& new_query); 00047 00048 00049 // ServerQueryHandler callbacks - Handle new/deleted queries 00050 // Notification when a new server query is setup. 
This occurs whether or not 00051 // a query is registered -- it just indicates that there's a connection that 00052 // we might care about and prepares us to setup local query 00053 // processors using ReplicatedLocationServiceCachePtrs provided by 00054 // callbacks from the ServerQueryHandler. 00055 void createdServerQuery(const OHDP::SpaceNodeID& snid); 00056 void removedServerQuery(const OHDP::SpaceNodeID& snid); 00057 // And these track when an index(tree) is replicated to the node -- these 00058 // calls will occur between createdServerQuery and 00059 // removedServerQuery. Creation calls are accompanied by a 00060 // LocationServiceCache which will hold replicated object data and info 00061 // about the replicated tree. 00062 void createdReplicatedIndex(const OHDP::SpaceNodeID& snid, ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects); 00063 void removedReplicatedIndex(const OHDP::SpaceNodeID& snid, ProxIndexID iid); 00064 00065 // ObjectQueryHandler callbacks - handle notifications about local queries 00066 // in the tree so we know how to move the cut on the space server up or down. 
00067 void queriersAreObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid); 00068 void queriersStoppedObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid); 00069 void replicatedNodeRemoved(const OHDP::SpaceNodeID& snid, ProxIndexID indexid, const ObjectReference& objid); 00070 00071 // ObjectQueryProcessor callbacks - Handle results coming back for queries 00072 void deliverProximityResult(const SpaceObjectReference& sporef, const Sirikata::Protocol::Prox::ProximityUpdate& update); 00073 void deliverLocationResult(const SpaceObjectReference& sporef, const LocUpdate& lu); 00074 private: 00075 typedef std::tr1::shared_ptr<ObjectQueryHandler> ObjectQueryHandlerPtr; 00076 00077 // Helper that actually registers a query with the underlying query 00078 // processor. Factored out since we may need to defer registration until 00079 // after connection completes, so we can hit this from multiple code paths. 
00080 void registerOrUpdateObjectQuery(const SpaceObjectReference& sporef); 00081 void unregisterObjectQuery(const SpaceObjectReference& sporef); 00082 00083 00084 virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00085 virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00086 ObjectQueryHandlerPtr lookupCommandHandler(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) const; 00087 virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00088 virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00089 virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00090 virtual void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid); 00091 00092 00093 ObjectHostContext* mContext; 00094 Network::IOStrandPtr mStrand; 00095 00096 // Queries registered by objects to be resolved locally and 00097 // connection state 00098 struct ObjectState { 00099 ObjectState() 00100 : who(), 00101 node(OHDP::NodeID::null()), 00102 query(), 00103 registered(false) 00104 {} 00105 00106 // Checks if it is safe to destroy this ObjectState 00107 bool canRemove() const { 00108 return (node == OHDP::NodeID::null()); 00109 } 00110 00111 // Checks whether registration is needed, i.e. we have a query and 00112 // haven't registered it yet 00113 bool needsRegistration() const { 00114 return (!registered && !query.empty()); 00115 } 00116 00117 // Checks whether registration is currently possible, i.e. we're 00118 // actually connected. 
00119 bool canRegister() const { 00120 return (node != OHDP::NodeID::null() && !query.empty()); 00121 } 00122 00123 00124 HostedObjectWPtr who; 00125 OHDP::NodeID node; 00126 String query; 00127 // Whether we've registered this query with the underlying query 00128 // processor. This requires the query processor and a fully connected 00129 // object (including connected space SST stream). 00130 bool registered; 00131 }; 00132 // This stores state about requests from objects we are hosting 00133 typedef std::tr1::unordered_map<SpaceObjectReference, ObjectState, SpaceObjectReference::Hasher> ObjectStateMap; 00134 ObjectStateMap mObjectState; 00135 00136 // This actually performs the queries from each object 00137 typedef std::tr1::unordered_map<OHDP::SpaceNodeID, ObjectQueryHandlerPtr, OHDP::SpaceNodeID::Hasher> QueryHandlerMap; 00138 QueryHandlerMap mObjectQueryHandlers; 00139 00140 // And this performs a single query against each server, getting results 00141 // locally that allow the above to process local queries. 00142 ServerQueryHandler mServerQueryHandler; 00143 00144 00145 }; 00146 00147 } // namespace Manual 00148 } // namespace OH 00149 } // namespace Sirikata 00150 00151 #endif //_SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_