Shadowrun: Awakened 29 September 2011 - Build 871
_concurrent_queue_internal.h
Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     This file is part of Threading Building Blocks.
00005 
00006     Threading Building Blocks is free software; you can redistribute it
00007     and/or modify it under the terms of the GNU General Public License
00008     version 2 as published by the Free Software Foundation.
00009 
00010     Threading Building Blocks is distributed in the hope that it will be
00011     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
00012     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013     GNU General Public License for more details.
00014 
00015     You should have received a copy of the GNU General Public License
00016     along with Threading Building Blocks; if not, write to the Free Software
00017     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018 
00019     As a special exception, you may use this file as part of a free software
00020     library without restriction.  Specifically, if other files instantiate
00021     templates or use macros or inline functions from this file, or you compile
00022     this file and link it with other files to produce an executable, this
00023     file does not by itself cause the resulting executable to be covered by
00024     the GNU General Public License.  This exception does not however
00025     invalidate any other reasons why the executable file might be covered by
00026     the GNU General Public License.
00027 */
00028 
00029 #ifndef __TBB_concurrent_queue_internal_H
00030 #define __TBB_concurrent_queue_internal_H
00031 
00032 #include "tbb_stddef.h"
00033 #include "tbb_machine.h"
00034 #include "atomic.h"
00035 #include "spin_mutex.h"
00036 #include "cache_aligned_allocator.h"
00037 #include "tbb_exception.h"
00038 #include <new>
00039 
00040 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00041     // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
00042     #pragma warning (push)
00043     #pragma warning (disable: 4530)
00044 #endif
00045 
00046 #include <iterator>
00047 
00048 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00049     #pragma warning (pop)
00050 #endif
00051 
00052 
00053 namespace tbb {
00054 
00055 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00056 
00057 // forward declaration
00058 namespace strict_ppl {
00059 template<typename T, typename A> class concurrent_queue;
00060 }
00061 
00062 template<typename T, typename A> class concurrent_bounded_queue;
00063 
00064 namespace deprecated {
00065 template<typename T, typename A> class concurrent_queue;
00066 }
00067 #endif
00068 
00070 namespace strict_ppl {
00071 
00073 namespace internal {
00074 
00075 using namespace tbb::internal;
00076 
00077 typedef size_t ticket;
00078 
00079 template<typename T> class micro_queue ;
00080 template<typename T> class micro_queue_pop_finalizer ;
00081 template<typename T> class concurrent_queue_base_v3;
00082 
00084 
// Type-independent part of the queue representation: the global head/tail
// ticket counters plus the sizing data that every micro_queue consults.
// NOTE(review): the pad arrays size each counter group up to NFS_MaxLineSize,
// presumably to keep them on separate cache lines -- confirm against NFS_MaxLineSize.
00087 struct concurrent_queue_rep_base : no_copy {
00088     template<typename T> friend class micro_queue;
00089     template<typename T> friend class concurrent_queue_base_v3;
00090 
00091 protected:
          // Multiplier applied to a ticket by concurrent_queue_rep::index() (k*phi%n_queue).
00093     static const size_t phi = 3;
00094 
00095 public:
00096     // must be power of 2
          // Number of micro_queues held in concurrent_queue_rep::array.
00097     static const size_t n_queue = 8;
00098 
          // Header of a page of items. Bit i of mask is set by push() once item i
          // has been constructed, and tested by pop() and the iterators.
00100     struct page {
00101         page* next;
00102         uintptr_t mask; 
00103     };
00104 
          // Ticket of the next item to pop; advanced by compare_and_swap in internal_try_pop().
00105     atomic<ticket> head_counter;
00106     char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
          // Ticket handed to the next push; post-incremented in internal_push().
00107     atomic<ticket> tail_counter;
00108     char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00109 
          // Items stored per page; the constructor picks a power of two from 32 down to 1,
          // and the code uses (items_per_page-1) as an index mask.
00111     size_t items_per_page;
00112 
          // Set to sizeof(T) by the concurrent_queue_base_v3<T> constructor.
00114     size_t item_size;
00115 
          // Count of tickets whose push failed: incremented on the failure paths of
          // push()/spin_wait_until_my_turn(), decremented when pop() meets such a slot.
00117     atomic<size_t> n_invalid_entries;
00118 
00119     char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
00120 } ;
00121 
00122 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
00123     return uintptr_t(p)>1;
00124 }
00125 
00127 
// Abstract interface through which micro_queue and micro_queue_pop_finalizer
// obtain and release pages. Both hooks are private; only the two friend
// templates below can call them. concurrent_queue_base_v3<T> implements them.
00130 class concurrent_queue_page_allocator
00131 {
00132     template<typename T> friend class micro_queue ;
00133     template<typename T> friend class micro_queue_pop_finalizer ;
00134 protected:
00135     virtual ~concurrent_queue_page_allocator() {}
00136 private:
          // Returns storage for one page; the caller initializes next and mask.
00137     virtual concurrent_queue_rep_base::page* allocate_page() = 0;
          // Releases a page previously returned by allocate_page().
00138     virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
00139 } ;
00140 
00141 #if _MSC_VER && !defined(__INTEL_COMPILER)
00142 // unary minus operator applied to unsigned type, result still unsigned
00143 #pragma warning( push )
00144 #pragma warning( disable: 4146 )
00145 #endif
00146 
00148 
// One of the n_queue sub-queues that make up a concurrent queue. Items are
// kept in a singly linked list of pages running from head_page to tail_page;
// head_counter and tail_counter hold the tickets whose pop/push may proceed next.
00150 template<typename T>
00151 class micro_queue : no_copy {
00152     typedef concurrent_queue_rep_base::page page;
00153 
          // Scope guard that runs ~T() on the referenced item when it goes out of scope.
00155     class destroyer: no_copy {
00156         T& my_value;
00157     public:
00158         destroyer( T& value ) : my_value(value) {}
00159         ~destroyer() {my_value.~T();}          
00160     };
00161 
          // Copy-constructs slot `index` of dst from *src, which is read as a const T.
00162     void copy_item( page& dst, size_t index, const void* src ) {
00163         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
00164     }
00165 
          // Copy-constructs slot `dindex` of dst from slot `sindex` of src.
00166     void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00167         new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
00168     }
00169 
          // Assigns slot `index` of src to *dst (read as a T) and then destroys the slot.
          // The guard is declared before the assignment, so the slot is destroyed
          // even when T's assignment operator throws.
00170     void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00171         T& from = get_ref(src,index);
00172         destroyer d(from);
00173         *static_cast<T*>(dst) = from;
00174     }
00175 
00176     void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
00177 
00178 public:
00179     friend class micro_queue_pop_finalizer<T>;
00180 
          // A page header followed by the first item. get_ref() indexes past `last`,
          // and allocate_page() sizes the block as sizeof(padded_page) plus
          // (items_per_page-1)*sizeof(T). The constructor and operator= are only
          // declared here; no definition is visible in this file.
00181     struct padded_page: page {
00183         padded_page(); 
00185         void operator=( const padded_page& );
00187         T last;
00188     };
00189 
          // Returns a reference to slot `index` of page p, treating p as a padded_page.
00190     static T& get_ref( page& p, size_t index ) {
00191         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
00192     }
00193 
00194     atomic<page*> head_page;
00195     atomic<ticket> head_counter;
00196 
00197     atomic<page*> tail_page;
00198     atomic<ticket> tail_counter;
00199 
          // Held while head_page/tail_page links are changed (push, pop finalizer, invalidation).
00200     spin_mutex page_mutex;
00201     
00202     void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
00203 
00204     bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
00205 
00206     micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
00207 
00208     page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
00209 
00210     void invalidate_page_and_rethrow( ticket k ) ;
00211 };
00212 
00213 template<typename T>
00214 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
00215     atomic_backoff backoff;
00216     do {
00217         backoff.pause();
00218         if( counter&1 ) {
00219             ++rb.n_invalid_entries;
00220             throw_exception( eid_bad_last_alloc );
00221         }
00222     } while( counter!=k ) ;
00223 }
00224 
// Appends a copy of *item (read as a const T) for ticket k.
// A new page is allocated when the ticket maps to slot 0. The call then waits
// until tail_counter reaches this ticket, links the page if one was allocated,
// copy-constructs the item and advances tail_counter by n_queue.
// On any failure n_invalid_entries is incremented and the exception is rethrown.
00225 template<typename T>
00226 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
          // Round the ticket down to a multiple of n_queue (n_queue is a power of 2).
