Browse code

thrmgr: new clean reimplementation

git-svn-id: file:///var/lib/svn/clamav-devel/trunk/clamav-devel@338 77e5149b-7576-45b1-b177-96237e5ba77b

Trog authored on 2004/02/25 20:07:00
Showing 4 changed files
... ...
@@ -1,3 +1,7 @@
1
+Wed Feb 25 11:07:53 GMT 2004 (trog)
2
+-----------------------------------
3
+ * clamd thrmgr: new clean reimplementation
4
+
1 5
 Wed Feb 25 08:57:35 GMT 2004 (trog)
2 6
 -----------------------------------
3 7
   * libclamav/vba_extract.c: add VBA signature for Office 2003
... ...
@@ -176,7 +176,7 @@ static struct cl_node *reload_db(struct cl_node *root, const struct cfgstruct *c
176 176
 int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *copt)
177 177
 {
178 178
 	int new_sd, max_threads, options=0;
179
-	thrmgr_t thrmgr;
179
+	threadpool_t *thr_pool;
180 180
 	struct sigaction sigact;
181 181
 	mode_t old_umask;
182 182
 	struct cl_limits limits;
... ...
@@ -371,8 +371,8 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
371 371
     pthread_mutex_init(&exit_mutex, NULL);
372 372
     pthread_mutex_init(&reload_mutex, NULL);
373 373
 
374
-    if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
375
-	logg("thrmgr_init failed");
374
+    if((thr_pool=thrmgr_new(max_threads, 30, scanner_thread)) == NULL) {
375
+	logg("thrmgr_new failed");
376 376
 	exit(-1);
377 377
     }
378 378
 
... ...
@@ -400,7 +400,7 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
400 400
 		client_conn->copt = copt;
401 401
 		client_conn->root = root;
402 402
 		client_conn->limits = &limits;
403
-		thrmgr_add(&thrmgr, client_conn);
403
+		thrmgr_dispatch(thr_pool, client_conn);
404 404
 	}
405 405
 
406 406
 	pthread_mutex_lock(&exit_mutex);
... ...
@@ -429,10 +429,10 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
429 429
 	    /* Destroy the thread manager.
430 430
 	     * This waits for all current tasks to end
431 431
 	     */
432
-	    thrmgr_destroy(&thrmgr);
432
+	    thrmgr_destroy(thr_pool);
433 433
 	    root = reload_db(root, copt, FALSE);
434
-	    if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
435
-		logg("!thrmgr_init failed");
434
+	    if((thr_pool=thrmgr_new(max_threads, 30, scanner_thread)) == NULL) {
435
+		logg("!thrmgr_new failed");
436 436
 		pthread_mutex_unlock(&reload_mutex);
437 437
 		exit(-1);
438 438
 	    }
... ...
@@ -1,9 +1,6 @@
1 1
 /*
2 2
  *  Copyright (C) 2004 Trog <trog@clamav.net>
3 3
  *
4
- *  The code is based on the book "Programming with POSIX threads" by Dave
5
- *  Butenhof
6
- *
7 4
  *  This program is free software; you can redistribute it and/or modify
8 5
  *  it under the terms of the GNU General Public License as published by
9 6
  *  the Free Software Foundation; either version 2 of the License, or
... ...
@@ -18,356 +15,242 @@
18 18
  *  along with this program; if not, write to the Free Software
19 19
  *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 20
  */
21
-/*
22
- * thrmgr.c
23
- *
24
- * This file implements the interfaces for a "work queue"
25
- * manager. A "manager object" is created with several
26
- * parameters, including the required size of a work queue
27
- * entry, the maximum desired degree of parallelism (number of
28
- * threads to service the queue), and the address of an
29
- * execution engine routine.
30
- *
31
- * The application requests a work queue entry from the manager,
32
- * fills in the application-specific fields, and returns it to
33
- * the queue manager for processing. The manager will create a
34
- * new thread to service the queue if all current threads are
35
- * busy and the maximum level of parallelism has not yet been
36
- * reached.
37
- *
38
- * The manager will dequeue items and present them to the
39
- * processing engine until the queue is empty; at that point,
40
- * processing threads will begin to shut down. (They will be
41
- * restarted when work appears.)
42
- */
21
+
43 22
 #include <pthread.h>
