c238ac42 |
/* |
086eab5c |
* Copyright (C) 2007-2009 Sourcefire, Inc.
*
* Authors: Trog, Török Edvin |
c238ac42 |
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software |
48b7b4a7 |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA. |
c238ac42 |
*/ |
52e8d3c6 |
|
67118e92 |
#if HAVE_CONFIG_H
#include "clamav-config.h"
#endif
|
bd8603aa |
#include <stdio.h> |
c238ac42 |
#include <pthread.h>
#include <time.h>
#include <errno.h> |
949c6fe5 |
#include <string.h> |
c238ac42 |
|
bd8603aa |
#include "shared/output.h" |
c238ac42 |
|
bd8603aa |
#include "thrmgr.h" |
52e8d3c6 |
#include "others.h" |
deb30312 |
#include "mpool.h" |
949c6fe5 |
#include "server.h" |
18f620f2 |
#include "libclamav/others.h" |
c238ac42 |
|
4b93f2b6 |
#ifdef HAVE_MALLINFO |
aa22174b |
#include <malloc.h>
#endif
|
efac2f09 |
/* BSD, HP-UX and AIX need a bigger stacksize than the system default */
75ce59c1 |
#if defined (C_BSD) || defined (C_HPUX) || defined(C_AIX) |
efac2f09 |
#define C_BIGSTACK 1
#endif
|
079229d6 |
static work_queue_t *work_queue_new(void) |
52e8d3c6 |
{
work_queue_t *work_q; |
64cb3807 |
|
8ca8a18e |
work_q = (work_queue_t *) malloc(sizeof(work_queue_t)); |
c3b33e5a |
if (!work_q) {
return NULL;
} |
64cb3807 |
|
52e8d3c6 |
work_q->head = work_q->tail = NULL;
work_q->item_count = 0; |
949c6fe5 |
work_q->popped = 0; |
52e8d3c6 |
return work_q; |
c238ac42 |
}
|
c3b33e5a |
/*
 * Append `data` to the tail of `work_q`, stamping the new item with the
 * current time of day.
 *
 * Returns TRUE on success, FALSE when `work_q` is NULL or the item cannot
 * be allocated.
 */
static int work_queue_add(work_queue_t *work_q, void *data)
{
	work_item_t *item;

	if (work_q == NULL)
		return FALSE;

	item = malloc(sizeof(*item));
	if (item == NULL)
		return FALSE;

	item->data = data;
	item->next = NULL;
	gettimeofday(&item->time_queued, NULL);

	if (work_q->head != NULL) {
		/* non-empty queue: chain behind the current tail */
		work_q->tail->next = item;
		work_q->tail = item;
		work_q->item_count++;
	} else {
		/* empty queue: the item is both head and tail */
		work_q->head = item;
		work_q->tail = item;
		work_q->item_count = 1;
	}
	return TRUE;
}
|
079229d6 |
/*
 * Detach the item at the head of `work_q`, free the item and return the
 * data pointer it carried.
 *
 * Returns NULL when `work_q` is NULL or the queue is empty.
 */
static void *work_queue_pop(work_queue_t *work_q)
{
	work_item_t *first;
	void *payload;

	if (work_q == NULL)
		return NULL;

	first = work_q->head;
	if (first == NULL)
		return NULL;

	payload = first->data;
	work_q->head = first->next;
	if (work_q->head == NULL) {
		/* that was the last item */
		work_q->tail = NULL;
	}
	work_q->item_count--;
	free(first);
	return payload;
}
|
aa22174b |
/*
 * Singly linked registry of thread pools. `pools` points at the first entry
 * and is NULL while no pool is registered.
 */
static struct threadpool_list {
threadpool_t *pool;
struct threadpool_list *nxt;
} *pools = NULL;
/* Guards the `pools` list. */
static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER;
/*
 * Register pool `t` at the head of the `pools` list.
 *
 * If the list entry cannot be allocated an error is logged and the pool is
 * left unregistered.
 */
static void add_topools(threadpool_t *t)
{
	struct threadpool_list *entry;

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		logg("!Unable to add threadpool to list\n");
		return;
	}
	entry->pool = t;

	pthread_mutex_lock(&pools_lock);
	entry->nxt = pools;
	pools = entry;
	pthread_mutex_unlock(&pools_lock);
}
/*
 * Unregister pool `t` from the `pools` list and free every task descriptor
 * still linked from t->tasks.
 *
 * Nothing is changed when `t` is not in the list. The whole operation runs
 * under pools_lock.
 */
