Shadowrun: Awakened 29 September 2011 - Build 871
ThreadPool.cpp
Go to the documentation of this file.
00001 #include "ThreadPool.h"
00002 
00003 namespace Support
00004 {
00008     ThreadPool::ThreadPoolWorker::ThreadPoolWorker(ThreadPool& threadPool) : _threadPool(threadPool) 
00009     {
00010         _waitTimeActive.sec = WorkerActiveWaitTimeInSeconds;
00011         _waitTimeIdle.sec = WorkerIdleWaitTimeInSeconds;
00012     }
00013 
00018     void ThreadPool::ThreadPoolWorker::operator()()
00019     {
00020         while(true)
00021         {
00022             //how long we wait after the task depends on if we have another
00023             //even if we have another, it's good to sleep to surrender the CPU from time to time
00024             if(_threadPool.consumeTask())
00025                 boost::thread::sleep(_waitTimeActive);
00026             else
00027                 boost::thread::sleep(_waitTimeIdle);
00028         }
00029     }
00030 
00035     void ThreadPool::ThreadPoolManager::operator()()
00036     {
00037         while(true)
00038         {
00039             _threadPool.checkGrowPool();
00040             boost::thread::sleep(_waitTime);
00041         }
00042     }
00043 
00049     bool ThreadPool::SafeDequeue(ThreadPoolTaskBase** item)
00050     {
00051         boost::lock_guard<boost::mutex> guard(_taskMutex);
00052 
00053         if(_tasks.size() == 0)
00054         {
00055             *item = NULL;
00056             return false;
00057         }
00058         else
00059         {
00060             *item = _tasks.front();
00061             _tasks.pop_front();
00062             return true;
00063         }
00064     }
00065 
00069     void ThreadPool::createWorkerThread()
00070     {
00071         ThreadPoolWorker worker(*this);
00072         boost::thread* newThread = new boost::thread(worker);
00073         //add thread provides some thread-safety
00074         _threads.add_thread(newThread);
00075     }
00076 
00080     void ThreadPool::createManagerThread()
00081     {
00082         ThreadPoolManager manager(*this);
00083         boost::thread* newThread = new boost::thread(manager);
00084         _threads.add_thread(newThread);
00085     }
00086 
00091     void ThreadPool::checkGrowPool()
00092     {
00093         if((taskCount() > _newThreadThreshold) && (_threads.size() < _maxThreadCount))
00094             createWorkerThread();
00095     }
00096     
00103     bool ThreadPool::consumeTask()
00104     {
00105         ThreadPoolTaskBase* task;
00106         if(SafeDequeue(&task))
00107         {
00108             task->execute();
00109             if(task->getDelete())
00110                 delete task;
00111             return true;
00112         }
00113         else
00114             return false;
00115     }
00116 
00117     //max threads is also +1 to accomodate a manager thread that is created to spawn other threads
00118 
00122     ThreadPool::ThreadPool() : 
00123         _maxThreadCount(DefaultMaxThreadsInThreadPool + 1), 
00124         _newThreadThreshold(DefaultTaskCountThresholdForNewThread) 
00125     {
00126         createManagerThread();
00127         createWorkerThread();
00128     }
00129 
00134     ThreadPool::ThreadPool(size_t numThreads)
00135     {
00136         for(size_t i = 0; i < numThreads; ++i)
00137             createWorkerThread();
00138     }
00139 
00144     ThreadPool::ThreadPool(size_t maxThreads, size_t growThreshold) :
00145         _maxThreadCount(maxThreads + 1), 
00146         _newThreadThreshold(growThreshold) 
00147     {
00148         createManagerThread();
00149         createWorkerThread();
00150     }
00151 
00155     ThreadPool::~ThreadPool()
00156     {
00157         stop();
00158     }
00159 
00163     void ThreadPool::stop()
00164     {
00165         //cleans up both the remaining tasks and the thread group
00166         clearTasks();
00167         _threads.interrupt_all();
00168         _threads.join_all();
00169     }
00170 
00174     void ThreadPool::queueTask(ThreadPoolTaskBase* task)
00175     {
00176         boost::lock_guard<boost::mutex> guard(_taskMutex);
00177         _tasks.push_back(task);
00178     }
00179 
00183     size_t ThreadPool::taskCount()
00184     {
00185         boost::lock_guard<boost::mutex> guard(_taskMutex);
00186         return _tasks.size();
00187     }
00188 
00192     void ThreadPool::clearTasks()
00193     {
00194         boost::lock_guard<boost::mutex> guard(_taskMutex);
00195 
00196         for(std::list<ThreadPoolTaskBase*>::iterator it=_tasks.begin(),end=_tasks.end();
00197             it!=end;
00198             ++it)
00199         {
00200             if((*it)->getDelete())
00201                 delete *it;
00202         }
00203 
00204         _tasks.clear();
00205     }
00206 }

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net