Sirikata
Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Types | Private Member Functions | Private Attributes | Friends
Sirikata::Network::TCPStream Class Reference

This is a particular example implementation of the Stream interface sitting atop TCP. More...

#include <TCPStream.hpp>

Inheritance diagram for Sirikata::Network::TCPStream:
Collaboration diagram for Sirikata::Network::TCPStream:

List of all members.

Classes

class  Callbacks
 A pair of callbacks related to a stream. More...

Public Types

enum  HeaderSizeEnumerant { STRING_PREFIX_LENGTH = 6, TcpSstHeaderSize = 24, MaxWebSocketHeaderSize = 2048 }
enum  TCPStreamControlCodes { TCPStreamCloseStream = 1, TCPStreamAckCloseStream = 2 }
enum  StreamType { UNKNOWN, BASE64_ZERODELIM, LENGTH_DELIM, RFC_6455 }

Public Member Functions

StreamID getID () const
 Returns the active stream ID.
 TCPStream (IOStrand *, OptionSet *)
 Constructor which leaves socket in a disconnection state, prepared for a connect() or a clone()
 TCPStream (const MultiplexedSocketPtr &shared_socket, const Stream::StreamID &)
 Constructor which brings the socket up to speed in a completely connected state, prepped with a StreamID and communal link pointer.
virtual Streamfactory ()
 Create a new Stream of the same type as this stream.
virtual void readyRead ()
 There is room on a downstream queue and futher sends should be retried.
virtual void requestReadySendCallback ()
 Send a readySendCallback notification when there is room on the send queue.
virtual WARN_UNUSED bool send (MemoryReference, StreamReliability)
 Implementation of send interface.
virtual WARN_UNUSED bool send (MemoryReference, MemoryReference, StreamReliability)
 Implementation of send interface.
virtual WARN_UNUSED bool send (const Chunk &data, StreamReliability)
 Implementation of send interface.
virtual bool canSend (size_t dataSize) const
 Determine if a message of the specified size could be enqueued to be sent.
virtual void connect (const Address &addy, const SubstreamCallback &substreamCallback, const ConnectionCallback &connectionCallback, const ReceivedCallback &chunkReceivedCallback, const ReadySendCallback &readySend)
 Implementation of connect interface.
virtual Streamclone (const SubstreamCallback &cb)
 Creates a new substream on this connection.
virtual Streamclone (const ConnectionCallback &connectionCallback, const ReceivedCallback &chunkReceivedCallback, const ReadySendCallback &readySendCallback)
 Creates a new substream on this connection. This is for when the callbacks do not require the Stream*.
Address getRemoteEndpoint () const
 Only returns a legitimate address if ConnectionStatus called back, otherwise return Address::null()
Address getLocalEndpoint () const
 Only returns a legitimate address if ConnectionStatus called back, otherwise return Address::null()
virtual void close ()
 Close this Stream, notifying the remote endpoint.
 ~TCPStream ()
virtual Duration averageSendLatency () const
 Get the average latency spent in queues in the stream.
virtual Duration averageReceiveLatency () const
 Get the average latency spent in queues in the stream, waiting for deliver to the user.

Static Public Member Functions

static const char * STRING_PREFIX ()
static const char * WEBSOCKET_STRING_PREFIX ()
static bool closeSendStatus (AtomicValue< int > &vSendStatus)
 Atomically sets the sendStatus for this socket to closed. FIXME: should use atomic compare and swap for |= instead of += right now only supports 2 non-io threads closing at once.
static TCPStreamconstruct (Network::IOStrand *io, OptionSet *options)

Static Public Attributes

static int sFragmentPackets = 0

Private Types

enum  { SendStatusClosing = (1<<29) }

Private Member Functions

void addCallbacks (Callbacks *)
 A function to add callbacks to this particular stream, called by the relevant TCPSetCallbacks function inheriting from SetCallbacks.
 TCPStream (IOStrand *, unsigned char mNumSimultaneousSockets, unsigned int mSendBufferSize, bool noDelay, StreamType streamType, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize)
 Constructor which leaves socket in a disconnection state, prepared for a connect() or a clone() called internally from factory.

Private Attributes

IOStrandmIOStrand
 The low level boost::asio io service handle.
MultiplexedSocketPtr mSocket
 the shared pointer to the communal sending connection
StreamID mID
 The streamID that must be prepended to the data within any packet sent and all received packets for this Stream.
