Browse code

clamd: integrated new thread manager

git-svn-id: file:///var/lib/svn/clamav-devel/trunk/clamav-devel@296 77e5149b-7576-45b1-b177-96237e5ba77b

Tomasz Kojm authored on 2004/02/17 09:08:25
Showing 15 changed files
... ...
@@ -1,3 +1,9 @@
1
+Tue Feb 17 01:06:58 CET 2004 (tk)
2
+---------------------------------
3
+  * clamd: + integrated new thread manager from Trog
4
+	   + --debug (enables core dumping)
5
+  * contrib: Trashscan v0.10 (trashware*gmx.de)
6
+
1 7
 Mon Feb 16 14:19:42 CET 2004 (tk)
2 8
 ---------------------------------
3 9
   V 0.67-1 (increased version number of clamav-milter)
... ...
@@ -32,6 +32,8 @@ clamd_SOURCES = \
32 32
     localserver.h \
33 33
     session.c \
34 34
     session.h \
35
+    thrmgr.c \
36
+    thrmgr.h \
35 37
     server-th.c \
36 38
     server.h \
37 39
     scanner.c \
... ...
@@ -130,6 +130,8 @@ install_sh = @install_sh@
130 130
 @BUILD_CLAMD_TRUE@    localserver.h \
131 131
 @BUILD_CLAMD_TRUE@    session.c \
132 132
 @BUILD_CLAMD_TRUE@    session.h \
133
+@BUILD_CLAMD_TRUE@    thrmgr.c \
134
+@BUILD_CLAMD_TRUE@    thrmgr.h \
133 135
 @BUILD_CLAMD_TRUE@    server-th.c \
134 136
 @BUILD_CLAMD_TRUE@    server.h \
135 137
 @BUILD_CLAMD_TRUE@    scanner.c \
... ...
@@ -161,9 +163,10 @@ PROGRAMS = $(sbin_PROGRAMS)
161 161
 @BUILD_CLAMD_TRUE@am_clamd_OBJECTS = options.$(OBJEXT) cfgfile.$(OBJEXT) \
162 162
 @BUILD_CLAMD_TRUE@	clamd.$(OBJEXT) tcpserver.$(OBJEXT) \
163 163
 @BUILD_CLAMD_TRUE@	localserver.$(OBJEXT) session.$(OBJEXT) \
164
-@BUILD_CLAMD_TRUE@	server-th.$(OBJEXT) scanner.$(OBJEXT) \
165
-@BUILD_CLAMD_TRUE@	others.$(OBJEXT) clamuko.$(OBJEXT) \
166
-@BUILD_CLAMD_TRUE@	dazukoio.$(OBJEXT) tests.$(OBJEXT)
164
+@BUILD_CLAMD_TRUE@	thrmgr.$(OBJEXT) server-th.$(OBJEXT) \
165
+@BUILD_CLAMD_TRUE@	scanner.$(OBJEXT) others.$(OBJEXT) \
166
+@BUILD_CLAMD_TRUE@	clamuko.$(OBJEXT) dazukoio.$(OBJEXT) \
167
+@BUILD_CLAMD_TRUE@	tests.$(OBJEXT)
167 168
 clamd_OBJECTS = $(am_clamd_OBJECTS)
168 169
 @BUILD_CLAMD_TRUE@clamd_DEPENDENCIES = ../clamscan/getopt.o
169 170
 @BUILD_CLAMD_FALSE@clamd_DEPENDENCIES =
... ...
@@ -178,7 +181,8 @@ am__depfiles_maybe = depfiles
178 178
 @AMDEP_TRUE@	./$(DEPDIR)/localserver.Po ./$(DEPDIR)/options.Po \
179 179
 @AMDEP_TRUE@	./$(DEPDIR)/others.Po ./$(DEPDIR)/scanner.Po \
180 180
 @AMDEP_TRUE@	./$(DEPDIR)/server-th.Po ./$(DEPDIR)/session.Po \
181
-@AMDEP_TRUE@	./$(DEPDIR)/tcpserver.Po ./$(DEPDIR)/tests.Po
181
+@AMDEP_TRUE@	./$(DEPDIR)/tcpserver.Po ./$(DEPDIR)/tests.Po \
182
+@AMDEP_TRUE@	./$(DEPDIR)/thrmgr.Po
182 183
 COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
183 184
 	$(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
184 185
 LTCOMPILE = $(LIBTOOL) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) \
... ...
@@ -249,6 +253,7 @@ distclean-compile:
249 249
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/session.Po@am__quote@
250 250
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/tcpserver.Po@am__quote@
251 251
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/tests.Po@am__quote@
252
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/thrmgr.Po@am__quote@
252 253
 
253 254
 distclean-depend:
254 255
 	-rm -rf ./$(DEPDIR)
... ...
@@ -71,6 +71,11 @@ void clamd(struct optstruct *opt)
71 71
     	help();
72 72
     }
73 73
 
74
+    if(optl(opt, "debug"))
75
+	debug_mode = 1;
76
+    else
77
+	debug_mode = 0;
78
+
74 79
     /* parse the config file */
75 80
     if(optc(opt, 'c'))
76 81
 	cfgfile = getargc(opt, 'c');
... ...
@@ -246,6 +251,7 @@ void help(void)
246 246
 
247 247
     printf("    --help                   -h             Show this help.\n");
248 248
     printf("    --version                -V             Show version number.\n");
249
+    printf("    --debug                                 Enable debug mode.\n");
249 250
     printf("    --config-file=FILE       -c FILE        Read configuration from FILE.\n\n");
250 251
 
251 252
     exit(0);
... ...
@@ -267,7 +273,8 @@ void daemonize(void)
267 267
 	dup2(i, 2);
268 268
     }
269 269
 
270
-    chdir("/");
270
+    if(!debug_mode)
271
+	chdir("/");
271 272
 
272 273
     if(fork())
273 274
 	exit(0);
... ...
@@ -22,7 +22,7 @@
22 22
 #define CL_DEFAULT_CFG CONFDIR"/clamav.conf"
23 23
 #define CL_DEFAULT_BACKLOG 15
24 24
 #define CL_DEFAULT_MAXTHREADS 5
25
-#define CL_DEFAULT_SCANTIMEOUT 180
25
+#define CL_DEFAULT_SCANTIMEOUT 60
26 26
 #define CL_DEFAULT_LOGSIZE 1048576
27 27
 #define CL_DEFAULT_CLAMUKOMAXFILESIZE 5 * 1048576
28 28
 #define CL_DEFAULT_SELFCHECK 3600
... ...
@@ -50,6 +50,7 @@ int main(int argc, char **argv)
50 50
 	    {"help", 0, 0, 'h'},
51 51
 	    {"config-file", 1, 0, 'c'},
52 52
 	    {"version", 0, 0, 'V'},
53
+	    {"debug", 0, 0, 0},
53 54
 	    {0, 0, 0, 0}
54 55
     	};
55 56
 
... ...
@@ -28,7 +28,7 @@ void *mmalloc(size_t size);
28 28
 void *mcalloc(size_t nmemb, size_t size);
29 29
 void chomp(char *string);
30 30
 
31
-short int logverbose, logcompressed, loglock, logtime, sighup, logok;
31
+short int logverbose, logcompressed, loglock, logtime, sighup, logok, debug_mode;
32 32
 int logsize;
33 33
 const char *logfile;
34 34
 int logg(const char *str, ...);
... ...
@@ -31,7 +31,9 @@
31 31
 #include <sys/types.h>
32 32
 #include <sys/socket.h>
33 33
 #include <netinet/in.h>
34
+#include <errno.h>
34 35
 #include <clamav.h>
36
+#include <sys/poll.h>
35 37
 
36 38
 #include "cfgfile.h"
37 39
 #include "others.h"
