clamd/thrmgr.c
 /*
  *  Copyright (C) 2013-2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
  *  Copyright (C) 2007-2013 Sourcefire, Inc.
  *
  *  Authors: Trog, Török Edvin
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  *  MA 02110-1301, USA.
  */
 
 #if HAVE_CONFIG_H
 #include "clamav-config.h"
 #endif
 
 #include <stdio.h>
 #include <pthread.h>
 #include <time.h>
 #include <errno.h>
 #include <string.h>
 #include <limits.h> /* LONG_MAX, used in print_queue() */
 
 #include "shared/output.h"
 
 #include "libclamav/clamav.h"
 #include "thrmgr.h"
 #include "others.h"
 #include "mpool.h"
 #include "server.h"
 #include "libclamav/others.h"
 
 #ifdef HAVE_MALLINFO
 #include <malloc.h>
 #endif
 
 /* BSD and HP-UX need a bigger stacksize than the system default */
 #if defined(C_BSD) || defined(C_HPUX) || defined(C_AIX) || (defined(C_LINUX) && !defined(__GLIBC__))
 #define C_BIGSTACK 1
 #endif
 
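 /* Allocate an empty FIFO work queue; returns NULL if malloc fails. */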
 static work_queue_t *work_queue_new(void)
 {
     work_queue_t *work_q;
 
     work_q = (work_queue_t *)malloc(sizeof(work_queue_t));
     if (!work_q) {
         return NULL;
     }
 
     work_q->head = work_q->tail = NULL;
     work_q->item_count          = 0;
     work_q->popped              = 0;
     return work_q;
 }
 
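 /* Append a work item carrying `data` to the queue and timestamp it with
  * gettimeofday(), so print_queue() can later report how long items waited.
  * Returns TRUE on success, FALSE if the queue is NULL or malloc fails. */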
 static int work_queue_add(work_queue_t *work_q, void *data)
 {
     work_item_t *work_item;
 
     if (!work_q) {
         return FALSE;
     }
     work_item = (work_item_t *)malloc(sizeof(work_item_t));
     if (!work_item) {
         return FALSE;
     }
 
     work_item->next = NULL;
     work_item->data = data;
     gettimeofday(&(work_item->time_queued), NULL);
 
     if (work_q->head == NULL) {
         work_q->head = work_q->tail = work_item;
         work_q->item_count          = 1;
     } else {
         work_q->tail->next = work_item;
         work_q->tail       = work_item;
         work_q->item_count++;
     }
     return TRUE;
 }
 
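 /* Detach the oldest item, free its wrapper and return the data pointer,
  * or NULL if the queue is empty. */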
 static void *work_queue_pop(work_queue_t *work_q)
 {
     work_item_t *work_item;
     void *data;
 
     if (!work_q || !work_q->head) {
         return NULL;
     }
     work_item    = work_q->head;
     data         = work_item->data;
     work_q->head = work_item->next;
     if (work_q->head == NULL) {
         work_q->tail = NULL;
     }
     free(work_item);
     work_q->item_count--;
     return data;
 }
 
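 /* Global registry of live thread pools, guarded by pools_lock; this is the
  * list that thrmgr_printstats() walks to report per-pool statistics. */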
 static struct threadpool_list {
     threadpool_t *pool;
     struct threadpool_list *nxt;
 } *pools                          = NULL;
 static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static void add_topools(threadpool_t *t)
 {
     struct threadpool_list *new = malloc(sizeof(*new));
     if (!new) {
         logg("!Unable to add threadpool to list\n");
         return;
     }
     new->pool = t;
     pthread_mutex_lock(&pools_lock);
     new->nxt = pools;
     pools    = new;
     pthread_mutex_unlock(&pools_lock);
 }
 
 static void remove_frompools(threadpool_t *t)
 {
     struct threadpool_list *l, *prev;
     struct task_desc *desc;
     pthread_mutex_lock(&pools_lock);
     prev = NULL;
     l    = pools;
     while (l && l->pool != t) {
         prev = l;
         l    = l->nxt;
     }
     if (!l) {
         pthread_mutex_unlock(&pools_lock);
         return;
     }
     if (prev)
         prev->nxt = l->nxt;
     if (l == pools)
         pools = l->nxt;
     free(l);
     desc = t->tasks;
     while (desc) {
         struct task_desc *q = desc;
         desc                = desc->nxt;
         free(q);
     }
     t->tasks = NULL;
     pthread_mutex_unlock(&pools_lock);
 }
 
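 /* Print min/max/average wait times (in seconds) for every item still sitting
  * in `queue`, relative to tv_now.  Items whose timestamps lie in the future
  * (e.g. the clock went backwards) are reported as INVALID. */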
 static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now)
 {
     long umin = LONG_MAX, umax = 0, usum = 0;
     unsigned invalids = 0, cnt = 0;
     work_item_t *q;
 
     if (!queue->head)
         return;
     for (q = queue->head; q; q = q->next) {
         long delta;
         delta = tv_now->tv_usec - q->time_queued.tv_usec;
         delta += (tv_now->tv_sec - q->time_queued.tv_sec) * 1000000;
         if (delta < 0) {
             invalids++;
             continue;
         }
         if (delta > umax)
             umax = delta;
         if (delta < umin)
             umin = delta;
         usum += delta;
         ++cnt;
     }
     mdprintf(f, " min_wait: %.6f max_wait: %.6f avg_wait: %.6f",
              umin / 1e6, umax / 1e6, usum / (1e6 * cnt));
     if (invalids)
         mdprintf(f, " (INVALID timestamps: %u)", invalids);
     if (cnt + invalids != (unsigned)queue->item_count)
         mdprintf(f, " (ERROR: %u != %u)", cnt + invalids,
                  (unsigned)queue->item_count);
 }
 
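 /* Write the STATS report to descriptor f: a POOLS count, then per pool the
  * STATE/THREADS/QUEUE lines plus one line per active task, followed by a
  * MEMSTATS summary and a terminating "END<term>".  Engine mpool usage is
  * counted once per distinct engine.  Always returns 0. */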
 int thrmgr_printstats(int f, char term)
 {
     struct threadpool_list *l;
     unsigned cnt, pool_cnt = 0;
     size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0;
     float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0;
     const struct cl_engine **seen = NULL;
     int has_libc_memstats         = 0;
 
     pthread_mutex_lock(&pools_lock);
     for (cnt = 0, l = pools; l; l = l->nxt) cnt++;
     mdprintf(f, "POOLS: %u\n\n", cnt);
     for (l = pools; l && !error_flag; l = l->nxt) {
         threadpool_t *pool = l->pool;
         const char *state;
         struct timeval tv_now;
         struct task_desc *task;
         cnt = 0;
 
         if (!pool) {
             mdprintf(f, "NULL\n\n");
             continue;
         }
         /* now we can safely access the task descriptors: they won't get freed,
          * because tasks can't exit (stats_destroy) while pools_lock is held
          */
         switch (pool->state) {
             case POOL_INVALID:
                 state = "INVALID";
                 break;
             case POOL_VALID:
                 state = "VALID";
                 break;
             case POOL_EXIT:
                 state = "EXIT";
                 break;
             default:
                 state = "??";
                 break;
         }
         mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY");
         mdprintf(f, "THREADS: live %u  idle %u max %u idle-timeout %u\n", pool->thr_alive, pool->thr_idle, pool->thr_max,
                  pool->idle_timeout);
         /* TODO: show both queues */
         mdprintf(f, "QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count);
         gettimeofday(&tv_now, NULL);
         print_queue(f, pool->bulk_queue, &tv_now);
         print_queue(f, pool->single_queue, &tv_now);
         mdprintf(f, "\n");
         for (task = pool->tasks; task; task = task->nxt) {
             double delta;
             size_t used, total;
 
             delta = tv_now.tv_usec - task->tv.tv_usec;
             delta += (tv_now.tv_sec - task->tv.tv_sec) * 1000000.0;
             mdprintf(f, "\t%s %f %s\n",
                      task->command ? task->command : "N/A",
                      delta / 1e6,
                      task->filename ? task->filename : "");
             if (task->engine) {
                 /* we usually have at most 2 engines so a linear
 				 * search is good enough */
                 size_t i;
                 for (i = 0; i < seen_cnt; i++) {
                     if (seen[i] == task->engine)
                         break;
                 }
                 /* we need to count the memusage from the same
 				 * engine only once */
                 if (i == seen_cnt) {
                     const struct cl_engine **s;
                     /* new engine */
                     ++seen_cnt;
                     s = realloc(seen, seen_cnt * sizeof(*seen));
                     if (!s) {
                         error_flag = 1;
                         break;
                     }
                     seen               = s;
                     seen[seen_cnt - 1] = task->engine;
 
                     if (MPOOL_GETSTATS(task->engine, &used, &total) != -1) {
                         pool_used += used;
                         pool_total += total;
                         pool_cnt++;
                     }
                 }
             }
         }
         mdprintf(f, "\n");
     }
     free(seen);
 #ifdef HAVE_MALLINFO
     {
         struct mallinfo inf = mallinfo();
         mem_heap            = inf.arena / (1024 * 1024.0);
         mem_mmap            = inf.hblkhd / (1024 * 1024.0);
         mem_used            = (inf.usmblks + inf.uordblks) / (1024 * 1024.0);
         mem_free            = (inf.fsmblks + inf.fordblks) / (1024 * 1024.0);
         mem_releasable      = inf.keepcost / (1024 * 1024.0);
         has_libc_memstats   = 1;
     }
 #endif
     if (error_flag) {
         mdprintf(f, "ERROR: error encountered while formatting statistics\n");
     } else {
         if (has_libc_memstats)
             mdprintf(f, "MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n",
                      mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt,
                      pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0));
         else
             mdprintf(f, "MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n",
                      pool_cnt, pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0));
     }
     mdprintf(f, "END%c", term);
     pthread_mutex_unlock(&pools_lock);
     return 0;
 }
 
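 /* Shut a pool down: mark it POOL_EXIT, wake all workers, wait until the last
  * worker has exited, then release the pool's queues, mutex and condition
  * variables. */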
 void thrmgr_destroy(threadpool_t *threadpool)
 {
     if (!threadpool) {
         return;
     }
     if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
         logg("!Mutex lock failed\n");
         exit(-1);
     }
     if (threadpool->state != POOL_VALID) {
         if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
             logg("!Mutex unlock failed\n");
             exit(-1);
         }
         return;
     }
     threadpool->state = POOL_EXIT;
 
     /* wait for threads to exit */
     if (threadpool->thr_alive > 0) {
         if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
             pthread_mutex_unlock(&threadpool->pool_mutex);
             return;
         }
     }
     while (threadpool->thr_alive > 0) {
         if (pthread_cond_wait(&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
             pthread_mutex_unlock(&threadpool->pool_mutex);
             return;
         }
     }
     remove_frompools(threadpool);
     if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
         logg("!Mutex unlock failed\n");
         exit(-1);
     }
 
     pthread_mutex_destroy(&(threadpool->pool_mutex));
     pthread_cond_destroy(&(threadpool->idle_cond));
     pthread_cond_destroy(&(threadpool->queueable_single_cond));
     pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
     pthread_cond_destroy(&(threadpool->pool_cond));
     pthread_attr_destroy(&(threadpool->pool_attr));
     free(threadpool->single_queue);
     free(threadpool->bulk_queue);
     free(threadpool);
     return;
 }
 
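 /* Create a thread pool of at most max_threads workers.  Workers are started
  * lazily by thrmgr_dispatch() and exit after idle_timeout seconds without
  * work; max_queue bounds queued-plus-running jobs (see thrmgr_contended()).
  * Returns NULL on any allocation or initialization error.
  *
  * Illustrative use only (names and values below are examples, not taken from
  * the clamd configuration):
  *
  *     threadpool_t *pool = thrmgr_new(10, 30, 15, my_handler);
  *     if (pool && !thrmgr_dispatch(pool, my_job))
  *         logg("!dispatch failed\n");
  *     ...
  *     thrmgr_destroy(pool);
  */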
 threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *))
 {
     threadpool_t *threadpool;
 #if defined(C_BIGSTACK)
     size_t stacksize;
 #endif
 
     if (max_threads <= 0) {
         return NULL;
     }
 
     threadpool = (threadpool_t *)malloc(sizeof(threadpool_t));
     if (!threadpool) {
         return NULL;
     }
 
     threadpool->single_queue = work_queue_new();
     if (!threadpool->single_queue) {
         free(threadpool);
         return NULL;
     }
     threadpool->bulk_queue = work_queue_new();
     if (!threadpool->bulk_queue) {
         free(threadpool->single_queue);
         free(threadpool);
         return NULL;
     }
 
     threadpool->queue_max = max_queue;
 
     threadpool->thr_max       = max_threads;
     threadpool->thr_alive     = 0;
     threadpool->thr_idle      = 0;
     threadpool->thr_multiscan = 0;
     threadpool->idle_timeout  = idle_timeout;
     threadpool->handler       = handler;
     threadpool->tasks         = NULL;
 
     if (pthread_mutex_init(&(threadpool->pool_mutex), NULL)) {
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) {
         pthread_cond_destroy(&(threadpool->pool_cond));
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
         pthread_cond_destroy(&(threadpool->queueable_single_cond));
         pthread_cond_destroy(&(threadpool->pool_cond));
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_cond_init(&(threadpool->idle_cond), NULL) != 0) {
         pthread_cond_destroy(&(threadpool->queueable_single_cond));
         pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
         pthread_cond_destroy(&(threadpool->pool_cond));
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
         pthread_cond_destroy(&(threadpool->queueable_single_cond));
         pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
         pthread_cond_destroy(&(threadpool->idle_cond));
         pthread_cond_destroy(&(threadpool->pool_cond));
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
     if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
         pthread_cond_destroy(&(threadpool->queueable_single_cond));
         pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
         pthread_attr_destroy(&(threadpool->pool_attr));
         pthread_cond_destroy(&(threadpool->idle_cond));
         pthread_cond_destroy(&(threadpool->pool_cond));
         pthread_mutex_destroy(&(threadpool->pool_mutex));
         free(threadpool->single_queue);
         free(threadpool->bulk_queue);
         free(threadpool);
         return NULL;
     }
 
 #if defined(C_BIGSTACK)
     pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
     stacksize = stacksize + 64 * 1024;
     if (stacksize < 1048576) /* at least 1MB please */
 #if defined(C_HPUX) && defined(USE_MPOOL)
         /* Set aside one cli_pagesize() for the stack's pthread header,
 		 * giving a 1M region to fit a 1M large-page */
         if (cli_getpagesize() < 1048576)
             stacksize = 1048576 - cli_getpagesize();
         else
 #endif
             stacksize = 1048576;
     logg("Set stacksize to %lu\n", (unsigned long int)stacksize);
     pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
 #endif
     threadpool->state = POOL_VALID;
 
     add_topools(threadpool);
     return threadpool;
 }
 
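 /* Per-thread task bookkeeping: each worker stores a struct task_desc in
  * thread-local storage (stats_tls_key) and links it into its pool's `tasks`
  * list, so thrmgr_setactivetask()/thrmgr_setactiveengine() can update it
  * without locking and thrmgr_printstats() can enumerate the active tasks. */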
 static pthread_key_t stats_tls_key;
 static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT;
 
 static void stats_tls_key_alloc(void)
 {
     pthread_key_create(&stats_tls_key, NULL);
 }
 
 static const char *IDLE_TASK = "IDLE";
 
 /* no mutex is needed, we are using a thread-local variable */
 void thrmgr_setactivetask(const char *filename, const char *cmd)
 {
     struct task_desc *desc;
     pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
     desc = pthread_getspecific(stats_tls_key);
     if (!desc)
         return;
     desc->filename = filename;
     if (cmd) {
         if (cmd == IDLE_TASK && desc->command == cmd)
             return;
         desc->command = cmd;
         gettimeofday(&desc->tv, NULL);
     }
 }
 
 void thrmgr_setactiveengine(const struct cl_engine *engine)
 {
     struct task_desc *desc;
     pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
     desc = pthread_getspecific(stats_tls_key);
     if (!desc)
         return;
     desc->engine = engine;
 }
 
 /* thread pool mutex must be held on entry */
 static void stats_init(threadpool_t *pool)
 {
     struct task_desc *desc = calloc(1, sizeof(*desc));
     if (!desc)
         return;
     pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
     pthread_setspecific(stats_tls_key, desc);
     if (!pool->tasks)
         pool->tasks = desc;
     else {
         desc->nxt        = pool->tasks;
         pool->tasks->prv = desc;
         pool->tasks      = desc;
     }
 }
 
 /* thread pool mutex must be held on entry */
 static void stats_destroy(threadpool_t *pool)
 {
     struct task_desc *desc = pthread_getspecific(stats_tls_key);
     if (!desc)
         return;
     pthread_mutex_lock(&pools_lock);
     if (desc->prv)
         desc->prv->nxt = desc->nxt;
     if (desc->nxt)
         desc->nxt->prv = desc->prv;
     if (pool->tasks == desc)
         pool->tasks = desc->nxt;
     free(desc);
     pthread_setspecific(stats_tls_key, NULL);
     pthread_mutex_unlock(&pools_lock);
 }
 
 static inline int thrmgr_contended(threadpool_t *pool, int bulk)
 {
     /* don't allow bulk items to exceed 50% of queue, so that
      * non-bulk items get a chance to be in the queue */
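     /* Example (illustrative numbers only): with queue_max == 8, bulk work is
      * refused once 4 bulk items are queued, and any new work is refused once
      * queued items plus busy (non-idle) threads reach 8. */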
     if (bulk && pool->bulk_queue->item_count >= pool->queue_max / 2)
         return 1;
     return pool->bulk_queue->item_count + pool->single_queue->item_count + pool->thr_alive - pool->thr_idle >= pool->queue_max;
 }
 
 /* when both queues have tasks, thrmgr_pop() picks SINGLE_BULK_RATIO (4) items
  * from the single queue for every 1 from the bulk queue */
 #define SINGLE_BULK_RATIO 4
 #define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1)
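 /* Example: with work pending on both queues, thrmgr_pop() hands out work in
  * the pattern S S S S B, S S S S B, ... (S = single queue, B = bulk queue),
  * because each queue's `popped` counter resets the other's once it reaches
  * its share of the ratio. */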
 
 /* must be called with pool_mutex held */
 static void *thrmgr_pop(threadpool_t *pool)
 {
     void *task;
     work_queue_t *first, *second;
     int ratio;
 
     if (pool->single_queue->popped < SINGLE_BULK_RATIO) {
         first  = pool->single_queue;
         second = pool->bulk_queue;
         ratio  = SINGLE_BULK_RATIO;
     } else {
         second = pool->single_queue;
         first  = pool->bulk_queue;
         ratio  = SINGLE_BULK_SUM - SINGLE_BULK_RATIO;
     }
 
     task = work_queue_pop(first);
     if (task) {
         if (++first->popped == ratio)
             second->popped = 0;
     } else {
         task = work_queue_pop(second);
         if (task) {
             if (++second->popped == ratio)
                 first->popped = 0;
         }
     }
 
     if (!thrmgr_contended(pool, 0)) {
         logg("$THRMGR: queue (single) crossed low threshold -> signaling\n");
         pthread_cond_signal(&pool->queueable_single_cond);
     }
 
     if (!thrmgr_contended(pool, 1)) {
         logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n");
         pthread_cond_signal(&pool->queueable_bulk_cond);
     }
 
     return task;
 }
 
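 /* Worker thread main loop: register a task descriptor on first entry, then
  * repeatedly pop work (via thrmgr_pop) under pool_mutex and run the pool's
  * handler on it.  A worker exits after idle_timeout seconds without work or
  * when the pool enters POOL_EXIT; the last worker to leave broadcasts
  * pool_cond so thrmgr_destroy() can finish. */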
 static void *thrmgr_worker(void *arg)
 {
     threadpool_t *threadpool = (threadpool_t *)arg;
     void *job_data;
     int retval, must_exit = FALSE, stats_inited = FALSE;
     struct timespec timeout;
 
     /* loop looking for work */
     for (;;) {
         if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
             logg("!Fatal: mutex lock failed\n");
             exit(-2);
         }
         if (!stats_inited) {
             stats_init(threadpool);
             stats_inited = TRUE;
         }
         thrmgr_setactiveengine(NULL);
         thrmgr_setactivetask(NULL, IDLE_TASK);
         timeout.tv_sec  = time(NULL) + threadpool->idle_timeout;
         timeout.tv_nsec = 0;
         threadpool->thr_idle++;
         while (((job_data = thrmgr_pop(threadpool)) == NULL) && (threadpool->state != POOL_EXIT)) {
             /* Sleep, awaiting wakeup */
             pthread_cond_signal(&threadpool->idle_cond);
             retval = pthread_cond_timedwait(&(threadpool->pool_cond),
                                             &(threadpool->pool_mutex), &timeout);
             if (retval == ETIMEDOUT) {
                 must_exit = TRUE;
                 break;
             }
         }
         threadpool->thr_idle--;
         if (threadpool->state == POOL_EXIT) {
             must_exit = TRUE;
         }
 
         if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
             logg("!Fatal: mutex unlock failed\n");
             exit(-2);
         }
         if (job_data) {
             threadpool->handler(job_data);
         } else if (must_exit) {
             break;
         }
     }
     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
         /* Fatal error */
         logg("!Fatal: mutex lock failed\n");
         exit(-2);
     }
     threadpool->thr_alive--;
     if (threadpool->thr_alive == 0) {
         /* signal that all threads are finished */
         pthread_cond_broadcast(&threadpool->pool_cond);
     }
     stats_destroy(threadpool);
     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
         /* Fatal error */
         logg("!Fatal: mutex unlock failed\n");
         exit(-2);
     }
     return NULL;
 }
 
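 /* Queue user_data on the single or bulk queue.  Blocks on the matching
  * queueable_*_cond while the pool is contended, spawns a new detached worker
  * if there are more queued items than idle threads and thr_max has not been
  * reached, then signals pool_cond to wake a worker.  Returns TRUE on
  * success, FALSE on error or if the pool is not in POOL_VALID state. */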
 static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, int bulk)
 {
     int ret = TRUE;
     pthread_t thr_id;
 
     if (!threadpool) {
         return FALSE;
     }
 
     /* Lock the threadpool */
     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
         logg("!Mutex lock failed\n");
         return FALSE;
     }
 
     do {
         work_queue_t *queue;
         pthread_cond_t *queueable_cond;
         int items;
 
         if (threadpool->state != POOL_VALID) {
             ret = FALSE;
             break;
         }
 
         if (bulk) {
             queue          = threadpool->bulk_queue;
             queueable_cond = &threadpool->queueable_bulk_cond;
         } else {
             queue          = threadpool->single_queue;
             queueable_cond = &threadpool->queueable_single_cond;
         }
 
         while (thrmgr_contended(threadpool, bulk)) {
             logg("$THRMGR: contended, sleeping\n");
             pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
             logg("$THRMGR: contended, woken\n");
         }
 
         if (!work_queue_add(queue, user_data)) {
             ret = FALSE;
             break;
         }
 
         items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count;
         if ((threadpool->thr_idle < items) &&
             (threadpool->thr_alive < threadpool->thr_max)) {
             /* Start a new thread */
             if (pthread_create(&thr_id, &(threadpool->pool_attr),
                                thrmgr_worker, threadpool) != 0) {
                 logg("!pthread_create failed\n");
             } else {
                 threadpool->thr_alive++;
             }
         }
         pthread_cond_signal(&(threadpool->pool_cond));
 
     } while (0);
 
     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
         logg("!Mutex unlock failed\n");
         return FALSE;
     }
     return ret;
 }
 
 int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
 {
     return thrmgr_dispatch_internal(threadpool, user_data, 0);
 }
 
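 /* Like thrmgr_dispatch(), but the job is accounted to `group`: the group's
  * job counter is incremented up front and rolled back if dispatching fails. */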
 int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk)
 {
     int ret;
     if (group) {
         pthread_mutex_lock(&group->mutex);
         group->jobs++;
         logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
         pthread_mutex_unlock(&group->mutex);
     }
     if (!(ret = thrmgr_dispatch_internal(threadpool, user_data, bulk)) && group) {
         pthread_mutex_lock(&group->mutex);
         group->jobs--;
         logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
         pthread_mutex_unlock(&group->mutex);
     }
     return ret;
 }
 
 /* returns
  *   0 - this was not the last job in the group
  *   1 - this was the last job in the group; the group has been freed
  */
 int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc)
 {
     int ret = 0;
     if (!group) {
         /* there is no group, we are obviously the last one */
         return 1;
     }
     pthread_mutex_lock(&group->mutex);
     logg("$THRMGR: group_finished: %p, %d\n", group, group->jobs);
     group->exit_total++;
     switch (exitc) {
         case EXIT_OK:
             group->exit_ok++;
             break;
         case EXIT_ERROR:
             group->exit_error++;
             break;
         default:
             break;
     }
     if (group->jobs) {
         if (!--group->jobs) {
             ret = 1;
         } else
             logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
         if (group->jobs == 1)
             pthread_cond_signal(&group->only);
     }
     pthread_mutex_unlock(&group->mutex);
     if (ret) {
         logg("$THRMGR: group_finished: freeing %p\n", group);
         pthread_mutex_destroy(&group->mutex);
         pthread_cond_destroy(&group->only);
         free(group);
     }
     return ret;
 }
 
 void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total)
 {
     int needexit = 0, needfree = 0;
     struct timespec timeout;
     pthread_mutex_lock(&group->mutex);
     while (group->jobs > 1) {
         pthread_mutex_lock(&exit_mutex);
         needexit = progexit;
         pthread_mutex_unlock(&exit_mutex);
         if (needexit)
             break;
         /* wake to check progexit */
         timeout.tv_sec  = time(NULL) + 5;
         timeout.tv_nsec = 0;
         pthread_cond_timedwait(&group->only, &group->mutex, &timeout);
     }
     *ok    = group->exit_ok;
     *error = group->exit_error + needexit;
     *total = group->exit_total;
     if (!--group->jobs)
         needfree = 1;
     else
         logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
     pthread_mutex_unlock(&group->mutex);
     if (needfree) {
         logg("$THRMGR: group finished freeing %p\n", group);
         free(group);
     }
 }
 
 jobgroup_t *thrmgr_group_new(void)
 {
     jobgroup_t *group;
 
     group = malloc(sizeof(*group));
     if (!group)
288057e9
         return NULL;
     group->jobs    = 1;
     group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0;
     if (pthread_mutex_init(&group->mutex, NULL)) {
         logg("^Failed to initialize group mutex");
         free(group);
         return NULL;
     }
     if (pthread_cond_init(&group->only, NULL)) {
         logg("^Failed to initialize group cond");
         pthread_mutex_destroy(&group->mutex);
         free(group);
         return NULL;
7660b7cb
     }
     logg("$THRMGR: new group: %p\n", group);
     return group;
 }
 
 int thrmgr_group_need_terminate(jobgroup_t *group)
 {
     int ret;
     if (group) {
         pthread_mutex_lock(&group->mutex);
         ret = group->force_exit;
         pthread_mutex_unlock(&group->mutex);
949c6fe5
     } else
         ret = 0;
     pthread_mutex_lock(&exit_mutex);
     ret |= progexit;
     pthread_mutex_unlock(&exit_mutex);
     return ret;
 }
 
 void thrmgr_group_terminate(jobgroup_t *group)
 {
     if (group) {
         /* we may not be the last active job; from now on,
          * the last active job will free the group's resources */
         pthread_mutex_lock(&group->mutex);
         group->force_exit = 1;
         pthread_mutex_unlock(&group->mutex);
     }
 }