00227     k &= -concurrent_queue_rep_base::n_queue;
00228     page* p = NULL;
          // Slot of this ticket inside its page.
00229     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00230     if( !index ) {
              // First slot of a page: allocate the page before waiting for our turn.
00231         __TBB_TRY {
00232             concurrent_queue_page_allocator& pa = base;
00233             p = pa.allocate_page();
00234         } __TBB_CATCH (...) {
00235             ++base.my_rep->n_invalid_entries;
00236             invalidate_page_and_rethrow( k );
00237         }
00238         p->mask = 0;
00239         p->next = NULL;
00240     }
00241     
          // Pushes into one micro_queue complete in ticket order.
00242     if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
00243         
00244     if( p ) {
              // Link the freshly allocated page at the tail (or make it the only page).
00245         spin_mutex::scoped_lock lock( page_mutex );
00246         page* q = tail_page;
00247         if( is_valid_page(q) )
00248             q->next = p;
00249         else
00250             head_page = p; 
00251         tail_page = p;
00252     } else {
00253         p = tail_page;
00254     }
00255    
00256     __TBB_TRY {
00257         copy_item( *p, index, item );
00258         // If no exception was thrown, mark item as present.
00259         p->mask |= uintptr_t(1)<<index;
00260         tail_counter += concurrent_queue_rep_base::n_queue; 
00261     } __TBB_CATCH (...) {
              // The mask bit stays clear, so pop() reports this slot as missing.
              // tail_counter is still advanced so later pushes are not blocked.
00262         ++base.my_rep->n_invalid_entries;
00263         tail_counter += concurrent_queue_rep_base::n_queue; 
00264         __TBB_RETHROW();
00265     }
00266 }
00267 
// Removes the item for ticket k into *dst (read as a T).
// Returns true when an item was delivered. Returns false when the slot's mask
// bit is clear (its push failed); n_invalid_entries is decremented in that case.
00268 template<typename T>
00269 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
          // Round the ticket down to a multiple of n_queue, as push() does.
