Browse code

ReadTimeout handling, and introduce CommandReadTimeout.

git-svn: trunk@4803

Török Edvin authored on 2009/02/18 00:59:21
Showing 8 changed files
... ...
@@ -1,3 +1,10 @@
1
+Tue Feb 17 17:08:18 EET 2009 (edwin)
2
+------------------------------------
3
+ * clamd/others.c, clamd/others.h, clamd/scanner.c,
4
+ clamd/server-th.c, shared/optparser.c, unit_tests/check_clamd.c,
5
+ unit_tests/test-clamd.conf: ReadTimeout handling, and introduce
6
+ CommandReadTimeout.
7
+
1 8
 Tue Feb 17 12:07:37 CET 2009 (tk)
2 9
 ---------------------------------
3 10
  * sigtool/sigtool.c: better error messages (bb#1396)
... ...
@@ -216,19 +216,22 @@ int poll_fd(int fd, int timeout_sec, int check_signals)
216 216
     int ret;
217 217
     struct fd_data fds = FDS_INIT;
218 218
 
219
-    if (fds_add(&fds, fd, 1) == -1)
219
+    if (fds_add(&fds, fd, 1, timeout_sec) == -1)
220 220
 	return -1;
221 221
     pthread_mutex_lock(&fds.buf_mutex);
222
-    ret = fds_poll_recv(&fds, timeout_sec, check_signals);
222
+    do {
223
+	ret = fds_poll_recv(&fds, timeout_sec, check_signals);
224
+    } while (ret == -1 && errno == EINTR);
223 225
     pthread_mutex_unlock(&fds.buf_mutex);
224 226
     fds_free(&fds);
225 227
     return ret;
226 228
 }
227 229
 
228
-static void cleanup_fds(struct fd_data *data)
230
+void fds_cleanup(struct fd_data *data)
229 231
 {
230 232
     struct fd_buf *newbuf;
231 233
     unsigned i,j;
234
+
232 235
     for (i=0,j=0;i < data->nfds; i++) {
233 236
 	if (data->buf[i].fd < 0) {
234 237
 	    if (data->buf[i].buffer)
... ...
@@ -239,9 +242,12 @@ static void cleanup_fds(struct fd_data *data)
239 239
 	    data->buf[j] = data->buf[i];
240 240
 	j++;
241 241
     }
242
+    if (j == data->nfds)
243
+	return;
242 244
     for (i = j ; i < data->nfds; i++)
243 245
 	data->buf[i].fd = -1;
244 246
     data->nfds = j;
247
+    logg("*CLEANUP_FDS: %u fds\n", data->nfds);
245 248
     /* Shrink buffer */
246 249
     newbuf = realloc(data->buf, j*sizeof(*newbuf));
247 250
     if (newbuf)
... ...
@@ -314,7 +320,7 @@ static int read_fd_data(struct fd_buf *buf)
314 314
    return n;
315 315
 }
316 316
 
317
-static int buf_init(struct fd_buf *buf, int listen_only)
317
+static int buf_init(struct fd_buf *buf, int listen_only, int timeout)
318 318
 {
319 319
     buf->off = 0;
320 320
     buf->got_newdata = 0;
... ...
@@ -343,10 +349,16 @@ static int buf_init(struct fd_buf *buf, int listen_only)
343 343
 	buf->bufsize = 0;
344 344
 	buf->buffer = NULL;
345 345
     }
346
+    if (timeout) {
347
+	time(&buf->timeout_at);
348
+	buf->timeout_at += timeout;
349
+    } else {
350
+	buf->timeout_at = 0;
351
+    }
346 352
     return 0;
347 353
 }
348 354
 
