![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 /* 00002 Copyright 2005-2010 Intel Corporation. All Rights Reserved. 00003 00004 This file is part of Threading Building Blocks. 00005 00006 Threading Building Blocks is free software; you can redistribute it 00007 and/or modify it under the terms of the GNU General Public License 00008 version 2 as published by the Free Software Foundation. 00009 00010 Threading Building Blocks is distributed in the hope that it will be 00011 useful, but WITHOUT ANY WARRANTY; without even the implied warranty 00012 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 GNU General Public License for more details. 00014 00015 You should have received a copy of the GNU General Public License 00016 along with Threading Building Blocks; if not, write to the Free Software 00017 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 00018 00019 As a special exception, you may use this file as part of a free software 00020 library without restriction. Specifically, if other files instantiate 00021 templates or use macros or inline functions from this file, or you compile 00022 this file and link it with other files to produce an executable, this 00023 file does not by itself cause the resulting executable to be covered by 00024 the GNU General Public License. This exception does not however 00025 invalidate any other reasons why the executable file might be covered by 00026 the GNU General Public License. 00027 */ 00028 00029 /* Example program that computes Fibonacci numbers in different ways. 00030 Arguments are: [ Number [Threads [Repeats]]] 00031 The defaults are Number=500 Threads=1:4 Repeats=1. 00032 00033 The point of this program is to check that the library is working properly. 00034 Most of the computations are deliberately silly and not expected to 00035 show any speedup on multiprocessors. 00036 */ 00037 00038 // enable assertions 00039 #ifdef NDEBUG 00040 #undef NDEBUG 00041 #endif 00042 00043 #include <cstdio> 00044 #include <cstdlib> 00045 #include <cassert> 00046 #include <utility> 00047 #include "tbb/task.h" 00048 #include "tbb/task_scheduler_init.h" 00049 #include "tbb/tick_count.h" 00050 #include "tbb/blocked_range.h" 00051 #include "tbb/concurrent_vector.h" 00052 #include "tbb/concurrent_queue.h" 00053 #include "tbb/concurrent_hash_map.h" 00054 #include "tbb/parallel_while.h" 00055 #include "tbb/parallel_for.h" 00056 #include "tbb/parallel_reduce.h" 00057 #include "tbb/parallel_scan.h" 00058 #include "tbb/pipeline.h" 00059 #include "tbb/atomic.h" 00060 #include "tbb/mutex.h" 00061 #include "tbb/spin_mutex.h" 00062 #include "tbb/queuing_mutex.h" 00063 #include "tbb/tbb_thread.h" 00064 00065 using namespace std; 00066 using namespace tbb; 00067 00069 typedef long long value; 00070 00072 struct Matrix2x2 00073 { 00075 value v[2][2]; 00076 Matrix2x2() {} 00077 Matrix2x2(value v00, value v01, value v10, value v11) { 00078 v[0][0] = v00; v[0][1] = v01; v[1][0] = v10; v[1][1] = v11; 00079 } 00080 Matrix2x2 operator * (const Matrix2x2 &to) const; //< Multiply two Matrices 00081 }; 00083 static const Matrix2x2 Matrix1110(1, 1, 1, 0); 00085 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]); 00086 00088 00090 value SerialFib(int n) 00091 { 00092 if(n < 2) 00093 return n; 00094 value a = 0, b = 1, sum; int i; 00095 for( i = 2; i <= n; i++ ) 00096 { // n is really index of Fibonacci number 00097 sum = a + b; a = b; b = sum; 00098 } 00099 return sum; 00100 } 00102 value SerialMatrixFib(int n) 00103 { 00104 value c[2][2], a[2][2] = {{1, 1}, {1, 0}}, b[2][2] = {{1, 1}, {1, 0}}; int i; 00105 for(i = 2; i < n; i++) 00106 { // Using condition to prevent copying of values 00107 if(i & 1) Matrix2x2Multiply(a, c, b); 00108 else Matrix2x2Multiply(a, b, c); 00109 } 00110 return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell 00111 } 00113 value SerialRecursiveFib(int n) 00114 { 00115 value result; 00116 if(n < 2) 00117 result = n; 00118 else 00119 result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2); 00120 return result; 00121 } 00123 value SerialQueueFib(int n) 00124 { 00125 concurrent_queue<Matrix2x2> Q; 00126 for(int i = 1; i < n; i++) 00127 Q.push(Matrix1110); 00128 Matrix2x2 A, B; 00129 while(true) { 00130 while( !Q.try_pop(A) ) this_tbb_thread::yield(); 00131 if(Q.empty()) break; 00132 while( !Q.try_pop(B) ) this_tbb_thread::yield(); 00133 Q.push(A * B); 00134 } 00135 return A.v[0][0]; 00136 } 00138 value SerialVectorFib(int n) 00139 { 00140 concurrent_vector<value> A; 00141 A.grow_by(2); 00142 A[0] = 0; A[1] = 1; 00143 for( int i = 2; i <= n; i++) 00144 { 00145 A.grow_to_at_least(i+1); 00146 A[i] = A[i-1] + A[i-2]; 00147 } 00148 return A[n]; 00149 } 00150 00152 00153 // *** Serial shared by mutexes *** // 00154 00156 value SharedA = 0, SharedB = 1; int SharedI = 1, SharedN; 00157 00159 template<typename M> 00160 class SharedSerialFibBody { 00161 M &mutex; 00162 public: 00163 SharedSerialFibBody( M &m ) : mutex( m ) {} 00165 void operator()( const blocked_range<int>& range ) const { 00166 for(;;) { 00167 typename M::scoped_lock lock( mutex ); 00168 if(SharedI >= SharedN) break; 00169 value sum = SharedA + SharedB; 00170 SharedA = SharedB; SharedB = sum; 00171 ++SharedI; 00172 } 00173 } 00174 }; 00175 00177 template<class M> 00178 value SharedSerialFib(int n) 00179 { 00180 SharedA = 0; SharedB = 1; SharedI = 1; SharedN = n; M mutex; 00181 parallel_for( blocked_range<int>(0,4,1), SharedSerialFibBody<M>( mutex ) ); 00182 return SharedB; 00183 } 00184 00185 // *** Serial shared by concurrent hash map *** // 00186 00188 struct IntHashCompare { 00189 bool equal( const int j, const int k ) const { return j == k; } 00190 unsigned long hash( const int k ) const { return (unsigned long)k; } 00191 }; 00193 typedef concurrent_hash_map<int, value, IntHashCompare> NumbersTable; 00195 class ConcurrentHashSerialFibTask: public task { 00196 NumbersTable &Fib; 00197 int my_n; 00198 public: 00200 ConcurrentHashSerialFibTask( NumbersTable &cht, int n ) : Fib(cht), my_n(n) { } 00202 /*override*/ task* execute() 00203 { 00204 for( int i = 2; i <= my_n; ++i ) { // there is no difference in to recycle or to make loop 00205 NumbersTable::const_accessor f1, f2; // same as iterators 00206 if( !Fib.find(f1, i-1) || !Fib.find(f2, i-2) ) { 00207 // Something is seriously wrong, because i-1 and i-2 must have been inserted 00208 // earlier by this thread or another thread. 00209 assert(0); 00210 } 00211 value sum = f1->second + f2->second; 00212 NumbersTable::const_accessor fsum; 00213 Fib.insert(fsum, make_pair(i, sum)); // inserting 00214 assert( fsum->second == sum ); // check value 00215 } 00216 return 0; 00217 } 00218 }; 00219 00221 value ConcurrentHashSerialFib(int n) 00222 { 00223 NumbersTable Fib; 00224 bool okay; 00225 okay = Fib.insert( make_pair(0, 0) ); assert(okay); // assign initial values 00226 okay = Fib.insert( make_pair(1, 1) ); assert(okay); 00227 00228 task_list list; 00229 // allocate tasks 00230 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n)); 00231 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n)); 00232 task::spawn_root_and_wait(list); 00233 NumbersTable::const_accessor fresult; 00234 okay = Fib.find( fresult, n ); 00235 assert(okay); 00236 return fresult->second; 00237 } 00238 00239 // *** Queue with parallel_for and parallel_while *** // 00240 00242 struct QueueStream { 00243 volatile bool producer_is_done; 00244 concurrent_queue<Matrix2x2> Queue; 00246 bool pop_if_present( pair<Matrix2x2, Matrix2x2> &mm ) { 00247 // get first matrix if present 00248 if(!Queue.try_pop(mm.first)) return false; 00249 // get second matrix if present 00250 if(!Queue.try_pop(mm.second)) { 00251 // if not, then push back first matrix 00252 Queue.push(mm.first); return false; 00253 } 00254 return true; 00255 } 00256 }; 00257 00259 struct parallel_forFibBody { 00260 QueueStream &my_stream; 00262 parallel_forFibBody(QueueStream &s) : my_stream(s) { } 00264 void operator()( const blocked_range<int> &range ) const { 00265 int i_end = range.end(); 00266 for( int i = range.begin(); i != i_end; ++i ) { 00267 my_stream.Queue.push( Matrix1110 ); // push initial matrix 00268 } 00269 } 00270 }; 00272 class parallel_whileFibBody 00273 { 00274 QueueStream &my_stream; 00275 parallel_while<parallel_whileFibBody> &my_while; 00276 public: 00277 typedef pair<Matrix2x2, Matrix2x2> argument_type; 00279 parallel_whileFibBody(parallel_while<parallel_whileFibBody> &w, QueueStream &s) 00280 : my_while(w), my_stream(s) { } 00282 void operator() (argument_type mm) const { 00283 mm.first = mm.first * mm.second; 00284 // note: it can run concurrently with QueueStream::pop_if_present() 00285 if(my_stream.Queue.try_pop(mm.second)) 00286 my_while.add( mm ); // now, two matrices available. Add next iteration. 00287 else my_stream.Queue.push( mm.first ); // or push back calculated value if queue is empty 00288 } 00289 }; 00290 00292 struct QueueInsertTask: public task { 00293 QueueStream &my_stream; 00294 int my_n; 00296 QueueInsertTask( int n, QueueStream &s ) : my_n(n), my_stream(s) { } 00298 /*override*/ task* execute() { 00299 // Execute of parallel pushing of n-1 initial matrices 00300 parallel_for( blocked_range<int>( 1, my_n, 10 ), parallel_forFibBody(my_stream) ); 00301 my_stream.producer_is_done = true; 00302 return 0; 00303 } 00304 }; 00306 struct QueueProcessTask: public task { 00307 QueueStream &my_stream; 00309 QueueProcessTask( QueueStream &s ) : my_stream(s) { } 00311 /*override*/ task* execute() { 00312 while( !my_stream.producer_is_done || my_stream.Queue.unsafe_size()>1 ) { 00313 parallel_while<parallel_whileFibBody> w; // run while loop in parallel 00314 w.run( my_stream, parallel_whileFibBody( w, my_stream ) ); 00315 } 00316 return 0; 00317 } 00318 }; 00320 value ParallelQueueFib(int n) 00321 { 00322 QueueStream stream; 00323 stream.producer_is_done = false; 00324 task_list list; 00325 list.push_back(*new(task::allocate_root()) QueueInsertTask( n, stream )); 00326 list.push_back(*new(task::allocate_root()) QueueProcessTask( stream )); 00327 // If there is only a single thread, the first task in the list runs to completion 00328 // before the second task in the list starts. 00329 task::spawn_root_and_wait(list); 00330 assert(stream.Queue.unsafe_size() == 1); // it is easy to lose some work 00331 Matrix2x2 M; 00332 bool result = stream.Queue.try_pop( M ); // get last matrix 00333 assert( result ); 00334 return M.v[0][0]; // and result number 00335 } 00336 00337 // *** Queue with pipeline *** // 00338 00340 class InputFilter: public filter { 00341 atomic<int> N; //< index of Fibonacci number minus 1 00342 public: 00343 concurrent_queue<Matrix2x2> Queue; 00345 InputFilter( int n ) : filter(false /*is not serial*/) { N = n; } 00347 /*override*/ void* operator()(void*) 00348 { 00349 int n = --N; 00350 if(n <= 0) return 0; 00351 Queue.push( Matrix1110 ); 00352 return &Queue; 00353 } 00354 }; 00356 class MultiplyFilter: public filter { 00357 public: 00358 MultiplyFilter( ) : filter(false /*is not serial*/) { } 00360 /*override*/ void* operator()(void*p) 00361 { 00362 concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p); 00363 Matrix2x2 m1, m2; 00364 // get two elements 00365 while( !Queue.try_pop( m1 ) ) this_tbb_thread::yield(); 00366 while( !Queue.try_pop( m2 ) ) this_tbb_thread::yield(); 00367 m1 = m1 * m2; // process them 00368 Queue.push( m1 ); // and push back 00369 return this; // just nothing 00370 } 00371 }; 00373 value ParallelPipeFib(int n) 00374 { 00375 InputFilter input( n-1 ); 00376 MultiplyFilter process; 00377 // Create the pipeline 00378 pipeline pipeline; 00379 // add filters 00380 pipeline.add_filter( input ); // first 00381 pipeline.add_filter( process ); // second 00382 00383 input.Queue.push( Matrix1110 ); 00384 // Run the pipeline 00385 pipeline.run( n ); // must be larger then max threads number 00386 pipeline.clear(); // do not forget clear the pipeline 00387 00388 assert( input.Queue.unsafe_size()==1 ); 00389 Matrix2x2 M; 00390 bool result = input.Queue.try_pop( M ); // get last element 00391 assert( result ); 00392 return M.v[0][0]; // get value 00393 } 00394 00395 // *** parallel_reduce *** // 00396 00398 struct parallel_reduceFibBody { 00399 Matrix2x2 sum; 00400 int splitted; //< flag to make one less operation for splitted bodies 00402 parallel_reduceFibBody() : sum( Matrix1110 ), splitted(0) { } 00404 parallel_reduceFibBody( parallel_reduceFibBody& other, split ) : sum( Matrix1110 ), splitted(1/*note that it is splitted*/) {} 00406 void join( parallel_reduceFibBody &s ) { 00407 sum = sum * s.sum; 00408 } 00410 void operator()( const blocked_range<int> &r ) { 00411 for( int k = r.begin() + splitted; k < r.end(); ++k ) 00412 sum = sum * Matrix1110; 00413 splitted = 0; // reset flag, because this method can be reused for next range 00414 } 00415 }; 00417 value parallel_reduceFib(int n) 00418 { 00419 parallel_reduceFibBody b; 00420 parallel_reduce(blocked_range<int>(2, n, 3), b); // do parallel reduce on range [2, n) for b 00421 return b.sum.v[0][0]; 00422 } 00423 00424 // *** parallel_scan *** // 00425 00427 struct parallel_scanFibBody { 00428 Matrix2x2 sum; 00429 int first; // flag to make one less operation for first range 00431 parallel_scanFibBody() : sum( Matrix1110 ), first(1) {} 00433 parallel_scanFibBody( parallel_scanFibBody &b, split) : sum( Matrix1110 ), first(1) {} 00435 void reverse_join( parallel_scanFibBody &a ) { 00436 sum = sum * a.sum; 00437 } 00439 void assign( parallel_scanFibBody &b ) { 00440 sum = b.sum; 00441 } 00443 template<typename T> 00444 void operator()( const blocked_range<int> &r, T) { 00445 // see tag.is_final_scan() for what tag is used 00446 for( int k = r.begin() + first; k < r.end(); ++k ) 00447 sum = sum * Matrix1110; 00448 first = 0; // reset flag, because this method can be reused for next range 00449 } 00450 }; 00452 value parallel_scanFib(int n) 00453 { 00454 parallel_scanFibBody b; 00455 parallel_scan(blocked_range<int>(1/*one less, because body skip first*/, n, 3), b); 00456 return b.sum.v[0][0]; 00457 } 00458 00459 // *** Raw tasks *** // 00460 00462 struct FibTask: public task { 00463 const int n; 00464 value& sum; 00465 value x, y; 00466 bool second_phase; //< flag of continuation 00467 // task arguments 00468 FibTask( int n_, value& sum_ ) : 00469 n(n_), sum(sum_), second_phase(false) 00470 {} 00472 /*override*/ task* execute() { 00473 // Using Lucas' formula here 00474 if( second_phase ) { // children finished 00475 sum = n&1 ? x*x + y*y : x*x - y*y; 00476 return NULL; 00477 } 00478 if( n <= 2 ) { 00479 sum = n!=0; 00480 return NULL; 00481 } else { 00482 recycle_as_continuation(); // repeat this task when children finish 00483 second_phase = true; // mark second phase 00484 FibTask& a = *new( allocate_child() ) FibTask( n/2 + 1, x ); 00485 FibTask& b = *new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y ); 00486 set_ref_count(2); 00487 spawn( a ); 00488 return &b; 00489 } 00490 } 00491 }; 00493 value ParallelTaskFib(int n) { 00494 value sum; 00495 FibTask& a = *new(task::allocate_root()) FibTask(n, sum); 00496 task::spawn_root_and_wait(a); 00497 return sum; 00498 } 00499 00501 00503 struct IntRange { 00504 int low; 00505 int high; 00506 void set_from_string( const char* s ); 00507 IntRange( int low_, int high_ ) : low(low_), high(high_) {} 00508 }; 00509 00510 void IntRange::set_from_string( const char* s ) { 00511 char* end; 00512 high = low = strtol(s,&end,0); 00513 switch( *end ) { 00514 case ':': 00515 high = strtol(end+1,0,0); 00516 break; 00517 case '\0': 00518 break; 00519 default: 00520 printf("unexpected character = %c\n",*end); 00521 } 00522 } 00523 00525 static tick_count t0; 00526 00528 static bool Verbose = false; 00529 00530 typedef value (*MeasureFunc)(int); 00532 value Measure(const char *name, MeasureFunc func, int n) 00533 { 00534 value result; 00535 if(Verbose) printf("%s",name); 00536 t0 = tick_count::now(); 00537 for(int number = 2; number <= n; number++) 00538 result = func(number); 00539 if(Verbose) printf("\t- in %f msec\n", (tick_count::now() - t0).seconds()*1000); 00540 return result; 00541 } 00542 00544 int main(int argc, char* argv[]) 00545 { 00546 if(argc>1) Verbose = true; 00547 int NumbersCount = argc>1 ? strtol(argv[1],0,0) : 500; 00548 IntRange NThread(1,4);// Number of threads to use. 00549 if(argc>2) NThread.set_from_string(argv[2]); 00550 unsigned long ntrial = argc>3? (unsigned long)strtoul(argv[3],0,0) : 1; 00551 value result, sum; 00552 00553 if(Verbose) printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount); 00554 00555 result = Measure("Serial loop", SerialFib, NumbersCount); 00556 sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); assert(result == sum); 00557 sum = Measure("Serial vector", SerialVectorFib, NumbersCount); assert(result == sum); 00558 sum = Measure("Serial queue", SerialQueueFib, NumbersCount); assert(result == sum); 00559 // now in parallel 00560 for( unsigned long i=0; i<ntrial; ++i ) { 00561 for(int threads = NThread.low; threads <= NThread.high; threads *= 2) 00562 { 00563 task_scheduler_init scheduler_init(threads); 00564 if(Verbose) printf("\nThreads number is %d\n", threads); 00565 00566 sum = Measure("Shared serial (mutex)\t", SharedSerialFib<mutex>, NumbersCount); assert(result == sum); 00567 sum = Measure("Shared serial (spin_mutex)", SharedSerialFib<spin_mutex>, NumbersCount); assert(result == sum); 00568 sum = Measure("Shared serial (queuing_mutex)", SharedSerialFib<queuing_mutex>, NumbersCount); assert(result == sum); 00569 sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount); assert(result == sum); 00570 sum = Measure("Parallel while+for/queue", ParallelQueueFib, NumbersCount); assert(result == sum); 00571 sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount); assert(result == sum); 00572 sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount); assert(result == sum); 00573 sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount); assert(result == sum); 00574 sum = Measure("Parallel tasks\t\t", ParallelTaskFib, NumbersCount); assert(result == sum); 00575 } 00576 00577 #ifdef __GNUC__ 00578 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result); 00579 #else 00580 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result); 00581 #endif 00582 } 00583 if(!Verbose) printf("TEST PASSED\n"); 00584 return 0; 00585 } 00586 00587 // Utils 00588 00589 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]) 00590 { 00591 for( int i = 0; i <= 1; i++) 00592 for( int j = 0; j <= 1; j++) 00593 c[i][j] = a[i][0]*b[0][j] + a[i][1]*b[1][j]; 00594 } 00595 00596 Matrix2x2 Matrix2x2::operator *(const Matrix2x2 &to) const 00597 { 00598 Matrix2x2 result; 00599 Matrix2x2Multiply(v, to.v, result.v); 00600 return result; 00601 }
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.