![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 #include "ThreadPool.h" 00002 00003 namespace Support 00004 { 00008 ThreadPool::ThreadPoolWorker::ThreadPoolWorker(ThreadPool& threadPool) : _threadPool(threadPool) 00009 { 00010 _waitTimeActive.sec = WorkerActiveWaitTimeInSeconds; 00011 _waitTimeIdle.sec = WorkerIdleWaitTimeInSeconds; 00012 } 00013 00018 void ThreadPool::ThreadPoolWorker::operator()() 00019 { 00020 while(true) 00021 { 00022 //how long we wait after the task depends on if we have another 00023 //even if we have another, it's good to sleep to surrender the CPU from time to time 00024 if(_threadPool.consumeTask()) 00025 boost::thread::sleep(_waitTimeActive); 00026 else 00027 boost::thread::sleep(_waitTimeIdle); 00028 } 00029 } 00030 00035 void ThreadPool::ThreadPoolManager::operator()() 00036 { 00037 while(true) 00038 { 00039 _threadPool.checkGrowPool(); 00040 boost::thread::sleep(_waitTime); 00041 } 00042 } 00043 00049 bool ThreadPool::SafeDequeue(ThreadPoolTaskBase** item) 00050 { 00051 boost::lock_guard<boost::mutex> guard(_taskMutex); 00052 00053 if(_tasks.size() == 0) 00054 { 00055 *item = NULL; 00056 return false; 00057 } 00058 else 00059 { 00060 *item = _tasks.front(); 00061 _tasks.pop_front(); 00062 return true; 00063 } 00064 } 00065 00069 void ThreadPool::createWorkerThread() 00070 { 00071 ThreadPoolWorker worker(*this); 00072 boost::thread* newThread = new boost::thread(worker); 00073 //add thread provides some thread-safety 00074 _threads.add_thread(newThread); 00075 } 00076 00080 void ThreadPool::createManagerThread() 00081 { 00082 ThreadPoolManager manager(*this); 00083 boost::thread* newThread = new boost::thread(manager); 00084 _threads.add_thread(newThread); 00085 } 00086 00091 void ThreadPool::checkGrowPool() 00092 { 00093 if((taskCount() > _newThreadThreshold) && (_threads.size() < _maxThreadCount)) 00094 createWorkerThread(); 00095 } 00096 00103 bool ThreadPool::consumeTask() 00104 { 00105 ThreadPoolTaskBase* task; 00106 if(SafeDequeue(&task)) 00107 { 00108 task->execute(); 00109 if(task->getDelete()) 00110 delete task; 00111 return true; 00112 } 00113 else 00114 return false; 00115 } 00116 00117 //max threads is also +1 to accomodate a manager thread that is created to spawn other threads 00118 00122 ThreadPool::ThreadPool() : 00123 _maxThreadCount(DefaultMaxThreadsInThreadPool + 1), 00124 _newThreadThreshold(DefaultTaskCountThresholdForNewThread) 00125 { 00126 createManagerThread(); 00127 createWorkerThread(); 00128 } 00129 00134 ThreadPool::ThreadPool(size_t numThreads) 00135 { 00136 for(size_t i = 0; i < numThreads; ++i) 00137 createWorkerThread(); 00138 } 00139 00144 ThreadPool::ThreadPool(size_t maxThreads, size_t growThreshold) : 00145 _maxThreadCount(maxThreads + 1), 00146 _newThreadThreshold(growThreshold) 00147 { 00148 createManagerThread(); 00149 createWorkerThread(); 00150 } 00151 00155 ThreadPool::~ThreadPool() 00156 { 00157 stop(); 00158 } 00159 00163 void ThreadPool::stop() 00164 { 00165 //cleans up both the remaining tasks and the thread group 00166 clearTasks(); 00167 _threads.interrupt_all(); 00168 _threads.join_all(); 00169 } 00170 00174 void ThreadPool::queueTask(ThreadPoolTaskBase* task) 00175 { 00176 boost::lock_guard<boost::mutex> guard(_taskMutex); 00177 _tasks.push_back(task); 00178 } 00179 00183 size_t ThreadPool::taskCount() 00184 { 00185 boost::lock_guard<boost::mutex> guard(_taskMutex); 00186 return _tasks.size(); 00187 } 00188 00192 void ThreadPool::clearTasks() 00193 { 00194 boost::lock_guard<boost::mutex> guard(_taskMutex); 00195 00196 for(std::list<ThreadPoolTaskBase*>::iterator it=_tasks.begin(),end=_tasks.end(); 00197 it!=end; 00198 ++it) 00199 { 00200 if((*it)->getDelete()) 00201 delete *it; 00202 } 00203 00204 _tasks.clear(); 00205 } 00206 }
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.