Browse code

minimize memory fragmentation after db reload (bb#1028)

git-svn: trunk@3873

Tomasz Kojm authored on 2008/06/02 21:00:37
Showing 5 changed files
... ...
@@ -1,3 +1,9 @@
1
+Mon Jun  2 13:09:57 CEST 2008 (tk)
2
+----------------------------------
3
+  * clamd: minimize memory fragmentation after db reload (bb#1028,
4
+	   OPTIMIZE_MEMORY_FOOTPRINT currently defined by default)
5
+	   Patch from LEE, Kok-Seng <kokseng*88pobox.com>
6
+
1 7
 Fri May 30 11:40:56 CEST 2008 (tk)
2 8
 ----------------------------------
3 9
   * libclamunrar_iface, clamd, freshclam: merge win32 patches from NJH (bb#802)
... ...
@@ -83,6 +83,16 @@ typedef struct client_conn_tag {
83 83
     int nsockets;
84 84
 } client_conn_t;
85 85
 
86
+static void scanner_thread_cleanup(void *arg)
87
+{
88
+	client_conn_t *conn = (client_conn_t *) arg;
89
+
90
+    shutdown(conn->sd, 2);
91
+    closesocket(conn->sd);
92
+    cl_free(conn->engine);
93
+    free(conn);
94
+}
95
+
86 96
 static void scanner_thread(void *arg)
87 97
 {
88 98
 	client_conn_t *conn = (client_conn_t *) arg;
... ...
@@ -110,6 +120,9 @@ static void scanner_thread(void *arg)
110 110
     if(!timeout)
111 111
     	timeout = -1;
112 112
 
113
+    /* register cleanup procedure in this thread */
114
+    pthread_cleanup_push(scanner_thread_cleanup, arg);
115
+   
113 116
     do {
114 117
     	ret = command(conn->sd, conn->engine, conn->limits, conn->options, conn->copt, timeout);
115 118
 	if (ret < 0) {
... ...
@@ -155,10 +168,7 @@ static void scanner_thread(void *arg)
155 155
 	}
156 156
     } while (session);
157 157
 
158
-    shutdown(conn->sd, 2);
159
-    closesocket(conn->sd);
160
-    cl_free(conn->engine);
161
-    free(conn);
158
+    pthread_cleanup_pop(1);
162 159
     return;
163 160
 }
164 161
 
... ...
@@ -621,6 +631,10 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
621 621
 
622 622
 	pthread_mutex_lock(&reload_mutex);
623 623
 	if(reload) {
624
+#ifdef OPTIMIZE_MEMORY_FOOTPRINT
625
+	    /* Signal all worker threads to STOP, wait till no more acive threads */
626
+	    thrmgr_worker_stop_wait(thr_pool);
627
+#endif	    
624 628
 	    pthread_mutex_unlock(&reload_mutex);
625 629
 	    engine = reload_db(engine, dboptions, copt, FALSE, &ret);
626 630
 	    if(ret) {
... ...
@@ -633,10 +647,15 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
633 633
 #endif
634 634
 		break;
635 635
 	    }
636
+
636 637
 	    pthread_mutex_lock(&reload_mutex);
637 638
 	    reload = 0;
638 639
 	    time(&reloaded_time);
639 640
 	    pthread_mutex_unlock(&reload_mutex);
641
+#ifdef OPTIMIZE_MEMORY_FOOTPRINT    
642
+	    /* Resume thread pool worker threads */
643
+	    thrmgr_setstate(thr_pool, POOL_VALID);
644
+#endif
640 645
 #ifdef CLAMUKO
641 646
 	    if(cfgopt(copt, "ClamukoScanOnAccess")->enabled) {
642 647
 		logg("Stopping and restarting Clamuko.\n");
... ...
@@ -210,6 +210,90 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void
210 210
 	return threadpool;
211 211
 }
212 212
 
213
+#ifdef OPTIMIZE_MEMORY_FOOTPRINT
214
+/**
215
+ * thrmgr_worker_stop_wait : set state to POOL_STOP, wake all thread worker, wait for them
216
+ * to exit before continuing.
217
+ */
218
+void thrmgr_worker_stop_wait(threadpool_t * const threadpool)
219
+{
220
+	struct timespec timeout;
221
+	int ret_cond;
222
+	int loop = 2;
223
+	
224
+	if (!threadpool || (threadpool->state != POOL_VALID)) {
225
+		return;
226
+	}
227
+  	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
228
+   		logg("!Mutex lock failed\n");
229
+    		exit(-1);
230
+	}
231
+	threadpool->state = POOL_STOP;
232
+	
233
+	/* wait for threads to exit */
234
+	if (threadpool->thr_alive > 0) {
235
+		logg("*%u active threads: waking them and entering wait loop\n", threadpool->thr_alive);
236
+		if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
237
+			pthread_mutex_unlock(&threadpool->pool_mutex);
238
+			logg("!Fatal: failed in cond broadcast 'pool_cond'\n");
239
+			return;
240
+		}
241
+	}
242
+	/* now, wait for the threads to exit, make 'loop' number of tries,  */
243
+	while (threadpool->thr_alive > 0 && loop--) {		
244
+		logg("*%u active threads. Waiting.\n", threadpool->thr_alive);
245
+		timeout.tv_sec = time(NULL) + (threadpool->idle_timeout/2) + 10L;
246
+		timeout.tv_nsec = 0;
247
+		ret_cond = pthread_cond_timedwait (&threadpool->pool_cond, &threadpool->pool_mutex, &timeout);
248
+		if (ret_cond == ETIMEDOUT) {
249
+			logg("*%u active threads. Continue to wait.\n", threadpool->thr_alive);
250
+		} else if (ret_cond == 0) {
251
+			logg("*Received signal. %u active threads.\n", threadpool->thr_alive);
252
+		}
253
+	}
254
+  	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
255
+    		logg("!Mutex unlock failed\n");
256
+    		exit(-1);
257
+  	}
258
+}
259
+#endif
260
+#ifdef OPTIMIZE_MEMORY_FOOTPRINT
261
+void thrmgr_setstate(threadpool_t * const threadpool, pool_state_t state )
262
+{
263
+  	if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
264
+   		logg("!Mutex lock failed\n");
265
+    		exit(-1);
266
+	}
267
+	threadpool->state = state;
268
+  	if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
269
+    		logg("!Mutex unlock failed\n");
270
+    		exit(-1);
271
+  	}
272
+}
273
+#endif
274
+
275
+static void *thrmgr_worker_cleanup(void *arg)
276
+{
277
+	threadpool_t *threadpool = (threadpool_t *) arg;
278
+	
279
+	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
280
+		/* Fatal error */
281
+		logg("!Fatal: mutex lock failed\n");
282
+		exit(-2);
283
+	}
284
+	(threadpool->thr_alive) && threadpool->thr_alive--;
285
+	logg("*Thread clean up, %u active threads.", threadpool->thr_alive);
286
+	if (threadpool->thr_alive == 0) {
287
+		/* signal that all threads are finished */
288
+		pthread_cond_broadcast(&threadpool->pool_cond);
289
+	}
290
+	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
291
+		/* Fatal error */
292
+		logg("!Fatal: mutex unlock failed\n");
293
+		exit(-2);
294
+	}
295
+}
296
+
213 297
 static void *thrmgr_worker(void *arg)
