Sirikata
|
/* Sirikata Network Utilities
 * MultiplexedSocket.hpp
 *
 * Copyright (c) 2009, Daniel Reiter Horn
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *  * Neither the name of Sirikata nor the names of its contributors may
 *    be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031 */ 00032 00033 #ifndef _SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_ 00034 #define _SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_ 00035 00036 #include <sirikata/core/util/SerializationCheck.hpp> 00037 #include <boost/thread.hpp> 00038 #include "TCPSSTDecls.hpp" 00039 #include "TCPStream.hpp" 00040 00041 namespace Sirikata { 00042 namespace Network { 00043 00044 class ASIOReadBuffer; 00045 class ASIOSocketWrapper; 00046 00047 class MultiplexedSocket:public SelfWeakPtr<MultiplexedSocket>, public SerializationCheck { 00048 public: 00049 friend class ASIOReadBuffer; 00050 class RawRequest { 00051 public: 00052 bool unordered; 00053 bool unreliable; 00054 Stream::StreamID originStream; 00055 Chunk * data; 00056 00057 uint32 size() const { 00058 return data->size(); 00059 } 00060 }; 00061 enum SocketConnectionPhase{ 00062 PRECONNECTION, 00063 WAITCONNECTING,//need to fetch the lock, but about to connect 00064 CONNECTED, 00065 DISCONNECTED 00066 }; 00067 private: 00068 00069 //Begin Members// 00070 00072 IOStrand* mIO; 00074 std::vector<ASIOSocketWrapper> mSockets; 00076 Stream::SubstreamCallback mNewSubstreamCallback; 00077 typedef std::tr1::unordered_map<Stream::StreamID,TCPStream::Callbacks*,Stream::StreamID::Hasher> CallbackMap; 00079 class StreamIDCallbackPair{ 00080 public: 00081 Stream::StreamID mID; 00082 TCPStream::Callbacks* mCallback; 00083 StreamIDCallbackPair(Stream::StreamID id,TCPStream::Callbacks* cb):mID(id) { 00084 mCallback=cb; 00085 } 00086 std::pair<Stream::StreamID,TCPStream::Callbacks*> pair() const{ 00087 return std::pair<Stream::StreamID,TCPStream::Callbacks*>(mID,mCallback); 00088 } 00089 }; 00091 static boost::mutex sConnectingMutex; 00093 SizedThreadSafeQueue<RawRequest>* mNewRequests; 00095 volatile SocketConnectionPhase mSocketConnectionPhase; 00097 std::deque<StreamIDCallbackPair> mCallbackRegistration; 00099 CallbackMap mCallbacks; 00101 TCPStream::StreamType mStreamType; 00103 std::tr1::unordered_map<Stream::StreamID,unsigned 
