clamd/thrmgr.c
c238ac42
 /*
  *  Copyright (C) 2004 Trog <trog@clamav.net>
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
48b7b4a7
  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  *  MA 02110-1301, USA.
c238ac42
  */
52e8d3c6
 
67118e92
 #if HAVE_CONFIG_H
 #include "clamav-config.h"
 #endif
 
bd8603aa
 #include <stdio.h>
c238ac42
 #include <pthread.h>
 #include <time.h>
 #include <errno.h>
 
bd8603aa
 #include "shared/output.h"
c238ac42
 
bd8603aa
 #include "thrmgr.h"
52e8d3c6
 #include "others.h"
c238ac42
 
52e8d3c6
 #define FALSE (0)
 #define TRUE (1)
c238ac42
 
079229d6
 static work_queue_t *work_queue_new(void)
52e8d3c6
 {
 	work_queue_t *work_q;
 	
8ca8a18e
 	work_q = (work_queue_t *) malloc(sizeof(work_queue_t));
c3b33e5a
 	if (!work_q) {
 		return NULL;
 	}
52e8d3c6
 	
 	work_q->head = work_q->tail = NULL;
 	work_q->item_count = 0;
 	return work_q;
c238ac42
 }
 
c3b33e5a
 /*
  * Append a new item carrying `data` to the tail of work_q.
  *
  * The item is stamped with the time at which it was queued.
  * Returns TRUE on success, or FALSE when work_q is NULL or the item
  * cannot be allocated (the queue is left untouched in both cases).
  */
 static int work_queue_add(work_queue_t *work_q, void *data)
 {
 	work_item_t *node;

 	if (work_q == NULL) {
 		return FALSE;
 	}

 	node = (work_item_t *) malloc(sizeof *node);
 	if (node == NULL) {
 		return FALSE;
 	}
 	node->data = data;
 	node->next = NULL;
 	gettimeofday(&node->time_queued, NULL);

 	if (work_q->head != NULL) {
 		/* Non-empty queue: link the item behind the current tail. */
 		work_q->tail->next = node;
 		work_q->tail = node;
 		work_q->item_count++;
 	} else {
 		/* Empty queue: the new item is both head and tail. */
 		work_q->head = node;
 		work_q->tail = node;
 		work_q->item_count = 1;
 	}
 	return TRUE;
 }
 
079229d6
 /*
  * Detach the item at the head of work_q and return its data pointer.
  *
  * The list node itself is freed here; the data pointer is handed back
  * to the caller unchanged.  Returns NULL when work_q is NULL or holds no
  * items (a queued NULL data pointer is indistinguishable from "empty").
  */
 static void *work_queue_pop(work_queue_t *work_q)
 {
 	work_item_t *work_item;
 	void *data;

 	if (!work_q || !work_q->head) {
 		return NULL;
 	}
 	work_item = work_q->head;
 	data = work_item->data;
 	work_q->head = work_item->next;
 	if (work_q->head == NULL) {
 		/* Removed the last item: the queue is empty again. */
 		work_q->tail = NULL;
 	}
 	/*
 	 * work_queue_add() bumps item_count for every item it links in, so
 	 * it has to be decremented here to keep tracking the list length.
 	 */
 	work_q->item_count--;
 	free(work_item);
 	return data;
 }
 
52e8d3c6
 /*
  * Shut down a thread pool and release everything it owns.
  *
  * Does nothing when threadpool is NULL or not in the POOL_VALID state.
  * Otherwise the pool is switched to POOL_EXIT, waiting workers are woken,
  * and the call blocks until thr_alive has dropped to zero.  After that
  * the mutex, condition variable and thread attributes are destroyed and
  * the queue and the pool structure are freed.
  *
  * A failure to lock or unlock the pool mutex is logged and terminates
  * the process.  If the broadcast or the wait on the condition variable
  * fails, the function returns early with the mutex released; the pool
  * is then left in POOL_EXIT state and is NOT freed.
  */
 void thrmgr_destroy(threadpool_t *threadpool)
 {
 	work_item_t *item, *next;

 	if (!threadpool || (threadpool->state != POOL_VALID)) {
 		return;
 	}
 	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
 		logg("!Mutex lock failed\n");
 		exit(-1);
 	}
 	threadpool->state = POOL_EXIT;

 	/* wait for threads to exit */
 	if (threadpool->thr_alive > 0) {
 		if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
 			pthread_mutex_unlock(&threadpool->pool_mutex);
 			return;
 		}
 	}
 	while (threadpool->thr_alive > 0) {
 		if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
 			pthread_mutex_unlock(&threadpool->pool_mutex);
 			return;
 		}
 	}
 	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
 		logg("!Mutex unlock failed\n");
 		exit(-1);
 	}

 	pthread_mutex_destroy(&(threadpool->pool_mutex));
 	pthread_cond_destroy(&(threadpool->pool_cond));
 	pthread_attr_destroy(&(threadpool->pool_attr));

 	/*
 	 * Items can still be queued here if no worker was ever alive to
 	 * take them (e.g. thread creation failed at dispatch time).  Free
 	 * the list nodes so they are not leaked along with the queue; the
 	 * data pointers they carry are not owned by the queue and are left
 	 * untouched.
 	 */
 	if (threadpool->queue) {
 		for (item = threadpool->queue->head; item != NULL; item = next) {
 			next = item->next;
 			free(item);
 		}
 	}
 	free(threadpool->queue);
 	free(threadpool);
 	return;
 }
 
1593523e
 threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
c238ac42
 {
52e8d3c6
 	threadpool_t *threadpool;
7b04a82c
 #if defined(C_BIGSTACK) || defined(C_BSD)
 	size_t stacksize;
 #endif
52e8d3c6
 	
 	if (max_threads <= 0) {
 		return NULL;
 	}
 	
8ca8a18e
 	threadpool = (threadpool_t *) malloc(sizeof(threadpool_t));
c3b33e5a
 	if (!threadpool) {
 		return NULL;
 	}
c238ac42
 
52e8d3c6
 	threadpool->queue = work_queue_new();
 	if (!threadpool->queue) {
 		free(threadpool);
 		return NULL;
 	}	
 	threadpool->thr_max = max_threads;
 	threadpool->thr_alive = 0;
 	threadpool->thr_idle = 0;
 	threadpool->idle_timeout = idle_timeout;
 	threadpool->handler = handler;
 	
 	pthread_mutex_init(&(threadpool->pool_mutex), NULL);
 	if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
c3b33e5a
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
 		free(threadpool->queue);
52e8d3c6
 		free(threadpool);
 		return NULL;
 	}
 		
 	if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
c3b33e5a
 		pthread_cond_destroy(&(threadpool->pool_cond));
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
 		free(threadpool->queue);
52e8d3c6
 		free(threadpool);
 		return NULL;
 	}
 	
 	if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
c3b33e5a
 		pthread_attr_destroy(&(threadpool->pool_attr));
 		pthread_cond_destroy(&(threadpool->pool_cond));
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
 		free(threadpool->queue);
52e8d3c6
 		free(threadpool);
 		return NULL;
 	}
7b04a82c
 
 #if defined(C_BIGSTACK) || defined(C_BSD)
 	pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
 	stacksize = stacksize + 64 * 1024;
 	if (stacksize < 1048576) stacksize = 1048576; /* at least 1MB please */
 	logg("Set stacksize to %u\n", stacksize);
 	pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
 #endif
52e8d3c6
 	threadpool->state = POOL_VALID;
c238ac42
 
52e8d3c6
 	return threadpool;
 }
c238ac42
 
fc83da82
 static void *thrmgr_worker(void *arg)
52e8d3c6
 {
 	threadpool_t *threadpool = (threadpool_t *) arg;
 	void *job_data;
1593523e
 	int retval, must_exit = FALSE;
52e8d3c6
 	struct timespec timeout;
 	
 	/* loop looking for work */
 	for (;;) {
 		if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
 			/* Fatal error */
10b04232
 			logg("!Fatal: mutex lock failed\n");
52e8d3c6
 			exit(-2);
 		}
 		timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
 		timeout.tv_nsec = 0;
 		threadpool->thr_idle++;
 		while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
 				&& (threadpool->state != POOL_EXIT)) {
 			/* Sleep, awaiting wakeup */
 			retval = pthread_cond_timedwait(&(threadpool->pool_cond),
 				&(threadpool->pool_mutex), &timeout);
 			if (retval == ETIMEDOUT) {
1593523e
 				must_exit = TRUE;
52e8d3c6
 				break;
 			}
 		}
 		threadpool->thr_idle--;
1593523e
 		if (threadpool->state == POOL_EXIT) {
 			must_exit = TRUE;
52e8d3c6
 		}
1593523e
 		
52e8d3c6
 		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
 			/* Fatal error */
10b04232
 			logg("!Fatal: mutex unlock failed\n");
52e8d3c6
 			exit(-2);
 		}
 		if (job_data) {
 			threadpool->handler(job_data);
1593523e
 		} else if (must_exit) {
52e8d3c6
 			break;
 		}
 	}
 	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
 		/* Fatal error */
10b04232
 		logg("!Fatal: mutex lock failed\n");
52e8d3c6
 		exit(-2);
 	}
 	threadpool->thr_alive--;
 	if (threadpool->thr_alive == 0) {
 		/* signal that all threads are finished */
 		pthread_cond_broadcast(&threadpool->pool_cond);
 	}
 	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
 		/* Fatal error */
10b04232
 		logg("!Fatal: mutex unlock failed\n");
52e8d3c6
 		exit(-2);
 	}
 	return NULL;
c238ac42
 }
 
52e8d3c6
 /*
  * Queue user_data as a job for the pool and make sure a worker can
  * pick it up.
  *
  * With the pool mutex held, the job is appended to the queue.  If no
  * worker is idle and fewer than thr_max workers are alive, a new worker
  * thread is started; a failure to start it is logged but does not
  * remove the job from the queue.  One waiter on the pool condition
  * variable is then signalled.
  *
  * Returns TRUE once the job has been queued and the mutex released.
  * Returns FALSE when threadpool is NULL, the mutex cannot be locked or
  * unlocked, the pool is not in POOL_VALID state, or the job cannot be
  * added to the queue.
  */
 int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
 {
 	pthread_t tid;
 	int queued;

 	if (threadpool == NULL) {
 		return FALSE;
 	}

 	/* Lock the threadpool */
 	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
 		logg("!Mutex lock failed\n");
 		return FALSE;
 	}

 	/* The job is only queued while the pool is still valid. */
 	queued = (threadpool->state == POOL_VALID)
 		&& work_queue_add(threadpool->queue, user_data);
 	if (!queued) {
 		if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
 			logg("!Mutex unlock failed\n");
 		}
 		return FALSE;
 	}

 	if (threadpool->thr_idle == 0
 			&& threadpool->thr_alive < threadpool->thr_max) {
 		/* Nobody is idle and there is room: start a new thread */
 		if (pthread_create(&tid, &threadpool->pool_attr,
 				thrmgr_worker, threadpool) == 0) {
 			threadpool->thr_alive++;
 		} else {
 			logg("!pthread_create failed\n");
 		}
 	}
 	pthread_cond_signal(&threadpool->pool_cond);

 	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
 		logg("!Mutex unlock failed\n");
 		return FALSE;
 	}
 	return TRUE;
 }