214 298
 {
215 299
 	threadpool_t *threadpool = (threadpool_t *) arg;
... ...
@@ -217,58 +301,44 @@ static void *thrmgr_worker(void *arg)
217 217
 	int retval, must_exit = FALSE;
218 218
 	struct timespec timeout;
219 219
 	
220
+	/* Register cleanup procedure for worker in current thread */
221
+	pthread_cleanup_push(thrmgr_worker_cleanup, arg);
222
+	
220 223
 	/* loop looking for work */
221 224
 	for (;;) {
222 225
 		if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
223
-			/* Fatal error */
224 226
 			logg("!Fatal: mutex lock failed\n");
225 227
 			exit(-2);
226 228
 		}
227 229
 		timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
228 230
 		timeout.tv_nsec = 0;
229 231
 		threadpool->thr_idle++;
230
-		while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
232
+		while ( must_exit == FALSE 
233
+				&& ((job_data = work_queue_pop(threadpool->queue)) == NULL)
231 234
 				&& (threadpool->state != POOL_EXIT)) {
232 235
 			/* Sleep, awaiting wakeup */
233 236
 			pthread_cond_signal(&threadpool->idle_cond);
234 237
 			retval = pthread_cond_timedwait(&(threadpool->pool_cond),
235 238
 				&(threadpool->pool_mutex), &timeout);
236
-			if (retval == ETIMEDOUT) {
239
+			if (retval == ETIMEDOUT)
237 240
 				must_exit = TRUE;
238
-				break;
239
-			}
240 241
 		}
241 242
 		threadpool->thr_idle--;
242 243
 		if (threadpool->state == POOL_EXIT) {
243 244
 			must_exit = TRUE;
244 245
 		}
245
-		
246
+
246 247
 		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
247
-			/* Fatal error */
248 248
 			logg("!Fatal: mutex unlock failed\n");
249 249
 			exit(-2);
250 250
 		}
