Browse code

move the command parsing, and stream handling code into their own functions. No functionality change.

git-svn: trunk@4834

Török Edvin authored on 2009/02/19 06:34:44
Showing 2 changed files
... ...
@@ -1,3 +1,8 @@
1
+Thu Feb 19 00:05:28 EET 2009 (edwin)
2
+------------------------------------
3
+ * clamd/server-th.c: move the command parsing, and stream handling
4
+ code into their own functions.  No functionality change.
5
+
1 6
 Thu Feb 19 00:05:03 EET 2009 (edwin)
2 7
 ------------------------------------
3 8
  * libclamav/regex_suffix.c: n->type has to be first field (revert
... ...
@@ -459,6 +459,219 @@ static void *acceptloop_th(void *arg)
459 459
     return NULL;
460 460
 }
461 461
 
462
+static const unsigned char* parse_dispatch_cmd(client_conn_t *conn, struct fd_buf *buf, size_t *ppos, int *error, const struct optstruct *opts, int readtimeout)
463
+{
464
+    const unsigned char *cmd = NULL;
465
+    int rc;
466
+    size_t cmdlen;
467
+    char term;
468
+    int oldstyle;
469
+    size_t pos = *ppos;
470
+    /* Parse & dispatch commands */
471
+    while ((conn->mode == MODE_COMMAND) &&
472
+	   (cmd = get_cmd(buf, pos, &cmdlen, &term, &oldstyle)) != NULL) {
473
+	const char *argument;
474
+	enum commands cmdtype;
475
+	if (conn->group && oldstyle) {
476
+	    logg("$Received oldstyle command inside IDSESSION: %s\n", cmd);
477
+	    conn_reply_error(conn, "Only nCMDS\\n and zCMDS\\0 are accepted inside IDSESSION.");
478
+	    *error = 1;
479
+	    break;
480
+	}
481
+	cmdtype = parse_command(cmd, &argument, oldstyle);
482
+	logg("$got command %s (%u, %u), argument: %s\n",
483
+	     cmd, (unsigned)cmdlen, (unsigned)cmdtype, argument ? argument : "");
484
+	if (cmdtype == COMMAND_FILDES) {
485
+	    if (buf->buffer + buf->off <= cmd + strlen("FILDES\n")) {
486
+		/* we need the extra byte from recvmsg */
487
+		conn->mode = MODE_WAITANCILL;
488
+		buf->mode = MODE_WAITANCILL;
489
+		/* put term back */
490
+		buf->buffer[pos + cmdlen] = term;
491
+		cmdlen = 0;
492
+		logg("$RECVTH: mode -> MODE_WAITANCILL\n");
493
+		break;
494
+	    }
495
+	    /* eat extra \0 for controlmsg */
496
+	    cmdlen++;
497
+	    logg("$RECVTH: FILDES command complete\n");
498
+	}
499
+	conn->term = term;
500
+	buf->term = term;
501
+
502
+	if ((rc = execute_or_dispatch_command(conn, cmdtype, argument)) < 0) {
503
+	    logg("!Command dispatch failed\n");
504
+	    if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
505
+		pthread_mutex_lock(&exit_mutex);
506
+		progexit = 1;
507
+		pthread_mutex_unlock(&exit_mutex);
508
+	    }
509
+	    *error = 1;
510
+	}
511
+	if (thrmgr_group_need_terminate(conn->group)) {
512
+	    logg("$Receive thread: have to terminate group\n");
513
+	    *error = CL_ETIMEOUT;
514
+	    break;
515
+	}
516
+	if (*error || !conn->group || rc) {
517
+	    if (rc && thrmgr_group_finished(conn->group, EXIT_OK)) {
518
+		logg("$Receive thread: closing conn (FD %d), group finished\n", conn->sd);
519
+		/* if there are no more active jobs */
520
+		shutdown(conn->sd, 2);
521
+		closesocket(conn->sd);
522
+		buf->fd = -1;
523
+		conn->group = NULL;
524
+	    } else if (conn->mode != MODE_STREAM) {
525
+		logg("$mode -> MODE_WAITREPLY\n");
526
+		/* no more commands are accepted */
527
+		conn->mode = MODE_WAITREPLY;
528
+		/* Stop monitoring this FD, it will be closed either
529
+		 * by us, or by the scanner thread. 
530
+		 * Never close a file descriptor that is being
531
+		 * monitored by poll()/select() from another thread,
532
+		 * because this can lead to subtle bugs such as:
533
+		 * Other thread closes file descriptor -> POLLHUP is
534
+		 * set, but the poller thread doesn't wake up yet.
535
+		 * Another client opens a connection and sends some
536
+		 * data. If the socket reuses the previous file descriptor,
537
+		 * then POLLIN is set on the file descriptor too.
538
+		 * When poll() wakes up it sees POLLIN | POLLHUP
539
+		 * and thinks that the client has sent some data,
540
+		 * and closed the connection, so clamd closes the
541
+		 * connection in turn resulting in a bug.
542
+		 *
543
+		 * If we wouldn't have poll()-ed the file descriptor
544
+		 * we closed in another thread, but rather made sure
545
+		 * that we don't put a FD that we're about to close
546
+		 * into poll()'s list of watched fds; then POLLHUP
547
+		 * would be set, but the file descriptor would stay
548
+		 * open, until we wake up from poll() and close it.
549
+		 * Thus a new connection won't be able to reuse the
550
+		 * same FD, and there is no bug.
551
+		 * */
552
+		buf->fd = -1;
553
+	    }
554
+	}
555
+	/* we received a command, set readtimeout */
556
+	time(&buf->timeout_at);
557
+	buf->timeout_at += readtimeout;
558
+	pos += cmdlen+1;
559
+	if (conn->mode == MODE_STREAM) {
560
+	    /* TODO: this doesn't belong here */
561
+	    buf->dumpname = conn->filename;
562
+	    buf->dumpfd = conn->scanfd;
563
+	    logg("$Receive thread: INSTREAM: %s fd %u\n", buf->dumpname, buf->dumpfd);
564
+	}
565
+	if (conn->mode != MODE_COMMAND) {
566
+	    logg("$Breaking command loop, mode is no longer MODE_COMMAND\n");
567
+	    break;
568
+	}
569
+	conn->id++;
570
+    }
571
+    *ppos = pos;
572
+    buf->mode = conn->mode;
573
+    buf->id = conn->id;
574
+    buf->group = conn->group;
575
+    buf->quota = conn->quota;
576
+    if (conn->scanfd != -1 && conn->scanfd != buf->dumpfd) {
577
+	logg("$Unclaimed file descriptor received, closing: %d\n", conn->scanfd);
578
+	close(conn->scanfd);
579
+	/* protocol error */
580
+	conn_reply_error(conn, "PROTOCOL ERROR: ancillary data sent without FILDES.");
581
+	*error = 1;
582
+	return NULL;
583
+    }
584
+    if (!*error) {
585
+	/* move partial command to beginning of buffer */
586
+	if (pos < buf->off) {
587
+	    memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
588
+	    buf->off -= pos;
589
+	} else
590
+	    buf->off = 0;
591
+	if (buf->off)
592
+	    logg("$Moved partial command: %lu\n", (unsigned long)buf->off);
593
+	else
594
+	    logg("$Consumed entire command\n");
595
+    }
596
+    *ppos = pos;
597
+    return cmd;
598
+}
599
+
600
+//static const unsigned char* parse_dispatch_cmd(client_conn_t *conn, struct fd_buf *buf, size_t *ppos, int *error, const struct optstruct *opts, int readtimeout)
601
+static int handle_stream(client_conn_t *conn, struct fd_buf *buf, const struct optstruct *opts, int *error, size_t *ppos)
602
+{
603
+    int rc;
604
+    size_t pos = *ppos;
605
+    size_t cmdlen;
606
+
607
+    logg("$mode == MODE_STREAM\n");
608
+    if (!buf->chunksize) {
609
+	/* read chunksize */
610
+	if (buf->off >= 4) {
611
+	    uint32_t cs = *(uint32_t*)buf->buffer;
612
+	    buf->chunksize = ntohl(cs);
613
+	    logg("$Got chunksize: %u\n", buf->chunksize);
614
+	    if (!buf->chunksize) {
615
+		/* chunksize 0 marks end of stream */
616
+		conn->scanfd = buf->dumpfd;
617
+		conn->term = buf->term;
618
+		buf->dumpfd = -1;
619
+		buf->mode = buf->group ? MODE_COMMAND : MODE_WAITREPLY;
620
+		if (buf->mode == MODE_WAITREPLY)
621
+		    buf->fd = -1;
622
+		logg("$Chunks complete\n");
623
+		buf->dumpname = NULL;
624
+		if ((rc = execute_or_dispatch_command(conn, COMMAND_INSTREAMSCAN, NULL)) < 0) {
625
+		    logg("!Command dispatch failed\n");
626
+		    if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
627
+			pthread_mutex_lock(&exit_mutex);
628
+			progexit = 1;
629
+			pthread_mutex_unlock(&exit_mutex);
630
+		    }
631
+		    *error = 1;
632
+		} else {
633
+		    pos = 4;
634
+		    memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
635
+		    buf->off -= pos;
636
+		    pos = 0;
637
+		    buf->id++;
638
+		    return 0;
639
+		}
640
+	    }
641
+	    if (buf->chunksize > buf->quota) {
642
+		logg("^INSTREAM: Size limit reached, (requested: %lu, max: %lu)\n", 
643
+		     (unsigned long)buf->chunksize, (unsigned long)buf->quota);
644
+		conn_reply_error(conn, "INSTREAM size limit exceeded.");
645
+		*error = 1;
646
+		return -1;
647
+	    } else {
648
+		buf->quota -= buf->chunksize;
649
+	    }
650
+	    logg("$Quota: %lu\n", buf->quota);
651
+	    pos = 4;
652
+	} else
653
+	    return -1;
654
+    } else
655
+	pos = 0;
656
+    if (pos + buf->chunksize < buf->off)
657
+	cmdlen = buf->chunksize;
658
+    else
659
+	cmdlen = buf->off - pos;
660
+    buf->chunksize -= cmdlen;
661
+    if (cli_writen(buf->dumpfd, buf->buffer + pos, cmdlen) < 0) {
662
+	conn_reply_error(conn, "Error writing to temporary file");
663
+	logg("!INSTREAM: Can't write to temporary file.\n");
664
+	*error = 1;
665
+    }
666
+    logg("$Processed %lu bytes of chunkdata\n", cmdlen);
667
+    pos += cmdlen;
668
+    if (pos == buf->off) {
669
+	buf->off = 0;
670
+    }
671
+    *ppos = pos;
672
+    return 0;
673
+}
674
+
462 675
 int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