349
-int fds_add(struct fd_data *data, int fd, int listen_only)
355
+int fds_add(struct fd_data *data, int fd, int listen_only, int timeout)
350 356
 {
351 357
     struct fd_buf *buf;
352 358
     unsigned n;
... ...
@@ -359,7 +371,7 @@ int fds_add(struct fd_data *data, int fd, int listen_only)
359 359
     for (n = 0; n < data->nfds; n++)
360 360
 	if (data->buf[n].fd == fd) {
361 361
 	    /* clear stale data in buffer */
362
-	    if (buf_init(&data->buf[n], listen_only) < 0)
362
+	    if (buf_init(&data->buf[n], listen_only, timeout) < 0)
363 363
 		return -1;
364 364
 	    return 0;
365 365
 	}
... ...
@@ -373,7 +385,7 @@ int fds_add(struct fd_data *data, int fd, int listen_only)
373 373
     data->buf = buf;
374 374
     data->nfds = n;
375 375
     data->buf[n-1].buffer = NULL;
376
-    if (buf_init(&data->buf[n-1], listen_only) < 0)
376
+    if (buf_init(&data->buf[n-1], listen_only, timeout) < 0)
377 377
 	return -1;
378 378
     data->buf[n-1].fd = fd;
379 379
     return 0;
... ...
@@ -412,12 +424,40 @@ int fds_poll_recv(struct fd_data *data, int timeout, int check_signals)
412 412
     unsigned fdsok = data->nfds;
413 413
     size_t i;
414 414
     int retval;
415
+    time_t now, closest_timeout;
415 416
 
416 417
     /* we must have at least one fd, the control fd! */
418
+    fds_cleanup(data);
417 419
     if (!data->nfds)
418 420
 	return 0;
419
-    for (i=0;i < data->nfds;i++)
421
+
422
+    for (i=0;i < data->nfds;i++) {
420 423
 	data->buf[i].got_newdata = 0;
424
+    }
425
+
426
+    if (timeout > 0)
427
+	closest_timeout = now + timeout;
428
+    else
429
+	closest_timeout = 0;
430
+    time(&now);
431
+    for (i=0;i < data->nfds; i++) {
432
+	time_t timeout_at = data->buf[i].timeout_at;
433
+	if (timeout_at && timeout_at < now) {
434
+	    /* timed out */
435
+	    data->buf[i].got_newdata = -2;
436
+	    /* we must return immediately from poll/select, we have a timeout! */
437
+	    closest_timeout = now;
438
+	} else {
439
+	    if (!closest_timeout)
440
+		closest_timeout = timeout_at;
441
+	    else if (timeout_at < closest_timeout)
442
+		closest_timeout = timeout_at;
443
+	}
444
+    }
445
+    if (closest_timeout)
446
+	timeout = closest_timeout - now;
447
+    else
448
+	timeout = -1;
421 449
 #ifdef HAVE_POLL
422 450
     /* Use poll() if available, preferred because:
423 451
      *  - can poll any number of FDs
... ...
@@ -577,9 +617,6 @@ int fds_poll_recv(struct fd_data *data, int timeout, int check_signals)
577 577
 #endif
578 578
     }
579 579
 
580
-    /* Remove closed / error fds */
581
-    if (fdsok != data->poll_data_nfds)
582
-	cleanup_fds(data);
583 580
     return retval;
584 581
 }
585 582
 
... ...
@@ -53,6 +53,7 @@ struct fd_buf {
53 53
     uint32_t chunksize;
54 54
     long quota;
55 55
     char *dumpname;
56
+    time_t timeout_at; /* 0 - no timeout */
56 57
     jobgroup_t *group;
57 58
 };
58 59
 