251
-		if (job_data) {
252
-			threadpool->handler(job_data);
253
-		} else if (must_exit) {
254
-			break;
255
-		}
256
-	}
257
-	if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
258
-		/* Fatal error */
259
-		logg("!Fatal: mutex lock failed\n");
260
-		exit(-2);
261
-	}
262
-	threadpool->thr_alive--;
263
-	if (threadpool->thr_alive == 0) {
264
-		/* signal that all threads are finished */
265
-		pthread_cond_broadcast(&threadpool->pool_cond);
266
-	}
267
-	if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
268
-		/* Fatal error */
269
-		logg("!Fatal: mutex unlock failed\n");
270
-		exit(-2);
251
+		if (must_exit) break;
252
+		if (job_data) threadpool->handler(job_data);
253
+		if (threadpool->state == POOL_STOP) break;
271 254
 	}
255
+	
256
+	pthread_yield(); /* do not remove on premptive kernel e.g linux 2.6 */
257
+	pthread_cleanup_pop(1);
272 258
 	return NULL;
273 259
 }
274 260
 
... ...
@@ -289,7 +359,6 @@ int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
289 289
 	if (threadpool->state != POOL_VALID) {
290 290
 		if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
291 291
 			logg("!Mutex unlock failed\n");
292
-			return FALSE;
293 292
 		}
294 293
 		return FALSE;
295 294
 	}
... ...
@@ -26,6 +26,13 @@
26 26
 #include <sys/time.h>
27 27
 #endif
28 28
 
29
+/**
30
+ * OPTIMIZE_MEMORY_FOOTPRINT : #ifdef, it will force all worker threads to terminate
31
+ * before switching to new database, thereby, avoiding fragmenting memory due which
32
+ * will cause swelling resident memory during run-time.
33
+ */
34
+#define OPTIMIZE_MEMORY_FOOTPRINT
35
+
29 36
 typedef struct work_item_tag {
30 37
 	struct work_item_tag *next;
31 38
 	void *data;
... ...
@@ -41,6 +48,7 @@ typedef struct work_queue_tag {
41 41
 typedef enum {
42 42
 	POOL_INVALID,
43 43
 	POOL_VALID,
44
+	POOL_STOP,	/* All worker threads should exit */
44 45
 	POOL_EXIT
45 46
 } pool_state_t;
46 47
 
... ...
@@ -66,4 +74,13 @@ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void
66 66
 void thrmgr_destroy(threadpool_t *threadpool);
67 67
 int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);
68 68
 
69
+#ifdef OPTIMIZE_MEMORY_FOOTPRINT
70
+/**
71
+ * thrmgr_worker_stop_wait : set state to POOL_STOP, wake all thread worker, wait for them
72
+ * to exit before continuing.
73
+ */
74
+void thrmgr_worker_stop_wait(threadpool_t * const threadpool);
75
+void thrmgr_setstate(threadpool_t * const threadpool, pool_state_t state);
76
+#endif
77
+
69 78
 #endif
... ...
@@ -1692,7 +1692,9 @@ void cl_free(struct cl_engine *engine)
1692 1692
     pthread_mutex_lock(&cli_ref_mutex);
1693 1693
 #endif
1694 1694
 
1695
-    engine->refcount--;
1695
+    if(engine->refcount)
1696
+	engine->refcount--;
1697
+
1696 1698
     if(engine->refcount) {
1697 1699
 #ifdef CL_THREAD_SAFE
1698 1700
 	pthread_mutex_unlock(&cli_ref_mutex);