44
-#include <stdlib.h>
45 23
 #include <time.h>
46
-#include <string.h>
47 24
 #include <errno.h>
48 25
 
49 26
 #include "thrmgr.h"
50
-#include "others.h"
51 27
 
52
-/*
53
- * Thread start routine to serve the work queue.
54
- */
55
-static void *thrmgr_server (void *arg)
56
-{
57
-  thrmgr_t *thrmgr = (thrmgr_t *)arg;
58
-  work_element_t *we;
59
-  int status;
60
-
61
-  /*
62
-   * We don't need to validate the thrmgr_t here... we don't
63
-   * create server threads until requests are queued (the
64
-   * queue has been initialized by then!) and we wait for all
65
-   * server threads to terminate before destroying a work
66
-   * queue.
67
-   */
68
-/*    log_message ("A worker is starting"); */
69
-  status = pthread_mutex_lock (&thrmgr->mutex);
70
-  if (status != 0) {
71
-    //log_message ("A worker is dying");
72
-    return(NULL);
73
-  }
74
-
75
-  while (1) {
76
-    thrmgr->idle++;
28
+#include "others.h"
77 29
 
78
- /*     log_message ("Worker waiting for work - idle:%d", thrmgr->idle); */
79
-    
80
-    while ( (thrmgr->first == NULL) && !thrmgr->quit) {
81
-#ifndef BROKEN_COND_SIGNAL
82
-      status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
83
-#else
84
-      status = pthread_mutex_unlock (&thrmgr->mutex);
85
-      status = sem_wait(&thrmgr->semaphore);
86
-      pthread_mutex_lock (&thrmgr->mutex);
87
-#endif
88
-      if (status != 0) {
89
-	/*
90
-	 * This shouldn't happen, so the work queue
91
-	 * package should fail. Because the work queue
92
-	 * API is asynchronous, that would add
93
-	 * complication. Because the chances of failure
94
-	 * are slim, I choose to avoid that
95
-	 * complication. The server thread will return,
96
-	 * and allow another server thread to pick up
97
-	 * the work later. Note that, if this was the
98
-	 * only server thread, the queue won't be
99
-	 * serviced until a new work item is
100
-	 * queued. That could be fixed by creating a new
101
-	 * server here.
102
-	 */
103
-	//log_message ("Worker wait failed, %d (%s)",
104
-		     //status, strerror (status));
105
-	thrmgr->counter--;
106
-	thrmgr->idle--;
107
-	pthread_mutex_unlock (&thrmgr->mutex);
108
-	return(NULL);
109
-      }
110
-    }
111
-    we = thrmgr->first;
112
-    
113
-    if (we != NULL) {
114
-      thrmgr->first = we->next;
115
-      if (thrmgr->last == we) {
116
-	thrmgr->last = NULL;
117
-      }
118
-      thrmgr->idle--;
119
-      status = pthread_mutex_unlock (&thrmgr->mutex);
120
-      if (status != 0) {
121
-	//log_message ("A worker is dying");
122
-	return(NULL);
123
-      }
124
-/*        log_message ("Worker calling handler"); */
125
-      thrmgr->handler (we->data);
126
-      free (we);
127
-      status = pthread_mutex_lock (&thrmgr->mutex);
128
-      if (status != 0) {
129
-	//log_message ("A worker is dying");
130
-	return(NULL);
131
-      }
132
-    }
133
-    
134
-    /*
135
-     * If there are no more work requests, and the servers
136
-     * have been asked to quit, then shut down.
137
-     */
138
-    if ( (thrmgr->first == NULL) &&thrmgr->quit) {
139
-      //log_message ("Worker shutting down");
140
-      thrmgr->counter--;
141
-      
142
-      /*
143
-       * NOTE: Just to prove that every rule has an
144
-       * exception, I'm using the "cond" condition for two
145
-       * separate predicates here.  That's OK, since the
146
-       * case used here applies only once during the life
147
-       * of a work queue -- during rundown. The overhead
148
-       * is minimal and it's not worth creating a separate
149
-       * condition variable that would be waited and
150
-       * signaled exactly once!
151
-       */
152
-#ifndef BROKEN_COND_SIGNAL
153
-      if (thrmgr->counter == 0) {
154
-	pthread_cond_broadcast (&thrmgr->cond);
155
-      }
156
-#endif
157
-      pthread_mutex_unlock (&thrmgr->mutex);
158
-      //log_message ("A worker is dying");
159
-      return(NULL);
160
-    }
161
-    
162
-  }
30
+#define FALSE (0)
31
+#define TRUE (1)
163 32
 
164
-  pthread_mutex_unlock (&thrmgr->mutex);
165
-  //log_message ("Worker exiting");
166
-  return(NULL);
33
+work_queue_t *work_queue_new()
34
+{
35
+	work_queue_t *work_q;
36
+	
37
+	work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));
38
+	
39
+	work_q->head = work_q->tail = NULL;
40
+	work_q->item_count = 0;
41
+	return work_q;
167 42
 }