int,Stream::StreamID::Hasher>mAckedClosingStreams; 00105 std::tr1::unordered_set<Stream::StreamID,Stream::StreamID::Hasher>mOneSidedClosingStreams; 00106 #define ThreadSafeStack ThreadSafeQueue //FIXME this can be way more efficient 00107 00108 AtomicValue<uint32> mHighestStreamID; 00110 ThreadSafeStack<Stream::StreamID>mFreeStreamIDs; 00111 #undef ThreadSafeStack 00112 00113 //Begin helper functions// 00114 00116 void ioReactorThreadCommitCallback(StreamIDCallbackPair& newcallback); 00118 bool CommitCallbacks(std::deque<StreamIDCallbackPair> ®istration, SocketConnectionPhase status, bool setConnectedStatus=false); 00119 00121 size_t leastBusyStream(size_t preferredStream); 00127 float dropChance(const Chunk*data,size_t whichStream); 00132 static bool sendBytesNow(const MultiplexedSocketPtr& thus,const RawRequest&data, bool force); 00137 void connectionFailureOrSuccessCallback(SocketConnectionPhase status, Stream::ConnectionStatus reportedProblem, const std::string&errorMessage=std::string()); 00142 void connectionFailedCallback(const std::string& error); 00147 void hostDisconnectedCallback(const std::string& error); 00148 public: 00149 TCPStream::StreamType getStreamType() const { 00150 return mStreamType; 00151 } 00153 IOStrand* getStrand() {return mIO;} 00154 00159 static void closeStream(const MultiplexedSocketPtr& thus,const Stream::StreamID&sid,TCPStream::TCPStreamControlCodes code=TCPStream::TCPStreamCloseStream); 00160 00165 static bool sendBytes(const MultiplexedSocketPtr& thus,const RawRequest&data, unsigned int maxSendQueueSize=2147483647); 00169 bool canSendBytes(Stream::StreamID origin,size_t dataSize)const; 00174 SocketConnectionPhase addCallbacks(const Stream::StreamID&sid, TCPStream::Callbacks* cb); 00176 Stream::StreamID getNewID(); 00178 static Stream::StreamID getFirstStreamID(bool connector); 00180 MultiplexedSocket(IOStrand*io, const Stream::SubstreamCallback&substreamCallback, TCPStream::StreamType type); 00182 MultiplexedSocket(IOStrand*io, 
const UUID&uuid, const Stream::SubstreamCallback &substreamCallback, TCPStream::StreamType type); 00184 void initFromSockets(const std::vector<TCPSocket*>&sockets, size_t max_send_buffer_size); 00186 static void sendAllProtocolHeaders(const MultiplexedSocketPtr& thus, const std::string&origin, const std::string&host, const std::string&port, const std::string&resource_name, const std::string&subprotocol,bool doSendSubProtocol, const std::map<TCPSocket*,std::string>& response, TCPStream::StreamType streamType); 00188 ~MultiplexedSocket(); 00190 void shutDownClosedStream(unsigned int controlCode,const Stream::StreamID &id); 00196 void receiveFullChunk(unsigned int whichSocket, Stream::StreamID id, Chunk&newChunk, const Stream::PauseReceiveCallback& pauseReceive); 00197 00201 void receivePing(unsigned int whichSocket, MemoryReference data, bool isPong); 00202 00207 void connectionFailedCallback(unsigned int whichSocket, const std::string& error); 00212 template <class ErrorCode> void connectionFailedCallback(const ErrorCode& error) { 00213 connectionFailedCallback(error.message()); 00214 } 00219 template <class ErrorCode> void connectionFailedCallback(unsigned int whichSocket, const ErrorCode& error) { 00220 connectionFailedCallback(whichSocket,error.message()); 00221 } 00226 template <class ErrorCode> void connectionFailedCallback(const ASIOSocketWrapper* whichSocket, const ErrorCode &error) { 00227 unsigned int which=0; 00228 for (std::vector<ASIOSocketWrapper>::iterator i=mSockets.begin(),ie=mSockets.end();i!=ie;++i,++which) { 00229 if (&*i==whichSocket) 00230 break; 00231 } 00232 connectionFailedCallback(which==mSockets.size()?0:which,error.message()); 00233 } 00234 00239 void hostDisconnectedCallback(unsigned int whichSocket, const std::string& error); 00244 template <class ErrorCode> void hostDisconnectedCallback(unsigned int whichSocket, const ErrorCode& error) { 00245 hostDisconnectedCallback(whichSocket,error.message()); 00246 } 00251 template <class 
ErrorCode> void hostDisconnectedCallback(const ASIOSocketWrapper* whichSocket, const ErrorCode &error) { 00252 unsigned int which=0; 00253 for (std::vector<ASIOSocketWrapper>::iterator i=mSockets.begin(),ie=mSockets.end();i!=ie;++i,++which) { 00254 if (&*i==whichSocket) 00255 break; 00256 } 00257 hostDisconnectedCallback(which==mSockets.size()?0:which,error.message()); 00258 } 00259 00260 static void ioReactorThreadResumeRead(const MultiplexedSocketWPtr&, Stream::StreamID id); 00261 static void ioReactorThreadPauseSend(const MultiplexedSocketWPtr& mp, Stream::StreamID id); 00262 00267 void connectedCallback() { 00268 connectionFailureOrSuccessCallback(CONNECTED,Stream::Connected); 00269 } 00270 void unpauseSendStreams(const std::vector<Stream::StreamID>&toUnpause); 00271 00291 void connect(const Address&address, unsigned int numSockets, size_t maxEnqueuedSendSize, bool noDelay, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize); 00292 00298 void prepareConnect(unsigned int numSockets, size_t maxEnqueuedSendSize, bool noDelay, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize); 00299 00300 unsigned int numSockets() const { 00301 return mSockets.size(); 00302 } 00303 ASIOSocketWrapper&getASIOSocketWrapper(unsigned int whichSocket){ 00304 return mSockets[whichSocket]; 00305 } 00306 const ASIOSocketWrapper&getASIOSocketWrapper(unsigned int whichSocket)const{ 00307 return mSockets[whichSocket]; 00308 } 00309 00310 Address getRemoteEndpoint(Stream::StreamID id)const ; 00311 Address getLocalEndpoint(Stream::StreamID id)const ; 00312 00313 // -- Statistics 00314 Duration averageSendLatency() const; 00315 Duration averageReceiveLatency() const; 00316 }; 00317 00318 } // namespace Network 00319 } // namespace Sirikata 00320 00321 00322 #endif //_SIRIKATA_TCPSST_MULTIPLEXED_SOCKET_HPP_