// Sirikata
00001 /* Sirikata Network Utilities 00002 * TCPStream.hpp 00003 * 00004 * Copyright (c) 2009, Daniel Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00031 */ 00032 #ifndef SIRIKATA_TCPStream_HPP__ 00033 #define SIRIKATA_TCPStream_HPP__ 00034 00035 #include <sirikata/core/network/Stream.hpp> 00036 #include <sirikata/core/util/AtomicTypes.hpp> 00037 #include "TCPSSTDecls.hpp" 00038 00039 namespace Sirikata { 00040 namespace Network { 00041 00042 class TCPSetCallbacks; 00043 class IOStrand; 00044 00090 class TCPStream:public Stream { 00091 public: 00092 class Callbacks; 00093 static const char * STRING_PREFIX() { 00094 return "SSTTCP"; 00095 } 00096 static const char * WEBSOCKET_STRING_PREFIX() { 00097 return "SST:WS"; 00098 } 00099 enum HeaderSizeEnumerant { 00100 STRING_PREFIX_LENGTH=6, 00101 TcpSstHeaderSize=24, 00102 MaxWebSocketHeaderSize=2048 00103 }; 00104 enum TCPStreamControlCodes { 00105 TCPStreamCloseStream=1, 00106 TCPStreamAckCloseStream=2 00107 }; 00108 enum StreamType{ 00109 UNKNOWN, 00110 BASE64_ZERODELIM, 00111 LENGTH_DELIM, 00112 RFC_6455 00113 }; 00114 //if !=0 and type is RFC_6455, arbitrarily fragment packets 2 indicates more aggressive testing of fragmentation than 1 (testing option) 00115 static int sFragmentPackets; 00116 private: 00117 friend class MultiplexedSocket; 00118 friend class TCPSetCallbacks; 00120 IOStrand* mIOStrand; 00122 MultiplexedSocketPtr mSocket; 00124 void addCallbacks(Callbacks*); 00126 StreamID mID; 00127 enum { 00129 SendStatusClosing=(1<<29) 00130 }; 00132 std::tr1::shared_ptr<AtomicValue<int> >mSendStatus; 00133 unsigned char mNumSimultaneousSockets; 00134 bool mNoDelay; 00135 StreamType mStreamType; 00136 unsigned int mSendBufferSize; 00137 unsigned int mKernelSendBufferSize; 00138 unsigned int mKernelReceiveBufferSize; 00139 00141 TCPStream(IOStrand*,unsigned char mNumSimultaneousSockets, unsigned int mSendBufferSize, bool noDelay, StreamType streamType, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize); 00142 00143 00144 public: 00146 static bool closeSendStatus(AtomicValue<int>&vSendStatus); 00148 StreamID getID()const {return mID;} 00153 
class Callbacks:public Noncopyable { 00154 public: 00155 Stream::ConnectionCallback mConnectionCallback; 00156 Stream::ReceivedCallback mBytesReceivedCallback; 00157 Stream::ReadySendCallback mReadySendCallback; 00158 std::tr1::weak_ptr<AtomicValue<int> > mSendStatus; 00159 Callbacks(const Stream::ConnectionCallback &connectionCallback, 00160 const Stream::ReceivedCallback &bytesReceivedCallback, 00161 const Stream::ReadySendCallback &readySendCallback, 00162 const std::tr1::weak_ptr<AtomicValue<int> >&sendStatus): 00163 mConnectionCallback(connectionCallback), 00164 mBytesReceivedCallback(bytesReceivedCallback), 00165 mReadySendCallback(readySendCallback), 00166 mSendStatus(sendStatus){ 00167 } 00168 }; 00170 TCPStream(IOStrand*, OptionSet*); 00172 TCPStream(const MultiplexedSocketPtr &shared_socket, const Stream::StreamID&); 00173 virtual Stream*factory(); 00175 virtual void readyRead(); 00177 virtual void requestReadySendCallback(); 00179 WARN_UNUSED 00180 virtual bool send(MemoryReference, StreamReliability); 00182 WARN_UNUSED 00183 virtual bool send(MemoryReference, MemoryReference, StreamReliability); 00185 WARN_UNUSED 00186 virtual bool send(const Chunk&data,StreamReliability); 00187 virtual bool canSend(size_t dataSize)const; 00189 virtual void connect( 00190 const Address& addy, 00191 const SubstreamCallback &substreamCallback, 00192 const ConnectionCallback &connectionCallback, 00193 const ReceivedCallback&chunkReceivedCallback, 00194 const ReadySendCallback&readySend); 00195 00196 static TCPStream* construct(Network::IOStrand*io, OptionSet*options) { 00197 return new TCPStream(io,options); 00198 } 00200 virtual Stream* clone(const SubstreamCallback&cb); 00202 virtual Stream* clone(const ConnectionCallback &connectionCallback, 00203 const ReceivedCallback&chunkReceivedCallback, 00204 const ReadySendCallback&readySendCallback); 00205 00207 Address getRemoteEndpoint() const; 00209 Address getLocalEndpoint() const; 00210 00211 00212 //Shuts down the socket, 
allowing StreamID to be reused and opposing stream to get disconnection callback 00213 virtual void close(); 00214 ~TCPStream(); 00215 00216 virtual Duration averageSendLatency() const; 00217 virtual Duration averageReceiveLatency() const; 00218 }; 00219 00220 } // namespace Network 00221 } // namespace Sirikata 00222 00223 #endif //SIRIKATA_TCPStream_HPP__