![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 /* 00002 Copyright 2005-2010 Intel Corporation. All Rights Reserved. 00003 00004 This file is part of Threading Building Blocks. 00005 00006 Threading Building Blocks is free software; you can redistribute it 00007 and/or modify it under the terms of the GNU General Public License 00008 version 2 as published by the Free Software Foundation. 00009 00010 Threading Building Blocks is distributed in the hope that it will be 00011 useful, but WITHOUT ANY WARRANTY; without even the implied warranty 00012 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 GNU General Public License for more details. 00014 00015 You should have received a copy of the GNU General Public License 00016 along with Threading Building Blocks; if not, write to the Free Software 00017 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 00018 00019 As a special exception, you may use this file as part of a free software 00020 library without restriction. Specifically, if other files instantiate 00021 templates or use macros or inline functions from this file, or you compile 00022 this file and link it with other files to produce an executable, this 00023 file does not by itself cause the resulting executable to be covered by 00024 the GNU General Public License. This exception does not however 00025 invalidate any other reasons why the executable file might be covered by 00026 the GNU General Public License. 00027 */ 00028 00029 #ifndef __TBB_concurrent_queue_internal_H 00030 #define __TBB_concurrent_queue_internal_H 00031 00032 #include "tbb_stddef.h" 00033 #include "tbb_machine.h" 00034 #include "atomic.h" 00035 #include "spin_mutex.h" 00036 #include "cache_aligned_allocator.h" 00037 #include "tbb_exception.h" 00038 #include <new> 00039 00040 #if !TBB_USE_EXCEPTIONS && _MSC_VER 00041 // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers 00042 #pragma warning (push) 00043 #pragma warning (disable: 4530) 00044 #endif 00045 00046 #include <iterator> 00047 00048 #if !TBB_USE_EXCEPTIONS && _MSC_VER 00049 #pragma warning (pop) 00050 #endif 00051 00052 00053 namespace tbb { 00054 00055 #if !__TBB_TEMPLATE_FRIENDS_BROKEN 00056 00057 // forward declaration 00058 namespace strict_ppl { 00059 template<typename T, typename A> class concurrent_queue; 00060 } 00061 00062 template<typename T, typename A> class concurrent_bounded_queue; 00063 00064 namespace deprecated { 00065 template<typename T, typename A> class concurrent_queue; 00066 } 00067 #endif 00068 00070 namespace strict_ppl { 00071 00073 namespace internal { 00074 00075 using namespace tbb::internal; 00076 00077 typedef size_t ticket; 00078 00079 template<typename T> class micro_queue ; 00080 template<typename T> class micro_queue_pop_finalizer ; 00081 template<typename T> class concurrent_queue_base_v3; 00082 00084 00087 struct concurrent_queue_rep_base : no_copy { 00088 template<typename T> friend class micro_queue; 00089 template<typename T> friend class concurrent_queue_base_v3; 00090 00091 protected: 00093 static const size_t phi = 3; 00094 00095 public: 00096 // must be power of 2 00097 static const size_t n_queue = 8; 00098 00100 struct page { 00101 page* next; 00102 uintptr_t mask; 00103 }; 00104 00105 atomic<ticket> head_counter; 00106 char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)]; 00107 atomic<ticket> tail_counter; 00108 char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)]; 00109 00111 size_t items_per_page; 00112 00114 size_t item_size; 00115 00117 atomic<size_t> n_invalid_entries; 00118 00119 char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)]; 00120 } ; 00121 00122 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) { 00123 return uintptr_t(p)>1; 00124 } 00125 00127 00130 class concurrent_queue_page_allocator 00131 { 00132 template<typename T> friend class micro_queue ; 00133 template<typename T> friend class micro_queue_pop_finalizer ; 00134 protected: 00135 virtual ~concurrent_queue_page_allocator() {} 00136 private: 00137 virtual concurrent_queue_rep_base::page* allocate_page() = 0; 00138 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0; 00139 } ; 00140 00141 #if _MSC_VER && !defined(__INTEL_COMPILER) 00142 // unary minus operator applied to unsigned type, result still unsigned 00143 #pragma warning( push ) 00144 #pragma warning( disable: 4146 ) 00145 #endif 00146 00148 00150 template<typename T> 00151 class micro_queue : no_copy { 00152 typedef concurrent_queue_rep_base::page page; 00153 00155 class destroyer: no_copy { 00156 T& my_value; 00157 public: 00158 destroyer( T& value ) : my_value(value) {} 00159 ~destroyer() {my_value.~T();} 00160 }; 00161 00162 void copy_item( page& dst, size_t index, const void* src ) { 00163 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 00164 } 00165 00166 void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) { 00167 new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) ); 00168 } 00169 00170 void assign_and_destroy_item( void* dst, page& src, size_t index ) { 00171 T& from = get_ref(src,index); 00172 destroyer d(from); 00173 *static_cast<T*>(dst) = from; 00174 } 00175 00176 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ; 00177 00178 public: 00179 friend class micro_queue_pop_finalizer<T>; 00180 00181 struct padded_page: page { 00183 padded_page(); 00185 void operator=( const padded_page& ); 00187 T last; 00188 }; 00189 00190 static T& get_ref( page& p, size_t index ) { 00191 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index]; 00192 } 00193 00194 atomic<page*> head_page; 00195 atomic<ticket> head_counter; 00196 00197 atomic<page*> tail_page; 00198 atomic<ticket> tail_counter; 00199 00200 spin_mutex page_mutex; 00201 00202 void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ; 00203 00204 bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ; 00205 00206 micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ; 00207 00208 page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ; 00209 00210 void invalidate_page_and_rethrow( ticket k ) ; 00211 }; 00212 00213 template<typename T> 00214 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const { 00215 atomic_backoff backoff; 00216 do { 00217 backoff.pause(); 00218 if( counter&1 ) { 00219 ++rb.n_invalid_entries; 00220 throw_exception( eid_bad_last_alloc ); 00221 } 00222 } while( counter!=k ) ; 00223 } 00224 00225 template<typename T> 00226 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) { 00227 k &= -concurrent_queue_rep_base::n_queue; 00228 page* p = NULL; 00229 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); 00230 if( !index ) { 00231 __TBB_TRY { 00232 concurrent_queue_page_allocator& pa = base; 00233 p = pa.allocate_page(); 00234 } __TBB_CATCH (...) { 00235 ++base.my_rep->n_invalid_entries; 00236 invalidate_page_and_rethrow( k ); 00237 } 00238 p->mask = 0; 00239 p->next = NULL; 00240 } 00241 00242 if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep ); 00243 00244 if( p ) { 00245 spin_mutex::scoped_lock lock( page_mutex ); 00246 page* q = tail_page; 00247 if( is_valid_page(q) ) 00248 q->next = p; 00249 else 00250 head_page = p; 00251 tail_page = p; 00252 } else { 00253 p = tail_page; 00254 } 00255 00256 __TBB_TRY { 00257 copy_item( *p, index, item ); 00258 // If no exception was thrown, mark item as present. 00259 p->mask |= uintptr_t(1)<<index; 00260 tail_counter += concurrent_queue_rep_base::n_queue; 00261 } __TBB_CATCH (...) { 00262 ++base.my_rep->n_invalid_entries; 00263 tail_counter += concurrent_queue_rep_base::n_queue; 00264 __TBB_RETHROW(); 00265 } 00266 } 00267 00268 template<typename T> 00269 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) { 00270 k &= -concurrent_queue_rep_base::n_queue; 00271 if( head_counter!=k ) spin_wait_until_eq( head_counter, k ); 00272 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k ); 00273 page& p = *head_page; 00274 __TBB_ASSERT( &p, NULL ); 00275 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); 00276 bool success = false; 00277 { 00278 micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); 00279 if( p.mask & uintptr_t(1)<<index ) { 00280 success = true; 00281 assign_and_destroy_item( dst, p, index ); 00282 } else { 00283 --base.my_rep->n_invalid_entries; 00284 } 00285 } 00286 return success; 00287 } 00288 00289 template<typename T> 00290 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) { 00291 head_counter = src.head_counter; 00292 tail_counter = src.tail_counter; 00293 page_mutex = src.page_mutex; 00294 00295 const page* srcp = src.head_page; 00296 if( is_valid_page(srcp) ) { 00297 ticket g_index = head_counter; 00298 __TBB_TRY { 00299 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue; 00300 size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); 00301 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page; 00302 00303 head_page = make_copy( base, srcp, index, end_in_first_page, g_index ); 00304 page* cur_page = head_page; 00305 00306 if( srcp != src.tail_page ) { 00307 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) { 00308 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index ); 00309 cur_page = cur_page->next; 00310 } 00311 00312 __TBB_ASSERT( srcp==src.tail_page, NULL ); 00313 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); 00314 if( last_index==0 ) last_index = base.my_rep->items_per_page; 00315 00316 cur_page->next = make_copy( base, srcp, 0, last_index, g_index ); 00317 cur_page = cur_page->next; 00318 } 00319 tail_page = cur_page; 00320 } __TBB_CATCH (...) { 00321 invalidate_page_and_rethrow( g_index ); 00322 } 00323 } else { 00324 head_page = tail_page = NULL; 00325 } 00326 return *this; 00327 } 00328 00329 template<typename T> 00330 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) { 00331 // Append an invalid page at address 1 so that no more pushes are allowed. 00332 page* invalid_page = (page*)uintptr_t(1); 00333 { 00334 spin_mutex::scoped_lock lock( page_mutex ); 00335 tail_counter = k+concurrent_queue_rep_base::n_queue+1; 00336 page* q = tail_page; 00337 if( is_valid_page(q) ) 00338 q->next = invalid_page; 00339 else 00340 head_page = invalid_page; 00341 tail_page = invalid_page; 00342 } 00343 __TBB_RETHROW(); 00344 } 00345 00346 template<typename T> 00347 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) { 00348 concurrent_queue_page_allocator& pa = base; 00349 page* new_page = pa.allocate_page(); 00350 new_page->next = NULL; 00351 new_page->mask = src_page->mask; 00352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index ) 00353 if( new_page->mask & uintptr_t(1)<<begin_in_page ) 00354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page ); 00355 return new_page; 00356 } 00357 00358 template<typename T> 00359 class micro_queue_pop_finalizer: no_copy { 00360 typedef concurrent_queue_rep_base::page page; 00361 ticket my_ticket; 00362 micro_queue<T>& my_queue; 00363 page* my_page; 00364 concurrent_queue_page_allocator& allocator; 00365 public: 00366 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) : 00367 my_ticket(k), my_queue(queue), my_page(p), allocator(b) 00368 {} 00369 ~micro_queue_pop_finalizer() ; 00370 }; 00371 00372 template<typename T> 00373 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() { 00374 page* p = my_page; 00375 if( is_valid_page(p) ) { 00376 spin_mutex::scoped_lock lock( my_queue.page_mutex ); 00377 page* q = p->next; 00378 my_queue.head_page = q; 00379 if( !is_valid_page(q) ) { 00380 my_queue.tail_page = NULL; 00381 } 00382 } 00383 my_queue.head_counter = my_ticket; 00384 if( is_valid_page(p) ) { 00385 allocator.deallocate_page( p ); 00386 } 00387 } 00388 00389 #if _MSC_VER && !defined(__INTEL_COMPILER) 00390 #pragma warning( pop ) 00391 #endif // warning 4146 is back 00392 00393 template<typename T> class concurrent_queue_iterator_rep ; 00394 template<typename T> class concurrent_queue_iterator_base_v3; 00395 00397 00400 template<typename T> 00401 struct concurrent_queue_rep : public concurrent_queue_rep_base { 00402 micro_queue<T> array[n_queue]; 00403 00405 static size_t index( ticket k ) { 00406 return k*phi%n_queue; 00407 } 00408 00409 micro_queue<T>& choose( ticket k ) { 00410 // The formula here approximates LRU in a cache-oblivious way. 00411 return array[index(k)]; 00412 } 00413 }; 00414 00416 00420 template<typename T> 00421 class concurrent_queue_base_v3: public concurrent_queue_page_allocator { 00423 concurrent_queue_rep<T>* my_rep; 00424 00425 friend struct concurrent_queue_rep<T>; 00426 friend class micro_queue<T>; 00427 friend class concurrent_queue_iterator_rep<T>; 00428 friend class concurrent_queue_iterator_base_v3<T>; 00429 00430 protected: 00431 typedef typename concurrent_queue_rep<T>::page page; 00432 00433 private: 00434 typedef typename micro_queue<T>::padded_page padded_page; 00435 00436 /* override */ virtual page *allocate_page() { 00437 concurrent_queue_rep<T>& r = *my_rep; 00438 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); 00439 return reinterpret_cast<page*>(allocate_block ( n )); 00440 } 00441 00442 /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) { 00443 concurrent_queue_rep<T>& r = *my_rep; 00444 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); 00445 deallocate_block( reinterpret_cast<void*>(p), n ); 00446 } 00447 00449 virtual void *allocate_block( size_t n ) = 0; 00450 00452 virtual void deallocate_block( void *p, size_t n ) = 0; 00453 00454 protected: 00455 concurrent_queue_base_v3(); 00456 00457 /* override */ virtual ~concurrent_queue_base_v3() { 00458 #if __TBB_USE_ASSERT 00459 size_t nq = my_rep->n_queue; 00460 for( size_t i=0; i<nq; i++ ) 00461 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" ); 00462 #endif /* __TBB_USE_ASSERT */ 00463 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1); 00464 } 00465 00467 void internal_push( const void* src ) { 00468 concurrent_queue_rep<T>& r = *my_rep; 00469 ticket k = r.tail_counter++; 00470 r.choose(k).push( src, k, *this ); 00471 } 00472 00474 00475 bool internal_try_pop( void* dst ) ; 00476 00478 size_t internal_size() const ; 00479 00481 bool internal_empty() const ; 00482 00484 /* note that the name may be misleading, but it remains so due to a historical accident. */ 00485 void internal_finish_clear() ; 00486 00488 void internal_throw_exception() const { 00489 throw_exception( eid_bad_alloc ); 00490 } 00491 00493 void assign( const concurrent_queue_base_v3& src ) ; 00494 }; 00495 00496 template<typename T> 00497 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() { 00498 const size_t item_size = sizeof(T); 00499 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1); 00500 __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" ); 00501 __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" ); 00502 __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" ); 00503 __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" ); 00504 memset(my_rep,0,sizeof(concurrent_queue_rep<T>)); 00505 my_rep->item_size = item_size; 00506 my_rep->items_per_page = item_size<=8 ? 32 : 00507 item_size<=16 ? 16 : 00508 item_size<=32 ? 8 : 00509 item_size<=64 ? 4 : 00510 item_size<=128 ? 2 : 00511 1; 00512 } 00513 00514 template<typename T> 00515 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) { 00516 concurrent_queue_rep<T>& r = *my_rep; 00517 ticket k; 00518 do { 00519 k = r.head_counter; 00520 for(;;) { 00521 if( r.tail_counter<=k ) { 00522 // Queue is empty 00523 return false; 00524 } 00525 // Queue had item with ticket k when we looked. Attempt to get that item. 00526 ticket tk=k; 00527 #if defined(_MSC_VER) && defined(_Wp64) 00528 #pragma warning (push) 00529 #pragma warning (disable: 4267) 00530 #endif 00531 k = r.head_counter.compare_and_swap( tk+1, tk ); 00532 #if defined(_MSC_VER) && defined(_Wp64) 00533 #pragma warning (pop) 00534 #endif 00535 if( k==tk ) 00536 break; 00537 // Another thread snatched the item, retry. 00538 } 00539 } while( !r.choose( k ).pop( dst, k, *this ) ); 00540 return true; 00541 } 00542 00543 template<typename T> 00544 size_t concurrent_queue_base_v3<T>::internal_size() const { 00545 concurrent_queue_rep<T>& r = *my_rep; 00546 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL ); 00547 ticket hc = r.head_counter; 00548 size_t nie = r.n_invalid_entries; 00549 ticket tc = r.tail_counter; 00550 __TBB_ASSERT( hc!=tc || !nie, NULL ); 00551 ptrdiff_t sz = tc-hc-nie; 00552 return sz<0 ? 0 : size_t(sz); 00553 } 00554 00555 template<typename T> 00556 bool concurrent_queue_base_v3<T>::internal_empty() const { 00557 concurrent_queue_rep<T>& r = *my_rep; 00558 ticket tc = r.tail_counter; 00559 ticket hc = r.head_counter; 00560 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads. 00561 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ; 00562 } 00563 00564 template<typename T> 00565 void concurrent_queue_base_v3<T>::internal_finish_clear() { 00566 concurrent_queue_rep<T>& r = *my_rep; 00567 size_t nq = r.n_queue; 00568 for( size_t i=0; i<nq; ++i ) { 00569 page* tp = r.array[i].tail_page; 00570 if( is_valid_page(tp) ) { 00571 __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" ); 00572 deallocate_page( tp ); 00573 r.array[i].tail_page = NULL; 00574 } else 00575 __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" ); 00576 } 00577 } 00578 00579 template<typename T> 00580 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) { 00581 concurrent_queue_rep<T>& r = *my_rep; 00582 r.items_per_page = src.my_rep->items_per_page; 00583 00584 // copy concurrent_queue_rep. 00585 r.head_counter = src.my_rep->head_counter; 00586 r.tail_counter = src.my_rep->tail_counter; 00587 r.n_invalid_entries = src.my_rep->n_invalid_entries; 00588 00589 // copy micro_queues 00590 for( size_t i = 0; i<r.n_queue; ++i ) 00591 r.array[i].assign( src.my_rep->array[i], *this); 00592 00593 __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, 00594 "the source concurrent queue should not be concurrently modified." ); 00595 } 00596 00597 template<typename Container, typename Value> class concurrent_queue_iterator; 00598 00599 template<typename T> 00600 class concurrent_queue_iterator_rep: no_assign { 00601 typedef typename micro_queue<T>::padded_page padded_page; 00602 public: 00603 ticket head_counter; 00604 const concurrent_queue_base_v3<T>& my_queue; 00605 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue]; 00606 concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) : 00607 head_counter(queue.my_rep->head_counter), 00608 my_queue(queue) 00609 { 00610 for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k ) 00611 array[k] = queue.my_rep->array[k].head_page; 00612 } 00613 00615 bool get_item( T*& item, size_t k ) ; 00616 }; 00617 00618 template<typename T> 00619 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) { 00620 if( k==my_queue.my_rep->tail_counter ) { 00621 item = NULL; 00622 return true; 00623 } else { 00624 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)]; 00625 __TBB_ASSERT(p,NULL); 00626 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1); 00627 item = µ_queue<T>::get_ref(*p,i); 00628 return (p->mask & uintptr_t(1)<<i)!=0; 00629 } 00630 } 00631 00633 00634 template<typename Value> 00635 class concurrent_queue_iterator_base_v3 : no_assign { 00637 00638 concurrent_queue_iterator_rep<Value>* my_rep; 00639 00640 template<typename C, typename T, typename U> 00641 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); 00642 00643 template<typename C, typename T, typename U> 00644 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); 00645 protected: 00647 Value* my_item; 00648 00650 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) { 00651 #if __GNUC__==4&&__GNUC_MINOR__==3 00652 // to get around a possible gcc 4.3 bug 00653 __asm__ __volatile__("": : :"memory"); 00654 #endif 00655 } 00656 00658 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) 00659 : no_assign(), my_rep(NULL), my_item(NULL) { 00660 assign(i); 00661 } 00662 00664 concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ; 00665 00667 void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ; 00668 00670 void advance() ; 00671 00673 ~concurrent_queue_iterator_base_v3() { 00674 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1); 00675 my_rep = NULL; 00676 } 00677 }; 00678 00679 template<typename Value> 00680 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) { 00681 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1); 00682 new( my_rep ) concurrent_queue_iterator_rep<Value>(queue); 00683 size_t k = my_rep->head_counter; 00684 if( !my_rep->get_item(my_item, k) ) advance(); 00685 } 00686 00687 template<typename Value> 00688 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) { 00689 if( my_rep!=other.my_rep ) { 00690 if( my_rep ) { 00691 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1); 00692 my_rep = NULL; 00693 } 00694 if( other.my_rep ) { 00695 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1); 00696 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep ); 00697 } 00698 } 00699 my_item = other.my_item; 00700 } 00701 00702 template<typename Value> 00703 void concurrent_queue_iterator_base_v3<Value>::advance() { 00704 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" ); 00705 size_t k = my_rep->head_counter; 00706 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue; 00707 #if TBB_USE_ASSERT 00708 Value* tmp; 00709 my_rep->get_item(tmp,k); 00710 __TBB_ASSERT( my_item==tmp, NULL ); 00711 #endif /* TBB_USE_ASSERT */ 00712 size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1); 00713 if( i==queue.my_rep->items_per_page-1 ) { 00714 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)]; 00715 root = root->next; 00716 } 00717 // advance k 00718 my_rep->head_counter = ++k; 00719 if( !my_rep->get_item(my_item, k) ) advance(); 00720 } 00721 00723 00724 template<typename T> struct tbb_remove_cv {typedef T type;}; 00725 template<typename T> struct tbb_remove_cv<const T> {typedef T type;}; 00726 template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;}; 00727 template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;}; 00728 00730 00732 template<typename Container, typename Value> 00733 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>, 00734 public std::iterator<std::forward_iterator_tag,Value> { 00735 #if !__TBB_TEMPLATE_FRIENDS_BROKEN 00736 template<typename T, class A> 00737 friend class ::tbb::strict_ppl::concurrent_queue; 00738 #else 00739 public: // workaround for MSVC 00740 #endif 00741 00742 concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) : 00743 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue) 00744 { 00745 } 00746 00747 public: 00748 concurrent_queue_iterator() {} 00749 00750 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) : 00751 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other) 00752 {} 00753 00755 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { 00756 assign(other); 00757 return *this; 00758 } 00759 00761 Value& operator*() const { 00762 return *static_cast<Value*>(this->my_item); 00763 } 00764 00765 Value* operator->() const {return &operator*();} 00766 00768 concurrent_queue_iterator& operator++() { 00769 this->advance(); 00770 return *this; 00771 } 00772 00774 Value* operator++(int) { 00775 Value* result = &operator*(); 00776 operator++(); 00777 return result; 00778 } 00779 }; // concurrent_queue_iterator 00780 00781 00782 template<typename C, typename T, typename U> 00783 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { 00784 return i.my_item==j.my_item; 00785 } 00786 00787 template<typename C, typename T, typename U> 00788 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { 00789 return i.my_item!=j.my_item; 00790 } 00791 00792 } // namespace internal 00793 00795 00796 } // namespace strict_ppl 00797 00799 namespace internal { 00800 00801 class concurrent_queue_rep; 00802 class concurrent_queue_iterator_rep; 00803 class concurrent_queue_iterator_base_v3; 00804 template<typename Container, typename Value> class concurrent_queue_iterator; 00805 00807 00809 class concurrent_queue_base_v3: no_copy { 00811 concurrent_queue_rep* my_rep; 00812 00813 friend class concurrent_queue_rep; 00814 friend struct micro_queue; 00815 friend class micro_queue_pop_finalizer; 00816 friend class concurrent_queue_iterator_rep; 00817 friend class concurrent_queue_iterator_base_v3; 00818 protected: 00820 struct page { 00821 page* next; 00822 uintptr_t mask; 00823 }; 00824 00826 ptrdiff_t my_capacity; 00827 00829 size_t items_per_page; 00830 00832 size_t item_size; 00833 00834 #if __TBB_GCC_3_3_PROTECTED_BROKEN 00835 public: 00836 #endif 00837 template<typename T> 00838 struct padded_page: page { 00840 padded_page(); 00842 void operator=( const padded_page& ); 00844 T last; 00845 }; 00846 00847 private: 00848 virtual void copy_item( page& dst, size_t index, const void* src ) = 0; 00849 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0; 00850 protected: 00851 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size ); 00852 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3(); 00853 00855 void __TBB_EXPORTED_METHOD internal_push( const void* src ); 00856 00858 void __TBB_EXPORTED_METHOD internal_pop( void* dst ); 00859 00861 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src ); 00862 00864 00865 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst ); 00866 00868 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const; 00869 00871 bool __TBB_EXPORTED_METHOD internal_empty() const; 00872 00874 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size ); 00875 00877 virtual page *allocate_page() = 0; 00878 00880 virtual void deallocate_page( page *p ) = 0; 00881 00883 /* note that the name may be misleading, but it remains so due to a historical accident. */ 00884 void __TBB_EXPORTED_METHOD internal_finish_clear() ; 00885 00887 void __TBB_EXPORTED_METHOD internal_throw_exception() const; 00888 00890 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ; 00891 00892 private: 00893 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0; 00894 }; 00895 00897 00898 class concurrent_queue_iterator_base_v3 { 00900 00901 concurrent_queue_iterator_rep* my_rep; 00902 00903 template<typename C, typename T, typename U> 00904 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); 00905 00906 template<typename C, typename T, typename U> 00907 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); 00908 00909 void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data ); 00910 protected: 00912 void* my_item; 00913 00915 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {} 00916 00918 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) { 00919 assign(i); 00920 } 00921 00923 00924 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue ); 00925 00927 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data ); 00928 00930 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i ); 00931 00933 void __TBB_EXPORTED_METHOD advance(); 00934 00936 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3(); 00937 }; 00938 00939 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base; 00940 00942 00944 template<typename Container, typename Value> 00945 class concurrent_queue_iterator: public concurrent_queue_iterator_base, 00946 public std::iterator<std::forward_iterator_tag,Value> { 00947 00948 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER) 00949 template<typename T, class A> 00950 friend class ::tbb::concurrent_bounded_queue; 00951 00952 template<typename T, class A> 00953 friend class ::tbb::deprecated::concurrent_queue; 00954 #else 00955 public: // workaround for MSVC 00956 #endif 00957 00958 concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) : 00959 concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last)) 00960 { 00961 } 00962 00963 public: 00964 concurrent_queue_iterator() {} 00965 00968 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) : 00969 concurrent_queue_iterator_base_v3(other) 00970 {} 00971 00973 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { 00974 assign(other); 00975 return *this; 00976 } 00977 00979 Value& operator*() const { 00980 return *static_cast<Value*>(my_item); 00981 } 00982 00983 Value* operator->() const {return &operator*();} 00984 00986 concurrent_queue_iterator& operator++() { 00987 advance(); 00988 return *this; 00989 } 00990 00992 Value* operator++(int) { 00993 Value* result = &operator*(); 00994 operator++(); 00995 return result; 00996 } 00997 }; // concurrent_queue_iterator 00998 00999 01000 template<typename C, typename T, typename U> 01001 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { 01002 return i.my_item==j.my_item; 01003 } 01004 01005 template<typename C, typename T, typename U> 01006 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { 01007 return i.my_item!=j.my_item; 01008 } 01009 01010 } // namespace internal; 01011 01013 01014 } // namespace tbb 01015 01016 #endif /* __TBB_concurrent_queue_internal_H */
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.