Sirikata
libcore/include/sirikata/core/queue/ThreadSafeQueue.hpp
Go to the documentation of this file.
00001 /*  Sirikata Utilities -- Sirikata Synchronization Utilities
00002  *  ThreadSafeQueue.hpp
00003  *
00004  *  Copyright (c) 2008, Patrick Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 #ifndef SIRIKATA_ThreadSafeQueue_HPP__
00034 #define SIRIKATA_ThreadSafeQueue_HPP__
00035 
00036 
00037 namespace Sirikata {
00038 namespace ThreadSafeQueueNS{
00039 class Lock;
00040 class Condition;
00042 SIRIKATA_EXPORT void lock(Lock*lok);
00044 SIRIKATA_EXPORT void wait(Lock*lok,
00045           Condition* cond,
00046           bool(*check)(void*, void *),
00047           void* arg1, void* arg2);
00049 SIRIKATA_EXPORT void notify(Condition* cond);
00051 SIRIKATA_EXPORT void unlock(Lock*lok);
00053 SIRIKATA_EXPORT Lock* lockCreate();
00055 SIRIKATA_EXPORT Condition* condCreate();
00057 SIRIKATA_EXPORT void lockDestroy(Lock*oldlock);
00059 SIRIKATA_EXPORT void condDestroy(Condition*oldcond);
00060 }
00061 
00065 template <typename T>
00066 class ThreadSafeQueue {
00067   protected:
00068     typedef std::deque<T> ListType;
00069     ThreadSafeQueueNS::Lock* mLock;
00070     ListType mList;
00071     ThreadSafeQueueNS::Condition* mCond;
00072   private:
00080     ThreadSafeQueue& operator= (const ThreadSafeQueue &other){
00081         if (this<&other) {
00082             ThreadSafeQueueNS::lock(mLock);
00083             ThreadSafeQueueNS::lock(other.mLock);
00084         }else {
00085             ThreadSafeQueueNS::lock(other.mLock);
00086             ThreadSafeQueueNS::lock(mLock);
00087         }
00088         try {
00089             mList=other.mList;
00090         }catch (...) {
00091             if (this<&other) {
00092                 ThreadSafeQueueNS::unlock(other.mLock);
00093                 ThreadSafeQueueNS::unlock(mLock);
00094             }else {
00095                 ThreadSafeQueueNS::unlock(mLock);
00096                 ThreadSafeQueueNS::unlock(other.mLock);
00097             }
00098             throw;
00099         }
00100         if (this<&other) {
00101             ThreadSafeQueueNS::unlock(other.mLock);
00102             ThreadSafeQueueNS::unlock(mLock);
00103         }else {
00104             ThreadSafeQueueNS::unlock(mLock);
00105             ThreadSafeQueueNS::unlock(other.mLock);
00106         }
00107 
00108         return *this;
00109     }
00110     ThreadSafeQueue(const ThreadSafeQueue &other){
00111         mLock=ThreadSafeQueueNS::lockCreate();
00112         mCond=ThreadSafeQueueNS::condCreate();
00113         ThreadSafeQueueNS::lock(other.mLock);
00114         try {
00115             mList=other.mList;
00116             ThreadSafeQueueNS::unlock(other.mLock);
00117         }catch (...) {
00118             ThreadSafeQueueNS::unlock(other.mLock);
00119             throw;
00120         }
00121     }
00122 
00130     static bool waitCheck(void * thus, void*vretval) {
00131         T* retval=reinterpret_cast<T*>(vretval);
00132 
00133         if (reinterpret_cast <ThreadSafeQueue* >(thus)->mList.empty())
00134             return true;
00135         *retval=reinterpret_cast <ThreadSafeQueue* >(thus)->mList.front();
00136         reinterpret_cast <ThreadSafeQueue* >(thus)->mList.pop_front();
00137         return false;
00138     }
00139 
00140 public:
00141     class NodeIterator {
00142     private:
00143         // Noncopyable
00144         NodeIterator(const NodeIterator &other);
00145         void operator=(const NodeIterator &other);
00146 
00147         T *mNext;
00148         ListType mSwappedList;
00149 
00150     public:
00151         NodeIterator(ThreadSafeQueue<T> &queue) : mNext(NULL) {
00152             queue.swap(mSwappedList);
00153         }
00154 
00155         T *next() {
00156             if (mNext) {
00157                 mSwappedList.pop_front();
00158             }
00159             if (mSwappedList.empty()) {
00160                 return NULL;
00161             }
00162             mNext = &(mSwappedList.front());
00163             return mNext;
00164         }
00165     };
00166     friend class NodeIterator;
00167 
00168     ThreadSafeQueue() {
00169         mLock=ThreadSafeQueueNS::lockCreate();
00170         mCond=ThreadSafeQueueNS::condCreate();
00171     }
00172     ~ThreadSafeQueue(){
00173         ThreadSafeQueueNS::lockDestroy(mLock);
00174         ThreadSafeQueueNS::condDestroy(mCond);
00175     }
00176 
00182     void swap(std::deque<T> &swapWith) {
00183         ThreadSafeQueueNS::lock(mLock);
00184         mList.swap(swapWith);
00185         ThreadSafeQueueNS::unlock(mLock);
00186     }
00187 
00192     void popAll(std::deque<T> *popResults) {
00193         popResults->resize(0);
00194         swap(*popResults);
00195     }
00196 
00200     int32 push(const T &value) {
00201         int32 new_size;
00202         ThreadSafeQueueNS::lock(mLock);
00203         try {
00204             mList.push_back(value);
00205             new_size = mList.size();
00206             ThreadSafeQueueNS::notify(mCond);
00207         } catch (...) {
00208             ThreadSafeQueueNS::unlock(mLock);
00209             throw;
00210         }
00211         ThreadSafeQueueNS::unlock(mLock);
00212         return new_size;
00213     }
00214 
00218     int32 pushMultiple(const std::deque<T> &values) {
00219         int32 new_size;
00220         ThreadSafeQueueNS::lock(mLock);
00221         try {
00222             while(!values.empty()) {
00223                 T& value = values.front();
00224                 mList.push_back(value);
00225                 values.pop();
00226             }
00227             new_size = mList.size();
00228             ThreadSafeQueueNS::notify(mCond);
00229         } catch (...) {
00230             ThreadSafeQueueNS::unlock(mLock);
00231             throw;
00232         }
00233         ThreadSafeQueueNS::unlock(mLock);
00234         return new_size;
00235     }
00236 
00241     bool pop(T& ret) {
00242         ThreadSafeQueueNS::lock(mLock);
00243         if (mList.empty()) {
00244             ThreadSafeQueueNS::unlock(mLock);
00245             return false;
00246         } else {
00247             try {
00248                 ret = mList.front();
00249                 mList.pop_front();
00250             }catch (...) {
00251                 ThreadSafeQueueNS::unlock(mLock);
00252                 throw;
00253             }
00254             ThreadSafeQueueNS::unlock(mLock);
00255             return true;
00256         }
00257     }
00258 
00263     void blockingPop(T& retval) {
00264         ThreadSafeQueueNS::wait(mLock, mCond, &ThreadSafeQueue<T>::waitCheck, this, &retval);
00265     }
00266 
00273     bool probablyEmpty() {
00274         return mList.empty();
00275     }
00276 
00281     int32 size() {
00282         return mList.size();
00283     }
00284 };
00285 
00286 }
00287 
00288 #endif