Sirikata
|
/*  Sirikata
 *  TCPSpaceNetwork.hpp
 *
 *  Copyright (c) 2010, Daniel Reiter Horn
 *  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are
 *  met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *  * Neither the name of Sirikata nor the names of its contributors may
 *    be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031 */ 00032 00033 #ifndef _SIRIKATA_TCP_SPACE_NETWORK_HPP_ 00034 #define _SIRIKATA_TCP_SPACE_NETWORK_HPP_ 00035 00036 #include <sirikata/space/SpaceNetwork.hpp> 00037 #include <sirikata/core/network/Address4.hpp> 00038 #include <sirikata/core/network/Stream.hpp> 00039 #include <sirikata/core/network/StreamListener.hpp> 00040 #include <sirikata/core/util/PluginManager.hpp> 00041 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00042 #include <sirikata/core/queue/CountResourceMonitor.hpp> 00043 00044 namespace Sirikata { 00045 00046 class TCPSpaceNetwork : public SpaceNetwork { 00047 // Data associated with a stream. Note that this stream is 00048 // usually, but not always, unique to the endpoint pair. Due to 00049 // the possibility of both sides initiating a connection at the 00050 // same time we may have more than one of these per remote 00051 // endpoint, so we maintain a bit more bookkeeping information to 00052 // manage that possibility. 00053 struct RemoteStream { 00054 enum Initiator { 00055 Us, Them 00056 }; 00057 00058 // parent is used to set the buffer size, stream is the 00059 // underlying stream 00060 RemoteStream(TCPSpaceNetwork* parent, Sirikata::Network::Stream*strm, ServerID remote_id, Address4 remote_net, Initiator init); 00061 00062 ~RemoteStream(); 00063 00064 bool push(Chunk& data, bool* was_empty); 00065 Chunk* pop(Network::IOStrand* ios); 00066 00067 Sirikata::Network::Stream* stream; 00068 00069 Address4 network_endpoint; // The remote endpoint we're talking to. 00070 ServerID logical_endpoint; // The remote endpoint we're 00071 // talking to. 00072 00073 Initiator initiator; 00074 00075 bool connected; // Indicates if the initial header specifying 00076 // the remote endpoint has been sent or 00077 // received (depending on which side initiated 00078 // the connection). If this is true then this 00079 // connection should be in the map of 00080 // connections and it should be safe to send 00081 // data on it. 