00270     k &= -concurrent_queue_rep_base::n_queue;
          // Wait for earlier pops on this micro_queue to finish ...
00271     if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
          // ... and for the push with the same ticket to advance tail_counter.
00272     if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
00273     page& p = *head_page;
00274     __TBB_ASSERT( &p, NULL );
00275     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00276     bool success = false; 
00277     {
              // The finalizer's destructor sets head_counter to k+n_queue at scope exit
              // (also if the assignment below throws). It receives the page only
              // when this was the last slot of the page, so the page is then retired.
00278         micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); 
00279         if( p.mask & uintptr_t(1)<<index ) {
00280             success = true;
00281             assign_and_destroy_item( dst, p, index );
00282         } else {
00283             --base.my_rep->n_invalid_entries;
00284         }
00285     }
00286     return success;
00287 }
00288 
// Makes this micro_queue a copy of src: the counters and mutex are copied, and
// every page from src.head_page to src.tail_page is duplicated via make_copy().
// If a copy throws, the queue is invalidated and the exception is rethrown.
// Returns *this.
00289 template<typename T>
00290 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
00291     head_counter = src.head_counter;
00292     tail_counter = src.tail_counter;
00293     page_mutex   = src.page_mutex;
00294 
00295     const page* srcp = src.head_page;
00296     if( is_valid_page(srcp) ) {
              // make_copy() advances g_index once per slot it visits; the catch block
              // passes it on to invalidate_page_and_rethrow().
00297         ticket g_index = head_counter;
00298         __TBB_TRY {
00299             size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
00300             size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                  // The first page is copied from the head slot up to the page end,
                  // or up to the last item if everything fits in that page.
00301             size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
00302 
00303             head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
00304             page* cur_page = head_page;
00305 
00306             if( srcp != src.tail_page ) {
                      // Pages strictly between head and tail are copied in full.
00307                 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
00308                     cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
00309                     cur_page = cur_page->next;
00310                 }
00311 
00312                 __TBB_ASSERT( srcp==src.tail_page, NULL );
                      // The tail page is copied up to the tail slot; slot 0 here means the page is full.
00313                 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00314                 if( last_index==0 ) last_index = base.my_rep->items_per_page;
00315 
00316                 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
00317                 cur_page = cur_page->next;
00318             }
00319             tail_page = cur_page;
00320         } __TBB_CATCH (...) {
00321             invalidate_page_and_rethrow( g_index );
00322         }
00323     } else {
00324         head_page = tail_page = NULL;
00325     }
00326     return *this;
00327 }
00328 
// Marks this micro_queue as broken after a failure at ticket k and rethrows
// the exception currently being handled. Must be called from a catch handler.
00329 template<typename T>
00330 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
00331     // Append an invalid page at address 1 so that no more pushes are allowed.
00332     page* invalid_page = (page*)uintptr_t(1);
00333     {
00334         spin_mutex::scoped_lock lock( page_mutex );
              // For k a multiple of n_queue this stores an odd value, which
              // spin_wait_until_my_turn() detects with (counter&1).
00335         tail_counter = k+concurrent_queue_rep_base::n_queue+1;
00336         page* q = tail_page;
00337         if( is_valid_page(q) )
00338             q->next = invalid_page;
00339         else
00340             head_page = invalid_page;
00341         tail_page = invalid_page;
00342     }
00343     __TBB_RETHROW();
00344 }
00345 
00346 template<typename T>
00347 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00348     concurrent_queue_page_allocator& pa = base;
00349     page* new_page = pa.allocate_page();
00350     new_page->next = NULL;
00351     new_page->mask = src_page->mask;
00352     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00353         if( new_page->mask & uintptr_t(1)<<begin_in_page )
00354             copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00355     return new_page;
00356 }
00357 
// Scope guard created by micro_queue::pop(). Its destructor publishes the new
// head ticket and, when given a page, unlinks and deallocates it.
00358 template<typename T>
00359 class micro_queue_pop_finalizer: no_copy {
00360     typedef concurrent_queue_rep_base::page page;
          // Value stored into my_queue.head_counter by the destructor.
00361     ticket my_ticket;
00362     micro_queue<T>& my_queue;
          // Page to retire, or NULL when the popped slot was not the last one in its page.
00363     page* my_page; 
00364     concurrent_queue_page_allocator& allocator;
00365 public:
          // b is bound through its concurrent_queue_page_allocator base.
00366     micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
00367         my_ticket(k), my_queue(queue), my_page(p), allocator(b)
00368     {}
00369     ~micro_queue_pop_finalizer() ;
00370 };
00371 
// Completes a pop in three steps: unlinks the exhausted page (if any) under
// page_mutex, stores the new head ticket, then frees the page outside the lock.
00372 template<typename T>
00373 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
00374     page* p = my_page;
00375     if( is_valid_page(p) ) {
00376         spin_mutex::scoped_lock lock( my_queue.page_mutex );
00377         page* q = p->next;
00378         my_queue.head_page = q;
              // No valid successor: the micro_queue has no pages left.
00379         if( !is_valid_page(q) ) {
00380             my_queue.tail_page = NULL;
00381         }
00382     }
          // pop() for the next ticket waits on head_counter reaching this value.
