Shadowrun: Awakened 29 September 2011 - Build 871
LocklessFIFO.hpp
Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2010 Christopher A. Taylor.  All rights reserved.
00003 
00004     Redistribution and use in source and binary forms, with or without
00005     modification, are permitted provided that the following conditions are met:
00006 
00007     * Redistributions of source code must retain the above copyright notice,
00008       this list of conditions and the following disclaimer.
00009     * Redistributions in binary form must reproduce the above copyright notice,
00010       this list of conditions and the following disclaimer in the documentation
00011       and/or other materials provided with the distribution.
00012     * Neither the name of LibCat nor the names of its contributors may be used
00013       to endorse or promote products derived from this software without
00014       specific prior written permission.
00015 
00016     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019     ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
00020     LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021     CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022     SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023     INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024     CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025     ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026     POSSIBILITY OF SUCH DAMAGE.
00027 */
00028 
00029 /*
00030     Algorithm from "An Optimistic Approach to Lock-Free FIFO Queues"
00031         Edya Ladan-Mozes and Nir Shavit (2004)
00032 */
00033 
00034 #ifndef CAT_LOCKLESS_FIFO_HPP
00035 #define CAT_LOCKLESS_FIFO_HPP
00036 
00037 #include <cat/threads/RegionAllocator.hpp>
00038 #include <cat/threads/Atomic.hpp>
00039 
00040 #if defined(CAT_OS_WINDOWS)
00041 # include <cat/port/WindowsInclude.hpp>
00042 #else
00043 # error "Not portable to your OS!"
00044 #endif
00045 
00046 namespace cat {
00047 
00048 
00049 namespace FIFO {
00050 
00051 template<class T> class Ptr;
00052 template<class T> class Node;
00053 template<class T> class Queue;
00054 
00055 
00057 
00058     // Union for an ABA-proof pointer
00059     template<class T>
00060     class Ptr
00061     {
00062     public:
00063         union
00064         {
00065             struct
00066             {
00067                 Node<T> *ptr;
00068 #if defined(CAT_WORD_64)
00069                 u64 tag;
00070 #else
00071                 u32 tag;
00072 #endif
00073             };
00074 #if defined(CAT_WORD_64)
00075             volatile u64 N[2];
00076 #else
00077             volatile u64 N;
00078 #endif
00079         } CAT_PACKED;
00080 
00081         Ptr(); // zero everything
00082 
00083         bool operator==(const Ptr<T> &rhs);
00084         bool operator!=(const Ptr<T> &rhs);
00085     };
00086 
00087 
00089 
00090     template<class T>
00091     class Node
00092     {
00093         friend class Queue<T>;
00094 
00095         T *value;
00096         Ptr<T> next, prev;
00097     };
00098 
00099 
00101 
00102     // Performs lazy deallocation of data objects on behalf of the caller,
00103     // freeing all remaining objects when the Queue goes out of scope.
00104     template<class T>
00105     class Queue
00106     {
00107         // Pointer to head and tail
00108         Ptr<T> Head, Tail;
00109 
00110         // Event to wait on if dequeuing
00111 #if defined(CAT_OS_WINDOWS)
00112         HANDLE hEvent;
00113 #endif
00114 
00115     public:
00116         Queue();
00117         ~Queue();
00118 
00119     public:
00120         void Enqueue(T *data);
00121         T *Dequeue();
00122         void FixList(Ptr<T> tail, Ptr<T> head);
00123 
00124         // Enqueue a new event to wake up a thread stuck here
00125         T *DequeueWait();
00126     };
00127 
00128 
00130 
00131     template<class T>
00132     inline bool CAS2(Ptr<T> &destination, const Ptr<T> &expected, const Ptr<T> &replacement)
00133     {
00134         return Atomic::CAS2(&destination, &expected, &replacement);
00135     }
00136 
00137 
00139 
00140     template<class T>
00141     Ptr<T>::Ptr()
00142     {
00143 #if defined(CAT_WORD_64)
00144         N[0] = 0;
00145         N[1] = 0;
00146 #else
00147         N = 0;
00148 #endif
00149     }
00150 
00151     template<class T>
00152     bool Ptr<T>::operator==(const Ptr<T> &n)
00153     {
00154 #if defined(CAT_WORD_64)
00155         return N[0] == n.N[0] && N[1] == n.N[1];
00156 #else
00157         return N == n.N;
00158 #endif
00159     }
00160 
00161     template<class T>
00162     bool Ptr<T>::operator!=(const Ptr<T> &n)
00163     {
00164 #if defined(CAT_WORD_64)
00165         return N[0] != n.N[0] || N[1] != n.N[1];
00166 #else
00167         return N != n.N;
00168 #endif
00169     }
00170 
00171 
00173 
00174     template<class T>
00175     Queue<T>::Queue()
00176     {
00177 #if defined(CAT_OS_WINDOWS)
00178         hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
00179 #endif
00180 
00181         Node<T> *node = new (RegionAllocator::ii) Node<T>;
00182         node->value = 0;
00183 
00184         Head.ptr = Tail.ptr = node;
00185     }
00186 
00187     template<class T>
00188     Queue<T>::~Queue()
00189     {
00190         // Destroy objects that are still queued
00191         for (Node<T> *next, *ptr = Head.ptr; ptr; ptr = next)
00192         {
00193             next = ptr->next.ptr;
00194 
00195             if (ptr->value)
00196                 RegionAllocator::ii->Delete(ptr->value);
00197             RegionAllocator::ii->Delete(ptr);
00198         }
00199 
00200 #if defined(CAT_OS_WINDOWS)
00201         CloseHandle(hEvent);
00202 #endif
00203     }
00204 
00205     template<class T>
00206     void Queue<T>::Enqueue(T *val)
00207     {
00208         Ptr<T> tail;
00209         Node<T> *nd = new (RegionAllocator::ii) Node<T>;
00210         nd->value = val;
00211 
00212         for (;;)
00213         {
00214             tail = Tail;
00215             nd->next.ptr = tail.ptr;
00216             nd->next.tag = tail.tag + 1;
00217 
00218             Ptr<T> new_tail;
00219             new_tail.ptr = nd;
00220             new_tail.tag = tail.tag + 1;
00221 
00222             if (CAS2(Tail, tail, new_tail))
00223             {
00224                 tail.ptr->prev.ptr = nd;
00225                 tail.ptr->prev.tag = tail.tag;
00226                 break;
00227             }
00228         }
00229 
00230 #if defined(CAT_OS_WINDOWS)
00231         SetEvent(hEvent);
00232 #endif
00233     }
00234 
00235     template<class T>
00236     T *Queue<T>::DequeueWait()
00237     {
00238         for (;;)
00239         {
00240             // Attempt to dequeue a message
00241             // If we won the race to service the message, then return it
00242             T *retval = Dequeue();
00243             if (retval) return retval;
00244 
00245 #if defined(CAT_OS_WINDOWS)
00246             // If the sychronization wait fails (handle closed), abort with 0
00247             if (WaitForSingleObject(hEvent, INFINITE) != WAIT_OBJECT_0)
00248                 return 0;
00249 #endif
00250         }
00251     }
00252 
00253     template<class T>
00254     T *Queue<T>::Dequeue()
00255     {
00256         Ptr<T> tail, head, firstNodePrev;
00257         Node<T> *nd_dummy;
00258         T *val;
00259 
00260         for (;;)
00261         {
00262             head = Head;
00263             tail = Tail;
00264             firstNodePrev = head.ptr->prev;
00265             val = head.ptr->value;
00266 
00267             if (head == Head)
00268             {
00269                 if (val != 0)
00270                 {
00271                     if (tail != head)
00272                     {
00273                         if (firstNodePrev.tag != head.tag)
00274                         {
00275                             FixList(tail, head);
00276                             continue;
00277                         }
00278                     }
00279                     else
00280                     {
00281                         nd_dummy = new (RegionAllocator::ii) Node<T>;
00282                         nd_dummy->value = 0;
00283                         nd_dummy->next.ptr = tail.ptr;
00284                         nd_dummy->next.tag = tail.tag + 1;
00285 
00286                         Ptr<T> new_tail;
00287                         new_tail.ptr = nd_dummy;
00288                         new_tail.tag = tail.tag + 1;
00289 
00290                         if (CAS2(Tail, tail, new_tail))
00291                         {
00292                             head.ptr->prev.ptr = nd_dummy;
00293                             head.ptr->prev.tag = tail.tag;
00294                         }
00295                         else
00296                         {
00297                             RegionAllocator::ii->Delete(nd_dummy);
00298                         }
00299 
00300                         continue;
00301                     }
00302 
00303                     Ptr<T> new_head;
00304                     new_head.ptr = firstNodePrev.ptr;
00305                     new_head.tag = head.tag + 1;
00306 
00307                     if (CAS2(Head, head, new_head))
00308                     {
00309                         RegionAllocator::ii->Delete(head.ptr);
00310                         return val;
00311                     }
00312                 }
00313                 else
00314                 {
00315                     if (tail.ptr == head.ptr)
00316                         return 0;
00317 
00318                     if (firstNodePrev.tag != head.tag)
00319                     {
00320                         FixList(tail, head);
00321 
00322                         continue;
00323                     }
00324 
00325                     Ptr<T> new_head;
00326                     new_head.ptr = firstNodePrev.ptr;
00327                     new_head.tag = head.tag + 1;
00328 
00329                     CAS2(Head, head, new_head);
00330                 }
00331             }
00332         }
00333     }
00334 
00335     template<class T>
00336     void Queue<T>::FixList(Ptr<T> tail, Ptr<T> head)
00337     {
00338         Ptr<T> curNode, curNodeNext, nextNodePrev;
00339 
00340         curNode = tail;
00341 
00342         while (head == Head && curNode != head)
00343         {
00344             curNodeNext = curNode.ptr->next;
00345 
00346             if (curNodeNext.tag != curNode.tag)
00347                 return;
00348 
00349             nextNodePrev = curNodeNext.ptr->prev;
00350 
00351             if (nextNodePrev.ptr != curNode.ptr || nextNodePrev.tag != curNode.tag - 1)
00352             {
00353                 curNodeNext.ptr->prev.ptr = curNode.ptr;
00354                 curNodeNext.ptr->prev.tag = curNode.tag - 1;
00355             }
00356 
00357             curNode.ptr = curNodeNext.ptr;
00358             curNode.tag = curNode.tag - 1;
00359         }
00360     }
00361 
00362 
00363 } // namespace FIFO
00364 
00365 
00366 } // namespace cat
00367 
00368 #endif // CAT_LOCKLESS_FIFO_HPP

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net