... ...
@@ -184,14 +186,14 @@ int scan(const char *filename, unsigned long int *scanned, const struct cl_node
184 184
 
185 185
 int scanstream(int odesc, unsigned long int *scanned, const struct cl_node *root, const struct cl_limits *limits, int options, const struct cfgstruct *copt)
186 186
 {
187
-	int ret, portscan = CL_DEFAULT_MAXPORTSCAN, sockfd, port, acceptd, tmpd, bread;
187
+	int ret, flags, portscan = CL_DEFAULT_MAXPORTSCAN, sockfd, port, acceptd, tmpd, bread, count;
188 188
 	long int size = 0, maxsize = 0;
189 189
 	short bound = 0;
190 190
 	char *virname, buff[32768];
191 191
 	struct sockaddr_in server;
192 192
 	struct cfgstruct *cpt;
193 193
 	FILE *tmp = NULL;
194
-
194
+	struct pollfd poll_data[1];
195 195
 
196 196
     while(!bound && portscan--) {
197 197
 	if((port = cl_rndnum(60000)) < 1024)
... ...
@@ -221,6 +223,25 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_node *root
221 221
 	mdprintf(odesc, "PORT %d\n", port);
222 222
     }
223 223
 
224
+    poll_data[0].fd = sockfd;
225
+    poll_data[0].events = POLLIN;
226
+    poll_data[0].revents = 0;
227
+
228
+    while (1) {
229
+	count = poll(poll_data, 1, CL_DEFAULT_SCANTIMEOUT*1000); /* wait for timeout */
230
+	if (count != 1) {
231
+		if ((count == -1) && (errno == EINTR)) {
232
+			continue;
233
+		}
234
+		close(sockfd);
235
+		mdprintf(odesc, "ERROR\n");
236
+		logg("!ScanStream: accept timeout.\n");
237
+		return -1;
238
+	} else {
239
+		break;
240
+	}
241
+    }
242
+
224 243
     if((acceptd = accept(sockfd, NULL, NULL)) == -1) {
225 244
 	close(sockfd);
226 245
 	mdprintf(odesc, "accept() ERROR\n");
... ...
@@ -231,7 +252,12 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_node *root
231 231
 
232 232
     logg("*Accepted connection on port %d, fd %d\n", port, acceptd);
233 233
 
234
-    if(cfgopt(copt, "StreamSaveToDisk")) {
234
+    poll_data[0].fd = acceptd;
235
+    poll_data[0].events = POLLIN;
236
+    poll_data[0].revents = 0;
237
+
238
+    /* StreamSaveToDisk is enforced, to ensure timeoute */
239
+    /*if(cfgopt(copt, "StreamSaveToDisk")) {	*/
235 240
 	if((tmp = tmpfile()) == NULL) {
236 241
 	    shutdown(sockfd, 2);
237 242
 	    close(sockfd);
... ...
@@ -245,7 +271,11 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_node *root
245 245
 	if((cpt = cfgopt(copt, "StreamMaxLength")))
246 246
 	    maxsize = cpt->numarg;
247 247
 
248
-	while((bread = read(acceptd, buff, sizeof(buff))) > 0) {
248
+	while((count = poll(poll_data, 1, CL_DEFAULT_SCANTIMEOUT*1000)) == 1) {
249
+	    bread = read(acceptd, buff, sizeof(buff));
250
+	    if (bread <= 0) {
251
+		break;
252
+	    }
249 253
 	    size += bread;
250 254
 
251 255
 	    if(maxsize && (size + sizeof(buff)) > maxsize) {
... ...
@@ -277,8 +307,9 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_node *root
277 277
 	if(tmp)
278 278
 	    fclose(tmp);
279 279
 
280
-    } else
280
+    /* } else
281 281
 	ret = cl_scandesc(acceptd, &virname, scanned, root, limits, 0);
282
+	*/
282 283
 
283 284
     close(acceptd);
284 285
     close(sockfd);
... ...
@@ -1,5 +1,6 @@
1 1
 /*
2 2
  *  Copyright (C) 2002 - 2004 Tomasz Kojm <tkojm@clamav.net>
3
+ *			      Trog <trog@clamav.net>
3 4
  *
4 5
  *  This program is free software; you can redistribute it and/or modify
5 6
  *  it under the terms of the GNU General Public License as published by
... ...
@@ -16,378 +17,190 @@
16 16
  *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17 17
  */
18 18
 
19
-#if HAVE_CONFIG_H
20
-#include "clamav-config.h"
21
-#endif
22
-
23
-#include <stdio.h>
24
-#include <stdlib.h>
25
-#include <string.h>
26
-#include <unistd.h>
27
-#include <sys/types.h>
28
-#include <sys/socket.h>
29 19
 #include <pthread.h>
30
-#include <time.h>
20
+#include <errno.h>
31 21
 #include <signal.h>
22
+#include <stdio.h>
23
+#include <time.h>
32 24
 
33
-#include "cfgfile.h"
34
-#include "others.h"
35
-#include "defaults.h"
36
-#include "scanner.h"
37 25
 #include "server.h"
38
-#include "clamuko.h"
39
-#include "tests.h"
26
+#include "thrmgr.h"
40 27
 #include "session.h"
41
-#include "../target.h"
28
+#include "defaults.h"
29
+#include "clamuko.h"
30
+#include "others.h"
42 31
 
43
-#ifdef CLAMUKO
44
-pthread_t clamukoid;
45
-#endif
32
+#define BUFFSIZE 1024
33
+#define FALSE (0)
34
+#define TRUE (1)
46 35
 
47
-#ifdef TARGET_OS_DARWIN5_5
48
-#define	pthread_sigmask(A, B, C)    sigprocmask((A), (B), (C))
49
-#define	pthread_kill(A, B)	{ }
50
-#endif
36
+int progexit = 0;
37
+pthread_mutex_t exit_mutex;
38
+int reload = 0;
39
+pthread_mutex_t reload_mutex;
51 40
 
41
+typedef struct client_conn_tag {
42
+    int sd;
43
+    int options;
44
+    const struct cfgstruct *copt;
45
+    const struct cl_node *root;
46
+    const struct cl_limits *limits;
47
+} client_conn_t;
52 48
 
53
-void *threadscanner(void *arg)
49
+void scanner_thread(void *arg)
54 50
 {
55
-	struct thrarg *tharg = (struct thrarg *) arg;
51
+	client_conn_t *conn = (client_conn_t *) arg;
56 52
 	sigset_t sigset;
57 53
 	int ret;
58 54
 
55
+
59 56
     /* ignore all signals */
60 57
     sigfillset(&sigset);
61 58
     pthread_sigmask(SIG_SETMASK, &sigset, NULL);
62 59
 
63
-    ret = command(ths[tharg->sid].desc, tharg->root, tharg->limits, tharg->options, tharg->copt);
60
+    ret = command(conn->sd, conn->root, conn->limits, conn->options, conn->copt);
64 61
 
65 62
     switch(ret) {
66 63
 	case COMMAND_QUIT:
64
+	    pthread_mutex_lock(&exit_mutex);
67 65
 	    progexit = 1;
66
+	    pthread_mutex_unlock(&exit_mutex);
68 67
 	    break;
69 68
 
70 69
 	case COMMAND_RELOAD:
70
+	    pthread_mutex_lock(&reload_mutex);
71 71
 	    reload = 1;
72
+	    pthread_mutex_unlock(&reload_mutex);
72 73
 	    break;
73
-    }
74
+	}
74 75
 
75
-    close(ths[tharg->sid].desc);
76
-    ths[tharg->sid].active = 0;
77
-    /* this mutex is rather useless */
78
-    /* pthread_mutex_unlock(&ths[tharg->sid].mutex); */
79
-    free(tharg);
80
-    return NULL;
76
+    close(conn->sd);
77
+    free(conn);
78
+    return;
81 79
 }
82 80
 
83
-/* this function takes care for threads, exit and various checks */
84
-
85
-void *threadwatcher(void *arg)
81
+void sighandler_th(int sig)
86 82
 {
87
-	struct thrwarg *thwarg = (struct thrwarg *) arg;
88
-	struct cfgstruct *cpt;
89
-	sigset_t sigset;
90
-	int i, j, ret, virnum;
91
-	unsigned long int timer = 0;
92
-	unsigned int timeout, threads, selfchk;
93
-	short int need_wait = 0, do_loop = 0, db_problem = 0;
94
-	const char *dbdir;
95
-	struct cl_stat dbstat;
96
-#ifdef CLAMUKO
97
-	struct thrarg *tharg;
98
-	pthread_attr_t thattr;
99
-	int maxwait;
100
-#endif
101
-
102
-
103
-    /* ignore all signals (except for SIGSEGV) */
104
-    sigfillset(&sigset);
105
-    sigdelset(&sigset, SIGSEGV);
106
-    pthread_sigmask(SIG_SETMASK, &sigset, NULL);
107
-
108
-#ifdef C_LINUX
109
-    logg("*ThreadWatcher: Started in process %d\n", getpid());
110
-#endif
111
-
112
-    if((cpt = cfgopt(thwarg->copt, "MaxThreads")))
113
-	threads = cpt->numarg;
114
-    else
115
-	threads = CL_DEFAULT_MAXTHREADS;
116
-
117
-    if((cpt = cfgopt(thwarg->copt, "SelfCheck")))
118
-	selfchk = cpt->numarg;
119
-    else
120
-	selfchk = CL_DEFAULT_SELFCHECK;
121
-
122
-    if(!selfchk) {
123
-	logg("^Self checking disabled.\n");
124
-    } else
125
-	logg("Self checking every %d seconds.\n", selfchk);
126
-
127
-    if((cpt = cfgopt(thwarg->copt, "ThreadTimeout")))
128
-	timeout = cpt->numarg;
129
-    else
130
-	timeout = CL_DEFAULT_SCANTIMEOUT;
131
-
132
-    if(!timeout) {
133
-	logg("^Timeout disabled.\n");
134
-    } else
135
-	logg("Timeout set to %d seconds.\n", timeout);
136
-
137
-    if((cpt = cfgopt(thwarg->copt, "DatabaseDirectory")) || (cpt = cfgopt(thwarg->copt, "DataDirectory")))
138
-	dbdir = cpt->strarg;
139
-    else
140
-	dbdir = cl_retdbdir();
141
-
142
-    memset(&dbstat, 0, sizeof(struct cl_stat));
143
-    cl_statinidir(dbdir, &dbstat);
144
-
145
-    for(i = 0; ; i++) {
146
-
147
-        if(i == threads)
148
-	    i = 0;
83
+	time_t currtime;
84
+	int maxwait = CL_DEFAULT_MAXWHILEWAIT * 5;
85
+	int i;
149 86
 
150
-	/* check time */
151
-        if(timeout && ths[i].active) /* races are harmless here (timeout is re-set) */
152
-	    if(time(NULL) - ths[i].start > timeout) {
153
-		pthread_cancel(ths[i].id);
154
-		mdprintf(ths[i].desc, "Session(%d): Time out ERROR\n", i);
155
-		close(ths[i].desc);
156
-		logg("Session %d stopped due to timeout.\n", i);
157
-		ths[i].active = 0;
158
-		/* pthread_mutex_unlock(&ths[i].mutex); */
159
-	    }
87
+    switch(sig) {
88
+	case SIGINT:
89
+	case SIGTERM:
90
+	    progexit = 1;
91
+	    logg("*Signal %d caught -> exiting.\n", sig);
92
+	    time(&currtime);
93
+	    logg("--- Stopped at %s", ctime(&currtime));
94
+	    exit(0);
95
+	    break; /* not reached */
160 96
 
161
-	/* cancel all threads in case of quit */
162
-	if(progexit == 1) {
163
-#ifdef CLAMUKO
164
-	    /* stop clamuko */
165
-	    if(clamuko_running) {
166
-		logg("Stopping Clamuko...\n");
167
-		pthread_kill(clamukoid, SIGUSR1);
168
-		/* we must wait for Dazuko unregistration */
169
-		maxwait = CL_DEFAULT_MAXWHILEWAIT * 5;
170
-		while(clamuko_running && maxwait--)
171
-		    usleep(200000);
172
-
173
-		if(!maxwait && clamuko_running)
174
-		    logg("!Critical error: Can't stop Clamuko.\n");
175
-	    }
176
-#endif
97
+	    case SIGSEGV:
98
+		logg("Segmentation fault :-( Bye..\n");
99
+		exit(11); /* probably not reached at all */
100
+		break; /* not reached */
177 101
 
178
-	    for(j = 0; j < threads; j++)
179
-		if(ths[j].active) {
180
-		    pthread_cancel(ths[j].id);
181
-		    mdprintf(ths[j].desc, "Session(%d): Stopped (exiting)\n", j);
182
-		    close(ths[j].desc);
183
-		    logg("Session %d stopped (exiting).\n", j);
184
-		    /* pthread_mutex_unlock(&ths[j].mutex); */
185
-		}
186
-#ifndef C_BSD
187
-	    logg("*Freeing trie structure.\n");
188
-	    cl_freetrie(*thwarg->root);
189
-#endif
190
-	    logg("*Shutting down the main socket.\n");
191
-	    shutdown(thwarg->socketd, 2);
192
-	    logg("*Closing the main socket.\n");
193
-	    close(thwarg->socketd);
194
-	    if((cpt = cfgopt(thwarg->copt, "LocalSocket"))) {
195
-		if(unlink(cpt->strarg) == -1)
196
-		    logg("!Can't unlink the socket file %s\n", cpt->strarg);
197
-		else
198
-		    logg("Socket file removed.\n");
199
-	    }
102
+	    case SIGHUP:
103
+		/* sighup = 1;
104
+		logg("SIGHUP catched: log file re-opened.\n"); */
105
+		break;
106
+    }
107
+}
200 108
 
201
-	    if((cpt = cfgopt(thwarg->copt, "PidFile"))) {
202
-		if(unlink(cpt->strarg) == -1)
203
-		    logg("!Can't unlink the pid file %s\n", cpt->strarg);
204
-		else
205
-		    logg("Pid file removed.\n");
206
-	    }
109
+static struct cl_node *reload_db(struct cl_node *root, const struct cfgstruct *copt, int do_check)
110
+{
111
+	char *dbdir;
112
+	int virnum=0, retval;
113
+	struct cfgstruct *cpt;
114
+	static struct cl_stat *dbstat=NULL;
207 115
 
208
-	    logg("*Freeing stat structure.\n");
209
-            cl_statfree(&dbstat);
210 116
 
211
-	    progexit = 2;
212
-	    logg("*Exit level %d, ThreadWatcher termination.\n", progexit);
213
-	    return NULL;
117
+    if(do_check) {
118
+	if(dbstat == NULL) {
119
+	    logg("No stats for Database check - forcing reload\n");
120
+	    return root;
214 121
 	}
215 122
 
216
-
217
-	/* do self checks */
218
-	if(selfchk && (db_problem || !(timer % selfchk))) {
219
-	    /* check the integrity of the database */
220
-	    if(!reload) {
221
-
222
-		if(cl_statchkdir(&dbstat) == 1) {
223
-		    logg("SelfCheck: Database modification detected. Forcing reload.\n");
224
-		    reload = 1;
225
-		    cl_statfree(&dbstat);
226
-		    cl_statinidir(dbdir, &dbstat);
227
-		} else
228
-		    logg("SelfCheck: Database status OK.\n");
229
-
230
-		if(!testsignature(*thwarg->root)) {
231
-		    if(db_problem) {
232
-			logg("!SelfCheck: Unable to repair internal structure. Exiting.\n");
233
-			kill(progpid, SIGTERM);
234
-			continue;
235
-		    }
236
-		    /* oops */
237
-		    logg("!SelfCheck: Unable to detect test signature, forcing database reload.\n");
238
-		    db_problem = 1;
239
-		    reload = 1;
240
-		} else {
241
-		    logg("*SelfCheck: Integrity OK\n");
242
-		    db_problem = 0;
243
-		}
244
-	    }
123
+	if(cl_statchkdir(dbstat) == 1) {
124
+	    logg("SelfCheck: Database modification detected. Forcing reload.\n");
125
+	    return root;
126
+	} else {
127
+	    logg("SelfCheck: Database status OK.\n");
128
+	    return NULL;
245 129
 	}
130
+    }
246 131
 
247
-	timer++;
248
-
249
-	/* reload the database */
250
-	if(reload) {
251
-
252
-	    /* make sure the main thread doesn't start new threads */
253
-	    do {
254
-		usleep(200000);
255
-	    } while(!main_accept && !main_reload);
256
-
257
-	    /* wait until all working threads are finished */
258
-	    do {
259
-		need_wait = 0;
260
-		for(j = 0; j < threads; j++)
261
-		    if(ths[j].active) {
262
-			if(timeout && (time(NULL) - ths[j].start > timeout)) {
263
-			    do_loop = 1;
264
-			    break;
265
-			} else need_wait = 1;
266
-		    }
267
-
268
-#ifdef CLAMUKO
269
-		if(clamuko_running) {
270
-		    logg("Stopping Clamuko...\n");
271
-		    pthread_kill(clamukoid, SIGUSR1);
272
-		    /* we must wait for Dazuko unregistration */
273
-		    maxwait = CL_DEFAULT_MAXWHILEWAIT * 5;
274
-		    while(clamuko_running && maxwait--)
275
-			usleep(200000);
276
-
277
-		    if(!maxwait && clamuko_running)
278
-			logg("!Critical error: Can't stop Clamuko.\n");
279
-		    /* should we stop here ? */
280
-		}
281
-#endif
282
-		if(need_wait)
283
-		    usleep(200000);
284
-
285
-		if(progexit == 1)
286
-		    break;
287
-
288
-	    } while(need_wait);
289
-
290
-	    if(progexit == 1) {
291
-		reload = 0;
292
-		continue;
293
-	    }
294
-
295
-	    if(do_loop) {
296
-		/* some threads must be stopped in the next iteration,
297
-		 * reload is still == 1
298
-		 */
299
-		logg("Database reload: some threads must be stopped in the next iteration.\n");
300
-		do_loop = 0;
301
-		continue;
302
-	    }
303
-
304
-	    /* relase old structure */
305
-	    cl_freetrie(*thwarg->root);
306
-	    *thwarg->root = NULL;
307
-
308
-	    /* reload */
309
-
310
-	    logg("Reading databases from %s\n", dbdir);
311
-
312
-	    cl_statfree(&dbstat);
313
-	    cl_statinidir(dbdir, &dbstat);
314
-	    virnum = 0;
315
-	    if((ret = cl_loaddbdir(dbdir, &*thwarg->root, &virnum))) {
316
-		logg("!%s\n", cl_strerror(ret));
317
-		kill(progpid, SIGTERM);
318
-		/* we stay in reload == 1, so all threads are waiting */
319
-		continue;
320
-	    }
321
-
322
-	    if(! *thwarg->root) {
323
-		logg("!Database initialization problem.\n");
324
-		kill(progpid, SIGTERM);
325
-	    } else {
326
-		if((ret = cl_buildtrie(*thwarg->root)) != 0) {
327
-		    logg("!Database initialization error: can't build the trie: %s\n", cl_strerror(ret));
328
-		    kill(progpid, SIGTERM);
329
-		}
330
-		/* check integrity */
331
-		if(!testsignature(*thwarg->root)) {
332
-		    logg("!Unable to detect test signature.\n");
333
-		    kill(progpid, SIGTERM);
334
-		}
132
+    /* release old structure */
133
+    if(root) {
134
+	cl_freetrie(root);
135
+	root = NULL;
136
+    }
335 137
 
336
-		logg("Database correctly reloaded (%d viruses)\n", virnum);
337
-	    }
138
+    if((cpt = cfgopt(copt, "DatabaseDirectory")) || (cpt = cfgopt(copt, "DataDirectory"))) {
139
+	dbdir = cpt->strarg;
140
+    } else {
141
+	dbdir = cl_retdbdir();
142
+    }
143
+    logg("Reading databases from %s\n", dbdir);
338 144
 
339
-	    /* start clamuko */
340
-#ifdef CLAMUKO
145
+    if(dbstat == NULL) {
146
+	dbstat = (struct cl_stat *) mmalloc(sizeof(struct cl_stat));
147
+    } else {
148
+	cl_statfree(dbstat);
149
+    }
341 150
 
342
-	    if(cfgopt(thwarg->copt, "ClamukoScanOnLine")) {
343
-		logg("Starting Clamuko...\n");
344
-		tharg = (struct thrarg *) mcalloc(1, sizeof(struct thrarg));
345
-		tharg->copt = thwarg->copt;
346
-		tharg->root = *thwarg->root;
347
-		tharg->limits = thwarg->limits;
348
-		tharg->options = thwarg->options;
349
-
350
-		pthread_attr_init(&thattr);
351
-		pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
352
-		pthread_create(&clamukoid, &thattr, clamukoth, tharg);
353
-		pthread_attr_destroy(&thattr);
354
-	    }
355
-#endif
151
+    memset(dbstat, 0, sizeof(struct cl_stat));
152
+    cl_statinidir(dbdir, dbstat);
153
+    if((retval = cl_loaddbdir(dbdir, &root, &virnum))) {
154
+	logg("!reload db failed: %s\n", cl_strerror(retval));
155
+	exit(-1);
156
+    }
356 157
 
357
-	    reload = 0;
358
-	}
158
+    if(!root) {
159
+	logg("!load db failed: %s\n", cl_strerror(retval));
160
+	exit(-1);
161
+    }
359 162
 
360
-	sleep(1);
163
+    if((retval = cl_buildtrie(root)) != 0) {
164
+	logg("!Database initialization error: can't build the trie: %s\n",
165
+	cl_strerror(retval));
166
+	exit(-1);
361 167
     }
168
+    logg("Database correctly reloaded (%d viruses)\n", virnum);
362 169
 
363
-    return NULL;
170
+    return root;
364 171
 }
365 172
 
366
-int threads;
367
-pthread_t watcherid;
368
-
369 173
 int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *copt)
370 174
 {
371
-	int acceptd, i, options = 0, maxwait;
372
-	struct cfgstruct *cpt;
373
-	struct thrarg *tharg;
374
-	struct thrwarg thwarg;
175
+	int new_sd, max_threads, options=0;
176
+	thrmgr_t thrmgr;
177
+	struct sigaction sigact;
178
+	mode_t old_umask;
375 179
 	struct cl_limits limits;
376 180
 	pthread_attr_t thattr;
377
-	struct sigaction sigact;
378 181
 	sigset_t sigset;
379
-	mode_t old_umask;
380
-
182
+	client_conn_t *client_conn;
183
+	struct cfgstruct *cpt;
184
+	char *buff[BUFFSIZE+1];
185
+	unsigned int selfchk;
186
+	time_t start_time, current_time;
187
+	
381 188
 #if defined(C_BIGSTACK) || defined(C_BSD)
382
-	size_t stacksize;
189
+        size_t stacksize;
383 190
 #endif
384 191
 
385
-    memset (&sigact, 0, sizeof(struct sigaction));
192
+#ifdef CLAMUKO
193
+	pthread_t clamuko_pid;
194
+	pthread_attr_t clamuko_attr;
195
+	struct thrarg *tharg;
196
+#endif
197
+	memset(&sigact, 0, sizeof(struct sigaction));
386 198
 
387 199
     /* save the PID */
388 200
     if((cpt = cfgopt(copt, "PidFile"))) {
389 201
 	    FILE *fd;
390
-	    old_umask = umask(0006);
202
+	old_umask = umask(0006);
391 203
 	if((fd = fopen(cpt->strarg, "w")) == NULL) {
392 204
 	    logg("!Can't save PID in file %s\n", cpt->strarg);
393 205
 	} else {
... ...
@@ -398,20 +211,11 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
398 398
     }
399 399
 
400 400
     logg("*Listening daemon: PID: %d\n", getpid());
401
-
402
-    if((cpt = cfgopt(copt, "MaxThreads")))
403
-	threads = cpt->numarg;
404
-    else
405
-	threads = CL_DEFAULT_MAXTHREADS;
406
-
407
-    logg("Maximal number of threads: %d\n", threads);
408
-
409
-    ths = (struct thrsession *) mcalloc(threads, sizeof(struct thrsession));
410
- 
411
-    /*
412
-    for(i = 0; i < threads; i++)
413
-	pthread_mutex_init(&ths[i].mutex, NULL);
414
-    */
401
+    if((cpt = cfgopt(copt, "MaxThreads"))) {
402
+	max_threads = cpt->numarg;
403
+    } else {
404
+	max_threads = CL_DEFAULT_MAXTHREADS;
405
+    }
415 406
 
416 407
     if(cfgopt(copt, "ScanArchive") || cfgopt(copt, "ClamukoScanArchive")) {
417 408
 
... ...
@@ -419,40 +223,44 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
419 419
 	memset(&limits, 0, sizeof(struct cl_limits));
420 420
 
421 421
 	if((cpt = cfgopt(copt, "ArchiveMaxFileSize"))) {
422
-	    if((limits.maxfilesize = cpt->numarg))
422
+	    if((limits.maxfilesize = cpt->numarg)) {
423 423
 		logg("Archive: Archived file size limit set to %d bytes.\n", limits.maxfilesize);
424
-	    else
424
+	    } else {
425 425
 		logg("^Archive: File size limit protection disabled.\n");
426
+	    }
426 427
 	} else {
427 428
 	    limits.maxfilesize = 10485760;
428 429
 	    logg("^USING HARDCODED LIMIT: Archive: Archived file size limit set to %d bytes.\n", limits.maxfilesize);
429 430
 	}
430 431
 
431 432
 	if((cpt = cfgopt(copt, "ArchiveMaxRecursion"))) {
432
-	    if((limits.maxreclevel = cpt->numarg))
433
+	    if((limits.maxreclevel = cpt->numarg)) {
433 434
 		logg("Archive: Recursion level limit set to %d.\n", limits.maxreclevel);
434
-	    else
435
+	    } else {
435 436
 		logg("^Archive: Recursion level limit protection disabled.\n");
437
+	    }
436 438
 	} else {
437 439
 	    limits.maxreclevel = 5;
438 440
 	    logg("^USING HARDCODED LIMIT: Archive: Recursion level set to %d.\n", limits.maxreclevel);
439 441
 	}
440 442
 
441 443
 	if((cpt = cfgopt(copt, "ArchiveMaxFiles"))) {
442
-	    if((limits.maxfiles = cpt->numarg))
444
+	    if((limits.maxfiles = cpt->numarg)) {
443 445
 		logg("Archive: Files limit set to %d.\n", limits.maxfiles);
444
-	    else
446
+	    } else {
445 447
 		logg("^Archive: Files limit protection disabled.\n");
448
+	    }
446 449
 	} else {
447 450
 	    limits.maxfiles = 1000;
448 451
 	    logg("^USING HARDCODED LIMIT: Archive: Files limit set to %d.\n", limits.maxfiles);
449 452
 	}
450 453
 
451 454
 	if((cpt = cfgopt(copt, "ArchiveMaxCompressionRatio"))) {
452
-	    if((limits.maxratio = cpt->numarg))
455
+	    if((limits.maxratio = cpt->numarg)) {
453 456
 		logg("Archive: Compression ratio limit set to %d.\n", limits.maxratio);
454
-	    else
457
+	    } else {
455 458
 		logg("^Archive: Compression ratio limit disabled.\n");
459
+	    }
456 460
 	} else {
457 461
 	    limits.maxratio = 200;
458 462
 	    logg("^USING HARDCODED LIMIT: Archive: Compression ratio limit set to %d.\n", limits.maxratio);
... ...
@@ -461,11 +269,12 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
461 461
 	if(cfgopt(copt, "ArchiveLimitMemoryUsage")) {
462 462
 	    limits.archivememlim = 1;
463 463
 	    logg("Archive: Limited memory usage.\n");
464
-	} else
464
+	} else {
465 465
 	    limits.archivememlim = 0;
466
+	}
466 467
     }
467 468
 
468
-    if(cfgopt(copt, "ScanArchive")) { 
469
+    if(cfgopt(copt, "ScanArchive")) {
469 470
 	logg("Archive support enabled.\n");
470 471
 	options |= CL_ARCHIVE;
471 472
 
... ...
@@ -479,64 +288,58 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
479 479
 	logg("Archive support disabled.\n");
480 480
     }
481 481
 
482
-    if(cfgopt(copt, "ScanMail")) { 
482
+    if(cfgopt(copt, "ScanMail")) {
483 483
 	logg("Mail files support enabled.\n");
484 484
 	options |= CL_MAIL;
485 485
     } else {
486 486
 	logg("Mail files support disabled.\n");
487 487
     }
488 488
 
489
-    if(cfgopt(copt, "ScanOLE2")) { 
489
+    if(cfgopt(copt, "ScanOLE2")) {
490 490
 	logg("OLE2 support enabled.\n");
491 491
 	options |= CL_OLE2;
492 492
     } else {
493 493
 	logg("OLE2 support disabled.\n");
494 494
     }
495 495
 
496
-    /* initialize important global variables */
497
-    progexit = 0;
498
-    progpid = 0;
499
-    reload = 0;
500
-#ifdef CLAMUKO
501
-    clamuko_running = 0;
502
-#endif
496
+    if((cpt = cfgopt(copt, "SelfCheck"))) {
497
+	selfchk = cpt->numarg;
498
+    } else {
499
+	selfchk = CL_DEFAULT_SELFCHECK;
500
+    }
501
+
502
+    if(!selfchk) {
503
+	logg("Self checking disabled.\n");
504
+    } else {
505
+	logg("Self checking every %d seconds.\n", selfchk);
506
+    }
503 507
 
504 508
     pthread_attr_init(&thattr);
505 509
     pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
506 510
 
507
-    /* run clamuko */
508
-    if(cfgopt(copt, "ClamukoScanOnLine"))
509 511
 #ifdef CLAMUKO
510
-    {
511
-	tharg = (struct thrarg *) mcalloc(1, sizeof(struct thrarg));
512
-	tharg->copt = copt;
513
-	tharg->root = root;
514
-	tharg->limits = &limits;
515
-	tharg->options = options;
516
-
517
-	pthread_create(&clamukoid, &thattr, clamukoth, tharg);
518
-    }
512
+    pthread_attr_init(&clamuko_attr);
513
+    pthread_attr_setdetachstate(&clamuko_attr, PTHREAD_CREATE_JOINABLE);
514
+
515
+    tharg = (struct thrarg *) mmalloc(sizeof(struct thrarg));
516
+    tharg->copt = copt;
517
+    tharg->root = root;
518
+    tharg->limits = &limits;
519
+    tharg->options = options;
520
+
521
+    pthread_create(&clamuko_pid, &clamuko_attr, clamukoth, tharg);
519 522
 #else
520
-	logg("!Clamuko is not available.\n");
523
+    logg("!Clamuko is not available.\n");
521 524
 #endif
522 525
 
523
-    /* start thread watcher */
524
-    thwarg.socketd = socketd;
525
-    thwarg.copt = copt;
526
-    thwarg.root = &root;
527
-    thwarg.limits = &limits;
528
-    thwarg.options = options;
529
-    pthread_create(&watcherid, &thattr, threadwatcher, &thwarg);
530
-
531 526
     /* set up signal handling */
532
-
533 527
     sigfillset(&sigset);
534 528
     sigdelset(&sigset, SIGINT);
535 529
     sigdelset(&sigset, SIGTERM);
536 530
     sigdelset(&sigset, SIGSEGV);
537 531
     sigdelset(&sigset, SIGHUP);
538 532
     sigprocmask(SIG_SETMASK, &sigset, NULL);
539
-
533
+ 
540 534
     /* SIGINT, SIGTERM, SIGSEGV */
541 535
     sigact.sa_handler = sighandler_th;
542 536
     sigemptyset(&sigact.sa_mask);
... ...
@@ -545,15 +348,11 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
545 545
     sigaddset(&sigact.sa_mask, SIGHUP);
546 546
     sigaction(SIGINT, &sigact, NULL);
547 547
     sigaction(SIGTERM, &sigact, NULL);
548
-#ifndef CL_DEBUG
549
-    sigaction(SIGSEGV, &sigact, NULL);
550
-#endif
551
-    sigaction(SIGHUP, &sigact, NULL);
552 548
 
553
-    /* we need to save program's PID, because under Linux each thread
554
-     * has another PID, it works with other OSes as well
555
-     */
556
-    progpid = getpid();
549
+    if(!debug_mode)
550
+	sigaction(SIGSEGV, &sigact, NULL);
551
+
552
+    sigaction(SIGHUP, &sigact, NULL);
557 553
 
558 554
 #if defined(C_BIGSTACK) || defined(C_BSD)
559 555
     /*
... ...
@@ -567,116 +366,82 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
567 567
     pthread_attr_setstacksize(&thattr, stacksize + SCANBUFF + 64 * 1024);
568 568
 #endif
569 569
 
570
-    while(progexit != 2) {
571
-
572
-	/* find a free session */
573
-	for(i = 0; ; i++) {
574
-	    if(i == threads) {
575
-		i = 0;
576
-		usleep(50000);
577
-	    }
570
+    pthread_mutex_init(&exit_mutex, NULL);
571
+    pthread_mutex_init(&reload_mutex, NULL);
578 572
 
579
-	    if(!ths[i].active) {
580
-		/* logg("*Found free slot: %d\n", i); */
581
-		break;
582
-	    }
583
-	}
573
+    if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
574
+	logg("thrmgr_init failed");
575
+	exit(-1);
576
+    }
584 577
 
578
+    time(&start_time);
585 579
 
586
-	main_accept = 1;
587
-	if((acceptd = accept(socketd, NULL, NULL)) == -1) {
588
-	    logg("!accept() failed.\n");
589
-	    /* exit ? */
580
+    for(;;) {				
581
+	new_sd = accept(socketd, NULL, NULL);
582
+	if((new_sd == -1) && (errno != EINTR)) {
583
+	    logg("!accept() failed: %s", strerror_r(errno, buff, BUFFSIZE));
584
+	    /* very bad - need to exit or restart */
590 585
 	    continue;
591 586
 	}
592
-	main_accept = 0;
593
-
594
-	if(reload) { /* do not start new threads */
595
-	    main_reload = 1;
596
-	    logg("*Main thread: database reloading (waiting).\n");
597
-	    maxwait = CL_DEFAULT_MAXWHILEWAIT;
598
-	    while(reload && maxwait--)
599
-		sleep(1);
600
-
601
-	    if(!maxwait && reload) {
602
-		logg("!Database reloading failed (time exceeded). Exit forced.\n");
603
-		progexit = 1;
604
-		sleep(10);
605
-		exit(1);
606
-	    }
607 587
 
608
-	    logg("*Main thread: database reloaded.\n");
609
-	    main_reload = 0;
588
+	client_conn = (client_conn_t *) mmalloc(sizeof(struct client_conn_tag));
589
+	client_conn->sd = new_sd;
590
+	client_conn->options = options;
591
+	client_conn->copt = copt;
592
+	client_conn->root = root;
593
+	client_conn->limits = &limits;
594
+	thrmgr_add(&thrmgr, client_conn);
595
+
596
+	pthread_mutex_lock(&exit_mutex);
597
+	if(progexit) {
598
+	    pthread_mutex_unlock(&exit_mutex);
599
+	    break;
610 600
 	}
601
+	pthread_mutex_unlock(&exit_mutex);
611 602
 
612
-	tharg = (struct thrarg *) mcalloc(1, sizeof(struct thrarg));
613
-	tharg->copt = copt;
614
-	tharg->sid = i;
615
-	tharg->root = root;
616
-	tharg->limits = &limits;
617
-	tharg->options = options;
618
-
619
-	ths[i].desc = acceptd;
620
-	ths[i].start = time(NULL);
621
-	ths[i].active = 1; /* the structure must be activated exactly here
622
-			    * because we will surely create a race condition 
623
-			    * in other places (if activated in the new thread
624
-			    * there * will be a race in the main thread (it
625
-			    * may assign the same thread session once more);
626
-			    * if activated after pthread_create() the new
627
-			    * thread may be already finished).
628
-			    */
629
-
630
-	if(pthread_create(&ths[i].id, &thattr, threadscanner, tharg)) {
631
-	    logg("!Session(%d) did not start. Dropping connection.", i);
632
-	    close(acceptd);
633
-	    ths[i].active = 0;
603
+	if(selfchk) {
604
+	    time(&current_time);
605
+	    if((current_time - start_time) > selfchk) {
606
+		if(reload_db(root, copt, TRUE)) {
607
+		    pthread_mutex_lock(&reload_mutex);
608
+		    reload = 1;
609
+		    pthread_mutex_unlock(&reload_mutex);
610
+		}
611
+		time(&start_time);
612
+	    }
634 613
 	}
635
-    }
636
-    free(ths);
637
-    return 0;
638
-}
639 614
 
640
-void sighandler_th(int sig)
641
-{
642
-	time_t currtime;
643
-	int maxwait = CL_DEFAULT_MAXWHILEWAIT * 5;
644
-#ifndef CL_DEBUG
645
-	int i;
615
+	pthread_mutex_lock(&reload_mutex);
616
+	if(reload) {
617
+	    reload = 0;
618
+	    pthread_mutex_unlock(&reload_mutex);
619
+	    /* Destroy the thread manager.
620
+	     * This waits for all current tasks to end
621
+	     */
622
+	    thrmgr_destroy(&thrmgr);
623
+	    root = reload_db(root, copt, FALSE);
624
+	    if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
625
+		logg("!thrmgr_init failed");
626
+		pthread_mutex_unlock(&reload_mutex);
627
+		exit(-1);
628
+	    }
629
+#ifdef CLAMUKO
630
+	    logg("Stopping and restarting Clamuko.\n");
631
+	    pthread_kill(clamuko_pid, SIGUSR1);
632
+	    pthread_join(clamuko_pid, NULL);
633
+	    tharg->root = root;
634
+	    pthread_create(&clamuko_pid, &clamuko_attr, clamukoth, tharg);
646 635
 #endif
636
+	} else {
637
+	    pthread_mutex_unlock(&reload_mutex);
638
+	}
639
+    }
647 640
 
648
-    switch(sig) {
649
-	case SIGINT:
650
-	case SIGTERM:
651
-	    progexit = 1;
652
-	    logg("*Signal %d caught -> exiting.\n", sig);
653
-
654
-	    while(progexit != 2 && maxwait--)
655
-		usleep(200000);
656
-
657
-	    if(!maxwait && progexit != 2)
658
-		logg("!Critical error: Cannot reach exit level 2.\n");
659
-
660
-	    time(&currtime);
661
-	    logg("--- Stopped at %s", ctime(&currtime));
662
-	    exit(0);
663
-	    break; /* not reached */
664
-
665
-#ifndef CL_DEBUG
666
-	case SIGSEGV:
667
-	    logg("Segmentation fault :-( Bye..\n");
668
-
669
-	    for(i = 0; i < threads; i++)
670
-		if(ths[i].active)
671
-		    pthread_kill(ths[i].id, 9);
672
-
673
-	    pthread_kill(watcherid, 9);
674
-	    exit(11); /* probably not reached at all */
675
-	    break; /* not reached */
641
+#ifdef CLAMUKO
642
+    logg("Stopping Clamuko.\n");
643
+    pthread_kill(clamuko_pid, SIGUSR1);
644
+    pthread_join(clamuko_pid, NULL);
676 645
 #endif
677
-	case SIGHUP:
678
-	    sighup = 1;
679
-	    logg("SIGHUP catched: log file re-opened.\n");
680
-	    break;
681
-    }
646
+    logg("Exiting (clean)\n");
647
+    return 0;
682 648
 }
... ...
@@ -48,10 +48,6 @@ struct thrwarg {
48 48
     int options;
49 49
 };
50 50
 
51
-short int progexit; /* exit steering variable */
52
-int progpid; /* clamd pid */
53
-short int reload, main_accept, main_reload;
54
-
55 51
 int acceptloop_proc(int socketd, struct cl_node *root, const struct cfgstruct *copt);
56 52
 int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *copt);
57 53
 void sighandler(int sig);
... ...
@@ -29,6 +29,8 @@
29 29
 #include <pthread.h>
30 30
 #include <time.h>
31 31
 #include <signal.h>
32
+#include <poll.h>
33
+#include <errno.h>
32 34
 
33 35
 #include "cfgfile.h"
34 36
 #include "others.h"
... ...
@@ -55,8 +57,26 @@
55 55
 int command(int desc, const struct cl_node *root, const struct cl_limits *limits, int options, const struct cfgstruct *copt)
56 56
 {
57 57
 	char buff[1025];
58
-	int bread, opt, ret;
59
-
58
+	int bread, opt, ret, count;
59
+	struct pollfd poll_data[1];
60
+
61
+    poll_data[0].fd = desc;
62
+    poll_data[0].events = POLLIN;
63
+    poll_data[0].revents = 0;
64
+                                                                                                                                                                                
65
+    while (1) {
66
+        count = poll(poll_data, 1, CL_DEFAULT_SCANTIMEOUT*1000); /* wait for timeout */
67
+        if (count != 1) {
68
+                if ((count == -1) && (errno == EINTR)) {
69
+                        continue;
70
+                }
71
+                mdprintf(desc, "ERROR\n");
72
+                logg("!ScanStream: command timeout.\n");
73
+                return -1;
74
+        } else {
75
+                break;
76
+        }
77
+    }
60 78
 
61 79
     if((bread = read(desc, buff, 1024)) == -1) {
62 80
 	logg("!Command parser: read() failed.\n");
63 81
new file mode 100644
... ...
@@ -0,0 +1,372 @@
0
+/*
1
+ *  Copyright (C) 2004 Trog <trog@clamav.net>
2
+ *
3
+ *  The code is based on the book "Programming with POSIX threads" by Dave
4
+ *  Butenhof
5
+ *
6
+ *  This program is free software; you can redistribute it and/or modify
7
+ *  it under the terms of the GNU General Public License as published by
8
+ *  the Free Software Foundation; either version 2 of the License, or
9
+ *  (at your option) any later version.
10
+ *
11
+ *  This program is distributed in the hope that it will be useful,
12
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
+ *  GNU General Public License for more details.
15
+ *
16
+ *  You should have received a copy of the GNU General Public License
17
+ *  along with this program; if not, write to the Free Software
18
+ *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19
+ */
20
+/*
21
+ * thrmgr.c
22
+ *
23
+ * This file implements the interfaces for a "work queue"
24
+ * manager. A "manager object" is created with several
25
+ * parameters, including the required size of a work queue
26
+ * entry, the maximum desired degree of parallelism (number of
27
+ * threads to service the queue), and the address of an
28
+ * execution engine routine.
29
+ *
30
+ * The application requests a work queue entry from the manager,
31
+ * fills in the application-specific fields, and returns it to
32
+ * the queue manager for processing. The manager will create a
33
+ * new thread to service the queue if all current threads are
34
+ * busy and the maximum level of parallelism has not yet been
35
+ * reached.
36
+ *
37
+ * The manager will dequeue items and present them to the
38
+ * processing engine until the queue is empty; at that point,
39
+ * processing threads will begin to shut down. (They will be
40
+ * restarted when work appears.)
41
+ */
42
+#include <pthread.h>
43
+#include <stdlib.h>
44
+#include <time.h>
45
+#include <string.h>
46
+#include <errno.h>
47
+
48
+#include "thrmgr.h"
49
+
50
+/*
51
+ * Thread start routine to serve the work queue.
52
+ */
53
+static void *thrmgr_server (void *arg)
54
+{
55
+  thrmgr_t *thrmgr = (thrmgr_t *)arg;
56
+  work_element_t *we;
57
+  int status;
58
+
59
+  /*
60
+   * We don't need to validate the thrmgr_t here... we don't
61
+   * create server threads until requests are queued (the
62
+   * queue has been initialized by then!) and we wait for all
63
+   * server threads to terminate before destroying a work
64
+   * queue.
65
+   */
66
+/*    log_message ("A worker is starting"); */
67
+  status = pthread_mutex_lock (&thrmgr->mutex);
68
+  if (status != 0) {
69
+    //log_message ("A worker is dying");
70
+    return(NULL);
71
+  }
72
+
73
+  while (1) {
74
+    thrmgr->idle++;
75
+
76
+ /*     log_message ("Worker waiting for work - idle:%d", thrmgr->idle); */
77
+    
78
+    while ( (thrmgr->first == NULL) && !thrmgr->quit) {
79
+#ifndef BROKEN_COND_SIGNAL
80
+      status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
81
+#else
82
+      status = pthread_mutex_unlock (&thrmgr->mutex);
83
+      status = sem_wait(&thrmgr->semaphore);
84
+      pthread_mutex_lock (&thrmgr->mutex);
85
+#endif
86
+      if (status != 0) {
87
+	/*
88
+	 * This shouldn't happen, so the work queue
89
+	 * package should fail. Because the work queue
90
+	 * API is asynchronous, that would add
91
+	 * complication. Because the chances of failure
92
+	 * are slim, I choose to avoid that
93
+	 * complication. The server thread will return,
94
+	 * and allow another server thread to pick up
95
+	 * the work later. Note that, if this was the
96
+	 * only server thread, the queue won't be
97
+	 * serviced until a new work item is
98
+	 * queued. That could be fixed by creating a new
99
+	 * server here.
100
+	 */
101
+	//log_message ("Worker wait failed, %d (%s)",
102
+		     //status, strerror (status));
103
+	thrmgr->counter--;
104
+	thrmgr->idle--;
105
+	pthread_mutex_unlock (&thrmgr->mutex);
106
+	return(NULL);
107
+      }
108
+    }
109
+    we = thrmgr->first;
110
+    
111
+    if (we != NULL) {
112
+      thrmgr->first = we->next;
113
+      if (thrmgr->last == we) {
114
+	thrmgr->last = NULL;
115
+      }
116
+      thrmgr->idle--;
117
+      status = pthread_mutex_unlock (&thrmgr->mutex);
118
+      if (status != 0) {
119
+	//log_message ("A worker is dying");
120
+	return(NULL);
121
+      }
122
+/*        log_message ("Worker calling handler"); */
123
+      thrmgr->handler (we->data);
124
+      free (we);
125
+      status = pthread_mutex_lock (&thrmgr->mutex);
126
+      if (status != 0) {
127
+	//log_message ("A worker is dying");
128
+	return(NULL);
129
+      }
130
+    }
131
+    
132
+    /*
133
+     * If there are no more work requests, and the servers
134
+     * have been asked to quit, then shut down.
135
+     */
136
+    if ( (thrmgr->first == NULL) &&thrmgr->quit) {
137
+      //log_message ("Worker shutting down");
138
+      thrmgr->counter--;
139
+      
140
+      /*
141
+       * NOTE: Just to prove that every rule has an
142
+       * exception, I'm using the "cond" condition for two
143
+       * separate predicates here.  That's OK, since the
144
+       * case used here applies only once during the life
145
+       * of a work queue -- during rundown. The overhead
146
+       * is minimal and it's not worth creating a separate
147
+       * condition variable that would be waited and
148
+       * signaled exactly once!
149
+       */
150
+#ifndef BROKEN_COND_SIGNAL
151
+      if (thrmgr->counter == 0) {
152
+	pthread_cond_broadcast (&thrmgr->cond);
153
+      }
154
+#endif
155
+      pthread_mutex_unlock (&thrmgr->mutex);
156
+      //log_message ("A worker is dying");
157
+      return(NULL);
158
+    }
159
+    
160
+  }
161
+
162
+  pthread_mutex_unlock (&thrmgr->mutex);
163
+  //log_message ("Worker exiting");
164
+  return(NULL);
165
+}
166
+
167
+/*
168
+ * Initialize a thread manager.
169
+ */
170
+int thrmgr_init( thrmgr_t *thrmgr,                 /* thread manager */
171
+		 int       max_threads,            /* maximum threads */
172
+		 int       alloc_unit,             /* thread creation unit */
173
+		 void      (*handler)(void *arg))  /* request handler */
174
+{
175
+  int status;
176
+
177
+  status = pthread_attr_init (&thrmgr->attr);
178
+  if (status != 0)
179
+    return(status);
180
+  status = pthread_attr_setdetachstate (&thrmgr->attr,
181
+					PTHREAD_CREATE_DETACHED);
182
+  if (status != 0) {
183
+    pthread_attr_destroy (&thrmgr->attr);
184
+    return(status);
185
+  }
186
+  status = pthread_mutex_init (&thrmgr->mutex, NULL);
187
+  if (status != 0) {
188
+    pthread_attr_destroy (&thrmgr->attr);
189
+    return(status);
190
+  }
191
+#ifndef BROKEN_COND_SIGNAL
192
+  status = pthread_cond_init (&thrmgr->cond, NULL);
193
+#else
194
+  status = sem_init(&thrmgr->semaphore, 0, 0);
195
+#endif
196
+  if (status != 0) {
197
+    pthread_mutex_destroy (&thrmgr->mutex);
198
+    pthread_attr_destroy (&thrmgr->attr);
199
+    return(status);
200
+  }
201
+  thrmgr->quit = 0;                       /* not time to quit */
202
+  thrmgr->first = thrmgr->last = NULL;    /* no queue entries */
203
+  thrmgr->parallelism = max_threads;      /* max servers */
204
+  thrmgr->alloc_unit = alloc_unit;        /* thread creation unit */
205
+  thrmgr->counter = 0;                    /* no server threads yet */
206
+  thrmgr->idle = 0;                       /* no idle servers */
207
+  thrmgr->handler = handler;
208
+  thrmgr->valid = THRMGR_VALID;
209
+  return(0);
210
+}
211
+
212
+/*
213
+ * Destroy a thread manager
214
+ */
215
+int thrmgr_destroy (thrmgr_t *thrmgr)
216
+{
217
+  int status, status1, status2;
218
+  
219
+  if (thrmgr->valid != THRMGR_VALID) {
220
+    return EINVAL;
221
+  }
222
+  status = pthread_mutex_lock (&thrmgr->mutex);
223
+  if (status != 0) {
224
+    return(status);
225
+  }
226
+  thrmgr->valid = 0;             /* prevent any other operations */
227
+
228
+  /*
229
+   * Check whether any threads are active, and run them down:
230
+   *
231
+   * 1.       set the quit flag
232
+   * 2.       broadcast to wake any servers that may be asleep
233
+   * 3.       wait for all threads to quit (counter goes to 0)
234
+   *
235
+   */
236
+  if (thrmgr->counter > 0) {
237
+    thrmgr->quit = 1;
238
+    /* if any threads are idling, wake them. */
239
+    if (thrmgr->idle > 0) {
240
+#ifndef BROKEN_COND_SIGNAL
241
+      status = pthread_cond_broadcast (&thrmgr->cond);
242
+      if (status != 0) {
243
+	pthread_mutex_unlock (&thrmgr->mutex);
244
+	return(status);
245
+      }
246
+#endif
247
+    }
248
+
249
+    /*
250
+     * Just to prove that every rule has an exception, I'm
251
+     * using the "cv" condition for two separate predicates
252
+     * here. That's OK, since the case used here applies
253
+     * only once during the life of a work queue -- during
254
+     * rundown. The overhead is minimal and it's not worth
255
+     * creating a separate condition variable that would be
256
+     * waited and signalled exactly once!
257
+     */
258
+    while (thrmgr->counter > 0) {
259
+#ifndef BROKEN_COND_SIGNAL
260
+      status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
261
+      if (status != 0) {
262
+	pthread_mutex_unlock (&thrmgr->mutex);
263
+	return(status);
264
+      }
265
+#endif
266
+    }       
267
+  }
268
+  status = pthread_mutex_unlock (&thrmgr->mutex);
269
+  if (status != 0) {
270
+    return(status);
271
+  }
272
+  status = pthread_mutex_destroy (&thrmgr->mutex);
273
+#ifndef BROKEN_COND_SIGNAL
274
+  status1 = pthread_cond_destroy (&thrmgr->cond);
275
+#else
276
+  status1 = sem_destroy(&thrmgr->semaphore);
277
+#endif
278
+  status2 = pthread_attr_destroy (&thrmgr->attr);
279
+  return (status ? status : (status1 ? status1 : status2));
280
+}
281
+
282
+/*
283
+ * Add an item to a work queue.
284
+ */
285
+int thrmgr_add( thrmgr_t *thrmgr,
286
+		void     *element )
287
+{
288
+  work_element_t *item;
289
+  pthread_t id;
290
+  int status;
291
+  int count;
292
+
293
+  if (thrmgr->valid != THRMGR_VALID) {
294
+    return(EINVAL);
295
+  }
296
+
297
+  /*
298
+   * Create and initialize a request structure.
299
+   */
300
+  item = mmalloc( sizeof (work_element_t) );
301
+  item->data = element;
302
+  item->next = NULL;
303
+  status = pthread_mutex_lock (&thrmgr->mutex);
304
+  if (status != 0) {
305
+    free (item);
306
+    return(status);
307
+  }
308
+
309
+  /*
310
+   * Add the request to the end of the queue, updating the
311
+   * first and last pointers.
312
+   */
313
+  if (thrmgr->first == NULL) {
314
+    thrmgr->first = item;
315
+  } else {
316
+    thrmgr->last->next = item;
317
+  }
318
+  thrmgr->last = item;
319
+  
320
+  /*
321
+   * if any threads are idling, wake one.
322
+   */
323
+/*    printf("Idle threads: %d\n", thrmgr->idle); */
324
+  if (thrmgr->idle > 0) {
325
+#ifndef BROKEN_COND_SIGNAL
326
+    status = pthread_cond_signal (&thrmgr->cond);
327
+#else
328
+    status = sem_post(&thrmgr->semaphore);
329
+#endif
330
+    if (status != 0) {
331
+      pthread_mutex_unlock (&thrmgr->mutex);
332
+      return(status);
333
+    }
334
+  } else if (thrmgr->counter < thrmgr->parallelism) {
335
+    /*
336
+     * If there were no idling threads, and we're allowed to
337
+     * create a new thread, do so.
338
+     */
339
+    for ( count=0 ; count < thrmgr->alloc_unit ; count++ ) {
340
+/*        log_message ("Creating new worker"); */
341
+      status = pthread_create (&id, &thrmgr->attr, thrmgr_server, (void*)thrmgr);
342
+      if (status != 0) {
343
+	pthread_mutex_unlock (&thrmgr->mutex);
344
+	return(status);
345
+      }
346
+      thrmgr->counter++;
347
+    }
348
+  }
349
+  pthread_mutex_unlock (&thrmgr->mutex);
350
+  return(0);
351
+}
352
+
353
+int thrmgr_stat( thrmgr_t *thrmgr,
354
+		 int      *threads,
355
+		 int      *idle )
356
+{
357
+  int status;
358
+
359
+  status = pthread_mutex_lock (&thrmgr->mutex);
360
+  if (status != 0) {
361
+    return(-1);
362
+  }
363
+
364
+  *threads = thrmgr->counter;
365
+  *idle = thrmgr->idle;
366
+
367
+  pthread_mutex_unlock (&thrmgr->mutex);
368
+  return(0);
369
+}
370
+  
371
+
0 372
new file mode 100644
... ...
@@ -0,0 +1,109 @@
0
+/*
1
+ *  Copyright (C) 2004 Trog <trog@clamav.net>
2
+ *
3
+ *  The code is based on the book "Programming with POSIX threads" by Dave
4
+ *  Butenhof
5
+ *
6
+ *  This program is free software; you can redistribute it and/or modify
7
+ *  it under the terms of the GNU General Public License as published by
8
+ *  the Free Software Foundation; either version 2 of the License, or
9
+ *  (at your option) any later version.
10
+ *
11
+ *  This program is distributed in the hope that it will be useful,
12
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
+ *  GNU General Public License for more details.
15
+ *
16
+ *  You should have received a copy of the GNU General Public License
17
+ *  along with this program; if not, write to the Free Software
18
+ *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19
+ */
20
+/*
21
+ * workq.h
22
+ *
23
+ * This header file defines the interfaces for a "work queue"
24
+ * manager. A "manager object" is created with several
25
+ * parameters, including the required size of a work queue
26
+ * entry, the maximum desired degree of parallelism (number of
27
+ * threads to service the queue), and the address of an
28
+ * execution engine routine.
29
+ *
30
+ * The application requests a work queue entry from the manager,
31
+ * fills in the application-specific fields, and returns it to
32
+ * the queue manager for processing. The manager will create a
33
+ * new thread to service the queue if all current threads are
34
+ * busy and the maximum level of parallelism has not yet been
35
+ * reached.
36
+ *
37
+ * The manager will dequeue items and present them to the
38
+ * processing engine until the queue is empty; at that point,
39
+ * processing threads will begin to shut down. (They will be
40
+ * restarted when work appears.)
41
+ */
42
+
43
+#ifndef __THRMGR_H__
44
+#define __THRMGR_H__
45
+
46
+#ifdef DEBUG
47
+# define DPRINTF(arg) printf arg
48
+#else
49
+# define DPRINTF(arg)
50
+#endif
51
+
52
+#include <pthread.h>
53
+// #include "config.h"
54
+
55
+#ifdef BROKEN_COND_SIGNAL
56
+#include <semaphore.h>
57
+#endif
58
+
59
+/*
60
+ * Structure to keep track of work requests.
61
+ */
62
+typedef struct work_element_tag {
63
+  struct work_element_tag     *next;
64
+  void                        *data;
65
+} work_element_t;
66
+
67
+/*
68
+ * Structure describing a work queue.
69
+ */
70
+typedef struct thrmgr_tag {
71
+  pthread_mutex_t     mutex;
72
+#ifndef BROKEN_COND_SIGNAL
73
+  pthread_cond_t      cond;                    /* wait for work */
74
+#else
75
+  sem_t               semaphore;
76
+#endif
77
+  pthread_attr_t      attr;                    /* create detached threads */
78
+  work_element_t      *first, *last;           /* work queue */
79
+  int                 valid;                   /* set when valid */
80
+  int                 quit;                    /* set when workq should quit */
81
+  int                 parallelism;             /* number of threads required */
82
+  int                 alloc_unit;              /* unit of thread creation */
83
+  int                 counter;                 /* current number of threads */
84
+  int                 idle;                    /* number of idle threads */
85
+  void                (*handler)(void *arg);   /* request handler */
86
+} thrmgr_t;
87
+
88
+#define THRMGR_VALID     0xdeadfeed
89
+
90
+/*
91
+ * Define work queue functions
92
+ */
93
+extern int thrmgr_init( thrmgr_t *thrmgr,                 /* thread manager */
94
+		        int       max_threads,            /* maximum threads */
95
+		        int       alloc_unit,             /* thread creation unit */
96
+		        void      (*handler)(void *) );   /* request handler */
97
+
98
+extern int thrmgr_destroy( thrmgr_t *thrmgr );
99
+
100
+extern int thrmgr_add( thrmgr_t *thrmgr,
101
+		       void     *data );
102
+
103
+int thrmgr_stat( thrmgr_t     *thrmgr,
104
+		 int *threads,
105
+		 int *idle );
106
+
107
+#endif /* __THRMGR_H__ */
108
+
0 109
deleted file mode 100644
1 110
Binary files a/contrib/trashscan/trashscan-0.08.tar.gz and /dev/null differ
2 111
new file mode 100644
3 112
Binary files /dev/null and b/contrib/trashscan/trashscan-0.10.tar.gz differ