/* * Copyright (C) 2004 Trog <trog@clamav.net> * * The code is based on the book "Programming with POSIX threads" by Dave * Butenhof * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* * thrmgr.c * * This file implements the interfaces for a "work queue" * manager. A "manager object" is created with several * parameters, including the required size of a work queue * entry, the maximum desired degree of parallelism (number of * threads to service the queue), and the address of an * execution engine routine. * * The application requests a work queue entry from the manager, * fills in the application-specific fields, and returns it to * the queue manager for processing. The manager will create a * new thread to service the queue if all current threads are * busy and the maximum level of parallelism has not yet been * reached. * * The manager will dequeue items and present them to the * processing engine until the queue is empty; at that point, * processing threads will begin to shut down. (They will be * restarted when work appears.) */ #include <pthread.h> #include <stdlib.h> #include <time.h> #include <string.h> #include <errno.h> #include "thrmgr.h" /* * Thread start routine to serve the work queue. */ static void *thrmgr_server (void *arg) { thrmgr_t *thrmgr = (thrmgr_t *)arg; work_element_t *we; int status; /* * We don't need to validate the thrmgr_t here... we don't * create server threads until requests are queued (the * queue has been initialized by then!) and we wait for all * server threads to terminate before destroying a work * queue. */ /* log_message ("A worker is starting"); */ status = pthread_mutex_lock (&thrmgr->mutex); if (status != 0) { //log_message ("A worker is dying"); return(NULL); } while (1) { thrmgr->idle++; /* log_message ("Worker waiting for work - idle:%d", thrmgr->idle); */ while ( (thrmgr->first == NULL) && !thrmgr->quit) { #ifndef BROKEN_COND_SIGNAL status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex); #else status = pthread_mutex_unlock (&thrmgr->mutex); status = sem_wait(&thrmgr->semaphore); pthread_mutex_lock (&thrmgr->mutex); #endif if (status != 0) { /* * This shouldn't happen, so the work queue * package should fail. Because the work queue * API is asynchronous, that would add * complication. Because the chances of failure * are slim, I choose to avoid that * complication. The server thread will return, * and allow another server thread to pick up * the work later. Note that, if this was the * only server thread, the queue won't be * serviced until a new work item is * queued. That could be fixed by creating a new * server here. */ //log_message ("Worker wait failed, %d (%s)", //status, strerror (status)); thrmgr->counter--; thrmgr->idle--; pthread_mutex_unlock (&thrmgr->mutex); return(NULL); } } we = thrmgr->first; if (we != NULL) { thrmgr->first = we->next; if (thrmgr->last == we) { thrmgr->last = NULL; } thrmgr->idle--; status = pthread_mutex_unlock (&thrmgr->mutex); if (status != 0) { //log_message ("A worker is dying"); return(NULL); } /* log_message ("Worker calling handler"); */ thrmgr->handler (we->data); free (we); status = pthread_mutex_lock (&thrmgr->mutex); if (status != 0) { //log_message ("A worker is dying"); return(NULL); } } /* * If there are no more work requests, and the servers * have been asked to quit, then shut down. */ if ( (thrmgr->first == NULL) &&thrmgr->quit) { //log_message ("Worker shutting down"); thrmgr->counter--; /* * NOTE: Just to prove that every rule has an * exception, I'm using the "cond" condition for two * separate predicates here. That's OK, since the * case used here applies only once during the life * of a work queue -- during rundown. The overhead * is minimal and it's not worth creating a separate * condition variable that would be waited and * signaled exactly once! */ #ifndef BROKEN_COND_SIGNAL if (thrmgr->counter == 0) { pthread_cond_broadcast (&thrmgr->cond); } #endif pthread_mutex_unlock (&thrmgr->mutex); //log_message ("A worker is dying"); return(NULL); } } pthread_mutex_unlock (&thrmgr->mutex); //log_message ("Worker exiting"); return(NULL); } /* * Initialize a thread manager. */ int thrmgr_init( thrmgr_t *thrmgr, /* thread manager */ int max_threads, /* maximum threads */ int alloc_unit, /* thread creation unit */ void (*handler)(void *arg)) /* request handler */ { int status; status = pthread_attr_init (&thrmgr->attr); if (status != 0) return(status); status = pthread_attr_setdetachstate (&thrmgr->attr, PTHREAD_CREATE_DETACHED); if (status != 0) { pthread_attr_destroy (&thrmgr->attr); return(status); } status = pthread_mutex_init (&thrmgr->mutex, NULL); if (status != 0) { pthread_attr_destroy (&thrmgr->attr); return(status); } #ifndef BROKEN_COND_SIGNAL status = pthread_cond_init (&thrmgr->cond, NULL); #else status = sem_init(&thrmgr->semaphore, 0, 0); #endif if (status != 0) { pthread_mutex_destroy (&thrmgr->mutex); pthread_attr_destroy (&thrmgr->attr); return(status); } thrmgr->quit = 0; /* not time to quit */ thrmgr->first = thrmgr->last = NULL; /* no queue entries */ thrmgr->parallelism = max_threads; /* max servers */ thrmgr->alloc_unit = alloc_unit; /* thread creation unit */ thrmgr->counter = 0; /* no server threads yet */ thrmgr->idle = 0; /* no idle servers */ thrmgr->handler = handler; thrmgr->valid = THRMGR_VALID; return(0); } /* * Destroy a thread manager */ int thrmgr_destroy (thrmgr_t *thrmgr) { int status, status1, status2; if (thrmgr->valid != THRMGR_VALID) { return EINVAL; } status = pthread_mutex_lock (&thrmgr->mutex); if (status != 0) { return(status); } thrmgr->valid = 0; /* prevent any other operations */ /* * Check whether any threads are active, and run them down: * * 1. set the quit flag * 2. broadcast to wake any servers that may be asleep * 3. wait for all threads to quit (counter goes to 0) * */ if (thrmgr->counter > 0) { thrmgr->quit = 1; /* if any threads are idling, wake them. */ if (thrmgr->idle > 0) { #ifndef BROKEN_COND_SIGNAL status = pthread_cond_broadcast (&thrmgr->cond); if (status != 0) { pthread_mutex_unlock (&thrmgr->mutex); return(status); } #endif } /* * Just to prove that every rule has an exception, I'm * using the "cv" condition for two separate predicates * here. That's OK, since the case used here applies * only once during the life of a work queue -- during * rundown. The overhead is minimal and it's not worth * creating a separate condition variable that would be * waited and signalled exactly once! */ while (thrmgr->counter > 0) { #ifndef BROKEN_COND_SIGNAL status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex); if (status != 0) { pthread_mutex_unlock (&thrmgr->mutex); return(status); } #endif } } status = pthread_mutex_unlock (&thrmgr->mutex); if (status != 0) { return(status); } status = pthread_mutex_destroy (&thrmgr->mutex); #ifndef BROKEN_COND_SIGNAL status1 = pthread_cond_destroy (&thrmgr->cond); #else status1 = sem_destroy(&thrmgr->semaphore); #endif status2 = pthread_attr_destroy (&thrmgr->attr); return (status ? status : (status1 ? status1 : status2)); } /* * Add an item to a work queue. */ int thrmgr_add( thrmgr_t *thrmgr, void *element ) { work_element_t *item; pthread_t id; int status; int count; if (thrmgr->valid != THRMGR_VALID) { return(EINVAL); } /* * Create and initialize a request structure. */ item = mmalloc( sizeof (work_element_t) ); item->data = element; item->next = NULL; status = pthread_mutex_lock (&thrmgr->mutex); if (status != 0) { free (item); return(status); } /* * Add the request to the end of the queue, updating the * first and last pointers. */ if (thrmgr->first == NULL) { thrmgr->first = item; } else { thrmgr->last->next = item; } thrmgr->last = item; /* * if any threads are idling, wake one. */ /* printf("Idle threads: %d\n", thrmgr->idle); */ if (thrmgr->idle > 0) { #ifndef BROKEN_COND_SIGNAL status = pthread_cond_signal (&thrmgr->cond); #else status = sem_post(&thrmgr->semaphore); #endif if (status != 0) { pthread_mutex_unlock (&thrmgr->mutex); return(status); } } else if (thrmgr->counter < thrmgr->parallelism) { /* * If there were no idling threads, and we're allowed to * create a new thread, do so. */ for ( count=0 ; count < thrmgr->alloc_unit ; count++ ) { /* log_message ("Creating new worker"); */ status = pthread_create (&id, &thrmgr->attr, thrmgr_server, (void*)thrmgr); if (status != 0) { pthread_mutex_unlock (&thrmgr->mutex); return(status); } thrmgr->counter++; } } pthread_mutex_unlock (&thrmgr->mutex); return(0); } int thrmgr_stat( thrmgr_t *thrmgr, int *threads, int *idle ) { int status; status = pthread_mutex_lock (&thrmgr->mutex); if (status != 0) { return(-1); } *threads = thrmgr->counter; *idle = thrmgr->idle; pthread_mutex_unlock (&thrmgr->mutex); return(0); }