Sirikata
|
00001 /* Sirikata Network Utilities 00002 * ASIOSocketWrapper.hpp 00003 * 00004 * Copyright (c) 2009, Daniel Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 #include <sirikata/core/util/UUID.hpp> 00033 #include <sirikata/core/queue/SizedThreadSafeQueue.hpp> 00034 #include <sirikata/core/util/Time.hpp> 00035 #include <sirikata/core/util/EWA.hpp> 00036 #include <sirikata/core/network/IOStrand.hpp> 00037 00038 #define SEND_LATENCY_EWA_ALPHA .10f 00039 00040 namespace Sirikata { namespace Network { 00041 class ASIOSocketWrapper; 00042 class ASIOReadBuffer; 00043 00044 void triggerMultiplexedConnectionError(MultiplexedSocket*,ASIOSocketWrapper*,const boost::system::error_code &error); 00045 void ASIOLogBuffer(void * pointerkey, const char extension[16], const uint8* buffer, size_t buffersize); 00046 class ASIOSocketWrapper { 00047 IOStrand* mStrand; 00048 TCPSocket*mSocket; 00049 00050 ASIOReadBuffer *mReadBuffer; 00056 AtomicValue<uint32> mSendingStatus; 00057 00058 struct TimestampedChunk { 00059 TimestampedChunk() 00060 : chunk(NULL), time(Time::null()) 00061 {} 00062 00063 TimestampedChunk(Chunk* _c) 00064 : chunk(_c), time(Time::local()) 00065 {} 00066 00067 uint32 size() const { 00068 return chunk->size(); 00069 } 00070 00071 Duration sinceCreation() const { 00072 return Time::local() - time; 00073 } 00074 00075 Chunk* chunk; 00076 Time time; 00077 }; 00078 00082 SizedThreadSafeQueue<TimestampedChunk>mSendQueue; 00083 enum { 00084 ASYNCHRONOUS_SEND_FLAG=(1<<29), 00085 QUEUE_CHECK_FLAG=(1<<30), 00086 00087 }; 00088 EWA<Duration> mAverageSendLatency; 00089 00090 std::vector<Stream::StreamID> mPausedSendStreams; 00091 std::deque<TimestampedChunk> mToSend; 00092 std::tr1::weak_ptr<MultiplexedSocket>mParent; 00093 std::tr1::shared_ptr<MultiplexedSocket>mOutstandingDataParent; 00095 void finishedSendingChunk(const TimestampedChunk& tc); 00096 00097 00098 typedef boost::system::error_code ErrorCode; 00099 std::tr1::function<void(const ErrorCode &error, std::size_t bytes_sent)>mSendManyDequeItems; 00105 void finishAsyncSend(const MultiplexedSocketPtr&parentMultiSocket); 00106 00112 void sendManyDequeItems(const std::tr1::weak_ptr<MultiplexedSocket>&parentMultiSocket, const ErrorCode &error, std::size_t bytes_sent); 00113 00117 void sendToWire(const MultiplexedSocketPtr&parentMultiSocket, TimestampedChunk toSend); 00118 00123 void sendToWire(const MultiplexedSocketPtr&parentMultiSocket, std::deque<TimestampedChunk>&const_toSend); 00124 00134 void retryQueuedSend(const MultiplexedSocketPtr&parentMultiSocket, uint32 current_status); 00135 00136 public: 00137 00138 ASIOSocketWrapper(TCPSocket* socket,uint32 queuedBufferSize,uint32 sendBufferSize, const MultiplexedSocketPtr&parent) 00139 : mSocket(socket), 00140 mReadBuffer(NULL), 00141 mSendingStatus(0), 00142 mSendQueue(SizedResourceMonitor(queuedBufferSize)), 00143 mAverageSendLatency(SEND_LATENCY_EWA_ALPHA), 00144 mParent(parent) 00145 { 00146 //mPacketLogger.reserve(268435456); 00147 bindFunctions(parent); 00148 } 00149 00150 ASIOSocketWrapper(const ASIOSocketWrapper& socket) 00151 : mSocket(socket.mSocket), 00152 mReadBuffer(NULL), 00153 mSendingStatus(0), 00154 mSendQueue(socket.getResourceMonitor()), 00155 mAverageSendLatency(SEND_LATENCY_EWA_ALPHA) 00156 { 00157 MultiplexedSocketPtr parent(socket.mParent.lock()); 00158 mParent=parent; 00159 bindFunctions(parent); 00160 } 00161 00162 ASIOSocketWrapper(uint32 queuedBufferSize,uint32 sendBufferSize,const MultiplexedSocketPtr&parent) 00163 : mSocket(NULL), 00164 mReadBuffer(NULL), 00165 mSendingStatus(0), 00166 mSendQueue(SizedResourceMonitor(queuedBufferSize)), 00167 mAverageSendLatency(SEND_LATENCY_EWA_ALPHA), 00168 mParent(parent) 00169 { 00170 bindFunctions(parent); 00171 } 00172 void bindFunctions(const MultiplexedSocketPtr&parent); 00173 ~ASIOSocketWrapper() { 00174 if (mToSend.size()!=0) { 00175 SILOG(tcpsst,error,"Outstanding data left on socket that is being deleted. mOutstandingDataParent is "<<(size_t)mOutstandingDataParent.get()); 00176 } 00177 assert(mToSend.size()==0); 00178 } 00179 00180 ASIOSocketWrapper&operator=(const ASIOSocketWrapper& socket){ 00181 mSocket=socket.mSocket; 00182 return *this; 00183 } 00184 const SizedResourceMonitor&getResourceMonitor()const{return mSendQueue.getResourceMonitor();} 00185 00186 void clearReadBuffer(){ 00187 mReadBuffer=NULL; 00188 } 00189 ASIOReadBuffer*getReadBuffer()const{ 00190 return mReadBuffer; 00191 } 00192 ASIOReadBuffer* setReadBuffer(ASIOReadBuffer*arb) { 00193 return mReadBuffer=arb; 00194 } 00195 TCPSocket&getSocket() {return *mSocket;} 00196 00197 const TCPSocket&getSocket()const {return *mSocket;} 00198 00200 void shutdownAndClose(); 00201 00203 void createSocket(unsigned int kernelSendBufferSize, unsigned int kernelReceiveBufferSize); 00204 00206 void destroySocket(); 00216 bool rawSend(const MultiplexedSocketPtr&parentMultiSocket, Chunk * chunk, bool force); 00217 bool canSend(size_t dataSize)const; 00218 static Chunk*constructControlPacket(const MultiplexedSocketPtr&parentMultiSocket, TCPStream::TCPStreamControlCodes code,const Stream::StreamID&sid); 00222 static Chunk* constructPing(const MultiplexedSocketPtr&parentMultiSocket, MemoryReference data, bool isPong); 00227 void sendControlPacket(const MultiplexedSocketPtr&parentMultiSocket, TCPStream::TCPStreamControlCodes code,const Stream::StreamID&sid) { 00228 rawSend(parentMultiSocket,constructControlPacket(parentMultiSocket, code,sid),true); 00229 } 00233 void sendProtocolHeader(const MultiplexedSocketPtr&parentMultiSocket, const Address& address, const UUID&value, unsigned int numConnections); 00234 void sendServerProtocolHeader(const MultiplexedSocketPtr& thus, const std::string&origin, const std::string&host, const std::string&port, const std::string&resource_name, const std::string&subprotocol, bool doSendSubProtocol, const std::string& response); 00235 00236 void ioReactorThreadPauseStream(const MultiplexedSocketPtr&parentMultiSocket, Stream::StreamID sid); 00237 void unpauseSendStreams(const MultiplexedSocketPtr&parentMultiSocket); 00238 Address getRemoteEndpoint()const; 00239 Address getLocalEndpoint()const; 00240 00241 // -- Statistics 00242 Duration averageSendLatency() const; 00243 Duration averageReceiveLatency() const; 00244 //converts 3 arrays into a contiguous array of base64 numbers, delimited with a '\0' at the end. 00245 static Chunk* toBase64ZeroDelim(const MemoryReference&a, const MemoryReference&b, const MemoryReference&c, const MemoryReference *bytesToPrependUnencoded=NULL); 00247 static UUID massageUUID(const UUID&); 00248 class CheckCRLF { 00249 const Array<uint8,TCPStream::MaxWebSocketHeaderSize> *mArray; 00250 unsigned int mLastTransferred; 00251 public: 00252 CheckCRLF(const Array<uint8,TCPStream::MaxWebSocketHeaderSize>*array) { 00253 mArray=array; 00254 mLastTransferred=0; 00255 00256 } 00257 size_t operator() (const ErrorCode&error, size_t bytes_transferred); 00258 }; 00259 00260 00261 }; 00262 00263 00264 00265 } }