00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "CArchMultithreadPosix.h"
00020 #include "CArch.h"
00021 #include "XArch.h"
00022 #include <signal.h>
00023 #if TIME_WITH_SYS_TIME
00024 # include <sys/time.h>
00025 # include <time.h>
00026 #else
00027 # if HAVE_SYS_TIME_H
00028 # include <sys/time.h>
00029 # else
00030 # include <time.h>
00031 # endif
00032 #endif
00033 #include <cerrno>
00034
00035 #define SIGWAKEUP SIGUSR1
00036
00037 #if !HAVE_PTHREAD_SIGNAL
00038
00039
00040
00041 # define pthread_sigmask sigprocmask
00042 # define pthread_kill(tid_, sig_) kill(0, (sig_))
00043 # define sigwait(set_, sig_)
00044 # undef HAVE_POSIX_SIGWAIT
00045 # define HAVE_POSIX_SIGWAIT 1
00046 #endif
00047
00048 static
00049 void
00050 setSignalSet(sigset_t* sigset)
00051 {
00052 sigemptyset(sigset);
00053 sigaddset(sigset, SIGHUP);
00054 sigaddset(sigset, SIGINT);
00055 sigaddset(sigset, SIGTERM);
00056 sigaddset(sigset, SIGUSR2);
00057 }
00058
00059
00060
00061
00062
00063 class CArchThreadImpl {
00064 public:
00065 CArchThreadImpl();
00066
00067 public:
00068 int m_refCount;
00069 IArchMultithread::ThreadID m_id;
00070 pthread_t m_thread;
00071 IArchMultithread::ThreadFunc m_func;
00072 void* m_userData;
00073 bool m_cancel;
00074 bool m_cancelling;
00075 bool m_exited;
00076 void* m_result;
00077 void* m_networkData;
00078 };
00079
00080 CArchThreadImpl::CArchThreadImpl() :
00081 m_refCount(1),
00082 m_id(0),
00083 m_func(NULL),
00084 m_userData(NULL),
00085 m_cancel(false),
00086 m_cancelling(false),
00087 m_exited(false),
00088 m_result(NULL),
00089 m_networkData(NULL)
00090 {
00091
00092 }
00093
00094
00095
00096
00097
00098
00099 CArchMultithreadPosix* CArchMultithreadPosix::s_instance = NULL;
00100
00101 CArchMultithreadPosix::CArchMultithreadPosix() :
00102 m_newThreadCalled(false),
00103 m_nextID(0)
00104 {
00105 assert(s_instance == NULL);
00106
00107 s_instance = this;
00108
00109
00110 for (size_t i = 0; i < kNUM_SIGNALS; ++i) {
00111 m_signalFunc[i] = NULL;
00112 m_signalUserData[i] = NULL;
00113 }
00114
00115
00116 m_threadMutex = newMutex();
00117
00118
00119
00120 m_mainThread = new CArchThreadImpl;
00121 m_mainThread->m_thread = pthread_self();
00122 insert(m_mainThread);
00123
00124
00125
00126
00127
00128
00129 struct sigaction act;
00130 sigemptyset(&act.sa_mask);
00131 # if defined(SA_INTERRUPT)
00132 act.sa_flags = SA_INTERRUPT;
00133 # else
00134 act.sa_flags = 0;
00135 # endif
00136 act.sa_handler = &threadCancel;
00137 sigaction(SIGWAKEUP, &act, NULL);
00138
00139
00140
00141 sigset_t sigset;
00142 sigemptyset(&sigset);
00143 sigaddset(&sigset, SIGWAKEUP);
00144 pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
00145 sigemptyset(&sigset);
00146 sigaddset(&sigset, SIGPIPE);
00147 pthread_sigmask(SIG_BLOCK, &sigset, NULL);
00148 }
00149
00150 CArchMultithreadPosix::~CArchMultithreadPosix()
00151 {
00152 assert(s_instance != NULL);
00153
00154 closeMutex(m_threadMutex);
00155 s_instance = NULL;
00156 }
00157
00158 void
00159 CArchMultithreadPosix::setNetworkDataForCurrentThread(void* data)
00160 {
00161 lockMutex(m_threadMutex);
00162 CArchThreadImpl* thread = find(pthread_self());
00163 thread->m_networkData = data;
00164 unlockMutex(m_threadMutex);
00165 }
00166
00167 void*
00168 CArchMultithreadPosix::getNetworkDataForThread(CArchThread thread)
00169 {
00170 lockMutex(m_threadMutex);
00171 void* data = thread->m_networkData;
00172 unlockMutex(m_threadMutex);
00173 return data;
00174 }
00175
00176 CArchMultithreadPosix*
00177 CArchMultithreadPosix::getInstance()
00178 {
00179 return s_instance;
00180 }
00181
00182 CArchCond
00183 CArchMultithreadPosix::newCondVar()
00184 {
00185 CArchCondImpl* cond = new CArchCondImpl;
00186 int status = pthread_cond_init(&cond->m_cond, NULL);
00187 (void)status;
00188 assert(status == 0);
00189 return cond;
00190 }
00191
00192 void
00193 CArchMultithreadPosix::closeCondVar(CArchCond cond)
00194 {
00195 int status = pthread_cond_destroy(&cond->m_cond);
00196 (void)status;
00197 assert(status == 0);
00198 delete cond;
00199 }
00200
00201 void
00202 CArchMultithreadPosix::signalCondVar(CArchCond cond)
00203 {
00204 int status = pthread_cond_signal(&cond->m_cond);
00205 (void)status;
00206 assert(status == 0);
00207 }
00208
00209 void
00210 CArchMultithreadPosix::broadcastCondVar(CArchCond cond)
00211 {
00212 int status = pthread_cond_broadcast(&cond->m_cond);
00213 (void)status;
00214 assert(status == 0);
00215 }
00216
00217 bool
00218 CArchMultithreadPosix::waitCondVar(CArchCond cond,
00219 CArchMutex mutex, double timeout)
00220 {
00221
00222
00223
00224
00225
00226
00227
00228
00229 static const double maxCancellationLatency = 0.1;
00230 if (timeout < 0.0 || timeout > maxCancellationLatency) {
00231 timeout = maxCancellationLatency;
00232 }
00233
00234
00235 testCancelThread();
00236
00237
00238 struct timeval now;
00239 gettimeofday(&now, NULL);
00240 struct timespec finalTime;
00241 finalTime.tv_sec = now.tv_sec;
00242 finalTime.tv_nsec = now.tv_usec * 1000;
00243 long timeout_sec = (long)timeout;
00244 long timeout_nsec = (long)(1.0e+9 * (timeout - timeout_sec));
00245 finalTime.tv_sec += timeout_sec;
00246 finalTime.tv_nsec += timeout_nsec;
00247 if (finalTime.tv_nsec >= 1000000000) {
00248 finalTime.tv_nsec -= 1000000000;
00249 finalTime.tv_sec += 1;
00250 }
00251
00252
00253 int status = pthread_cond_timedwait(&cond->m_cond,
00254 &mutex->m_mutex, &finalTime);
00255
00256
00257 testCancelThread();
00258
00259 switch (status) {
00260 case 0:
00261
00262 return true;
00263
00264 case ETIMEDOUT:
00265 return false;
00266
00267 default:
00268 assert(0 && "condition variable wait error");
00269 return false;
00270 }
00271 }
00272
00273 CArchMutex
00274 CArchMultithreadPosix::newMutex()
00275 {
00276 pthread_mutexattr_t attr;
00277 int status = pthread_mutexattr_init(&attr);
00278 assert(status == 0);
00279 CArchMutexImpl* mutex = new CArchMutexImpl;
00280 status = pthread_mutex_init(&mutex->m_mutex, &attr);
00281 assert(status == 0);
00282 return mutex;
00283 }
00284
00285 void
00286 CArchMultithreadPosix::closeMutex(CArchMutex mutex)
00287 {
00288 int status = pthread_mutex_destroy(&mutex->m_mutex);
00289 (void)status;
00290 assert(status == 0);
00291 delete mutex;
00292 }
00293
00294 void
00295 CArchMultithreadPosix::lockMutex(CArchMutex mutex)
00296 {
00297 int status = pthread_mutex_lock(&mutex->m_mutex);
00298
00299 switch (status) {
00300 case 0:
00301
00302 return;
00303
00304 case EDEADLK:
00305 assert(0 && "lock already owned");
00306 break;
00307
00308 case EAGAIN:
00309 assert(0 && "too many recursive locks");
00310 break;
00311
00312 default:
00313 assert(0 && "unexpected error");
00314 break;
00315 }
00316 }
00317
00318 void
00319 CArchMultithreadPosix::unlockMutex(CArchMutex mutex)
00320 {
00321 int status = pthread_mutex_unlock(&mutex->m_mutex);
00322
00323 switch (status) {
00324 case 0:
00325
00326 return;
00327
00328 case EPERM:
00329 assert(0 && "thread doesn't own a lock");
00330 break;
00331
00332 default:
00333 assert(0 && "unexpected error");
00334 break;
00335 }
00336 }
00337
00338 CArchThread
00339 CArchMultithreadPosix::newThread(ThreadFunc func, void* data)
00340 {
00341 assert(func != NULL);
00342
00343
00344
00345
00346
00347
00348
00349
00350 if (!m_newThreadCalled) {
00351 m_newThreadCalled = true;
00352 #if HAVE_PTHREAD_SIGNAL
00353 startSignalHandler();
00354 #endif
00355 }
00356
00357 lockMutex(m_threadMutex);
00358
00359
00360 CArchThreadImpl* thread = new CArchThreadImpl;
00361 thread->m_func = func;
00362 thread->m_userData = data;
00363
00364
00365
00366 pthread_attr_t attr;
00367 int status = pthread_attr_init(&attr);
00368 if (status == 0) {
00369 status = pthread_create(&thread->m_thread, &attr,
00370 &CArchMultithreadPosix::threadFunc, thread);
00371 pthread_attr_destroy(&attr);
00372 }
00373
00374
00375 if (status != 0) {
00376
00377 delete thread;
00378 thread = NULL;
00379 }
00380 else {
00381
00382 insert(thread);
00383
00384
00385 refThread(thread);
00386 }
00387
00388
00389 unlockMutex(m_threadMutex);
00390
00391 return thread;
00392 }
00393
00394 CArchThread
00395 CArchMultithreadPosix::newCurrentThread()
00396 {
00397 lockMutex(m_threadMutex);
00398 CArchThreadImpl* thread = find(pthread_self());
00399 unlockMutex(m_threadMutex);
00400 assert(thread != NULL);
00401 return thread;
00402 }
00403
00404 void
00405 CArchMultithreadPosix::closeThread(CArchThread thread)
00406 {
00407 assert(thread != NULL);
00408
00409
00410 if (--thread->m_refCount == 0) {
00411
00412 if (thread->m_func != NULL) {
00413 pthread_detach(thread->m_thread);
00414 }
00415
00416
00417 lockMutex(m_threadMutex);
00418 assert(findNoRef(thread->m_thread) == thread);
00419 erase(thread);
00420 unlockMutex(m_threadMutex);
00421
00422
00423 delete thread;
00424 }
00425 }
00426
00427 CArchThread
00428 CArchMultithreadPosix::copyThread(CArchThread thread)
00429 {
00430 refThread(thread);
00431 return thread;
00432 }
00433
00434 void
00435 CArchMultithreadPosix::cancelThread(CArchThread thread)
00436 {
00437 assert(thread != NULL);
00438
00439
00440 bool wakeup = false;
00441 lockMutex(m_threadMutex);
00442 if (!thread->m_exited && !thread->m_cancelling) {
00443 thread->m_cancel = true;
00444 wakeup = true;
00445 }
00446 unlockMutex(m_threadMutex);
00447
00448
00449 if (wakeup) {
00450 pthread_kill(thread->m_thread, SIGWAKEUP);
00451 }
00452 }
00453
00454 void
00455 CArchMultithreadPosix::setPriorityOfThread(CArchThread thread, int )
00456 {
00457 assert(thread != NULL);
00458
00459
00460 }
00461
00462 void
00463 CArchMultithreadPosix::testCancelThread()
00464 {
00465
00466 lockMutex(m_threadMutex);
00467 CArchThreadImpl* thread = findNoRef(pthread_self());
00468 unlockMutex(m_threadMutex);
00469
00470
00471 testCancelThreadImpl(thread);
00472 }
00473
00474 bool
00475 CArchMultithreadPosix::wait(CArchThread target, double timeout)
00476 {
00477 assert(target != NULL);
00478
00479 lockMutex(m_threadMutex);
00480
00481
00482 CArchThreadImpl* self = findNoRef(pthread_self());
00483
00484
00485 if (target == self) {
00486 unlockMutex(m_threadMutex);
00487 return false;
00488 }
00489
00490
00491 refThread(target);
00492
00493 unlockMutex(m_threadMutex);
00494
00495 try {
00496
00497 testCancelThreadImpl(self);
00498 if (isExitedThread(target)) {
00499 closeThread(target);
00500 return true;
00501 }
00502
00503
00504 if (timeout != 0.0) {
00505 const double start = ARCH->time();
00506 do {
00507
00508 ARCH->sleep(0.05);
00509
00510
00511 testCancelThreadImpl(self);
00512 if (isExitedThread(target)) {
00513 closeThread(target);
00514 return true;
00515 }
00516
00517
00518 } while (timeout < 0.0 || (ARCH->time() - start) <= timeout);
00519 }
00520
00521 closeThread(target);
00522 return false;
00523 }
00524 catch (...) {
00525 closeThread(target);
00526 throw;
00527 }
00528 }
00529
00530 bool
00531 CArchMultithreadPosix::isSameThread(CArchThread thread1, CArchThread thread2)
00532 {
00533 return (thread1 == thread2);
00534 }
00535
00536 bool
00537 CArchMultithreadPosix::isExitedThread(CArchThread thread)
00538 {
00539 lockMutex(m_threadMutex);
00540 bool exited = thread->m_exited;
00541 unlockMutex(m_threadMutex);
00542 return exited;
00543 }
00544
00545 void*
00546 CArchMultithreadPosix::getResultOfThread(CArchThread thread)
00547 {
00548 lockMutex(m_threadMutex);
00549 void* result = thread->m_result;
00550 unlockMutex(m_threadMutex);
00551 return result;
00552 }
00553
00554 IArchMultithread::ThreadID
00555 CArchMultithreadPosix::getIDOfThread(CArchThread thread)
00556 {
00557 return thread->m_id;
00558 }
00559
00560 void
00561 CArchMultithreadPosix::setSignalHandler(
00562 ESignal signal, SignalFunc func, void* userData)
00563 {
00564 lockMutex(m_threadMutex);
00565 m_signalFunc[signal] = func;
00566 m_signalUserData[signal] = userData;
00567 unlockMutex(m_threadMutex);
00568 }
00569
00570 void
00571 CArchMultithreadPosix::raiseSignal(ESignal signal)
00572 {
00573 lockMutex(m_threadMutex);
00574 if (m_signalFunc[signal] != NULL) {
00575 m_signalFunc[signal](signal, m_signalUserData[signal]);
00576 pthread_kill(m_mainThread->m_thread, SIGWAKEUP);
00577 }
00578 else if (signal == kINTERRUPT || signal == kTERMINATE) {
00579 ARCH->cancelThread(m_mainThread);
00580 }
00581 unlockMutex(m_threadMutex);
00582 }
00583
00584 void
00585 CArchMultithreadPosix::startSignalHandler()
00586 {
00587
00588
00589 sigset_t sigset, oldsigset;
00590 setSignalSet(&sigset);
00591 pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
00592
00593
00594
00595
00596
00597 pthread_attr_t attr;
00598 int status = pthread_attr_init(&attr);
00599 if (status == 0) {
00600 status = pthread_create(&m_signalThread, &attr,
00601 &CArchMultithreadPosix::threadSignalHandler,
00602 NULL);
00603 pthread_attr_destroy(&attr);
00604 }
00605 if (status != 0) {
00606
00607
00608 pthread_sigmask(SIG_UNBLOCK, &oldsigset, NULL);
00609 }
00610 }
00611
00612 CArchThreadImpl*
00613 CArchMultithreadPosix::find(pthread_t thread)
00614 {
00615 CArchThreadImpl* impl = findNoRef(thread);
00616 if (impl != NULL) {
00617 refThread(impl);
00618 }
00619 return impl;
00620 }
00621
00622 CArchThreadImpl*
00623 CArchMultithreadPosix::findNoRef(pthread_t thread)
00624 {
00625
00626 for (CThreadList::const_iterator index = m_threadList.begin();
00627 index != m_threadList.end(); ++index) {
00628 if ((*index)->m_thread == thread) {
00629 return *index;
00630 }
00631 }
00632 return NULL;
00633 }
00634
00635 void
00636 CArchMultithreadPosix::insert(CArchThreadImpl* thread)
00637 {
00638 assert(thread != NULL);
00639
00640
00641 assert(findNoRef(thread->m_thread) == NULL);
00642
00643
00644
00645
00646
00647 thread->m_id = ++m_nextID;
00648
00649
00650 m_threadList.push_back(thread);
00651 }
00652
00653 void
00654 CArchMultithreadPosix::erase(CArchThreadImpl* thread)
00655 {
00656 for (CThreadList::iterator index = m_threadList.begin();
00657 index != m_threadList.end(); ++index) {
00658 if (*index == thread) {
00659 m_threadList.erase(index);
00660 break;
00661 }
00662 }
00663 }
00664
00665 void
00666 CArchMultithreadPosix::refThread(CArchThreadImpl* thread)
00667 {
00668 assert(thread != NULL);
00669 assert(findNoRef(thread->m_thread) != NULL);
00670 ++thread->m_refCount;
00671 }
00672
00673 void
00674 CArchMultithreadPosix::testCancelThreadImpl(CArchThreadImpl* thread)
00675 {
00676 assert(thread != NULL);
00677
00678
00679 lockMutex(m_threadMutex);
00680 bool cancel = false;
00681 if (thread->m_cancel && !thread->m_cancelling) {
00682 thread->m_cancelling = true;
00683 thread->m_cancel = false;
00684 cancel = true;
00685 }
00686 unlockMutex(m_threadMutex);
00687
00688
00689 if (cancel) {
00690 throw XThreadCancel();
00691 }
00692 }
00693
00694 void*
00695 CArchMultithreadPosix::threadFunc(void* vrep)
00696 {
00697
00698 CArchThreadImpl* thread = reinterpret_cast<CArchThreadImpl*>(vrep);
00699
00700
00701 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
00702 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00703
00704
00705 s_instance->doThreadFunc(thread);
00706
00707
00708 return NULL;
00709 }
00710
00711 void
00712 CArchMultithreadPosix::doThreadFunc(CArchThread thread)
00713 {
00714
00715 setPriorityOfThread(thread, 1);
00716
00717
00718 lockMutex(m_threadMutex);
00719 unlockMutex(m_threadMutex);
00720
00721 void* result = NULL;
00722 try {
00723
00724 result = (*thread->m_func)(thread->m_userData);
00725 }
00726
00727 catch (XThreadCancel&) {
00728
00729 }
00730 catch (...) {
00731
00732 lockMutex(m_threadMutex);
00733 thread->m_exited = true;
00734 unlockMutex(m_threadMutex);
00735 closeThread(thread);
00736 throw;
00737 }
00738
00739
00740 lockMutex(m_threadMutex);
00741 thread->m_result = result;
00742 thread->m_exited = true;
00743 unlockMutex(m_threadMutex);
00744
00745
00746 closeThread(thread);
00747 }
00748
00749 void
00750 CArchMultithreadPosix::threadCancel(int)
00751 {
00752
00753 }
00754
00755 void*
00756 CArchMultithreadPosix::threadSignalHandler(void*)
00757 {
00758
00759 pthread_detach(pthread_self());
00760
00761
00762 sigset_t sigset;
00763 setSignalSet(&sigset);
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773 sigaddset(&sigset, SIGABRT);
00774
00775
00776 for (;;) {
00777
00778 #if HAVE_POSIX_SIGWAIT
00779 int signal = 0;
00780 sigwait(&sigset, &signal);
00781 #else
00782 sigwait(&sigset);
00783 #endif
00784
00785
00786 switch (signal) {
00787 case SIGINT:
00788 ARCH->raiseSignal(kINTERRUPT);
00789 break;
00790
00791 case SIGTERM:
00792 ARCH->raiseSignal(kTERMINATE);
00793 break;
00794
00795 case SIGHUP:
00796 ARCH->raiseSignal(kHANGUP);
00797 break;
00798
00799 case SIGUSR2:
00800 ARCH->raiseSignal(kUSER);
00801 break;
00802
00803 default:
00804
00805 break;
00806 }
00807 }
00808
00809 return NULL;
00810 }