00082 bool shutting_down; // Inidicates if this connection is 00083 // currently being shutdown. Will be true 00084 // if another stream to the same endpoint 00085 // was preferred over this one. 00086 typedef Sirikata::SizedThreadSafeQueue<Chunk*,CountResourceMonitor> SizedChunkReceiveQueue; 00087 SizedChunkReceiveQueue receive_queue; // Note: This can't be a single 00088 // front item or the receive queue 00089 // empties it too quickly and 00090 // loses bandwidth. 00091 bool paused; // Indicates if receiving is currently paused for 00092 // this stream. If true, the stream must be 00093 // unpaused the next time someone calls 00094 // receiveOne 00095 00096 boost::mutex mPushPopMutex; 00097 }; 00098 00099 typedef std::tr1::shared_ptr<RemoteStream> RemoteStreamPtr; 00100 typedef std::tr1::weak_ptr<RemoteStream> RemoteStreamWPtr; 00101 typedef std::tr1::unordered_map<ServerID, RemoteStreamPtr> RemoteStreamMap; 00102 typedef std::tr1::unordered_map<Address4, RemoteStreamPtr, Address4::Hasher> RemoteNetStreamMap; 00103 00104 00117 struct RemoteSession { 00118 RemoteSession(ServerID sid); 00119 ~RemoteSession(); 00120 00121 // Endpoint for this session, guaranteed not to change. 00122 ServerID logical_endpoint; 00123 00124 // Streams for this session. 
00125 RemoteStreamPtr remote_stream; // The stream we're normally trying to 00126 // receive from 00127 RemoteStreamPtr closing_stream; // A stream we're trying to shut down 00128 RemoteStreamPtr pending_out; // Remote stream that we've opened and 00129 // called connect on, but which isn't 00130 // ready yet 00131 }; 00132 typedef std::tr1::shared_ptr<RemoteSession> RemoteSessionPtr; 00133 typedef std::tr1::weak_ptr<RemoteSession> RemoteSessionWPtr; 00134 00135 class TCPSendStream : public SpaceNetwork::SendStream { 00136 public: 00137 TCPSendStream(ServerID sid, RemoteSessionPtr s); 00138 ~TCPSendStream(); 00139 00140 virtual ServerID id() const; 00141 virtual bool send(const Chunk&); 00142 00143 private: 00144 ServerID logical_endpoint; 00145 RemoteSessionPtr session; 00146 }; 00147 typedef std::tr1::unordered_map<ServerID, TCPSendStream*> SendStreamMap; 00148 00149 class TCPReceiveStream : public SpaceNetwork::ReceiveStream { 00150 public: 00151 TCPReceiveStream(ServerID sid, RemoteSessionPtr s, Network::IOStrand* _ios); 00152 ~TCPReceiveStream(); 00153 virtual ServerID id() const; 00154 virtual Chunk* front(); 00155 virtual Chunk* pop(); 00156 00157 private: 00158 // Get the current queue for receiving data from the address. 00159 // This considers any closing streams first, then the main stream 00160 // in order to handle any data arriving on closing streams as 00161 // quickly as possible. It takes into account whether any data is 00162 // available on the stream as well, so a closing stream with no 00163 // data available will *not* be returned when an active stream 00164 // with data available also exists. 00165 // NOTE: Now this just ensures that front_stream will either 00166 // point to the right thing or to NULL, so there is no return value. 
00167 void getCurrentRemoteStream(); 00168 00169 bool canReadFrom(RemoteStreamPtr& strm); 00170 00171 ServerID logical_endpoint; 00172 RemoteSessionPtr session; 00173 RemoteStreamPtr front_stream; // Stream which we already got a front() 00174 // item from 00175 Chunk* front_elem; // The front item, left out here to make it 00176 // accessible since the RemoteStream doesn't give 00177 // easy access 00178 Network::IOStrand* ios; 00179 }; 00180 typedef std::tr1::unordered_map<ServerID, TCPReceiveStream*> ReceiveStreamMap; 00181 00182 // Simple storage for the session, send stream, and receive stream 00183 // associated with a single endpoint. There should only ever be *one* of 00184 // these per remote endpoint. 00185 struct RemoteData { 00186 RemoteData(RemoteSessionPtr s) 00187 :session(s), 00188 send(NULL), 00189 receive(NULL) 00190 {} 00191 00192 ~RemoteData() { 00193 delete send; 00194 delete receive; 00195 session.reset(); 00196 } 00197 00198 RemoteSessionPtr session; 00199 TCPSendStream* send; 00200 TCPReceiveStream* receive; 00201 }; 00202 00203 // This is the one chunk of truly thread-safe data, followed by a small 00204 // number of truly thread-safe methods to manipulate it. These control the 00205 // central repository of RemoteSessionPtrs, ReceiveStreams, and SendStreams, 00206 // each unique per ServerID. This map is really only used to retrieve or 00207 // initialize each, after which all operations are directly on those objects 00208 // instead of through the map. It is also locked on new connections to 00209 // make lookups so tradeoffs between underlying connections are possible. 
00210 typedef std::tr1::unordered_map<ServerID, RemoteData*> RemoteDataMap; 00211 RemoteDataMap mRemoteData; 00212 boost::recursive_mutex mRemoteDataMutex; 00213 RemoteData* getRemoteData(ServerID sid); 00214 RemoteSessionPtr getRemoteSession(ServerID sid); 00215 TCPSendStream* getNewSendStream(ServerID sid); 00216 TCPReceiveStream* getNewReceiveStream(ServerID sid); 00217 // Get a new uninitialized outgoing stream that connect can be called on. 00218 // Will return NULL if another outgoing stream has already been requested, 00219 // guaranteeing that multiple outgoing stream connections won't be 00220 // attempted. 00221 RemoteStreamPtr getNewOutgoingStream(ServerID sid, Address4 remote_net, RemoteStream::Initiator init); 00222 RemoteStreamPtr getNewIncomingStream(Address4 remote_net, RemoteStream::Initiator init, Sirikata::Network::Stream* strm); 00223 typedef std::set<Sirikata::Network::IOTimerPtr> TimerSet; 00224 00225 // All the real data is handled in the main thread. 00226 Sirikata::PluginManager mPluginManager; 00227 String mStreamPlugin; 00228 Sirikata::OptionSet* mListenOptions; 00229 Sirikata::OptionSet* mSendOptions; 00230 00231 Network::IOStrand *mIOStrand; 00232 Network::IOWork* mIOWork; 00233 00234 RemoteStreamMap mClosingStreams; 00235 TimerSet mClosingStreamTimers; // Timers for streams that are still closing. 00236 00237 void finishListen(ServerID resolved_sid, Address4 addr, ReceiveListener* receive_listener); 00238 00239 // Open a new connection. Should be called when an existing connection 00240 // isn't available. 00241 TCPSendStream* openConnection(Network::IOStrand* strand, const ServerID& dest); 00242 // Finish opening the connection 00243 void finishOpenConnection(const ServerID& dest, ServerID resolved_dest, Address4 addr); 00244 00245 // Add stream to system, possibly resolving conflicting sets of 00246 // streams. 
Return corresponding TCPReceiveStream* 00247 TCPReceiveStream* handleConnectedStream(RemoteStreamPtr source_stream); 00248 // Mark a send stream as disconnected 00249 void handleDisconnectedStream(const RemoteStreamPtr& wstream); 00250 00251 00252 00253 Address4 mListenAddress; 00254 00255 Sirikata::Network::StreamListener *mListener; 00256 00257 SendListener* mSendListener; // Listener for our send events 00258 ReceiveListener* mReceiveListener; // Listener for our receive events 00259 00260 // Main Thread/Strand Methods, allowed to access all the core data structures. These are mainly utility methods 00261 // posted by the IO thread. 00262 00263 // Sends the introduction of this server to the remote server. Should be 00264 // used for connections allocated when trying to send. 00265 void sendServerIntro(const RemoteStreamPtr& out_stream); 00266 00267 // Handles timeouts for closing streams -- forcibly closes them, removes 00268 // them from the closing set. 00269 void handleClosingStreamTimeout(Sirikata::Network::IOTimerPtr timer, RemoteStreamPtr& wstream); 00270 00271 // IO Thread/Strand. They must be certain not to call any main thread methods or access any main thread data. 00272 // These are all callbacks from the network, so mostly they should 00273 // just be posting the results to the main thread. 00274 00275 RemoteNetStreamMap mPendingStreams; // Streams initiated by the remote 00276 // endpoint that are waiting for the 00277 // initial message specifying the remote 00278 // endpoint ID 00279 00280 void newStreamCallback(Sirikata::Network::Stream* strm, Sirikata::Network::Stream::SetCallbacks& cb); 00281 00282 void connectionCallback(RemoteStreamWPtr wstream, const Sirikata::Network::Stream::ConnectionStatus status, const std::string& reason); 00283 // NOTE: The ind_recv_strm is a bit of an odd construction. 
Essentially, we 00284 // are forced to register callbacks immediately upon connection, but at that 00285 // time we don't have all the information we will need. However, we also 00286 // don't want to have to take a lock and look it up on every callback. 00287 // Therefore, we pass through a location for the recv_strm to be stored. 00288 // Further, we wrap it in a shared_ptr so it gets cleaned up when no longer 00289 // needed (the storage for the pointer, not the TCPReceiveStream itself, 00290 // which is cleaned up separately.) 00291 typedef std::tr1::shared_ptr<TCPReceiveStream*> IndirectTCPReceiveStream; 00292 void bytesReceivedCallback(RemoteStreamWPtr wstream, IndirectTCPReceiveStream ind_recv_strm, Chunk& data, const Sirikata::Network::Stream::PauseReceiveCallback& pause); 00293 void readySendCallback(RemoteStreamWPtr wstream); 00294 00295 public: 00296 TCPSpaceNetwork(SpaceContext* ctx); 00297 virtual ~TCPSpaceNetwork(); 00298 00299 virtual void setSendListener(SendListener* sl); 00300 00301 virtual void listen(const ServerID& addr, ReceiveListener* receive_listener); 00302 virtual SendStream* connect(Network::IOStrand* strand, const ServerID& addr); 00303 }; 00304 00305 } // namespace Sirikata 00306 00307 00308 #endif //_SIRIKATA_TCP_SPACE_NETWORK_HPP_