Sirikata
libcore/include/sirikata/core/network/Stream.hpp
Go to the documentation of this file.
00001 /*  Sirikata Network Utilities
00002  *  Stream.hpp
00003  *
00004  *  Copyright (c) 2009, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef SIRIKATA_Stream_HPP__
00034 #define SIRIKATA_Stream_HPP__
00035 
00036 #include <sirikata/core/network/Address.hpp>
00037 #include <sirikata/core/util/Time.hpp>
00038 
00039 namespace Sirikata {
00040 namespace Network {
00041 
/**
 * Delivery mode handed to the Stream::send() overloads through their
 * `reliability` parameter. The enumerator names describe the guarantee being
 * requested; how each one is honoured is up to the concrete Stream
 * implementation (not visible in this header).
 *
 * The numeric values are written out explicitly; they are the same values the
 * compiler would assign implicitly.
 */
enum StreamReliability {
    Unreliable        = 0, ///< name suggests: delivery not guaranteed
    ReliableUnordered = 1, ///< name suggests: delivered, order not guaranteed
    ReliableOrdered   = 2  ///< name suggests: delivered, in order
};
00048 
00064 class SIRIKATA_EXPORT Stream {
00065 protected:
00066     Stream(){}
00067 public:
00068     class SetCallbacks;
00069 
00070     virtual ~Stream(){};
00071 
00075     class StreamID: public vuint32 {
00076     public:
00077         StreamID(uint32 input) :vuint32(input){
00078         }
00079         StreamID(){}
00080         enum {
00081             MAX_HEX_SERIALIZED_LENGTH=sizeof(uint32)*2+1
00082         };
00083         static uint8 fromHex(char c) {
00084             if (c>='0'&&c<='9') return c-'0';
00085             if (c>='A'&&c<='Z') return (c-'A')+10;
00086             return (c-'a')+10;
00087         }
00088         static char toHex(uint8 h) {
00089             if (h<=9) return '0'+h;
00090             return (h-10)+'A';
00091         }
00092         unsigned int serializeToHex(uint8 *destination, unsigned int maxsize) const {
00093             assert(maxsize>1);
00094             uint32 tmp = read();
00095             unsigned int count=0;
00096             do {
00097                 destination[count++]=toHex(tmp%16);
00098                 tmp/=16;
00099             }while(count<maxsize&&tmp);
00100             std::reverse(destination,destination+count);
00101             destination[count++]='%';
00102             return count;
00103         }
00104         bool unserializeFromHex(const uint8*src, unsigned int&size) {
00105             if (size==0) return false;
00106             unsigned int maxsize=size;
00107             uint64 retval=0;
00108             uint64 temp=0;
00109             bool success=false;
00110             for (size=0;size<maxsize;++size) {
00111                 char c=src[size];
00112                 if (c=='%') {
00113                     ++size;
00114                     success=true;
00115                     break;
00116                 }
00117                 retval*=16;
00118                 retval+=fromHex(c);
00119             }
00120             *this=StreamID((uint32)retval);
00121             return success;
00122         }
00123 
00124     };
00125 
00127     enum ConnectionStatus {
00128         Connected,
00129         ConnectionFailed,
00130         Disconnected
00131     };
00132 
00137     typedef std::tr1::function<void(void)> PauseReceiveCallback;
00138 
00143     typedef std::tr1::function<void(ConnectionStatus,const std::string&reason)> ConnectionCallback;
00144 
00151     typedef std::tr1::function<void(Stream*,SetCallbacks&)> SubstreamCallback;
00152 
00165     typedef std::tr1::function<void(Chunk&, const PauseReceiveCallback&)> ReceivedCallback;
00166 
00170     typedef std::tr1::function<void()> ReadySendCallback;
00171 
00173     class SetCallbacks : Noncopyable {
00174     public:
00175         virtual ~SetCallbacks(){}
00180         virtual void operator()(const Stream::ConnectionCallback &connectionCallback,
00181                                 const Stream::ReceivedCallback &receivedCallback,
00182                                 const Stream::ReadySendCallback&readySendCallback)=0;
00183     };
00184 
00188     static void ignoreSubstreamCallback(Stream * stream, SetCallbacks&);
00190     static void ignoreConnectionCallback(ConnectionStatus status,const std::string&reason);
00192     static void ignoreReceivedCallback(const Chunk&, const PauseReceiveCallback& );
00194     static void ignoreReadySendCallback();
00195 
00205     virtual void connect(
00206         const Address& addr,
00207         const SubstreamCallback& substreamCallback,
00208         const ConnectionCallback& connectionCallback,
00209         const ReceivedCallback& receivedCallback,
00210         const ReadySendCallback& readySendCallback)=0;
00211 
00215     virtual Stream* factory()=0;
00216 
00222     virtual Stream* clone(const SubstreamCallback&cb)=0;
00229     virtual Stream* clone(const ConnectionCallback &connectionCallback,
00230                           const ReceivedCallback&chunkReceivedCallback,
00231                           const ReadySendCallback&readySendCallback)=0;
00232 
00237     virtual void readyRead()=0;
00238 
00239     /* Request that the Stream call the ReadySendCallback specified by the user at stream creation when the
00240      * Stream is able to send additional data.  This can be called after a call to send() fails so the user
00241      * is notified when a message of the same size can be enqueued for sending. Calling this method
00242      * guarantees the user will receive the callback, even if space became available between the failure
00243      * of the send() call and the call to this method. Note that multiple requests may be handled by a
00244      * single callback and the callback may be invoked inside this method.
00245      */
00246     virtual void requestReadySendCallback()=0;
00247 
00254     virtual bool send(MemoryReference msg, StreamReliability reliability)=0;
00262     virtual bool send(MemoryReference first_msg, MemoryReference second_msg, StreamReliability reliability)=0;
00269     virtual bool send(const Chunk&data, StreamReliability reliability)=0;
00270 
00274     virtual bool canSend(size_t dataSize)const=0;
00275 
00279     virtual Address getRemoteEndpoint() const=0;
00283     virtual Address getLocalEndpoint() const=0;
00284 
00288     virtual void close()=0;
00289 
00290 
00291     // -- Statistics
00292 
00296     virtual Duration averageSendLatency() const {
00297         return Duration::zero();
00298     }
00299 
00304     virtual Duration averageReceiveLatency() const {
00305         return Duration::zero();
00306     }
00307 
00308 };
00309 } // namespace Network
00310 } // namespace Sirikata
00311 
00312 #endif //SIRIKATA_Stream_HPP__