![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 #ifndef __THREAD_POOL_H 00002 #define __THREAD_POOL_H 00003 00004 #include "RakMemoryOverride.h" 00005 #include "DS_Queue.h" 00006 #include "SimpleMutex.h" 00007 #include "Export.h" 00008 #include "RakThread.h" 00009 #include "SignaledEvent.h" 00010 00011 #ifdef _MSC_VER 00012 #pragma warning( push ) 00013 #endif 00014 00015 class ThreadDataInterface 00016 { 00017 public: 00018 ThreadDataInterface() {} 00019 virtual ~ThreadDataInterface() {} 00020 00021 virtual void* PerThreadFactory(void *context)=0; 00022 virtual void PerThreadDestructor(void* factoryResult, void *context)=0; 00023 }; 00028 template <class InputType, class OutputType> 00029 struct RAK_DLL_EXPORT ThreadPool 00030 { 00031 ThreadPool(); 00032 ~ThreadPool(); 00033 00040 bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0); 00041 00042 // Alternate form of _perThreadDataFactory, _perThreadDataDestructor 00043 void SetThreadDataInterface(ThreadDataInterface *tdi, void *context); 00044 00046 void StopThreads(void); 00047 00056 void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData); 00057 00061 void AddOutput(OutputType outputData); 00062 00065 bool HasOutput(void); 00066 00069 bool HasOutputFast(void); 00070 00073 bool HasInput(void); 00074 00077 bool HasInputFast(void); 00078 00082 OutputType GetOutput(void); 00083 00085 void Clear(void); 00086 00089 void LockInput(void); 00090 00092 void UnlockInput(void); 00093 00095 unsigned InputSize(void); 00096 00098 InputType GetInputAtIndex(unsigned index); 00099 00101 void RemoveInputAtIndex(unsigned index); 00102 00105 void LockOutput(void); 00106 00108 void UnlockOutput(void); 00109 00111 unsigned OutputSize(void); 00112 00114 OutputType GetOutputAtIndex(unsigned index); 00115 00117 void RemoveOutputAtIndex(unsigned index); 00118 00120 void ClearInput(void); 00121 00123 void ClearOutput(void); 00124 00126 bool IsWorking(void); 00127 00129 int NumThreadsWorking(void); 00130 00132 bool WasStarted(void); 00133 00134 // Block until all threads are stopped. 00135 bool Pause(void); 00136 00137 // Continue running 00138 void Resume(void); 00139 00140 protected: 00141 // It is valid to cancel input before it is processed. To do so, lock the inputQueue with inputQueueMutex, 00142 // Scan the list, and remove the item you don't want. 00143 RakNet::SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex; 00144 00145 void* (*perThreadDataFactory)(); 00146 void (*perThreadDataDestructor)(void*); 00147 00148 // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other 00149 // at the same index 00150 DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue; 00151 DataStructures::Queue<InputType> inputQueue; 00152 DataStructures::Queue<OutputType> outputQueue; 00153 00154 ThreadDataInterface *threadDataInterface; 00155 void *tdiContext; 00156 00157 00158 template <class ThreadInputType, class ThreadOutputType> 00159 friend RAK_THREAD_DECLARATION(WorkerThread); 00160 00161 /* 00162 #ifdef _WIN32 00163 friend unsigned __stdcall WorkerThread( LPVOID arguments ); 00164 #else 00165 friend void* WorkerThread( void* arguments ); 00166 #endif 00167 */ 00168 00170 bool runThreads; 00172 int numThreadsRunning; 00174 int numThreadsWorking; 00176 RakNet::SimpleMutex numThreadsRunningMutex; 00177 00178 RakNet::SignaledEvent quitAndIncomingDataEvents; 00179 00180 00181 00182 00183 }; 00184 00185 #include "ThreadPool.h" 00186 #include "RakSleep.h" 00187 #ifdef _WIN32 00188 00189 #else 00190 #include <unistd.h> 00191 #endif 00192 00193 #ifdef _MSC_VER 00194 #pragma warning(disable:4127) 00195 #pragma warning( disable : 4701 ) // potentially uninitialized local variable 'inputData' used 00196 #endif 00197 00198 template <class ThreadInputType, class ThreadOutputType> 00199 RAK_THREAD_DECLARATION(WorkerThread) 00200 /* 00201 #ifdef _WIN32 00202 unsigned __stdcall WorkerThread( LPVOID arguments ) 00203 #else 00204 void* WorkerThread( void* arguments ) 00205 #endif 00206 */ 00207 { 00208 00209 00210 00211 ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments; 00212 00213 00214 bool returnOutput; 00215 ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*); 00216 ThreadInputType inputData; 00217 ThreadOutputType callbackOutput; 00218 00219 userCallback=0; 00220 00221 void *perThreadData; 00222 if (threadPool->perThreadDataFactory) 00223 perThreadData=threadPool->perThreadDataFactory(); 00224 else if (threadPool->threadDataInterface) 00225 perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext); 00226 else 00227 perThreadData=0; 00228 00229 // Increase numThreadsRunning 00230 threadPool->numThreadsRunningMutex.Lock(); 00231 ++threadPool->numThreadsRunning; 00232 threadPool->numThreadsRunningMutex.Unlock(); 00233 00234 while (1) 00235 { 00236 #ifdef _WIN32 00237 if (userCallback==0) 00238 { 00239 threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE); 00240 } 00241 #endif 00242 00243 threadPool->runThreadsMutex.Lock(); 00244 if (threadPool->runThreads==false) 00245 { 00246 threadPool->runThreadsMutex.Unlock(); 00247 break; 00248 } 00249 threadPool->runThreadsMutex.Unlock(); 00250 00251 threadPool->workingThreadCountMutex.Lock(); 00252 ++threadPool->numThreadsWorking; 00253 threadPool->workingThreadCountMutex.Unlock(); 00254 00255 // Read input data 00256 userCallback=0; 00257 threadPool->inputQueueMutex.Lock(); 00258 if (threadPool->inputFunctionQueue.Size()) 00259 { 00260 userCallback=threadPool->inputFunctionQueue.Pop(); 00261 inputData=threadPool->inputQueue.Pop(); 00262 } 00263 threadPool->inputQueueMutex.Unlock(); 00264 00265 if (userCallback) 00266 { 00267 callbackOutput=userCallback(inputData, &returnOutput,perThreadData); 00268 if (returnOutput) 00269 { 00270 threadPool->outputQueueMutex.Lock(); 00271 threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ ); 00272 threadPool->outputQueueMutex.Unlock(); 00273 } 00274 } 00275 00276 threadPool->workingThreadCountMutex.Lock(); 00277 --threadPool->numThreadsWorking; 00278 threadPool->workingThreadCountMutex.Unlock(); 00279 } 00280 00281 // Decrease numThreadsRunning 00282 threadPool->numThreadsRunningMutex.Lock(); 00283 --threadPool->numThreadsRunning; 00284 threadPool->numThreadsRunningMutex.Unlock(); 00285 00286 if (threadPool->perThreadDataDestructor) 00287 threadPool->perThreadDataDestructor(perThreadData); 00288 else if (threadPool->threadDataInterface) 00289 threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext); 00290 00291 return 0; 00292 } 00293 template <class InputType, class OutputType> 00294 ThreadPool<InputType, OutputType>::ThreadPool() 00295 { 00296 runThreads=false; 00297 numThreadsRunning=0; 00298 threadDataInterface=0; 00299 tdiContext=0; 00300 numThreadsWorking=0; 00301 00302 } 00303 template <class InputType, class OutputType> 00304 ThreadPool<InputType, OutputType>::~ThreadPool() 00305 { 00306 StopThreads(); 00307 Clear(); 00308 } 00309 template <class InputType, class OutputType> 00310 bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *)) 00311 { 00312 (void) stackSize; 00313 00314 00315 00316 00317 00318 runThreadsMutex.Lock(); 00319 if (runThreads==true) 00320 { 00321 // Already running 00322 runThreadsMutex.Unlock(); 00323 return false; 00324 } 00325 runThreadsMutex.Unlock(); 00326 00327 quitAndIncomingDataEvents.InitEvent(); 00328 00329 perThreadDataFactory=_perThreadDataFactory; 00330 perThreadDataDestructor=_perThreadDataDestructor; 00331 00332 runThreadsMutex.Lock(); 00333 runThreads=true; 00334 runThreadsMutex.Unlock(); 00335 00336 numThreadsWorking=0; 00337 unsigned threadId = 0; 00338 (void) threadId; 00339 int i; 00340 for (i=0; i < numThreads; i++) 00341 { 00342 int errorCode; 00343 00344 00345 00346 errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this); 00347 00348 if (errorCode!=0) 00349 { 00350 StopThreads(); 00351 return false; 00352 } 00353 } 00354 // Wait for number of threads running to increase to numThreads 00355 bool done=false; 00356 while (done==false) 00357 { 00358 RakSleep(50); 00359 numThreadsRunningMutex.Lock(); 00360 if (numThreadsRunning==numThreads) 00361 done=true; 00362 numThreadsRunningMutex.Unlock(); 00363 } 00364 00365 return true; 00366 } 00367 template <class InputType, class OutputType> 00368 void ThreadPool<InputType, OutputType>::SetThreadDataInterface(ThreadDataInterface *tdi, void *context) 00369 { 00370 threadDataInterface=tdi; 00371 tdiContext=context; 00372 } 00373 template <class InputType, class OutputType> 00374 void ThreadPool<InputType, OutputType>::StopThreads(void) 00375 { 00376 runThreadsMutex.Lock(); 00377 if (runThreads==false) 00378 { 00379 runThreadsMutex.Unlock(); 00380 return; 00381 } 00382 00383 runThreads=false; 00384 runThreadsMutex.Unlock(); 00385 00386 // Wait for number of threads running to decrease to 0 00387 bool done=false; 00388 while (done==false) 00389 { 00390 quitAndIncomingDataEvents.SetEvent(); 00391 00392 RakSleep(50); 00393 numThreadsRunningMutex.Lock(); 00394 if (numThreadsRunning==0) 00395 done=true; 00396 numThreadsRunningMutex.Unlock(); 00397 } 00398 00399 quitAndIncomingDataEvents.CloseEvent(); 00400 00401 00402 00403 00404 00405 00406 } 00407 template <class InputType, class OutputType> 00408 void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData) 00409 { 00410 inputQueueMutex.Lock(); 00411 inputQueue.Push(inputData, _FILE_AND_LINE_ ); 00412 inputFunctionQueue.Push(workerThreadCallback, _FILE_AND_LINE_ ); 00413 inputQueueMutex.Unlock(); 00414 00415 quitAndIncomingDataEvents.SetEvent(); 00416 } 00417 template <class InputType, class OutputType> 00418 void ThreadPool<InputType, OutputType>::AddOutput(OutputType outputData) 00419 { 00420 outputQueueMutex.Lock(); 00421 outputQueue.Push(outputData, _FILE_AND_LINE_ ); 00422 outputQueueMutex.Unlock(); 00423 } 00424 template <class InputType, class OutputType> 00425 bool ThreadPool<InputType, OutputType>::HasOutputFast(void) 00426 { 00427 return outputQueue.IsEmpty()==false; 00428 } 00429 template <class InputType, class OutputType> 00430 bool ThreadPool<InputType, OutputType>::HasOutput(void) 00431 { 00432 bool res; 00433 outputQueueMutex.Lock(); 00434 res=outputQueue.IsEmpty()==false; 00435 outputQueueMutex.Unlock(); 00436 return res; 00437 } 00438 template <class InputType, class OutputType> 00439 bool ThreadPool<InputType, OutputType>::HasInputFast(void) 00440 { 00441 return inputQueue.IsEmpty()==false; 00442 } 00443 template <class InputType, class OutputType> 00444 bool ThreadPool<InputType, OutputType>::HasInput(void) 00445 { 00446 bool res; 00447 inputQueueMutex.Lock(); 00448 res=inputQueue.IsEmpty()==false; 00449 inputQueueMutex.Unlock(); 00450 return res; 00451 } 00452 template <class InputType, class OutputType> 00453 OutputType ThreadPool<InputType, OutputType>::GetOutput(void) 00454 { 00455 // Real output check 00456 OutputType output; 00457 outputQueueMutex.Lock(); 00458 output=outputQueue.Pop(); 00459 outputQueueMutex.Unlock(); 00460 return output; 00461 } 00462 template <class InputType, class OutputType> 00463 void ThreadPool<InputType, OutputType>::Clear(void) 00464 { 00465 runThreadsMutex.Lock(); 00466 if (runThreads) 00467 { 00468 runThreadsMutex.Unlock(); 00469 inputQueueMutex.Lock(); 00470 inputFunctionQueue.Clear(_FILE_AND_LINE_); 00471 inputQueue.Clear(_FILE_AND_LINE_); 00472 inputQueueMutex.Unlock(); 00473 00474 outputQueueMutex.Lock(); 00475 outputQueue.Clear(_FILE_AND_LINE_); 00476 outputQueueMutex.Unlock(); 00477 } 00478 else 00479 { 00480 inputFunctionQueue.Clear(_FILE_AND_LINE_); 00481 inputQueue.Clear(_FILE_AND_LINE_); 00482 outputQueue.Clear(_FILE_AND_LINE_); 00483 } 00484 } 00485 template <class InputType, class OutputType> 00486 void ThreadPool<InputType, OutputType>::LockInput(void) 00487 { 00488 inputQueueMutex.Lock(); 00489 } 00490 template <class InputType, class OutputType> 00491 void ThreadPool<InputType, OutputType>::UnlockInput(void) 00492 { 00493 inputQueueMutex.Unlock(); 00494 } 00495 template <class InputType, class OutputType> 00496 unsigned ThreadPool<InputType, OutputType>::InputSize(void) 00497 { 00498 return inputQueue.Size(); 00499 } 00500 template <class InputType, class OutputType> 00501 InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index) 00502 { 00503 return inputQueue[index]; 00504 } 00505 template <class InputType, class OutputType> 00506 void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index) 00507 { 00508 inputQueue.RemoveAtIndex(index); 00509 inputFunctionQueue.RemoveAtIndex(index); 00510 } 00511 template <class InputType, class OutputType> 00512 void ThreadPool<InputType, OutputType>::LockOutput(void) 00513 { 00514 outputQueueMutex.Lock(); 00515 } 00516 template <class InputType, class OutputType> 00517 void ThreadPool<InputType, OutputType>::UnlockOutput(void) 00518 { 00519 outputQueueMutex.Unlock(); 00520 } 00521 template <class InputType, class OutputType> 00522 unsigned ThreadPool<InputType, OutputType>::OutputSize(void) 00523 { 00524 return outputQueue.Size(); 00525 } 00526 template <class InputType, class OutputType> 00527 OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index) 00528 { 00529 return outputQueue[index]; 00530 } 00531 template <class InputType, class OutputType> 00532 void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index) 00533 { 00534 outputQueue.RemoveAtIndex(index); 00535 } 00536 template <class InputType, class OutputType> 00537 void ThreadPool<InputType, OutputType>::ClearInput(void) 00538 { 00539 inputQueue.Clear(_FILE_AND_LINE_); 00540 inputFunctionQueue.Clear(_FILE_AND_LINE_); 00541 } 00542 00543 template <class InputType, class OutputType> 00544 void ThreadPool<InputType, OutputType>::ClearOutput(void) 00545 { 00546 outputQueue.Clear(_FILE_AND_LINE_); 00547 } 00548 template <class InputType, class OutputType> 00549 bool ThreadPool<InputType, OutputType>::IsWorking(void) 00550 { 00551 bool isWorking; 00552 // workingThreadCountMutex.Lock(); 00553 // isWorking=numThreadsWorking!=0; 00554 // workingThreadCountMutex.Unlock(); 00555 00556 // if (isWorking) 00557 // return true; 00558 00559 // Bug fix: Originally the order of these two was reversed. 00560 // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks 00561 // here and sees there is no data. So it thinks the thread is not working when it was. 00562 if (HasOutputFast() && HasOutput()) 00563 return true; 00564 00565 if (HasInputFast() && HasInput()) 00566 return true; 00567 00568 // Need to check is working again, in case the thread was between the first and second checks 00569 workingThreadCountMutex.Lock(); 00570 isWorking=numThreadsWorking!=0; 00571 workingThreadCountMutex.Unlock(); 00572 00573 return isWorking; 00574 } 00575 00576 template <class InputType, class OutputType> 00577 int ThreadPool<InputType, OutputType>::NumThreadsWorking(void) 00578 { 00579 return numThreadsWorking; 00580 } 00581 00582 template <class InputType, class OutputType> 00583 bool ThreadPool<InputType, OutputType>::WasStarted(void) 00584 { 00585 bool b; 00586 runThreadsMutex.Lock(); 00587 b = runThreads; 00588 runThreadsMutex.Unlock(); 00589 return b; 00590 } 00591 template <class InputType, class OutputType> 00592 bool ThreadPool<InputType, OutputType>::Pause(void) 00593 { 00594 if (WasStarted()==false) 00595 return false; 00596 00597 workingThreadCountMutex.Lock(); 00598 while (numThreadsWorking>0) 00599 { 00600 RakSleep(30); 00601 } 00602 return true; 00603 } 00604 template <class InputType, class OutputType> 00605 void ThreadPool<InputType, OutputType>::Resume(void) 00606 { 00607 workingThreadCountMutex.Unlock(); 00608 } 00609 00610 #ifdef _MSC_VER 00611 #pragma warning( pop ) 00612 #endif 00613 00614 #endif 00615
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.