168 43
 
169
-/*
170
- * Initialize a thread manager.
171
- */
172
-int thrmgr_init( thrmgr_t *thrmgr,                 /* thread manager */
173
-		 int       max_threads,            /* maximum threads */
174
-		 int       alloc_unit,             /* thread creation unit */
175
-		 void      (*handler)(void *arg))  /* request handler */
44
+void work_queue_add(work_queue_t *work_q, void *data)
176 45
 {
177
-  int status;
178
-
179
-  status = pthread_attr_init (&thrmgr->attr);
180
-  if (status != 0)
181
-    return(status);
182
-  status = pthread_attr_setdetachstate (&thrmgr->attr,
183
-					PTHREAD_CREATE_DETACHED);
184
-  if (status != 0) {
185
-    pthread_attr_destroy (&thrmgr->attr);
186
-    return(status);
187
-  }
188
-  status = pthread_mutex_init (&thrmgr->mutex, NULL);
189
-  if (status != 0) {
190
-    pthread_attr_destroy (&thrmgr->attr);
191
-    return(status);
192
-  }
193
-#ifndef BROKEN_COND_SIGNAL
194
-  status = pthread_cond_init (&thrmgr->cond, NULL);
195
-#else
196
-  status = sem_init(&thrmgr->semaphore, 0, 0);
197
-#endif
198
-  if (status != 0) {
199
-    pthread_mutex_destroy (&thrmgr->mutex);
200
-    pthread_attr_destroy (&thrmgr->attr);
201
-    return(status);
202
-  }
203
-  thrmgr->quit = 0;                       /* not time to quit */
204
-  thrmgr->first = thrmgr->last = NULL;    /* no queue entries */
205
-  thrmgr->parallelism = max_threads;      /* max servers */
206
-  thrmgr->alloc_unit = alloc_unit;        /* thread creation unit */
207
-  thrmgr->counter = 0;                    /* no server threads yet */
208
-  thrmgr->idle = 0;                       /* no idle servers */
209
-  thrmgr->handler = handler;
210
-  thrmgr->valid = THRMGR_VALID;
211
-  return(0);
46
+	work_item_t *work_item;
47
+	
48
+	if (!work_q) {
49
+		return;
50
+	}
51
+	work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
52
+	work_item->next = NULL;
53
+	work_item->data = data;
54
+	gettimeofday(&(work_item->time_queued), NULL);
55
+	
56
+	if (work_q->head == NULL) {
57
+		work_q->head = work_q->tail = work_item;
58
+		work_q->item_count = 1;
59
+	} else {
60
+		work_q->tail->next = work_item;
61
+		work_q->tail = work_item;
62
+		work_q->item_count++;
63
+	}
64
+	return;
212 65
 }
213 66
 