static void remove_frompools(threadpool_t *t)
{
	struct threadpool_list **link, *node;
	struct task_desc *desc, *next;

	pthread_mutex_lock(&pools_lock);

	/* find the link (list head or some entry's nxt) that points at t's entry */
	for (link = &pools; *link != NULL; link = &(*link)->nxt) {
		if ((*link)->pool == t)
			break;
	}
	node = *link;
	if (node == NULL) {
		pthread_mutex_unlock(&pools_lock);
		return;
	}

	*link = node->nxt;
	free(node);

	for (desc = t->tasks; desc != NULL; desc = next) {
		next = desc->nxt;
		free(desc);
	}
	t->tasks = NULL;

	pthread_mutex_unlock(&pools_lock);
}
|
949c6fe5 |
/*
 * Append wait-time statistics for `queue` to the report written with
 * mdprintf(f, ...).
 *
 * For every queued item the wait is tv_now minus the item's time_queued, in
 * microseconds. Items whose wait comes out negative are counted as invalid
 * and excluded from min/max/avg. Nothing is printed for an empty queue.
 *
 * When every item is invalid there is no valid sample, so min/max/avg are
 * reported as 0 instead of LONG_MAX/1e6 and a 0/0 average.
 */
static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now)
{
	long umin = LONG_MAX, umax = 0, usum = 0;
	unsigned invalids = 0, cnt = 0;
	double avg = 0.0;
	work_item_t *q;

	if (!queue->head)
		return;

	for (q = queue->head; q; q = q->next) {
		long delta;

		delta = tv_now->tv_usec - q->time_queued.tv_usec;
		delta += (tv_now->tv_sec - q->time_queued.tv_sec) * 1000000;
		if (delta < 0) {
			invalids++;
			continue;
		}
		if (delta > umax)
			umax = delta;
		if (delta < umin)
			umin = delta;
		usum += delta;
		++cnt;
	}

	if (cnt) {
		avg = usum / (1e6 * cnt);
	} else {
		/* no valid sample: umin still holds its LONG_MAX sentinel */
		umin = 0;
	}

	mdprintf(f, " min_wait: %.6f max_wait: %.6f avg_wait: %.6f",
		 umin / 1e6, umax / 1e6, avg);
	if (invalids)
		mdprintf(f, " (INVALID timestamps: %u)", invalids);
	/* cross-check the walk against the queue's own counter */
	if (cnt + invalids != (unsigned)queue->item_count)
		mdprintf(f, " (ERROR: %u != %u)", cnt + invalids,
			 (unsigned)queue->item_count);
}
|
f2381892 |
int thrmgr_printstats(int f, char term) |
aa22174b |
{
struct threadpool_list *l; |
b9b47784 |
unsigned cnt, pool_cnt = 0;
size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0; |
deb30312 |
float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0;
const struct cl_engine **seen = NULL; |
59deef7d |
int has_libc_memstats = 0; |
deb30312 |
|
aa22174b |
pthread_mutex_lock(&pools_lock);
for(cnt=0,l=pools;l;l=l->nxt) cnt++; |
6bdb35ea |
mdprintf(f,"POOLS: %u\n\n", cnt); |
deb30312 |
for(l= pools;l && !error_flag;l = l->nxt) { |
aa22174b |
threadpool_t *pool = l->pool;
const char *state;
struct timeval tv_now;
struct task_desc *task; |
5a1034b9 |
cnt = 0; |
aa22174b |
if(!pool) { |
6bdb35ea |
mdprintf(f,"NULL\n\n"); |
aa22174b |
continue;
} |
deb30312 |
/* now we can access desc->, knowing that they won't get freed
* because the other tasks can't quit while pool_mutex is taken
*/ |
aa22174b |
switch(pool->state) {
case POOL_INVALID:
state = "INVALID";
break;
case POOL_VALID:
state = "VALID";
break;
case POOL_EXIT:
state = "EXIT";
break; |
b9b47784 |
default:
state = "??";
break; |
aa22174b |
} |
6bdb35ea |
mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY");
mdprintf(f, "THREADS: live %u idle %u max %u idle-timeout %u\n" |
aa22174b |
,pool->thr_alive, pool->thr_idle, pool->thr_max,
pool->idle_timeout); |
949c6fe5 |
/* TODO: show both queues */
mdprintf(f,"QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count); |
aa22174b |
gettimeofday(&tv_now, NULL); |
949c6fe5 |
print_queue(f, pool->bulk_queue, &tv_now);
print_queue(f, pool->single_queue, &tv_now); |
6bdb35ea |
mdprintf(f, "\n"); |
aa22174b |
for(task = pool->tasks; task; task = task->nxt) { |
cab1b475 |
double delta; |
deb30312 |
size_t used, total;
|
aa22174b |
delta = tv_now.tv_usec - task->tv.tv_usec; |
cab1b475 |
delta += (tv_now.tv_sec - task->tv.tv_sec)*1000000.0; |
6bdb35ea |
mdprintf(f,"\t%s %f %s\n", |
aa22174b |
task->command ? task->command : "N/A",
delta/1e6,
task->filename ? task->filename:""); |
deb30312 |
if (task->engine) {
/* we usually have at most 2 engines so a linear
* search is good enough */
size_t i;
for (i=0;i<seen_cnt;i++) {
if (seen[i] == task->engine)
break;
}
/* we need to count the memusage from the same
* engine only once */
if (i == seen_cnt) {
const struct cl_engine **s;
/* new engine */
++seen_cnt;
s = realloc(seen, seen_cnt * sizeof(*seen));
if (!s) {
error_flag = 1;
break;
}
seen = s;
seen[seen_cnt - 1] = task->engine;
|
47d40feb |
if (mpool_getstats(task->engine, &used, &total) != -1) { |
deb30312 |
pool_used += used;
pool_total += total;
pool_cnt++;
}
}
} |
aa22174b |
} |
6bdb35ea |
mdprintf(f,"\n"); |
aa22174b |
} |
deb30312 |
free(seen); |
4b93f2b6 |
#ifdef HAVE_MALLINFO |
aa22174b |
{
struct mallinfo inf = mallinfo(); |
deb30312 |
mem_heap = inf.arena/(1024*1024.0);
mem_mmap = inf.hblkhd/(1024*1024.0); |
4b93f2b6 |
mem_used = (inf.usmblks + inf.uordblks)/(1024*1024.0);
mem_free = (inf.fsmblks + inf.fordblks)/(1024*1024.0); |
deb30312 |
mem_releasable = inf.keepcost/(1024*1024.0); |
59deef7d |
has_libc_memstats=1; |
aa22174b |
}
#endif |
deb30312 |
if (error_flag) {
mdprintf(f, "ERROR: error encountered while formatting statistics\n");
} else { |
59deef7d |
if (has_libc_memstats) |
deb30312 |
mdprintf(f,"MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n",
mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt,
pool_used/(1024*1024.0), pool_total/(1024*1024.0)); |
59deef7d |
else
mdprintf(f,"MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n",
pool_cnt, pool_used/(1024*1024.0), pool_total/(1024*1024.0)); |
deb30312 |
} |
f2381892 |
mdprintf(f,"END%c", term); |
aa22174b |
pthread_mutex_unlock(&pools_lock);
return 0;
}
|
52e8d3c6 |
/*
 * Shut down and free `threadpool`.
 *
 * Does nothing for a NULL pool or one whose state is not POOL_VALID.
 * Otherwise the state becomes POOL_EXIT, the workers are woken through
 * pool_cond, and the caller blocks until thr_alive drops to zero. The pool
 * is then unregistered and its mutex, condition variables, thread attribute
 * object, both queue headers and the pool itself are released.
 *
 * If the broadcast or a wait on pool_cond fails the function returns early
 * with the pool left in POOL_EXIT state and not freed. A failure to lock or
 * unlock pool_mutex terminates the process with exit(-1).
 */
void thrmgr_destroy(threadpool_t *threadpool)
{
	if (threadpool == NULL)
		return;

	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex lock failed\n");
		exit(-1);
	}

	if (threadpool->state != POOL_VALID) {
		if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
			logg("!Mutex unlock failed\n");
			exit(-1);
		}
		return;
	}
	threadpool->state = POOL_EXIT;

	/* wake the workers so they notice POOL_EXIT */
	if (threadpool->thr_alive > 0 &&
	    pthread_cond_broadcast(&threadpool->pool_cond) != 0) {
		pthread_mutex_unlock(&threadpool->pool_mutex);
		return;
	}

	/* wait for threads to exit */
	while (threadpool->thr_alive > 0) {
		if (pthread_cond_wait(&threadpool->pool_cond,
				      &threadpool->pool_mutex) != 0) {
			pthread_mutex_unlock(&threadpool->pool_mutex);
			return;
		}
	}

	remove_frompools(threadpool);
	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex unlock failed\n");
		exit(-1);
	}

	pthread_mutex_destroy(&threadpool->pool_mutex);
	pthread_cond_destroy(&threadpool->idle_cond);
	pthread_cond_destroy(&threadpool->queueable_single_cond);
	pthread_cond_destroy(&threadpool->queueable_bulk_cond);
	pthread_cond_destroy(&threadpool->pool_cond);
	pthread_attr_destroy(&threadpool->pool_attr);
	free(threadpool->single_queue);
	free(threadpool->bulk_queue);
	free(threadpool);
}
|
949c6fe5 |
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *)) |
c238ac42 |
{ |
52e8d3c6 |
threadpool_t *threadpool; |
efac2f09 |
#if defined(C_BIGSTACK) |
7b04a82c |
size_t stacksize;
#endif |
64cb3807 |
|
52e8d3c6 |
if (max_threads <= 0) {
return NULL;
} |
64cb3807 |
|
8ca8a18e |
threadpool = (threadpool_t *) malloc(sizeof(threadpool_t)); |
c3b33e5a |
if (!threadpool) {
return NULL;
} |
c238ac42 |
|
949c6fe5 |
threadpool->single_queue = work_queue_new();
if (!threadpool->single_queue) { |
52e8d3c6 |
free(threadpool);
return NULL; |
64cb3807 |
} |
949c6fe5 |
threadpool->bulk_queue = work_queue_new();
if (!threadpool->bulk_queue) {
free(threadpool->single_queue);
free(threadpool);
return NULL;
}
threadpool->queue_max = max_queue;
|
52e8d3c6 |
threadpool->thr_max = max_threads;
threadpool->thr_alive = 0;
threadpool->thr_idle = 0; |
80301d0c |
threadpool->thr_multiscan = 0; |
52e8d3c6 |
threadpool->idle_timeout = idle_timeout;
threadpool->handler = handler; |
aa22174b |
threadpool->tasks = NULL; |
64cb3807 |
|
cc4232a3 |
if(pthread_mutex_init(&(threadpool->pool_mutex), NULL)) { |
949c6fe5 |
free(threadpool->single_queue);
free(threadpool->bulk_queue); |
cc4232a3 |
free(threadpool);
return NULL;
}
|
52e8d3c6 |
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) { |
c3b33e5a |
pthread_mutex_destroy(&(threadpool->pool_mutex)); |
949c6fe5 |
free(threadpool->single_queue);
free(threadpool->bulk_queue);
free(threadpool);
return NULL;
}
|
6a5ec1f9 |
if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) { |
949c6fe5 |
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex));
free(threadpool->single_queue);
free(threadpool->bulk_queue); |
52e8d3c6 |
free(threadpool);
return NULL;
} |
d6df9ffb |
|
6a5ec1f9 |
if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
pthread_cond_destroy(&(threadpool->queueable_single_cond));
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex));
free(threadpool->single_queue);
free(threadpool->bulk_queue);
free(threadpool);
return NULL;
}
|
d6df9ffb |
if (pthread_cond_init(&(threadpool->idle_cond),NULL) != 0) { |
6a5ec1f9 |
pthread_cond_destroy(&(threadpool->queueable_single_cond));
pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); |
d6df9ffb |
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex)); |
949c6fe5 |
free(threadpool->single_queue);
free(threadpool->bulk_queue); |
d6df9ffb |
free(threadpool);
return NULL;
}
|
52e8d3c6 |
if (pthread_attr_init(&(threadpool->pool_attr)) != 0) { |
6a5ec1f9 |
pthread_cond_destroy(&(threadpool->queueable_single_cond));
pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); |
d6df9ffb |
pthread_cond_destroy(&(threadpool->idle_cond)); |
c3b33e5a |
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex)); |
949c6fe5 |
free(threadpool->single_queue);
free(threadpool->bulk_queue); |
52e8d3c6 |
free(threadpool);
return NULL;
} |
64cb3807 |
|
52e8d3c6 |
if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) { |
6a5ec1f9 |
pthread_cond_destroy(&(threadpool->queueable_single_cond));
pthread_cond_destroy(&(threadpool->queueable_bulk_cond)); |
c3b33e5a |
pthread_attr_destroy(&(threadpool->pool_attr)); |
d6df9ffb |
pthread_cond_destroy(&(threadpool->idle_cond)); |
c3b33e5a |
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_mutex_destroy(&(threadpool->pool_mutex)); |
949c6fe5 |
free(threadpool->single_queue);
free(threadpool->bulk_queue); |
52e8d3c6 |
free(threadpool);
return NULL;
} |
7b04a82c |
|
efac2f09 |
#if defined(C_BIGSTACK) |
7b04a82c |
pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
stacksize = stacksize + 64 * 1024; |
18f620f2 |
if (stacksize < 1048576) /* at least 1MB please */
#if defined(C_HPUX) && defined(USE_MPOOL)
/* Set aside one cli_pagesize() for the stack's pthread header,
* giving a 1M region to fit a 1M large-page */
if(cli_getpagesize() < 1048576)
stacksize = 1048576 - cli_getpagesize();
else
#endif
stacksize = 1048576; |
e78b5186 |
logg("Set stacksize to %lu\n", (unsigned long int) stacksize); |
7b04a82c |
pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
#endif |
52e8d3c6 |
threadpool->state = POOL_VALID; |
c238ac42 |
|
aa22174b |
add_topools(threadpool); |
52e8d3c6 |
return threadpool;
} |
c238ac42 |
|
aa22174b |
/* Thread-specific key under which a worker stores its struct task_desc
 * (set in stats_init, read with pthread_getspecific elsewhere). */