00383     my_queue.head_counter = my_ticket;
00384     if( is_valid_page(p) ) {
00385         allocator.deallocate_page( p );
00386     }
00387 }
00388 
00389 #if _MSC_VER && !defined(__INTEL_COMPILER)
00390 #pragma warning( pop )
00391 #endif // warning 4146 is back
00392 
00393 template<typename T> class concurrent_queue_iterator_rep ;
00394 template<typename T> class concurrent_queue_iterator_base_v3;
00395 
00397 
00400 template<typename T>
00401 struct concurrent_queue_rep : public concurrent_queue_rep_base {
00402     micro_queue<T> array[n_queue];
00403 
00405     static size_t index( ticket k ) {
00406         return k*phi%n_queue;
00407     }
00408 
00409     micro_queue<T>& choose( ticket k ) {
00410         // The formula here approximates LRU in a cache-oblivious way.
00411         return array[index(k)];
00412     }
00413 };
00414 
00416 
// Type-aware base of the unbounded concurrent queue. It owns the
// concurrent_queue_rep<T>, implements page allocation on top of the
// allocate_block/deallocate_block hooks supplied by the derived class, and
// provides the internal push/pop/size/clear primitives.
00420 template<typename T>
00421 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
          // Allocated in the constructor, released in the destructor.
00423     concurrent_queue_rep<T>* my_rep;
00424 
00425     friend struct concurrent_queue_rep<T>;
00426     friend class micro_queue<T>;
00427     friend class concurrent_queue_iterator_rep<T>;
00428     friend class concurrent_queue_iterator_base_v3<T>;
00429 
00430 protected:
00431     typedef typename concurrent_queue_rep<T>::page page;
00432 
00433 private:
00434     typedef typename micro_queue<T>::padded_page padded_page;
00435 
          // A page is one padded_page (header plus first item) followed by
          // items_per_page-1 further items.
00436     /* override */ virtual page *allocate_page() {
00437         concurrent_queue_rep<T>& r = *my_rep;
00438         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
00439         return reinterpret_cast<page*>(allocate_block ( n ));
00440     }
00441 
          // Recomputes the same byte count as allocate_page() and hands it to deallocate_block().
00442     /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
00443         concurrent_queue_rep<T>& r = *my_rep;
00444         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
00445         deallocate_block( reinterpret_cast<void*>(p), n );
00446     }
00447 
          // Supplied by the derived class: obtain n bytes of raw storage.
00449     virtual void *allocate_block( size_t n ) = 0;
00450 
          // Supplied by the derived class: release n bytes obtained from allocate_block().
00452     virtual void deallocate_block( void *p, size_t n ) = 0;
00453 
00454 protected:
00455     concurrent_queue_base_v3();
00456 
          // NOTE(review): the guard below tests __TBB_USE_ASSERT, while advance()
          // in this file tests TBB_USE_ASSERT. If __TBB_USE_ASSERT is never
          // defined, this page-leak check is compiled out -- confirm which macro is intended.
00457     /* override */ virtual ~concurrent_queue_base_v3() {
00458 #if __TBB_USE_ASSERT
00459         size_t nq = my_rep->n_queue;
00460         for( size_t i=0; i<nq; i++ )
00461             __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
00462 #endif /* __TBB_USE_ASSERT */
00463         cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
00464     }
00465 
          // Takes the next tail ticket and pushes *src into the micro_queue chosen for it.
