Sirikata
libcore/plugins/tcpsst/ASIOSocketWrapper.hpp
Go to the documentation of this file.
00001 /*  Sirikata Network Utilities
00002  *  ASIOSocketWrapper.hpp
00003  *
00004  *  Copyright (c) 2009, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 #include <sirikata/core/util/UUID.hpp>
00033 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp>
00034 #include <sirikata/core/util/Time.hpp>
00035 #include <sirikata/core/util/EWA.hpp>
00036 #include <sirikata/core/network/IOStrand.hpp>
00037 
00038 #define SEND_LATENCY_EWA_ALPHA .10f
00039 
00040 namespace Sirikata { namespace Network {
00041 class ASIOSocketWrapper;
00042 class ASIOReadBuffer;
00043 
00044 void triggerMultiplexedConnectionError(MultiplexedSocket*,ASIOSocketWrapper*,const boost::system::error_code &error);
00045 void ASIOLogBuffer(void * pointerkey, const char extension[16], const uint8* buffer, size_t buffersize);
00046 class ASIOSocketWrapper {
00047     IOStrand* mStrand;
00048     TCPSocket*mSocket;
00049 
00050     ASIOReadBuffer *mReadBuffer;
00056     AtomicValue<uint32> mSendingStatus;
00057 
00058     struct TimestampedChunk {
00059         TimestampedChunk()
00060          : chunk(NULL), time(Time::null())
00061         {}
00062 
00063         TimestampedChunk(Chunk* _c)
00064          : chunk(_c), time(Time::local())
00065         {}
00066 
00067         uint32 size() const {
00068             return chunk->size();
00069         }
00070 
00071         Duration sinceCreation() const {
00072             return Time::local() - time;
00073         }
00074 
00075         Chunk* chunk;
00076         Time time;
00077     };
00078 
00082     SizedThreadSafeQueue<TimestampedChunk>mSendQueue;
00083     enum {
00084         ASYNCHRONOUS_SEND_FLAG=(1<<29),
00085         QUEUE_CHECK_FLAG=(1<<30),
00086 
00087     };
00088     EWA<Duration> mAverageSendLatency;
00089 
00090     std::vector<Stream::StreamID> mPausedSendStreams;
00091     std::deque<TimestampedChunk> mToSend;
00092     std::tr1::weak_ptr<MultiplexedSocket>mParent;
00093     std::tr1::shared_ptr<MultiplexedSocket>mOutstandingDataParent;
00095     void finishedSendingChunk(const TimestampedChunk& tc);
00096 
00097 
00098     typedef boost::system::error_code ErrorCode;
00099     std::tr1::function<void(const ErrorCode &error, std::size_t bytes_sent)>mSendManyDequeItems;
00105     void finishAsyncSend(const MultiplexedSocketPtr&parentMultiSocket);
00106 
00112     void sendManyDequeItems(const std::tr1::weak_ptr<MultiplexedSocket>&parentMultiSocket, const ErrorCode &error, std::size_t bytes_sent);
00113 
00117     void sendToWire(const MultiplexedSocketPtr&parentMultiSocket, TimestampedChunk toSend);
00118 
00123     void sendToWire(const MultiplexedSocketPtr&parentMultiSocket, std::deque<TimestampedChunk>&const_toSend);
00124 
00134     void retryQueuedSend(const MultiplexedSocketPtr&parentMultiSocket, uint32 current_status);
00135 
00136 public:
00137 
00138     ASIOSocketWrapper(TCPSocket* socket,uint32 queuedBufferSize,uint32 sendBufferSize, const MultiplexedSocketPtr&parent)
00139      : mSocket(socket),
00140        mReadBuffer(NULL),
00141        mSendingStatus(0),
00142        mSendQueue(SizedResourceMonitor(queuedBufferSize)),
00143        mAverageSendLatency(SEND_LATENCY_EWA_ALPHA),
00144        mParent(parent)
00145     {
00146         //mPacketLogger.reserve(268435456);
00147         bindFunctions(parent);
00148     }
00149 
00150     ASIOSocketWrapper(const ASIOSocketWrapper& socket)
00151      : mSocket(socket.mSocket),
00152        mReadBuffer(NULL),
00153        mSendingStatus(0),
00154        mSendQueue(socket.getResourceMonitor()),
00155        mAverageSendLatency(SEND_LATENCY_EWA_ALPHA)
00156     {
00157         MultiplexedSocketPtr parent(socket.mParent.lock());
00158         mParent=parent;
00159         bindFunctions(parent);
00160     }
00161 
00162     ASIOSocketWrapper(uint32 queuedBufferSize,uint32 sendBufferSize,const MultiplexedSocketPtr&parent)
00163      : mSocket(NULL),
00164        mReadBuffer(NULL),
00165        mSendingStatus(0),
00166        mSendQueue(SizedResourceMonitor(queuedBufferSize)),
00167        mAverageSendLatency(SEND_LATENCY_EWA_ALPHA),
00168        mParent(parent)
00169     {
00170         bindFunctions(parent);
00171     }
00172     void bindFunctions(const MultiplexedSocketPtr&parent);
00173     ~ASIOSocketWrapper() {
00174         if (mToSend.size()!=0) {
00175             SILOG(tcpsst,error,"Outstanding data left on socket that is being deleted. mOutstandingDataParent is "<<(size_t)mOutstandingDataParent.get());
00176         }
00177         assert(mToSend.size()==0);
00178     }
00179 
00180     ASIOSocketWrapper&operator=(const ASIOSocketWrapper& socket){
00181         mSocket=socket.mSocket;
00182         return *this;
00183     }
00184     const SizedResourceMonitor&getResourceMonitor()const{return mSendQueue.getResourceMonitor();}
00185 
00186     void clearReadBuffer(){
00187         mReadBuffer=NULL;
00188     }
00189     ASIOReadBuffer*getReadBuffer()const{
00190         return mReadBuffer;
00191     }
00192     ASIOReadBuffer* setReadBuffer(ASIOReadBuffer*arb) {
00193         return mReadBuffer=arb;
00194     }
00195     TCPSocket&getSocket() {return *mSocket;}
00196 
00197     const TCPSocket&getSocket()const {return *mSocket;}
00198 
00200     void shutdownAndClose();
00201 
00203     void createSocket(unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize);
00204 
00206     void destroySocket();
00216     bool rawSend(const MultiplexedSocketPtr&parentMultiSocket, Chunk * chunk, bool force);
00217     bool canSend(size_t dataSize)const;
00218     static Chunk*constructControlPacket(const MultiplexedSocketPtr&parentMultiSocket, TCPStream::TCPStreamControlCodes code,const Stream::StreamID&sid);
00222     static Chunk* constructPing(const MultiplexedSocketPtr&parentMultiSocket, MemoryReference data, bool isPong);
00227     void sendControlPacket(const MultiplexedSocketPtr&parentMultiSocket, TCPStream::TCPStreamControlCodes code,const Stream::StreamID&sid) {
00228         rawSend(parentMultiSocket,constructControlPacket(parentMultiSocket, code,sid),true);
00229     }
00233     void sendProtocolHeader(const MultiplexedSocketPtr&parentMultiSocket, const Address& address, const UUID&value, unsigned int numConnections);
00234     void sendServerProtocolHeader(const MultiplexedSocketPtr& thus, const std::string&origin, const std::string&host, const std::string&port, const std::string&resource_name, const std::string&subprotocol, bool doSendSubProtocol, const std::string& response);
00235 
00236     void ioReactorThreadPauseStream(const MultiplexedSocketPtr&parentMultiSocket, Stream::StreamID sid);
00237     void unpauseSendStreams(const MultiplexedSocketPtr&parentMultiSocket);
00238     Address getRemoteEndpoint()const;
00239     Address getLocalEndpoint()const;
00240 
00241     // -- Statistics
00242     Duration averageSendLatency() const;
00243     Duration averageReceiveLatency() const;
00244     //converts 3 arrays into a contiguous array of base64 numbers, delimited with a '\0' at the end.
00245     static Chunk* toBase64ZeroDelim(const MemoryReference&a, const MemoryReference&b, const MemoryReference&c, const MemoryReference *bytesToPrependUnencoded=NULL);
00247     static UUID massageUUID(const UUID&);
00248 class CheckCRLF {
00249     const Array<uint8,TCPStream::MaxWebSocketHeaderSize> *mArray;
00250     unsigned int mLastTransferred;
00251 public:
00252     CheckCRLF(const Array<uint8,TCPStream::MaxWebSocketHeaderSize>*array) {
00253         mArray=array;
00254         mLastTransferred=0;
00255 
00256     }
00257     size_t operator() (const ErrorCode&error, size_t bytes_transferred);
00258 };
00259 
00260 
00261 };
00262 
00263 
00264 
00265 } }