std::tr1::shared_ptr
< AtomicValue< int > > 
mSendStatus
 incremented while sending: or'd in SendStatusClosing when close function triggered so no further packets will be sent using old ID.
unsigned char mNumSimultaneousSockets
bool mNoDelay
StreamType mStreamType
unsigned int mSendBufferSize
unsigned int mKernelSendBufferSize
unsigned int mKernelReceiveBufferSize

Friends

class MultiplexedSocket
class TCPSetCallbacks

Detailed Description

This is a particular example implementation of the Stream interface sitting atop TCP.

The protocol specified in the Structured streams paper http://pdos.csail.mit.edu/uia/sst/ May be used as another implementation of the Stream interface. This is provided as the default implementation as it supports a wide range of network devices and is not subject to prejudice against UDP connections which we routinely see on the net (my home system gets 10 second lag on UDP connections just due to ISP policy combating P2P networks)

The protocol is as follows:

--Connection-- To connect to a remote host, the client opens a small number (N) of TCP sockets to that host and sends the version string (currently SSTTCP ), an ascii 2 digit representation of N (in this case '03') and a 16 byte unique ID used to pair up connections and possibly in the future for encryption The other side respinds in turn on each connection with a similar handshake (though the 16 bytes may be different) now the connection is online. The remote host may immediately follow the handshake with live packets and the other side may respond as soon as it receives the remote hosts handshake response

--Live Phase-- The first live stream has StreamID of 1. New streams coming from listener must have even StreamID's and new streams coming from connector must have odd stream ID's. To indicate a new substream, simply send a packet with a new stream ID

--Packet Format-- All packets are prepended by a variable length int30 length and int30 StreamID and then the bytestream of data The length value includes the length of the variable-length StreamID. the int30 format is as follows if the highest value bit in the first byte is 0, the other 7 bits represent numbers 0-128 if the highest value bit in the first byte is 1, and the highest value bit in the second byte is 0 the first 7 bits represent the 0-127 remainder and the first 7 bits in the second byte represent the 0-128 significant 7 bit digit valued between 0 and 16256 if the highest value bit in the first and second bytes are both 1 the value between 0 and 16383 above is added to 16384*the third byte added to 16384*256*the fourth byte

--Shutting down a Stream-- If either side decides to close a stream they must send a control packet (which is the implicit stream with StreamID = default int30, which serializes to a single byte=0) and then a control code which must be a byte equal to 1, and finally a variable length StreamID indicating which stream to close This control packet must be broadcast on every related open TCP socket connection This party may not reuse the streamID (given parity match) until it receives control packets with control code equal to 2 (close Ack) on all sockets. The other side must keep the bargain and send the control packet with control code 2 on all sockets to allow ID reuse.

If all streams are shut down, the sockets may be deactivated If the socket disconnects due to error, then Disconnect callbacks must be called


Member Enumeration Documentation

anonymous enum [private]
Enumerator:
SendStatusClosing 

A bit flag indicating that the socket is being shut down and no further sends may proceed.

Enumerator:
STRING_PREFIX_LENGTH 
TcpSstHeaderSize 
MaxWebSocketHeaderSize 
Enumerator:
UNKNOWN 
BASE64_ZERODELIM 
LENGTH_DELIM 
RFC_6455 
Enumerator:
TCPStreamCloseStream 
TCPStreamAckCloseStream 

Constructor & Destructor Documentation

Sirikata::Network::TCPStream::TCPStream ( IOStrand io,
unsigned char  mNumSimultaneousSockets,
unsigned int  mSendBufferSize,
bool  noDelay,
StreamType  streamType,
unsigned int  kernelSendBufferSize,
unsigned int  kernelReceiveBufferSize 
) [private]

Constructor which leaves socket in a disconnection state, prepared for a connect() or a clone() called internally from factory.

References mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, and mStreamType.

Referenced by clone(), construct(), and factory().

Sirikata::Network::TCPStream::TCPStream ( IOStrand io,
OptionSet options 
)
Sirikata::Network::TCPStream::TCPStream ( const MultiplexedSocketPtr shared_socket,
const Stream::StreamID sid 
)

Constructor which brings the socket up to speed in a completely connected state, prepped with a StreamID and communal link pointer.

References mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, and mStreamType.

Sirikata::Network::TCPStream::~TCPStream ( )

References close().


Member Function Documentation

