Browse code

Don't allow bulk requests (multiscan, idsession) to fill more than 50% of the queue. (bb #1732)

This makes clamd responsive to simple (version,contscan,etc.) requests
even during multiscan.
Previously these would get stuck behind a ~100 item queue, and the 1:4 ratio
of executing these commands wasn't working, since the commands weren't in the queue
in the first place.

Török Edvin authored on 2010/02/01 21:06:07
Showing 2 changed files
... ...
@@ -345,7 +345,8 @@ void thrmgr_destroy(threadpool_t *threadpool)
345 345
 
346 346
 	pthread_mutex_destroy(&(threadpool->pool_mutex));
347 347
 	pthread_cond_destroy(&(threadpool->idle_cond));
348
-	pthread_cond_destroy(&(threadpool->queueable_cond));
348
+	pthread_cond_destroy(&(threadpool->queueable_single_cond));
349
+	pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
349 350
 	pthread_cond_destroy(&(threadpool->pool_cond));
350 351
 	pthread_attr_destroy(&(threadpool->pool_attr));
351 352
 	free(threadpool->single_queue);
... ...
@@ -407,7 +408,7 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void
407 407
 		return NULL;
408 408
 	}
409 409
 
410
-	if (pthread_cond_init(&(threadpool->queueable_cond), NULL) != 0) {
410
+	if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) {
411 411
 		pthread_cond_destroy(&(threadpool->pool_cond));
412 412
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
413 413
 		free(threadpool->single_queue);
... ...
@@ -416,8 +417,20 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void
416 416
 		return NULL;
417 417
 	}
418 418
 
419
+	if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
420
+		pthread_cond_destroy(&(threadpool->queueable_single_cond));
421
+		pthread_cond_destroy(&(threadpool->pool_cond));
422
+		pthread_mutex_destroy(&(threadpool->pool_mutex));
423
+		free(threadpool->single_queue);
424
+		free(threadpool->bulk_queue);
425
+		free(threadpool);
426
+		return NULL;
427
+	}
428
+
429
+
419 430
 	if (pthread_cond_init(&(threadpool->idle_cond),NULL) != 0)  {
420
-		pthread_cond_destroy(&(threadpool->queueable_cond));
431
+		pthread_cond_destroy(&(threadpool->queueable_single_cond));
432
+		pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
421 433
 		pthread_cond_destroy(&(threadpool->pool_cond));
422 434
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
423 435
 		free(threadpool->single_queue);
... ...
@@ -427,7 +440,8 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void
427 427
 	}
428 428
 
429 429
 	if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
430
-		pthread_cond_destroy(&(threadpool->queueable_cond));
430
+		pthread_cond_destroy(&(threadpool->queueable_single_cond));
431
+		pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
431 432
 		pthread_cond_destroy(&(threadpool->idle_cond));
432 433
 		pthread_cond_destroy(&(threadpool->pool_cond));
433 434
 		pthread_mutex_destroy(&(threadpool->pool_mutex));
... ...
@@ -438,7 +452,8 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void
438 438
 	}
439 439
 
440 440
 	if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
441
-		pthread_cond_destroy(&(threadpool->queueable_cond));
441
+		pthread_cond_destroy(&(threadpool->queueable_single_cond));
442
+		pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
442 443
 		pthread_attr_destroy(&(threadpool->pool_attr));
443 444
 		pthread_cond_destroy(&(threadpool->idle_cond));
444 445
 		pthread_cond_destroy(&(threadpool->pool_cond));
... ...
@@ -534,8 +549,12 @@ static void stats_destroy(threadpool_t *pool)
534 534
 	pthread_mutex_unlock(&pools_lock);
535 535
 }
536 536
 
537
-static inline int thrmgr_contended(threadpool_t *pool)
537
+static inline int thrmgr_contended(threadpool_t *pool, int bulk)
538 538
 {
539
+    /* don't allow bulk items to exceed 50% of queue, so that
540
+     * non-bulk items get a chance to be in the queue */
541
+    if (bulk && pool->bulk_queue->item_count >= pool->queue_max/2)
542
+	return 1;
539 543
     return pool->bulk_queue->item_count + pool->single_queue->item_count
540 544
 	+ pool->thr_alive - pool->thr_idle >= pool->queue_max;
541 545
 }
... ...
@@ -574,9 +593,14 @@ static void *thrmgr_pop(threadpool_t *pool)
574 574
 	}
575 575
     }
576 576
 
577
-    if (!thrmgr_contended(pool)) {
578
-	logg("$THRMGR: queue crossed low threshold -> signaling\n");
579
-	pthread_cond_signal(&pool->queueable_cond);
577
+    if (!thrmgr_contended(pool, 0)) {
578
+	logg("$THRMGR: queue (single) crossed low threshold -> signaling\n");
579
+	pthread_cond_signal(&pool->queueable_single_cond);
580
+    }
581
+
582
+    if (!thrmgr_contended(pool, 1)) {
583
+	logg("$THRMGR: queue (bulk) crossed low threshold -> signaling\n");
584
+	pthread_cond_signal(&pool->queueable_bulk_cond);
580 585
     }
581 586
 
582 587
     return task;
... ...
@@ -667,6 +691,7 @@ static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, i
667 667
 
668 668
 	do {
669 669
 	    work_queue_t *queue;
670
+	    pthread_cond_t *queueable_cond;
670 671
 	    int items;
671 672
 
672 673
 	    if (threadpool->state != POOL_VALID) {
... ...
@@ -674,14 +699,17 @@ static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, i
674 674
 		break;
675 675
 	    }
676 676
 
677
-	    if (bulk)
677
+	    if (bulk) {
678 678
 		queue = threadpool->bulk_queue;
679
-	    else
679
+		queueable_cond = &threadpool->queueable_bulk_cond;
680
+	    } else {
680 681
 		queue = threadpool->single_queue;
682
+		queueable_cond = &threadpool->queueable_single_cond;
683
+	    }
681 684
 
682
-	    while (thrmgr_contended(threadpool)) {
685
+	    while (thrmgr_contended(threadpool, bulk)) {
683 686
 		logg("$THRMGR: contended, sleeping\n");
684
-		pthread_cond_wait(&threadpool->queueable_cond, &threadpool->pool_mutex);
687
+		pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
685 688
 		logg("$THRMGR: contended, woken\n");
686 689
 	    }
687 690
 
... ...
@@ -726,7 +754,7 @@ int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *use
726 726
 	logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
727 727
 	pthread_mutex_unlock(&group->mutex);
728 728
     }
729
-    if (!(ret = thrmgr_dispatch_internal(threadpool, user_data, 1)) && group) {
729
+    if (!(ret = thrmgr_dispatch_internal(threadpool, user_data, group ? 1 : 0)) && group) {
730 730
 	pthread_mutex_lock(&group->mutex);
731 731
 	group->jobs--;
732 732
 	logg("$THRMGR: active jobs for %p: %d\n", group, group->jobs);
... ...
@@ -62,8 +62,9 @@ typedef struct threadpool_tag {
62 62
 	pthread_attr_t pool_attr;
63 63
 
64 64
 	pthread_cond_t  idle_cond;
65
-	pthread_cond_t  queueable_cond;
66
-	
65
+	pthread_cond_t  queueable_single_cond;
66
+	pthread_cond_t  queueable_bulk_cond;
67
+
67 68
 	pool_state_t state;
68 69
 	int thr_max;
69 70
 	int queue_max;