Sirikata
space/src/TCPSpaceNetwork.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  TCPSpaceNetwork.hpp
00003  *
00004  *  Copyright (c) 2010, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_TCP_SPACE_NETWORK_HPP_
00034 #define _SIRIKATA_TCP_SPACE_NETWORK_HPP_
00035 
00036 #include <sirikata/space/SpaceNetwork.hpp>
00037 #include <sirikata/core/network/Address4.hpp>
00038 #include <sirikata/core/network/Stream.hpp>
00039 #include <sirikata/core/network/StreamListener.hpp>
00040 #include <sirikata/core/util/PluginManager.hpp>
00041 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00042 #include <sirikata/core/queue/CountResourceMonitor.hpp>
00043 
00044 namespace Sirikata {
00045 
// TCP-based implementation of SpaceNetwork: maintains one logical
// send/receive connection per remote space server, multiplexed over one or
// more underlying TCP streams.
class TCPSpaceNetwork : public SpaceNetwork {
    // Data associated with a stream.  Note that this stream is
    // usually, but not always, unique to the endpoint pair.  Due to
    // the possibility of both sides initiating a connection at the
    // same time we may have more than one of these per remote
    // endpoint, so we maintain a bit more bookkeeping information to
    // manage that possibility.
    struct RemoteStream {
        // Which side opened the underlying TCP connection.
        enum Initiator {
            Us, Them
        };

        // parent is used to set the buffer size, strm is the
        // underlying stream.
        RemoteStream(TCPSpaceNetwork* parent, Sirikata::Network::Stream*strm, ServerID remote_id, Address4 remote_net, Initiator init);

        ~RemoteStream();

        // Enqueue received data onto receive_queue; was_empty reports
        // whether the queue was empty before the push.
        // NOTE(review): the return value presumably reports whether the
        // sized queue accepted the chunk -- confirm against the .cpp.
        bool push(Chunk& data, bool* was_empty);
        // Dequeue the next received chunk.
        // NOTE(review): presumably returns NULL when the queue is empty --
        // confirm against the .cpp.
        Chunk* pop(Network::IOStrand* ios);

        // The underlying network stream this bookkeeping wraps.
        Sirikata::Network::Stream* stream;

        Address4 network_endpoint; // Network-level address of the remote
                                   // endpoint we're talking to.
        ServerID logical_endpoint; // Logical server ID of the remote
                                   // endpoint we're talking to.

        Initiator initiator; // Which side initiated this connection.

        bool connected; // Indicates if the initial header specifying
                        // the remote endpoint has been sent or
                        // received (depending on which side initiated
                        // the connection). If this is true then this
                        // connection should be in the map of
                        // connections and it should be safe to send
                        // data on it.
        bool shutting_down; // Indicates if this connection is
                            // currently being shutdown. Will be true
                            // if another stream to the same endpoint
                            // was preferred over this one.
        typedef Sirikata::SizedThreadSafeQueue<Chunk*,CountResourceMonitor> SizedChunkReceiveQueue;
        SizedChunkReceiveQueue receive_queue; // Note: This can't be a single
                                              // front item or the receive queue
                                              // empties it too quickly and
                                              // loses bandwidth.
        bool paused; // Indicates if receiving is currently paused for
                     // this stream.  If true, the stream must be
                     // unpaused the next time someone calls
                     // receiveOne

        // Serializes push()/pop() access to this stream's queue state.
        boost::mutex mPushPopMutex;
    };

    typedef std::tr1::shared_ptr<RemoteStream> RemoteStreamPtr;
    typedef std::tr1::weak_ptr<RemoteStream> RemoteStreamWPtr;
    // Streams keyed by the remote server's logical ID.
    typedef std::tr1::unordered_map<ServerID, RemoteStreamPtr> RemoteStreamMap;
    // Streams keyed by the remote server's network address.
    typedef std::tr1::unordered_map<Address4, RemoteStreamPtr, Address4::Hasher> RemoteNetStreamMap;


    // Aggregates the (possibly multiple) RemoteStreams to a single remote
    // server into the one logical session exposed via TCPSendStream and
    // TCPReceiveStream.
    struct RemoteSession {
        RemoteSession(ServerID sid);
        ~RemoteSession();

        // Endpoint for this session, guaranteed not to change.
        ServerID logical_endpoint;

        // Streams for this session.
        RemoteStreamPtr remote_stream; // The stream we're normally trying to
                                       // receive from
        RemoteStreamPtr closing_stream; // A stream we're trying to shut down
        RemoteStreamPtr pending_out; // Remote stream that we've opened and
                                        // called connect on, but which isn't
                                        // ready yet
    };
    typedef std::tr1::shared_ptr<RemoteSession> RemoteSessionPtr;
    typedef std::tr1::weak_ptr<RemoteSession> RemoteSessionWPtr;

    // Sending half of the logical connection to one remote server.
    class TCPSendStream : public SpaceNetwork::SendStream {
    public:
        TCPSendStream(ServerID sid, RemoteSessionPtr s);
        ~TCPSendStream();

        // ID of the remote server this stream sends to.
        virtual ServerID id() const;
        // Attempt to send a chunk over the session's active stream.
        // NOTE(review): presumably returns false if the data could not be
        // queued -- confirm against the .cpp.
        virtual bool send(const Chunk&);

    private:
        ServerID logical_endpoint; // Remote server ID, fixed at construction.
        RemoteSessionPtr session;  // Session holding the underlying streams.
    };
    typedef std::tr1::unordered_map<ServerID, TCPSendStream*> SendStreamMap;

    // Receiving half of the logical connection to one remote server.
    class TCPReceiveStream : public SpaceNetwork::ReceiveStream {
    public:
        TCPReceiveStream(ServerID sid, RemoteSessionPtr s, Network::IOStrand* _ios);
        ~TCPReceiveStream();
        // ID of the remote server this stream receives from.
        virtual ServerID id() const;
        // Peek at the next received chunk without removing it.
        virtual Chunk* front();
        // Remove and return the next received chunk.
        virtual Chunk* pop();

    private:
        // Get the current queue for receiving data from the address.
        // This considers any closing streams first, then the main stream
        // in order to handle any data arriving on closing streams as
        // quickly as possible.  It takes into account whether any data is
        // available on the stream as well, so a closing stream with no
        // data available will *not* be returned when an active stream
        // with data available also exists.
        // NOTE: Now this just ensures that front_stream will either
        // point to the right thing or to NULL, so there is no return value.
        void getCurrentRemoteStream();

        // Whether strm is currently in a state we can read from.
        bool canReadFrom(RemoteStreamPtr& strm);

        ServerID logical_endpoint;    // Remote server ID, fixed at construction.
        RemoteSessionPtr session;     // Session holding the underlying streams.
        RemoteStreamPtr front_stream; // Stream which we already got a front()
                                      // item from
        Chunk* front_elem; // The front item, left out here to make it
                           // accessible since the RemoteStream doesn't give
                           // easy access
        Network::IOStrand* ios; // Strand used when popping from the stream.
    };
    typedef std::tr1::unordered_map<ServerID, TCPReceiveStream*> ReceiveStreamMap;

    // Simple storage for the session, send stream, and receive stream
    // associated with a single endpoint.  There should only ever be *one* of
    // these per remote endpoint.
    struct RemoteData {
        RemoteData(RemoteSessionPtr s)
         :session(s),
          send(NULL),
          receive(NULL)
        {}

        // Owns send and receive; releases its session reference on
        // destruction.
        ~RemoteData() {
            delete send;
            delete receive;
            session.reset();
        }

        RemoteSessionPtr session;
        TCPSendStream* send;       // Owned; may be NULL until requested.
        TCPReceiveStream* receive; // Owned; may be NULL until requested.
    };

    // This is the one chunk of truly thread-safe data, followed by a small
    // number of truly thread-safe methods to manipulate it.  These control the
    // central repository of RemoteSessionPtrs, ReceiveStreams, and SendStreams,
    // each unique per ServerID. This map is really only used to retrieve or
    // initialize each, after which all operations are directly on those objects
    // instead of through the map.  It is also locked on new connections to
    // make lookups so tradeoffs between underlying connections are possible.
    typedef std::tr1::unordered_map<ServerID, RemoteData*> RemoteDataMap;
    RemoteDataMap mRemoteData;
    boost::recursive_mutex mRemoteDataMutex; // Guards mRemoteData and the
                                             // accessors below.
    RemoteData* getRemoteData(ServerID sid);
    RemoteSessionPtr getRemoteSession(ServerID sid);
    TCPSendStream* getNewSendStream(ServerID sid);
    TCPReceiveStream* getNewReceiveStream(ServerID sid);
    // Get a new uninitialized outgoing stream that connect can be called on.
    // Will return NULL if another outgoing stream has already been requested,
    // guaranteeing that multiple outgoing stream connections won't be
    // attempted.
    RemoteStreamPtr getNewOutgoingStream(ServerID sid, Address4 remote_net, RemoteStream::Initiator init);
    // Wrap an incoming connection (accepted by the listener) in a
    // RemoteStream.
    RemoteStreamPtr getNewIncomingStream(Address4 remote_net, RemoteStream::Initiator init, Sirikata::Network::Stream* strm);
    typedef std::set<Sirikata::Network::IOTimerPtr> TimerSet;

    // All the real data is handled in the main thread.
    Sirikata::PluginManager mPluginManager;
    String mStreamPlugin;              // Name of the stream plugin in use.
    Sirikata::OptionSet* mListenOptions; // Options for listening sockets.
    Sirikata::OptionSet* mSendOptions;   // Options for outgoing connections.

    Network::IOStrand *mIOStrand;
    Network::IOWork* mIOWork;

    RemoteStreamMap mClosingStreams;   // Streams currently being shut down.
    TimerSet mClosingStreamTimers; // Timers for streams that are still closing.

    // Completes listen() once the server ID has been resolved to an address.
    void finishListen(ServerID resolved_sid, Address4 addr, ReceiveListener* receive_listener);

    // Open a new connection.  Should be called when an existing connection
    // isn't available.
    TCPSendStream* openConnection(Network::IOStrand* strand, const ServerID& dest);
    // Finish opening the connection
    void finishOpenConnection(const ServerID& dest, ServerID resolved_dest, Address4 addr);

    // Add stream to system, possibly resolving conflicting sets of
    // streams. Return corresponding TCPReceiveStream*
    TCPReceiveStream* handleConnectedStream(RemoteStreamPtr source_stream);
    // Mark a send stream as disconnected
    void handleDisconnectedStream(const RemoteStreamPtr& wstream);



    Address4 mListenAddress; // Address this server listens on.

    Sirikata::Network::StreamListener *mListener; // Accepts incoming streams.

    SendListener* mSendListener; // Listener for our send events
    ReceiveListener* mReceiveListener; // Listener for our receive events

    // Main Thread/Strand Methods, allowed to access all the core data structures.  These are mainly utility methods
    // posted by the IO thread.

    // Sends the introduction of this server to the remote server.  Should be
    // used for connections allocated when trying to send.
    void sendServerIntro(const RemoteStreamPtr& out_stream);

    // Handles timeouts for closing streams -- forcibly closes them, removes
    // them from the closing set.
    void handleClosingStreamTimeout(Sirikata::Network::IOTimerPtr timer, RemoteStreamPtr& wstream);

    // IO Thread/Strand.  They must be certain not to call any main thread methods or access any main thread data.
    // These are all callbacks from the network, so mostly they should
    // just be posting the results to the main thread.

    RemoteNetStreamMap mPendingStreams; // Streams initiated by the remote
                                        // endpoint that are waiting for the
                                        // initial message specifying the remote
                                        // endpoint ID

    // Callback invoked by the listener when a remote endpoint connects.
    void newStreamCallback(Sirikata::Network::Stream* strm, Sirikata::Network::Stream::SetCallbacks& cb);

    // Callback invoked when a stream's connection status changes.
    void connectionCallback(RemoteStreamWPtr wstream, const Sirikata::Network::Stream::ConnectionStatus status, const std::string& reason);
    // NOTE: The ind_recv_strm is a bit of an odd construction.  Essentially, we
    // are forced to register callbacks immediately upon connection, but at that
    // time we don't have all the information we will need.  However, we also
    // don't want to have to take a lock and look it up on every callback.
    // Therefore, we pass through a location for the recv_strm to be stored.
    // Further, we wrap it in a shared_ptr so it gets cleaned up when no longer
    // needed (the storage for the pointer, not the TCPReceiveStream itself,
    // which is cleaned up separately.)
    typedef std::tr1::shared_ptr<TCPReceiveStream*> IndirectTCPReceiveStream;
    // Callback invoked when data arrives on a stream.
    void bytesReceivedCallback(RemoteStreamWPtr wstream, IndirectTCPReceiveStream ind_recv_strm, Chunk& data, const Sirikata::Network::Stream::PauseReceiveCallback& pause);
    // Callback invoked when a previously-full stream can accept sends again.
    void readySendCallback(RemoteStreamWPtr wstream);

public:
    TCPSpaceNetwork(SpaceContext* ctx);
    virtual ~TCPSpaceNetwork();

    // Register the listener notified of send-related events.
    virtual void setSendListener(SendListener* sl);

    // Start accepting connections as the given server; receive_listener is
    // notified of received data and new connections.
    virtual void listen(const ServerID& addr, ReceiveListener* receive_listener);
    // Get (creating if necessary) the send stream to the given server.
    virtual SendStream* connect(Network::IOStrand* strand, const ServerID& addr);
};
00304 
00305 } // namespace Sirikata
00306 
00307 
00308 #endif //_SIRIKATA_TCP_SPACE_NETWORK_HPP_