463 676
 {
464 677
 	int max_threads, max_queue, readtimeout, ret = 0;
... ...
@@ -908,205 +1121,25 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
908 908
 		conn.filename = buf->dumpname;
909 909
 		conn.mode = buf->mode;
910 910
 		conn.term = buf->term;
911
-		/* Parse & dispatch commands */
912
-		while ((conn.mode == MODE_COMMAND) &&
913
-		       (cmd = get_cmd(buf, pos, &cmdlen, &term, &oldstyle)) != NULL) {
914
-		    const char *argument;
915
-		    enum commands cmdtype;
916
-		    if (conn.group && oldstyle) {
917
-			logg("$Received oldstyle command inside IDSESSION: %s\n", cmd);
918
-			conn_reply_error(&conn, "Only nCMDS\\n and zCMDS\\0 are accepted inside IDSESSION.");
919
-			error = 1;
920
-			break;
921
-		    }
922
-		    cmdtype = parse_command(cmd, &argument, oldstyle);
923
-		    logg("$got command %s (%u, %u), argument: %s\n",
924
-			 cmd, (unsigned)cmdlen, (unsigned)cmdtype, argument ? argument : "");
925
-		    if (cmdtype == COMMAND_FILDES) {
926
-			if (buf->buffer + buf->off <= cmd + strlen("FILDES\n")) {
927
-			    /* we need the extra byte from recvmsg */
928
-			    conn.mode = MODE_WAITANCILL;
929
-			    buf->mode = MODE_WAITANCILL;
930
-			    /* put term back */
931
-			    buf->buffer[pos + cmdlen] = term;
932
-			    cmdlen = 0;
933
-			    logg("$RECVTH: mode -> MODE_WAITANCILL\n");
934
-			    break;
935
-			}
936
-			/* eat extra \0 for controlmsg */
937
-			cmdlen++;
938
-			logg("$RECVTH: FILDES command complete\n");
939
-		    }
940 911
 
941
-		    conn.term = term;
942
-		    buf->term = term;
912
+		/* Parse & dispatch command */
913
+		cmd = parse_dispatch_cmd(&conn, buf, &pos, &error, opts, readtimeout);
943 914
 
944
-		    if ((rc = execute_or_dispatch_command(&conn, cmdtype, argument)) < 0) {
945
-			logg("!Command dispatch failed\n");
946
-			if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
947
-			    pthread_mutex_lock(&exit_mutex);
948
-			    progexit = 1;
949
-			    pthread_mutex_unlock(&exit_mutex);
950
-			}
951
-			error = 1;
952
-		    }
953
-		    if (thrmgr_group_need_terminate(conn.group)) {
954
-			logg("$Receive thread: have to terminate group\n");
955
-			error = CL_ETIMEOUT;
956
-			break;
957
-		    }
958
-		    if (error || !conn.group || rc) {
959
-			if (rc && thrmgr_group_finished(conn.group, EXIT_OK)) {
960
-			    logg("$Receive thread: closing conn (FD %d), group finished\n", conn.sd);
961
-			    /* if there are no more active jobs */
962
-			    shutdown(conn.sd, 2);
963
-			    closesocket(conn.sd);
964
-			    buf->fd = -1;
965
-			    conn.group = NULL;
966
-			} else if (conn.mode != MODE_STREAM) {
967
-			    logg("$mode -> MODE_WAITREPLY\n");
968
-			    /* no more commands are accepted */
969
-			    conn.mode = MODE_WAITREPLY;
970
-			    /* Stop monitoring this FD, it will be closed either
971
-			     * by us, or by the scanner thread. 
972
-			     * Never close a file descriptor that is being
973
-			     * monitored by poll()/select() from another thread,
974
-			     * because this can lead to subtle bugs such as:
975
-			     * Other thread closes file descriptor -> POLLHUP is
976
-			     * set, but the poller thread doesn't wake up yet.
977
-			     * Another client opens a connection and sends some
978
-			     * data. If the socket reuses the previous file descriptor,
979
-			     * then POLLIN is set on the file descriptor too.
980
-			     * When poll() wakes up it sees POLLIN | POLLHUP
981
-			     * and thinks that the client has sent some data,
982
-			     * and closed the connection, so clamd closes the
983
-			     * connection in turn resulting in a bug.
984
-			     *
985
-			     * If we wouldn't have poll()-ed the file descriptor
986
-			     * we closed in another thread, but rather made sure
987
-			     * that we don't put a FD that we're about to close
988
-			     * into poll()'s list of watched fds; then POLLHUP
989
-			     * would be set, but the file descriptor would stay
990
-			     * open, until we wake up from poll() and close it.
991
-			     * Thus a new connection won't be able to reuse the
992
-			     * same FD, and there is no bug.
993
-			     * */
994
-			    buf->fd = -1;
995
-			}
996
-		    }
997
-		    /* we received a command, set readtimeout */
998
-		    time(&buf->timeout_at);
999
-		    buf->timeout_at += readtimeout;
1000
-		    pos += cmdlen+1;
1001
-		    if (conn.mode == MODE_STREAM) {
1002
-			/* TODO: this doesn't belong here */
1003
-			buf->dumpname = conn.filename;
1004
-			buf->dumpfd = conn.scanfd;
1005
-			logg("$Receive thread: INSTREAM: %s fd %u\n", buf->dumpname, buf->dumpfd);
1006
-		    }
1007
-		    if (conn.mode != MODE_COMMAND) {
1008
-			logg("$Breaking command loop, mode is no longer MODE_COMMAND\n");
1009
-			break;
1010
-		    }
1011
-		    conn.id++;
1012
-		}
1013
-		buf->mode = conn.mode;
1014
-		buf->id = conn.id;
1015
-		buf->group = conn.group;
1016
-		buf->quota = conn.quota;
1017
-		if (conn.scanfd != -1 && conn.scanfd != buf->dumpfd) {
1018
-		    logg("$Unclaimed file descriptor received, closing: %d\n", conn.scanfd);
1019
-		    close(conn.scanfd);
1020
-		    /* protocol error */
1021
-		    conn_reply_error(&conn, "PROTOCOL ERROR: ancillary data sent without FILDES.");
1022
-		    error = 1;
1023
-		    break;
1024
-		}
1025
-		if (!error) {
1026
-		    /* move partial command to beginning of buffer */
1027
-		    if (pos < buf->off) {
1028
-			memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
1029
-			buf->off -= pos;
1030
-		    } else
1031
-			buf->off = 0;
1032
-		    if (buf->off)
1033
-			logg("$Moved partial command: %lu\n", (unsigned long)buf->off);
1034
-		    else
1035
-			logg("$Consumed entire command\n");
1036
-		}
1037 915
 		if (conn.mode == MODE_COMMAND && !cmd)