00467     void internal_push( const void* src ) {
00468         concurrent_queue_rep<T>& r = *my_rep;
00469         ticket k = r.tail_counter++;
00470         r.choose(k).push( src, k, *this );
00471     }
00472 
          // Pops into *dst if an item is available; returns false when the queue is empty.
00474 
00475     bool internal_try_pop( void* dst ) ;
00476 
          // Number of items, computed from the counters; never negative.
00478     size_t internal_size() const ;
00479 
          // True when the counters show no valid items.
00481     bool internal_empty() const ;
00482 
          // Deallocates the page each micro_queue may still hold.
00484     /* note that the name may be misleading, but it remains so due to a historical accident. */
00485     void internal_finish_clear() ;
00486 
          // Throws via throw_exception( eid_bad_alloc ).
00488     void internal_throw_exception() const {
00489         throw_exception( eid_bad_alloc );
00490     }
00491 
          // Copies the counters and all items of src into this queue.
00493     void assign( const concurrent_queue_base_v3& src ) ;
00494 };
00495 
00496 template<typename T>
00497 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
00498     const size_t item_size = sizeof(T);
00499     my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
00500     __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
00501     __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
00502     __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
00503     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
00504     memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
00505     my_rep->item_size = item_size;
00506     my_rep->items_per_page = item_size<=8 ? 32 :
00507                              item_size<=16 ? 16 : 
00508                              item_size<=32 ? 8 :
00509                              item_size<=64 ? 4 :
00510                              item_size<=128 ? 2 :
00511                              1;
00512 }
00513 
// Attempts to pop one item into *dst without blocking on an empty queue.
// Returns false when tail_counter<=head_counter at the time of the check, true
// once an item has been delivered. Tickets whose push failed are skipped: when
// micro_queue::pop() returns false the outer loop claims the next ticket.
00514 template<typename T>
00515 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
00516     concurrent_queue_rep<T>& r = *my_rep;
00517     ticket k;
00518     do {
00519         k = r.head_counter;
00520         for(;;) {
00521             if( r.tail_counter<=k ) {
00522                 // Queue is empty 
00523                 return false;
00524             }
00525             // Queue had item with ticket k when we looked.  Attempt to get that item.
00526             ticket tk=k;
00527 #if defined(_MSC_VER) && defined(_Wp64)
00528     #pragma warning (push)
00529     #pragma warning (disable: 4267)
00530 #endif
                  // Claim ticket tk by advancing head_counter from tk to tk+1; the call
                  // returns the value it found, so k==tk means the claim succeeded.
00531             k = r.head_counter.compare_and_swap( tk+1, tk );
00532 #if defined(_MSC_VER) && defined(_Wp64)
00533     #pragma warning (pop)
00534 #endif
00535             if( k==tk )
00536                 break;
00537             // Another thread snatched the item, retry.
00538         }
00539     } while( !r.choose( k ).pop( dst, k, *this ) );
00540     return true;
00541 }
00542 
// Returns tail_counter - head_counter - n_invalid_entries, clamped to zero.
// The three values are read one after another without locking, so the signed
// difference can come out negative; that case is reported as 0.
00543 template<typename T>
00544 size_t concurrent_queue_base_v3<T>::internal_size() const {
00545     concurrent_queue_rep<T>& r = *my_rep;
00546     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
00547     ticket hc = r.head_counter;
00548     size_t nie = r.n_invalid_entries;
00549     ticket tc = r.tail_counter;
00550     __TBB_ASSERT( hc!=tc || !nie, NULL );
00551     ptrdiff_t sz = tc-hc-nie;
00552     return sz<0 ? 0 :  size_t(sz);
00553 }
00554 
// Returns true when the counters show no valid items: tail_counter was stable
// across the reads and equals head_counter plus the number of invalid entries.
00555 template<typename T>
00556 bool concurrent_queue_base_v3<T>::internal_empty() const {
00557     concurrent_queue_rep<T>& r = *my_rep;
00558     ticket tc = r.tail_counter;
00559     ticket hc = r.head_counter;
00560     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
00561     return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
00562 }
00563 
// Releases the page that each micro_queue may still hold and resets its
// tail_page to NULL. Only tail_page is reset here; head_page is left as is.
// NOTE(review): presumably called after all items have been popped -- the
// assertion below expects at most one page per micro_queue; verify against caller.
00564 template<typename T>
00565 void concurrent_queue_base_v3<T>::internal_finish_clear() {
00566     concurrent_queue_rep<T>& r = *my_rep;
00567     size_t nq = r.n_queue;
00568     for( size_t i=0; i<nq; ++i ) {
00569         page* tp = r.array[i].tail_page;
00570         if( is_valid_page(tp) ) {
00571             __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
00572             deallocate_page( tp );
00573             r.array[i].tail_page = NULL;
00574         } else 
00575             __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
00576     }
00577 }
00578 
// Copies the contents of src into this queue: items_per_page, the three global
// counters, and then each micro_queue in turn. The closing assertion checks
// that src's counters did not change while the copy was made.
00579 template<typename T>
00580 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
00581     concurrent_queue_rep<T>& r = *my_rep;
00582     r.items_per_page = src.my_rep->items_per_page;
00583 
00584     // copy concurrent_queue_rep.
00585     r.head_counter = src.my_rep->head_counter;
00586     r.tail_counter = src.my_rep->tail_counter;
00587     r.n_invalid_entries = src.my_rep->n_invalid_entries;
00588 
00589     // copy micro_queues
00590     for( size_t i = 0; i<r.n_queue; ++i )
00591         r.array[i].assign( src.my_rep->array[i], *this);
00592 
00593     __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, 
00594             "the source concurrent queue should not be concurrently modified." );
00595 }
00596 
00597 template<typename Container, typename Value> class concurrent_queue_iterator;
00598 
// Iteration state for a queue iterator: a snapshot of the queue's head ticket
// and of each micro_queue's head page, taken when the iterator is constructed.
00599 template<typename T>
00600 class concurrent_queue_iterator_rep: no_assign {
00601     typedef typename micro_queue<T>::padded_page padded_page;
00602 public:
          // Ticket of the item the iterator currently refers to.
00603     ticket head_counter;
00604     const concurrent_queue_base_v3<T>& my_queue;
          // Current page within each micro_queue, indexed like concurrent_queue_rep::array.
00605     typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
00606     concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
00607         head_counter(queue.my_rep->head_counter),
00608         my_queue(queue)
00609     {
00610         for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
00611             array[k] = queue.my_rep->array[k].head_page;
00612     }
00613 
          // Sets item to the slot for ticket k (NULL at the end of the queue).
          // Returns false only when the slot exists but holds no valid item.
00615     bool get_item( T*& item, size_t k ) ;
00616 };
00617 
00618 template<typename T>
00619 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
00620     if( k==my_queue.my_rep->tail_counter ) {
00621         item = NULL;
00622         return true;
00623     } else {
00624         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00625         __TBB_ASSERT(p,NULL);
00626         size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00627         item = &micro_queue<T>::get_ref(*p,i);
00628         return (p->mask & uintptr_t(1)<<i)!=0;
00629     }
00630 }
00631 
00633 
// Type-specific, constness-independent part of the queue iterator. It owns a
// concurrent_queue_iterator_rep (my_rep) and caches a pointer to the current item.
00634 template<typename Value>
00635 class concurrent_queue_iterator_base_v3 : no_assign {
00637 
          // Iteration state; NULL for a default-constructed iterator.
00638     concurrent_queue_iterator_rep<Value>* my_rep;
00639 
00640     template<typename C, typename T, typename U>
00641     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00642 
00643     template<typename C, typename T, typename U>
00644     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00645 protected:
          // Current item, or NULL at the end of the queue. Iterators compare equal
          // when these pointers are equal.
00647     Value* my_item;
00648 
          // Default constructor: no state, no item.
00650     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
00651 #if __GNUC__==4&&__GNUC_MINOR__==3
00652         // to get around a possible gcc 4.3 bug
00653         __asm__ __volatile__("": : :"memory");
00654 #endif
00655     }
00656 
          // Copy constructor: starts empty, then copies i's state through assign().