void Sirikata::Network::TCPStream::addCallbacks ( Callbacks ) [private]

A function to add callbacks to this particular stream, called by the relevant TCPSetCallbacks function inheriting from SetCallbacks.

Duration Sirikata::Network::TCPStream::averageReceiveLatency ( ) const [virtual]

Get the average latency spent in queues in the stream, waiting for deliver to the user.

This does not include time spent locally in the underlying network library or OS.

Reimplemented from Sirikata::Network::Stream.

References mSocket.

Duration Sirikata::Network::TCPStream::averageSendLatency ( ) const [virtual]

Get the average latency spent in queues in the stream.

This does not include time spent locally but buffered in an underlying network library or the OS.

Reimplemented from Sirikata::Network::Stream.

References mSocket.

bool Sirikata::Network::TCPStream::canSend ( size_t  dataSize) const [virtual]

Determine if a message of the specified size could be enqueued to be sent.

Returns:
true if a message of the specified size could be successfully enqueued

Implements Sirikata::Network::Stream.

References Sirikata::Logging::debug, getID(), Sirikata::VariableLength::MAX_SERIALIZED_LENGTH, mID, mSocket, Sirikata::VariableLength::serialize(), and SILOG.

Stream * Sirikata::Network::TCPStream::clone ( const ConnectionCallback connectionCallback,
const ReceivedCallback chunkReceivedCallback,
const ReadySendCallback readySendCallback 
) [virtual]

Creates a new substream on this connection. This is for when the callbacks do not require the Stream*.

Implements Sirikata::Network::Stream.

References mID, mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mSocket, mStreamType, and TCPStream().

Stream * Sirikata::Network::TCPStream::clone ( const SubstreamCallback cb) [virtual]
void Sirikata::Network::TCPStream::close ( ) [virtual]

Close this Stream, notifying the remote endpoint.

If this is the last substream surviving from the original Stream, the underlying connection is closed as well.

Implements Sirikata::Network::Stream.

References closeSendStatus(), Sirikata::Network::MultiplexedSocket::closeStream(), getID(), mSendStatus, and mSocket.

Referenced by ~TCPStream().

bool Sirikata::Network::TCPStream::closeSendStatus ( AtomicValue< int > &  vSendStatus) [static]

Atomically sets the sendStatus for this socket to closed. FIXME: should use atomic compare and swap for |= instead of += right now only supports 2 non-io threads closing at once.

This function waits on the sendStatus clearing up so no outstanding sends are being made (and no further ones WILL be made cus of the SendStatusClosing flag that is on.

FIXME we want to |= here

References Sirikata::AtomicValue< T >::read(), and SendStatusClosing.

Referenced by close(), and Sirikata::Network::MultiplexedSocket::shutDownClosedStream().

void Sirikata::Network::TCPStream::connect ( const Address addy,
const SubstreamCallback substreamCallback,
const ConnectionCallback connectionCallback,
const ReceivedCallback chunkReceivedCallback,
const ReadySendCallback readySend 
) [virtual]
static TCPStream* Sirikata::Network::TCPStream::construct ( Network::IOStrand io,
OptionSet options 
) [inline, static]

References TCPStream().

Referenced by init().

Stream * Sirikata::Network::TCPStream::factory ( ) [virtual]

Create a new Stream of the same type as this stream.

Returns:
a new Stream, completely independent of this one, of the same type and implementation.

Implements Sirikata::Network::Stream.

References mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mStreamType, and TCPStream().

StreamID Sirikata::Network::TCPStream::getID ( ) const [inline]
Address Sirikata::Network::TCPStream::getLocalEndpoint ( ) const [virtual]

Only returns a legitimate address if ConnectionStatus called back, otherwise return Address::null()

Implements Sirikata::Network::Stream.

References Sirikata::Logging::debug, getID(), mID, mSocket, Sirikata::Network::Address::null(), and SILOG.

Address Sirikata::Network::TCPStream::getRemoteEndpoint ( ) const [virtual]

Only returns a legitimate address if ConnectionStatus called back, otherwise return Address::null()

Implements Sirikata::Network::Stream.

References Sirikata::Logging::debug, getID(), mID, mSocket, Sirikata::Network::Address::null(), and SILOG.

void Sirikata::Network::TCPStream::readyRead ( ) [virtual]

There is room on a downstream queue and futher sends should be retried.

Implements Sirikata::Network::Stream.

References Sirikata::Logging::debug, getID(), Sirikata::Network::MultiplexedSocket::ioReactorThreadResumeRead(), mID, mSocket, and SILOG.

void Sirikata::Network::TCPStream::requestReadySendCallback ( ) [virtual]

Send a readySendCallback notification when there is room on the send queue.

Implements Sirikata::Network::Stream.

References Sirikata::Logging::debug, getID(), Sirikata::Network::MultiplexedSocket::ioReactorThreadPauseSend(), mID, mSocket, and SILOG.

bool Sirikata::Network::TCPStream::send ( MemoryReference  firstChunk,
StreamReliability  reliability 
) [virtual]

Implementation of send interface.

Implements Sirikata::Network::Stream.

References Sirikata::DataReference< T >::null().

Referenced by send().

bool Sirikata::Network::TCPStream::send ( const Chunk data,
StreamReliability  reliability 
) [virtual]

Implementation of send interface.

Implements Sirikata::Network::Stream.

References send().

bool Sirikata::Network::TCPStream::send ( MemoryReference  firstChunk,
MemoryReference  secondChunk,
StreamReliability  reliability 
) [virtual]

Implementation of send interface.

this function should never return something larger than the MAX_SERIALIZED_LEGNTH

this is just testing code to fragment send packets

max of 3 entities can close the stream at once (FIXME: should implement |= on atomic ints), but as of now at most the recv thread the sender responsible and a user close() is all that is allowed at once...so 3 is fine)

