00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "ThreadPool.h"
00028
00029
00030 #include <cstdlib>
00031 #include <ctime>
00032 #ifdef _WIN32
00033 #include <process.h>
00034 #endif
00035 #include <iostream>
00036
00037
00038 #include "include/exit_codes.h"
00039 #include "Common/util.h"
00040 #include "server/server_util.h"
00041
00042 using namespace std;
00043
00044 namespace IXE {
00045
00046 #ifndef _WIN32
00047 pthread_key_t jmpbuf_key;
00048 #endif
00049
00050 #define Notify(event) event.notify()
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00081
00082
00083 extern "C" void ThreadPool::Thread::Destroy(void *p)
00084 {
00085 # ifdef DEBUG_threads
00086 cerr << "ThreadPool::Thread::Destroy()" << endl;
00087 # endif
00088
00089
00090
00091
00092
00093
00094 ThreadPool::Thread* const t = static_cast<ThreadPool::Thread *>(p);
00095 if (t) {
00096 bool deleted;
00097 Locking(t->pool.threads,
00098 deleted = (t->pool.threads->find(t) == t->pool.threads->end()));
00099 if (!deleted) {
00100
00101
00102
00103
00104
00105 t->destroying = true;
00106 delete t;
00107 }
00108 }
00109 }
00110
00111
00133
00134
00135 TRESULT
00136 ThreadPool::Thread::Main(void* p)
00137 {
00138 # ifndef _WIN32
00139
00140
00141 ::pthread_key_create(&jmpbuf_key, 0);
00142
00143
00144
00145
00146
00147
00148 pthread_cleanup_push(Destroy, p);
00149
00150 # endif // _WIN32
00151
00152 ThreadPool::Thread* const thread = static_cast<ThreadPool::Thread*>(p);
00153 register ThreadPool& pool = thread->pool;
00154 ThreadPool::Thread::TaskDescr task((void*)0);
00155
00156 while (true) {
00157
00158 # ifdef DEBUG_threads
00159 cerr << "Thread::Main(): waiting for task" << endl;
00160 # endif
00161
00162 {
00163 LockUp taskQueue(pool.tasks._lock);
00164
00165 while (pool.tasks->empty()) {
00166 Count current_threads;
00167 Locking(pool.threads,
00168 current_threads = pool.threads->size());
00169 if (current_threads <= pool.minThreads) {
00170
00171
00172
00173
00174 Notify(pool.idle_thread);
00175 ConditionWait(pool.task_available, &pool.tasks._lock);
00176 } else {
00177
00178
00179
00180 Notify(pool.idle_thread);
00181
00182
00183
00184
00185 int result = 0;
00186 ConditionWaitInterval(pool.task_available, &pool.tasks._lock,
00187 pool.timeout * 1000, 0, result);
00188 if (result == WAIT_TIMEOUT) {
00189
00190
00191
00192
00193
00194 Locking(pool.threads,
00195 current_threads = pool.threads->size());
00196 if (current_threads > pool.minThreads) {
00197
00198
00199
00200
00201 LeaveCriticalSection(&pool.tasks._lock);
00202 delete thread;
00203 }
00204 }
00205 }
00206 }
00207
00208 # ifdef DEBUG_threads
00209 cerr << "Thread::Main(): got task" << endl;
00210 # endif
00211
00212 task = pool.tasks->front();
00213 pool.tasks->pop();
00214
00215 }
00216
00217 Locking(pool.busyThreads,
00218 ++(*pool.busyThreads));
00219
00220 # ifdef DEBUG_threads
00221 cerr << "Thread::Main(): performing task" << endl;
00222 # endif
00223
00224 thread->Run(task);
00225
00226 # ifdef DEBUG_threads
00227 cerr << "Thread::Main(): completed task" << endl;
00228 # endif
00229
00230 Locking(pool.busyThreads,
00231 --(*pool.busyThreads));
00232 }
00233
00234
00235
00236
00237
00238 # ifndef _WIN32
00239 pthread_cleanup_pop(0);
00240 # endif
00241 return 0;
00242 }
00243
00244
00249
00250
00251 ThreadPool::Thread::Thread(ThreadPool& p)
00252 : pool(p), destroying(false)
00253 { }
00254
00255
00259
00260 void
00261 ThreadPool::Thread::Start()
00262 {
00263 # ifdef DEBUG_threads
00264 cerr << "Thread::Start()" << endl;
00265 # endif
00266
00267 # ifdef _WIN32
00268 unsigned int id;
00269 thread = (HANDLE)_beginthreadex(NULL, 0, Main, (LPVOID)this, 0, &id);
00270 bool result = !thread;
00271 # else
00272 ::pthread_attr_t attr;
00273 ::pthread_attr_init(&attr);
00274 ::pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00275 int const result = ::pthread_create(&thread, &attr, Main, this);
00276 # endif
00277 if (result) {
00278 cerr << pool.process_name << "could not create thread: "
00279 << ::strerror(result) << endl;
00280 ::exit(Exit_No_Create_Thread);
00281 }
00282 }
00283
00284
00288
00289
00290
00291 ThreadPool::Thread::~Thread()
00292 {
00293 # ifdef DEBUG_threads
00294 cerr << "Thread::~Thread()" << endl;
00295 # endif
00296
00297 if (!destroying) {
00298
00299 if (thread == GetCurrentThread()) {
00300
00301
00302
00303
00304
00305
00306
00307 ExitThread(0);
00308 } else {
00309
00310
00311
00312
00313
00314 TerminateThread(thread, 0);
00315 }
00316 }
00317
00318
00319
00320 if (!(*pool.destructing)) {
00321
00322
00323
00324 Locking(pool.threads,
00325 pool.threads->erase(this));
00326 }
00327 }
00328
00329 ThreadPool::ThreadPool(char const* process_name,
00330 Thread const* prototype,
00331 int min_threads, int max_threads, int timeout) :
00332 process_name(process_name), minThreads(min_threads),
00333 maxThreads(max_threads),
00334 destructing(false), timeout(timeout),
00335 busyThreads(0)
00336 {
00337 init(prototype);
00338 }
00339
00340 ThreadPool::ThreadPool(char const* process_name,
00341 int min_threads, int max_threads, int timeout) :
00342 process_name(process_name), minThreads(min_threads),
00343 maxThreads(max_threads),
00344 destructing(false), timeout(timeout),
00345 busyThreads(0)
00346 { }
00347
00348 void ThreadPool::init(Thread const* prototype)
00349 {
00350 Locking(threads,
00351 if (threads->empty()) {
00352 ((Thread*)prototype)->Start();
00353 threads->insert((Thread*)prototype);
00354 for (Count i = 1; i < minThreads; ++i)
00355 threads->insert(prototype->clone(*this));
00356 });
00357 }
00358
00359
00363
00364
00365 ThreadPool::~ThreadPool()
00366 {
00367
00368
00369
00370
00371
00372
00373 Locking(destructing,
00374 destructing = true);
00375
00376 Locking(threads, {
00377 for (ThreadSet::iterator t = threads->begin(); t != threads->end(); ++t)
00378 delete *t;
00379 });
00380 }
00381
00382
00389
00390
00391 #define MAX_PENDING 1
00392
00393 void ThreadPool::AddTask(Thread::TaskDescr task)
00394 {
00395 # ifdef DEBUG_threads
00396 cerr << "ThreadPool::AddTask()" << endl;
00397 # endif
00398
00399
00400
00401 Locking(tasks, {
00402 tasks->push(task);
00403 Notify(task_available);
00404 });
00405
00406 Locking(threads, {
00407
00408 Count accepted = (Count)busyThreads + tasks->size();
00409
00410 if (accepted >= maxThreads + MAX_PENDING) {
00411
00412
00413
00414
00415 ConditionWait(idle_thread, &threads._lock);
00416 } else if (busyThreads == threads->size()
00417 && threads->size() < maxThreads) {
00418
00419
00420
00421
00422
00423 threads->insert((*threads->begin())->clone(*this));
00424 }
00425 });
00426 }
00427
00428 }