static pthread_key_t stats_tls_key;
/* pthread_once control so that stats_tls_key_alloc() runs only once. */
static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT;
/*
 * pthread_once() initialiser: creates stats_tls_key with no destructor.
 * NOTE(review): the return value of pthread_key_create() is ignored.
 */
static void stats_tls_key_alloc(void)
{
pthread_key_create(&stats_tls_key, NULL);
}
static const char *IDLE_TASK = "IDLE"; |
764f76c8 |
/* no mutex is needed, we are using thread local variable */ |
949c6fe5 |
void thrmgr_setactivetask(const char *filename, const char* cmd) |
aa22174b |
{ |
15adbc84 |
struct task_desc *desc;
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
desc = pthread_getspecific(stats_tls_key); |
aa22174b |
if(!desc)
return;
desc->filename = filename; |
949c6fe5 |
if(cmd) {
if(cmd == IDLE_TASK && desc->command == cmd) |
aa22174b |
return; |
949c6fe5 |
desc->command = cmd; |
aa22174b |
gettimeofday(&desc->tv, NULL);
}
}
|
deb30312 |
void thrmgr_setactiveengine(const struct cl_engine *engine)
{ |
15adbc84 |
struct task_desc *desc;
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
desc = pthread_getspecific(stats_tls_key); |
deb30312 |
if(!desc)
return;
desc->engine = engine;
}
|
764f76c8 |
/* thread pool mutex must be held on entry */ |
aa22174b |
static void stats_init(threadpool_t *pool)
{
struct task_desc *desc = calloc(1, sizeof(*desc));
if(!desc)
return;
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
pthread_setspecific(stats_tls_key, desc);
if(!pool->tasks)
pool->tasks = desc;
else {
desc->nxt = pool->tasks;
pool->tasks->prv = desc;
pool->tasks = desc;
}
}
|
764f76c8 |
/* thread pool mutex must be held on entry */ |
aa22174b |
static void stats_destroy(threadpool_t *pool)
{
struct task_desc *desc = pthread_getspecific(stats_tls_key);
if(!desc)
return; |
b25fdae4 |
pthread_mutex_lock(&pools_lock); |
aa22174b |
if(desc->prv)
desc->prv->nxt = desc->nxt;
if(desc->nxt)
desc->nxt->prv = desc->prv;
if(pool->tasks == desc)
pool->tasks = desc->nxt;
free(desc); |
15adbc84 |
pthread_setspecific(stats_tls_key, NULL); |
b25fdae4 |
pthread_mutex_unlock(&pools_lock); |
aa22174b |
}
|
6a5ec1f9 |
/*
 * Report whether the pool is too busy to accept another job.
 *
 * Returns 1 when `bulk` is set and the bulk queue already holds at least
 * queue_max/2 items. Otherwise returns whether queued items plus busy
 * threads (thr_alive - thr_idle) have reached queue_max.
 */
static inline int thrmgr_contended(threadpool_t *pool, int bulk)
{
	/* don't allow bulk items to exceed 50% of queue, so that
	 * non-bulk items get a chance to be in the queue */
	if (bulk) {
		if (pool->bulk_queue->item_count >= pool->queue_max/2)
			return 1;
	}

	return pool->bulk_queue->item_count + pool->single_queue->item_count
		+ pool->thr_alive - pool->thr_idle >= pool->queue_max;
}
/* when both queues have tasks, it will pick 4 items from the single queue,
 * and 1 from the bulk */
#define SINGLE_BULK_RATIO 4
#define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1)

/*
 * Take the next job from the pool, or NULL when both queues are empty.
 *
 * The single queue is preferred until its `popped` counter reaches
 * SINGLE_BULK_RATIO, after which the bulk queue is preferred. If the
 * preferred queue is empty the other queue is tried. Whichever queue
 * supplied the job has its `popped` counter incremented; when that counter
 * hits the current quota the other queue's counter is reset.
 *
 * After the pop, each "queueable" condition whose thrmgr_contended() check
 * is false is signalled.
 *
 * Must be called with pool_mutex held.
 */
static void *thrmgr_pop(threadpool_t *pool)
{
	work_queue_t *preferred, *fallback;
	void *task;
	int quota;

	if (pool->single_queue->popped >= SINGLE_BULK_RATIO) {
		preferred = pool->bulk_queue;
		fallback = pool->single_queue;
		quota = SINGLE_BULK_SUM - SINGLE_BULK_RATIO;
	} else {
		preferred = pool->single_queue;
		fallback = pool->bulk_queue;
		quota = SINGLE_BULK_RATIO;
	}

	task = work_queue_pop(preferred);
	if (task != NULL) {
		preferred->popped++;
		if (preferred->popped == quota)
			fallback->popped = 0;
	} else {
		task = work_queue_pop(fallback);
		if (task != NULL) {
			fallback->popped++;
			if (fallback->popped == quota)
				preferred->popped = 0;
		}
	}

	if (!thrmgr_contended(pool, 0)) {
		logg("$THRMGR: queue (single) crossed low threshold -> signaling\n");
		pthread_cond_signal(&pool->queueable_single_cond);
	}
	if (!thrmgr_contended(pool, 1)) {
		logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n");
		pthread_cond_signal(&pool->queueable_bulk_cond);
	}
	return task;
}
|
fc83da82 |
static void *thrmgr_worker(void *arg) |
52e8d3c6 |
{
threadpool_t *threadpool = (threadpool_t *) arg;
void *job_data; |
aa22174b |
int retval, must_exit = FALSE, stats_inited = FALSE; |
52e8d3c6 |
struct timespec timeout; |
64cb3807 |
|
52e8d3c6 |
/* loop looking for work */
for (;;) {
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { |
10b04232 |
logg("!Fatal: mutex lock failed\n"); |
52e8d3c6 |
exit(-2);
} |
aa22174b |
if(!stats_inited) {
stats_init(threadpool);
stats_inited = TRUE;
} |
15adbc84 |
thrmgr_setactiveengine(NULL); |
aa22174b |
thrmgr_setactivetask(NULL, IDLE_TASK); |
52e8d3c6 |
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
timeout.tv_nsec = 0;
threadpool->thr_idle++; |
949c6fe5 |
while (((job_data=thrmgr_pop(threadpool)) == NULL) |
52e8d3c6 |
&& (threadpool->state != POOL_EXIT)) {
/* Sleep, awaiting wakeup */ |
d6df9ffb |
pthread_cond_signal(&threadpool->idle_cond); |
52e8d3c6 |
retval = pthread_cond_timedwait(&(threadpool->pool_cond),
&(threadpool->pool_mutex), &timeout); |
5a66732f |
if (retval == ETIMEDOUT) { |
1593523e |
must_exit = TRUE; |
5a66732f |
break;
} |
52e8d3c6 |
}
threadpool->thr_idle--; |
1593523e |
if (threadpool->state == POOL_EXIT) {
must_exit = TRUE; |
52e8d3c6 |
} |
04ba76d2 |
|
52e8d3c6 |
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { |
10b04232 |
logg("!Fatal: mutex unlock failed\n"); |
52e8d3c6 |
exit(-2);
} |
5a66732f |
if (job_data) {
threadpool->handler(job_data);
} else if (must_exit) {
break;
}
}
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex lock failed\n");
exit(-2);
}
threadpool->thr_alive--;
if (threadpool->thr_alive == 0) {
/* signal that all threads are finished */
pthread_cond_broadcast(&threadpool->pool_cond);
} |
aa22174b |
stats_destroy(threadpool); |
5a66732f |
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex unlock failed\n");
exit(-2); |
52e8d3c6 |
}
return NULL; |
c238ac42 |
}
|
949c6fe5 |
/*
 * Queue `user_data` on the pool's bulk queue (bulk != 0) or single queue
 * (bulk == 0).
 *
 * While thrmgr_contended() reports the pool as contended, the caller blocks
 * on the matching "queueable" condition. After the job is queued, a new
 * worker thread is started if there are more queued items than idle threads
 * and thr_max has not been reached; a pthread_create failure is logged but
 * the job stays queued. pool_cond is then signalled.
 *
 * Returns TRUE on success. Returns FALSE when `threadpool` is NULL, the pool
 * mutex cannot be locked or unlocked, the pool is not in POOL_VALID state,
 * or the job cannot be added to the queue.
 */
