Shadowrun: Awakened 29 September 2011 - Build 871
pipeline.h
Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     This file is part of Threading Building Blocks.
00005 
00006     Threading Building Blocks is free software; you can redistribute it
00007     and/or modify it under the terms of the GNU General Public License
00008     version 2 as published by the Free Software Foundation.
00009 
00010     Threading Building Blocks is distributed in the hope that it will be
00011     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
00012     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013     GNU General Public License for more details.
00014 
00015     You should have received a copy of the GNU General Public License
00016     along with Threading Building Blocks; if not, write to the Free Software
00017     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018 
00019     As a special exception, you may use this file as part of a free software
00020     library without restriction.  Specifically, if other files instantiate
00021     templates or use macros or inline functions from this file, or you compile
00022     this file and link it with other files to produce an executable, this
00023     file does not by itself cause the resulting executable to be covered by
00024     the GNU General Public License.  This exception does not however
00025     invalidate any other reasons why the executable file might be covered by
00026     the GNU General Public License.
00027 */
00028 
00029 #ifndef __TBB_pipeline_H 
00030 #define __TBB_pipeline_H 
00031 
00032 #include "atomic.h"
00033 #include "task.h"
00034 #include <cstddef>
00035 
00036 namespace tbb {
00037 
00038 class pipeline;
00039 class filter;
00040 
00042 namespace internal {
00043 
00044 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
00045 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00046 
00047 typedef unsigned long Token;
00048 typedef long tokendiff_t;
00049 class stage_task;
00050 class input_buffer;
00051 class pipeline_root_task;
00052 class pipeline_cleaner;
00053 
00054 } // namespace internal
00055 
00056 namespace interface5 {
00057     template<typename T, typename U> class filter_t;
00058 
00059     namespace internal {
00060         class pipeline_proxy;
00061     }
00062 }
00063 
00065 
00067 
00068 class filter: internal::no_copy {
00069 private:
00071     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00072     
00074     static const unsigned char filter_is_serial = 0x1; 
00075 
00077 
00079     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00080 
00082     static const unsigned char filter_is_bound = 0x1<<5;  
00083 
00085     static const unsigned char exact_exception_propagation =
00086 #if TBB_USE_CAPTURED_EXCEPTION
00087             0x0;
00088 #else
00089             0x1<<7;
00090 #endif /* TBB_USE_CAPTURED_EXCEPTION */
00091 
00092     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00093     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00094 public:
00095     enum mode {
00097         parallel = current_version | filter_is_out_of_order, 
00099         serial_in_order = current_version | filter_is_serial,
00101         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00103         serial = serial_in_order
00104     };
00105 protected:
00106     filter( bool is_serial_ ) : 
00107         next_filter_in_pipeline(not_in_pipeline()),
00108         my_input_buffer(NULL),
00109         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00110         prev_filter_in_pipeline(not_in_pipeline()),
00111         my_pipeline(NULL),
00112         next_segment(NULL)
00113     {}
00114     
00115     filter( mode filter_mode ) :
00116         next_filter_in_pipeline(not_in_pipeline()),
00117         my_input_buffer(NULL),
00118         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00119         prev_filter_in_pipeline(not_in_pipeline()),
00120         my_pipeline(NULL),
00121         next_segment(NULL)
00122     {}
00123 
00124 public:
00126     bool is_serial() const {
00127         return bool( my_filter_mode & filter_is_serial );
00128     }  
00129     
00131     bool is_ordered() const {
00132         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00133     }
00134 
00136     bool is_bound() const {
00137         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00138     }
00139 
00141 
00142     virtual void* operator()( void* item ) = 0;
00143 
00145 
00146     virtual __TBB_EXPORTED_METHOD ~filter();
00147 
00148 #if __TBB_TASK_GROUP_CONTEXT
00149 
00150 
00152     virtual void finalize( void* /*item*/ ) {};
00153 #endif
00154 
00155 private:
00157     filter* next_filter_in_pipeline;
00158 
00160 
00161     internal::input_buffer* my_input_buffer;
00162 
00163     friend class internal::stage_task;
00164     friend class internal::pipeline_root_task;
00165     friend class pipeline;
00166     friend class thread_bound_filter;
00167 
00169     const unsigned char my_filter_mode;
00170 
00172     filter* prev_filter_in_pipeline;
00173 
00175     pipeline* my_pipeline;
00176 
00178 
00179     filter* next_segment;
00180 };
00181 
00183 
00184 class thread_bound_filter: public filter {
00185 public:
00186     enum result_type {
00187         // item was processed
00188         success,
00189         // item is currently not available
00190         item_not_available,
00191         // there are no more items to process
00192         end_of_stream
00193     };
00194 protected:
00195     thread_bound_filter(mode filter_mode): 
00196          filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
00197     {}
00198 public:
00200 
00205     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00206 
00208 
00212     result_type __TBB_EXPORTED_METHOD process_item();
00213 
00214 private:
00216     result_type internal_process_item(bool is_blocking);
00217 };
00218 
00220 
00221 class pipeline {
00222 public:
00224     __TBB_EXPORTED_METHOD pipeline();
00225 
00228     virtual __TBB_EXPORTED_METHOD ~pipeline();
00229 
00231     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00232 
00234     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00235 
00236 #if __TBB_TASK_GROUP_CONTEXT
00237 
00238     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00239 #endif
00240 
00242     void __TBB_EXPORTED_METHOD clear();
00243 
00244 private:
00245     friend class internal::stage_task;
00246     friend class internal::pipeline_root_task;
00247     friend class filter;
00248     friend class thread_bound_filter;
00249     friend class internal::pipeline_cleaner;
00250     friend class tbb::interface5::internal::pipeline_proxy;
00251 
00253     filter* filter_list;
00254 
00256     filter* filter_end;
00257 
00259     task* end_counter;
00260 
00262     atomic<internal::Token> input_tokens;
00263 
00265     atomic<internal::Token> token_counter;
00266 
00268     bool end_of_input;
00269 
00271     bool has_thread_bound_filters;
00272 
00274     void remove_filter( filter& filter_ );
00275 
00277     void __TBB_EXPORTED_METHOD inject_token( task& self );
00278 
00279 #if __TBB_TASK_GROUP_CONTEXT
00280 
00281     void clear_filters();
00282 #endif
00283 };
00284 
00285 //------------------------------------------------------------------------
00286 // Support for lambda-friendly parallel_pipeline interface
00287 //------------------------------------------------------------------------
00288 
00289 namespace interface5 {
00290 
00291 namespace internal {
00292     template<typename T, typename U, typename Body> class concrete_filter;
00293 }
00294 
00295 class flow_control {
00296     bool is_pipeline_stopped;
00297     flow_control() { is_pipeline_stopped = false; }
00298     template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00299 public:
00300     void stop() { is_pipeline_stopped = true; }
00301 };
00302 
00304 namespace internal {
00305 
00306 template<typename T, typename U, typename Body>
00307 class concrete_filter: public tbb::filter {
00308     Body my_body;
00309 
00310     /*override*/ void* operator()(void* input) {
00311         T* temp_input = (T*)input;
00312         // Call user's operator()() here
00313         void* output = (void*) new U(my_body(*temp_input)); 
00314         delete temp_input;
00315         return output;
00316     }
00317 
00318 public:
00319     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00320 };
00321 
00322 template<typename U, typename Body>
00323 class concrete_filter<void,U,Body>: public filter {
00324     Body my_body;
00325 
00326     /*override*/void* operator()(void*) {
00327         flow_control control;
00328         U temp_output = my_body(control);
00329         void* output = control.is_pipeline_stopped ? NULL : (void*) new U(temp_output); 
00330         return output;
00331     }
00332 public:
00333     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00334 };
00335 
00336 template<typename T, typename Body>
00337 class concrete_filter<T,void,Body>: public filter {
00338     Body my_body;
00339    
00340     /*override*/ void* operator()(void* input) {
00341         T* temp_input = (T*)input;
00342         my_body(*temp_input);
00343         delete temp_input;
00344         return NULL;
00345     }
00346 public:
00347     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00348 };
00349 
00350 template<typename Body>
00351 class concrete_filter<void,void,Body>: public filter {
00352     Body my_body;
00353     
00355     /*override*/ void* operator()(void*) {
00356         flow_control control;
00357         my_body(control);
00358         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
00359         return output;
00360     }
00361 public:
00362     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00363 };
00364 
00366 
00367 class pipeline_proxy {
00368     tbb::pipeline my_pipe;
00369 public:
00370     pipeline_proxy( const filter_t<void,void>& filter_chain );
00371     ~pipeline_proxy() {
00372         while( filter* f = my_pipe.filter_list ) 
00373             delete f; // filter destructor removes it from the pipeline
00374     }
00375     tbb::pipeline* operator->() { return &my_pipe; }
00376 };
00377 
00379 
00380 class filter_node: tbb::internal::no_copy {
00382     tbb::atomic<intptr_t> ref_count;
00383 protected:
00384     filter_node() {
00385         ref_count = 0;
00386 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00387         ++(__TBB_TEST_FILTER_NODE_COUNT);
00388 #endif
00389     }
00390 public:
00392     virtual void add_to( pipeline& ) = 0;
00394     void add_ref() {++ref_count;}
00396     void remove_ref() {
00397         __TBB_ASSERT(ref_count>0,"ref_count underflow");
00398         if( --ref_count==0 ) 
00399             delete this;
00400     }
00401     virtual ~filter_node() {
00402 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00403         --(__TBB_TEST_FILTER_NODE_COUNT);
00404 #endif
00405     }
00406 };
00407 
00409 template<typename T, typename U, typename Body>
00410 class filter_node_leaf: public filter_node  {
00411     const tbb::filter::mode mode;
00412     const Body& body;
00413     /*override*/void add_to( pipeline& p ) {
00414         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00415         p.add_filter( *f );
00416     }
00417 public:
00418     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00419 };
00420 
00422 class filter_node_join: public filter_node {
00423     friend class filter_node; // to suppress GCC 3.2 warnings
00424     filter_node& left;
00425     filter_node& right;
00426     /*override*/~filter_node_join() {
00427        left.remove_ref();
00428        right.remove_ref();
00429     }
00430     /*override*/void add_to( pipeline& p ) {
00431         left.add_to(p);
00432         right.add_to(p);
00433     }
00434 public:
00435     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00436        left.add_ref();
00437        right.add_ref();
00438     }
00439 };
00440 
00441 } // namespace internal
00443 
00444 template<typename T, typename U, typename Body>
00445 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00446     return new internal::filter_node_leaf<T,U,Body>(mode, body);
00447 }
00448 
00449 template<typename T, typename V, typename U>
00450 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00451     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00452     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00453     return new internal::filter_node_join(*left.root,*right.root);
00454 }
00455 
00457 template<typename T, typename U>
00458 class filter_t {
00459     typedef internal::filter_node filter_node;
00460     filter_node* root;
00461     filter_t( filter_node* root_ ) : root(root_) {
00462         root->add_ref();
00463     }
00464     friend class internal::pipeline_proxy;
00465     template<typename T_, typename U_, typename Body>
00466     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00467     template<typename T_, typename V_, typename U_>
00468     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00469 public:
00470     filter_t() : root(NULL) {}
00471     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00472         if( root ) root->add_ref();
00473     }
00474     template<typename Body>
00475     filter_t( tbb::filter::mode mode, const Body& body ) :
00476         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00477         root->add_ref();
00478     }
00479 
00480     void operator=( const filter_t<T,U>& rhs ) {
00481         // Order of operations below carefully chosen so that reference counts remain correct
00482         // in unlikely event that remove_ref throws exception.
00483         filter_node* old = root;
00484         root = rhs.root; 
00485         if( root ) root->add_ref();
00486         if( old ) old->remove_ref();
00487     }
00488     ~filter_t() {
00489         if( root ) root->remove_ref();
00490     }
00491     void clear() {
00492         // Like operator= with filter_t() on right side.
00493         if( root ) {
00494             filter_node* old = root;
00495             root = NULL;
00496             old->remove_ref();
00497         }
00498     }
00499 };
00500 
00501 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00502     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
00503     filter_chain.root->add_to(my_pipe);
00504 }
00505 
00506 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00507 #if __TBB_TASK_GROUP_CONTEXT
00508     , tbb::task_group_context& context
00509 #endif
00510     ) {
00511     internal::pipeline_proxy pipe(filter_chain);
00512     // tbb::pipeline::run() is called via the proxy
00513     pipe->run(max_number_of_live_tokens
00514 #if __TBB_TASK_GROUP_CONTEXT
00515               , context
00516 #endif
00517     );
00518 }
00519 
00520 #if __TBB_TASK_GROUP_CONTEXT
00521 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00522     tbb::task_group_context context;
00523     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00524 }
00525 #endif // __TBB_TASK_GROUP_CONTEXT
00526 
00527 } // interface5
00528 
00529 using interface5::flow_control;
00530 using interface5::filter_t;
00531 using interface5::make_filter;
00532 using interface5::parallel_pipeline;
00533 
00534 } // tbb
00535 
00536 #endif /* __TBB_pipeline_H */

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net