![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 /* 00002 Copyright 2005-2010 Intel Corporation. All Rights Reserved. 00003 00004 This file is part of Threading Building Blocks. 00005 00006 Threading Building Blocks is free software; you can redistribute it 00007 and/or modify it under the terms of the GNU General Public License 00008 version 2 as published by the Free Software Foundation. 00009 00010 Threading Building Blocks is distributed in the hope that it will be 00011 useful, but WITHOUT ANY WARRANTY; without even the implied warranty 00012 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 GNU General Public License for more details. 00014 00015 You should have received a copy of the GNU General Public License 00016 along with Threading Building Blocks; if not, write to the Free Software 00017 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 00018 00019 As a special exception, you may use this file as part of a free software 00020 library without restriction. Specifically, if other files instantiate 00021 templates or use macros or inline functions from this file, or you compile 00022 this file and link it with other files to produce an executable, this 00023 file does not by itself cause the resulting executable to be covered by 00024 the GNU General Public License. This exception does not however 00025 invalidate any other reasons why the executable file might be covered by 00026 the GNU General Public License. 00027 */ 00028 00029 #ifndef __TBB_pipeline_H 00030 #define __TBB_pipeline_H 00031 00032 #include "atomic.h" 00033 #include "task.h" 00034 #include <cstddef> 00035 00036 namespace tbb { 00037 00038 class pipeline; 00039 class filter; 00040 00042 namespace internal { 00043 00044 // The argument for PIPELINE_VERSION should be an integer between 2 and 9 00045 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1 00046 00047 typedef unsigned long Token; 00048 typedef long tokendiff_t; 00049 class stage_task; 00050 class input_buffer; 00051 class pipeline_root_task; 00052 class pipeline_cleaner; 00053 00054 } // namespace internal 00055 00056 namespace interface5 { 00057 template<typename T, typename U> class filter_t; 00058 00059 namespace internal { 00060 class pipeline_proxy; 00061 } 00062 } 00063 00065 00067 00068 class filter: internal::no_copy { 00069 private: 00071 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));} 00072 00074 static const unsigned char filter_is_serial = 0x1; 00075 00077 00079 static const unsigned char filter_is_out_of_order = 0x1<<4; 00080 00082 static const unsigned char filter_is_bound = 0x1<<5; 00083 00085 static const unsigned char exact_exception_propagation = 00086 #if TBB_USE_CAPTURED_EXCEPTION 00087 0x0; 00088 #else 00089 0x1<<7; 00090 #endif /* TBB_USE_CAPTURED_EXCEPTION */ 00091 00092 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5); 00093 static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version 00094 public: 00095 enum mode { 00097 parallel = current_version | filter_is_out_of_order, 00099 serial_in_order = current_version | filter_is_serial, 00101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order, 00103 serial = serial_in_order 00104 }; 00105 protected: 00106 filter( bool is_serial_ ) : 00107 next_filter_in_pipeline(not_in_pipeline()), 00108 my_input_buffer(NULL), 00109 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)), 00110 prev_filter_in_pipeline(not_in_pipeline()), 00111 my_pipeline(NULL), 00112 next_segment(NULL) 00113 {} 00114 00115 filter( mode filter_mode ) : 00116 next_filter_in_pipeline(not_in_pipeline()), 00117 my_input_buffer(NULL), 00118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)), 00119 prev_filter_in_pipeline(not_in_pipeline()), 00120 my_pipeline(NULL), 00121 next_segment(NULL) 00122 {} 00123 00124 public: 00126 bool is_serial() const { 00127 return bool( my_filter_mode & filter_is_serial ); 00128 } 00129 00131 bool is_ordered() const { 00132 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial; 00133 } 00134 00136 bool is_bound() const { 00137 return ( my_filter_mode & filter_is_bound )==filter_is_bound; 00138 } 00139 00141 00142 virtual void* operator()( void* item ) = 0; 00143 00145 00146 virtual __TBB_EXPORTED_METHOD ~filter(); 00147 00148 #if __TBB_TASK_GROUP_CONTEXT 00149 00150 00152 virtual void finalize( void* /*item*/ ) {}; 00153 #endif 00154 00155 private: 00157 filter* next_filter_in_pipeline; 00158 00160 00161 internal::input_buffer* my_input_buffer; 00162 00163 friend class internal::stage_task; 00164 friend class internal::pipeline_root_task; 00165 friend class pipeline; 00166 friend class thread_bound_filter; 00167 00169 const unsigned char my_filter_mode; 00170 00172 filter* prev_filter_in_pipeline; 00173 00175 pipeline* my_pipeline; 00176 00178 00179 filter* next_segment; 00180 }; 00181 00183 00184 class thread_bound_filter: public filter { 00185 public: 00186 enum result_type { 00187 // item was processed 00188 success, 00189 // item is currently not available 00190 item_not_available, 00191 // there are no more items to process 00192 end_of_stream 00193 }; 00194 protected: 00195 thread_bound_filter(mode filter_mode): 00196 filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation)) 00197 {} 00198 public: 00200 00205 result_type __TBB_EXPORTED_METHOD try_process_item(); 00206 00208 00212 result_type __TBB_EXPORTED_METHOD process_item(); 00213 00214 private: 00216 result_type internal_process_item(bool is_blocking); 00217 }; 00218 00220 00221 class pipeline { 00222 public: 00224 __TBB_EXPORTED_METHOD pipeline(); 00225 00228 virtual __TBB_EXPORTED_METHOD ~pipeline(); 00229 00231 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ ); 00232 00234 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens ); 00235 00236 #if __TBB_TASK_GROUP_CONTEXT 00237 00238 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context ); 00239 #endif 00240 00242 void __TBB_EXPORTED_METHOD clear(); 00243 00244 private: 00245 friend class internal::stage_task; 00246 friend class internal::pipeline_root_task; 00247 friend class filter; 00248 friend class thread_bound_filter; 00249 friend class internal::pipeline_cleaner; 00250 friend class tbb::interface5::internal::pipeline_proxy; 00251 00253 filter* filter_list; 00254 00256 filter* filter_end; 00257 00259 task* end_counter; 00260 00262 atomic<internal::Token> input_tokens; 00263 00265 atomic<internal::Token> token_counter; 00266 00268 bool end_of_input; 00269 00271 bool has_thread_bound_filters; 00272 00274 void remove_filter( filter& filter_ ); 00275 00277 void __TBB_EXPORTED_METHOD inject_token( task& self ); 00278 00279 #if __TBB_TASK_GROUP_CONTEXT 00280 00281 void clear_filters(); 00282 #endif 00283 }; 00284 00285 //------------------------------------------------------------------------ 00286 // Support for lambda-friendly parallel_pipeline interface 00287 //------------------------------------------------------------------------ 00288 00289 namespace interface5 { 00290 00291 namespace internal { 00292 template<typename T, typename U, typename Body> class concrete_filter; 00293 } 00294 00295 class flow_control { 00296 bool is_pipeline_stopped; 00297 flow_control() { is_pipeline_stopped = false; } 00298 template<typename T, typename U, typename Body> friend class internal::concrete_filter; 00299 public: 00300 void stop() { is_pipeline_stopped = true; } 00301 }; 00302 00304 namespace internal { 00305 00306 template<typename T, typename U, typename Body> 00307 class concrete_filter: public tbb::filter { 00308 Body my_body; 00309 00310 /*override*/ void* operator()(void* input) { 00311 T* temp_input = (T*)input; 00312 // Call user's operator()() here 00313 void* output = (void*) new U(my_body(*temp_input)); 00314 delete temp_input; 00315 return output; 00316 } 00317 00318 public: 00319 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} 00320 }; 00321 00322 template<typename U, typename Body> 00323 class concrete_filter<void,U,Body>: public filter { 00324 Body my_body; 00325 00326 /*override*/void* operator()(void*) { 00327 flow_control control; 00328 U temp_output = my_body(control); 00329 void* output = control.is_pipeline_stopped ? NULL : (void*) new U(temp_output); 00330 return output; 00331 } 00332 public: 00333 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} 00334 }; 00335 00336 template<typename T, typename Body> 00337 class concrete_filter<T,void,Body>: public filter { 00338 Body my_body; 00339 00340 /*override*/ void* operator()(void* input) { 00341 T* temp_input = (T*)input; 00342 my_body(*temp_input); 00343 delete temp_input; 00344 return NULL; 00345 } 00346 public: 00347 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} 00348 }; 00349 00350 template<typename Body> 00351 class concrete_filter<void,void,Body>: public filter { 00352 Body my_body; 00353 00355 /*override*/ void* operator()(void*) { 00356 flow_control control; 00357 my_body(control); 00358 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 00359 return output; 00360 } 00361 public: 00362 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} 00363 }; 00364 00366 00367 class pipeline_proxy { 00368 tbb::pipeline my_pipe; 00369 public: 00370 pipeline_proxy( const filter_t<void,void>& filter_chain ); 00371 ~pipeline_proxy() { 00372 while( filter* f = my_pipe.filter_list ) 00373 delete f; // filter destructor removes it from the pipeline 00374 } 00375 tbb::pipeline* operator->() { return &my_pipe; } 00376 }; 00377 00379 00380 class filter_node: tbb::internal::no_copy { 00382 tbb::atomic<intptr_t> ref_count; 00383 protected: 00384 filter_node() { 00385 ref_count = 0; 00386 #ifdef __TBB_TEST_FILTER_NODE_COUNT 00387 ++(__TBB_TEST_FILTER_NODE_COUNT); 00388 #endif 00389 } 00390 public: 00392 virtual void add_to( pipeline& ) = 0; 00394 void add_ref() {++ref_count;} 00396 void remove_ref() { 00397 __TBB_ASSERT(ref_count>0,"ref_count underflow"); 00398 if( --ref_count==0 ) 00399 delete this; 00400 } 00401 virtual ~filter_node() { 00402 #ifdef __TBB_TEST_FILTER_NODE_COUNT 00403 --(__TBB_TEST_FILTER_NODE_COUNT); 00404 #endif 00405 } 00406 }; 00407 00409 template<typename T, typename U, typename Body> 00410 class filter_node_leaf: public filter_node { 00411 const tbb::filter::mode mode; 00412 const Body& body; 00413 /*override*/void add_to( pipeline& p ) { 00414 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body); 00415 p.add_filter( *f ); 00416 } 00417 public: 00418 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {} 00419 }; 00420 00422 class filter_node_join: public filter_node { 00423 friend class filter_node; // to suppress GCC 3.2 warnings 00424 filter_node& left; 00425 filter_node& right; 00426 /*override*/~filter_node_join() { 00427 left.remove_ref(); 00428 right.remove_ref(); 00429 } 00430 /*override*/void add_to( pipeline& p ) { 00431 left.add_to(p); 00432 right.add_to(p); 00433 } 00434 public: 00435 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) { 00436 left.add_ref(); 00437 right.add_ref(); 00438 } 00439 }; 00440 00441 } // namespace internal 00443 00444 template<typename T, typename U, typename Body> 00445 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) { 00446 return new internal::filter_node_leaf<T,U,Body>(mode, body); 00447 } 00448 00449 template<typename T, typename V, typename U> 00450 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) { 00451 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'"); 00452 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'"); 00453 return new internal::filter_node_join(*left.root,*right.root); 00454 } 00455 00457 template<typename T, typename U> 00458 class filter_t { 00459 typedef internal::filter_node filter_node; 00460 filter_node* root; 00461 filter_t( filter_node* root_ ) : root(root_) { 00462 root->add_ref(); 00463 } 00464 friend class internal::pipeline_proxy; 00465 template<typename T_, typename U_, typename Body> 00466 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& ); 00467 template<typename T_, typename V_, typename U_> 00468 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& ); 00469 public: 00470 filter_t() : root(NULL) {} 00471 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) { 00472 if( root ) root->add_ref(); 00473 } 00474 template<typename Body> 00475 filter_t( tbb::filter::mode mode, const Body& body ) : 00476 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) { 00477 root->add_ref(); 00478 } 00479 00480 void operator=( const filter_t<T,U>& rhs ) { 00481 // Order of operations below carefully chosen so that reference counts remain correct 00482 // in unlikely event that remove_ref throws exception. 00483 filter_node* old = root; 00484 root = rhs.root; 00485 if( root ) root->add_ref(); 00486 if( old ) old->remove_ref(); 00487 } 00488 ~filter_t() { 00489 if( root ) root->remove_ref(); 00490 } 00491 void clear() { 00492 // Like operator= with filter_t() on right side. 00493 if( root ) { 00494 filter_node* old = root; 00495 root = NULL; 00496 old->remove_ref(); 00497 } 00498 } 00499 }; 00500 00501 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() { 00502 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" ); 00503 filter_chain.root->add_to(my_pipe); 00504 } 00505 00506 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain 00507 #if __TBB_TASK_GROUP_CONTEXT 00508 , tbb::task_group_context& context 00509 #endif 00510 ) { 00511 internal::pipeline_proxy pipe(filter_chain); 00512 // tbb::pipeline::run() is called via the proxy 00513 pipe->run(max_number_of_live_tokens 00514 #if __TBB_TASK_GROUP_CONTEXT 00515 , context 00516 #endif 00517 ); 00518 } 00519 00520 #if __TBB_TASK_GROUP_CONTEXT 00521 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) { 00522 tbb::task_group_context context; 00523 parallel_pipeline(max_number_of_live_tokens, filter_chain, context); 00524 } 00525 #endif // __TBB_TASK_GROUP_CONTEXT 00526 00527 } // interface5 00528 00529 using interface5::flow_control; 00530 using interface5::filter_t; 00531 using interface5::make_filter; 00532 using interface5::parallel_pipeline; 00533 00534 } // tbb 00535 00536 #endif /* __TBB_pipeline_H */
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.