static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, int bulk)
{
	work_queue_t *queue;
	pthread_cond_t *queueable_cond;
	pthread_t thr_id;
	int items;
	int ret = FALSE;

	if (threadpool == NULL)
		return FALSE;

	/* Lock the threadpool */
	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex lock failed\n");
		return FALSE;
	}

	if (threadpool->state != POOL_VALID)
		goto unlock;

	if (bulk) {
		queue = threadpool->bulk_queue;
		queueable_cond = &threadpool->queueable_bulk_cond;
	} else {
		queue = threadpool->single_queue;
		queueable_cond = &threadpool->queueable_single_cond;
	}

	while (thrmgr_contended(threadpool, bulk)) {
		logg("$THRMGR: contended, sleeping\n");
		pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
		logg("$THRMGR: contended, woken\n");
	}

	if (!work_queue_add(queue, user_data))
		goto unlock;

	items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count;
	if (threadpool->thr_idle < items &&
	    threadpool->thr_alive < threadpool->thr_max) {
		/* Start a new thread */
		if (pthread_create(&thr_id, &threadpool->pool_attr,
				   thrmgr_worker, threadpool) != 0)
			logg("!pthread_create failed\n");
		else
			threadpool->thr_alive++;
	}
	pthread_cond_signal(&threadpool->pool_cond);
	ret = TRUE;

unlock:
	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
		logg("!Mutex unlock failed\n");
		return FALSE;
	}
	return ret;
}
/*
 * Queue `user_data` on the pool's single (non-bulk) queue.
 * Returns the result of thrmgr_dispatch_internal().
 */
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
	const int bulk = 0;

	return thrmgr_dispatch_internal(threadpool, user_data, bulk);
}
|
1514794c |
/*
 * Dispatch `user_data` as part of job group `group` (which may be NULL).
 *
 * The group's job count is incremented before dispatching and decremented
 * again if the dispatch fails. Returns the result of
 * thrmgr_dispatch_internal().
 */
int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk)
{
	int ret;

	if (group != NULL) {
		pthread_mutex_lock(&group->mutex);
		group->jobs++;
		logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
		pthread_mutex_unlock(&group->mutex);
	}

	ret = thrmgr_dispatch_internal(threadpool, user_data, bulk);
	if (ret || group == NULL)
		return ret;

	/* dispatch failed: the job was never queued, so undo the count */
	pthread_mutex_lock(&group->mutex);
	group->jobs--;
	logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
	pthread_mutex_unlock(&group->mutex);
	return ret;
}
/*
 * Record that one job of `group` has finished with exit status `exitc`.
 *
 * exit_total is always incremented; exit_ok or exit_error is incremented
 * for EXIT_OK / EXIT_ERROR. If the job count is non-zero it is decremented;
 * when exactly one job remains the `only` condition is signalled.
 *
 * returns
 * 0 - this was not the last thread in the group
 * 1 - this was last thread in group, group freed
 *     (also returned when `group` is NULL)
 */
int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc)
{
	int last = 0;

	if (group == NULL) {
		/* there is no group, we are obviously the last one */
		return 1;
	}

	pthread_mutex_lock(&group->mutex);
	logg("$THRMGR: group_finished: %p, %d\n", group, group->jobs);

	group->exit_total++;
	if (exitc == EXIT_OK)
		group->exit_ok++;
	else if (exitc == EXIT_ERROR)
		group->exit_error++;

	if (group->jobs) {
		group->jobs--;
		if (group->jobs == 0) {
			last = 1;
		} else {
			logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
			if (group->jobs == 1)
				pthread_cond_signal(&group->only);
		}
	}
	pthread_mutex_unlock(&group->mutex);

	if (last) {
		logg("$THRMGR: group_finished: freeing %p\n", group);
		pthread_mutex_destroy(&group->mutex);
		pthread_cond_destroy(&group->only);
		free(group);
	}
	return last;
}
/*
 * Block until at most one job of `group` remains, or until progexit is set.
 *
 * The wait on the `only` condition wakes at least every 5 seconds so that
 * progexit (read under exit_mutex) is re-checked. On return *ok, *error and
 * *total hold the group's exit counters; *error additionally includes the
 * progexit value that ended the wait.
 *
 * The job count is then decremented once more. If it reaches zero the group
 * is released: its mutex and condition variable are destroyed before the
 * memory is freed, as thrmgr_group_finished() does.
 */
