Sirikata
|
00001 /* Sirikata Utilities -- Sirikata Synchronization Utilities 00002 * ThreadSafeQueue.hpp 00003 * 00004 * Copyright (c) 2008, Patrick Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 #ifndef SIRIKATA_ThreadSafeQueue_HPP__ 00034 #define SIRIKATA_ThreadSafeQueue_HPP__ 00035 00036 00037 namespace Sirikata { 00038 namespace ThreadSafeQueueNS{ 00039 class Lock; 00040 class Condition; 00042 SIRIKATA_EXPORT void lock(Lock*lok); 00044 SIRIKATA_EXPORT void wait(Lock*lok, 00045 Condition* cond, 00046 bool(*check)(void*, void *), 00047 void* arg1, void* arg2); 00049 SIRIKATA_EXPORT void notify(Condition* cond); 00051 SIRIKATA_EXPORT void unlock(Lock*lok); 00053 SIRIKATA_EXPORT Lock* lockCreate(); 00055 SIRIKATA_EXPORT Condition* condCreate(); 00057 SIRIKATA_EXPORT void lockDestroy(Lock*oldlock); 00059 SIRIKATA_EXPORT void condDestroy(Condition*oldcond); 00060 } 00061 00065 template <typename T> 00066 class ThreadSafeQueue { 00067 protected: 00068 typedef std::deque<T> ListType; 00069 ThreadSafeQueueNS::Lock* mLock; 00070 ListType mList; 00071 ThreadSafeQueueNS::Condition* mCond; 00072 private: 00080 ThreadSafeQueue& operator= (const ThreadSafeQueue &other){ 00081 if (this<&other) { 00082 ThreadSafeQueueNS::lock(mLock); 00083 ThreadSafeQueueNS::lock(other.mLock); 00084 }else { 00085 ThreadSafeQueueNS::lock(other.mLock); 00086 ThreadSafeQueueNS::lock(mLock); 00087 } 00088 try { 00089 mList=other.mList; 00090 }catch (...) { 00091 if (this<&other) { 00092 ThreadSafeQueueNS::unlock(other.mLock); 00093 ThreadSafeQueueNS::unlock(mLock); 00094 }else { 00095 ThreadSafeQueueNS::unlock(mLock); 00096 ThreadSafeQueueNS::unlock(other.mLock); 00097 } 00098 throw; 00099 } 00100 if (this<&other) { 00101 ThreadSafeQueueNS::unlock(other.mLock); 00102 ThreadSafeQueueNS::unlock(mLock); 00103 }else { 00104 ThreadSafeQueueNS::unlock(mLock); 00105 ThreadSafeQueueNS::unlock(other.mLock); 00106 } 00107 00108 return *this; 00109 } 00110 ThreadSafeQueue(const ThreadSafeQueue &other){ 00111 mLock=ThreadSafeQueueNS::lockCreate(); 00112 mCond=ThreadSafeQueueNS::condCreate(); 00113 ThreadSafeQueueNS::lock(other.mLock); 00114 try { 00115 mList=other.mList; 00116 ThreadSafeQueueNS::unlock(other.mLock); 00117 }catch (...) { 00118 ThreadSafeQueueNS::unlock(other.mLock); 00119 throw; 00120 } 00121 } 00122 00130 static bool waitCheck(void * thus, void*vretval) { 00131 T* retval=reinterpret_cast<T*>(vretval); 00132 00133 if (reinterpret_cast <ThreadSafeQueue* >(thus)->mList.empty()) 00134 return true; 00135 *retval=reinterpret_cast <ThreadSafeQueue* >(thus)->mList.front(); 00136 reinterpret_cast <ThreadSafeQueue* >(thus)->mList.pop_front(); 00137 return false; 00138 } 00139 00140 public: 00141 class NodeIterator { 00142 private: 00143 // Noncopyable 00144 NodeIterator(const NodeIterator &other); 00145 void operator=(const NodeIterator &other); 00146 00147 T *mNext; 00148 ListType mSwappedList; 00149 00150 public: 00151 NodeIterator(ThreadSafeQueue<T> &queue) : mNext(NULL) { 00152 queue.swap(mSwappedList); 00153 } 00154 00155 T *next() { 00156 if (mNext) { 00157 mSwappedList.pop_front(); 00158 } 00159 if (mSwappedList.empty()) { 00160 return NULL; 00161 } 00162 mNext = &(mSwappedList.front()); 00163 return mNext; 00164 } 00165 }; 00166 friend class NodeIterator; 00167 00168 ThreadSafeQueue() { 00169 mLock=ThreadSafeQueueNS::lockCreate(); 00170 mCond=ThreadSafeQueueNS::condCreate(); 00171 } 00172 ~ThreadSafeQueue(){ 00173 ThreadSafeQueueNS::lockDestroy(mLock); 00174 ThreadSafeQueueNS::condDestroy(mCond); 00175 } 00176 00182 void swap(std::deque<T> &swapWith) { 00183 ThreadSafeQueueNS::lock(mLock); 00184 mList.swap(swapWith); 00185 ThreadSafeQueueNS::unlock(mLock); 00186 } 00187 00192 void popAll(std::deque<T> *popResults) { 00193 popResults->resize(0); 00194 swap(*popResults); 00195 } 00196 00200 int32 push(const T &value) { 00201 int32 new_size; 00202 ThreadSafeQueueNS::lock(mLock); 00203 try { 00204 mList.push_back(value); 00205 new_size = mList.size(); 00206 ThreadSafeQueueNS::notify(mCond); 00207 } catch (...) { 00208 ThreadSafeQueueNS::unlock(mLock); 00209 throw; 00210 } 00211 ThreadSafeQueueNS::unlock(mLock); 00212 return new_size; 00213 } 00214 00218 int32 pushMultiple(const std::deque<T> &values) { 00219 int32 new_size; 00220 ThreadSafeQueueNS::lock(mLock); 00221 try { 00222 while(!values.empty()) { 00223 T& value = values.front(); 00224 mList.push_back(value); 00225 values.pop(); 00226 } 00227 new_size = mList.size(); 00228 ThreadSafeQueueNS::notify(mCond); 00229 } catch (...) { 00230 ThreadSafeQueueNS::unlock(mLock); 00231 throw; 00232 } 00233 ThreadSafeQueueNS::unlock(mLock); 00234 return new_size; 00235 } 00236 00241 bool pop(T& ret) { 00242 ThreadSafeQueueNS::lock(mLock); 00243 if (mList.empty()) { 00244 ThreadSafeQueueNS::unlock(mLock); 00245 return false; 00246 } else { 00247 try { 00248 ret = mList.front(); 00249 mList.pop_front(); 00250 }catch (...) { 00251 ThreadSafeQueueNS::unlock(mLock); 00252 throw; 00253 } 00254 ThreadSafeQueueNS::unlock(mLock); 00255 return true; 00256 } 00257 } 00258 00263 void blockingPop(T& retval) { 00264 ThreadSafeQueueNS::wait(mLock, mCond, &ThreadSafeQueue<T>::waitCheck, this, &retval); 00265 } 00266 00273 bool probablyEmpty() { 00274 return mList.empty(); 00275 } 00276 00281 int32 size() { 00282 return mList.size(); 00283 } 00284 }; 00285 00286 } 00287 00288 #endif