Sirikata
This is a particular example implementation of the Stream interface sitting atop TCP.
#include <TCPStream.hpp>
Classes | |
class | Callbacks |
A pair of callbacks related to a stream. | |
Public Types | |
enum | HeaderSizeEnumerant { STRING_PREFIX_LENGTH = 6, TcpSstHeaderSize = 24, MaxWebSocketHeaderSize = 2048 } |
enum | TCPStreamControlCodes { TCPStreamCloseStream = 1, TCPStreamAckCloseStream = 2 } |
enum | StreamType { UNKNOWN, BASE64_ZERODELIM, LENGTH_DELIM, RFC_6455 } |
Public Member Functions | |
StreamID | getID () const |
Returns the active stream ID. | |
TCPStream (IOStrand *, OptionSet *) | |
Constructor which leaves the socket in a disconnected state, prepared for a connect() or a clone(). | |
TCPStream (const MultiplexedSocketPtr &shared_socket, const Stream::StreamID &) | |
Constructor which brings the socket up to speed in a completely connected state, prepped with a StreamID and communal link pointer. | |
virtual Stream * | factory () |
Create a new Stream of the same type as this stream. | |
virtual void | readyRead () |
There is room on a downstream queue and further sends should be retried. | |
virtual void | requestReadySendCallback () |
Send a readySendCallback notification when there is room on the send queue. | |
virtual WARN_UNUSED bool | send (MemoryReference, StreamReliability) |
Implementation of send interface. | |
virtual WARN_UNUSED bool | send (MemoryReference, MemoryReference, StreamReliability) |
Implementation of send interface. | |
virtual WARN_UNUSED bool | send (const Chunk &data, StreamReliability) |
Implementation of send interface. | |
virtual bool | canSend (size_t dataSize) const |
Determine if a message of the specified size could be enqueued to be sent. | |
virtual void | connect (const Address &addy, const SubstreamCallback &substreamCallback, const ConnectionCallback &connectionCallback, const ReceivedCallback &chunkReceivedCallback, const ReadySendCallback &readySend) |
Implementation of connect interface. | |
virtual Stream * | clone (const SubstreamCallback &cb) |
Creates a new substream on this connection. | |
virtual Stream * | clone (const ConnectionCallback &connectionCallback, const ReceivedCallback &chunkReceivedCallback, const ReadySendCallback &readySendCallback) |
Creates a new substream on this connection. This is for when the callbacks do not require the Stream*. | |
Address | getRemoteEndpoint () const |
Only returns a legitimate address if ConnectionStatus called back; otherwise returns Address::null(). | |
Address | getLocalEndpoint () const |
Only returns a legitimate address if ConnectionStatus called back; otherwise returns Address::null(). | |
virtual void | close () |
Close this Stream, notifying the remote endpoint. | |
~TCPStream () | |
virtual Duration | averageSendLatency () const |
Get the average latency spent in queues in the stream. | |
virtual Duration | averageReceiveLatency () const |
Get the average latency spent in queues in the stream, waiting for delivery to the user. | |
Static Public Member Functions | |
static const char * | STRING_PREFIX () |
static const char * | WEBSOCKET_STRING_PREFIX () |
static bool | closeSendStatus (AtomicValue< int > &vSendStatus) |
Atomically sets the sendStatus for this socket to closed. FIXME: should use atomic compare-and-swap for |= instead of +=; right now this only supports 2 non-IO threads closing at once. | |
static TCPStream * | construct (Network::IOStrand *io, OptionSet *options) |
Static Public Attributes | |
static int | sFragmentPackets = 0 |
Private Types | |
enum | { SendStatusClosing = (1<<29) } |
Private Member Functions | |
void | addCallbacks (Callbacks *) |
A function to add callbacks to this particular stream, called by the relevant TCPSetCallbacks function inheriting from SetCallbacks. | |
TCPStream (IOStrand *, unsigned char mNumSimultaneousSockets, unsigned int mSendBufferSize, bool noDelay, StreamType streamType, unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize) | |
Constructor which leaves the socket in a disconnected state, prepared for a connect() or a clone(); called internally from factory(). | |
Private Attributes | |
IOStrand * | mIOStrand |
The low level boost::asio io service handle. | |
MultiplexedSocketPtr | mSocket |
The shared pointer to the communal sending connection. | |
StreamID | mID |
The streamID that must be prepended to the data within any packet sent and all received packets for this Stream. | |
std::tr1::shared_ptr < AtomicValue< int > > | mSendStatus |
Incremented while sending; SendStatusClosing is OR'd in when the close function is triggered, so no further packets will be sent using the old ID. | |
unsigned char | mNumSimultaneousSockets |
bool | mNoDelay |
StreamType | mStreamType |
unsigned int | mSendBufferSize |
unsigned int | mKernelSendBufferSize |
unsigned int | mKernelReceiveBufferSize |
Friends | |
class | MultiplexedSocket |
class | TCPSetCallbacks |
This is a particular example implementation of the Stream interface sitting atop TCP.
The protocol specified in the Structured Streams paper (http://pdos.csail.mit.edu/uia/sst/) may be used as another implementation of the Stream interface. This TCP implementation is provided as the default because it supports a wide range of network devices and is not subject to the prejudice against UDP connections that we routinely see on the net (my home system gets 10 seconds of lag on UDP connections just due to ISP policy combating P2P networks).
The protocol is as follows:
--Connection-- To connect to a remote host, the client opens a small number (N) of TCP sockets to that host and sends the version string (currently SSTTCP), an ASCII 2-digit representation of N (in this case '03'), and a 16-byte unique ID used to pair up connections and possibly, in the future, for encryption. The other side responds in turn on each connection with a similar handshake (though the 16 bytes may be different), and the connection is then online. The remote host may immediately follow the handshake with live packets, and the other side may respond as soon as it receives the remote host's handshake response.
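The handshake layout described above can be sketched as follows. This is a minimal illustration, not the library's code: `buildHandshake` and the `k...` constants are hypothetical names, and the assumption that the three fields are simply concatenated (6 + 2 + 16 bytes, which would match TcpSstHeaderSize = 24) is inferred from the text rather than confirmed by it.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// Sizes mirroring HeaderSizeEnumerant; the 16-byte ID length comes from the text.
constexpr std::size_t kPrefixLength = 6;  // STRING_PREFIX_LENGTH
constexpr std::size_t kIdLength = 16;     // unique ID used to pair up connections
constexpr std::size_t kHeaderSize = 24;   // TcpSstHeaderSize (assumed 6 + 2 + 16)

// Hypothetical helper: version string, two ASCII digits for N, then the ID.
std::string buildHandshake(unsigned numSockets,
                           const std::array<std::uint8_t, kIdLength>& id) {
    assert(numSockets < 100);  // N must fit in two ASCII digits
    std::string out = "SSTTCP";
    assert(out.size() == kPrefixLength);

    char digits[3];
    std::snprintf(digits, sizeof(digits), "%02u", numSockets);
    out.append(digits, 2);

    // The ID is raw bytes, so append by length rather than as a C string.
    out.append(reinterpret_cast<const char*>(id.data()), id.size());
    assert(out.size() == kHeaderSize);
    return out;
}
```

With N = 3 the first eight bytes of the result are the ASCII characters `SSTTCP03`, followed by the 16 ID bytes.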
--Live Phase-- The first live stream has a StreamID of 1. New streams coming from the listener must have even StreamIDs, and new streams coming from the connector must have odd StreamIDs. To indicate a new substream, simply send a packet with a new StreamID.
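The parity rule can be illustrated with a small allocator. `StreamIdAllocator` and `openedByConnector` are hypothetical names used only for this sketch, and starting the listener at 2 is an assumption based on ID 0 being reserved for control packets.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical allocator illustrating the odd/even rule; not part of TCPStream.
class StreamIdAllocator {
public:
    // isConnector: true for the side that called connect(), false for the listener.
    explicit StreamIdAllocator(bool isConnector)
        : mNext(isConnector ? 1u : 2u) {}

    // Returns the next unused ID with this side's parity.
    std::uint32_t next() {
        std::uint32_t id = mNext;
        mNext += 2;  // stepping by 2 preserves parity
        return id;
    }

private:
    std::uint32_t mNext;
};

// True if the ID has the connector's (odd) parity.
inline bool openedByConnector(std::uint32_t id) {
    return (id & 1u) == 1u;
}
```

Because each side draws from a disjoint set of IDs, both ends can open substreams concurrently without coordinating.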
--Packet Format-- All packets are prepended by a variable-length int30 length and an int30 StreamID, followed by the bytestream of data. The length value includes the length of the variable-length StreamID. The int30 format is as follows. If the highest bit of the first byte is 0, the other 7 bits represent a number from 0 to 127. If the highest bit of the first byte is 1 and the highest bit of the second byte is 0, the other 7 bits of the first byte represent the 0-127 remainder and the other 7 bits of the second byte represent the more significant 7-bit digit (0-127), which contributes a multiple of 128 between 0 and 16256. If the highest bits of the first and second bytes are both 1, the value between 0 and 16383 computed as above is added to 16384 times the third byte and 16384*256 times the fourth byte.
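The int30 rules above can be sketched as an encoder and decoder pair. This is written directly from the prose description and is not the actual Sirikata::VariableLength implementation; `encodeInt30` and `decodeInt30` are hypothetical names, and always choosing the shortest form is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Encodes v (< 2^30) as 1, 2, or 4 bytes, choosing the shortest form.
std::vector<std::uint8_t> encodeInt30(std::uint32_t v) {
    assert(v < (1u << 30));
    std::vector<std::uint8_t> out;
    if (v < 128u) {
        out.push_back(static_cast<std::uint8_t>(v));  // high bit 0
    } else if (v < 16384u) {
        out.push_back(static_cast<std::uint8_t>(0x80u | (v & 0x7Fu)));
        out.push_back(static_cast<std::uint8_t>((v >> 7) & 0x7Fu));
    } else {
        out.push_back(static_cast<std::uint8_t>(0x80u | (v & 0x7Fu)));
        out.push_back(static_cast<std::uint8_t>(0x80u | ((v >> 7) & 0x7Fu)));
        out.push_back(static_cast<std::uint8_t>((v >> 14) & 0xFFu));
        out.push_back(static_cast<std::uint8_t>((v >> 22) & 0xFFu));
    }
    return out;
}

// Decodes one int30 from data; returns bytes consumed, or 0 if truncated.
std::size_t decodeInt30(const std::uint8_t* data, std::size_t size,
                        std::uint32_t& value) {
    if (size < 1) return 0;
    if ((data[0] & 0x80u) == 0) {
        value = data[0];
        return 1;
    }
    if (size < 2) return 0;
    std::uint32_t low = (data[0] & 0x7Fu) | ((data[1] & 0x7Fu) << 7);
    if ((data[1] & 0x80u) == 0) {
        value = low;
        return 2;
    }
    if (size < 4) return 0;
    // Bytes three and four carry 8 bits each above the 14 low bits.
    value = low + 16384u * data[2] + 16384u * 256u * data[3];
    return 4;
}
```

Note that under this description there is no three-byte form: once both continuation bits are set, the third and fourth bytes are always present.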
--Shutting down a Stream-- If either side decides to close a stream, it must send a control packet: a packet on the implicit control stream (StreamID = default int30, which serializes to a single byte equal to 0), followed by a control code, which must be a byte equal to 1, and finally a variable-length StreamID indicating which stream to close. This control packet must be broadcast on every related open TCP socket connection. The closing party may not reuse the StreamID (given parity match) until it receives control packets with control code equal to 2 (close ack) on all sockets. The other side must keep the bargain and send the control packet with control code 2 on all sockets to allow ID reuse.
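A close or close-ack control packet, as described above, might be assembled like this. The sketch assumes that control packets carry the same int30 length prefix as data packets; `buildControlPacket`, `appendInt30`, and the `ControlCode` enum are hypothetical names (the numeric values come from TCPStreamControlCodes).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Values taken from TCPStreamControlCodes.
enum ControlCode : std::uint8_t { CloseStream = 1, AckCloseStream = 2 };

// Appends v (< 2^30) in the variable-length int30 form from the packet format.
void appendInt30(Bytes& out, std::uint32_t v) {
    assert(v < (1u << 30));
    if (v < 128u) {
        out.push_back(static_cast<std::uint8_t>(v));
        return;
    }
    bool fourBytes = v >= 16384u;
    out.push_back(static_cast<std::uint8_t>(0x80u | (v & 0x7Fu)));
    out.push_back(static_cast<std::uint8_t>((fourBytes ? 0x80u : 0u) | ((v >> 7) & 0x7Fu)));
    if (fourBytes) {
        out.push_back(static_cast<std::uint8_t>((v >> 14) & 0xFFu));
        out.push_back(static_cast<std::uint8_t>((v >> 22) & 0xFFu));
    }
}

// Layout: [length][control StreamID = 0][control code][target StreamID].
Bytes buildControlPacket(ControlCode code, std::uint32_t targetStream) {
    Bytes body;
    body.push_back(0x00);  // the control stream serializes to a single 0 byte
    body.push_back(code);
    appendInt30(body, targetStream);

    Bytes packet;
    // Assumption: the length covers everything after it, StreamID included.
    appendInt30(packet, static_cast<std::uint32_t>(body.size()));
    packet.insert(packet.end(), body.begin(), body.end());
    return packet;
}
```

The same bytes would then be written to every open TCP socket of the connection, and the StreamID kept out of circulation until a code-2 packet has come back on each of them.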
If all streams are shut down, the sockets may be deactivated. If the socket disconnects due to error, then Disconnect callbacks must be called.
anonymous enum [private] |
Sirikata::Network::TCPStream::TCPStream | ( | IOStrand * | io, |
unsigned char | mNumSimultaneousSockets, | ||
unsigned int | mSendBufferSize, | ||
bool | noDelay, | ||
StreamType | streamType, | ||
unsigned int | kernelSendBufferSize, | ||
unsigned int | kernelReceiveBufferSize | ||
) | [private] |
Constructor which leaves the socket in a disconnected state, prepared for a connect() or a clone(); called internally from factory().
References mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, and mStreamType.
Referenced by clone(), construct(), and factory().
Constructor which leaves the socket in a disconnected state, prepared for a connect() or a clone().
References Sirikata::OptionValue::as(), LENGTH_DELIM, mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mStreamType, Sirikata::OptionSet::referenceOption(), RFC_6455, and sFragmentPackets.
Sirikata::Network::TCPStream::TCPStream | ( | const MultiplexedSocketPtr & | shared_socket, |
const Stream::StreamID & | sid | ||
) |
Constructor which brings the socket up to speed in a completely connected state, prepped with a StreamID and communal link pointer.
References mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, and mStreamType.
Sirikata::Network::TCPStream::~TCPStream | ( | ) |
References close().
void Sirikata::Network::TCPStream::addCallbacks | ( | Callbacks * | ) | [private] |
A function to add callbacks to this particular stream, called by the relevant TCPSetCallbacks function inheriting from SetCallbacks.
Duration Sirikata::Network::TCPStream::averageReceiveLatency | ( | ) | const [virtual] |
Get the average latency spent in queues in the stream, waiting for delivery to the user.
This does not include time spent locally in the underlying network library or OS.
Reimplemented from Sirikata::Network::Stream.
References mSocket.
Duration Sirikata::Network::TCPStream::averageSendLatency | ( | ) | const [virtual] |
Get the average latency spent in queues in the stream.
This does not include time spent locally but buffered in an underlying network library or the OS.
Reimplemented from Sirikata::Network::Stream.
References mSocket.
bool Sirikata::Network::TCPStream::canSend | ( | size_t | dataSize | ) | const [virtual] |
Determine if a message of the specified size could be enqueued to be sent.
Implements Sirikata::Network::Stream.
References Sirikata::Logging::debug, getID(), Sirikata::VariableLength::MAX_SERIALIZED_LENGTH, mID, mSocket, Sirikata::VariableLength::serialize(), and SILOG.
Stream * Sirikata::Network::TCPStream::clone | ( | const ConnectionCallback & | connectionCallback, |
const ReceivedCallback & | chunkReceivedCallback, | ||
const ReadySendCallback & | readySendCallback | ||
) | [virtual] |
Creates a new substream on this connection. This is for when the callbacks do not require the Stream*.
Implements Sirikata::Network::Stream.
References mID, mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mSocket, mStreamType, and TCPStream().
Stream * Sirikata::Network::TCPStream::clone | ( | const SubstreamCallback & | cb | ) | [virtual] |
Creates a new substream on this connection.
Implements Sirikata::Network::Stream.
References mID, mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mSocket, mStreamType, and TCPStream().
void Sirikata::Network::TCPStream::close | ( | ) | [virtual] |
Close this Stream, notifying the remote endpoint.
If this is the last substream surviving from the original Stream, the underlying connection is closed as well.
Implements Sirikata::Network::Stream.
References closeSendStatus(), Sirikata::Network::MultiplexedSocket::closeStream(), getID(), mSendStatus, and mSocket.
Referenced by ~TCPStream().
bool Sirikata::Network::TCPStream::closeSendStatus | ( | AtomicValue< int > & | vSendStatus | ) | [static] |
Atomically sets the sendStatus for this socket to closed. FIXME: should use atomic compare-and-swap for |= instead of +=; right now this only supports 2 non-IO threads closing at once.
This function waits for the sendStatus to clear so that no outstanding sends are being made (and no further ones will be made, because the SendStatusClosing flag is set).
FIXME we want to |= here
References Sirikata::AtomicValue< T >::read(), and SendStatusClosing.
Referenced by close(), and Sirikata::Network::MultiplexedSocket::shutDownClosedStream().
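The FIXME above asks for an atomic |= in place of +=. A minimal sketch of that idea using std::atomic follows; it is not the existing implementation, which uses Sirikata's own AtomicValue. The function names are hypothetical, as is the assumption that the low bits of the status word count in-flight sends and that the bool result means "this caller set the flag first".

```cpp
#include <atomic>
#include <cassert>
#include <thread>

constexpr int kSendStatusClosing = 1 << 29;  // mirrors SendStatusClosing

// Sets the closing flag with an atomic OR, so any number of threads may
// close concurrently, then waits for in-flight sends to drain.
bool closeSendStatusSketch(std::atomic<int>& sendStatus) {
    int previous = sendStatus.fetch_or(kSendStatusClosing);
    bool firstCloser = (previous & kSendStatusClosing) == 0;
    while ((sendStatus.load() & ~kSendStatusClosing) != 0) {
        std::this_thread::yield();  // a sender is still between begin and end
    }
    return firstCloser;
}

// Sender side: register an in-flight send unless the stream is closing.
bool beginSend(std::atomic<int>& sendStatus) {
    int seen = sendStatus.fetch_add(1);
    if (seen & kSendStatusClosing) {
        sendStatus.fetch_sub(1);  // undo the registration and refuse to send
        return false;
    }
    return true;
}

// Sender side: mark the in-flight send as finished.
void endSend(std::atomic<int>& sendStatus) {
    sendStatus.fetch_sub(1);
}
```

Because OR is idempotent, repeated closes leave the flag bit unchanged, whereas adding the flag value more than a few times would overflow into the sign bit; that is the limit on concurrent closers that the FIXME describes.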
void Sirikata::Network::TCPStream::connect | ( | const Address & | addy, |
const SubstreamCallback & | substreamCallback, | ||
const ConnectionCallback & | connectionCallback, | ||
const ReceivedCallback & | chunkReceivedCallback, | ||
const ReadySendCallback & | readySend | ||
) | [virtual] |
Implementation of connect interface.
Implements Sirikata::Network::Stream.
References Sirikata::Network::MultiplexedSocket::getFirstStreamID(), getID(), mID, mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mSendStatus, mSocket, and mStreamType.
static TCPStream* Sirikata::Network::TCPStream::construct | ( | Network::IOStrand * | io, |
OptionSet * | options | ||
) | [inline, static] |
References TCPStream().
Referenced by init().
Stream * Sirikata::Network::TCPStream::factory | ( | ) | [virtual] |
Create a new Stream of the same type as this stream.
Implements Sirikata::Network::Stream.
References mIOStrand, mKernelReceiveBufferSize, mKernelSendBufferSize, mNoDelay, mNumSimultaneousSockets, mSendBufferSize, mStreamType, and TCPStream().
StreamID Sirikata::Network::TCPStream::getID | ( | ) | const [inline] |
Returns the active stream ID.
References mID.
Referenced by canSend(), close(), connect(), getLocalEndpoint(), getRemoteEndpoint(), Sirikata::Network::TCPSetCallbacks::operator()(), readyRead(), requestReadySendCallback(), and send().
Address Sirikata::Network::TCPStream::getLocalEndpoint | ( | ) | const [virtual] |
Only returns a legitimate address if ConnectionStatus called back; otherwise returns Address::null().
Implements Sirikata::Network::Stream.
References Sirikata::Logging::debug, getID(), mID, mSocket, Sirikata::Network::Address::null(), and SILOG.
Address Sirikata::Network::TCPStream::getRemoteEndpoint | ( | ) | const [virtual] |
Only returns a legitimate address if ConnectionStatus called back; otherwise returns Address::null().
Implements Sirikata::Network::Stream.
References Sirikata::Logging::debug, getID(), mID, mSocket, Sirikata::Network::Address::null(), and SILOG.
void Sirikata::Network::TCPStream::readyRead | ( | ) | [virtual] |
There is room on a downstream queue and further sends should be retried.
Implements Sirikata::Network::Stream.
References Sirikata::Logging::debug, getID(), Sirikata::Network::MultiplexedSocket::ioReactorThreadResumeRead(), mID, mSocket, and SILOG.
void Sirikata::Network::TCPStream::requestReadySendCallback | ( | ) | [virtual] |
Send a readySendCallback notification when there is room on the send queue.
Implements Sirikata::Network::Stream.
References Sirikata::Logging::debug, getID(), Sirikata::Network::MultiplexedSocket::ioReactorThreadPauseSend(), mID, mSocket, and SILOG.
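The intended interplay between a failed send() and requestReadySendCallback() can be illustrated with a self-contained stand-in. `BoundedSender` is a hypothetical mock, not the TCPStream API, and the one-shot behaviour of the notification is an assumption made for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a stream with a bounded send queue.
class BoundedSender {
public:
    BoundedSender(std::size_t capacity, std::function<void()> readySend)
        : mCapacity(capacity), mReadySend(std::move(readySend)) {}

    // Like the WARN_UNUSED bool send(): false means the message was not queued.
    bool send(const std::string& msg) {
        if (mQueue.size() >= mCapacity) return false;
        mQueue.push_back(msg);
        return true;
    }

    // Arms a notification for the next time room becomes available.
    void requestReadySendCallback() { mArmed = true; }

    // Simulates the network draining one queued message.
    void drainOne() {
        if (mQueue.empty()) return;
        mQueue.pop_front();
        if (mArmed) {
            mArmed = false;  // assumed one-shot: re-arm after each notification
            mReadySend();
        }
    }

private:
    std::size_t mCapacity;
    std::function<void()> mReadySend;
    std::deque<std::string> mQueue;
    bool mArmed = false;
};
```

A caller whose send() returns false would keep the message, call requestReadySendCallback(), and retry from inside the ready-send callback instead of polling.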
bool Sirikata::Network::TCPStream::send | ( | MemoryReference | firstChunk, |
StreamReliability | reliability | ||
) | [virtual] |
Implementation of send interface.
Implements Sirikata::Network::Stream.
References Sirikata::DataReference< T >::null().
Referenced by send().
bool Sirikata::Network::TCPStream::send | ( | const Chunk & | data, |
StreamReliability | reliability | ||
) | [virtual] |
bool Sirikata::Network::TCPStream::send | ( | MemoryReference | firstChunk, |
MemoryReference | secondChunk, | ||
StreamReliability | reliability | ||
) | [virtual] |
Implementation of send interface.
This function should never return something larger than MAX_SERIALIZED_LENGTH.
this is just testing code to fragment send packets
At most 3 entities can close the stream at once (FIXME: should implement |= on atomic ints). As of now, at most the recv thread, the responsible sender, and a user close() are allowed at once, so 3 is fine.
Implements Sirikata::Network::Stream.
References BASE64_ZERODELIM, Sirikata::DataReference< T >::begin(), Sirikata::DataReference< T >::data(), Sirikata::Network::MultiplexedSocket::RawRequest::data, Sirikata::Logging::debug, Sirikata::DataReference< T >::end(), getID(), LENGTH_DELIM, Sirikata::Network::Stream::StreamID::MAX_HEX_SERIALIZED_LENGTH, Sirikata::VariableLength::MAX_SERIALIZED_LENGTH, mSendBufferSize, mSendStatus, mSocket, mStreamType, Sirikata::Network::MultiplexedSocket::RawRequest::originStream, Sirikata::Network::ReliableOrdered, Sirikata::Network::ReliableUnordered, RFC_6455, Sirikata::Network::MultiplexedSocket::sendBytes(), SendStatusClosing, Sirikata::VariableLength::serialize(), Sirikata::Network::Stream::StreamID::serializeToHex(), sFragmentPackets, SILOG, Sirikata::DataReference< T >::size(), Sirikata::Network::ASIOSocketWrapper::toBase64ZeroDelim(), Sirikata::Network::MultiplexedSocket::RawRequest::unordered, Sirikata::Network::MultiplexedSocket::RawRequest::unreliable, and Sirikata::Network::Unreliable.
static const char* Sirikata::Network::TCPStream::STRING_PREFIX | ( | ) | [inline, static] |
static const char* Sirikata::Network::TCPStream::WEBSOCKET_STRING_PREFIX | ( | ) | [inline, static] |
friend class MultiplexedSocket [friend] |
friend class TCPSetCallbacks [friend] |
StreamID Sirikata::Network::TCPStream::mID [private] |
The streamID that must be prepended to the data within any packet sent and all received packets for this Stream.
Referenced by canSend(), clone(), connect(), getID(), getLocalEndpoint(), getRemoteEndpoint(), readyRead(), and requestReadySendCallback().
IOStrand* Sirikata::Network::TCPStream::mIOStrand [private] |
The low level boost::asio io service handle.
Referenced by clone(), connect(), factory(), and TCPStream().
unsigned int Sirikata::Network::TCPStream::mKernelReceiveBufferSize [private] |
Referenced by clone(), connect(), factory(), and TCPStream().
unsigned int Sirikata::Network::TCPStream::mKernelSendBufferSize [private] |
Referenced by clone(), connect(), factory(), and TCPStream().
bool Sirikata::Network::TCPStream::mNoDelay [private] |
Referenced by clone(), connect(), factory(), and TCPStream().
unsigned char Sirikata::Network::TCPStream::mNumSimultaneousSockets [private] |
Referenced by clone(), connect(), factory(), and TCPStream().
unsigned int Sirikata::Network::TCPStream::mSendBufferSize [private] |
Referenced by clone(), connect(), factory(), send(), and TCPStream().
std::tr1::shared_ptr<AtomicValue<int> > Sirikata::Network::TCPStream::mSendStatus [private] |
Incremented while sending; SendStatusClosing is OR'd in when the close function is triggered, so no further packets will be sent using the old ID.
Referenced by close(), connect(), Sirikata::Network::TCPSetCallbacks::operator()(), and send().
MultiplexedSocketPtr Sirikata::Network::TCPStream::mSocket [private] |
The shared pointer to the communal sending connection.
Referenced by averageReceiveLatency(), averageSendLatency(), canSend(), clone(), close(), connect(), getLocalEndpoint(), getRemoteEndpoint(), readyRead(), requestReadySendCallback(), and send().
StreamType Sirikata::Network::TCPStream::mStreamType [private] |
Referenced by clone(), connect(), factory(), send(), and TCPStream().
int Sirikata::Network::TCPStream::sFragmentPackets = 0 [static] |