Sirikata
liboh/plugins/manual_query/ManualObjectQueryProcessor.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_
00006 #define _SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_
00007 
00008 #include <sirikata/oh/ObjectQueryProcessor.hpp>
00009 #include <sirikata/oh/ObjectNodeSession.hpp>
00010 
00011 #include <sirikata/pintoloc/ReplicatedLocationServiceCache.hpp>
00012 #include "ServerQueryHandler.hpp"
00013 #include "ObjectQueryHandler.hpp"
00014 
00015 namespace Sirikata {
00016 namespace OH {
00017 namespace Manual {
00018 // ObjectQueryProcessor that is also an ObjectNodeSessionListener. It holds one ServerQueryHandler, a map of ObjectQueryHandlers keyed by OHDP::SpaceNodeID, and per-SpaceObjectReference query state (mObjectState).
00024 class ManualObjectQueryProcessor :
00025         public ObjectQueryProcessor,
00026         public ObjectNodeSessionListener
00027 {
00028 public:
00029     static ManualObjectQueryProcessor* create(ObjectHostContext* ctx, const String& args); // NOTE(review): presumably the plugin factory entry point; the format of args is not visible here -- confirm.
00030 
00031     ManualObjectQueryProcessor(ObjectHostContext* ctx);
00032     virtual ~ManualObjectQueryProcessor();
00033     // NOTE(review): start/stop are presumably lifecycle hooks inherited from a base interface -- confirm.
00034     virtual void start();
00035     virtual void stop();
00036 
00037     // ObjectNodeSessionListener Interface
00038     virtual void onObjectNodeSession(const SpaceID& space, const ObjectReference& oref, const OHDP::NodeID& id);
00039 
00040     // The calls below are keyed by the presence's SpaceObjectReference, which is also the key type of mObjectState.
00041     // ObjectQueryProcessor Overrides
00042     virtual void presenceConnected(HostedObjectPtr ho, const SpaceObjectReference& sporef);
00043     virtual void presenceConnectedStream(HostedObjectPtr ho, const SpaceObjectReference& sporef, SSTStreamPtr strm);
00044     virtual void presenceDisconnected(HostedObjectPtr ho, const SpaceObjectReference& sporef);
00045     virtual String connectRequest(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& query);
00046     virtual void updateQuery(HostedObjectPtr ho, const SpaceObjectReference& sporef, const String& new_query);
00047 
00048 
00049     // ServerQueryHandler callbacks - Handle new/deleted queries
00050     // Notification when a new server query is set up. This occurs whether or not
00051     // a query is registered -- it just indicates that there's a connection that
00052     // we might care about and prepares us to set up local query
00053     // processors using ReplicatedLocationServiceCachePtrs provided by
00054     // callbacks from the ServerQueryHandler.
00055     void createdServerQuery(const OHDP::SpaceNodeID& snid);
00056     void removedServerQuery(const OHDP::SpaceNodeID& snid);
00057     // And these track when an index (tree) is replicated to the node -- these
00058     // calls will occur between createdServerQuery and
00059     // removedServerQuery. Creation calls are accompanied by a
00060     // LocationServiceCache which will hold replicated object data and info
00061     // about the replicated tree.
00062     void createdReplicatedIndex(const OHDP::SpaceNodeID& snid, ProxIndexID iid, ReplicatedLocationServiceCachePtr loc_cache, ServerID objects_from_server, bool dynamic_objects);
00063     void removedReplicatedIndex(const OHDP::SpaceNodeID& snid, ProxIndexID iid);
00064 
00065     // ObjectQueryHandler callbacks - handle notifications about local queries
00066     // in the tree so we know how to move the cut on the space server up or down.
00067     void queriersAreObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid);
00068     void queriersStoppedObserving(const OHDP::SpaceNodeID& snid, const ProxIndexID indexid, const ObjectReference& objid);
00069     void replicatedNodeRemoved(const OHDP::SpaceNodeID& snid, ProxIndexID indexid, const ObjectReference& objid);
00070 
00071     // ObjectQueryProcessor callbacks - Handle results coming back for queries
00072     void deliverProximityResult(const SpaceObjectReference& sporef, const Sirikata::Protocol::Prox::ProximityUpdate& update);
00073     void deliverLocationResult(const SpaceObjectReference& sporef, const LocUpdate& lu);
00074 private:
00075     typedef std::tr1::shared_ptr<ObjectQueryHandler> ObjectQueryHandlerPtr;
00076 
00077     // Helper that actually registers a query with the underlying query
00078     // processor. Factored out since we may need to defer registration until
00079     // after connection completes, so we can hit this from multiple code paths.
00080     void registerOrUpdateObjectQuery(const SpaceObjectReference& sporef);
00081     void unregisterObjectQuery(const SpaceObjectReference& sporef);
00082     // Command handlers. Each takes the command, the Commander it arrived through and the command's ID.
00083     // NOTE(review): where these are registered and how results are reported is defined in the implementation file -- confirm there.
00084     virtual void commandProperties(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00085     virtual void commandListHandlers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00086     ObjectQueryHandlerPtr lookupCommandHandler(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid) const; // Non-virtual const lookup; NOTE(review): presumably shared by the handler-specific commands below -- confirm.
00087     virtual void commandListNodes(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00088     virtual void commandListQueriers(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00089     virtual void commandForceRebuild(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00090     virtual void commandStats(const Command::Command& cmd, Command::Commander* cmdr, Command::CommandID cmdid);
00091 
00092     // NOTE(review): mContext is presumably the ctx given to the constructor; what is scheduled on mStrand is not visible in this header.
00093     ObjectHostContext* mContext;
00094     Network::IOStrandPtr mStrand;
00095 
00096     // Queries registered by objects to be resolved locally and
00097     // connection state
00098     struct ObjectState {
00099         ObjectState() // Initial state: default who, null node, empty query, not registered.
00100          : who(),
00101            node(OHDP::NodeID::null()),
00102            query(),
00103            registered(false)
00104         {}
00105 
00106         // Checks if it is safe to destroy this ObjectState: true only while node is the null NodeID.
00107         bool canRemove() const {
00108             return (node == OHDP::NodeID::null());
00109         }
00110 
00111         // Checks whether registration is needed, i.e. we have a query and
00112         // haven't registered it yet
00113         bool needsRegistration() const {
00114             return (!registered && !query.empty());
00115         }
00116 
00117         // Checks whether registration is currently possible, i.e. we're
00118         // actually connected (node is non-null) and the query is non-empty.
00119         bool canRegister() const {
00120             return (node != OHDP::NodeID::null() && !query.empty());
00121         }
00122         // who: the hosted object this state belongs to (HostedObjectWPtr; NOTE(review): presumably a weak pointer -- confirm).
00123         // node: OHDP::NodeID::null() until a node is known; query: the object's query string, empty when it has none.
00124         HostedObjectWPtr who;
00125         OHDP::NodeID node;
00126         String query;
00127         // Whether we've registered this query with the underlying query
00128         // processor. This requires the query processor and a fully connected
00129         // object (including connected space SST stream).
00130         bool registered;
00131     };
00132     // This stores state about requests from objects we are hosting
00133     typedef std::tr1::unordered_map<SpaceObjectReference, ObjectState, SpaceObjectReference::Hasher> ObjectStateMap;
00134     ObjectStateMap mObjectState;
00135 
00136     // This actually performs the queries from each object
00137     typedef std::tr1::unordered_map<OHDP::SpaceNodeID, ObjectQueryHandlerPtr, OHDP::SpaceNodeID::Hasher> QueryHandlerMap;
00138     QueryHandlerMap mObjectQueryHandlers;
00139 
00140     // And this performs a single query against each server, getting results
00141     // locally that allow the above to process local queries.
00142     ServerQueryHandler mServerQueryHandler;
00143 
00144 
00145 };
00146 
00147 } // namespace Manual
00148 } // namespace OH
00149 } // namespace Sirikata
00150 
00151 #endif //_SIRIKATA_OH_MANUAL_OBJECT_QUERY_PROCESSOR_HPP_