/* * Copyright (C) 2013-2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved. * Copyright (C) 2007-2013 Sourcefire, Inc. * * Authors: Trog, Török Edvin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * MA 02110-1301, USA. */ #if HAVE_CONFIG_H #include "clamav-config.h" #endif #include #include #include #include #include #include "shared/output.h" #include "libclamav/clamav.h" #include "thrmgr.h" #include "others.h" #include "mpool.h" #include "server.h" #include "libclamav/others.h" #ifdef HAVE_MALLINFO #include #endif /* BSD and HP-UX need a bigger stacksize than the system default */ #if defined(C_BSD) || defined(C_HPUX) || defined(C_AIX) || (defined(C_LINUX) && !defined(__GLIBC__)) #define C_BIGSTACK 1 #endif static work_queue_t *work_queue_new(void) { work_queue_t *work_q; work_q = (work_queue_t *)malloc(sizeof(work_queue_t)); if (!work_q) { return NULL; } work_q->head = work_q->tail = NULL; work_q->item_count = 0; work_q->popped = 0; return work_q; } static int work_queue_add(work_queue_t *work_q, void *data) { work_item_t *work_item; if (!work_q) { return FALSE; } work_item = (work_item_t *)malloc(sizeof(work_item_t)); if (!work_item) { return FALSE; } work_item->next = NULL; work_item->data = data; gettimeofday(&(work_item->time_queued), NULL); if (work_q->head == NULL) { work_q->head = work_q->tail = work_item; work_q->item_count = 1; } else { work_q->tail->next = work_item; work_q->tail = work_item; work_q->item_count++; } return TRUE; } static void *work_queue_pop(work_queue_t *work_q) { work_item_t *work_item; void *data; if (!work_q || !work_q->head) { return NULL; } work_item = work_q->head; data = work_item->data; work_q->head = work_item->next; if (work_q->head == NULL) { work_q->tail = NULL; } free(work_item); work_q->item_count--; return data; } static struct threadpool_list { threadpool_t *pool; struct threadpool_list *nxt; } *pools = NULL; static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER; static void add_topools(threadpool_t *t) { struct threadpool_list *new = malloc(sizeof(*new)); if (!new) { logg("!Unable to add threadpool to list\n"); return; } new->pool = t; pthread_mutex_lock(&pools_lock); new->nxt = pools; pools = new; pthread_mutex_unlock(&pools_lock); } static void remove_frompools(threadpool_t *t) { struct threadpool_list *l, *prev; struct task_desc *desc; pthread_mutex_lock(&pools_lock); prev = NULL; l = pools; while (l && l->pool != t) { prev = l; l = l->nxt; } if (!l) { pthread_mutex_unlock(&pools_lock); return; } if (prev) prev->nxt = l->nxt; if (l == pools) pools = l->nxt; free(l); desc = t->tasks; while (desc) { struct task_desc *q = desc; desc = desc->nxt; free(q); } t->tasks = NULL; pthread_mutex_unlock(&pools_lock); } static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now) { long umin = LONG_MAX, umax = 0, usum = 0; unsigned invalids = 0, cnt = 0; work_item_t *q; if (!queue->head) return; for (q = queue->head; q; q = q->next) { long delta; delta = tv_now->tv_usec - q->time_queued.tv_usec; delta += (tv_now->tv_sec - q->time_queued.tv_sec) * 1000000; if (delta < 0) { invalids++; continue; } if (delta > umax) umax = delta; if (delta < umin) umin = delta; usum += delta; ++cnt; } mdprintf(f, " min_wait: %.6f max_wait: %.6f avg_wait: %.6f", umin / 1e6, umax / 1e6, usum / (1e6 * cnt)); if (invalids) mdprintf(f, " (INVALID timestamps: %u)", invalids); if (cnt + invalids != (unsigned)queue->item_count) mdprintf(f, " (ERROR: %u != %u)", cnt + invalids, (unsigned)queue->item_count); } int thrmgr_printstats(int f, char term) { struct threadpool_list *l; unsigned cnt, pool_cnt = 0; size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0; float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0; const struct cl_engine **seen = NULL; int has_libc_memstats = 0; pthread_mutex_lock(&pools_lock); for (cnt = 0, l = pools; l; l = l->nxt) cnt++; mdprintf(f, "POOLS: %u\n\n", cnt); for (l = pools; l && !error_flag; l = l->nxt) { threadpool_t *pool = l->pool; const char *state; struct timeval tv_now; struct task_desc *task; cnt = 0; if (!pool) { mdprintf(f, "NULL\n\n"); continue; } /* now we can access desc->, knowing that they won't get freed * because the other tasks can't quit while pool_mutex is taken */ switch (pool->state) { case POOL_INVALID: state = "INVALID"; break; case POOL_VALID: state = "VALID"; break; case POOL_EXIT: state = "EXIT"; break; default: state = "??"; break; } mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY"); mdprintf(f, "THREADS: live %u idle %u max %u idle-timeout %u\n", pool->thr_alive, pool->thr_idle, pool->thr_max, pool->idle_timeout); /* TODO: show both queues */ mdprintf(f, "QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count); gettimeofday(&tv_now, NULL); print_queue(f, pool->bulk_queue, &tv_now); print_queue(f, pool->single_queue, &tv_now); mdprintf(f, "\n"); for (task = pool->tasks; task; task = task->nxt) { double delta; size_t used, total; delta = tv_now.tv_usec - task->tv.tv_usec; delta += (tv_now.tv_sec - task->tv.tv_sec) * 1000000.0; mdprintf(f, "\t%s %f %s\n", task->command ? task->command : "N/A", delta / 1e6, task->filename ? task->filename : ""); if (task->engine) { /* we usually have at most 2 engines so a linear * search is good enough */ size_t i; for (i = 0; i < seen_cnt; i++) { if (seen[i] == task->engine) break; } /* we need to count the memusage from the same * engine only once */ if (i == seen_cnt) { const struct cl_engine **s; /* new engine */ ++seen_cnt; s = realloc(seen, seen_cnt * sizeof(*seen)); if (!s) { error_flag = 1; break; } seen = s; seen[seen_cnt - 1] = task->engine; if (MPOOL_GETSTATS(task->engine, &used, &total) != -1) { pool_used += used; pool_total += total; pool_cnt++; } } } } mdprintf(f, "\n"); } free(seen); #ifdef HAVE_MALLINFO { struct mallinfo inf = mallinfo(); mem_heap = inf.arena / (1024 * 1024.0); mem_mmap = inf.hblkhd / (1024 * 1024.0); mem_used = (inf.usmblks + inf.uordblks) / (1024 * 1024.0); mem_free = (inf.fsmblks + inf.fordblks) / (1024 * 1024.0); mem_releasable = inf.keepcost / (1024 * 1024.0); has_libc_memstats = 1; } #endif if (error_flag) { mdprintf(f, "ERROR: error encountered while formatting statistics\n"); } else { if (has_libc_memstats) mdprintf(f, "MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n", mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt, pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0)); else mdprintf(f, "MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n", pool_cnt, pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0)); } mdprintf(f, "END%c", term); pthread_mutex_unlock(&pools_lock); return 0; } void thrmgr_destroy(threadpool_t *threadpool) { if (!threadpool) { return; } if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) { logg("!Mutex lock failed\n"); exit(-1); } if (threadpool->state != POOL_VALID) { if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) { logg("!Mutex unlock failed\n"); exit(-1); } return; } threadpool->state = POOL_EXIT; /* wait for threads to exit */ if (threadpool->thr_alive > 0) { if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) { pthread_mutex_unlock(&threadpool->pool_mutex); return; } } while (threadpool->thr_alive > 0) { if (pthread_cond_wait(&threadpool->pool_cond, &threadpool->pool_mutex) != 0) { pthread_mutex_unlock(&threadpool->pool_mutex); return; } } remove_frompools(threadpool); if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) { logg("!Mutex unlock failed\n"); exit(-1); } pthread_mutex_destroy(&(threadpool->pool_mutex)); pthread_cond_destroy(&(threadpool->idle_cond)); pthread_cond_destroy(&(threadpool->queueable_single_cond)); pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_attr_destroy(&(threadpool->pool_attr)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return; } threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *)) { threadpool_t *threadpool; #if defined(C_BIGSTACK) size_t stacksize; #endif if (max_threads <= 0) { return NULL; } threadpool = (threadpool_t *)malloc(sizeof(threadpool_t)); if (!threadpool) { return NULL; } threadpool->single_queue = work_queue_new(); if (!threadpool->single_queue) { free(threadpool); return NULL; } threadpool->bulk_queue = work_queue_new(); if (!threadpool->bulk_queue) { free(threadpool->single_queue); free(threadpool); return NULL; } threadpool->queue_max = max_queue; threadpool->thr_max = max_threads; threadpool->thr_alive = 0; threadpool->thr_idle = 0; threadpool->thr_multiscan = 0; threadpool->idle_timeout = idle_timeout; threadpool->handler = handler; threadpool->tasks = NULL; if (pthread_mutex_init(&(threadpool->pool_mutex), NULL)) { free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) { pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) { pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) { pthread_cond_destroy(&(threadpool->queueable_single_cond)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_cond_init(&(threadpool->idle_cond), NULL) != 0) { pthread_cond_destroy(&(threadpool->queueable_single_cond)); pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_attr_init(&(threadpool->pool_attr)) != 0) { pthread_cond_destroy(&(threadpool->queueable_single_cond)); pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); pthread_cond_destroy(&(threadpool->idle_cond)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) { pthread_cond_destroy(&(threadpool->queueable_single_cond)); pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); pthread_attr_destroy(&(threadpool->pool_attr)); pthread_cond_destroy(&(threadpool->idle_cond)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->single_queue); free(threadpool->bulk_queue); free(threadpool); return NULL; } #if defined(C_BIGSTACK) pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize); stacksize = stacksize + 64 * 1024; if (stacksize < 1048576) /* at least 1MB please */ #if defined(C_HPUX) && defined(USE_MPOOL) /* Set aside one cli_pagesize() for the stack's pthread header, * giving a 1M region to fit a 1M large-page */ if (cli_getpagesize() < 1048576) stacksize = 1048576 - cli_getpagesize(); else #endif stacksize = 1048576; logg("Set stacksize to %lu\n", (unsigned long int)stacksize); pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize); #endif threadpool->state = POOL_VALID; add_topools(threadpool); return threadpool; } static pthread_key_t stats_tls_key; static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT; static void stats_tls_key_alloc(void) { pthread_key_create(&stats_tls_key, NULL); } static const char *IDLE_TASK = "IDLE"; /* no mutex is needed, we are using thread local variable */ void thrmgr_setactivetask(const char *filename, const char *cmd) { struct task_desc *desc; pthread_once(&stats_tls_key_once, stats_tls_key_alloc); desc = pthread_getspecific(stats_tls_key); if (!desc) return; desc->filename = filename; if (cmd) { if (cmd == IDLE_TASK && desc->command == cmd) return; desc->command = cmd; gettimeofday(&desc->tv, NULL); } } void thrmgr_setactiveengine(const struct cl_engine *engine) { struct task_desc *desc; pthread_once(&stats_tls_key_once, stats_tls_key_alloc); desc = pthread_getspecific(stats_tls_key); if (!desc) return; desc->engine = engine; } /* thread pool mutex must be held on entry */ static void stats_init(threadpool_t *pool) { struct task_desc *desc = calloc(1, sizeof(*desc)); if (!desc) return; pthread_once(&stats_tls_key_once, stats_tls_key_alloc); pthread_setspecific(stats_tls_key, desc); if (!pool->tasks) pool->tasks = desc; else { desc->nxt = pool->tasks; pool->tasks->prv = desc; pool->tasks = desc; } } /* thread pool mutex must be held on entry */ static void stats_destroy(threadpool_t *pool) { struct task_desc *desc = pthread_getspecific(stats_tls_key); if (!desc) return; pthread_mutex_lock(&pools_lock); if (desc->prv) desc->prv->nxt = desc->nxt; if (desc->nxt) desc->nxt->prv = desc->prv; if (pool->tasks == desc) pool->tasks = desc->nxt; free(desc); pthread_setspecific(stats_tls_key, NULL); pthread_mutex_unlock(&pools_lock); } static inline int thrmgr_contended(threadpool_t *pool, int bulk) { /* don't allow bulk items to exceed 50% of queue, so that * non-bulk items get a chance to be in the queue */ if (bulk && pool->bulk_queue->item_count >= pool->queue_max / 2) return 1; return pool->bulk_queue->item_count + pool->single_queue->item_count + pool->thr_alive - pool->thr_idle >= pool->queue_max; } /* when both queues have tasks, it will pick 4 items from the single queue, * and 1 from the bulk */ #define SINGLE_BULK_RATIO 4 #define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1) /* must be called with pool_mutex held */ static void *thrmgr_pop(threadpool_t *pool) { void *task; work_queue_t *first, *second; int ratio; if (pool->single_queue->popped < SINGLE_BULK_RATIO) { first = pool->single_queue; second = pool->bulk_queue; ratio = SINGLE_BULK_RATIO; } else { second = pool->single_queue; first = pool->bulk_queue; ratio = SINGLE_BULK_SUM - SINGLE_BULK_RATIO; } task = work_queue_pop(first); if (task) { if (++first->popped == ratio) second->popped = 0; } else { task = work_queue_pop(second); if (task) { if (++second->popped == ratio) first->popped = 0; } } if (!thrmgr_contended(pool, 0)) { logg("$THRMGR: queue (single) crossed low threshold -> signaling\n"); pthread_cond_signal(&pool->queueable_single_cond); } if (!thrmgr_contended(pool, 1)) { logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n"); pthread_cond_signal(&pool->queueable_bulk_cond); } return task; } static void *thrmgr_worker(void *arg) { threadpool_t *threadpool = (threadpool_t *)arg; void *job_data; int retval, must_exit = FALSE, stats_inited = FALSE; struct timespec timeout; /* loop looking for work */ for (;;) { if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { logg("!Fatal: mutex lock failed\n"); exit(-2); } if (!stats_inited) { stats_init(threadpool); stats_inited = TRUE; } thrmgr_setactiveengine(NULL); thrmgr_setactivetask(NULL, IDLE_TASK); timeout.tv_sec = time(NULL) + threadpool->idle_timeout; timeout.tv_nsec = 0; threadpool->thr_idle++; while (((job_data = thrmgr_pop(threadpool)) == NULL) && (threadpool->state != POOL_EXIT)) { /* Sleep, awaiting wakeup */ pthread_cond_signal(&threadpool->idle_cond); retval = pthread_cond_timedwait(&(threadpool->pool_cond), &(threadpool->pool_mutex), &timeout); if (retval == ETIMEDOUT) { must_exit = TRUE; break; } } threadpool->thr_idle--; if (threadpool->state == POOL_EXIT) { must_exit = TRUE; } if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { logg("!Fatal: mutex unlock failed\n"); exit(-2); } if (job_data) { threadpool->handler(job_data); } else if (must_exit) { break; } } if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex lock failed\n"); exit(-2); } threadpool->thr_alive--; if (threadpool->thr_alive == 0) { /* signal that all threads are finished */ pthread_cond_broadcast(&threadpool->pool_cond); } stats_destroy(threadpool); if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex unlock failed\n"); exit(-2); } return NULL; } static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, int bulk) { int ret = TRUE; pthread_t thr_id; if (!threadpool) { return FALSE; } /* Lock the threadpool */ if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex lock failed\n"); return FALSE; } do { work_queue_t *queue; pthread_cond_t *queueable_cond; int items; if (threadpool->state != POOL_VALID) { ret = FALSE; break; } if (bulk) { queue = threadpool->bulk_queue; queueable_cond = &threadpool->queueable_bulk_cond; } else { queue = threadpool->single_queue; queueable_cond = &threadpool->queueable_single_cond; } while (thrmgr_contended(threadpool, bulk)) { logg("$THRMGR: contended, sleeping\n"); pthread_cond_wait(queueable_cond, &threadpool->pool_mutex); logg("$THRMGR: contended, woken\n"); } if (!work_queue_add(queue, user_data)) { ret = FALSE; break; } items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count; if ((threadpool->thr_idle < items) && (threadpool->thr_alive < threadpool->thr_max)) { /* Start a new thread */ if (pthread_create(&thr_id, &(threadpool->pool_attr), thrmgr_worker, threadpool) != 0) { logg("!pthread_create failed\n"); } else { threadpool->thr_alive++; } } pthread_cond_signal(&(threadpool->pool_cond)); } while (0); if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex unlock failed\n"); return FALSE; } return ret; } int thrmgr_dispatch(threadpool_t *threadpool, void *user_data) { return thrmgr_dispatch_internal(threadpool, user_data, 0); } int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk) { int ret; if (group) { pthread_mutex_lock(&group->mutex); group->jobs++; logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs); pthread_mutex_unlock(&group->mutex); } if (!(ret = thrmgr_dispatch_internal(threadpool, user_data, bulk)) && group) { pthread_mutex_lock(&group->mutex); group->jobs--; logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs); pthread_mutex_unlock(&group->mutex); } return ret; } /* returns * 0 - this was not the last thread in the group * 1 - this was last thread in group, group freed */ int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc) { int ret = 0; if (!group) { /* there is no group, we are obviously the last one */ return 1; } pthread_mutex_lock(&group->mutex); logg("$THRMGR: group_finished: %p, %d\n", group, group->jobs); group->exit_total++; switch (exitc) { case EXIT_OK: group->exit_ok++; break; case EXIT_ERROR: group->exit_error++; break; default: break; } if (group->jobs) { if (!--group->jobs) { ret = 1; } else logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs); if (group->jobs == 1) pthread_cond_signal(&group->only); } pthread_mutex_unlock(&group->mutex); if (ret) { logg("$THRMGR: group_finished: freeing %p\n", group); pthread_mutex_destroy(&group->mutex); pthread_cond_destroy(&group->only); free(group); } return ret; } void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total) { int needexit = 0, needfree = 0; struct timespec timeout; pthread_mutex_lock(&group->mutex); while (group->jobs > 1) { pthread_mutex_lock(&exit_mutex); needexit = progexit; pthread_mutex_unlock(&exit_mutex); if (needexit) break; /* wake to check progexit */ timeout.tv_sec = time(NULL) + 5; timeout.tv_nsec = 0; pthread_cond_timedwait(&group->only, &group->mutex, &timeout); } *ok = group->exit_ok; *error = group->exit_error + needexit; *total = group->exit_total; if (!--group->jobs) needfree = 1; else logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs); pthread_mutex_unlock(&group->mutex); if (needfree) { logg("$THRMGR: group finished freeing %p\n", group); free(group); } } jobgroup_t *thrmgr_group_new(void) { jobgroup_t *group; group = malloc(sizeof(*group)); if (!group) return NULL; group->jobs = 1; group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0; if (pthread_mutex_init(&group->mutex, NULL)) { logg("^Failed to initialize group mutex"); free(group); return NULL; } if (pthread_cond_init(&group->only, NULL)) { logg("^Failed to initialize group cond"); pthread_mutex_destroy(&group->mutex); free(group); return NULL; } logg("$THRMGR: new group: %p\n", group); return group; } int thrmgr_group_need_terminate(jobgroup_t *group) { int ret; if (group) { pthread_mutex_lock(&group->mutex); ret = group->force_exit; pthread_mutex_unlock(&group->mutex); } else ret = 0; pthread_mutex_lock(&exit_mutex); ret |= progexit; pthread_mutex_unlock(&exit_mutex); return ret; } void thrmgr_group_terminate(jobgroup_t *group) { if (group) { /* we may not be the last active job, now * the last active job will free resources */ pthread_mutex_lock(&group->mutex); group->force_exit = 1; pthread_mutex_unlock(&group->mutex); } }