1038 916
 		    break;
1039
-		if (!error && buf->mode == MODE_WAITREPLY && buf->off) {
1040
-		    /* Client is not supposed to send anything more */
1041
-		    logg("^Client sent garbage after last command: %lu bytes\n", (unsigned long)buf->off);
1042
-		    buf->buffer[buf->off] = '\0';
1043
-		    logg("$Garbage: %s\n", buf->buffer);
1044
-		    error = 1;
1045
-		}
1046
-		if (!error && buf->mode == MODE_STREAM) {
1047
-		    logg("$mode == MODE_STREAM\n");
1048
-		    if (!buf->chunksize) {
1049
-			/* read chunksize */
1050
-			if (buf->off >= 4) {
1051
-			    uint32_t cs = *(uint32_t*)buf->buffer;
1052
-			    buf->chunksize = ntohl(cs);
1053
-			    logg("$Got chunksize: %u\n", buf->chunksize);
1054
-			    if (!buf->chunksize) {
1055
-				/* chunksize 0 marks end of stream */
1056
-				conn.scanfd = buf->dumpfd;
1057
-				conn.term = buf->term;
1058
-				buf->dumpfd = -1;
1059
-				buf->mode = buf->group ? MODE_COMMAND : MODE_WAITREPLY;
1060
-				if (buf->mode == MODE_WAITREPLY)
1061
-				    buf->fd = -1;
1062
-				logg("$Chunks complete\n");
1063
-				buf->dumpname = NULL;
1064
-				if ((rc = execute_or_dispatch_command(&conn, COMMAND_INSTREAMSCAN, NULL)) < 0) {
1065
-				    logg("!Command dispatch failed\n");
1066
-				    if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
1067
-					pthread_mutex_lock(&exit_mutex);
1068
-					progexit = 1;
1069
-					pthread_mutex_unlock(&exit_mutex);
1070
-				    }
1071
-				    error = 1;
1072
-				} else {
1073
-				    pos = 4;
1074
-				    memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
1075
-				    buf->off -= pos;
1076
-				    pos = 0;
1077
-				    buf->id++;
1078
-				    continue;
1079
-				}
1080
-			    }
1081
-			    if (buf->chunksize > buf->quota) {
1082
-				logg("^INSTREAM: Size limit reached, (requested: %lu, max: %lu)\n", 
1083
-				     (unsigned long)buf->chunksize, (unsigned long)buf->quota);
1084
-				conn_reply_error(&conn, "INSTREAM size limit exceeded.");
1085
-				error = 1;
1086
-				break;
1087
-			    } else {
1088
-				buf->quota -= buf->chunksize;
1089
-			    }
1090
-			    logg("$Quota: %lu\n", buf->quota);
1091
-			    pos = 4;
1092
-			} else
1093
-			    break;
1094
-		    } else
1095
-			pos = 0;
1096
-		    if (pos + buf->chunksize < buf->off)
1097
-			cmdlen = buf->chunksize;
1098
-		    else
1099
-			cmdlen = buf->off - pos;
1100
-		    buf->chunksize -= cmdlen;
1101
-		    if (cli_writen(buf->dumpfd, buf->buffer + pos, cmdlen) < 0) {
1102
-			conn_reply_error(&conn, "Error writing to temporary file");
1103
-			logg("!INSTREAM: Can't write to temporary file.\n");
917
+		if (!error) {
918
+		    if (buf->mode == MODE_WAITREPLY && buf->off) {
919
+			/* Client is not supposed to send anything more */
920
+			logg("^Client sent garbage after last command: %lu bytes\n", (unsigned long)buf->off);
921
+			buf->buffer[buf->off] = '\0';
922
+			logg("$Garbage: %s\n", buf->buffer);
1104 923
 			error = 1;
1105
-		    }
1106
-		    logg("$Processed %lu bytes of chunkdata\n", cmdlen);
1107
-		    pos += cmdlen;
1108
-		    if (pos == buf->off) {
1109
-			buf->off = 0;
924
+		    } else if (buf->mode == MODE_STREAM) {
925
+			rc = handle_stream(&conn, buf, opts, &error, &pos);
926
+			if (rc == -1)
927
+			    break;
928
+			else
929
+			    continue;
1110 930
 		    }
1111 931
 		}
1112 932
 		if (error && error != CL_ETIMEOUT) {