Sirikata
libcore/include/sirikata/core/queue/LockFreeQueue.hpp
Go to the documentation of this file.
00001 /*  Sirikata Utilities -- Sirikata Synchronization Utilities
00002  *  LockFreeQueue.hpp
00003  *
00004  *  Copyright (c) 2008, Daniel Reiter Horn
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions are
00009  *  met:
00010  *  * Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer.
00012  *  * Redistributions in binary form must reproduce the above copyright
00013  *    notice, this list of conditions and the following disclaimer in
00014  *    the documentation and/or other materials provided with the
00015  *    distribution.
00016  *  * Neither the name of Sirikata nor the names of its contributors may
00017  *    be used to endorse or promote products derived from this software
00018  *    without specific prior written permission.
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
00021  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00023  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
00024  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00025  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00026  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00027  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00028  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00029  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00030  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031  */
00032 
00033 
00034 #ifndef _SIRIKATA_LOCK_FREE_QUEUE_HPP_
00035 #define _SIRIKATA_LOCK_FREE_QUEUE_HPP_
00036 
00037 #include <sirikata/core/util/AtomicTypes.hpp>
00038 
00040 namespace Sirikata {
00041 
00043 template <typename T> class LockFreeQueue {
00044 private:
00045     struct Node {
00046         volatile Node * mNext;
00047         T mContent;
00048         void *operator new(size_t num_bytes) {
00049             return Sirikata::aligned_malloc<Node>(num_bytes,16);
00050         }
00051         void operator delete(void *data) {
00052             Sirikata::aligned_free(data);
00053         }
00054         Node() :mNext(NULL), mContent() {
00055         }
00056     };
00057     class FreeNodePool {
00058 
00059         volatile Node *mHead;
00060     public:
00061         FreeNodePool() {
00062             mHead = new Node();
00063         }
00064 
00065         ~FreeNodePool() {
00066             Node *cur = const_cast<Node*>(mHead);
00067             Node *next = NULL;
00068             do {
00069                 next = const_cast<Node*>(cur->mNext);
00070                 delete cur;
00071             } while ((cur=next)!=NULL);
00072         }
00073 
00074         Node* allocate() {
00075             volatile Node* node=0;
00076             do {
00077                 node = mHead->mNext;
00078                 if (node == 0) {
00079                     return new Node();//FIXME should probably be aligned to size(Node) bytes
00080                 }
00081             } while (!compare_and_swap(&mHead->mNext, node, node->mNext));
00082             Node * return_node=(Node*)node;//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less
00083             return_node->mNext = NULL;
00084             return_node->mContent=T();
00085             return return_node;
00086         }
00087 
00088         void release(Node *node) {
00089             node->mContent = T();
00090             do {
00091                 node->mNext = mHead->mNext;
00092             } while (!compare_and_swap(&mHead->mNext, node->mNext, node));
00093         }
00094     } mFreeNodePool;
00095     volatile Node *mHead;
00096     volatile Node *mTail;
00097 public:
00098     LockFreeQueue() {
00099         mHead = mFreeNodePool.allocate();
00100         mTail = mHead;
00101     }
00102     class NodeIterator {
00103     private:
00104         // Noncopyable
00105         NodeIterator(const NodeIterator &other);
00106         void operator=(const NodeIterator &other);
00107 
00108         Node *mLastReturned;
00109         Node *mCurrent;
00110 
00111         FreeNodePool *mFreePool;
00112 
00113     public:
00114         NodeIterator(LockFreeQueue<T> &queue)
00115             : mLastReturned(queue.fork()),
00116               mCurrent(const_cast<Node*>(mLastReturned->mNext)),
00117               mFreePool(&queue.mFreeNodePool) {
00118         }
00119 
00120         ~NodeIterator() {
00121             if (mLastReturned) {
00122                 mFreePool->release(mLastReturned);
00123             }
00124             while (mCurrent) {
00125                 mFreePool->release(mCurrent);
00126                 mCurrent = const_cast<Node*>(mCurrent->mNext);
00127             }
00128         }
00129 
00130         T *next() {
00131             if (mLastReturned) {
00132                 mFreePool->release(mLastReturned);
00133             }
00134             mLastReturned = mCurrent;
00135             if (mCurrent) {
00136                 mCurrent = const_cast<Node*>(mCurrent->mNext);
00137                 return &mLastReturned->mContent;
00138             } else {
00139                 return NULL;
00140             }
00141         }
00142 
00143     };
00144 
00145     ~LockFreeQueue() {
00146         NodeIterator junk(*this);
00147         // release everything in the queue in ~NodeIterator
00148     }
00149 
00150 private:
00151     friend class NodeIterator;
00152 
00153     Node *fork() {
00154         volatile Node *newHead = mFreeNodePool.allocate();
00155         volatile Node *oldHead = mHead;
00156 
00157         // Acquire "lock" on head, for multiple people fork()ing at once.
00158         {
00159             while (oldHead == 0 ||
00160                         !compare_and_swap(&mHead, oldHead, (volatile Node*)0)) {
00161                 oldHead = mHead;
00162             }
00163         }
00164 
00165         {
00166             volatile Node *oldTail = mTail;
00167             while (!compare_and_swap(&mTail, oldTail, newHead)) {
00168                 oldTail = mTail;
00169             }
00170         }
00171 
00172         mHead = newHead;
00173         return const_cast<Node*>(oldHead);
00174     }
00175 public:
00176 
00182     void push(const T &value) {
00183         volatile Node* formerTail = NULL;
00184         volatile Node* formerTailNext=NULL;
00185 
00186         Node* newerNode = mFreeNodePool.allocate();
00187         newerNode->mContent = value;
00188         volatile Node*newNode=newerNode;
00189         bool successfulAddNode = false;
00190         while (!successfulAddNode) {
00191             formerTail = mTail;
00192             formerTailNext = formerTail->mNext;
00193 
00194             if (mTail == formerTail) {
00195                 if (formerTailNext == 0)
00196                     successfulAddNode = compare_and_swap(&mTail->mNext, (volatile Node*)0, newNode);
00197                 else
00198                     compare_and_swap(&mTail, formerTail, formerTailNext);
00199             }
00200         }
00201 
00202         compare_and_swap(&mTail, formerTail, newNode);
00203     }
00204 
00211     bool pop(T &value) {
00212         volatile Node* formerHead = NULL;
00213 
00214         bool headAlreadyAdvanced = false;
00215         while (!headAlreadyAdvanced) {
00216 
00217             formerHead = mHead;
00218             if (formerHead == NULL) {
00219                 // fork() function is operating on mTail.
00220                 continue;
00221             }
00222             volatile Node* formerHeadNext = formerHead->mNext;
00223             volatile Node*formerTail = mTail;
00224 
00225             if (formerHead == mHead) {
00226                 if (formerHead == formerTail) {
00227                     if (formerHeadNext == NULL) {
00228                         value=T();
00229                         return false;
00230                     }
00231                     compare_and_swap(&mTail, formerTail, formerHeadNext);
00232                 }
00233 
00234                 else {
00235                     value = ((Node*)formerHeadNext)->mContent;//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less
00236                     headAlreadyAdvanced = compare_and_swap(&mHead, formerHead, formerHeadNext);
00237                 }
00238             }
00239         }
00240         mFreeNodePool.release((Node*)formerHead);//FIXME volatile cast only allowed if mContent is primitive type of pointer size or less
00241         return true;
00242     }
00243 
00244     void blockingPop(T &item) {
00245         throw std::runtime_error(std::string("Blocking Pop not implemented!!!"));
00246     }
00247     void swap(std::deque<T>&swapWith){
00248         if (!swapWith.empty())
00249             throw std::runtime_error(std::string("Trying to swap with a nonempty queue"));
00250         popAll(&swapWith);
00251     }
00252     void popAll(std::deque<T>*toPop) {
00253         assert (toPop->empty());
00254         T value;
00255         while (pop(value)){
00256             toPop->push(value);
00257         }
00258     }
00259     bool probablyEmpty() {
00260         volatile Node* formerHead = mHead;
00261         if (formerHead) {
00262             if (formerHead->mNext) {
00263                 // fork() function is operating on mTail.
00264                 return false;
00265             }
00266         }
00267         return true;
00268     }
00269 };
00270 }
00271 
00272 #endif //_SIRIKATA_LOCK_FREE_QUEUE_HPP_