libclamunrar/threadpool.cpp
d39cb658
 #include "rar.hpp"
 
 #ifdef RAR_SMP
 #include "threadmisc.cpp"
 
 #ifdef _WIN_ALL
 // Windows only: priority that CreateThreads() applies to every new pool
 // thread when it differs from THREAD_PRIORITY_NORMAL.
 int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
 #endif
 
 // Prepare an empty pool allowing up to MaxThreads worker threads.
 // No threads are created here. The synchronization objects are created
 // and, if any of them cannot be set up, a fatal error is reported through
 // ErrHandler.
 ThreadPool::ThreadPool(uint MaxThreads)
 {
   // Limit the requested thread count to MaxPoolThreads and use at least one.
   MaxAllowedThreads = MaxThreads;
   if (MaxPoolThreads<MaxAllowedThreads)
     MaxAllowedThreads=MaxPoolThreads;
   if (MaxAllowedThreads==0)
     MaxAllowedThreads=1;
 
   // With more threads than queue slots the destructor could not release
   // all waiting threads and would hang.
   if (ASIZE(TaskQueue)<MaxAllowedThreads)
     MaxAllowedThreads=ASIZE(TaskQueue);
 
   // Nothing has been created yet and the pool is not shutting down.
   ThreadsCreatedCount=0;
   Closing=false;
 
   bool Success = CriticalSectionCreate(&CritSection);
 #ifdef _WIN_ALL
   // Semaphore starts at zero with ASIZE(TaskQueue) as its maximum count.
   // The event is manual-reset and initially signaled.
   QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
   NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
   if (QueuedTasksCnt==NULL || NoneActive==NULL)
     Success=false;
 #elif defined(_UNIX)
   AnyActive = false;
   QueuedTasksCnt = 0;
 
   // Initialize one object at a time, stopping at the first failure.
   if (Success)
     Success=pthread_cond_init(&AnyActiveCond,NULL)==0;
   if (Success)
     Success=pthread_mutex_init(&AnyActiveMutex,NULL)==0;
   if (Success)
     Success=pthread_cond_init(&QueuedTasksCntCond,NULL)==0;
   if (Success)
     Success=pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
 #endif
 
   if (!Success)
   {
     ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
     ErrHandler.Exit(RARX_FATAL);
   }
 
   // Empty task queue, no running tasks.
   QueueTop = 0;
   QueueBottom = 0;
   ActiveThreads = 0;
 }
 
 
 // Run any still queued tasks, tell all pool threads to quit, wait for them
 // to terminate and release the synchronization objects.
 ThreadPool::~ThreadPool()
 {
   // Process tasks which were added, but not started yet.
   WaitDone();
 
   // GetQueuedTask() returns false when it sees this flag, which ends
   // PoolThreadLoop() in every thread woken up below.
   Closing=true;
 
   // Release ASIZE(TaskQueue) task counts. The constructor limits the number
   // of threads to this value, so it is enough to wake up every thread.
 #ifdef _WIN_ALL
   ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
 #elif defined(_UNIX)
   // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
   // so lock is required. We would occasionally hang without it.
   pthread_mutex_lock(&QueuedTasksCntMutex);
   QueuedTasksCnt+=ASIZE(TaskQueue);
   pthread_mutex_unlock(&QueuedTasksCntMutex);
 
   pthread_cond_broadcast(&QueuedTasksCntCond);
 #endif
 
   // Only threads which were really created have valid handles.
   for(uint I=0;I<ThreadsCreatedCount;I++)
   {
 #ifdef _WIN_ALL
     // Waiting until the thread terminates.
     CWaitForSingleObject(ThreadHandles[I]);
 #endif
     // Close the thread handle. In Unix it results in pthread_join call,
     // which also waits for thread termination.
     ThreadClose(ThreadHandles[I]);
   }
 
   // All threads are gone now, so synchronization objects can be destroyed.
   CriticalSectionDelete(&CritSection);
 #ifdef _WIN_ALL
   CloseHandle(QueuedTasksCnt);
   CloseHandle(NoneActive);
 #elif defined(_UNIX)
   pthread_cond_destroy(&AnyActiveCond);
   pthread_mutex_destroy(&AnyActiveMutex);
   pthread_cond_destroy(&QueuedTasksCntCond);
   pthread_mutex_destroy(&QueuedTasksCntMutex);
 #endif
 }
 
 
 void ThreadPool::CreateThreads()
 {
   for(uint I=0;I<MaxAllowedThreads;I++)
   {
     ThreadHandles[I] = ThreadCreate(PoolThread, this);
     ThreadsCreatedCount++;
 #ifdef _WIN_ALL
     if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
       SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
 #endif
   }
 }
 
 
 // Thread entry point. Param is the ThreadPool object passed to ThreadCreate()
 // by CreateThreads(). Runs the pool loop and returns 0 when the loop ends.
 NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
 {
   ThreadPool *Pool=static_cast<ThreadPool*>(Param);
   Pool->PoolThreadLoop();
   return 0;
 }
 
 
 void ThreadPool::PoolThreadLoop()
 {
   QueueEntry Task;
   while (GetQueuedTask(&Task))
   {
     Task.Proc(Task.Param);
     
     CriticalSectionStart(&CritSection); 
     if (--ActiveThreads == 0)
     {
 #ifdef _WIN_ALL
       SetEvent(NoneActive);
 #elif defined(_UNIX)
       pthread_mutex_lock(&AnyActiveMutex);
       AnyActive=false;
       pthread_cond_signal(&AnyActiveCond);
       pthread_mutex_unlock(&AnyActiveMutex);
 #endif
     }
     CriticalSectionEnd(&CritSection); 
   }
 }
 
 
 // Wait until a task count is released, then fetch the next task.
 // Returns true and copies the entry at QueueBottom to *Task, or returns
 // false without touching *Task if Closing is set after the wait,
 // in which case the calling thread is expected to quit.
 bool ThreadPool::GetQueuedTask(QueueEntry *Task)
 {
 #ifdef _WIN_ALL
   // Wait on the task counting semaphore.
   CWaitForSingleObject(QueuedTasksCnt);
 #elif defined(_UNIX)
   // Sleep on the condition variable while the counter is zero, then take
   // one count. The counter is rechecked after every wake up.
   pthread_mutex_lock(&QueuedTasksCntMutex);
   while (QueuedTasksCnt==0)
     cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
   QueuedTasksCnt--;
   pthread_mutex_unlock(&QueuedTasksCntMutex);
 #endif
 
   // The destructor sets Closing before releasing counts to wake us up.
   if (Closing)
     return false;
 
   // Several threads can take tasks at the same time, so the queue bottom
   // is read and advanced inside of the critical section.
   CriticalSectionStart(&CritSection); 
 
   *Task = TaskQueue[QueueBottom];
   QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
 
   CriticalSectionEnd(&CritSection); 
 
   return true;
 }
 
 
 // Store a task (procedure and its parameter) in the queue. Queued tasks are
 // not started here, they are processed only when WaitDone is called.
 // No locks are used, because we assume that this function is always called
 // from the main thread.
 void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
 {
   // Pool threads are created when the first task is added.
   if (ThreadsCreatedCount == 0)
     CreateThreads();
 
   // The queue is full when advancing the top would reach the bottom.
   // In this case run everything queued so far to empty the queue.
   const bool QueueFull = (QueueTop + 1) % ASIZE(TaskQueue) == QueueBottom;
   if (QueueFull)
     WaitDone();
 
   QueueEntry &Slot = TaskQueue[QueueTop];
   Slot.Proc = Proc;
   Slot.Param = Data;
 
   QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
 }
 
 
 // Start queued tasks and wait until all threads are inactive.
 // We assume that it is always called from the main thread, while pool
 // threads are still sleeping.
 void ThreadPool::WaitDone()
 {
   // Number of queued tasks, calculated as the distance from QueueBottom
   // to QueueTop in the circular queue.
   // We add ASIZE(TaskQueue) for the case when TaskQueue array size is not
   // a power of two. Negative numbers would not suit our purpose here.
   ActiveThreads=(QueueTop+ASIZE(TaskQueue)-QueueBottom) % ASIZE(TaskQueue);
 
   // Nothing is queued, so there is nothing to start or wait for.
   if (ActiveThreads==0)
     return;
 #ifdef _WIN_ALL
   // Reset the idle event first, then release one semaphore count per task
   // and wait until the thread finishing the last task sets the event again.
   ResetEvent(NoneActive);
   ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
   CWaitForSingleObject(NoneActive);
 #elif defined(_UNIX)
   // Set the flag before waking up threads. The thread finishing the last
   // task clears it.
   AnyActive=true;
 
   // Threads reset AnyActive before accessing QueuedTasksCnt and even
   // preceding WaitDone() call does not guarantee that some slow thread
   // is not accessing QueuedTasksCnt now. So lock is necessary.
   pthread_mutex_lock(&QueuedTasksCntMutex);
   QueuedTasksCnt+=ActiveThreads;
   pthread_mutex_unlock(&QueuedTasksCntMutex);
 
   // Wake up all threads waiting for tasks in GetQueuedTask().
   pthread_cond_broadcast(&QueuedTasksCntCond);
 
   // Sleep until AnyActive is cleared. The flag is rechecked after every
   // wake up.
   pthread_mutex_lock(&AnyActiveMutex);
   while (AnyActive)
     cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
   pthread_mutex_unlock(&AnyActiveMutex);
 #endif
 }
 #endif // RAR_SMP