Sirikata
libcore/include/sirikata/core/network/RecordSSTStream.hpp
Go to the documentation of this file.
00001 // Copyright (c) 2011 Sirikata Authors. All rights reserved.
00002 // Use of this source code is governed by a BSD-style license that can
00003 // be found in the LICENSE file.
00004 
00005 #ifndef _SIRIKATA_CORE_RECORD_SST_STREAM_HPP_
00006 #define _SIRIKATA_CORE_RECORD_SST_STREAM_HPP_
00007 
00008 #include <sirikata/core/util/Platform.hpp>
00009 #include <sirikata/core/network/Frame.hpp>
00010 #include <sirikata/core/util/Liveness.hpp>
00011 #include <sirikata/core/util/Time.hpp>
00012 
00013 namespace Sirikata {
00014 
00021 template<typename StreamPtrType>
00022 class RecordSSTStream : public Liveness, public Noncopyable {
00023 public:
00024     typedef std::tr1::function<void(MemoryReference)> RecordCallback;
00025 
00026     RecordSSTStream() {}
00027     ~RecordSSTStream() {
00028         destroy();
00029     }
00030 
00031     void initialize(StreamPtrType stream, RecordCallback cb) {
00032         mStream = stream;
00033         mCB = cb;
00034 
00035         using std::tr1::placeholders::_1;
00036         using std::tr1::placeholders::_2;
00037 
00038         assert(mStream);
00039         mStream->registerReadCallback(
00040             std::tr1::bind(
00041                 &RecordSSTStream::handleRead, this,
00042                 _1, _2
00043             )
00044         );
00045     }
00046 
00047     void write(const MemoryReference& data) {
00048         outstanding.push(Network::Frame::write(data.begin(), data.size()));
00049         writeSomeData(livenessToken());
00050     }
00051 
00052     void destroy() {
00053         letDie();
00054 
00055         if (!mStream) return;
00056         mStream->registerReadCallback(0);
00057     }
00058 private:
00059 
00060     RecordSSTStream(const RecordSSTStream&);
00061     
00062     void handleRead(uint8* data, int size) {
00063         partial_frame.append((const char*)data, size);
00064         while(true) {
00065             String parsed = Network::Frame::parse(partial_frame);
00066             if (parsed.empty()) return;
00067             mCB( MemoryReference(parsed) );
00068         }
00069     }
00070 
00071     void writeSomeData(Liveness::Token alive) {
00072         if (!alive) return;
00073 
00074         static Duration retry_rate = Duration::milliseconds((int64)1);
00075 
00076         writing = true;
00077 
00078         if (!mStream) {
00079             // We're still waiting on the stream. Initialization should trigger
00080             // writing if it gets a stream and there's data waiting.
00081             writing = false;
00082             return;
00083         }
00084 
00085         // Otherwise, keep sending until we run out or
00086         while(!outstanding.empty()) {
00087             std::string& framed_msg = outstanding.front();
00088             int bytes_written = mStream->write((const uint8*)framed_msg.data(), framed_msg.size());
00089             if (bytes_written < 0) {
00090                 // FIXME
00091                 break;
00092             }
00093             else if (bytes_written < (int)framed_msg.size()) {
00094                 framed_msg = framed_msg.substr(bytes_written);
00095                 break;
00096             }
00097             else {
00098                 outstanding.pop();
00099             }
00100         }
00101 
00102         if (outstanding.empty())
00103             writing = false;
00104         else
00105             mStream->getContext()->mainStrand->post(
00106                 retry_rate,
00107                 std::tr1::bind(&RecordSSTStream::writeSomeData, this, alive),
00108                 "RecordSSTStream::writeSomeData"
00109             );
00110     }
00111 
00112     StreamPtrType mStream;
00113     RecordCallback mCB;
00114 
00115     // Outstanding data to be sent. FIXME efficiency
00116     std::queue<std::string> outstanding;
00117     // If writing is currently in progress
00118     bool writing;
00119 
00120     // Backlog of data, i.e. incomplete frame
00121     String partial_frame;
00122 }; // class RecordSSTStream
00123 
00124 } // namespace Sirikata
00125 
00126 #endif //_SIRIKATA_CORE_RECORD_SST_STREAM_HPP_