00658     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
00659     : no_assign(), my_rep(NULL), my_item(NULL) {
00660         assign(i);
00661     }
00662 
          // Constructs an iterator positioned at the first valid item of queue.
00664     concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
00665 
          // Makes this iterator a copy of other.
00667     void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
00668 
          // Moves to the next valid item.
00670     void advance() ;
00671 
          // Returns my_rep's storage to the allocator.
00673     ~concurrent_queue_iterator_base_v3() {
00674         cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00675         my_rep = NULL;
00676     }
00677 };
00678 
// Builds the iteration state for queue and positions the iterator on the first
// valid item, or on the end (my_item==NULL) when there is none.
00679 template<typename Value>
00680 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
          // Raw allocation followed by placement construction of the rep.
00681     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00682     new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
00683     size_t k = my_rep->head_counter;
          // get_item() returns false for a slot whose push failed; skip past it.
00684     if( !my_rep->get_item(my_item, k) ) advance();
00685 }
00686 
// Makes this iterator refer to the same position as other.
// Unless both share the same rep pointer, the current rep is released and a
// copy of other's rep is constructed in freshly allocated storage.
00687 template<typename Value>
00688 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00689     if( my_rep!=other.my_rep ) {
00690         if( my_rep ) {
                  // The storage is handed back without an explicit destructor call.
00691             cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00692             my_rep = NULL;
00693         }
00694         if( other.my_rep ) {
00695             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00696             new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00697         }
00698     }
00699     my_item = other.my_item;
00700 }
00701 
// Moves the iterator to the next ticket. When the current slot is the last one
// of its page, the per-micro_queue page pointer is moved to the next page.
// Slots without a valid item are skipped by calling advance() again.
00702 template<typename Value>
00703 void concurrent_queue_iterator_base_v3<Value>::advance() {
00704     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );  
00705     size_t k = my_rep->head_counter;
00706     const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
00707 #if TBB_USE_ASSERT
          // Debug check: my_item must match the slot that ticket k maps to.