void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total)
{
	int needexit = 0, needfree = 0;
	struct timespec timeout;

	pthread_mutex_lock(&group->mutex);
	while (group->jobs > 1) {
		pthread_mutex_lock(&exit_mutex);
		needexit = progexit;
		pthread_mutex_unlock(&exit_mutex);
		if (needexit)
			break;
		/* wake to check progexit */
		timeout.tv_sec = time(NULL) + 5;
		timeout.tv_nsec = 0;
		pthread_cond_timedwait(&group->only, &group->mutex, &timeout);
	}
	*ok = group->exit_ok;
	*error = group->exit_error + needexit;
	*total = group->exit_total;
	if (!--group->jobs)
		needfree = 1;
	else
		logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
	pthread_mutex_unlock(&group->mutex);

	if (needfree) {
		logg("$THRMGR: group finished freeing %p\n", group);
		/* release the pthread objects created in thrmgr_group_new() */
		pthread_mutex_destroy(&group->mutex);
		pthread_cond_destroy(&group->only);
		free(group);
	}
}
jobgroup_t *thrmgr_group_new(void)
{
jobgroup_t *group;
group = malloc(sizeof(*group));
if (!group)
return NULL; |
7660b7cb |
group->jobs = 1;
group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0;
if (pthread_mutex_init(&group->mutex, NULL)) {
logg("^Failed to initialize group mutex");
free(group);
return NULL;
}
if (pthread_cond_init(&group->only, NULL)) {
logg("^Failed to initialize group cond");
pthread_mutex_destroy(&group->mutex);
free(group);
return NULL;
} |
fb6fe4f5 |
logg("$THRMGR: new group: %p\n", group); |
949c6fe5 |
return group;
}
/*
 * Report whether jobs of `group` should stop.
 *
 * Returns the group's force_exit flag (0 for a NULL group) OR-ed with the
 * global progexit value. Each is read under its own mutex.
 */
int thrmgr_group_need_terminate(jobgroup_t *group)
{
	int ret = 0;

	if (group != NULL) {
		pthread_mutex_lock(&group->mutex);
		ret = group->force_exit;
		pthread_mutex_unlock(&group->mutex);
	}

	pthread_mutex_lock(&exit_mutex);
	ret |= progexit;
	pthread_mutex_unlock(&exit_mutex);

	return ret;
}
|
0378a9ab |
/*
 * Set the force_exit flag of `group` under the group mutex. A NULL group is
 * ignored. Nothing is freed here.
 */
void thrmgr_group_terminate(jobgroup_t *group)
{
	if (group == NULL)
		return;

	/* we may not be the last active job, now
	 * the last active job will free resources */
	pthread_mutex_lock(&group->mutex);
	group->force_exit = 1;
	pthread_mutex_unlock(&group->mutex);
}