Sirikata
libcore/include/sirikata/core/queue/SizedThreadSafeQueue.hpp
Go to the documentation of this file.
00001 /*  Sirikata Utilities -- Sirikata Synchronization Utilities
00002  *  SizedThreadSafeQueue.hpp
00003  *
00004  *  Copyright (c) 2008, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef SIRIKATA_SizedThreadSafeQueue_HPP__
00034 #define SIRIKATA_SizedThreadSafeQueue_HPP__
00035 #include "ThreadSafeQueue.hpp"
00036 
00037 
00038 namespace Sirikata {
00039 
00040 
00041 class SizedResourceMonitor {
00042     AtomicValue<int32> mSize;
00043     const int32 mLimit;
00044 public:
00045     void reset() {
00046         mSize=0;
00047     }
00048     uint32 maxSize() const {
00049         return (uint32)mLimit;
00050     }
00051     uint32 filledSize() const {
00052         return (uint32)mSize.read();
00053     }
00054     SizedResourceMonitor(uint32 limit):mSize(0),mLimit((int32)limit){
00055         assert(mLimit>=0);
00056     }
00057 
00058     template <class T>
00059     bool preIncrement(T value, bool force) {
00060         if ((mSize+=(int32)value.size())>=mLimit&&mLimit&&!force) {
00061             mSize-=value.size();
00062             return false;
00063         }
00064         return true;
00065     }
00066 
00067     template <class T>
00068     bool preIncrement(T* value, bool force) {
00069         if ((mSize+=(int32)value->size())>=mLimit&&mLimit&&!force) {
00070             mSize-=value->size();
00071             return false;
00072         }
00073         return true;
00074     }
00075 
00076     template <class T>
00077     void postDecrement(T value) {
00078         int32 check=(mSize-=(int32)value.size());
00079         assert(check>=0);
00080     }
00081 
00082     template <class T>
00083     void postDecrement(T* value) {
00084         int32 check=(mSize-=(int32)value->size());
00085         assert(check>=0);
00086     }
00087 
00088     template <class T>
00089     bool probablyCanPush(T t) {
00090         return mSize.read()+(uint32)t->size()<=mLimit||!mLimit;
00091     }
00092 
00093     template <class T>
00094     bool probablyCanPush(T* t) {
00095         return mSize.read()+(uint32)t.size()<=mLimit||!mLimit;
00096     }
00097 
00098     bool probablyCanPush(size_t size) {
00099         return mSize.read()+(int32)size<=mLimit||!mLimit;
00100     }
00101 };
00102 
00109 template <
00110     typename T,
00111     class ResourceMonitor=SizedResourceMonitor,
00112     class Superclass=ThreadSafeQueue<T>
00113     >
00114 class SizedThreadSafeQueue : protected Superclass {
00115     ResourceMonitor mResourceMonitor;
00116 public:
00117     SizedThreadSafeQueue(const ResourceMonitor&rm):mResourceMonitor(rm){
00118         mResourceMonitor.reset();
00119     }
00120     const ResourceMonitor&getResourceMonitor()const{return mResourceMonitor;}
00121     void popAll(std::deque<T>*popResults) {
00122         Superclass::popAll(popResults);
00123         for (typename std::deque<T>::iterator i=popResults->begin(),ie=popResults->end();i!=ie;++i) {
00124             mResourceMonitor.postDecrement(*i);
00125         }
00126     }
00127     bool push(const T &value, bool force) {
00128         if (mResourceMonitor.preIncrement(value,force)) {
00129             try {
00130                 Superclass::push(value);
00131                 return true;
00132             } catch (...) {
00133                 mResourceMonitor.postDecrement(value);
00134                 throw;//didn't get successfully pushed
00135             }
00136         }
00137         return false;
00138     }
00139     bool pop(T &value) {
00140         if (Superclass::pop(value)) {
00141             mResourceMonitor.postDecrement(value);
00142             return true;
00143         }
00144         return false;
00145     }
00146     void blockingPop(T&value) {
00147         Superclass::blockingPop(value);
00148         mResourceMonitor.postDecrement(value);
00149     }
00150     template <class U> bool probablyCanPush(const U&specifier) {
00151         return mResourceMonitor.probablyCanPush(specifier);
00152     }
00153     bool probablyEmpty() {
00154         return Superclass::probablyEmpty();
00155     }
00156 };
00157 
00158 }
00159 
00160 #endif