Browse code

revert the queue limit patch

git-svn: trunk@1637

Tomasz Kojm authored on 2005/06/25 20:15:06
Showing 2 changed files
... ...
@@ -40,17 +40,14 @@ work_queue_t *work_queue_new()
40 40
 	return work_q;
41 41
 }
42 42
 
43
-int work_queue_add(work_queue_t *work_q, void *data)
43
+void work_queue_add(work_queue_t *work_q, void *data)
44 44
 {
45 45
 	work_item_t *work_item;
46 46
 	
47 47
 	if (!work_q) {
48
-		return FALSE;
48
+		return;
49 49
 	}
50 50
 	work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
51
-	if (!work_item) {
52
-		return FALSE;
53
-	}
54 51
 	work_item->next = NULL;
55 52
 	work_item->data = data;
56 53
 	gettimeofday(&(work_item->time_queued), NULL);
... ...
@@ -63,7 +60,7 @@ int work_queue_add(work_queue_t *work_q, void *data)
63 63
 		work_q->tail = work_item;
64 64
 		work_q->item_count++;
65 65
 	}
66
-	return TRUE;
66
+	return;
67 67
 }
68 68
 
69 69
 void *work_queue_pop(work_queue_t *work_q)
... ...
@@ -115,13 +112,12 @@ void thrmgr_destroy(threadpool_t *threadpool)
115 115
 	
116 116
 	pthread_mutex_destroy(&(threadpool->pool_mutex));
117 117
 	pthread_cond_destroy(&(threadpool->pool_cond));
118
-	pthread_cond_destroy(&(threadpool->pool_notfull_cond));
119 118
 	pthread_attr_destroy(&(threadpool->pool_attr));
120 119
 	free(threadpool);
121 120
 	return;
122 121
 }
123 122
 
124
-threadpool_t *thrmgr_new(int max_threads, int max_dispatch_queue, int idle_timeout, void (*handler)(void *))
123
+threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
125 124
 {
126 125
 	threadpool_t *threadpool;
127 126
 	
... ...
@@ -137,8 +133,6 @@ threadpool_t *thrmgr_new(int max_threads, int max_dispatch_queue, int idle_timeo
137 137
 		return NULL;
138 138
 	}	
139 139
 	threadpool->thr_max = max_threads;
140
-	threadpool->thr_max_queue = max_dispatch_queue;
141
-	threadpool->thr_queued = 0;
142 140
 	threadpool->thr_alive = 0;
143 141
 	threadpool->thr_idle = 0;
144 142
 	threadpool->idle_timeout = idle_timeout;
... ...
@@ -149,15 +143,8 @@ threadpool_t *thrmgr_new(int max_threads, int max_dispatch_queue, int idle_timeo
149 149
 		free(threadpool);
150 150
 		return NULL;
151 151
 	}
152
-	if (pthread_cond_init(&(threadpool->pool_notfull_cond), NULL) != 0) {
153
-		pthread_cond_destroy(&(threadpool->pool_cond));
154
-		free(threadpool);
155
-		return NULL;
156
-	}
157 152
 		
158 153
 	if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
159
-		pthread_cond_destroy(&(threadpool->pool_cond));
160
-		pthread_cond_destroy(&(threadpool->pool_notfull_cond));
161 154
 		free(threadpool);
162 155
 		return NULL;
163 156
 	}
... ...
@@ -175,7 +162,7 @@ void *thrmgr_worker(void *arg)
175 175
 {
176 176
 	threadpool_t *threadpool = (threadpool_t *) arg;
177 177
 	void *job_data;
178
-	int retval;
178
+	int retval, must_exit = FALSE;
179 179
 	struct timespec timeout;
180 180
 	
181 181
 	/* loop looking for work */
... ...
@@ -194,20 +181,15 @@ void *thrmgr_worker(void *arg)
194 194
 			retval = pthread_cond_timedwait(&(threadpool->pool_cond),
195 195
 				&(threadpool->pool_mutex), &timeout);
196 196
 			if (retval == ETIMEDOUT) {
197
+				must_exit = TRUE;
197 198
 				break;
198 199
 			}
199 200
 		}
200 201
 		threadpool->thr_idle--;
201
-
202
-		if (job_data) {
203
-			if ((threadpool->thr_max_queue > 0) &&
204
-			    (threadpool->thr_queued == threadpool->thr_max_queue)) {
205
-				logg("*Thread Queue: Resuming...\n");
206
-				/* signal that queue no longer full */
207
-				pthread_cond_signal(&threadpool->pool_notfull_cond);
208
-			}
209
-			threadpool->thr_queued--;
202
+		if (threadpool->state == POOL_EXIT) {
203
+			must_exit = TRUE;
210 204
 		}
205
+		
211 206
 		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
212 207
 			/* Fatal error */
213 208
 			logg("!Fatal: mutex unlock failed\n");
... ...
@@ -215,7 +197,7 @@ void *thrmgr_worker(void *arg)
215 215
 		}
216 216
 		if (job_data) {
217 217
 			threadpool->handler(job_data);
218
-		} else {
218
+		} else if (must_exit) {
219 219
 			break;
220 220
 		}