... ...
@@ -75,8 +76,9 @@ struct fd_data {
75 75
 int poll_fd(int fd, int timeout_sec, int check_signals);
76 76
 void virusaction(const char *filename, const char *virname, const struct optstruct *opts);
77 77
 int writen(int fd, void *buff, unsigned int count);
78
-int fds_add(struct fd_data *data, int fd, int listen_only);
78
+int fds_add(struct fd_data *data, int fd, int listen_only, int timeout);
79 79
 void fds_remove(struct fd_data *data, int fd);
80
+void fds_cleanup(struct fd_data *data);
80 81
 int fds_poll_recv(struct fd_data *data, int timeout, int check_signals);
81 82
 void fds_free(struct fd_data *data);
82 83
 
... ...
@@ -286,7 +286,7 @@ int scanfd(const int fd, const client_conn_t *conn, unsigned long int *scanned,
286 286
 int scanstream(int odesc, unsigned long int *scanned, const struct cl_engine *engine, unsigned int options, const struct optstruct *opts, char term)
287 287
 {
288 288
 	int ret, sockfd, acceptd;
289
-	int tmpd, bread, retval, timeout, btread;
289
+	int tmpd, bread, retval, firsttimeout, timeout, btread;
290 290
 	unsigned int port = 0, portscan, min_port, max_port;
291 291
 	unsigned long int quota = 0, maxsize = 0;
292 292
 	short bound = 0;
... ...
@@ -327,7 +327,11 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_engine *en
327 327
 
328 328
     timeout = optget(opts, "ReadTimeout")->numarg;
329 329
     if(timeout == 0)
330
-    	timeout = -1;
330
+	timeout = -1;
331
+
332
+    firsttimeout = optget(opts, "CommandReadTimeout")->numarg;
333
+    if (firsttimeout == 0)
334
+	firsttimeout = -1;
331 335
 
332 336
     if(!bound && !portscan) {
333 337
 	logg("!ScanStream: Can't find any free port.\n");
... ...
@@ -343,7 +347,7 @@ int scanstream(int odesc, unsigned long int *scanned, const struct cl_engine *en
343 343
 	}
344 344
     }
345 345
 
346
-    retval = poll_fd(sockfd, timeout, 0);
346
+    retval = poll_fd(sockfd, firsttimeout, 0);
347 347
     if (!retval || retval == -1) {
348 348
 	const char *reason = !retval ? "timeout" : "poll";
349 349
 	mdprintf(odesc, "Accept %s. ERROR%c", reason, term);
... ...
@@ -86,7 +86,7 @@ static void scanner_thread(void *arg)
86 86
 #ifndef	C_WINDOWS
87 87
 	sigset_t sigset;
88 88
 #endif
89
-	int ret, timeout;
89
+	int ret;
90 90
 	unsigned virus=0, errors = 0;
91 91
 
92 92
 #ifndef	C_WINDOWS
... ...
@@ -105,10 +105,6 @@ static void scanner_thread(void *arg)
105 105
     pthread_sigmask(SIG_SETMASK, &sigset, NULL);
106 106
 #endif
107 107
 
108
-    timeout = optget(conn->opts, "ReadTimeout")->numarg;
109
-    if(!timeout)
110
-	timeout = -1;
111
-
112 108
     ret = command(conn, &virus);
113 109
     if (ret == -1) {
114 110
 	pthread_mutex_lock(&exit_mutex);
... ...
@@ -294,11 +290,14 @@ static const char *get_cmd(struct fd_buf *buf, size_t off, size_t *len, char *te
294 294
 struct acceptdata {
295 295
     struct fd_data fds;
296 296
     struct fd_data recv_fds;
297
+    pthread_cond_t cond_nfds;
298
+    int max_queue;
299
+    int commandtimeout;
297 300
     int syncpipe_wake_recv[2];
298 301
     int syncpipe_wake_accept[2];
299 302
 };
300 303
 
301
-#define ACCEPTDATA_INIT { FDS_INIT, FDS_INIT, {-1, -1}, {-1, -1} }
304
+#define ACCEPTDATA_INIT { FDS_INIT, FDS_INIT, PTHREAD_COND_INITIALIZER, 0, 0, {-1, -1}, {-1, -1}}
302 305
 
303 306
 static void *acceptloop_th(void *arg)
304 307
 {
... ...
@@ -309,6 +308,8 @@ static void *acceptloop_th(void *arg)
309 309
     struct acceptdata *data = (struct acceptdata*)arg;
310 310
     struct fd_data *fds = &data->fds;
311 311
     struct fd_data *recv_fds = &data->recv_fds;
312
+    int max_queue = data->max_queue;
313
+    int commandtimeout = data->commandtimeout;
312 314
 
313 315
     pthread_mutex_lock(&fds->buf_mutex);
314 316
     for (;;) {
... ...
@@ -348,8 +349,20 @@ static void *acceptloop_th(void *arg)
348 348
 		buf->fd = -1;
349 349
 		continue;
350 350
 	    }
351
-	    /* listen only socket */
352
-	    new_sd = accept(fds->buf[i].fd, NULL, NULL);
351
+
352
+	    /* don't accept unlimited number of connections, or
353
+	     * we'll run out of file descriptors */
354
+	    pthread_mutex_lock(&recv_fds->buf_mutex);
355
+	    while (recv_fds->nfds > max_queue) {
356
+		pthread_mutex_lock(&exit_mutex);
357
+		if(progexit) {
358
+		    pthread_mutex_unlock(&exit_mutex);
359
+		    break;
360
+		}
361
+		pthread_mutex_unlock(&exit_mutex);
362
+		pthread_cond_wait(&data->cond_nfds, &recv_fds->buf_mutex);
363
+	    }
364
+	    pthread_mutex_unlock(&recv_fds->buf_mutex);
353 365
 
354 366
 	    pthread_mutex_lock(&exit_mutex);
355 367
 	    if(progexit) {
... ...
@@ -358,6 +371,10 @@ static void *acceptloop_th(void *arg)
358 358
 	    }
359 359
 	    pthread_mutex_unlock(&exit_mutex);
360 360
 
361
+	    /* listen only socket */
362
+	    new_sd = accept(fds->buf[i].fd, NULL, NULL);
363
+
364
+
361 365
 	    if (new_sd >= 0) {
362 366
 		int ret, flags;
363 367
 
... ...
@@ -374,7 +391,7 @@ static void *acceptloop_th(void *arg)
374 374
 #endif
375 375
 
376 376
 		pthread_mutex_lock(&recv_fds->buf_mutex);
377
-		ret = fds_add(recv_fds, new_sd, 0);
377
+		ret = fds_add(recv_fds, new_sd, 0, commandtimeout);
378 378
 		pthread_mutex_unlock(&recv_fds->buf_mutex);
379 379
 
380 380
 		if (ret == -1) {
... ...
@@ -432,7 +449,7 @@ static void *acceptloop_th(void *arg)
432 432
 
433 433
 int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
434 434
 {
435
-	int max_threads, max_queue, ret = 0;
435
+	int max_threads, max_queue, readtimeout, ret = 0;
436 436
 	unsigned int options = 0;
437 437
 	char timestr[32];
438 438
 #ifndef	C_WINDOWS
... ...
@@ -692,7 +709,9 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
692 692
 
693 693
     logg("*Listening daemon: PID: %u\n", (unsigned int) mainpid);
694 694
     max_threads = optget(opts, "MaxThreads")->numarg;
695
-    max_queue = optget(opts, "MaxQueue")->numarg;
695
+    acceptdata.max_queue = max_queue = optget(opts, "MaxQueue")->numarg;
696
+    acceptdata.commandtimeout = optget(opts, "CommandReadTimeout")->numarg;
697
+    readtimeout = optget(opts, "ReadTimeout")->numarg;
696 698
 
697 699
     if(optget(opts, "ClamukoScanOnAccess")->enabled)
698 700
 #ifdef CLAMUKO
... ...
@@ -753,7 +772,7 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
753 753
     idletimeout = optget(opts, "IdleTimeout")->numarg;
754 754
 
755 755
     for (i=0;i < nsockets;i++)
756
-	if (fds_add(&acceptdata.fds, socketds[i], 1) == -1) {
756
+	if (fds_add(&acceptdata.fds, socketds[i], 1, 0) == -1) {
757 757
 	    logg("!fds_add failed\n");
758 758
 	    cl_engine_free(engine);
759 759
 	    return 1;
... ...
@@ -766,8 +785,8 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
766 766
 	exit(-1);
767 767
     }
768 768
 
769
-    if (fds_add(fds, acceptdata.syncpipe_wake_recv[0], 1) == -1 ||
770
-	fds_add(&acceptdata.fds, acceptdata.syncpipe_wake_accept[0], 1)) {
769
+    if (fds_add(fds, acceptdata.syncpipe_wake_recv[0], 1, 0) == -1 ||
770
+	fds_add(&acceptdata.fds, acceptdata.syncpipe_wake_accept[0], 1, 0)) {
771 771
 	logg("!failed to add pipe fd\n");
772 772
 	exit(-1);
773 773
     }
... ...
@@ -787,6 +806,10 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
787 787
 	int new_sd;
788 788
 	/* Block waiting for connection on any of the sockets */
789 789
 	pthread_mutex_lock(&fds->buf_mutex);
790
+	fds_cleanup(fds);
791
+	/* signal that we can accept more connections */
792
+	if (fds->nfds <= max_queue)
793
+	    pthread_cond_signal(&acceptdata.cond_nfds);
790 794
 	new_sd = fds_poll_recv(fds, -1, 1);
791 795
 
792 796
 	if (!fds->nfds) {
... ...
@@ -806,6 +829,7 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
806 806
 	    pthread_mutex_unlock(&exit_mutex);
807 807
 	}
808 808
 
809
+
809 810
 	i = (rr_last + 1) % fds->nfds;
810 811
 	for (j = 0;  j < fds->nfds && new_sd >= 0; j++, i = (i+1) % fds->nfds) {
811 812
 	    size_t pos = 0;
... ...
@@ -833,6 +857,12 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
833 833
 		}
834 834
 	    }
835 835
 
836
+	    if (buf->fd != -1 && buf->got_newdata == -2) {
837
+		logg("*RECVTH: client read timed out\n");
838
+		mdprintf(buf->fd, "COMMAND READ TIMED OUT\n");
839
+		error = 1;
840
+	    }
841
+
836 842
 	    rr_last = i;
837 843
 	    if (buf->mode == MODE_WAITANCILL) {
838 844
 		buf->mode = MODE_COMMAND;
... ...
@@ -940,6 +970,9 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
940 940
 			    buf->fd = -1;
941 941
 			}
942 942
 		    }
943
+		    /* we received a command, set readtimeout */
944
+		    time(&buf->timeout_at);
945
+		    buf->timeout_at += readtimeout;
943 946
 		    pos += cmdlen+1;
944 947
 		    if (conn.mode == MODE_STREAM) {
945 948
 			/* TODO: this doesn't belong here */
... ...
@@ -187,6 +187,8 @@ const struct clam_option clam_options[] = {
187 187
 
188 188
     { "ReadTimeout", NULL, 0, TYPE_NUMBER, MATCH_NUMBER, 120, NULL, 0, OPT_CLAMD, "This option specifies the time (in seconds) after which clamd should\ntimeout if a client doesn't provide any data.", "120" },
189 189
 
190
+    { "CommandReadTimeout", NULL, 0, TYPE_NUMBER, MATCH_NUMBER, 5, NULL, 0, OPT_CLAMD, "This option specifies the time (in seconds) after which clamd should\ntimeout if a client doesn't provide any initial command after connecting.", "5" },
191
+
190 192
     { "ReadTimeout", NULL, 0, TYPE_NUMBER, MATCH_NUMBER, 120, NULL, 0, OPT_MILTER, "Waiting for data from clamd will timeout after this time (seconds).\nValue of 0 disables the timeout.", "300" },
191 193
 
192 194
     { "MaxQueue", NULL, 0, TYPE_NUMBER, MATCH_NUMBER, 100, NULL, 0, OPT_CLAMD, "Maximum number of queued items (including those being processed)\nWARNING: you shouldn't increase this beyond 512, since you may run out of File Descriptors (usual max is 1024)\n", "200" },
... ...
@@ -17,6 +17,12 @@
17 17
 #include <sys/un.h>
18 18
 #include <unistd.h>
19 19
 
20
+#include <sys/time.h>
21
+#include <sys/resource.h>
22
+#include <sys/select.h>
23
+#include <sys/time.h>
24
+#include <sys/types.h>
25
+#include <unistd.h>
20 26
 #include <check.h>
21 27
 #include "checks_common.h"
22 28
 #include "libclamav/version.h"
... ...
@@ -510,6 +516,70 @@ START_TEST (test_fildes_unwanted)
510 510
 }
511 511
 END_TEST
512 512
 
513
+#define TIMEOUT_REPLY "TIMED OUT WAITING FOR COMMAND\n"
514
+
515
+START_TEST (test_connections)
516
+{
517
+    int rc;
518
+    size_t i;
519
+    struct rlimit rlim;
520
+    int *sock;
521
+    int nf, maxfd=0;
522
+    fail_unless_fmt(getrlimit(RLIMIT_NOFILE, &rlim) != -1,
523
+		    "Failed to get RLIMIT_NOFILE: %s\n", strerror(errno));
524
+    nf = rlim.rlim_cur - 5;
525
+    sock = malloc(sizeof(int)*nf);
526
+
527
+    fail_unless(!!sock, "malloc failed\n");
528
+
529
+    for (i=0;i<nf;i++) {
530
+	/* just open connections, and let them time out */
531
+	conn_setup();
532
+	sock[i] = sockd;
533
+	if (sockd > maxfd)
534
+	    maxfd = sockd;
535
+    }
536
+    rc = fork();
537
+    fail_unless(rc != -1, "fork() failed: %s\n", strerror(errno));
538
+    if (rc == 0) {
539
+	char dummy;
540
+	int ret;
541
+	fd_set rfds;
542
+	FD_ZERO(&rfds);
543
+	for (i=0;i<nf;i++) {
544
+	    FD_SET(sock[i], &rfds);
545
+	}
546
+	while (1) {
547
+	    ret = select(maxfd+1, &rfds, NULL, NULL, NULL);
548
+	    if (ret < 0)
549
+		break;
550
+	    for (i=0;i<nf;i++) {
551
+		if (FD_ISSET(sock[i], &rfds)) {
552
+		    if (recv(sock[i], &dummy, 1, 0) == 0) {
553
+			close(sock[i]);
554
+			FD_CLR(sock[i], &rfds);
555
+		    }
556
+		}
557
+	    }
558
+	}
559
+	printf("exited\n");
560
+	free(sock);
561
+	exit(0);
562
+    } else {
563
+	for (i=0;i<nf;i++) {
564
+	    close(sock[i]);
565
+	}
566
+	free(sock);
567
+	/* now see if clamd is able to do anything else */
568
+	for (i=0;i<10;i++) {
569
+	    conn_setup();
570
+	    test_command("RELOAD", sizeof("RELOAD")-1, NULL, "RELOADING\n", sizeof("RELOADING\n")-1);
571
+	    conn_teardown();
572
+	}
573
+    }
574
+}
575
+END_TEST
576
+
513 577
 static Suite *test_clamd_suite(void)
514 578
 {
515 579
     Suite *s = suite_create("clamd");
... ...
@@ -528,6 +598,7 @@ static Suite *test_clamd_suite(void)
528 528
     suite_add_tcase(s, tc_stress);
529 529
     tcase_add_test(tc_stress, test_fildes_many);
530 530
     tcase_add_test(tc_stress, test_fildes_unwanted);
531
+    tcase_add_test(tc_stress, test_connections);
531 532
 
532 533
     return s;
533 534
 }
... ...
@@ -12,3 +12,5 @@ TCPSocket 331X
12 12
 ExitOnOOM yes
13 13
 DetectPUA yes
14 14
 ScanPDF yes
15
+CommandReadTimeout 1
16
+MaxQueue 800