max of 3 entities can close the stream at once (FIXME: should implement |= on atomic ints), but as of now at most the recv thread the sender responsible and a user close() is all that is allowed at once...so 3 is fine)

Implements Sirikata::Network::Stream.

References BASE64_ZERODELIM, Sirikata::DataReference< T >::begin(), Sirikata::DataReference< T >::data(), Sirikata::Network::MultiplexedSocket::RawRequest::data, Sirikata::Logging::debug, Sirikata::DataReference< T >::end(), getID(), LENGTH_DELIM, Sirikata::Network::Stream::StreamID::MAX_HEX_SERIALIZED_LENGTH, Sirikata::VariableLength::MAX_SERIALIZED_LENGTH, mSendBufferSize, mSendStatus, mSocket, mStreamType, Sirikata::Network::MultiplexedSocket::RawRequest::originStream, Sirikata::Network::ReliableOrdered, Sirikata::Network::ReliableUnordered, RFC_6455, Sirikata::Network::MultiplexedSocket::sendBytes(), SendStatusClosing, Sirikata::VariableLength::serialize(), Sirikata::Network::Stream::StreamID::serializeToHex(), sFragmentPackets, SILOG, Sirikata::DataReference< T >::size(), Sirikata::Network::ASIOSocketWrapper::toBase64ZeroDelim(), Sirikata::Network::MultiplexedSocket::RawRequest::unordered, Sirikata::Network::MultiplexedSocket::RawRequest::unreliable, and Sirikata::Network::Unreliable.

static const char* Sirikata::Network::TCPStream::STRING_PREFIX ( ) [inline, static]
static const char* Sirikata::Network::TCPStream::WEBSOCKET_STRING_PREFIX ( ) [inline, static]

Friends And Related Function Documentation

friend class MultiplexedSocket [friend]
friend class TCPSetCallbacks [friend]

Member Data Documentation

The streamID that must be prepended to the data within any packet sent and all received packets for this Stream.

Referenced by canSend(), clone(), connect(), getID(), getLocalEndpoint(), getRemoteEndpoint(), readyRead(), and requestReadySendCallback().

The low level boost::asio io service handle.

Referenced by clone(), connect(), factory(), and TCPStream().

Referenced by clone(), connect(), factory(), and TCPStream().

Referenced by clone(), connect(), factory(), and TCPStream().

Referenced by clone(), connect(), factory(), and TCPStream().

Referenced by clone(), connect(), factory(), and TCPStream().

Referenced by clone(), connect(), factory(), send(), and TCPStream().

std::tr1::shared_ptr<AtomicValue<int> > Sirikata::Network::TCPStream::mSendStatus [private]

incremented while sending: or'd in SendStatusClosing when close function triggered so no further packets will be sent using old ID.

Referenced by close(), connect(), Sirikata::Network::TCPSetCallbacks::operator()(), and send().

Referenced by clone(), connect(), factory(), send(), and TCPStream().


The documentation for this class was generated from the following files: