![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
#include <ThreadPool.h>
Inheritance diagram for ThreadPool< InputType, OutputType >:Public Member Functions | |
| void | AddInput (OutputType(*workerThreadCallback)(InputType, bool *returnOutput, void *perThreadData), InputType inputData) |
| void | AddOutput (OutputType outputData) |
| void | Clear (void) |
| Clears internal buffers. | |
| void | ClearInput (void) |
| Removes all items from the input queue. | |
| void | ClearOutput (void) |
| Removes all items from the output queue. | |
| InputType | GetInputAtIndex (unsigned index) |
| Get the input at a specified index. | |
| OutputType | GetOutput (void) |
| OutputType | GetOutputAtIndex (unsigned index) |
| Get the output at a specified index. | |
| bool | HasInput (void) |
| bool | HasInputFast (void) |
| bool | HasOutput (void) |
| bool | HasOutputFast (void) |
| unsigned | InputSize (void) |
| Length of the input queue. | |
| bool | IsWorking (void) |
| Are any of the threads working, or is input or output available? | |
| void | LockInput (void) |
| void | LockOutput (void) |
| int | NumThreadsWorking (void) |
| The number of currently active threads. | |
| unsigned | OutputSize (void) |
| Length of the output queue. | |
| bool | Pause (void) |
| void | RemoveInputAtIndex (unsigned index) |
| Remove input from a specific index. This does NOT do memory deallocation - it only removes the item from the queue. | |
| void | RemoveOutputAtIndex (unsigned index) |
| Remove output from a specific index. This does NOT do memory deallocation - it only removes the item from the queue. | |
| void | Resume (void) |
| void | SetThreadDataInterface (ThreadDataInterface *tdi, void *context) |
| bool | StartThreads (int numThreads, int stackSize, void *(*_perThreadInit)()=0, void(*_perThreadDeinit)(void *)=0) |
| void | StopThreads (void) |
| Stops all threads. | |
| ThreadPool () | |
| void | UnlockInput (void) |
| Unlock the input buffer after you are done with the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex. | |
| void | UnlockOutput (void) |
| Unlock the output buffer after you are done with the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex. | |
| bool | WasStarted (void) |
| Did we call Start? | |
| ~ThreadPool () | |
Protected Attributes | |
| DataStructures::Queue < OutputType(*)(InputType, bool *, void *)> | inputFunctionQueue |
| DataStructures::Queue< InputType > | inputQueue |
| RakNet::SimpleMutex | inputQueueMutex |
| int | numThreadsRunning |
| RakNet::SimpleMutex | numThreadsRunningMutex |
| int | numThreadsWorking |
| DataStructures::Queue< OutputType > | outputQueue |
| RakNet::SimpleMutex | outputQueueMutex |
| void(* | perThreadDataDestructor )(void *) |
| void *(* | perThreadDataFactory )() |
| RakNet::SignaledEvent | quitAndIncomingDataEvents |
| bool | runThreads |
| RakNet::SimpleMutex | runThreadsMutex |
| void * | tdiContext |
| ThreadDataInterface * | threadDataInterface |
| RakNet::SimpleMutex | workingThreadCountMutex |
Friends | |
| template<class ThreadInputType , class ThreadOutputType > | |
| void * | WorkerThread (void *arguments) |
A simple class to create worker threads that processes a queue of functions with data. This class does not allocate or deallocate memory. It is up to the user to handle memory management. InputType and OutputType are stored directly in a queue. For large structures, if you plan to delete from the middle of the queue, you might wish to store pointers rather than the structures themselves so the array can shift efficiently.
Definition at line 29 of file Include/RakNet/ThreadPool.h.
| ThreadPool< InputType, OutputType >::ThreadPool | ( | ) |
Definition at line 294 of file Include/RakNet/ThreadPool.h.
{
runThreads=false;
numThreadsRunning=0;
threadDataInterface=0;
tdiContext=0;
numThreadsWorking=0;
}
| ThreadPool< InputType, OutputType >::~ThreadPool | ( | ) |
Definition at line 304 of file Include/RakNet/ThreadPool.h.
{
StopThreads();
Clear();
}
| void ThreadPool< InputType, OutputType >::AddInput | ( | OutputType(*)(InputType, bool *returnOutput, void *perThreadData) | workerThreadCallback, |
| InputType | inputData | ||
| ) |
Adds a function to a queue with data to pass to that function. This function will be called from the thread Memory management is your responsibility! This class does not allocate or deallocate memory. The best way to deallocate inputData is in userCallback. If you call EndThreads such that callbacks were not called, you can iterate through the inputQueue and deallocate all pending input data there The best way to deallocate output is as it is returned to you from GetOutput. Similarly, if you end the threads such that not all output was returned, you can iterate through outputQueue and deallocate it there.
| [in] | workerThreadCallback | The function to call from the thread |
| [in] | inputData | The parameter to pass to userCallback |
Definition at line 408 of file Include/RakNet/ThreadPool.h.
References _FILE_AND_LINE_.
{
inputQueueMutex.Lock();
inputQueue.Push(inputData, _FILE_AND_LINE_ );
inputFunctionQueue.Push(workerThreadCallback, _FILE_AND_LINE_ );
inputQueueMutex.Unlock();
quitAndIncomingDataEvents.SetEvent();
}
| void ThreadPool< InputType, OutputType >::AddOutput | ( | OutputType | outputData | ) |
Adds to the output queue Use it if you want to inject output into the same queue that the system uses. Normally you would not use this. Consider it a convenience function.
| [in] | outputData | The output to inject |
Definition at line 418 of file Include/RakNet/ThreadPool.h.
References _FILE_AND_LINE_.
{
outputQueueMutex.Lock();
outputQueue.Push(outputData, _FILE_AND_LINE_ );
outputQueueMutex.Unlock();
}
| void ThreadPool< InputType, OutputType >::Clear | ( | void | ) |
Definition at line 463 of file Include/RakNet/ThreadPool.h.
References _FILE_AND_LINE_.
{
runThreadsMutex.Lock();
if (runThreads)
{
runThreadsMutex.Unlock();
inputQueueMutex.Lock();
inputFunctionQueue.Clear(_FILE_AND_LINE_);
inputQueue.Clear(_FILE_AND_LINE_);
inputQueueMutex.Unlock();
outputQueueMutex.Lock();
outputQueue.Clear(_FILE_AND_LINE_);
outputQueueMutex.Unlock();
}
else
{
inputFunctionQueue.Clear(_FILE_AND_LINE_);
inputQueue.Clear(_FILE_AND_LINE_);
outputQueue.Clear(_FILE_AND_LINE_);
}
}
| void ThreadPool< InputType, OutputType >::ClearInput | ( | void | ) |
Definition at line 537 of file Include/RakNet/ThreadPool.h.
References _FILE_AND_LINE_.
| void ThreadPool< InputType, OutputType >::ClearOutput | ( | void | ) |
Definition at line 544 of file Include/RakNet/ThreadPool.h.
References _FILE_AND_LINE_.
{
outputQueue.Clear(_FILE_AND_LINE_);
}
| InputType ThreadPool< InputType, OutputType >::GetInputAtIndex | ( | unsigned | index | ) |
Definition at line 501 of file Include/RakNet/ThreadPool.h.
{
return inputQueue[index];
}
| OutputType ThreadPool< InputType, OutputType >::GetOutput | ( | void | ) |
Gets the output of a call to userCallback HasOutput must return true before you call this function. Otherwise it will assert.
Definition at line 453 of file Include/RakNet/ThreadPool.h.
{
// Real output check
OutputType output;
outputQueueMutex.Lock();
output=outputQueue.Pop();
outputQueueMutex.Unlock();
return output;
}
| OutputType ThreadPool< InputType, OutputType >::GetOutputAtIndex | ( | unsigned | index | ) |
Definition at line 527 of file Include/RakNet/ThreadPool.h.
{
return outputQueue[index];
}
| bool ThreadPool< InputType, OutputType >::HasInput | ( | void | ) |
Returns true if input from GetInput is waiting.
Definition at line 444 of file Include/RakNet/ThreadPool.h.
{
bool res;
inputQueueMutex.Lock();
res=inputQueue.IsEmpty()==false;
inputQueueMutex.Unlock();
return res;
}
| bool ThreadPool< InputType, OutputType >::HasInputFast | ( | void | ) |
Inaccurate but fast version of HasInput. If this returns true, you should still check HasInput for the real value.
Definition at line 439 of file Include/RakNet/ThreadPool.h.
{
return inputQueue.IsEmpty()==false;
}
| bool ThreadPool< InputType, OutputType >::HasOutput | ( | void | ) |
Returns true if output from GetOutput is waiting.
Definition at line 430 of file Include/RakNet/ThreadPool.h.
{
bool res;
outputQueueMutex.Lock();
res=outputQueue.IsEmpty()==false;
outputQueueMutex.Unlock();
return res;
}
| bool ThreadPool< InputType, OutputType >::HasOutputFast | ( | void | ) |
Inaccurate but fast version of HasOutput. If this returns true, you should still check HasOutput for the real value.
Definition at line 425 of file Include/RakNet/ThreadPool.h.
{
return outputQueue.IsEmpty()==false;
}
| unsigned ThreadPool< InputType, OutputType >::InputSize | ( | void | ) |
Definition at line 496 of file Include/RakNet/ThreadPool.h.
{
return inputQueue.Size();
}
| bool ThreadPool< InputType, OutputType >::IsWorking | ( | void | ) |
Definition at line 549 of file Include/RakNet/ThreadPool.h.
{
bool isWorking;
// workingThreadCountMutex.Lock();
// isWorking=numThreadsWorking!=0;
// workingThreadCountMutex.Unlock();
// if (isWorking)
// return true;
// Bug fix: Originally the order of these two was reversed.
// It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
// here and sees there is no data. So it thinks the thread is not working when it was.
if (HasOutputFast() && HasOutput())
return true;
if (HasInputFast() && HasInput())
return true;
// Need to check is working again, in case the thread was between the first and second checks
workingThreadCountMutex.Lock();
isWorking=numThreadsWorking!=0;
workingThreadCountMutex.Unlock();
return isWorking;
}
| void ThreadPool< InputType, OutputType >::LockInput | ( | void | ) |
Lock the input buffer before calling the functions InputSize, InputAtIndex, and RemoveInputAtIndex It is only necessary to lock the input or output while the threads are running
Definition at line 486 of file Include/RakNet/ThreadPool.h.
{
inputQueueMutex.Lock();
}
| void ThreadPool< InputType, OutputType >::LockOutput | ( | void | ) |
Lock the output buffer before calling the functions OutputSize, OutputAtIndex, and RemoveOutputAtIndex It is only necessary to lock the input or output while the threads are running
Definition at line 512 of file Include/RakNet/ThreadPool.h.
{
outputQueueMutex.Lock();
}
| int ThreadPool< InputType, OutputType >::NumThreadsWorking | ( | void | ) |
Definition at line 577 of file Include/RakNet/ThreadPool.h.
{
return numThreadsWorking;
}
| unsigned ThreadPool< InputType, OutputType >::OutputSize | ( | void | ) |
Definition at line 522 of file Include/RakNet/ThreadPool.h.
{
return outputQueue.Size();
}
| bool ThreadPool< InputType, OutputType >::Pause | ( | void | ) |
Definition at line 592 of file Include/RakNet/ThreadPool.h.
References RakSleep().
{
if (WasStarted()==false)
return false;
workingThreadCountMutex.Lock();
while (numThreadsWorking>0)
{
RakSleep(30);
}
return true;
}
| void ThreadPool< InputType, OutputType >::RemoveInputAtIndex | ( | unsigned | index | ) |
Definition at line 506 of file Include/RakNet/ThreadPool.h.
{
inputQueue.RemoveAtIndex(index);
inputFunctionQueue.RemoveAtIndex(index);
}
| void ThreadPool< InputType, OutputType >::RemoveOutputAtIndex | ( | unsigned | index | ) |
Definition at line 532 of file Include/RakNet/ThreadPool.h.
{
outputQueue.RemoveAtIndex(index);
}
| void ThreadPool< InputType, OutputType >::Resume | ( | void | ) |
Definition at line 605 of file Include/RakNet/ThreadPool.h.
{
workingThreadCountMutex.Unlock();
}
| void ThreadPool< InputType, OutputType >::SetThreadDataInterface | ( | ThreadDataInterface * | tdi, |
| void * | context | ||
| ) |
Definition at line 368 of file Include/RakNet/ThreadPool.h.
{
threadDataInterface=tdi;
tdiContext=context;
}
| bool ThreadPool< InputType, OutputType >::StartThreads | ( | int | numThreads, |
| int | stackSize, | ||
| void *(*)() | _perThreadInit = 0, |
||
| void(*)(void *) | _perThreadDeinit = 0 |
||
| ) |
Start the specified number of threads.
| [in] | numThreads | The number of threads to start |
| [in] | stackSize | 0 for default (except on consoles). |
| [in] | _perThreadInit | User callback to return data stored per thread. Pass 0 if not needed. |
| [in] | _perThreadDeinit | User callback to destroy data stored per thread, created by _perThreadInit. Pass 0 if not needed. |
Definition at line 310 of file Include/RakNet/ThreadPool.h.
References RakNet::RakThread::Create(), and RakSleep().
{
(void) stackSize;
runThreadsMutex.Lock();
if (runThreads==true)
{
// Already running
runThreadsMutex.Unlock();
return false;
}
runThreadsMutex.Unlock();
quitAndIncomingDataEvents.InitEvent();
perThreadDataFactory=_perThreadDataFactory;
perThreadDataDestructor=_perThreadDataDestructor;
runThreadsMutex.Lock();
runThreads=true;
runThreadsMutex.Unlock();
numThreadsWorking=0;
unsigned threadId = 0;
(void) threadId;
int i;
for (i=0; i < numThreads; i++)
{
int errorCode;
errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
if (errorCode!=0)
{
StopThreads();
return false;
}
}
// Wait for number of threads running to increase to numThreads
bool done=false;
while (done==false)
{
RakSleep(50);
numThreadsRunningMutex.Lock();
if (numThreadsRunning==numThreads)
done=true;
numThreadsRunningMutex.Unlock();
}
return true;
}
| void ThreadPool< InputType, OutputType >::StopThreads | ( | void | ) |
Definition at line 374 of file Include/RakNet/ThreadPool.h.
References RakSleep().
{
runThreadsMutex.Lock();
if (runThreads==false)
{
runThreadsMutex.Unlock();
return;
}
runThreads=false;
runThreadsMutex.Unlock();
// Wait for number of threads running to decrease to 0
bool done=false;
while (done==false)
{
quitAndIncomingDataEvents.SetEvent();
RakSleep(50);
numThreadsRunningMutex.Lock();
if (numThreadsRunning==0)
done=true;
numThreadsRunningMutex.Unlock();
}
quitAndIncomingDataEvents.CloseEvent();
}
| void ThreadPool< InputType, OutputType >::UnlockInput | ( | void | ) |
Definition at line 491 of file Include/RakNet/ThreadPool.h.
{
inputQueueMutex.Unlock();
}
| void ThreadPool< InputType, OutputType >::UnlockOutput | ( | void | ) |
Definition at line 517 of file Include/RakNet/ThreadPool.h.
{
outputQueueMutex.Unlock();
}
| bool ThreadPool< InputType, OutputType >::WasStarted | ( | void | ) |
Definition at line 583 of file Include/RakNet/ThreadPool.h.
{
bool b;
runThreadsMutex.Lock();
b = runThreads;
runThreadsMutex.Unlock();
return b;
}
| void* WorkerThread | ( | void * | arguments | ) | [friend] |
Definition at line 199 of file Include/RakNet/ThreadPool.h.
{
ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
bool returnOutput;
ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
ThreadInputType inputData;
ThreadOutputType callbackOutput;
userCallback=0;
void *perThreadData;
if (threadPool->perThreadDataFactory)
perThreadData=threadPool->perThreadDataFactory();
else if (threadPool->threadDataInterface)
perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
else
perThreadData=0;
// Increase numThreadsRunning
threadPool->numThreadsRunningMutex.Lock();
++threadPool->numThreadsRunning;
threadPool->numThreadsRunningMutex.Unlock();
while (1)
{
#ifdef _WIN32
if (userCallback==0)
{
threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE);
}
#endif
threadPool->runThreadsMutex.Lock();
if (threadPool->runThreads==false)
{
threadPool->runThreadsMutex.Unlock();
break;
}
threadPool->runThreadsMutex.Unlock();
threadPool->workingThreadCountMutex.Lock();
++threadPool->numThreadsWorking;
threadPool->workingThreadCountMutex.Unlock();
// Read input data
userCallback=0;
threadPool->inputQueueMutex.Lock();
if (threadPool->inputFunctionQueue.Size())
{
userCallback=threadPool->inputFunctionQueue.Pop();
inputData=threadPool->inputQueue.Pop();
}
threadPool->inputQueueMutex.Unlock();
if (userCallback)
{
callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
if (returnOutput)
{
threadPool->outputQueueMutex.Lock();
threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ );
threadPool->outputQueueMutex.Unlock();
}
}
threadPool->workingThreadCountMutex.Lock();
--threadPool->numThreadsWorking;
threadPool->workingThreadCountMutex.Unlock();
}
// Decrease numThreadsRunning
threadPool->numThreadsRunningMutex.Lock();
--threadPool->numThreadsRunning;
threadPool->numThreadsRunningMutex.Unlock();
if (threadPool->perThreadDataDestructor)
threadPool->perThreadDataDestructor(perThreadData);
else if (threadPool->threadDataInterface)
threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
return 0;
}
DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> ThreadPool< InputType, OutputType >::inputFunctionQueue [protected] |
Definition at line 150 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
DataStructures::Queue<InputType> ThreadPool< InputType, OutputType >::inputQueue [protected] |
Definition at line 151 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::inputQueueMutex [protected] |
Definition at line 143 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
int ThreadPool< InputType, OutputType >::numThreadsRunning [protected] |
Definition at line 172 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::numThreadsRunningMutex [protected] |
Definition at line 176 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
int ThreadPool< InputType, OutputType >::numThreadsWorking [protected] |
Definition at line 174 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
DataStructures::Queue<OutputType> ThreadPool< InputType, OutputType >::outputQueue [protected] |
Definition at line 152 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::outputQueueMutex [protected] |
Definition at line 143 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
void(* ThreadPool< InputType, OutputType >::perThreadDataDestructor)(void *) [protected] |
Definition at line 146 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
void*(* ThreadPool< InputType, OutputType >::perThreadDataFactory)() [protected] |
Definition at line 145 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SignaledEvent ThreadPool< InputType, OutputType >::quitAndIncomingDataEvents [protected] |
Definition at line 178 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
bool ThreadPool< InputType, OutputType >::runThreads [protected] |
Definition at line 170 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::runThreadsMutex [protected] |
Definition at line 143 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
void* ThreadPool< InputType, OutputType >::tdiContext [protected] |
Definition at line 155 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
ThreadDataInterface* ThreadPool< InputType, OutputType >::threadDataInterface [protected] |
Definition at line 154 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::workingThreadCountMutex [protected] |
Definition at line 143 of file Include/RakNet/ThreadPool.h.
Referenced by WorkerThread().
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.