Sirikata
liboh/include/sirikata/oh/QueueRouterElement.hpp
Go to the documentation of this file.
00001 /*  Sirikata
00002  *  QueueRouterElement.hpp
00003  *
00004  *  Copyright (c) 2009, Ewen Cheslack-Postava
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef _SIRIKATA_QUEUE_ROUTER_ELEMENT_
00034 #define _SIRIKATA_QUEUE_ROUTER_ELEMENT_
00035 
00036 #include "RouterElement.hpp"
00037 #include <boost/thread/mutex.hpp>
00038 #include <boost/thread/locks.hpp>
00039 
00040 namespace Sirikata {
00041 
00047 template<typename PacketType>
00048 class QueueRouterElement : public UpstreamElementFixed<PacketType, 1>, public DownstreamElementFixed<PacketType, 1> {
00049     typedef boost::mutex mutex;
00050     typedef boost::unique_lock<mutex> unique_lock;
00051 public:
00052     typedef std::tr1::function<uint32(PacketType*)> SizeFunctor;
00053 
00054     QueueRouterElement(uint32 max_size, SizeFunctor sizefunc)
00055      : mSizeFunctor(sizefunc),
00056        mMaxSize(max_size),
00057        mSize(0),
00058        mWentNonEmpty(false)
00059     {
00060     }
00061 
00062     ~QueueRouterElement() {
00063         PacketType* pkt = NULL;
00064         while( (pkt = pull()) != NULL)
00065             delete pkt;
00066     }
00067 
00068     bool empty() {
00069         return (mSize.read() == 0);
00070     }
00071 
00072     bool push(PacketType* pkt) {
00073         return push(0, pkt);
00074     }
00075     virtual bool push(uint32 port, PacketType* pkt) {
00076         return push(port, pkt, false);
00077     }
00078     bool push(PacketType* pkt, bool force) {
00079         return push(0, pkt, force);
00080     }
00081     bool push(uint32 port, PacketType* pkt, bool force) {
00082         assert(pkt != NULL);
00083         assert(port == 0);
00084 
00085         uint32 sz = mSizeFunctor(pkt);
00086 
00087         uint32 cur_size = mSize.read();
00088         uint32 new_size = cur_size + sz;
00089 
00090         if (new_size > mMaxSize && !force) {
00091             // drop
00092             return false;
00093         }
00094 
00095         {
00096             unique_lock guard(mMutex);
00097             // else store, must recompute since mSize may have changed
00098             if (mSize.read() == 0)
00099                 mWentNonEmpty = true;
00100             mSize += sz;
00101             mPackets.push(pkt);
00102         }
00103 
00104         return true;
00105     }
00106 
00110     bool wentNonEmpty() {
00111         bool wne = mWentNonEmpty;
00112         mWentNonEmpty = false;
00113         return wne;
00114     }
00115 
00116     PacketType* pull() {
00117         return pull(0);
00118     }
00119     virtual PacketType* pull(uint32 port) {
00120         assert(port == 0);
00121 
00122         uint32 cur_size = mSize.read();
00123 
00124         if (cur_size == 0)
00125             return NULL;
00126 
00127         PacketType* pkt = NULL;
00128         {
00129             unique_lock guard(mMutex);
00130 
00131             if (mPackets.empty())
00132                 return NULL;
00133 
00134             pkt = mPackets.front();
00135             mPackets.pop();
00136             uint32 sz = mSizeFunctor(pkt);
00137             mSize -= sz;
00138         }
00139 
00140         return pkt;
00141     }
00142 
00143 private:
00144     typedef std::queue<PacketType*> PacketQueue;
00145     boost::mutex mMutex;
00146     PacketQueue mPackets;
00147     const SizeFunctor mSizeFunctor;
00148     uint32 mMaxSize;
00149     Sirikata::AtomicValue<uint32> mSize;
00150 
00151     bool mWentNonEmpty;
00152 }; // class QueueRouterElement
00153 
00154 } // namespace Sirikata
00155 
00156 #endif //_SIRIKATA_QUEUE_ROUTER_ELEMENT_