Shadowrun: Awakened 29 September 2011 - Build 871
ThreadPool< InputType, OutputType > Struct Template Reference

#include <ThreadPool.h>

Public Member Functions

void AddInput (OutputType(*workerThreadCallback)(InputType, bool *returnOutput, void *perThreadData), InputType inputData)
void AddOutput (OutputType outputData)
void Clear (void)
 Clears internal buffers.
void ClearInput (void)
 Removes all items from the input queue.
void ClearOutput (void)
 Removes all items from the output queue.
InputType GetInputAtIndex (unsigned index)
 Get the input at a specified index.
OutputType GetOutput (void)
OutputType GetOutputAtIndex (unsigned index)
 Get the output at a specified index.
bool HasInput (void)
bool HasInputFast (void)
bool HasOutput (void)
bool HasOutputFast (void)
unsigned InputSize (void)
 Length of the input queue.
bool IsWorking (void)
 Are any of the threads working, or is input or output available?
void LockInput (void)
void LockOutput (void)
int NumThreadsWorking (void)
 The number of currently active threads.
unsigned OutputSize (void)
 Length of the output queue.
bool Pause (void)
void RemoveInputAtIndex (unsigned index)
 Remove input from a specific index. This does NOT do memory deallocation - it only removes the item from the queue.
void RemoveOutputAtIndex (unsigned index)
 Remove output from a specific index. This does NOT do memory deallocation - it only removes the item from the queue.
void Resume (void)
void SetThreadDataInterface (ThreadDataInterface *tdi, void *context)
bool StartThreads (int numThreads, int stackSize, void *(*_perThreadInit)()=0, void(*_perThreadDeinit)(void *)=0)
void StopThreads (void)
 Stops all threads.
 ThreadPool ()
void UnlockInput (void)
 Unlock the input buffer after you are done with the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex.
void UnlockOutput (void)
 Unlock the output buffer after you are done with the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex.
bool WasStarted (void)
 Has StartThreads been called without a later StopThreads?
 ~ThreadPool ()

Protected Attributes

DataStructures::Queue< OutputType(*)(InputType, bool *, void *)> inputFunctionQueue
DataStructures::Queue< InputType > inputQueue
RakNet::SimpleMutex inputQueueMutex
int numThreadsRunning
RakNet::SimpleMutex numThreadsRunningMutex
int numThreadsWorking
DataStructures::Queue< OutputType > outputQueue
RakNet::SimpleMutex outputQueueMutex
void(* perThreadDataDestructor )(void *)
void *(* perThreadDataFactory )()
RakNet::SignaledEvent quitAndIncomingDataEvents
bool runThreads
RakNet::SimpleMutex runThreadsMutex
void * tdiContext
ThreadDataInterface * threadDataInterface
RakNet::SimpleMutex workingThreadCountMutex

Friends

template<class ThreadInputType , class ThreadOutputType >
void * WorkerThread (void *arguments)

Detailed Description

template<class InputType, class OutputType>
struct ThreadPool< InputType, OutputType >

A simple class that creates worker threads to process a queue of functions with data. This class does not allocate or deallocate memory; memory management is up to the user. InputType and OutputType values are stored directly in a queue. For large structures, if you plan to delete from the middle of the queue, you might wish to store pointers rather than the structures themselves so the array can shift efficiently.
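The advice about storing pointers can be sketched as follows. This is a stand-in that uses std::deque rather than DataStructures::Queue, and LargeJob and TakeAt are hypothetical names that do not appear in ThreadPool.h. Removing from the middle shifts pointer-sized elements instead of whole structures, and the removed object is still owned by the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical large input type (assumption, not from ThreadPool.h).
struct LargeJob
{
    char payload[4096];
    int  id;
};

// Removes and returns the pointer stored at `index`.
// Only pointers are shifted; the LargeJob objects themselves never move.
// Like RemoveInputAtIndex, this does not deallocate anything.
LargeJob* TakeAt(std::deque<LargeJob*>& queue, std::size_t index)
{
    assert(index < queue.size());
    LargeJob* job = queue[index];
    queue.erase(queue.begin() + index);
    return job; // the caller remains responsible for deleting it
}
```

With the real class this would correspond to instantiating the pool with a pointer type as InputType, so that each queue slot holds only a pointer.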

Definition at line 29 of file Include/RakNet/ThreadPool.h.


Constructor & Destructor Documentation

template<class InputType , class OutputType >
ThreadPool< InputType, OutputType >::ThreadPool ( )
template<class InputType , class OutputType >
ThreadPool< InputType, OutputType >::~ThreadPool ( )

Definition at line 304 of file Include/RakNet/ThreadPool.h.

{
    StopThreads();
    Clear();
}

Member Function Documentation

template<class InputType, class OutputType>
void ThreadPool< InputType, OutputType >::AddInput ( OutputType(*)(InputType, bool *returnOutput, void *perThreadData)  workerThreadCallback,
InputType  inputData 
)

Adds a function to a queue, together with the data to pass to that function. The function will be called from a worker thread. Memory management is your responsibility: this class does not allocate or deallocate memory. The best place to deallocate inputData is in workerThreadCallback. If you call StopThreads before all callbacks have run, you can iterate through the input queue and deallocate any pending input data there. The best place to deallocate output is where it is returned to you from GetOutput. Similarly, if you stop the threads before all output has been retrieved, you can iterate through the output queue and deallocate it there.

Parameters:
[in] workerThreadCallback  The function to call from the thread
[in] inputData  The parameter to pass to workerThreadCallback
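A minimal sketch of a callback with the shape AddInput expects, following the ownership rules described above: the input is released inside the callback and the output is released by whoever receives it. Job, Result, CountChars and RunOnce are hypothetical names for illustration. RunOnce only imitates what a worker thread does with one queued item; it does not use the pool itself.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical payload types (assumptions, not part of ThreadPool.h).
struct Job    { std::string text; };
struct Result { std::size_t length; };

// Same shape as the AddInput callback parameter:
// OutputType (*)(InputType, bool *returnOutput, void *perThreadData)
Result* CountChars(Job* job, bool* returnOutput, void* perThreadData)
{
    (void) perThreadData;          // unused in this sketch
    Result* result = new Result;
    result->length = job->text.size();
    delete job;                    // input is released inside the callback
    *returnOutput = true;          // ask for the result to be queued as output
    return result;
}

// Stand-in for one worker iteration followed by the consumer side.
std::size_t RunOnce(const char* text)
{
    Job* job = new Job;
    job->text = text;
    bool returnOutput = false;
    Result* out = CountChars(job, &returnOutput, 0);
    assert(returnOutput);
    std::size_t length = out->length;
    delete out;                    // output is released where it is consumed
    return length;
}
```

With the real class, a callback like this would presumably be queued on a ThreadPool< Job*, Result* > with AddInput(CountChars, job), and the Result pointer deleted after GetOutput returns it.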

Definition at line 408 of file Include/RakNet/ThreadPool.h.

References _FILE_AND_LINE_.

template<class InputType , class OutputType>
void ThreadPool< InputType, OutputType >::AddOutput ( OutputType  outputData)

Adds to the output queue. Use it if you want to inject output into the same queue that the system uses. Normally you would not use this; consider it a convenience function.

Parameters:
[in] outputData  The output to inject

Definition at line 418 of file Include/RakNet/ThreadPool.h.

References _FILE_AND_LINE_.

template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::Clear ( void  )
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::ClearInput ( void  )
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::ClearOutput ( void  )

Definition at line 544 of file Include/RakNet/ThreadPool.h.

References _FILE_AND_LINE_.

template<class InputType , class OutputType >
InputType ThreadPool< InputType, OutputType >::GetInputAtIndex ( unsigned  index)

Definition at line 501 of file Include/RakNet/ThreadPool.h.

{
    return inputQueue[index];
}
template<class InputType , class OutputType >
OutputType ThreadPool< InputType, OutputType >::GetOutput ( void  )

Gets the output of a call to the worker thread callback. HasOutput must return true before you call this function; otherwise it will assert.

Returns:
The output of the worker thread callback. If your callbacks produce different kinds of output, it is up to you to encode the data to indicate this.

Definition at line 453 of file Include/RakNet/ThreadPool.h.

{
    // Real output check
    OutputType output;
    outputQueueMutex.Lock();
    output=outputQueue.Pop();
    outputQueueMutex.Unlock();
    return output;
}
template<class InputType , class OutputType >
OutputType ThreadPool< InputType, OutputType >::GetOutputAtIndex ( unsigned  index)

Definition at line 527 of file Include/RakNet/ThreadPool.h.

{
    return outputQueue[index];
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::HasInput ( void  )

Returns true if input is waiting in the input queue.

Returns:
true if input is waiting, false otherwise

Definition at line 444 of file Include/RakNet/ThreadPool.h.

{
    bool res;
    inputQueueMutex.Lock();
    res=inputQueue.IsEmpty()==false;
    inputQueueMutex.Unlock();
    return res;
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::HasInputFast ( void  )

Inaccurate but fast version of HasInput. If this returns true, you should still check HasInput for the real value.

Returns:
true if input is probably waiting, false otherwise

Definition at line 439 of file Include/RakNet/ThreadPool.h.

{
    return inputQueue.IsEmpty()==false;
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::HasOutput ( void  )

Returns true if output from GetOutput is waiting.

Returns:
true if output is waiting, false otherwise

Definition at line 430 of file Include/RakNet/ThreadPool.h.

{
    bool res;
    outputQueueMutex.Lock();
    res=outputQueue.IsEmpty()==false;
    outputQueueMutex.Unlock();
    return res;
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::HasOutputFast ( void  )

Inaccurate but fast version of HasOutput. If this returns true, you should still check HasOutput for the real value.

Returns:
true if output is probably waiting, false otherwise

Definition at line 425 of file Include/RakNet/ThreadPool.h.

{
    return outputQueue.IsEmpty()==false;
}
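The fast check followed by the real check, as used in IsWorking, can be sketched with a stand-in built on the C++ standard library. OutputSide and DrainSum are hypothetical names, and std::mutex and std::queue stand in for RakNet::SimpleMutex and DataStructures::Queue. The unlocked peek is cheap and filters out the common empty case; the locked check confirms it before GetOutput is called.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Stand-in for the pool's output side (assumption, not RakNet code).
struct OutputSide
{
    std::queue<int> items;
    std::mutex      mutex;

    // Unlocked peek, analogous to HasOutputFast(): cheap but possibly stale.
    // In standard C++ an unlocked read like this is a data race if another
    // thread writes concurrently; it is shown only to mirror the pattern.
    bool HasOutputFast() { return !items.empty(); }

    // Locked check, analogous to HasOutput().
    bool HasOutput()
    {
        std::lock_guard<std::mutex> guard(mutex);
        return !items.empty();
    }

    // Analogous to GetOutput(): asserts if nothing is queued.
    int GetOutput()
    {
        std::lock_guard<std::mutex> guard(mutex);
        assert(!items.empty());
        int value = items.front();
        items.pop();
        return value;
    }
};

// Drains everything currently queued and returns the sum of the values.
int DrainSum(OutputSide& out)
{
    int sum = 0;
    while (out.HasOutputFast() && out.HasOutput())
        sum += out.GetOutput();
    return sum;
}
```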
template<class InputType , class OutputType >
unsigned ThreadPool< InputType, OutputType >::InputSize ( void  )

Definition at line 496 of file Include/RakNet/ThreadPool.h.

{
    return inputQueue.Size();
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::IsWorking ( void  )

Definition at line 549 of file Include/RakNet/ThreadPool.h.

{
    bool isWorking;
//  workingThreadCountMutex.Lock();
//  isWorking=numThreadsWorking!=0;
//  workingThreadCountMutex.Unlock();

//  if (isWorking)
//      return true;

    // Bug fix: Originally the order of these two was reversed.
    // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
    // here and sees there is no data.  So it thinks the thread is not working when it was.
    if (HasOutputFast() && HasOutput())
        return true;

    if (HasInputFast() && HasInput())
        return true;

    // Need to check is working again, in case the thread was between the first and second checks
    workingThreadCountMutex.Lock();
    isWorking=numThreadsWorking!=0;
    workingThreadCountMutex.Unlock();

    return isWorking;
}
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::LockInput ( void  )

Lock the input buffer before calling the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex. It is only necessary to lock the input or output while the threads are running.

Definition at line 486 of file Include/RakNet/ThreadPool.h.

template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::LockOutput ( void  )

Lock the output buffer before calling the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex. It is only necessary to lock the input or output while the threads are running.

Definition at line 512 of file Include/RakNet/ThreadPool.h.
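The lock, inspect, remove, unlock sequence described for LockInput and LockOutput can be sketched as below. InputSide and CancelPending are hypothetical names, and std::mutex and std::deque stand in for the RakNet types, so this shows the calling pattern only and is not the library's implementation.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Stand-in for the pool's input side (assumption, not RakNet code).
struct InputSide
{
    std::deque<int> items;
    std::mutex      mutex;

    void LockInput()   { mutex.lock(); }
    void UnlockInput() { mutex.unlock(); }
    unsigned InputSize() { return static_cast<unsigned>(items.size()); }
    int GetInputAtIndex(unsigned index)
    {
        assert(index < items.size());
        return items[index];
    }
    void RemoveInputAtIndex(unsigned index)
    {
        assert(index < items.size());
        items.erase(items.begin() + index);
    }
};

// Removes every pending input equal to `cancelled`; returns how many were removed.
unsigned CancelPending(InputSide& in, int cancelled)
{
    unsigned removed = 0;
    in.LockInput();
    unsigned i = 0;
    while (i < in.InputSize())
    {
        if (in.GetInputAtIndex(i) == cancelled)
        {
            in.RemoveInputAtIndex(i); // later items shift down, so i stays put
            ++removed;
        }
        else
        {
            ++i;
        }
    }
    in.UnlockInput();
    return removed;
}
```

If the queue held pointers, each one would have to be deleted before its removal, since removal by index does not deallocate.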

template<class InputType , class OutputType >
int ThreadPool< InputType, OutputType >::NumThreadsWorking ( void  )

Definition at line 577 of file Include/RakNet/ThreadPool.h.

{
    return numThreadsWorking;
}
template<class InputType , class OutputType >
unsigned ThreadPool< InputType, OutputType >::OutputSize ( void  )

Definition at line 522 of file Include/RakNet/ThreadPool.h.

{
    return outputQueue.Size();
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::Pause ( void  )

Definition at line 592 of file Include/RakNet/ThreadPool.h.

References RakSleep().

{
    if (WasStarted()==false)
        return false;

    workingThreadCountMutex.Lock();
    while (numThreadsWorking>0)
    {
        RakSleep(30);
    }
    return true;
}
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::RemoveInputAtIndex ( unsigned  index)
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::RemoveOutputAtIndex ( unsigned  index)

Definition at line 532 of file Include/RakNet/ThreadPool.h.

template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::Resume ( void  )

Definition at line 605 of file Include/RakNet/ThreadPool.h.

template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::SetThreadDataInterface ( ThreadDataInterface *  tdi,
void *  context 
)

Definition at line 368 of file Include/RakNet/ThreadPool.h.

{
    threadDataInterface=tdi;
    tdiContext=context;
}
template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::StartThreads ( int  numThreads,
int  stackSize,
void *(*)()  _perThreadInit = 0,
void(*)(void *)  _perThreadDeinit = 0 
)

Start the specified number of threads.

Parameters:
[in] numThreads  The number of threads to start
[in] stackSize  0 for default (except on consoles).
[in] _perThreadInit  User callback to return data stored per thread. Pass 0 if not needed.
[in] _perThreadDeinit  User callback to destroy data stored per thread, created by _perThreadInit. Pass 0 if not needed.
Returns:
True on success, false on failure.
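The two optional callbacks have the shapes void *(*)() and void(*)(void *). A minimal sketch of such a pair is given below, together with a stand-in for one worker's lifetime that calls the factory once, hands the result to each job, and calls the destructor once at the end. Scratch, MakeScratch, FreeScratch and WorkerLifetime are hypothetical names, and no threads are started here.

```cpp
#include <cassert>

// Hypothetical per-thread state, for example a scratch buffer (assumption).
struct Scratch { int uses; };

static int g_liveScratch = 0; // counts objects created but not yet destroyed

// Shape of _perThreadInit: void *(*)()
void* MakeScratch()
{
    Scratch* scratch = new Scratch;
    scratch->uses = 0;
    ++g_liveScratch;
    return scratch;
}

// Shape of _perThreadDeinit: void (*)(void *)
void FreeScratch(void* data)
{
    assert(data != 0);
    delete static_cast<Scratch*>(data);
    --g_liveScratch;
}

// Stand-in for one worker's lifetime: init once, handle jobs, deinit once.
// Returns the number of jobs handled.
int WorkerLifetime(void* (*init)(), void (*deinit)(void*), int jobs)
{
    void* perThreadData = init ? init() : 0;
    int handled = 0;
    for (int i = 0; i < jobs; ++i)
    {
        // A real callback would receive perThreadData as its third argument.
        Scratch* scratch = static_cast<Scratch*>(perThreadData);
        if (scratch)
            ++scratch->uses;
        ++handled;
    }
    if (deinit)
        deinit(perThreadData);
    return handled;
}
```

With the real class, the pair would presumably be passed as the third and fourth arguments of StartThreads, for example StartThreads(4, 0, MakeScratch, FreeScratch).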

Definition at line 310 of file Include/RakNet/ThreadPool.h.

References RakNet::RakThread::Create(), and RakSleep().

{
    (void) stackSize;

    runThreadsMutex.Lock();
    if (runThreads==true)
    {
        // Already running
        runThreadsMutex.Unlock();
        return false;
    }
    runThreadsMutex.Unlock();

    quitAndIncomingDataEvents.InitEvent();

    perThreadDataFactory=_perThreadDataFactory;
    perThreadDataDestructor=_perThreadDataDestructor;

    runThreadsMutex.Lock();
    runThreads=true;
    runThreadsMutex.Unlock();

    numThreadsWorking=0;
    unsigned threadId = 0;
    (void) threadId;
    int i;
    for (i=0; i < numThreads; i++)
    {
        int errorCode;
        errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);

        if (errorCode!=0)
        {
            StopThreads();
            return false;
        }
    }
    // Wait for number of threads running to increase to numThreads
    bool done=false;
    while (done==false)
    {
        RakSleep(50);
        numThreadsRunningMutex.Lock();
        if (numThreadsRunning==numThreads)
            done=true;
        numThreadsRunningMutex.Unlock();
    }

    return true;
}
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::StopThreads ( void  )

Definition at line 374 of file Include/RakNet/ThreadPool.h.

References RakSleep().

{
    runThreadsMutex.Lock();
    if (runThreads==false)
    {
        runThreadsMutex.Unlock();
        return;
    }

    runThreads=false;
    runThreadsMutex.Unlock();

    // Wait for number of threads running to decrease to 0
    bool done=false;
    while (done==false)
    {
        quitAndIncomingDataEvents.SetEvent();

        RakSleep(50);
        numThreadsRunningMutex.Lock();
        if (numThreadsRunning==0)
            done=true;
        numThreadsRunningMutex.Unlock();
    }

    quitAndIncomingDataEvents.CloseEvent();
}
template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::UnlockInput ( void  )

Definition at line 491 of file Include/RakNet/ThreadPool.h.

template<class InputType , class OutputType >
void ThreadPool< InputType, OutputType >::UnlockOutput ( void  )

Definition at line 517 of file Include/RakNet/ThreadPool.h.

template<class InputType , class OutputType >
bool ThreadPool< InputType, OutputType >::WasStarted ( void  )

Definition at line 583 of file Include/RakNet/ThreadPool.h.

{
    bool b;
    runThreadsMutex.Lock();
    b = runThreads;
    runThreadsMutex.Unlock();
    return b;
}

Friends And Related Function Documentation

template<class InputType, class OutputType>
template<class ThreadInputType , class ThreadOutputType >
void* WorkerThread ( void *  arguments) [friend]

Definition at line 199 of file Include/RakNet/ThreadPool.h.

{
    ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;

    bool returnOutput;
    ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
    ThreadInputType inputData;
    ThreadOutputType callbackOutput;

    userCallback=0;

    void *perThreadData;
    if (threadPool->perThreadDataFactory)
        perThreadData=threadPool->perThreadDataFactory();
    else if (threadPool->threadDataInterface)
        perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
    else
        perThreadData=0;

    // Increase numThreadsRunning
    threadPool->numThreadsRunningMutex.Lock();
    ++threadPool->numThreadsRunning;
    threadPool->numThreadsRunningMutex.Unlock();

    while (1)
    {
#ifdef _WIN32
        if (userCallback==0)
        {
            threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE);
        }       
#endif

        threadPool->runThreadsMutex.Lock();
        if (threadPool->runThreads==false)
        {
            threadPool->runThreadsMutex.Unlock();
            break;
        }
        threadPool->runThreadsMutex.Unlock();

        threadPool->workingThreadCountMutex.Lock();
        ++threadPool->numThreadsWorking;
        threadPool->workingThreadCountMutex.Unlock();

        // Read input data
        userCallback=0;
        threadPool->inputQueueMutex.Lock();
        if (threadPool->inputFunctionQueue.Size())
        {
            userCallback=threadPool->inputFunctionQueue.Pop();
            inputData=threadPool->inputQueue.Pop();
        }
        threadPool->inputQueueMutex.Unlock();

        if (userCallback)
        {
            callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
            if (returnOutput)
            {
                threadPool->outputQueueMutex.Lock();
                threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ );
                threadPool->outputQueueMutex.Unlock();
            }           
        }

        threadPool->workingThreadCountMutex.Lock();
        --threadPool->numThreadsWorking;
        threadPool->workingThreadCountMutex.Unlock();
    }

    // Decrease numThreadsRunning
    threadPool->numThreadsRunningMutex.Lock();
    --threadPool->numThreadsRunning;
    threadPool->numThreadsRunningMutex.Unlock();
    
    if (threadPool->perThreadDataDestructor)
        threadPool->perThreadDataDestructor(perThreadData);
    else if (threadPool->threadDataInterface)
        threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);

    return 0;
}

Member Data Documentation

template<class InputType, class OutputType>
DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> ThreadPool< InputType, OutputType >::inputFunctionQueue [protected]

Definition at line 150 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
DataStructures::Queue<InputType> ThreadPool< InputType, OutputType >::inputQueue [protected]

Definition at line 151 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::inputQueueMutex [protected]

Definition at line 143 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
int ThreadPool< InputType, OutputType >::numThreadsRunning [protected]

Definition at line 172 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::numThreadsRunningMutex [protected]

Definition at line 176 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
int ThreadPool< InputType, OutputType >::numThreadsWorking [protected]

Definition at line 174 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
DataStructures::Queue<OutputType> ThreadPool< InputType, OutputType >::outputQueue [protected]

Definition at line 152 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::outputQueueMutex [protected]

Definition at line 143 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
void(* ThreadPool< InputType, OutputType >::perThreadDataDestructor)(void *) [protected]

Definition at line 146 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
void*(* ThreadPool< InputType, OutputType >::perThreadDataFactory)() [protected]

Definition at line 145 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SignaledEvent ThreadPool< InputType, OutputType >::quitAndIncomingDataEvents [protected]

Definition at line 178 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
bool ThreadPool< InputType, OutputType >::runThreads [protected]

Definition at line 170 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::runThreadsMutex [protected]

Definition at line 143 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
void* ThreadPool< InputType, OutputType >::tdiContext [protected]

Definition at line 155 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
ThreadDataInterface* ThreadPool< InputType, OutputType >::threadDataInterface [protected]

Definition at line 154 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().

template<class InputType, class OutputType>
RakNet::SimpleMutex ThreadPool< InputType, OutputType >::workingThreadCountMutex [protected]

Definition at line 143 of file Include/RakNet/ThreadPool.h.

Referenced by WorkerThread().


The documentation for this struct was generated from the following file:

Include/RakNet/ThreadPool.h

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.
