Shadowrun: Awakened 29 September 2011 - Build 871
ThreadPool.h
Go to the documentation of this file.
00001 #ifndef __THREAD_POOL_H
00002 #define __THREAD_POOL_H
00003 
00004 #include "RakMemoryOverride.h"
00005 #include "DS_Queue.h"
00006 #include "SimpleMutex.h"
00007 #include "Export.h"
00008 #include "RakThread.h"
00009 #include "SignaledEvent.h"
00010 
00011 #ifdef _MSC_VER
00012 #pragma warning( push )
00013 #endif
00014 
/// Abstract hook for building and tearing down a per-thread data object.
/// A ThreadPool worker calls PerThreadFactory() once before it starts processing and
/// PerThreadDestructor() once as it exits, but only when no per-thread factory function
/// pointer was given to the pool.
class ThreadDataInterface
{
public:
    ThreadDataInterface() { }
    virtual ~ThreadDataInterface() { }

    /// Creates the data object for the calling worker thread.
    /// \param[in] context The pointer registered together with this interface.
    /// \return Opaque pointer that the worker passes to every callback it runs.
    virtual void *PerThreadFactory(void *context) = 0;

    /// Releases what PerThreadFactory() created.
    /// \param[in] factoryResult The value PerThreadFactory() returned on this thread.
    /// \param[in] context The pointer registered together with this interface.
    virtual void PerThreadDestructor(void *factoryResult, void *context) = 0;
};
00028 template <class InputType, class OutputType>
00029 struct RAK_DLL_EXPORT ThreadPool
00030 {
00031     ThreadPool();
00032     ~ThreadPool();
00033 
00040     bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0);
00041 
00042     // Alternate form of _perThreadDataFactory, _perThreadDataDestructor
00043     void SetThreadDataInterface(ThreadDataInterface *tdi, void *context);
00044 
00046     void StopThreads(void);
00047 
00056     void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
00057 
00061     void AddOutput(OutputType outputData);
00062 
00065     bool HasOutput(void);
00066 
00069     bool HasOutputFast(void);
00070 
00073     bool HasInput(void);
00074 
00077     bool HasInputFast(void);
00078 
00082     OutputType GetOutput(void);
00083 
00085     void Clear(void);
00086 
00089     void LockInput(void);
00090 
00092     void UnlockInput(void);
00093 
00095     unsigned InputSize(void);
00096 
00098     InputType GetInputAtIndex(unsigned index);
00099 
00101     void RemoveInputAtIndex(unsigned index);
00102 
00105     void LockOutput(void);
00106     
00108     void UnlockOutput(void);
00109 
00111     unsigned OutputSize(void);
00112 
00114     OutputType GetOutputAtIndex(unsigned index);
00115 
00117     void RemoveOutputAtIndex(unsigned index);
00118 
00120     void ClearInput(void);
00121 
00123     void ClearOutput(void);
00124 
00126     bool IsWorking(void);
00127 
00129     int NumThreadsWorking(void);
00130 
00132     bool WasStarted(void);
00133 
00134     // Block until all threads are stopped.
00135     bool Pause(void);
00136 
00137     // Continue running
00138     void Resume(void);
00139 
00140 protected:
00141     // It is valid to cancel input before it is processed.  To do so, lock the inputQueue with inputQueueMutex,
00142     // Scan the list, and remove the item you don't want.
00143     RakNet::SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
00144 
00145     void* (*perThreadDataFactory)();
00146     void (*perThreadDataDestructor)(void*);
00147 
00148     // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other
00149     // at the same index
00150     DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue;
00151     DataStructures::Queue<InputType> inputQueue;
00152     DataStructures::Queue<OutputType> outputQueue;
00153 
00154     ThreadDataInterface *threadDataInterface;
00155     void *tdiContext;
00156 
00157     
00158     template <class ThreadInputType, class ThreadOutputType>
00159     friend RAK_THREAD_DECLARATION(WorkerThread);
00160 
00161     /*
00162 #ifdef _WIN32
00163     friend unsigned __stdcall WorkerThread( LPVOID arguments );
00164 #else
00165     friend void* WorkerThread( void* arguments );
00166 #endif
00167     */
00168 
00170     bool runThreads;
00172     int numThreadsRunning;
00174     int numThreadsWorking;
00176     RakNet::SimpleMutex numThreadsRunningMutex;
00177 
00178     RakNet::SignaledEvent quitAndIncomingDataEvents;
00179 
00180 
00181 
00182 
00183 };
00184 
00185 #include "ThreadPool.h"
00186 #include "RakSleep.h"
00187 #ifdef _WIN32
00188 
00189 #else
00190 #include <unistd.h>
00191 #endif
00192 
00193 #ifdef _MSC_VER
00194 #pragma warning(disable:4127)
00195 #pragma warning( disable : 4701 )  // potentially uninitialized local variable 'inputData' used
00196 #endif
00197 
00198 template <class ThreadInputType, class ThreadOutputType>
00199 RAK_THREAD_DECLARATION(WorkerThread)
00200 /*
00201 #ifdef _WIN32
00202 unsigned __stdcall WorkerThread( LPVOID arguments )
00203 #else
00204 void* WorkerThread( void* arguments )
00205 #endif
00206 */
00207 {
00208 
00209 
00210 
00211     ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
00212 
00213 
00214     bool returnOutput;
00215     ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
00216     ThreadInputType inputData;
00217     ThreadOutputType callbackOutput;
00218 
00219     userCallback=0;
00220 
00221     void *perThreadData;
00222     if (threadPool->perThreadDataFactory)
00223         perThreadData=threadPool->perThreadDataFactory();
00224     else if (threadPool->threadDataInterface)
00225         perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
00226     else
00227         perThreadData=0;
00228 
00229     // Increase numThreadsRunning
00230     threadPool->numThreadsRunningMutex.Lock();
00231     ++threadPool->numThreadsRunning;
00232     threadPool->numThreadsRunningMutex.Unlock();
00233 
00234     while (1)
00235     {
00236 #ifdef _WIN32
00237         if (userCallback==0)
00238         {
00239             threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE);
00240         }       
00241 #endif
00242 
00243         threadPool->runThreadsMutex.Lock();
00244         if (threadPool->runThreads==false)
00245         {
00246             threadPool->runThreadsMutex.Unlock();
00247             break;
00248         }
00249         threadPool->runThreadsMutex.Unlock();
00250 
00251         threadPool->workingThreadCountMutex.Lock();
00252         ++threadPool->numThreadsWorking;
00253         threadPool->workingThreadCountMutex.Unlock();
00254 
00255         // Read input data
00256         userCallback=0;
00257         threadPool->inputQueueMutex.Lock();
00258         if (threadPool->inputFunctionQueue.Size())
00259         {
00260             userCallback=threadPool->inputFunctionQueue.Pop();
00261             inputData=threadPool->inputQueue.Pop();
00262         }
00263         threadPool->inputQueueMutex.Unlock();
00264 
00265         if (userCallback)
00266         {
00267             callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
00268             if (returnOutput)
00269             {
00270                 threadPool->outputQueueMutex.Lock();
00271                 threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ );
00272                 threadPool->outputQueueMutex.Unlock();
00273             }           
00274         }
00275 
00276         threadPool->workingThreadCountMutex.Lock();
00277         --threadPool->numThreadsWorking;
00278         threadPool->workingThreadCountMutex.Unlock();
00279     }
00280 
00281     // Decrease numThreadsRunning
00282     threadPool->numThreadsRunningMutex.Lock();
00283     --threadPool->numThreadsRunning;
00284     threadPool->numThreadsRunningMutex.Unlock();
00285     
00286     if (threadPool->perThreadDataDestructor)
00287         threadPool->perThreadDataDestructor(perThreadData);
00288     else if (threadPool->threadDataInterface)
00289         threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
00290 
00291     return 0;
00292 }
00293 template <class InputType, class OutputType>
00294 ThreadPool<InputType, OutputType>::ThreadPool()
00295 {
00296     runThreads=false;
00297     numThreadsRunning=0;
00298     threadDataInterface=0;
00299     tdiContext=0;
00300     numThreadsWorking=0;
00301 
00302 }
00303 template <class InputType, class OutputType>
00304 ThreadPool<InputType, OutputType>::~ThreadPool()
00305 {
00306     StopThreads();
00307     Clear();
00308 }
00309 template <class InputType, class OutputType>
00310 bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
00311 {
00312     (void) stackSize;
00313 
00314 
00315 
00316 
00317 
00318     runThreadsMutex.Lock();
00319     if (runThreads==true)
00320     {
00321         // Already running
00322         runThreadsMutex.Unlock();
00323         return false;
00324     }
00325     runThreadsMutex.Unlock();
00326 
00327     quitAndIncomingDataEvents.InitEvent();
00328 
00329     perThreadDataFactory=_perThreadDataFactory;
00330     perThreadDataDestructor=_perThreadDataDestructor;
00331 
00332     runThreadsMutex.Lock();
00333     runThreads=true;
00334     runThreadsMutex.Unlock();
00335 
00336     numThreadsWorking=0;
00337     unsigned threadId = 0;
00338     (void) threadId;
00339     int i;
00340     for (i=0; i < numThreads; i++)
00341     {
00342         int errorCode;
00343 
00344 
00345 
00346         errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
00347 
00348         if (errorCode!=0)
00349         {
00350             StopThreads();
00351             return false;
00352         }
00353     }
00354     // Wait for number of threads running to increase to numThreads
00355     bool done=false;
00356     while (done==false)
00357     {
00358         RakSleep(50);
00359         numThreadsRunningMutex.Lock();
00360         if (numThreadsRunning==numThreads)
00361             done=true;
00362         numThreadsRunningMutex.Unlock();
00363     }
00364 
00365     return true;
00366 }
00367 template <class InputType, class OutputType>
00368 void ThreadPool<InputType, OutputType>::SetThreadDataInterface(ThreadDataInterface *tdi, void *context)
00369 {
00370     threadDataInterface=tdi;
00371     tdiContext=context;
00372 }
00373 template <class InputType, class OutputType>
00374 void ThreadPool<InputType, OutputType>::StopThreads(void)
00375 {
00376     runThreadsMutex.Lock();
00377     if (runThreads==false)
00378     {
00379         runThreadsMutex.Unlock();
00380         return;
00381     }
00382 
00383     runThreads=false;
00384     runThreadsMutex.Unlock();
00385 
00386     // Wait for number of threads running to decrease to 0
00387     bool done=false;
00388     while (done==false)
00389     {
00390         quitAndIncomingDataEvents.SetEvent();
00391 
00392         RakSleep(50);
00393         numThreadsRunningMutex.Lock();
00394         if (numThreadsRunning==0)
00395             done=true;
00396         numThreadsRunningMutex.Unlock();
00397     }
00398 
00399     quitAndIncomingDataEvents.CloseEvent();
00400 
00401 
00402 
00403 
00404 
00405 
00406 }
00407 template <class InputType, class OutputType>
00408 void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
00409 {
00410     inputQueueMutex.Lock();
00411     inputQueue.Push(inputData, _FILE_AND_LINE_ );
00412     inputFunctionQueue.Push(workerThreadCallback, _FILE_AND_LINE_ );
00413     inputQueueMutex.Unlock();
00414 
00415     quitAndIncomingDataEvents.SetEvent();
00416 }
00417 template <class InputType, class OutputType>
00418 void ThreadPool<InputType, OutputType>::AddOutput(OutputType outputData)
00419 {
00420     outputQueueMutex.Lock();
00421     outputQueue.Push(outputData, _FILE_AND_LINE_ );
00422     outputQueueMutex.Unlock();
00423 }
00424 template <class InputType, class OutputType>
00425 bool ThreadPool<InputType, OutputType>::HasOutputFast(void)
00426 {
00427     return outputQueue.IsEmpty()==false;
00428 }
00429 template <class InputType, class OutputType>
00430 bool ThreadPool<InputType, OutputType>::HasOutput(void)
00431 {
00432     bool res;
00433     outputQueueMutex.Lock();
00434     res=outputQueue.IsEmpty()==false;
00435     outputQueueMutex.Unlock();
00436     return res;
00437 }
00438 template <class InputType, class OutputType>
00439 bool ThreadPool<InputType, OutputType>::HasInputFast(void)
00440 {
00441     return inputQueue.IsEmpty()==false;
00442 }
00443 template <class InputType, class OutputType>
00444 bool ThreadPool<InputType, OutputType>::HasInput(void)
00445 {
00446     bool res;
00447     inputQueueMutex.Lock();
00448     res=inputQueue.IsEmpty()==false;
00449     inputQueueMutex.Unlock();
00450     return res;
00451 }
00452 template <class InputType, class OutputType>
00453 OutputType ThreadPool<InputType, OutputType>::GetOutput(void)
00454 {
00455     // Real output check
00456     OutputType output;
00457     outputQueueMutex.Lock();
00458     output=outputQueue.Pop();
00459     outputQueueMutex.Unlock();
00460     return output;
00461 }
00462 template <class InputType, class OutputType>
00463 void ThreadPool<InputType, OutputType>::Clear(void)
00464 {
00465     runThreadsMutex.Lock();
00466     if (runThreads)
00467     {
00468         runThreadsMutex.Unlock();
00469         inputQueueMutex.Lock();
00470         inputFunctionQueue.Clear(_FILE_AND_LINE_);
00471         inputQueue.Clear(_FILE_AND_LINE_);
00472         inputQueueMutex.Unlock();
00473 
00474         outputQueueMutex.Lock();
00475         outputQueue.Clear(_FILE_AND_LINE_);
00476         outputQueueMutex.Unlock();
00477     }
00478     else
00479     {
00480         inputFunctionQueue.Clear(_FILE_AND_LINE_);
00481         inputQueue.Clear(_FILE_AND_LINE_);
00482         outputQueue.Clear(_FILE_AND_LINE_);
00483     }
00484 }
00485 template <class InputType, class OutputType>
00486 void ThreadPool<InputType, OutputType>::LockInput(void)
00487 {
00488     inputQueueMutex.Lock();
00489 }
00490 template <class InputType, class OutputType>
00491 void ThreadPool<InputType, OutputType>::UnlockInput(void)
00492 {
00493     inputQueueMutex.Unlock();
00494 }
00495 template <class InputType, class OutputType>
00496 unsigned ThreadPool<InputType, OutputType>::InputSize(void)
00497 {
00498     return inputQueue.Size();
00499 }
00500 template <class InputType, class OutputType>
00501 InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index)
00502 {
00503     return inputQueue[index];
00504 }
00505 template <class InputType, class OutputType>
00506 void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index)
00507 {
00508     inputQueue.RemoveAtIndex(index);
00509     inputFunctionQueue.RemoveAtIndex(index);
00510 }
00511 template <class InputType, class OutputType>
00512 void ThreadPool<InputType, OutputType>::LockOutput(void)
00513 {
00514     outputQueueMutex.Lock();
00515 }
00516 template <class InputType, class OutputType>
00517 void ThreadPool<InputType, OutputType>::UnlockOutput(void)
00518 {
00519     outputQueueMutex.Unlock();
00520 }
00521 template <class InputType, class OutputType>
00522 unsigned ThreadPool<InputType, OutputType>::OutputSize(void)
00523 {
00524     return outputQueue.Size();
00525 }
00526 template <class InputType, class OutputType>
00527 OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index)
00528 {
00529     return outputQueue[index];
00530 }
00531 template <class InputType, class OutputType>
00532 void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index)
00533 {
00534     outputQueue.RemoveAtIndex(index);
00535 }
00536 template <class InputType, class OutputType>
00537 void ThreadPool<InputType, OutputType>::ClearInput(void)
00538 {
00539     inputQueue.Clear(_FILE_AND_LINE_);
00540     inputFunctionQueue.Clear(_FILE_AND_LINE_);
00541 }
00542 
00543 template <class InputType, class OutputType>
00544 void ThreadPool<InputType, OutputType>::ClearOutput(void)
00545 {
00546     outputQueue.Clear(_FILE_AND_LINE_);
00547 }
00548 template <class InputType, class OutputType>
00549 bool ThreadPool<InputType, OutputType>::IsWorking(void)
00550 {
00551     bool isWorking;
00552 //  workingThreadCountMutex.Lock();
00553 //  isWorking=numThreadsWorking!=0;
00554 //  workingThreadCountMutex.Unlock();
00555 
00556 //  if (isWorking)
00557 //      return true;
00558 
00559     // Bug fix: Originally the order of these two was reversed.
00560     // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
00561     // here and sees there is no data.  So it thinks the thread is not working when it was.
00562     if (HasOutputFast() && HasOutput())
00563         return true;
00564 
00565     if (HasInputFast() && HasInput())
00566         return true;
00567 
00568     // Need to check is working again, in case the thread was between the first and second checks
00569     workingThreadCountMutex.Lock();
00570     isWorking=numThreadsWorking!=0;
00571     workingThreadCountMutex.Unlock();
00572 
00573     return isWorking;
00574 }
00575 
00576 template <class InputType, class OutputType>
00577 int ThreadPool<InputType, OutputType>::NumThreadsWorking(void)
00578 {
00579     return numThreadsWorking;
00580 }
00581 
00582 template <class InputType, class OutputType>
00583 bool ThreadPool<InputType, OutputType>::WasStarted(void)
00584 {
00585     bool b;
00586     runThreadsMutex.Lock();
00587     b = runThreads;
00588     runThreadsMutex.Unlock();
00589     return b;
00590 }
00591 template <class InputType, class OutputType>
00592 bool ThreadPool<InputType, OutputType>::Pause(void)
00593 {
00594     if (WasStarted()==false)
00595         return false;
00596 
00597     workingThreadCountMutex.Lock();
00598     while (numThreadsWorking>0)
00599     {
00600         RakSleep(30);
00601     }
00602     return true;
00603 }
00604 template <class InputType, class OutputType>
00605 void ThreadPool<InputType, OutputType>::Resume(void)
00606 {
00607     workingThreadCountMutex.Unlock();
00608 }
00609 
00610 #ifdef _MSC_VER
00611 #pragma warning( pop )
00612 #endif
00613 
00614 #endif
00615 

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net