Sirikata
|
00001 /* Sirikata 00002 * QueueRouterElement.hpp 00003 * 00004 * Copyright (c) 2009, Ewen Cheslack-Postava 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef _SIRIKATA_QUEUE_ROUTER_ELEMENT_ 00034 #define _SIRIKATA_QUEUE_ROUTER_ELEMENT_ 00035 00036 #include "RouterElement.hpp" 00037 #include <boost/thread/mutex.hpp> 00038 #include <boost/thread/locks.hpp> 00039 00040 namespace Sirikata { 00041 00047 template<typename PacketType> 00048 class QueueRouterElement : public UpstreamElementFixed<PacketType, 1>, public DownstreamElementFixed<PacketType, 1> { 00049 typedef boost::mutex mutex; 00050 typedef boost::unique_lock<mutex> unique_lock; 00051 public: 00052 typedef std::tr1::function<uint32(PacketType*)> SizeFunctor; 00053 00054 QueueRouterElement(uint32 max_size, SizeFunctor sizefunc) 00055 : mSizeFunctor(sizefunc), 00056 mMaxSize(max_size), 00057 mSize(0), 00058 mWentNonEmpty(false) 00059 { 00060 } 00061 00062 ~QueueRouterElement() { 00063 PacketType* pkt = NULL; 00064 while( (pkt = pull()) != NULL) 00065 delete pkt; 00066 } 00067 00068 bool empty() { 00069 return (mSize.read() == 0); 00070 } 00071 00072 bool push(PacketType* pkt) { 00073 return push(0, pkt); 00074 } 00075 virtual bool push(uint32 port, PacketType* pkt) { 00076 return push(port, pkt, false); 00077 } 00078 bool push(PacketType* pkt, bool force) { 00079 return push(0, pkt, force); 00080 } 00081 bool push(uint32 port, PacketType* pkt, bool force) { 00082 assert(pkt != NULL); 00083 assert(port == 0); 00084 00085 uint32 sz = mSizeFunctor(pkt); 00086 00087 uint32 cur_size = mSize.read(); 00088 uint32 new_size = cur_size + sz; 00089 00090 if (new_size > mMaxSize && !force) { 00091 // drop 00092 return false; 00093 } 00094 00095 { 00096 unique_lock guard(mMutex); 00097 // else store, must recompute since mSize may have changed 00098 if (mSize.read() == 0) 00099 mWentNonEmpty = true; 00100 mSize += sz; 00101 mPackets.push(pkt); 00102 } 00103 00104 return true; 00105 } 00106 00110 bool wentNonEmpty() { 00111 bool wne = mWentNonEmpty; 00112 mWentNonEmpty = false; 00113 return wne; 00114 } 00115 00116 PacketType* pull() { 00117 return pull(0); 00118 } 00119 virtual PacketType* pull(uint32 port) { 00120 assert(port == 0); 00121 00122 uint32 cur_size = mSize.read(); 00123 00124 if (cur_size == 0) 00125 return NULL; 00126 00127 PacketType* pkt = NULL; 00128 { 00129 unique_lock guard(mMutex); 00130 00131 if (mPackets.empty()) 00132 return NULL; 00133 00134 pkt = mPackets.front(); 00135 mPackets.pop(); 00136 uint32 sz = mSizeFunctor(pkt); 00137 mSize -= sz; 00138 } 00139 00140 return pkt; 00141 } 00142 00143 private: 00144 typedef std::queue<PacketType*> PacketQueue; 00145 boost::mutex mMutex; 00146 PacketQueue mPackets; 00147 const SizeFunctor mSizeFunctor; 00148 uint32 mMaxSize; 00149 Sirikata::AtomicValue<uint32> mSize; 00150 00151 bool mWentNonEmpty; 00152 }; // class QueueRouterElement 00153 00154 } // namespace Sirikata 00155 00156 #endif //_SIRIKATA_QUEUE_ROUTER_ELEMENT_