Sirikata
|
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved. 00002 // Use of this source code is governed by a BSD-style license that can 00003 // be found in the LICENSE file. 00004 00005 #ifndef _SIRIKATA_CORE_RECORD_SST_STREAM_HPP_ 00006 #define _SIRIKATA_CORE_RECORD_SST_STREAM_HPP_ 00007 00008 #include <sirikata/core/util/Platform.hpp> 00009 #include <sirikata/core/network/Frame.hpp> 00010 #include <sirikata/core/util/Liveness.hpp> 00011 #include <sirikata/core/util/Time.hpp> 00012 00013 namespace Sirikata { 00014 00021 template<typename StreamPtrType> 00022 class RecordSSTStream : public Liveness, public Noncopyable { 00023 public: 00024 typedef std::tr1::function<void(MemoryReference)> RecordCallback; 00025 00026 RecordSSTStream() {} 00027 ~RecordSSTStream() { 00028 destroy(); 00029 } 00030 00031 void initialize(StreamPtrType stream, RecordCallback cb) { 00032 mStream = stream; 00033 mCB = cb; 00034 00035 using std::tr1::placeholders::_1; 00036 using std::tr1::placeholders::_2; 00037 00038 assert(mStream); 00039 mStream->registerReadCallback( 00040 std::tr1::bind( 00041 &RecordSSTStream::handleRead, this, 00042 _1, _2 00043 ) 00044 ); 00045 } 00046 00047 void write(const MemoryReference& data) { 00048 outstanding.push(Network::Frame::write(data.begin(), data.size())); 00049 writeSomeData(livenessToken()); 00050 } 00051 00052 void destroy() { 00053 letDie(); 00054 00055 if (!mStream) return; 00056 mStream->registerReadCallback(0); 00057 } 00058 private: 00059 00060 RecordSSTStream(const RecordSSTStream&); 00061 00062 void handleRead(uint8* data, int size) { 00063 partial_frame.append((const char*)data, size); 00064 while(true) { 00065 String parsed = Network::Frame::parse(partial_frame); 00066 if (parsed.empty()) return; 00067 mCB( MemoryReference(parsed) ); 00068 } 00069 } 00070 00071 void writeSomeData(Liveness::Token alive) { 00072 if (!alive) return; 00073 00074 static Duration retry_rate = Duration::milliseconds((int64)1); 00075 00076 writing = true; 00077 00078 if (!mStream) { 00079 // We're still waiting on the stream. Initialization should trigger 00080 // writing if it gets a stream and there's data waiting. 00081 writing = false; 00082 return; 00083 } 00084 00085 // Otherwise, keep sending until we run out or 00086 while(!outstanding.empty()) { 00087 std::string& framed_msg = outstanding.front(); 00088 int bytes_written = mStream->write((const uint8*)framed_msg.data(), framed_msg.size()); 00089 if (bytes_written < 0) { 00090 // FIXME 00091 break; 00092 } 00093 else if (bytes_written < (int)framed_msg.size()) { 00094 framed_msg = framed_msg.substr(bytes_written); 00095 break; 00096 } 00097 else { 00098 outstanding.pop(); 00099 } 00100 } 00101 00102 if (outstanding.empty()) 00103 writing = false; 00104 else 00105 mStream->getContext()->mainStrand->post( 00106 retry_rate, 00107 std::tr1::bind(&RecordSSTStream::writeSomeData, this, alive), 00108 "RecordSSTStream::writeSomeData" 00109 ); 00110 } 00111 00112 StreamPtrType mStream; 00113 RecordCallback mCB; 00114 00115 // Outstanding data to be sent. FIXME efficiency 00116 std::queue<std::string> outstanding; 00117 // If writing is currently in progress 00118 bool writing; 00119 00120 // Backlog of data, i.e. incomplete frame 00121 String partial_frame; 00122 }; // class RecordSSTStream 00123 00124 } // namespace Sirikata 00125 00126 #endif //_SIRIKATA_CORE_RECORD_SST_STREAM_HPP_