214
-/*
215
- * Destroy a thread manager
216
- */
217
-int thrmgr_destroy (thrmgr_t *thrmgr)
67
+void *work_queue_pop(work_queue_t *work_q)
218 68
 {
219
-  int status, status1, status2;
220
-  
221
-  if (thrmgr->valid != THRMGR_VALID) {
222
-    return EINVAL;
223
-  }
224
-  status = pthread_mutex_lock (&thrmgr->mutex);
225
-  if (status != 0) {
226
-    return(status);
227
-  }
228
-  thrmgr->valid = 0;             /* prevent any other operations */
229
-
230
-  /*
231
-   * Check whether any threads are active, and run them down:
232
-   *
233
-   * 1.       set the quit flag
234
-   * 2.       broadcast to wake any servers that may be asleep
235
-   * 3.       wait for all threads to quit (counter goes to 0)
236
-   *
237
-   */
238
-  if (thrmgr->counter > 0) {
239
-    thrmgr->quit = 1;
240
-    /* if any threads are idling, wake them. */
241
-    if (thrmgr->idle > 0) {
242
-#ifndef BROKEN_COND_SIGNAL
243
-      status = pthread_cond_broadcast (&thrmgr->cond);
244
-      if (status != 0) {
245
-	pthread_mutex_unlock (&thrmgr->mutex);
246
-	return(status);
247
-      }
248
-#endif
249
-    }
250
-
251
-    /*
252
-     * Just to prove that every rule has an exception, I'm
253
-     * using the "cv" condition for two separate predicates
254
-     * here. That's OK, since the case used here applies
255
-     * only once during the life of a work queue -- during
256
-     * rundown. The overhead is minimal and it's not worth
257
-     * creating a separate condition variable that would be
258
-     * waited and signalled exactly once!
259
-     */
260
-    while (thrmgr->counter > 0) {
261
-#ifndef BROKEN_COND_SIGNAL
262
-      status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
263
-      if (status != 0) {
264
-	pthread_mutex_unlock (&thrmgr->mutex);
265
-	return(status);
266
-      }
267
-#endif
268
-    }       
269
-  }
270
-  status = pthread_mutex_unlock (&thrmgr->mutex);
271
-  if (status != 0) {
272
-    return(status);
273
-  }
274
-  status = pthread_mutex_destroy (&thrmgr->mutex);
275
-#ifndef BROKEN_COND_SIGNAL
276
-  status1 = pthread_cond_destroy (&thrmgr->cond);
277
-#else
278
-  status1 = sem_destroy(&thrmgr->semaphore);
279
-#endif
280
-  status2 = pthread_attr_destroy (&thrmgr->attr);
281
-  return (status ? status : (status1 ? status1 : status2));
69
+	work_item_t *work_item;
70
+	void *data;
71
+	
72
+	if (!work_q || !work_q->head) {
73
+		return NULL;
74
+	}
75
+	work_item = work_q->head;
76
+	data = work_item->data;
77
+	work_q->head = work_item->next;
78
+	if (work_q->head == NULL) {
79
+		work_q->tail = NULL;
80
+	}
81
+	free(work_item);
82
+	return data;
282 83
 }
283 84
 
