Sirikata
libcore/plugins/tcpsst/MultiplexedSocket.hpp
Go to the documentation of this file.
00001 /*  Sirikata Network Utilities
00002  *  MultiplexedSocket.hpp
00003  *
00004  *  Copyright (c) 2009, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_
00034 #define _SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_
00035 
00036 #include <sirikata/core/util/SerializationCheck.hpp>
00037 #include <boost/thread.hpp>
00038 #include "TCPSSTDecls.hpp"
00039 #include "TCPStream.hpp"
00040 
00041 namespace Sirikata {
00042 namespace Network {
00043 
00044 class ASIOReadBuffer;
00045 class ASIOSocketWrapper;
00046 
00047 class MultiplexedSocket:public SelfWeakPtr<MultiplexedSocket>, public SerializationCheck {
00048 public:
00049     friend class ASIOReadBuffer;
00050     class RawRequest {
00051     public:
00052         bool unordered;
00053         bool unreliable;
00054         Stream::StreamID originStream;
00055         Chunk * data;
00056 
00057         uint32 size() const {
00058             return data->size();
00059         }
00060     };
00061     enum SocketConnectionPhase{
00062         PRECONNECTION,
00063         WAITCONNECTING,//need to fetch the lock, but about to connect
00064         CONNECTED,
00065         DISCONNECTED
00066     };
00067 private:
00068 
00069 //Begin Members//
00070 
00072     IOStrand* mIO;
00074     std::vector<ASIOSocketWrapper> mSockets;
00076     Stream::SubstreamCallback mNewSubstreamCallback;
00077     typedef std::tr1::unordered_map<Stream::StreamID,TCPStream::Callbacks*,Stream::StreamID::Hasher> CallbackMap;
00079     class StreamIDCallbackPair{
00080     public:
00081         Stream::StreamID mID;
00082         TCPStream::Callbacks* mCallback;
00083         StreamIDCallbackPair(Stream::StreamID id,TCPStream::Callbacks* cb):mID(id) {
00084             mCallback=cb;
00085         }
00086         std::pair<Stream::StreamID,TCPStream::Callbacks*> pair() const{
00087            return std::pair<Stream::StreamID,TCPStream::Callbacks*>(mID,mCallback);
00088         }
00089     };
00091     static boost::mutex sConnectingMutex;
00093     SizedThreadSafeQueue<RawRequest>* mNewRequests;
00095     volatile SocketConnectionPhase mSocketConnectionPhase;
00097     std::deque<StreamIDCallbackPair> mCallbackRegistration;
00099     CallbackMap mCallbacks;
00101     TCPStream::StreamType mStreamType;
00103     std::tr1::unordered_map<Stream::StreamID,unsigned int,Stream::StreamID::Hasher>mAckedClosingStreams;
00105     std::tr1::unordered_set<Stream::StreamID,Stream::StreamID::Hasher>mOneSidedClosingStreams;
00106 #define ThreadSafeStack ThreadSafeQueue //FIXME this can be way more efficient
00107 
00108     AtomicValue<uint32> mHighestStreamID;
00110     ThreadSafeStack<Stream::StreamID>mFreeStreamIDs;
00111 #undef ThreadSafeStack
00112 
00113 //Begin helper functions//
00114 
00116     void ioReactorThreadCommitCallback(StreamIDCallbackPair& newcallback);
00118     bool CommitCallbacks(std::deque<StreamIDCallbackPair> &registration, SocketConnectionPhase status, bool setConnectedStatus=false);
00119 
00121     size_t leastBusyStream(size_t preferredStream);
00127     float dropChance(const Chunk*data,size_t whichStream);
00132     static bool sendBytesNow(const MultiplexedSocketPtr& thus,const RawRequest&data, bool force);
00137     void connectionFailureOrSuccessCallback(SocketConnectionPhase status, Stream::ConnectionStatus reportedProblem, const std::string&errorMessage=std::string());
00142     void connectionFailedCallback(const std::string& error);
00147     void hostDisconnectedCallback(const std::string& error);
00148 public:
00149     TCPStream::StreamType getStreamType() const {
00150         return mStreamType;
00151     }
00153     IOStrand* getStrand() {return mIO;}
00154 
00159     static void closeStream(const MultiplexedSocketPtr& thus,const Stream::StreamID&sid,TCPStream::TCPStreamControlCodes code=TCPStream::TCPStreamCloseStream);
00160 
00165     static bool sendBytes(const MultiplexedSocketPtr& thus,const RawRequest&data, unsigned int maxSendQueueSize=2147483647);
00169     bool canSendBytes(Stream::StreamID origin,size_t dataSize)const;
00174     SocketConnectionPhase addCallbacks(const Stream::StreamID&sid, TCPStream::Callbacks* cb);
00176     Stream::StreamID getNewID();
00178     static Stream::StreamID getFirstStreamID(bool connector);
00180     MultiplexedSocket(IOStrand*io, const Stream::SubstreamCallback&substreamCallback, TCPStream::StreamType type);
00182     MultiplexedSocket(IOStrand*io, const UUID&uuid, const Stream::SubstreamCallback &substreamCallback, TCPStream::StreamType type);
00184     void initFromSockets(const std::vector<TCPSocket*>&sockets, size_t max_send_buffer_size);
00186     static void sendAllProtocolHeaders(const MultiplexedSocketPtr& thus, const std::string&origin, const std::string&host, const std::string&port, const std::string&resource_name, const std::string&subprotocol,bool doSendSubProtocol, const std::map<TCPSocket*,std::string>& response, TCPStream::StreamType streamType);
00188     ~MultiplexedSocket();
00190     void shutDownClosedStream(unsigned int controlCode,const Stream::StreamID &id);
00196     void receiveFullChunk(unsigned int whichSocket, Stream::StreamID id, Chunk&newChunk, const Stream::PauseReceiveCallback& pauseReceive);
00197 
00201     void receivePing(unsigned int whichSocket, MemoryReference data, bool isPong);
00202 
00207     void connectionFailedCallback(unsigned int whichSocket, const std::string& error);
00212     template <class ErrorCode> void connectionFailedCallback(const ErrorCode& error) {
00213         connectionFailedCallback(error.message());
00214     }
00219     template <class ErrorCode> void connectionFailedCallback(unsigned int whichSocket, const ErrorCode& error) {
00220         connectionFailedCallback(whichSocket,error.message());
00221     }
00226     template <class ErrorCode> void connectionFailedCallback(const ASIOSocketWrapper* whichSocket, const ErrorCode &error) {
00227         unsigned int which=0;
00228         for (std::vector<ASIOSocketWrapper>::iterator i=mSockets.begin(),ie=mSockets.end();i!=ie;++i,++which) {
00229             if (&*i==whichSocket)
00230                 break;
00231         }
00232         connectionFailedCallback(which==mSockets.size()?0:which,error.message());
00233     }
00234 
00239     void hostDisconnectedCallback(unsigned int whichSocket, const std::string& error);
00244     template <class ErrorCode> void hostDisconnectedCallback(unsigned int whichSocket, const ErrorCode& error) {
00245         hostDisconnectedCallback(whichSocket,error.message());
00246     }
00251     template <class ErrorCode> void hostDisconnectedCallback(const ASIOSocketWrapper* whichSocket, const ErrorCode &error) {
00252         unsigned int which=0;
00253         for (std::vector<ASIOSocketWrapper>::iterator i=mSockets.begin(),ie=mSockets.end();i!=ie;++i,++which) {
00254             if (&*i==whichSocket)
00255                 break;
00256         }
00257         hostDisconnectedCallback(which==mSockets.size()?0:which,error.message());
00258     }
00259 
00260     static void ioReactorThreadResumeRead(const MultiplexedSocketWPtr&, Stream::StreamID id);
00261     static void ioReactorThreadPauseSend(const MultiplexedSocketWPtr& mp, Stream::StreamID id);
00262 
00267     void connectedCallback() {
00268         connectionFailureOrSuccessCallback(CONNECTED,Stream::Connected);
00269     }
00270     void unpauseSendStreams(const std::vector<Stream::StreamID>&toUnpause);
00271 
00291     void connect(const Address&address, unsigned int numSockets, size_t maxEnqueuedSendSize, bool noDelay, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize);
00292 
00298     void prepareConnect(unsigned int numSockets, size_t maxEnqueuedSendSize, bool noDelay, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize);
00299 
00300     unsigned int numSockets() const {
00301         return mSockets.size();
00302     }
00303     ASIOSocketWrapper&getASIOSocketWrapper(unsigned int whichSocket){
00304         return mSockets[whichSocket];
00305     }
00306     const ASIOSocketWrapper&getASIOSocketWrapper(unsigned int whichSocket)const{
00307         return mSockets[whichSocket];
00308     }
00309 
00310     Address getRemoteEndpoint(Stream::StreamID id)const ;
00311     Address getLocalEndpoint(Stream::StreamID id)const ;
00312 
00313     // -- Statistics
00314     Duration averageSendLatency() const;
00315     Duration averageReceiveLatency() const;
00316 };
00317 
00318 } // namespace Network
00319 } // namespace Sirikata
00320 
00321 
00322 #endif //_SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_