Shadowrun: Awakened 29 September 2011 - Build 871
Fibonacci.cpp
Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     This file is part of Threading Building Blocks.
00005 
00006     Threading Building Blocks is free software; you can redistribute it
00007     and/or modify it under the terms of the GNU General Public License
00008     version 2 as published by the Free Software Foundation.
00009 
00010     Threading Building Blocks is distributed in the hope that it will be
00011     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
00012     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013     GNU General Public License for more details.
00014 
00015     You should have received a copy of the GNU General Public License
00016     along with Threading Building Blocks; if not, write to the Free Software
00017     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018 
00019     As a special exception, you may use this file as part of a free software
00020     library without restriction.  Specifically, if other files instantiate
00021     templates or use macros or inline functions from this file, or you compile
00022     this file and link it with other files to produce an executable, this
00023     file does not by itself cause the resulting executable to be covered by
00024     the GNU General Public License.  This exception does not however
00025     invalidate any other reasons why the executable file might be covered by
00026     the GNU General Public License.
00027 */
00028 
00029 /* Example program that computes Fibonacci numbers in different ways.
00030    Arguments are: [ Number [Threads [Repeats]]]
00031    The defaults are Number=500 Threads=1:4 Repeats=1.
00032 
00033    The point of this program is to check that the library is working properly.
00034    Most of the computations are deliberately silly and not expected to
00035    show any speedup on multiprocessors.
00036 */
00037 
00038 // enable assertions
00039 #ifdef NDEBUG
00040 #undef NDEBUG
00041 #endif
00042 
00043 #include <cstdio>
00044 #include <cstdlib>
00045 #include <cassert>
00046 #include <utility>
00047 #include "tbb/task.h"
00048 #include "tbb/task_scheduler_init.h"
00049 #include "tbb/tick_count.h"
00050 #include "tbb/blocked_range.h"
00051 #include "tbb/concurrent_vector.h"
00052 #include "tbb/concurrent_queue.h"
00053 #include "tbb/concurrent_hash_map.h"
00054 #include "tbb/parallel_while.h"
00055 #include "tbb/parallel_for.h"
00056 #include "tbb/parallel_reduce.h"
00057 #include "tbb/parallel_scan.h"
00058 #include "tbb/pipeline.h"
00059 #include "tbb/atomic.h"
00060 #include "tbb/mutex.h"
00061 #include "tbb/spin_mutex.h"
00062 #include "tbb/queuing_mutex.h"
00063 #include "tbb/tbb_thread.h"
00064 
00065 using namespace std;
00066 using namespace tbb;
00067 
00069 typedef long long value;
00070 
00072 struct Matrix2x2
00073 {
00075     value v[2][2];
00076     Matrix2x2() {}
00077     Matrix2x2(value v00, value v01, value v10, value v11) {
00078         v[0][0] = v00; v[0][1] = v01; v[1][0] = v10; v[1][1] = v11;
00079     }
00080     Matrix2x2 operator * (const Matrix2x2 &to) const; //< Multiply two Matrices
00081 };
00083 static const Matrix2x2 Matrix1110(1, 1, 1, 0);
00085 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]);
00086 
00088 
00090 value SerialFib(int n)
00091 {
00092     if(n < 2)
00093         return n;
00094     value a = 0, b = 1, sum; int i;
00095     for( i = 2; i <= n; i++ )
00096     {   // n is really index of Fibonacci number
00097         sum = a + b; a = b; b = sum;
00098     }
00099     return sum;
00100 }
00102 value SerialMatrixFib(int n)
00103 {
00104     value c[2][2], a[2][2] = {{1, 1}, {1, 0}}, b[2][2] = {{1, 1}, {1, 0}}; int i;
00105     for(i = 2; i < n; i++)
00106     {   // Using condition to prevent copying of values
00107         if(i & 1) Matrix2x2Multiply(a, c, b);
00108         else      Matrix2x2Multiply(a, b, c);
00109     }
00110     return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell
00111 }
00113 value SerialRecursiveFib(int n)
00114 {
00115     value result;
00116     if(n < 2)
00117         result = n;
00118     else
00119         result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
00120     return result;
00121 }
00123 value SerialQueueFib(int n)
00124 {
00125     concurrent_queue<Matrix2x2> Q;
00126     for(int i = 1; i < n; i++)
00127         Q.push(Matrix1110);
00128     Matrix2x2 A, B;
00129     while(true) {
00130         while( !Q.try_pop(A) ) this_tbb_thread::yield();
00131         if(Q.empty()) break;
00132         while( !Q.try_pop(B) ) this_tbb_thread::yield();
00133         Q.push(A * B);
00134     }
00135     return A.v[0][0];
00136 }
00138 value SerialVectorFib(int n)
00139 {
00140     concurrent_vector<value> A;
00141     A.grow_by(2);
00142     A[0] = 0; A[1] = 1;
00143     for( int i = 2; i <= n; i++)
00144     {
00145         A.grow_to_at_least(i+1);
00146         A[i] = A[i-1] + A[i-2];
00147     }
00148     return A[n];
00149 }
00150 
00152 
00153 // *** Serial shared by mutexes *** //
00154 
00156 value SharedA = 0, SharedB = 1; int SharedI = 1, SharedN;
00157 
00159 template<typename M>
00160 class SharedSerialFibBody {
00161     M &mutex;
00162 public:
00163     SharedSerialFibBody( M &m ) : mutex( m ) {}
00165     void operator()( const blocked_range<int>& range ) const {
00166         for(;;) {
00167             typename M::scoped_lock lock( mutex );
00168             if(SharedI >= SharedN) break;
00169             value sum = SharedA + SharedB;
00170             SharedA = SharedB; SharedB = sum;
00171             ++SharedI;
00172         }
00173     }
00174 };
00175 
00177 template<class M>
00178 value SharedSerialFib(int n)
00179 {
00180     SharedA = 0; SharedB = 1; SharedI = 1; SharedN = n; M mutex;
00181     parallel_for( blocked_range<int>(0,4,1), SharedSerialFibBody<M>( mutex ) );
00182     return SharedB;
00183 }
00184 
00185 // *** Serial shared by concurrent hash map *** //
00186 
00188 struct IntHashCompare {
00189     bool equal( const int j, const int k ) const { return j == k; }
00190     unsigned long hash( const int k ) const { return (unsigned long)k; }   
00191 };
00193 typedef concurrent_hash_map<int, value, IntHashCompare> NumbersTable;
00195 class ConcurrentHashSerialFibTask: public task {
00196     NumbersTable &Fib;
00197     int my_n;
00198 public:
00200     ConcurrentHashSerialFibTask( NumbersTable &cht, int n ) : Fib(cht), my_n(n) { }
00202     /*override*/ task* execute()
00203     {
00204         for( int i = 2; i <= my_n; ++i ) { // there is no difference in to recycle or to make loop
00205             NumbersTable::const_accessor f1, f2; // same as iterators
00206             if( !Fib.find(f1, i-1) || !Fib.find(f2, i-2) ) {
00207                 // Something is seriously wrong, because i-1 and i-2 must have been inserted 
00208                 // earlier by this thread or another thread.
00209                 assert(0);
00210             }
00211             value sum = f1->second + f2->second;
00212             NumbersTable::const_accessor fsum;
00213             Fib.insert(fsum, make_pair(i, sum)); // inserting
00214             assert( fsum->second == sum ); // check value
00215         }
00216         return 0;
00217     }
00218 };
00219 
00221 value ConcurrentHashSerialFib(int n)
00222 {
00223     NumbersTable Fib; 
00224     bool okay;
00225     okay = Fib.insert( make_pair(0, 0) ); assert(okay); // assign initial values
00226     okay = Fib.insert( make_pair(1, 1) ); assert(okay);
00227 
00228     task_list list;
00229     // allocate tasks
00230     list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
00231     list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
00232     task::spawn_root_and_wait(list);
00233     NumbersTable::const_accessor fresult;
00234     okay = Fib.find( fresult, n );
00235     assert(okay);
00236     return fresult->second;
00237 }
00238 
00239 // *** Queue with parallel_for and parallel_while *** //
00240 
00242 struct QueueStream {
00243     volatile bool producer_is_done;
00244     concurrent_queue<Matrix2x2> Queue;
00246     bool pop_if_present( pair<Matrix2x2, Matrix2x2> &mm ) {
00247         // get first matrix if present
00248         if(!Queue.try_pop(mm.first)) return false;
00249         // get second matrix if present
00250         if(!Queue.try_pop(mm.second)) {
00251             // if not, then push back first matrix
00252             Queue.push(mm.first); return false;
00253         }
00254         return true;
00255     }
00256 };
00257 
00259 struct parallel_forFibBody { 
00260     QueueStream &my_stream;
00262     parallel_forFibBody(QueueStream &s) : my_stream(s) { }
00264     void operator()( const blocked_range<int> &range ) const {
00265         int i_end = range.end();
00266         for( int i = range.begin(); i != i_end; ++i ) {
00267             my_stream.Queue.push( Matrix1110 ); // push initial matrix
00268         }
00269     }
00270 };
00272 class parallel_whileFibBody
00273 {
00274     QueueStream &my_stream;
00275     parallel_while<parallel_whileFibBody> &my_while;
00276 public:
00277     typedef pair<Matrix2x2, Matrix2x2> argument_type;
00279     parallel_whileFibBody(parallel_while<parallel_whileFibBody> &w, QueueStream &s)
00280         : my_while(w), my_stream(s) { }
00282     void operator() (argument_type mm) const {
00283         mm.first = mm.first * mm.second;
00284         // note: it can run concurrently with QueueStream::pop_if_present()
00285         if(my_stream.Queue.try_pop(mm.second))
00286              my_while.add( mm ); // now, two matrices available. Add next iteration.
00287         else my_stream.Queue.push( mm.first ); // or push back calculated value if queue is empty
00288     }
00289 };
00290 
00292 struct QueueInsertTask: public task {
00293     QueueStream &my_stream;
00294     int my_n;
00296     QueueInsertTask( int n, QueueStream &s ) : my_n(n), my_stream(s) { }
00298     /*override*/ task* execute() {
00299         // Execute of parallel pushing of n-1 initial matrices
00300         parallel_for( blocked_range<int>( 1, my_n, 10 ), parallel_forFibBody(my_stream) ); 
00301         my_stream.producer_is_done = true; 
00302         return 0;
00303     }
00304 };
00306 struct QueueProcessTask: public task {
00307     QueueStream &my_stream;
00309     QueueProcessTask( QueueStream &s ) : my_stream(s) { }
00311     /*override*/ task* execute() {
00312         while( !my_stream.producer_is_done || my_stream.Queue.unsafe_size()>1 ) {
00313             parallel_while<parallel_whileFibBody> w; // run while loop in parallel
00314             w.run( my_stream, parallel_whileFibBody( w, my_stream ) );
00315         }
00316         return 0;
00317     }
00318 };
00320 value ParallelQueueFib(int n)
00321 {
00322     QueueStream stream;
00323     stream.producer_is_done = false;
00324     task_list list;
00325     list.push_back(*new(task::allocate_root()) QueueInsertTask( n, stream ));
00326     list.push_back(*new(task::allocate_root()) QueueProcessTask( stream ));
00327     // If there is only a single thread, the first task in the list runs to completion
00328     // before the second task in the list starts.
00329     task::spawn_root_and_wait(list);
00330     assert(stream.Queue.unsafe_size() == 1); // it is easy to lose some work
00331     Matrix2x2 M; 
00332     bool result = stream.Queue.try_pop( M ); // get last matrix
00333     assert( result );
00334     return M.v[0][0]; // and result number
00335 }
00336 
00337 // *** Queue with pipeline *** //
00338 
00340 class InputFilter: public filter {
00341     atomic<int> N; //< index of Fibonacci number minus 1
00342 public:
00343     concurrent_queue<Matrix2x2> Queue;
00345     InputFilter( int n ) : filter(false /*is not serial*/) { N = n; }
00347     /*override*/ void* operator()(void*)
00348     {
00349         int n = --N;
00350         if(n <= 0) return 0;
00351         Queue.push( Matrix1110 );
00352         return &Queue;
00353     }
00354 };
00356 class MultiplyFilter: public filter {
00357 public:
00358     MultiplyFilter( ) : filter(false /*is not serial*/) { }
00360     /*override*/ void* operator()(void*p)
00361     {
00362         concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p);
00363         Matrix2x2 m1, m2;
00364         // get two elements
00365         while( !Queue.try_pop( m1 ) ) this_tbb_thread::yield(); 
00366         while( !Queue.try_pop( m2 ) ) this_tbb_thread::yield();
00367         m1 = m1 * m2; // process them
00368         Queue.push( m1 ); // and push back
00369         return this; // just nothing
00370     }
00371 };
00373 value ParallelPipeFib(int n)
00374 {
00375     InputFilter input( n-1 );
00376     MultiplyFilter process;
00377     // Create the pipeline
00378     pipeline pipeline;
00379     // add filters
00380     pipeline.add_filter( input ); // first
00381     pipeline.add_filter( process ); // second
00382 
00383     input.Queue.push( Matrix1110 );
00384     // Run the pipeline
00385     pipeline.run( n ); // must be larger then max threads number
00386     pipeline.clear(); // do not forget clear the pipeline
00387 
00388     assert( input.Queue.unsafe_size()==1 );
00389     Matrix2x2 M; 
00390     bool result = input.Queue.try_pop( M ); // get last element
00391     assert( result );
00392     return M.v[0][0]; // get value
00393 }
00394 
00395 // *** parallel_reduce *** //
00396 
00398 struct parallel_reduceFibBody {
00399     Matrix2x2 sum;
00400     int splitted;  //< flag to make one less operation for splitted bodies
00402     parallel_reduceFibBody() : sum( Matrix1110 ), splitted(0) { }
00404     parallel_reduceFibBody( parallel_reduceFibBody& other, split ) : sum( Matrix1110 ), splitted(1/*note that it is splitted*/) {}
00406     void join( parallel_reduceFibBody &s ) {
00407         sum = sum * s.sum;
00408     }
00410     void operator()( const blocked_range<int> &r ) {
00411         for( int k = r.begin() + splitted; k < r.end(); ++k )
00412             sum = sum * Matrix1110;
00413         splitted = 0; // reset flag, because this method can be reused for next range
00414     }
00415 };
00417 value parallel_reduceFib(int n)
00418 {
00419     parallel_reduceFibBody b;
00420     parallel_reduce(blocked_range<int>(2, n, 3), b); // do parallel reduce on range [2, n) for b
00421     return b.sum.v[0][0];
00422 }
00423 
00424 // *** parallel_scan *** //
00425 
00427 struct parallel_scanFibBody {
00428     Matrix2x2 sum;
00429     int first;  // flag to make one less operation for first range
00431     parallel_scanFibBody() : sum( Matrix1110 ), first(1) {}
00433     parallel_scanFibBody( parallel_scanFibBody &b, split) : sum( Matrix1110 ), first(1) {}
00435     void reverse_join( parallel_scanFibBody &a ) {
00436         sum = sum * a.sum;
00437     }
00439     void assign( parallel_scanFibBody &b ) {
00440         sum = b.sum;
00441     }
00443     template<typename T>
00444     void operator()( const blocked_range<int> &r, T) {
00445         // see tag.is_final_scan() for what tag is used
00446         for( int k = r.begin() + first; k < r.end(); ++k )
00447             sum = sum * Matrix1110;
00448         first = 0; // reset flag, because this method can be reused for next range
00449     }
00450 };
00452 value parallel_scanFib(int n)
00453 {
00454     parallel_scanFibBody b;
00455     parallel_scan(blocked_range<int>(1/*one less, because body skip first*/, n, 3), b);
00456     return b.sum.v[0][0];
00457 }
00458 
00459 // *** Raw tasks *** //
00460 
00462 struct FibTask: public task {
00463     const int n;
00464     value& sum;
00465     value x, y;
00466     bool second_phase; //< flag of continuation
00467     // task arguments
00468     FibTask( int n_, value& sum_ ) : 
00469         n(n_), sum(sum_), second_phase(false)
00470     {}
00472     /*override*/ task* execute() {
00473         // Using Lucas' formula here
00474         if( second_phase ) { // children finished
00475             sum = n&1 ? x*x + y*y : x*x - y*y;
00476             return NULL;
00477         }
00478         if( n <= 2 ) {
00479             sum = n!=0;
00480             return NULL;
00481         } else {
00482             recycle_as_continuation();  // repeat this task when children finish
00483             second_phase = true; // mark second phase
00484             FibTask& a = *new( allocate_child() ) FibTask( n/2 + 1, x );
00485             FibTask& b = *new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y );
00486             set_ref_count(2);
00487             spawn( a );
00488             return &b;
00489         }
00490     }
00491 };
00493 value ParallelTaskFib(int n) { 
00494     value sum;
00495     FibTask& a = *new(task::allocate_root()) FibTask(n, sum);
00496     task::spawn_root_and_wait(a);
00497     return sum;
00498 }
00499 
00501 
00503 struct IntRange {
00504     int low;
00505     int high;
00506     void set_from_string( const char* s );
00507     IntRange( int low_, int high_ ) : low(low_), high(high_) {}
00508 };
00509 
00510 void IntRange::set_from_string( const char* s ) {
00511     char* end;
00512     high = low = strtol(s,&end,0);
00513     switch( *end ) {
00514     case ':': 
00515         high = strtol(end+1,0,0); 
00516         break;
00517     case '\0':
00518         break;
00519     default:
00520         printf("unexpected character = %c\n",*end);
00521     }
00522 }
00523 
00525 static tick_count t0;
00526 
00528 static bool Verbose = false;
00529 
00530 typedef value (*MeasureFunc)(int);
00532 value Measure(const char *name, MeasureFunc func, int n)
00533 {
00534     value result;
00535     if(Verbose) printf("%s",name);
00536     t0 = tick_count::now();
00537     for(int number = 2; number <= n; number++)
00538         result = func(number);
00539     if(Verbose) printf("\t- in %f msec\n", (tick_count::now() - t0).seconds()*1000);
00540     return result;
00541 }
00542 
00544 int main(int argc, char* argv[])
00545 {
00546     if(argc>1) Verbose = true;
00547     int NumbersCount = argc>1 ? strtol(argv[1],0,0) : 500;
00548     IntRange NThread(1,4);// Number of threads to use.
00549     if(argc>2) NThread.set_from_string(argv[2]);
00550     unsigned long ntrial = argc>3? (unsigned long)strtoul(argv[3],0,0) : 1;
00551     value result, sum;
00552 
00553     if(Verbose) printf("Fibonacci numbers example. Generating %d numbers..\n",  NumbersCount);
00554 
00555     result = Measure("Serial loop", SerialFib, NumbersCount);
00556     sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); assert(result == sum);
00557     sum = Measure("Serial vector", SerialVectorFib, NumbersCount); assert(result == sum);
00558     sum = Measure("Serial queue", SerialQueueFib, NumbersCount); assert(result == sum);
00559     // now in parallel
00560     for( unsigned long i=0; i<ntrial; ++i ) {
00561         for(int threads = NThread.low; threads <= NThread.high; threads *= 2)
00562         {
00563             task_scheduler_init scheduler_init(threads);
00564             if(Verbose) printf("\nThreads number is %d\n", threads);
00565 
00566             sum = Measure("Shared serial (mutex)\t", SharedSerialFib<mutex>, NumbersCount); assert(result == sum);
00567             sum = Measure("Shared serial (spin_mutex)", SharedSerialFib<spin_mutex>, NumbersCount); assert(result == sum);
00568             sum = Measure("Shared serial (queuing_mutex)", SharedSerialFib<queuing_mutex>, NumbersCount); assert(result == sum);
00569             sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount); assert(result == sum);
00570             sum = Measure("Parallel while+for/queue", ParallelQueueFib, NumbersCount); assert(result == sum);
00571             sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount); assert(result == sum);
00572             sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount); assert(result == sum);
00573             sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount); assert(result == sum);
00574             sum = Measure("Parallel tasks\t\t", ParallelTaskFib, NumbersCount); assert(result == sum);
00575         }
00576 
00577     #ifdef __GNUC__
00578         if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result);
00579     #else
00580         if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result);
00581     #endif
00582     }
00583     if(!Verbose) printf("TEST PASSED\n");
00584     return 0;
00585 }
00586 
00587 // Utils
00588 
00589 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2])
00590 {
00591     for( int i = 0; i <= 1; i++)
00592         for( int j = 0; j <= 1; j++)
00593             c[i][j] = a[i][0]*b[0][j] + a[i][1]*b[1][j];
00594 }
00595 
00596 Matrix2x2 Matrix2x2::operator *(const Matrix2x2 &to) const
00597 {
00598     Matrix2x2 result;
00599     Matrix2x2Multiply(v, to.v, result.v);
00600     return result;
00601 }

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net