Sirikata
|
00001 /* Sirikata Utilities -- Sirikata Synchronization Utilities 00002 * LockFreeQueue.hpp 00003 * 00004 * Copyright (c) 2008, Daniel Reiter Horn 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions are 00009 * met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in 00014 * the documentation and/or other materials provided with the 00015 * distribution. 00016 * * Neither the name of Sirikata nor the names of its contributors may 00017 * be used to endorse or promote products derived from this software 00018 * without specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 00021 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00023 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER 00024 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00025 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00026 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00027 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00028 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 00029 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00030 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 */ 00032 00033 00034 #ifndef _SIRIKATA_LOCK_FREE_QUEUE_HPP_ 00035 #define _SIRIKATA_LOCK_FREE_QUEUE_HPP_ 00036 00037 #include <sirikata/core/util/AtomicTypes.hpp> 00038 00040 namespace Sirikata { 00041 00043 template <typename T> class LockFreeQueue { 00044 private: 00045 struct Node { 00046 volatile Node * mNext; 00047 T mContent; 00048 void *operator new(size_t num_bytes) { 00049 return Sirikata::aligned_malloc<Node>(num_bytes,16); 00050 } 00051 void operator delete(void *data) { 00052 Sirikata::aligned_free(data); 00053 } 00054 Node() :mNext(NULL), mContent() { 00055 } 00056 }; 00057 class FreeNodePool { 00058 00059 volatile Node *mHead; 00060 public: 00061 FreeNodePool() { 00062 mHead = new Node(); 00063 } 00064 00065 ~FreeNodePool() { 00066 Node *cur = const_cast<Node*>(mHead); 00067 Node *next = NULL; 00068 do { 00069 next = const_cast<Node*>(cur->mNext); 00070 delete cur; 00071 } while ((cur=next)!=NULL); 00072 } 00073 00074 Node* allocate() { 00075 volatile Node* node=0; 00076 do { 00077 node = mHead->mNext; 00078 if (node == 0) { 00079 return new Node();//FIXME should probably be aligned to size(Node) bytes 00080 } 00081 } while (!compare_and_swap(&mHead->mNext, node, node->mNext)); 00082 Node * return_node=(Node*)node;//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less 00083 return_node->mNext = NULL; 00084 return_node->mContent=T(); 00085 return return_node; 00086 } 00087 00088 void release(Node *node) { 00089 node->mContent = T(); 00090 do { 00091 node->mNext = mHead->mNext; 00092 } while (!compare_and_swap(&mHead->mNext, node->mNext, node)); 00093 } 00094 } mFreeNodePool; 00095 volatile Node *mHead; 00096 volatile Node *mTail; 00097 public: 00098 LockFreeQueue() { 00099 mHead = mFreeNodePool.allocate(); 00100 mTail = mHead; 00101 } 00102 class NodeIterator { 00103 private: 00104 // Noncopyable 00105 NodeIterator(const NodeIterator &other); 00106 void operator=(const NodeIterator &other); 00107 00108 Node *mLastReturned; 00109 Node *mCurrent; 00110 00111 FreeNodePool *mFreePool; 00112 00113 public: 00114 NodeIterator(LockFreeQueue<T> &queue) 00115 : mLastReturned(queue.fork()), 00116 mCurrent(const_cast<Node*>(mLastReturned->mNext)), 00117 mFreePool(&queue.mFreeNodePool) { 00118 } 00119 00120 ~NodeIterator() { 00121 if (mLastReturned) { 00122 mFreePool->release(mLastReturned); 00123 } 00124 while (mCurrent) { 00125 mFreePool->release(mCurrent); 00126 mCurrent = const_cast<Node*>(mCurrent->mNext); 00127 } 00128 } 00129 00130 T *next() { 00131 if (mLastReturned) { 00132 mFreePool->release(mLastReturned); 00133 } 00134 mLastReturned = mCurrent; 00135 if (mCurrent) { 00136 mCurrent = const_cast<Node*>(mCurrent->mNext); 00137 return &mLastReturned->mContent; 00138 } else { 00139 return NULL; 00140 } 00141 } 00142 00143 }; 00144 00145 ~LockFreeQueue() { 00146 NodeIterator junk(*this); 00147 // release everything in the queue in ~NodeIterator 00148 } 00149 00150 private: 00151 friend class NodeIterator; 00152 00153 Node *fork() { 00154 volatile Node *newHead = mFreeNodePool.allocate(); 00155 volatile Node *oldHead = mHead; 00156 00157 // Acquire "lock" on head, for multiple people fork()ing at once. 00158 { 00159 while (oldHead == 0 || 00160 !compare_and_swap(&mHead, oldHead, (volatile Node*)0)) { 00161 oldHead = mHead; 00162 } 00163 } 00164 00165 { 00166 volatile Node *oldTail = mTail; 00167 while (!compare_and_swap(&mTail, oldTail, newHead)) { 00168 oldTail = mTail; 00169 } 00170 } 00171 00172 mHead = newHead; 00173 return const_cast<Node*>(oldHead); 00174 } 00175 public: 00176 00182 void push(const T &value) { 00183 volatile Node* formerTail = NULL; 00184 volatile Node* formerTailNext=NULL; 00185 00186 Node* newerNode = mFreeNodePool.allocate(); 00187 newerNode->mContent = value; 00188 volatile Node*newNode=newerNode; 00189 bool successfulAddNode = false; 00190 while (!successfulAddNode) { 00191 formerTail = mTail; 00192 formerTailNext = formerTail->mNext; 00193 00194 if (mTail == formerTail) { 00195 if (formerTailNext == 0) 00196 successfulAddNode = compare_and_swap(&mTail->mNext, (volatile Node*)0, newNode); 00197 else 00198 compare_and_swap(&mTail, formerTail, formerTailNext); 00199 } 00200 } 00201 00202 compare_and_swap(&mTail, formerTail, newNode); 00203 } 00204 00211 bool pop(T &value) { 00212 volatile Node* formerHead = NULL; 00213 00214 bool headAlreadyAdvanced = false; 00215 while (!headAlreadyAdvanced) { 00216 00217 formerHead = mHead; 00218 if (formerHead == NULL) { 00219 // fork() function is operating on mTail. 00220 continue; 00221 } 00222 volatile Node* formerHeadNext = formerHead->mNext; 00223 volatile Node*formerTail = mTail; 00224 00225 if (formerHead == mHead) { 00226 if (formerHead == formerTail) { 00227 if (formerHeadNext == NULL) { 00228 value=T(); 00229 return false; 00230 } 00231 compare_and_swap(&mTail, formerTail, formerHeadNext); 00232 } 00233 00234 else { 00235 value = ((Node*)formerHeadNext)->mContent;//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less 00236 headAlreadyAdvanced = compare_and_swap(&mHead, formerHead, formerHeadNext); 00237 } 00238 } 00239 } 00240 mFreeNodePool.release((Node*)formerHead);//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less 00241 return true; 00242 } 00243 00244 void blockingPop(T &item) { 00245 throw std::runtime_error(std::string("Blocking Pop not implemented!!!")); 00246 } 00247 void swap(std::deque<T>&swapWith){ 00248 if (!swapWith.empty()) 00249 throw std::runtime_error(std::string("Trying to swap with a nonempty queue")); 00250 popAll(&swapWith); 00251 } 00252 void popAll(std::deque<T>*toPop) { 00253 assert (toPop->empty()); 00254 T value; 00255 while (pop(value)){ 00256 toPop->push(value); 00257 } 00258 } 00259 bool probablyEmpty() { 00260 volatile Node* formerHead = mHead; 00261 if (formerHead) { 00262 if (formerHead->mNext) { 00263 // fork() function is operating on mTail. 00264 return false; 00265 } 00266 } 00267 return true; 00268 } 00269 }; 00270 } 00271 00272 #endif //_SIRIKATA_LOCK_FREE_QUEUE_HPP_