284
-/*
285
- * Add an item to a work queue.
286
- */
287
-int thrmgr_add( thrmgr_t *thrmgr,
288
-		void     *element )
85
+void thrmgr_destroy(threadpool_t *threadpool)
289 86
 {
290
-  work_element_t *item;
291
-  pthread_t id;
292
-  int status;
293
-  int count;
294
-
295
-  if (thrmgr->valid != THRMGR_VALID) {
296
-    return(EINVAL);
297
-  }
298
-
299
-  /*
300
-   * Create and initialize a request structure.
301
-   */
302
-  item = mmalloc( sizeof (work_element_t) );
303
-  item->data = element;
304
-  item->next = NULL;
305
-  status = pthread_mutex_lock (&thrmgr->mutex);
306
-  if (status != 0) {
307
-    free (item);
308
-    return(status);
309
-  }
310
-
311
-  /*
312
-   * Add the request to the end of the queue, updating the
313
-   * first and last pointers.
314
-   */
315
-  if (thrmgr->first == NULL) {
316
-    thrmgr->first = item;
317
-  } else {
318
-    thrmgr->last->next = item;
319
-  }
320
-  thrmgr->last = item;
321
-  
322
-  /*
323
-   * if any threads are idling, wake one.
324
-   */
325
-/*    printf("Idle threads: %d\n", thrmgr->idle); */
326
-  if (thrmgr->idle > 0) {
327
-#ifndef BROKEN_COND_SIGNAL
328
-    status = pthread_cond_signal (&thrmgr->cond);
329
-#else
330
-    status = sem_post(&thrmgr->semaphore);
331
-#endif
332
-    if (status != 0) {
333
-      pthread_mutex_unlock (&thrmgr->mutex);
334
-      return(status);
335
-    }
336
-  } else if (thrmgr->counter < thrmgr->parallelism) {
337
-    /*
338
-     * If there were no idling threads, and we're allowed to
339
-     * create a new thread, do so.
340
-     */
341
-    for ( count=0 ; count < thrmgr->alloc_unit ; count++ ) {
342
-/*        log_message ("Creating new worker"); */
343
-      status = pthread_create (&id, &thrmgr->attr, thrmgr_server, (void*)thrmgr);
344
-      if (status != 0) {
345
-	pthread_mutex_unlock (&thrmgr->mutex);
346
-	return(status);
347
-      }
348
-      thrmgr->counter++;
349
-    }
350
-  }
351
-  pthread_mutex_unlock (&thrmgr->mutex);
352
-  return(0);
87
+	if (!threadpool || (threadpool->state != POOL_VALID)) {
88
+		return;
89
+	}
90
+  	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
91
+   		logg("!Mutex lock failed");
92
+    		exit(-1);
93
+	}
94
+	threadpool->state = POOL_EXIT;
95
+	
96
+	/* wait for threads to exit */
97
+	if (threadpool->thr_alive > 0) {
98
+		if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
99
+			pthread_mutex_unlock(&threadpool->pool_mutex);
100
+			return;
101
+		}
102
+	}
103
+	while (threadpool->thr_alive > 0) {
104
+		if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
105
+			pthread_mutex_unlock(&threadpool->pool_mutex);
106
+			return;
107
+		}
108
+	}
109
+  	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
110
+    		logg("!Mutex unlock failed");
111
+    		exit(-1);
112
+  	}
113
+	
114
+	pthread_mutex_destroy(&(threadpool->pool_mutex));
115
+	pthread_cond_destroy(&(threadpool->pool_cond));
116
+	pthread_attr_destroy(&(threadpool->pool_attr));
117
+	free(threadpool);
118
+	return;
353 119
 }
354 120
 
355
-int thrmgr_stat( thrmgr_t *thrmgr,
356
-		 int      *threads,
357
-		 int      *idle )
121
+threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
358 122
 {
359
-  int status;
123
+	threadpool_t *threadpool;
124
+	
125
+	if (max_threads <= 0) {
126
+		return NULL;
127
+	}
128
+	
129
+	threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));
360 130
 
361
-  status = pthread_mutex_lock (&thrmgr->mutex);
362
-  if (status != 0) {
363
-    return(-1);
364
-  }
131
+	threadpool->queue = work_queue_new();
132
+	if (!threadpool->queue) {
133
+		free(threadpool);
134
+		return NULL;
135
+	}	
136
+	threadpool->thr_max = max_threads;
137
+	threadpool->thr_alive = 0;
138
+	threadpool->thr_idle = 0;
139
+	threadpool->idle_timeout = idle_timeout;
140
+	threadpool->handler = handler;
141
+	
142
+	pthread_mutex_init(&(threadpool->pool_mutex), NULL);
143
+	if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
144
+		free(threadpool);
145
+		return NULL;
146
+	}
147
+		
148
+	if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
149
+		free(threadpool);
150
+		return NULL;
151
+	}
152
+	
153
+	if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
154
+		free(threadpool);
155
+		return NULL;
156
+	}
157
+	threadpool->state = POOL_VALID;
365 158
 
366
-  *threads = thrmgr->counter;
367
-  *idle = thrmgr->idle;
159
+	return threadpool;
160
+}
368 161
 
