Shadowrun: Awakened 29 September 2011 - Build 871
parallel_while.h
Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     This file is part of Threading Building Blocks.
00005 
00006     Threading Building Blocks is free software; you can redistribute it
00007     and/or modify it under the terms of the GNU General Public License
00008     version 2 as published by the Free Software Foundation.
00009 
00010     Threading Building Blocks is distributed in the hope that it will be
00011     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
00012     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013     GNU General Public License for more details.
00014 
00015     You should have received a copy of the GNU General Public License
00016     along with Threading Building Blocks; if not, write to the Free Software
00017     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018 
00019     As a special exception, you may use this file as part of a free software
00020     library without restriction.  Specifically, if other files instantiate
00021     templates or use macros or inline functions from this file, or you compile
00022     this file and link it with other files to produce an executable, this
00023     file does not by itself cause the resulting executable to be covered by
00024     the GNU General Public License.  This exception does not however
00025     invalidate any other reasons why the executable file might be covered by
00026     the GNU General Public License.
00027 */
00028 
00029 #ifndef __TBB_parallel_while
00030 #define __TBB_parallel_while
00031 
00032 #include "task.h"
00033 #include <new>
00034 
00035 namespace tbb {
00036 
00037 template<typename Body>
00038 class parallel_while;
00039 
00041 namespace internal {
00042 
00043     template<typename Stream, typename Body> class while_task;
00044 
00046 
00048     template<typename Body>
00049     class while_iteration_task: public task {
00050         const Body& my_body;
00051         typename Body::argument_type my_value;
00052         /*override*/ task* execute() {
00053             my_body(my_value); 
00054             return NULL;
00055         }
00056         while_iteration_task( const typename Body::argument_type& value, const Body& body ) : 
00057             my_body(body), my_value(value)
00058         {}
00059         template<typename Body_> friend class while_group_task;
00060         friend class tbb::parallel_while<Body>;
00061     };
00062 
00064 
00066     template<typename Body>
00067     class while_group_task: public task {
00068         static const size_t max_arg_size = 4;         
00069         const Body& my_body;
00070         size_t size;
00071         typename Body::argument_type my_arg[max_arg_size];
00072         while_group_task( const Body& body ) : my_body(body), size(0) {} 
00073         /*override*/ task* execute() {
00074             typedef while_iteration_task<Body> iteration_type;
00075             __TBB_ASSERT( size>0, NULL );
00076             task_list list;
00077             task* t; 
00078             size_t k=0; 
00079             for(;;) {
00080                 t = new( allocate_child() ) iteration_type(my_arg[k],my_body); 
00081                 if( ++k==size ) break;
00082                 list.push_back(*t);
00083             }
00084             set_ref_count(int(k+1));
00085             spawn(list);
00086             spawn_and_wait_for_all(*t);
00087             return NULL;
00088         }
00089         template<typename Stream, typename Body_> friend class while_task;
00090     };
00091     
00093 
00095     template<typename Stream, typename Body>
00096     class while_task: public task {
00097         Stream& my_stream;
00098         const Body& my_body;
00099         empty_task& my_barrier;
00100         /*override*/ task* execute() {
00101             typedef while_group_task<Body> block_type;
00102             block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
00103             size_t k=0; 
00104             while( my_stream.pop_if_present(t.my_arg[k]) ) {
00105                 if( ++k==block_type::max_arg_size ) {
00106                     // There might be more iterations.
00107                     recycle_to_reexecute();
00108                     break;
00109                 }
00110             }
00111             if( k==0 ) {
00112                 destroy(t);
00113                 return NULL;
00114             } else {
00115                 t.size = k;
00116                 return &t;
00117             }
00118         }
00119         while_task( Stream& stream, const Body& body, empty_task& barrier ) : 
00120             my_stream(stream),
00121             my_body(body),
00122             my_barrier(barrier)
00123         {} 
00124         friend class tbb::parallel_while<Body>;
00125     };
00126 
00127 } // namespace internal
00129 
00131 
00136 template<typename Body>
00137 class parallel_while: internal::no_copy {
00138 public:
00140     parallel_while() : my_body(NULL), my_barrier(NULL) {}
00141 
00143     ~parallel_while() {
00144         if( my_barrier ) {
00145             my_barrier->destroy(*my_barrier);    
00146             my_barrier = NULL;
00147         }
00148     }
00149 
00151     typedef typename Body::argument_type value_type;
00152 
00154 
00157     template<typename Stream>
00158     void run( Stream& stream, const Body& body );
00159 
00161 
00162     void add( const value_type& item );
00163 
00164 private:
00165     const Body* my_body;
00166     empty_task* my_barrier;
00167 };
00168 
00169 template<typename Body>
00170 template<typename Stream>
00171 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
00172     using namespace internal;
00173     empty_task& barrier = *new( task::allocate_root() ) empty_task();
00174     my_body = &body;
00175     my_barrier = &barrier;
00176     my_barrier->set_ref_count(2);
00177     while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
00178     my_barrier->spawn_and_wait_for_all(w);
00179     my_barrier->destroy(*my_barrier);
00180     my_barrier = NULL;
00181     my_body = NULL;
00182 }
00183 
00184 template<typename Body>
00185 void parallel_while<Body>::add( const value_type& item ) {
00186     __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
00187     typedef internal::while_iteration_task<Body> iteration_type;
00188     iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
00189     task::self().spawn( i );
00190 }
00191 
00192 } // namespace 
00193 
00194 #endif /* __TBB_parallel_while */

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net