Sirikata
libcore/include/sirikata/core/ohdp/SST.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_LIBCORE_OHDP_SST_HPP_
00006 #define _SIRIKATA_LIBCORE_OHDP_SST_HPP_
00007 
00008 #include <sirikata/core/network/SSTImpl.hpp>
00009 #include <sirikata/core/ohdp/Service.hpp>
00010 
00011 namespace Sirikata {
00012 
00013 // Convenience typedefs in a separate namespace
00014 namespace OHDPSST {
00015 typedef Sirikata::SST::EndPoint<OHDP::SpaceNodeID> Endpoint;
00016 typedef Sirikata::SST::BaseDatagramLayer<OHDP::SpaceNodeID> BaseDatagramLayer;
00017 typedef Sirikata::SST::Connection<OHDP::SpaceNodeID> Connection;
00018 typedef Sirikata::SST::Stream<OHDP::SpaceNodeID> Stream;
00019 typedef Sirikata::SST::ConnectionManager<OHDP::SpaceNodeID> ConnectionManager;
00020 } // namespace OHDPSST
00021 
00022 // OHDP::SpaceNodeID/OHDP-specific implementation
00023 namespace SST {
00024 
00025 template <>
00026 class SIRIKATA_EXPORT BaseDatagramLayer<OHDP::SpaceNodeID>
00027 {
00028   private:
00029     typedef OHDP::SpaceNodeID EndPointType;
00030 
00031   public:
00032     typedef std::tr1::shared_ptr<BaseDatagramLayer<EndPointType> > Ptr;
00033     typedef Ptr BaseDatagramLayerPtr;
00034 
00035     typedef std::tr1::function<void(void*, int)> DataCallback;
00036 
00037     static BaseDatagramLayerPtr getDatagramLayer(ConnectionVariables<EndPointType>* sstConnVars,
00038                                                  EndPointType endPoint)
00039     {
00040         return sstConnVars->getDatagramLayer(endPoint);
00041     }
00042 
00043     static BaseDatagramLayerPtr createDatagramLayer(
00044         ConnectionVariables<EndPointType>* sstConnVars,
00045         EndPointType endPoint,
00046         const Context* ctx,
00047         OHDP::Service* ohdp)
00048     {
00049         BaseDatagramLayerPtr datagramLayer = getDatagramLayer(sstConnVars, endPoint);
00050         if (datagramLayer) return datagramLayer;
00051 
00052         datagramLayer = BaseDatagramLayerPtr(
00053             new BaseDatagramLayer(sstConnVars, ctx, ohdp, endPoint)
00054         );
00055         sstConnVars->addDatagramLayer(endPoint, datagramLayer);
00056 
00057         return datagramLayer;
00058     }
00059 
00060     static void stopListening(ConnectionVariables<EndPointType>* sstConnVars, EndPoint<EndPointType>& listeningEndPoint) {
00061         EndPointType endPointID = listeningEndPoint.endPoint;
00062 
00063         BaseDatagramLayerPtr bdl = sstConnVars->getDatagramLayer(endPointID);
00064         if (!bdl) return;
00065         sstConnVars->removeDatagramLayer(endPointID, true);
00066         bdl->unlisten(listeningEndPoint);
00067     }
00068 
00069     void listenOn(EndPoint<EndPointType>& listeningEndPoint, DataCallback cb) {
00070       OHDP::Port* port = allocatePort(listeningEndPoint);
00071         port->receive(
00072             std::tr1::bind(&BaseDatagramLayer<EndPointType>::receiveMessageToCallback, this,
00073                 std::tr1::placeholders::_1,
00074                 std::tr1::placeholders::_2,
00075                 std::tr1::placeholders::_3,
00076                 cb
00077             )
00078         );
00079     }
00080 
00081     void listenOn(const EndPoint<EndPointType>& listeningEndPoint) {
00082         OHDP::Port* port = allocatePort(listeningEndPoint);
00083         port->receive(
00084             std::tr1::bind(
00085                 &BaseDatagramLayer::receiveMessage, this,
00086                 std::tr1::placeholders::_1,
00087                 std::tr1::placeholders::_2,
00088                 std::tr1::placeholders::_3
00089             )
00090         );
00091     }
00092 
00093     void unlisten(EndPoint<EndPointType>& ep) {
00094         // To stop listening, just destroy the corresponding port
00095         PortMap::iterator it = mAllocatedPorts.find(ep);
00096         if (it == mAllocatedPorts.end()) return;
00097         delete it->second;
00098         mAllocatedPorts.erase(it);
00099     }
00100 
00101     void send(EndPoint<EndPointType>* src, EndPoint<EndPointType>* dest, void* data, int len) {
00102         boost::mutex::scoped_lock lock(mMutex);
00103 
00104         OHDP::Port* port = getOrAllocatePort(*src);
00105 
00106         port->send(
00107             OHDP::Endpoint(dest->endPoint, dest->port),
00108             MemoryReference(data, len)
00109         );
00110     }
00111 
00112     const Context* context() {
00113         return mContext;
00114     }
00115 
00116     uint32 getUnusedPort(const EndPointType& ep) {
00117         return mOHDP->unusedOHDPPort(ep.space(), ep.node());
00118     }
00119 
00120     void invalidate() {
00121         mOHDP = NULL;
00122         mSSTConnVars->removeDatagramLayer(mEndpoint, true);
00123     }
00124 
00125   private:
00126     BaseDatagramLayer(ConnectionVariables<EndPointType>* sstConnVars, const Context* ctx, OHDP::Service* ohdpservice, const EndPointType&ep)
00127         : mContext(ctx),
00128           mOHDP(ohdpservice),
00129           mSSTConnVars(sstConnVars),
00130           mEndpoint(ep)
00131         {
00132 
00133         }
00134 
00135     OHDP::Port* allocatePort(const EndPoint<EndPointType>& ep) {
00136         OHDP::Port* port = mOHDP->bindOHDPPort(
00137             ep.endPoint.space(), ep.endPoint.node(), ep.port
00138         );
00139         mAllocatedPorts[ep] = port;
00140         return port;
00141     }
00142 
00143     OHDP::Port* getPort(const EndPoint<EndPointType>& ep) {
00144         PortMap::iterator it = mAllocatedPorts.find(ep);
00145         if (it == mAllocatedPorts.end()) return NULL;
00146         return it->second;
00147     }
00148 
00149     OHDP::Port* getOrAllocatePort(const EndPoint<EndPointType>& ep) {
00150         OHDP::Port* result = getPort(ep);
00151         if (result != NULL) return result;
00152         result = allocatePort(ep);
00153         return result;
00154     }
00155 
00156     void receiveMessage(const OHDP::Endpoint &src, const OHDP::Endpoint &dst, MemoryReference payload) {
00157         Connection<EndPointType>::handleReceive(
00158             mSSTConnVars,
00159             EndPoint<EndPointType> (OHDP::SpaceNodeID(src.space(), src.node()), src.port()),
00160             EndPoint<EndPointType> (OHDP::SpaceNodeID(dst.space(), dst.node()), dst.port()),
00161             (void*) payload.data(), payload.size()
00162         );
00163     }
00164 
00165     void receiveMessageToCallback(const OHDP::Endpoint &src, const OHDP::Endpoint &dst, MemoryReference payload, DataCallback cb) {
00166       cb((void*) payload.data(), payload.size() );
00167     }
00168 
00169 
00170 
00171     const Context* mContext;
00172     OHDP::Service* mOHDP;
00173 
00174     typedef std::map<EndPoint<EndPointType>, OHDP::Port*> PortMap;
00175     PortMap mAllocatedPorts;
00176 
00177     boost::mutex mMutex;
00178 
00179     ConnectionVariables<EndPointType>* mSSTConnVars;
00180     EndPointType mEndpoint;
00181 };
00182 
00183 #if SIRIKATA_PLATFORM == SIRIKATA_PLATFORM_WINDOWS
00184   // These exports keep Windows happy by forcing the export of these
00185   // types. BaseDatagramLayer is now excluded because it is explicitly
00186   // specialized, which, for some reason, keeps things working
00187   // properly.
00188   //SIRIKATA_EXPORT_TEMPLATE template class SIRIKATA_EXPORT BaseDatagramLayer<OHDP::SpaceNodeID>;
00189   SIRIKATA_EXPORT_TEMPLATE template class SIRIKATA_EXPORT Connection<OHDP::SpaceNodeID>;
00190   SIRIKATA_EXPORT_TEMPLATE template class SIRIKATA_EXPORT Stream<OHDP::SpaceNodeID>;
00191 #endif
00192 
00193 } // namespace SST
00194 
00195 } // namespace Sirikata
00196 
00197 #endif //_SIRIKATA_LIBCORE_OHDP_SST_HPP_