369
-  pthread_mutex_unlock (&thrmgr->mutex);
370
-  return(0);
162
+void *thrmgr_worker(void *arg)
163
+{
164
+	threadpool_t *threadpool = (threadpool_t *) arg;
165
+	void *job_data;
166
+	int retval, must_exit = FALSE;
167
+	struct timespec timeout;
168
+	
169
+	/* loop looking for work */
170
+	for (;;) {
171
+		if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
172
+			/* Fatal error */
173
+			logg("!Fatal: mutex lock failed");
174
+			exit(-2);
175
+		}
176
+		timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
177
+		timeout.tv_nsec = 0;
178
+		threadpool->thr_idle++;
179
+		while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
180
+				&& (threadpool->state != POOL_EXIT)) {
181
+			/* Sleep, awaiting wakeup */
182
+			retval = pthread_cond_timedwait(&(threadpool->pool_cond),
183
+				&(threadpool->pool_mutex), &timeout);
184
+			if (retval == ETIMEDOUT) {
185
+				must_exit = TRUE;
186
+				break;
187
+			}
188
+		}
189
+		threadpool->thr_idle--;
190
+		if (threadpool->state == POOL_EXIT) {
191
+			must_exit = TRUE;
192
+		}
193
+		
194
+		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
195
+			/* Fatal error */
196
+			logg("!Fatal: mutex unlock failed");
197
+			exit(-2);
198
+		}
199
+		if (job_data) {
200
+			threadpool->handler(job_data);
201
+		} else if (must_exit) {
202
+			break;
203
+		}
204
+	}
205
+	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
206
+		/* Fatal error */
207
+		logg("!Fatal: mutex lock failed");
208
+		exit(-2);
209
+	}
210
+	threadpool->thr_alive--;
211
+	if (threadpool->thr_alive == 0) {
212
+		/* signal that all threads are finished */
213
+		pthread_cond_broadcast(&threadpool->pool_cond);
214
+	}
215
+	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
216
+		/* Fatal error */
217
+		logg("!Fatal: mutex unlock failed");
218
+		exit(-2);
219
+	}
220
+	return NULL;
371 221
 }
372
-  
373 222
 
223
+int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
224
+{
225
+	pthread_t thr_id;
226
+	
227
+	if (!threadpool) {
228
+		return FALSE;
229
+	}
230
+	
231
+	/* Lock the threadpool */
232
+	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
233
+		logg("!Mutex lock failed");
234
+		return FALSE;
235
+	}
236
+	
237
+	if (threadpool->state != POOL_VALID) {
238
+		return FALSE;
239
+	}
240
+	work_queue_add(threadpool->queue, user_data);
241
+	
242
+	if ((threadpool->thr_idle == 0) &&
243
+			(threadpool->thr_alive < threadpool->thr_max)) {
244
+		/* Start a new thread */
245
+		if (pthread_create(&thr_id, &(threadpool->pool_attr),
246
+				thrmgr_worker, threadpool) != 0) {
247
+			logg("!pthread_create failed");
248
+		} else {
249
+			threadpool->thr_alive++;
250
+		}
251
+	}
252
+	pthread_cond_signal(&(threadpool->pool_cond));
253
+	
254
+	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
255
+		logg("!Mutex unlock failed");
256
+		return FALSE;
257
+	}
258
+	return TRUE;
259
+}
... ...
@@ -1,9 +1,6 @@
1 1
 /*
2 2
  *  Copyright (C) 2004 Trog <trog@clamav.net>
3 3
  *
4
- *  The code is based on the book "Programming with POSIX threads" by Dave
5
- *  Butenhof
6
- *
7 4
  *  This program is free software; you can redistribute it and/or modify
8 5
  *  it under the terms of the GNU General Public License as published by
9 6
  *  the Free Software Foundation; either version 2 of the License, or
... ...
@@ -18,92 +15,49 @@
18 18
  *  along with this program; if not, write to the Free Software
19 19
  *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 20
  */