00708     Value* tmp;
00709     my_rep->get_item(tmp,k);
00710     __TBB_ASSERT( my_item==tmp, NULL );
00711 #endif /* TBB_USE_ASSERT */
00712     size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
00713     if( i==queue.my_rep->items_per_page-1 ) {
              // Leaving the last slot of this page: follow the link to the next page.
00714         typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
00715         root = root->next;
00716     }
00717     // advance k
00718     my_rep->head_counter = ++k;
00719     if( !my_rep->get_item(my_item, k) ) advance();
00720 }
00721 
00723 
// Strips top-level const and/or volatile from a type: the primary template
// passes the type through, and each specialization peels one qualifier
// combination. The result is exposed as the nested typedef `type`.
template<typename U>
struct tbb_remove_cv {
    typedef U type;
};

template<typename U>
struct tbb_remove_cv<const U> {
    typedef U type;
};

template<typename U>
struct tbb_remove_cv<volatile U> {
    typedef U type;
};

template<typename U>
struct tbb_remove_cv<const volatile U> {
    typedef U type;
};
00728 
00730 
00732 template<typename Container, typename Value>
00733 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
00734         public std::iterator<std::forward_iterator_tag,Value> {
00735 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00736     template<typename T, class A>
00737     friend class ::tbb::strict_ppl::concurrent_queue;
00738 #else
00739 public: // workaround for MSVC
00740 #endif 
00741 
00742     concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
00743         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
00744     {
00745     }
00746 
00747 public:
00748     concurrent_queue_iterator() {}
00749 
00750     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00751         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
00752     {}
00753 
00755     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00756         assign(other);
00757         return *this;
00758     }
00759 
00761     Value& operator*() const {
00762         return *static_cast<Value*>(this->my_item);
00763     }
00764 
00765     Value* operator->() const {return &operator*();}
00766 
00768     concurrent_queue_iterator& operator++() {
00769         this->advance();
00770         return *this;
00771     }
00772 
00774     Value* operator++(int) {
00775         Value* result = &operator*();
00776         operator++();
00777         return result;
00778     }
00779 }; // concurrent_queue_iterator
00780 
00781 
00782 template<typename C, typename T, typename U>
00783 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00784     return i.my_item==j.my_item;
00785 }
00786 
00787 template<typename C, typename T, typename U>
00788 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00789     return i.my_item!=j.my_item;
00790 }
00791 
00792 } // namespace internal
00793 
00795 
00796 } // namespace strict_ppl
00797 
00799 namespace internal {
00800 
00801 class concurrent_queue_rep;
00802 class concurrent_queue_iterator_rep;
00803 class concurrent_queue_iterator_base_v3;
00804 template<typename Container, typename Value> class concurrent_queue_iterator;
00805 
00807 
// Type-erased base class of the queues in namespace tbb::internal. Item
// handling is delegated to pure virtual hooks, and the non-virtual operations
// are declared __TBB_EXPORTED_METHOD; their definitions are not in this file.
// NOTE(review): presumably the base of concurrent_bounded_queue and
// deprecated::concurrent_queue, which the iterator below befriends -- confirm.
00809 class concurrent_queue_base_v3: no_copy {
          // Opaque representation; concurrent_queue_rep is only forward-declared here.
00811     concurrent_queue_rep* my_rep;
00812 
00813     friend class concurrent_queue_rep;
00814     friend struct micro_queue;
00815     friend class micro_queue_pop_finalizer;
00816     friend class concurrent_queue_iterator_rep;
00817     friend class concurrent_queue_iterator_base_v3;
00818 protected:
          // Page header: link to the next page plus a per-item bit mask.
00820     struct page {
00821         page* next;
00822         uintptr_t mask; 
00823     };
00824 
          // NOTE(review): presumably the capacity set through internal_set_capacity() -- confirm.
00826     ptrdiff_t my_capacity;
00827    
00829     size_t items_per_page;
00830 
00832     size_t item_size;
00833 
00834 #if __TBB_GCC_3_3_PROTECTED_BROKEN
00835 public:
00836 #endif
          // Page header followed by the first item of type T. The iterator below
          // takes the offset of `last` to locate item storage within a page.
00837     template<typename T>
00838     struct padded_page: page {
00840         padded_page(); 
00842         void operator=( const padded_page& );
00844         T last;
00845     };
00846 
00847 private:
          // Hooks implemented by the typed derived queue.
00848     virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
00849     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00850 protected:
00851     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
00852     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00853 
00855     void __TBB_EXPORTED_METHOD internal_push( const void* src );
00856 
00858     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00859 
          // Returns bool; presumably false when the queue is at capacity -- TODO confirm.
00861     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00862 
00864 
          // Returns bool; presumably false when no item was available -- TODO confirm.
00865     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00866 
          // Signed, unlike the size_t returned by the strict_ppl counterpart in this file.
00868     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00869 
00871     bool __TBB_EXPORTED_METHOD internal_empty() const;
00872 
00874     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00875 
          // Page allocation hooks implemented by the derived queue.
00877     virtual page *allocate_page() = 0;
00878 
00880     virtual void deallocate_page( page *p ) = 0;
00881 
00883     /* note that the name may be misleading, but it remains so due to a historical accident. */
00884     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00885 
00887     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00888 
00890     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00891 
00892 private:
          // Hook implemented by the derived queue: copy one item between pages.
00893     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00894 };
00895 
00897 
// Type-independent part of the iterator for queues in tbb::internal.
// The current item is held as a void*; the typed concurrent_queue_iterator
// casts it to Value*. The exported members are defined outside this file.
00898 class concurrent_queue_iterator_base_v3 {
00900 
          // Opaque iteration state; NULL for a default-constructed iterator.
00901     concurrent_queue_iterator_rep* my_rep;
00902 
00903     template<typename C, typename T, typename U>
00904     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00905 
00906     template<typename C, typename T, typename U>
00907     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00908 
00909     void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
00910 protected:
          // Current item; iterators compare equal when these pointers are equal.
00912     void* my_item;
00913 
          // Default constructor: no state, no item.
00915     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00916 
          // Copy constructor: starts empty, then copies i's state through assign().
00918     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00919         assign(i);
00920     }
00921 
00923 
00924     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
00925 
          // offset_of_data: the typed iterator passes the offset of padded_page<Value>::last.
00927     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
00928 
00930     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00931 
00933     void __TBB_EXPORTED_METHOD advance();
00934 
00936     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00937 };
00938 
00939 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00940 
00942 
// Typed forward iterator for queues in tbb::internal. Container is the queue
// type and Value its value_type, possibly const-qualified.
00944 template<typename Container, typename Value>
00945 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00946         public std::iterator<std::forward_iterator_tag,Value> {
00947 
00948 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00949     template<typename T, class A>
00950     friend class ::tbb::concurrent_bounded_queue;
00951 
00952     template<typename T, class A>
00953     friend class ::tbb::deprecated::concurrent_queue;
00954 #else
00955 public: // workaround for MSVC
00956 #endif 
00957 
          // Construct an iterator for queue, passing the base the offset of the
          // item storage inside a page (member `last` of padded_page<Value>).
00958     concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
00959         concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
00960     {
00961     }
00962 
00963 public:
          // Construct an iterator with no state and a NULL item.
00964     concurrent_queue_iterator() {}
00965 
          // Copy from the non-const iterator; this is the copy constructor for
          // iterator and the iterator -> const_iterator conversion for const_iterator.
00968     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00969         concurrent_queue_iterator_base_v3(other)
00970     {}
00971 
          // Iterator assignment.
00973     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00974         assign(other);
00975         return *this;
00976     }
00977 
          // Reference to the current item.
00979     Value& operator*() const {
00980         return *static_cast<Value*>(my_item);
00981     }
00982 
00983     Value* operator->() const {return &operator*();}
00984 
          // Advance to the next item in the queue.
00986     concurrent_queue_iterator& operator++() {
00987         advance();
00988         return *this;
00989     }
00990 
          // Post increment: returns a pointer to the item referred to before advancing.
00992     Value* operator++(int) {
00993         Value* result = &operator*();
00994         operator++();
00995         return result;
00996     }
00997 }; // concurrent_queue_iterator
00998 
00999 
01000 template<typename C, typename T, typename U>
01001 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
01002     return i.my_item==j.my_item;
01003 }
01004 
01005 template<typename C, typename T, typename U>
01006 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
01007     return i.my_item!=j.my_item;
01008 }
01009 
01010 } // namespace internal;
01011 
01013 
01014 } // namespace tbb
01015 
01016 #endif /* __TBB_concurrent_queue_internal_H */

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net