221 221
 	}
... ...
@@ -247,81 +229,33 @@ int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
247 247
 
248 248
 	/* Lock the threadpool */
249 249
 	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
250
-		logg("!Mutex lock failed: %d\n", errno);
250
+		logg("!Mutex lock failed\n");
251 251
 		return FALSE;
252 252
 	}
253 253
 
254 254
 	if (threadpool->state != POOL_VALID) {
255 255
 		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
256
-			logg("!Mutex unlock failed (!POOL_VALID): %d\n", errno);
256
+			logg("!Mutex unlock failed\n");
257 257
 			return FALSE;
258 258
 		}
259 259
 		return FALSE;
260 260
 	}
261
-	if (work_queue_add(threadpool->queue, user_data) == FALSE) {
262
-		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
263
-			logg("!Mutex unlock failed (work_queue_add): %d\n", errno);
264
-			return FALSE;
265
-		}
266
-		return FALSE;
267
-	}
268
-	threadpool->thr_queued++;
269
-	while ((threadpool->thr_max_queue > 0) && 
270
-		(threadpool->thr_queued >= threadpool->thr_max_queue) &&
271
-		(threadpool->state == POOL_VALID)) {
272
-		logg("*Thread Queue: Full, waiting...\n");
273
-		if (pthread_cond_wait(&threadpool->pool_notfull_cond, &threadpool->pool_mutex) != 0) {
274
-			logg("!pthread_cond_wait failed: %d\n", errno);
275
-			pthread_mutex_unlock(&threadpool->pool_mutex);
276
-			return FALSE;
277
-		}
278
-	}
261
+	work_queue_add(threadpool->queue, user_data);
279 262
 
280 263
 	if ((threadpool->thr_idle == 0) &&
281
-	    (threadpool->thr_alive < threadpool->thr_max) &&
282
-	    (threadpool->state == POOL_VALID)) {
264
+			(threadpool->thr_alive < threadpool->thr_max)) {
283 265
 		/* Start a new thread */
284 266
 		if (pthread_create(&thr_id, &(threadpool->pool_attr),
285 267
 				thrmgr_worker, threadpool) != 0) {
286
-			logg("!pthread_create failed: %d\n", errno);
268
+			logg("!pthread_create failed\n");
287 269
 		} else {
288 270
 			threadpool->thr_alive++;
289 271
 		}
290 272
 	}
291
-	switch(threadpool->thr_idle) {
292
-		case 0:
293
-			break;
294
-		case 1:
295
-			if (pthread_cond_signal(&(threadpool->pool_cond)) != 0) {
296
-				logg("!pthread_cond_signal failed: %d\n", errno);
297
-				if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
298
-					logg("!Mutex unlock failed (after pthread_cond_signal failure): %d\n", errno);
299
-				}
300
-				return FALSE;
301
-			}
302
-			break;
303
-		default:
304
-			if(threadpool->thr_queued > 1) {
305
-				if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
306
-					logg("!pthread_cond_broadcast failed: %d\n", errno);
307
-					if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
308
-						logg("!Mutex unlock failed (after pthread_cond_broadcast failure): %d\n", errno);
309
-					}
310
-					return FALSE;
311
-				}
312
-			} else {
313
-				if (pthread_cond_signal(&(threadpool->pool_cond)) != 0) {
314
-					logg("!pthread_cond_signal failed: %d\n", errno);
315
-					if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
316
-						logg("!Mutex unlock failed (after pthread_cond_signal failure): %d\n", errno);
317
-					}
318
-					return FALSE;
319
-				}
320
-			}
321
-	}
273
+	pthread_cond_signal(&(threadpool->pool_cond));
322 274
 
323 275
 	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
324
-		logg("!Mutex unlock failed (before complete): %d\n", errno);
276
+		logg("!Mutex unlock failed\n");
325 277
 		return FALSE;
326 278
 	}
327 279
 	return TRUE;
... ...
@@ -43,15 +43,12 @@ typedef enum {
43 43
 typedef struct threadpool_tag {
44 44
 	pthread_mutex_t pool_mutex;
45 45
 	pthread_cond_t pool_cond;
46
-	pthread_cond_t pool_notfull_cond;
47 46
 	pthread_attr_t pool_attr;
48 47
 	
49
-	volatile pool_state_t state;
50
-	volatile int thr_max;
51
-	volatile int thr_max_queue;
52
-	volatile int thr_queued;
53
-	volatile int thr_alive;
54
-	volatile int thr_idle;
48
+	pool_state_t state;
49
+	int thr_max;
50
+	int thr_alive;
51
+	int thr_idle;
55 52
 	int idle_timeout;
56 53
 	
57 54
 	void (*handler)(void *);
... ...
@@ -59,7 +56,7 @@ typedef struct threadpool_tag {
59 59
 	work_queue_t *queue;
60 60
 } threadpool_t;
61 61
 
62
-threadpool_t *thrmgr_new(int max_threads, int max_dispatch_queue, int idle_timeout, void (*handler)(void *));
62
+threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
63 63
 void thrmgr_destroy(threadpool_t *threadpool);
64 64
 int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);
65 65