21
-/*
22
- * workq.h
23
- *
24
- * This header file defines the interfaces for a "work queue"
25
- * manager. A "manager object" is created with several
26
- * parameters, including the required size of a work queue
27
- * entry, the maximum desired degree of parallelism (number of
28
- * threads to service the queue), and the address of an
29
- * execution engine routine.
30
- *
31
- * The application requests a work queue entry from the manager,
32
- * fills in the application-specific fields, and returns it to
33
- * the queue manager for processing. The manager will create a
34
- * new thread to service the queue if all current threads are
35
- * busy and the maximum level of parallelism has not yet been
36
- * reached.
37
- *
38
- * The manager will dequeue items and present them to the
39
- * processing engine until the queue is empty; at that point,
40
- * processing threads will begin to shut down. (They will be
41
- * restarted when work appears.)
42
- */
43 21
 
44 22
 #ifndef __THRMGR_H__
45 23
 #define __THRMGR_H__
46 24
 
47
-#ifdef DEBUG
48
-# define DPRINTF(arg) printf arg
49
-#else
50
-# define DPRINTF(arg)
51
-#endif
52
-
53 25
 #include <pthread.h>
54
-// #include "config.h"
55
-
56
-#ifdef BROKEN_COND_SIGNAL
57
-#include <semaphore.h>
58
-#endif
59
-
60
-/*
61
- * Structure to keep track of work requests.
62
- */
63
-typedef struct work_element_tag {
64
-  struct work_element_tag     *next;
65
-  void                        *data;
66
-} work_element_t;
26
+#include <sys/time.h>
27
+
28
+typedef struct work_item_tag {
29
+	struct work_item_tag *next;
30
+	void *data;
31
+	struct timeval time_queued;
32
+} work_item_t;
33
+	
34
+typedef struct work_queue_tag {
35
+	work_item_t *head;
36
+	work_item_t *tail;
37
+	int item_count;
38
+} work_queue_t;
39
+
40
+typedef enum {
41
+	POOL_INVALID,
42
+	POOL_VALID,
43
+	POOL_EXIT,
44
+} pool_state_t;
45
+
46
+typedef struct threadpool_tag {
47
+	pthread_mutex_t pool_mutex;
48
+	pthread_cond_t pool_cond;
49
+	pthread_attr_t pool_attr;
50
+	
51
+	pool_state_t state;
52
+	int thr_max;
53
+	int thr_alive;
54
+	int thr_idle;
55
+	int idle_timeout;
56
+	
57
+	void (*handler)(void *);
58
+	
59
+	work_queue_t *queue;
60
+} threadpool_t;
61
+
62
+threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
63
+void thrmgr_destroy(threadpool_t *threadpool);
64
+int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);
67 65
 
68
-/*
69
- * Structure describing a work queue.
70
- */
71
-typedef struct thrmgr_tag {
72
-  pthread_mutex_t     mutex;
73
-#ifndef BROKEN_COND_SIGNAL
74
-  pthread_cond_t      cond;                    /* wait for work */
75
-#else
76
-  sem_t               semaphore;
77 66
 #endif
78
-  pthread_attr_t      attr;                    /* create detached threads */
79
-  work_element_t      *first, *last;           /* work queue */
80
-  int                 valid;                   /* set when valid */
81
-  int                 quit;                    /* set when workq should quit */
82
-  int                 parallelism;             /* number of threads required */
83
-  int                 alloc_unit;              /* unit of thread creation */
84
-  int                 counter;                 /* current number of threads */
85
-  int                 idle;                    /* number of idle threads */
86
-  void                (*handler)(void *arg);   /* request handler */
87
-} thrmgr_t;
88
-
89
-#define THRMGR_VALID     0xdeadfeed
90
-
91
-/*
92
- * Define work queue functions
93
- */
94
-extern int thrmgr_init( thrmgr_t *thrmgr,                 /* thread manager */
95
-		        int       max_threads,            /* maximum threads */
96
-		        int       alloc_unit,             /* thread creation unit */
97
-		        void      (*handler)(void *) );   /* request handler */
98
-
99
-extern int thrmgr_destroy( thrmgr_t *thrmgr );
100
-
101
-extern int thrmgr_add( thrmgr_t *thrmgr,
102
-		       void     *data );
103
-
104
-int thrmgr_stat( thrmgr_t     *thrmgr,
105
-		 int *threads,
106
-		 int *idle );
107
-
108
-#endif /* __THRMGR_H__ */
109
-