clamd/server-th.c
e3aaff8e
 /*
086eab5c
  *  Copyright (C) 2007-2009 Sourcefire, Inc.
  *
  *  Authors: Tomasz Kojm, Trog, Török Edvin
e3aaff8e
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
48b7b4a7
  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  *  MA 02110-1301, USA.
e3aaff8e
  */
 
98ac8d19
 #if HAVE_CONFIG_H
 #include "clamav-config.h"
 #endif
 
e3aaff8e
 #include <pthread.h>
c238ac42
 #include <errno.h>
e3aaff8e
 #include <signal.h>
c238ac42
 #include <stdio.h>
a9ebff44
 #include <string.h>
c238ac42
 #include <time.h>
a9ebff44
 #include <sys/types.h>
e0bb54d7
 #ifndef	_WIN32
a9ebff44
 #include <sys/socket.h>
9a03413e
 #include <sys/time.h>
 #include <sys/resource.h>
e0bb54d7
 #include <arpa/inet.h>
67118e92
 #endif
 #ifdef	HAVE_UNISTD_H
520cf7eb
 #include <unistd.h>
67118e92
 #endif
bd8603aa
 
0378a9ab
 #include <fcntl.h>
41feff65
 #ifdef C_SOLARIS
 #include <stdio_ext.h>
 #endif
bd8603aa
 #include "libclamav/clamav.h"
 
 #include "shared/output.h"
064b4a0c
 #include "shared/optparser.h"
e3aaff8e
 
 #include "server.h"
c238ac42
 #include "thrmgr.h"
81131381
 #include "session.h"
c238ac42
 #include "clamuko.h"
 #include "others.h"
afb48b28
 #include "shared.h"
9e751804
 #include "libclamav/others.h"
3eba9d83
 #include "libclamav/readdb.h"
370892d0
 #include "libclamav/cltypes.h"
e3aaff8e
 
c238ac42
 #define BUFFSIZE 1024
e3aaff8e
 
ae12e285
 int progexit = 0;
c6266a2c
 pthread_mutex_t exit_mutex = PTHREAD_MUTEX_INITIALIZER;
ae12e285
 int reload = 0;
 time_t reloaded_time = 0;
c6266a2c
 pthread_mutex_t reload_mutex = PTHREAD_MUTEX_INITIALIZER;
ae12e285
 int sighup = 0;
949c6fe5
 static struct cl_stat dbstat;
81131381
 
fc83da82
 static void scanner_thread(void *arg)
e3aaff8e
 {
c238ac42
 	client_conn_t *conn = (client_conn_t *) arg;
e0bb54d7
 #ifndef	_WIN32
e3aaff8e
 	sigset_t sigset;
67118e92
 #endif
5f6edb22
 	int ret;
949c6fe5
 	unsigned virus=0, errors = 0;
c238ac42
 
e0bb54d7
 #ifndef	_WIN32
e3aaff8e
     /* ignore all signals */
     sigfillset(&sigset);
efac2f09
     /* The behavior of a process is undefined after it ignores a 
      * SIGFPE, SIGILL, SIGSEGV, or SIGBUS signal */
     sigdelset(&sigset, SIGFPE);
     sigdelset(&sigset, SIGILL);
     sigdelset(&sigset, SIGSEGV);
 #ifdef SIGBUS
     sigdelset(&sigset, SIGBUS);
 #endif
949c6fe5
     sigdelset(&sigset, SIGTSTP);
     sigdelset(&sigset, SIGCONT);
e3aaff8e
     pthread_sigmask(SIG_SETMASK, &sigset, NULL);
67118e92
 #endif
e3aaff8e
 
949c6fe5
     ret = command(conn, &virus);
     if (ret == -1) {
 	pthread_mutex_lock(&exit_mutex);
 	progexit = 1;
 	pthread_mutex_unlock(&exit_mutex);
 	errors = 1;
     } else
 	errors = ret;
45905a4a
 
deb30312
     thrmgr_setactiveengine(NULL);
949c6fe5
 
     if (conn->filename)
 	free(conn->filename);
fb6fe4f5
     logg("$Finished scanthread\n");
949c6fe5
     if (thrmgr_group_finished(conn->group, virus ? EXIT_OTHER :
 			      errors ? EXIT_ERROR : EXIT_OK)) {
4d26118b
 	logg("$Scanthread: connection shut down (FD %d)\n", conn->sd);
949c6fe5
 	/* close connection if we were last in group */
 	shutdown(conn->sd, 2);
 	closesocket(conn->sd);
     }
370892d0
     cl_engine_free(conn->engine);
5a66732f
     free(conn);
c238ac42
     return;
e3aaff8e
 }
 
b82eea8d
 static int syncpipe_wake_recv_w = -1;
 
c238ac42
 void sighandler_th(int sig)
e3aaff8e
 {
b82eea8d
     int action = 0;
c238ac42
     switch(sig) {
 	case SIGINT:
 	case SIGTERM:
 	    progexit = 1;
b82eea8d
 	    action = 1;
520cf7eb
 	    break;
e3aaff8e
 
67118e92
 #ifdef	SIGHUP
520cf7eb
 	case SIGHUP:
 	    sighup = 1;
b82eea8d
 	    action = 1;
520cf7eb
 	    break;
67118e92
 #endif
d056f4d6
 
67118e92
 #ifdef	SIGUSR2
ae203685
 	case SIGUSR2:
 	    reload = 1;
b82eea8d
 	    action = 1;
ae203685
 	    break;
67118e92
 #endif
ae203685
 
d056f4d6
 	default:
 	    break; /* Take no action on other signals - e.g. SIGPIPE */
c238ac42
     }
b82eea8d
     /* a signal doesn't always wake poll(), for example on FreeBSD */
     if (action && syncpipe_wake_recv_w != -1)
a1c9ad2c
 	if (write(syncpipe_wake_recv_w, "", 1) != 1)
 	    logg("$Failed to write to syncpipe\n");
c238ac42
 }
e3aaff8e
 
064b4a0c
 static struct cl_engine *reload_db(struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts, int do_check, int *ret)
c238ac42
 {
fb787a06
 	const char *dbdir;
bd8603aa
 	int retval;
6f2f0491
 	unsigned int sigs = 0;
99f817e7
 	struct cl_settings *settings = NULL;
9e431a95
 
2dab0a15
     *ret = 0;
c238ac42
     if(do_check) {
949c6fe5
 	if(!dbstat.entries) {
c238ac42
 	    logg("No stats for Database check - forcing reload\n");
a57e3d41
 	    return engine;
e3aaff8e
 	}
 
949c6fe5
 	if(cl_statchkdir(&dbstat) == 1) {
c238ac42
 	    logg("SelfCheck: Database modification detected. Forcing reload.\n");
a57e3d41
 	    return engine;
c238ac42
 	} else {
 	    logg("SelfCheck: Database status OK.\n");
 	    return NULL;
e3aaff8e
 	}
c238ac42
     }
e3aaff8e
 
395fb661
     /* release old structure */
     if(engine) {
99f817e7
 	/* copy current settings */
 	settings = cl_engine_settings_copy(engine);
 	if(!settings)
 	    logg("^Can't make a copy of the current engine settings\n");
395fb661
 
 	thrmgr_setactiveengine(NULL);
 	cl_engine_free(engine);
     }
 
064b4a0c
     dbdir = optget(opts, "DatabaseDirectory")->strarg;
c238ac42
     logg("Reading databases from %s\n", dbdir);
e3aaff8e
 
949c6fe5
     if(dbstat.entries)
 	cl_statfree(&dbstat);
46c2e927
 
949c6fe5
     memset(&dbstat, 0, sizeof(struct cl_stat));
     if((retval = cl_statinidir(dbdir, &dbstat))) {
cd69cf20
 	logg("!cl_statinidir() failed: %s\n", cl_strerror(retval));
 	*ret = 1;
99f817e7
 	if(settings)
 	    cl_engine_settings_free(settings);
cd69cf20
 	return NULL;
     }
d6449522
 
b8fe70b3
     if(!(engine = cl_engine_new())) {
370892d0
 	logg("!Can't initialize antivirus engine\n");
6f2f0491
 	*ret = 1;
99f817e7
 	if(settings)
 	    cl_engine_settings_free(settings);
6f2f0491
 	return NULL;
120f3c85
     }
 
99f817e7
     if(settings) {
 	retval = cl_engine_settings_apply(engine, settings);
 	if(retval != CL_SUCCESS) {
 	    logg("^Can't apply previous engine settings: %s\n", cl_strerror(retval));
 	    logg("^Using default engine settings\n");
395fb661
 	}
99f817e7
 	cl_engine_settings_free(settings);
c238ac42
     }
46c2e927
 
370892d0
     if((retval = cl_load(dbdir, engine, &sigs, dboptions))) {
2dab0a15
 	logg("!reload db failed: %s\n", cl_strerror(retval));
370892d0
 	cl_engine_free(engine);
2dab0a15
 	*ret = 1;
 	return NULL;
c238ac42
     }
e3aaff8e
 
370892d0
     if((retval = cl_engine_compile(engine)) != 0) {
 	logg("!Database initialization error: can't compile engine: %s\n", cl_strerror(retval));
 	cl_engine_free(engine);
2dab0a15
 	*ret = 1;
 	return NULL;
e3aaff8e
     }
1095156a
     logg("Database correctly reloaded (%u signatures)\n", sigs);
e3aaff8e
 
deb30312
     thrmgr_setactiveengine(engine);
a57e3d41
     return engine;
e3aaff8e
 }
 
4e24a361
 /*
  * zCOMMANDS are delimited by \0
  * nCOMMANDS are delimited by \n
  * Old-style non-prefixed commands are one packet, optionally delimited by \n,
  * with trailing \r|\n ignored
  */
537292a7
 static const char *get_cmd(struct fd_buf *buf, size_t off, size_t *len, char *term, int *oldstyle)
949c6fe5
 {
     unsigned char *pos;
     if (!buf->off || off >= buf->off) {
 	*len = 0;
 	return NULL;
     }
 
     *term = '\n';
456e31a3
     switch (buf->buffer[off]) {
949c6fe5
 	/* commands terminated by delimiters */
 	case 'z':
 	    *term = '\0';
 	case 'n':
 	    pos = memchr(buf->buffer + off, *term, buf->off - off);
 	    if (!pos) {
 		/* we don't have another full command yet */
 		*len = 0;
 		return NULL;
 	    }
 	    *pos = '\0';
 	    if (*term) {
 		*len = cli_chomp(buf->buffer + off);
 	    } else {
 		*len = pos - buf->buffer - off;
 	    }
537292a7
 	    *oldstyle = 0;
949c6fe5
 	    return buf->buffer + off + 1;
 	default:
 	    /* one packet = one command */
4e24a361
 	    if (off)
 		return NULL;
 	    pos = memchr(buf->buffer, '\n', buf->off);
 	    if (pos) {
 		*len = pos - buf->buffer;
 		*pos = '\0';
 	    } else {
 		*len = buf->off;
 		buf->buffer[buf->off] = '\0';
 	    }
 	    cli_chomp(buf->buffer);
537292a7
 	    *oldstyle = 1;
4e24a361
 	    return buf->buffer;
949c6fe5
     }
 }
 
 struct acceptdata {
     struct fd_data fds;
     struct fd_data recv_fds;
5f6edb22
     pthread_cond_t cond_nfds;
     int max_queue;
     int commandtimeout;
949c6fe5
     int syncpipe_wake_recv[2];
     int syncpipe_wake_accept[2];
 };
 
7660b7cb
 #define ACCEPTDATA_INIT(mutex1, mutex2) { FDS_INIT(mutex1), FDS_INIT(mutex2), PTHREAD_COND_INITIALIZER, 0, 0, {-1, -1}, {-1, -1}}
80681b50
 
949c6fe5
 static void *acceptloop_th(void *arg)
 {
     char buff[BUFFSIZE + 1];
     size_t i;
     struct acceptdata *data = (struct acceptdata*)arg;
     struct fd_data *fds = &data->fds;
     struct fd_data *recv_fds = &data->recv_fds;
5f6edb22
     int max_queue = data->max_queue;
     int commandtimeout = data->commandtimeout;
949c6fe5
 
7660b7cb
     pthread_mutex_lock(fds->buf_mutex);
949c6fe5
     for (;;) {
 	/* Block waiting for data to become available for reading */
 	int new_sd = fds_poll_recv(fds, -1, 0);
 
 	/* TODO: what about sockets that get rm-ed? */
 	if (!fds->nfds) {
 	    /* no more sockets to poll, all gave an error */
 	    logg("!Main socket gone: fatal\n");
 	    break;
 	}
 
 	if (new_sd == -1 && errno != EINTR) {
 	    logg("!Failed to poll sockets, fatal\n");
 	    pthread_mutex_lock(&exit_mutex);
 	    progexit = 1;
 	    pthread_mutex_unlock(&exit_mutex);
 	    break;
 	}
 
 	/* accept() loop */
 	for (i=0;i < fds->nfds && new_sd >= 0; i++) {
 	    struct fd_buf *buf = &fds->buf[i];
 	    if (!buf->got_newdata)
 		continue;
 	    if (buf->fd == data->syncpipe_wake_accept[0]) {
 		/* dummy sync pipe, just to wake us */
 		if (read(buf->fd, buff, sizeof(buff)) < 0) {
 		    logg("^Syncpipe read failed\n");
 		}
 		continue;
 	    }
 	    if (buf->got_newdata == -1) {
4d26118b
 		logg("$Acceptloop closed FD: %d\n", buf->fd);
949c6fe5
 		shutdown(buf->fd, 2);
 		closesocket(buf->fd);
 		buf->fd = -1;
 		continue;
 	    }
5f6edb22
 
 	    /* don't accept unlimited number of connections, or
 	     * we'll run out of file descriptors */
7660b7cb
 	    pthread_mutex_lock(recv_fds->buf_mutex);
 	    while (recv_fds->nfds > (unsigned)max_queue) {
5f6edb22
 		pthread_mutex_lock(&exit_mutex);
 		if(progexit) {
 		    pthread_mutex_unlock(&exit_mutex);
 		    break;
 		}
 		pthread_mutex_unlock(&exit_mutex);
7660b7cb
 		pthread_cond_wait(&data->cond_nfds, recv_fds->buf_mutex);
5f6edb22
 	    }
7660b7cb
 	    pthread_mutex_unlock(recv_fds->buf_mutex);
949c6fe5
 
 	    pthread_mutex_lock(&exit_mutex);
 	    if(progexit) {
 		pthread_mutex_unlock(&exit_mutex);
 		break;
 	    }
 	    pthread_mutex_unlock(&exit_mutex);
 
5f6edb22
 	    /* listen only socket */
 	    new_sd = accept(fds->buf[i].fd, NULL, NULL);
 
949c6fe5
 	    if (new_sd >= 0) {
0378a9ab
 		int ret, flags;
 
 #ifdef F_GETFL
 		flags = fcntl(new_sd, F_GETFL, 0);
 		if (flags != -1) {
 		    if (fcntl(new_sd, F_SETFL, flags | O_NONBLOCK) == -1) {
 			logg("^Can't set socket to nonblocking mode, errno %d\n",
 			     errno);
 		    }
 		} else {
 			logg("^Can't get socket flags, errno %d\n", errno);
 		}
4d26118b
 #else
 		logg("^Nonblocking sockets not available!\n");
0378a9ab
 #endif
4d26118b
 		logg("$Got new connection, FD %d\n", new_sd);
7660b7cb
 		pthread_mutex_lock(recv_fds->buf_mutex);
5f6edb22
 		ret = fds_add(recv_fds, new_sd, 0, commandtimeout);
7660b7cb
 		pthread_mutex_unlock(recv_fds->buf_mutex);
949c6fe5
 
 		if (ret == -1) {
 		    logg("!fds_add failed\n");
 		    closesocket(new_sd);
 		    continue;
 		}
 
 		/* notify recvloop */
 		if (write(data->syncpipe_wake_recv[1], "", 1) == -1) {
 		    logg("!write syncpipe failed\n");
 		    continue;
 		}
 	    } else if (errno != EINTR) {
 		/* very bad - need to exit or restart */
 #ifdef HAVE_STRERROR_R
a414efbf
 		strerror_r(errno, buff, BUFFSIZE);
 		logg("!accept() failed: %s\n", buff);
949c6fe5
 #else
 		logg("!accept() failed\n");
 #endif
 		/* give the poll loop a chance to close disconnected FDs */
 		break;
 	    }
 
 	}
 
 	/* handle progexit */
 	pthread_mutex_lock(&exit_mutex);
 	if (progexit) {
 	    pthread_mutex_unlock(&exit_mutex);
 	    break;
 	}
 	pthread_mutex_unlock(&exit_mutex);
     }
7660b7cb
     pthread_mutex_unlock(fds->buf_mutex);
949c6fe5
 
     for (i=0;i < fds->nfds; i++) {
 	if (fds->buf[i].fd == -1)
 	    continue;
4d26118b
 	logg("$Shutdown: closed fd %d\n", fds->buf[i].fd);
949c6fe5
 	shutdown(fds->buf[i].fd, 2);
 	closesocket(fds->buf[i].fd);
     }
     fds_free(fds);
 
     pthread_mutex_lock(&exit_mutex);
     progexit = 1;
     pthread_mutex_unlock(&exit_mutex);
     if (write(data->syncpipe_wake_recv[1], "", 1) < 0) {
1c3895a6
 	logg("$Syncpipe write failed\n");
949c6fe5
     }
 
     return NULL;
 }
 
5d34634e
 static const unsigned char* parse_dispatch_cmd(client_conn_t *conn, struct fd_buf *buf, size_t *ppos, int *error, const struct optstruct *opts, int readtimeout)
 {
     const unsigned char *cmd = NULL;
     int rc;
     size_t cmdlen;
     char term;
     int oldstyle;
     size_t pos = *ppos;
     /* Parse & dispatch commands */
     while ((conn->mode == MODE_COMMAND) &&
 	   (cmd = get_cmd(buf, pos, &cmdlen, &term, &oldstyle)) != NULL) {
 	const char *argument;
 	enum commands cmdtype;
 	if (conn->group && oldstyle) {
 	    logg("$Received oldstyle command inside IDSESSION: %s\n", cmd);
 	    conn_reply_error(conn, "Only nCMDS\\n and zCMDS\\0 are accepted inside IDSESSION.");
 	    *error = 1;
 	    break;
 	}
 	cmdtype = parse_command(cmd, &argument, oldstyle);
 	logg("$got command %s (%u, %u), argument: %s\n",
 	     cmd, (unsigned)cmdlen, (unsigned)cmdtype, argument ? argument : "");
 	if (cmdtype == COMMAND_FILDES) {
 	    if (buf->buffer + buf->off <= cmd + strlen("FILDES\n")) {
 		/* we need the extra byte from recvmsg */
 		conn->mode = MODE_WAITANCILL;
 		buf->mode = MODE_WAITANCILL;
 		/* put term back */
 		buf->buffer[pos + cmdlen] = term;
 		cmdlen = 0;
 		logg("$RECVTH: mode -> MODE_WAITANCILL\n");
 		break;
 	    }
 	    /* eat extra \0 for controlmsg */
 	    cmdlen++;
 	    logg("$RECVTH: FILDES command complete\n");
 	}
 	conn->term = term;
 	buf->term = term;
 
 	if ((rc = execute_or_dispatch_command(conn, cmdtype, argument)) < 0) {
 	    logg("!Command dispatch failed\n");
 	    if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
 		pthread_mutex_lock(&exit_mutex);
 		progexit = 1;
 		pthread_mutex_unlock(&exit_mutex);
 	    }
 	    *error = 1;
 	}
 	if (thrmgr_group_need_terminate(conn->group)) {
 	    logg("$Receive thread: have to terminate group\n");
 	    *error = CL_ETIMEOUT;
 	    break;
 	}
 	if (*error || !conn->group || rc) {
 	    if (rc && thrmgr_group_finished(conn->group, EXIT_OK)) {
 		logg("$Receive thread: closing conn (FD %d), group finished\n", conn->sd);
 		/* if there are no more active jobs */
 		shutdown(conn->sd, 2);
 		closesocket(conn->sd);
 		buf->fd = -1;
 		conn->group = NULL;
 	    } else if (conn->mode != MODE_STREAM) {
 		logg("$mode -> MODE_WAITREPLY\n");
 		/* no more commands are accepted */
 		conn->mode = MODE_WAITREPLY;
 		/* Stop monitoring this FD, it will be closed either
 		 * by us, or by the scanner thread. 
 		 * Never close a file descriptor that is being
 		 * monitored by poll()/select() from another thread,
 		 * because this can lead to subtle bugs such as:
 		 * Other thread closes file descriptor -> POLLHUP is
 		 * set, but the poller thread doesn't wake up yet.
 		 * Another client opens a connection and sends some
 		 * data. If the socket reuses the previous file descriptor,
 		 * then POLLIN is set on the file descriptor too.
 		 * When poll() wakes up it sees POLLIN | POLLHUP
 		 * and thinks that the client has sent some data,
 		 * and closed the connection, so clamd closes the
 		 * connection in turn resulting in a bug.
 		 *
 		 * If we wouldn't have poll()-ed the file descriptor
 		 * we closed in another thread, but rather made sure
 		 * that we don't put a FD that we're about to close
 		 * into poll()'s list of watched fds; then POLLHUP
 		 * would be set, but the file descriptor would stay
 		 * open, until we wake up from poll() and close it.
 		 * Thus a new connection won't be able to reuse the
 		 * same FD, and there is no bug.
 		 * */
 		buf->fd = -1;
 	    }
 	}
 	/* we received a command, set readtimeout */
 	time(&buf->timeout_at);
 	buf->timeout_at += readtimeout;
 	pos += cmdlen+1;
 	if (conn->mode == MODE_STREAM) {
 	    /* TODO: this doesn't belong here */
 	    buf->dumpname = conn->filename;
 	    buf->dumpfd = conn->scanfd;
 	    logg("$Receive thread: INSTREAM: %s fd %u\n", buf->dumpname, buf->dumpfd);
 	}
 	if (conn->mode != MODE_COMMAND) {
 	    logg("$Breaking command loop, mode is no longer MODE_COMMAND\n");
 	    break;
 	}
 	conn->id++;
     }
     *ppos = pos;
     buf->mode = conn->mode;
     buf->id = conn->id;
     buf->group = conn->group;
     buf->quota = conn->quota;
     if (conn->scanfd != -1 && conn->scanfd != buf->dumpfd) {
 	logg("$Unclaimed file descriptor received, closing: %d\n", conn->scanfd);
 	close(conn->scanfd);
 	/* protocol error */
 	conn_reply_error(conn, "PROTOCOL ERROR: ancillary data sent without FILDES.");
 	*error = 1;
 	return NULL;
     }
     if (!*error) {
 	/* move partial command to beginning of buffer */
 	if (pos < buf->off) {
 	    memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
 	    buf->off -= pos;
 	} else
 	    buf->off = 0;
 	if (buf->off)
 	    logg("$Moved partial command: %lu\n", (unsigned long)buf->off);
 	else
 	    logg("$Consumed entire command\n");
     }
     *ppos = pos;
     return cmd;
 }
 
b44fbcf5
 /* static const unsigned char* parse_dispatch_cmd(client_conn_t *conn, struct fd_buf *buf, size_t *ppos, int *error, const struct optstruct *opts, int readtimeout) */
b6de553d
 static int handle_stream(client_conn_t *conn, struct fd_buf *buf, const struct optstruct *opts, int *error, size_t *ppos, int readtimeout)
5d34634e
 {
     int rc;
     size_t pos = *ppos;
     size_t cmdlen;
 
     logg("$mode == MODE_STREAM\n");
b6de553d
     /* we received a chunk, set readtimeout */
     time(&buf->timeout_at);
     buf->timeout_at += readtimeout;
5d34634e
     if (!buf->chunksize) {
 	/* read chunksize */
 	if (buf->off >= 4) {
 	    uint32_t cs = *(uint32_t*)buf->buffer;
 	    buf->chunksize = ntohl(cs);
 	    logg("$Got chunksize: %u\n", buf->chunksize);
 	    if (!buf->chunksize) {
 		/* chunksize 0 marks end of stream */
 		conn->scanfd = buf->dumpfd;
 		conn->term = buf->term;
 		buf->dumpfd = -1;
 		buf->mode = buf->group ? MODE_COMMAND : MODE_WAITREPLY;
 		if (buf->mode == MODE_WAITREPLY)
 		    buf->fd = -1;
 		logg("$Chunks complete\n");
 		buf->dumpname = NULL;
 		if ((rc = execute_or_dispatch_command(conn, COMMAND_INSTREAMSCAN, NULL)) < 0) {
 		    logg("!Command dispatch failed\n");
 		    if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) {
 			pthread_mutex_lock(&exit_mutex);
 			progexit = 1;
 			pthread_mutex_unlock(&exit_mutex);
 		    }
 		    *error = 1;
 		} else {
 		    pos = 4;
 		    memmove (buf->buffer, &buf->buffer[pos], buf->off - pos);
 		    buf->off -= pos;
47d403f8
 		    *ppos = 0;
5d34634e
 		    buf->id++;
 		    return 0;
 		}
 	    }
 	    if (buf->chunksize > buf->quota) {
 		logg("^INSTREAM: Size limit reached, (requested: %lu, max: %lu)\n", 
 		     (unsigned long)buf->chunksize, (unsigned long)buf->quota);
 		conn_reply_error(conn, "INSTREAM size limit exceeded.");
 		*error = 1;
47d403f8
 		*ppos = pos;
5d34634e
 		return -1;
 	    } else {
 		buf->quota -= buf->chunksize;
 	    }
 	    logg("$Quota: %lu\n", buf->quota);
 	    pos = 4;
 	} else
 	    return -1;
     } else
 	pos = 0;
     if (pos + buf->chunksize < buf->off)
 	cmdlen = buf->chunksize;
     else
 	cmdlen = buf->off - pos;
     buf->chunksize -= cmdlen;
     if (cli_writen(buf->dumpfd, buf->buffer + pos, cmdlen) < 0) {
 	conn_reply_error(conn, "Error writing to temporary file");
 	logg("!INSTREAM: Can't write to temporary file.\n");
 	*error = 1;
     }
     logg("$Processed %lu bytes of chunkdata\n", cmdlen);
     pos += cmdlen;
     if (pos == buf->off) {
 	buf->off = 0;
     }
     *ppos = pos;
     return 0;
 }
 
949c6fe5
 int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
e3aaff8e
 {
5f6edb22
 	int max_threads, max_queue, readtimeout, ret = 0;
c6dbfbcb
 	unsigned int options = 0;
9e751804
 	char timestr[32];
e0bb54d7
 #ifndef	_WIN32
c238ac42
 	struct sigaction sigact;
9a03413e
 	sigset_t sigset;
 	struct rlimit rlim;
67118e92
 #endif
c238ac42
 	mode_t old_umask;
064b4a0c
 	const struct optstruct *opt;
abf06850
 	char buff[BUFFSIZE + 1];
6b8aa2d0
 	pid_t mainpid;
02b4b0c7
 	int idletimeout;
2accc66f
 	unsigned long long val;
949c6fe5
 	size_t i, j, rr_last = 0;
 	pthread_t accept_th;
7660b7cb
 	pthread_mutex_t fds_mutex = PTHREAD_MUTEX_INITIALIZER;
 	pthread_mutex_t recvfds_mutex = PTHREAD_MUTEX_INITIALIZER;
 	struct acceptdata acceptdata = ACCEPTDATA_INIT(&fds_mutex, &recvfds_mutex);
949c6fe5
 	struct fd_data *fds = &acceptdata.recv_fds;
 	time_t start_time, current_time;
 	unsigned int selfchk;
 	threadpool_t *thr_pool;
6b8aa2d0
 
c238ac42
 #ifdef CLAMUKO
 	pthread_t clamuko_pid;
 	pthread_attr_t clamuko_attr;
afb48b28
 	struct thrarg *tharg = NULL; /* shut up gcc */
c238ac42
 #endif
67118e92
 
e0bb54d7
 #ifndef	_WIN32
c238ac42
 	memset(&sigact, 0, sizeof(struct sigaction));
67118e92
 #endif
e3aaff8e
 
370892d0
     /* set up limits */
68dbfcd9
     if((opt = optget(opts, "MaxScanSize"))->active) {
2accc66f
 	if((ret = cl_engine_set_num(engine, CL_ENGINE_MAX_SCANSIZE, opt->numarg))) {
 	    logg("!cl_engine_set_num(CL_ENGINE_MAX_SCANSIZE) failed: %s\n", cl_strerror(ret));
370892d0
 	    cl_engine_free(engine);
 	    return 1;
e3aaff8e
 	}
     }
2accc66f
     val = cl_engine_get_num(engine, CL_ENGINE_MAX_SCANSIZE, NULL);
     if(val)
     	logg("Limits: Global size limit set to %llu bytes.\n", val);
370892d0
     else
23f5dfbd
     	logg("^Limits: Global size limit protection disabled.\n");
281c7642
 
68dbfcd9
     if((opt = optget(opts, "MaxFileSize"))->active) {
2accc66f
 	if((ret = cl_engine_set_num(engine, CL_ENGINE_MAX_FILESIZE, opt->numarg))) {
 	    logg("!cl_engine_set_num(CL_ENGINE_MAX_FILESIZE) failed: %s\n", cl_strerror(ret));
370892d0
 	    cl_engine_free(engine);
 	    return 1;
 	}
23f5dfbd
     }
2accc66f
     val = cl_engine_get_num(engine, CL_ENGINE_MAX_FILESIZE, NULL);
     if(val)
     	logg("Limits: File size limit set to %llu bytes.\n", val);
370892d0
     else
     	logg("^Limits: File size limit protection disabled.\n");
e3aaff8e
 
e0bb54d7
 #ifndef _WIN32
9a03413e
     if(getrlimit(RLIMIT_FSIZE, &rlim) == 0) {
9390411b
 	if(rlim.rlim_cur < (rlim_t) cl_engine_get_num(engine, CL_ENGINE_MAX_FILESIZE, NULL))
370892d0
 	    logg("^System limit for file size is lower than engine->maxfilesize\n");
9390411b
 	if(rlim.rlim_cur < (rlim_t) cl_engine_get_num(engine, CL_ENGINE_MAX_SCANSIZE, NULL))
370892d0
 	    logg("^System limit for file size is lower than engine->maxscansize\n");
9a03413e
     } else {
 	logg("^Cannot obtain resource limits for file size\n");
     }
 #endif
 
68dbfcd9
     if((opt = optget(opts, "MaxRecursion"))->active) {
2accc66f
 	if((ret = cl_engine_set_num(engine, CL_ENGINE_MAX_RECURSION, opt->numarg))) {
 	    logg("!cl_engine_set_num(CL_ENGINE_MAX_RECURSION) failed: %s\n", cl_strerror(ret));
370892d0
 	    cl_engine_free(engine);
 	    return 1;
 	}
23f5dfbd
     }
2accc66f
     val = cl_engine_get_num(engine, CL_ENGINE_MAX_RECURSION, NULL);
     if(val)
     	logg("Limits: Recursion level limit set to %u.\n", (unsigned int) val);
370892d0
     else
     	logg("^Limits: Recursion level limit protection disabled.\n");
 
68dbfcd9
     if((opt = optget(opts, "MaxFiles"))->active) {
2accc66f
 	if((ret = cl_engine_set_num(engine, CL_ENGINE_MAX_FILES, opt->numarg))) {
 	    logg("!cl_engine_set_num(CL_ENGINE_MAX_FILES) failed: %s\n", cl_strerror(ret));
370892d0
 	    cl_engine_free(engine);
 	    return 1;
 	}
23f5dfbd
     }
2accc66f
     val = cl_engine_get_num(engine, CL_ENGINE_MAX_FILES, NULL);
     if(val)
     	logg("Limits: Files limit set to %u.\n", (unsigned int) val);
370892d0
     else
     	logg("^Limits: Files limit protection disabled.\n");
e3aaff8e
 
e0bb54d7
 #ifndef _WIN32
9390411b
     if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
 	logg("*Limits: Core-dump limit is %lu.\n", (unsigned long)rlim.rlim_cur);
     }
 #endif
23f5dfbd
 
064b4a0c
     if(optget(opts, "ScanArchive")->enabled) {
23f5dfbd
 	logg("Archive support enabled.\n");
 	options |= CL_SCAN_ARCHIVE;
a6945b5d
 
064b4a0c
 	if(optget(opts, "ArchiveBlockEncrypted")->enabled) {
d272908a
 	    logg("Archive: Blocking encrypted archives.\n");
08d6b1e3
 	    options |= CL_SCAN_BLOCKENCRYPTED;
0f34221a
 	}
 
e3aaff8e
     } else {
 	logg("Archive support disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "AlgorithmicDetection")->enabled) {
1b661cef
 	logg("Algorithmic detection enabled.\n");
6fd2fb47
 	options |= CL_SCAN_ALGORITHMIC;
1b661cef
     } else {
 	logg("Algorithmic detection disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "ScanPE")->enabled) {
a9082ea2
 	logg("Portable Executable support enabled.\n");
3805ebcb
 	options |= CL_SCAN_PE;
3f97a1e7
     } else {
 	logg("Portable Executable support disabled.\n");
     }
20c3d44d
 
064b4a0c
     if(optget(opts, "ScanELF")->enabled) {
3f97a1e7
 	logg("ELF support enabled.\n");
 	options |= CL_SCAN_ELF;
     } else {
 	logg("ELF support disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "ScanPE")->enabled || optget(opts, "ScanELF")->enabled) {
 	if(optget(opts, "DetectBrokenExecutables")->enabled) {
20c3d44d
 	    logg("Detection of broken executables enabled.\n");
453581ae
 	    options |= CL_SCAN_BLOCKBROKEN;
20c3d44d
 	}
a9082ea2
     }
 
064b4a0c
     if(optget(opts, "ScanMail")->enabled) {
e3aaff8e
 	logg("Mail files support enabled.\n");
3805ebcb
 	options |= CL_SCAN_MAIL;
a36e6e5c
 
064b4a0c
 	if(optget(opts, "ScanPartialMessages")->enabled) {
4270f93b
 	    logg("Mail: RFC1341 handling enabled.\n");
 	    options |= CL_SCAN_PARTIAL_MESSAGE;
 	}
 
e3aaff8e
     } else {
 	logg("Mail files support disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "ScanOLE2")->enabled) {
e8c9ccdb
 	logg("OLE2 support enabled.\n");
3805ebcb
 	options |= CL_SCAN_OLE2;
e8c9ccdb
     } else {
 	logg("OLE2 support disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "ScanPDF")->enabled) {
c5107e70
 	logg("PDF support enabled.\n");
 	options |= CL_SCAN_PDF;
     } else {
 	logg("PDF support disabled.\n");
     }
 
064b4a0c
     if(optget(opts, "ScanHTML")->enabled) {
888f5794
 	logg("HTML support enabled.\n");
3805ebcb
 	options |= CL_SCAN_HTML;
888f5794
     } else {
 	logg("HTML support disabled.\n");
     }
 
064b4a0c
     if(optget(opts,"PhishingScanURLs")->enabled) {
19b3e182
 
064b4a0c
 	if(optget(opts,"PhishingAlwaysBlockCloak")->enabled) {
19b3e182
 	    options |= CL_SCAN_PHISHING_BLOCKCLOAK; 
 	    logg("Phishing: Always checking for cloaked urls\n");
 	}
 
064b4a0c
 	if(optget(opts,"PhishingAlwaysBlockSSLMismatch")->enabled) {
19b3e182
 	    options |= CL_SCAN_PHISHING_BLOCKSSL;
 	    logg("Phishing: Always checking for ssl mismatches\n");
 	}
     }
 
064b4a0c
     if(optget(opts,"HeuristicScanPrecedence")->enabled) {
7f0d1148
 	    options |= CL_SCAN_HEURISTIC_PRECEDENCE;
 	    logg("Heuristic: precedence enabled\n");
     }
 
064b4a0c
     if(optget(opts, "StructuredDataDetection")->enabled) {
26fbf6bd
         options |= CL_SCAN_STRUCTURED;
 
064b4a0c
 	if((opt = optget(opts, "StructuredMinCreditCardCount"))->enabled) {
2accc66f
 	    if((ret = cl_engine_set_num(engine, CL_ENGINE_MIN_CC_COUNT, opt->numarg))) {
 		logg("!cl_engine_set_num(CL_ENGINE_MIN_CC_COUNT) failed: %s\n", cl_strerror(ret));
370892d0
 		cl_engine_free(engine);
 		return 1;
 	    }
 	}
2accc66f
 	val = cl_engine_get_num(engine, CL_ENGINE_MIN_CC_COUNT, NULL);
 	logg("Structured: Minimum Credit Card Number Count set to %u\n", (unsigned int) val);
370892d0
 
064b4a0c
 	if((opt = optget(opts, "StructuredMinSSNCount"))->enabled) {
2accc66f
 	    if((ret = cl_engine_set_num(engine, CL_ENGINE_MIN_SSN_COUNT, opt->numarg))) {
 		logg("!cl_engine_set_num(CL_ENGINE_MIN_SSN_COUNT) failed: %s\n", cl_strerror(ret));
370892d0
 		cl_engine_free(engine);
 		return 1;
 	    }
 	}
2accc66f
 	val = cl_engine_get_num(engine, CL_ENGINE_MIN_SSN_COUNT, NULL);
         logg("Structured: Minimum Social Security Number Count set to %u\n", (unsigned int) val);
26fbf6bd
 
064b4a0c
         if(optget(opts, "StructuredSSNFormatNormal")->enabled)
26fbf6bd
             options |= CL_SCAN_STRUCTURED_SSN_NORMAL;
 
064b4a0c
         if(optget(opts, "StructuredSSNFormatStripped")->enabled)
26fbf6bd
 	    options |= CL_SCAN_STRUCTURED_SSN_STRIPPED;
     }
 
064b4a0c
     selfchk = optget(opts, "SelfCheck")->numarg;
c238ac42
     if(!selfchk) {
 	logg("Self checking disabled.\n");
     } else {
1095156a
 	logg("Self checking every %u seconds.\n", selfchk);
c238ac42
     }
949c6fe5
     memset(&dbstat, 0, sizeof(dbstat));
e3aaff8e
 
370892d0
     /* save the PID */
     mainpid = getpid();
064b4a0c
     if((opt = optget(opts, "PidFile"))->enabled) {
370892d0
 	    FILE *fd;
cd0d6a0b
 	old_umask = umask(0002);
064b4a0c
 	if((fd = fopen(opt->strarg, "w")) == NULL) {
 	    logg("!Can't save PID in file %s\n", opt->strarg);
370892d0
 	} else {
 	    if (fprintf(fd, "%u", (unsigned int) mainpid)<0) {
064b4a0c
 	    	logg("!Can't save PID in file %s\n", opt->strarg);
370892d0
 	    }
 	    fclose(fd);
 	}
 	umask(old_umask);
     }
 
     logg("*Listening daemon: PID: %u\n", (unsigned int) mainpid);
064b4a0c
     max_threads = optget(opts, "MaxThreads")->numarg;
9390411b
     max_queue = optget(opts, "MaxQueue")->numarg;
5f6edb22
     acceptdata.commandtimeout = optget(opts, "CommandReadTimeout")->numarg;
     readtimeout = optget(opts, "ReadTimeout")->numarg;
370892d0
 
e0bb54d7
 #if !defined(_WIN32) && defined(RLIMIT_NOFILE)
9390411b
     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) {
b6de553d
 	/* don't warn if default value is too high, silently fix it */
da3e0e40
 	unsigned maxrec;
 	int max_max_queue;
b6de553d
 	unsigned warn = optget(opts, "MaxQueue")->active;
 	const unsigned clamdfiles = 6;
 	/* Condition to not run out of file descriptors:
 	 * MaxThreads * MaxRecursion + (MaxQueue - MaxThreads) + CLAMDFILES < RLIMIT_NOFILE 
 	 * CLAMDFILES is 6: 3 standard FD + logfile + 2 FD for reloading the DB
 	 * */
41feff65
 #ifdef C_SOLARIS
 #ifdef F_BADFD
 	if (enable_extended_FILE_stdio(-1, -1) == -1) {
 	    logg("^Unable to set extended FILE stdio, clamd will be limited to max 256 open files\n");
 	    rlim.rlim_cur = rlim.rlim_cur > 255 ? 255 : rlim.rlim_cur;
 	}
 #elif !defined(_LP64)
 	if (rlim.rlim_cur > 255) {
dbc4970e
 	    rlim.rlim_cur = 255;
41feff65
 	    logg("^Solaris only supports 256 open files for 32-bit processes, you need at least Solaris 10u4, or compile as 64-bit to support more!\n");
 	}
 #endif
 #endif
b6de553d
 	opt = optget(opts,"MaxRecursion");
da3e0e40
 	maxrec = opt->numarg;
 	max_max_queue = rlim.rlim_cur - maxrec * max_threads - clamdfiles + max_threads;
b6de553d
 	if (max_queue < max_threads) {
 	    max_queue = max_threads;
 	    if (warn)
 		logg("^MaxQueue value too low, increasing to: %d\n", max_queue);
 	}
 	if (max_max_queue < max_threads) {
da3e0e40
 	    logg("^MaxThreads * MaxRecursion is too high: %d, open file descriptor limit is: %lu\n",
 		 maxrec*max_threads, (unsigned long)rlim.rlim_cur);
b6de553d
 	    max_max_queue = max_threads;
 	}
 	if (max_queue > max_max_queue) {
 	    max_queue = max_max_queue;
 	    if (warn)
 		logg("^MaxQueue value too high, lowering to: %d\n", max_queue);
 	} else if (max_queue < 2*max_threads && max_queue < max_max_queue) {
 	    max_queue = 2*max_threads;
 	    if (max_queue > max_max_queue)
 		max_queue = max_max_queue;
 	    /* always warn here */
 	    logg("^MaxQueue is lower than twice MaxThreads, increasing to: %d\n", max_queue);
9390411b
 	}
     }
 #endif
b6de553d
     logg("*MaxQueue set to: %d\n", max_queue);
9390411b
     acceptdata.max_queue = max_queue;
 
064b4a0c
     if(optget(opts, "ClamukoScanOnAccess")->enabled)
e3aaff8e
 #ifdef CLAMUKO
32fc1d7b
     {
5dcb8a7a
         do {
edc677c1
 	    if(pthread_attr_init(&clamuko_attr)) break;
cc4232a3
 	    pthread_attr_setdetachstate(&clamuko_attr, PTHREAD_CREATE_JOINABLE);
5dcb8a7a
 	    if(!(tharg = (struct thrarg *) malloc(sizeof(struct thrarg)))) break;
064b4a0c
 	    tharg->opts = opts;
5dcb8a7a
 	    tharg->engine = engine;
 	    tharg->options = options;
edc677c1
 	    if(!pthread_create(&clamuko_pid, &clamuko_attr, clamukoth, tharg)) break;
5dcb8a7a
 	    free(tharg);
 	    tharg=NULL;
 	} while(0);
 	if (!tharg) logg("!Unable to start Clamuko\n");
32fc1d7b
     }
e3aaff8e
 #else
a3e7b8a1
 	logg("Clamuko is not available.\n");
e3aaff8e
 #endif
 
e0bb54d7
 #ifndef	_WIN32
e3aaff8e
     /* set up signal handling */
     sigfillset(&sigset);
     sigdelset(&sigset, SIGINT);
     sigdelset(&sigset, SIGTERM);
b4912e71
     sigdelset(&sigset, SIGSEGV);
f456f28e
     sigdelset(&sigset, SIGHUP);
d056f4d6
     sigdelset(&sigset, SIGPIPE);
ae203685
     sigdelset(&sigset, SIGUSR2);
efac2f09
     /* The behavior of a process is undefined after it ignores a 
      * SIGFPE, SIGILL, SIGSEGV, or SIGBUS signal */
     sigdelset(&sigset, SIGFPE);
     sigdelset(&sigset, SIGILL);
     sigdelset(&sigset, SIGSEGV);
 #ifdef SIGBUS    
     sigdelset(&sigset, SIGBUS);
 #endif
949c6fe5
     sigdelset(&sigset, SIGTSTP);
     sigdelset(&sigset, SIGCONT);
e3aaff8e
     sigprocmask(SIG_SETMASK, &sigset, NULL);
efac2f09
 
b4912e71
     /* SIGINT, SIGTERM, SIGSEGV */
8ac7b1ce
     sigact.sa_handler = sighandler_th;
e3aaff8e
     sigemptyset(&sigact.sa_mask);
     sigaddset(&sigact.sa_mask, SIGINT);
     sigaddset(&sigact.sa_mask, SIGTERM);
f456f28e
     sigaddset(&sigact.sa_mask, SIGHUP);
d056f4d6
     sigaddset(&sigact.sa_mask, SIGPIPE);
ae203685
     sigaddset(&sigact.sa_mask, SIGUSR2);
e3aaff8e
     sigaction(SIGINT, &sigact, NULL);
     sigaction(SIGTERM, &sigact, NULL);
d056f4d6
     sigaction(SIGHUP, &sigact, NULL);
     sigaction(SIGPIPE, &sigact, NULL);
ae203685
     sigaction(SIGUSR2, &sigact, NULL);
67118e92
 #endif
c238ac42
 
064b4a0c
     idletimeout = optget(opts, "IdleTimeout")->numarg;
02b4b0c7
 
949c6fe5
     for (i=0;i < nsockets;i++)
5f6edb22
 	if (fds_add(&acceptdata.fds, socketds[i], 1, 0) == -1) {
949c6fe5
 	    logg("!fds_add failed\n");
 	    cl_engine_free(engine);
 	    return 1;
 	}
 
     if (pipe(acceptdata.syncpipe_wake_recv) == -1 ||
 	(pipe(acceptdata.syncpipe_wake_accept) == -1)) {
 
 	logg("!pipe failed\n");
 	exit(-1);
     }
 
b82eea8d
     syncpipe_wake_recv_w = acceptdata.syncpipe_wake_recv[1];
 
5f6edb22
     if (fds_add(fds, acceptdata.syncpipe_wake_recv[0], 1, 0) == -1 ||
 	fds_add(&acceptdata.fds, acceptdata.syncpipe_wake_accept[0], 1, 0)) {
949c6fe5
 	logg("!failed to add pipe fd\n");
 	exit(-1);
     }
 
     if ((thr_pool = thrmgr_new(max_threads, idletimeout, max_queue, scanner_thread)) == NULL) {
10b04232
 	logg("!thrmgr_new failed\n");
c238ac42
 	exit(-1);
     }
e3aaff8e
 
949c6fe5
     if (pthread_create(&accept_th, NULL, acceptloop_th, &acceptdata)) {
 	logg("!pthread_create failed\n");
 	exit(-1);
     }
e3aaff8e
 
949c6fe5
     time(&start_time);
     for(;;) {
 	int new_sd;
 	/* Block waiting for connection on any of the sockets */
7660b7cb
 	pthread_mutex_lock(fds->buf_mutex);
5f6edb22
 	fds_cleanup(fds);
 	/* signal that we can accept more connections */
7660b7cb
 	if (fds->nfds <= (unsigned)max_queue)
5f6edb22
 	    pthread_cond_signal(&acceptdata.cond_nfds);
7660b7cb
 	new_sd = fds_poll_recv(fds, selfchk ? (int)selfchk : -1, 1);
949c6fe5
 
 	if (!fds->nfds) {
 	    /* at least the dummy/sync pipe should have remained */
 	    logg("!All recv() descriptors gone: fatal\n");
87e91444
 	    pthread_mutex_lock(&exit_mutex);
949c6fe5
 	    progexit = 1;
87e91444
 	    pthread_mutex_unlock(&exit_mutex);
7660b7cb
 	    pthread_mutex_unlock(fds->buf_mutex);
949c6fe5
 	    break;
e3aaff8e
 	}
 
949c6fe5
 	if (new_sd == -1 && errno != EINTR) {
 	    logg("!Failed to poll sockets, fatal\n");
 	    pthread_mutex_lock(&exit_mutex);
 	    progexit = 1;
 	    pthread_mutex_unlock(&exit_mutex);
506a6176
 	}
 
5f6edb22
 
949c6fe5
 	i = (rr_last + 1) % fds->nfds;
 	for (j = 0;  j < fds->nfds && new_sd >= 0; j++, i = (i+1) % fds->nfds) {
 	    size_t pos = 0;
 	    int error = 0;
 	    struct fd_buf *buf = &fds->buf[i];
 	    if (!buf->got_newdata)
 		continue;
 
 	    if (buf->fd == acceptdata.syncpipe_wake_recv[0]) {
 		/* dummy sync pipe, just to wake us */
 		if (read(buf->fd, buff, sizeof(buff)) < 0) {
 		    logg("^Syncpipe read failed\n");
 		}
 		continue;
 	    }
 
 	    if (buf->got_newdata == -1) {
 		if (buf->mode == MODE_WAITREPLY) {
fb6fe4f5
 		    logg("$mode WAIT_REPLY -> closed\n");
949c6fe5
 		    buf->fd = -1;
fb6fe4f5
 		    thrmgr_group_terminate(buf->group);
 		    thrmgr_group_finished(buf->group, EXIT_ERROR);
949c6fe5
 		    continue;
 		} else {
fb6fe4f5
 		    logg("$client read error or EOF on read\n");
949c6fe5
 		    error = 1;
 		}
 	    }
 
5f6edb22
 	    if (buf->fd != -1 && buf->got_newdata == -2) {
fb6fe4f5
 		logg("$Client read timed out\n");
5f6edb22
 		mdprintf(buf->fd, "COMMAND READ TIMED OUT\n");
 		error = 1;
 	    }
 
949c6fe5
 	    rr_last = i;
 	    if (buf->mode == MODE_WAITANCILL) {
 		buf->mode = MODE_COMMAND;
fb6fe4f5
 		logg("$mode -> MODE_COMMAND\n");
949c6fe5
 	    }
 	    while (!error && buf->fd != -1 && buf->buffer && pos < buf->off &&
 		   buf->mode != MODE_WAITANCILL) {
 		client_conn_t conn;
 		const unsigned char *cmd = NULL;
 		int rc;
 		/* New data available to read on socket. */
 
 		memset(&conn, 0, sizeof(conn));
 		conn.scanfd = buf->recvfd;
 		buf->recvfd = -1;
 		conn.sd = buf->fd;
 		conn.options = options;
 		conn.opts = opts;
 		conn.thrpool = thr_pool;
 		conn.engine = engine;
 		conn.group = buf->group;
 		conn.id = buf->id;
 		conn.quota = buf->quota;
 		conn.filename = buf->dumpname;
 		conn.mode = buf->mode;
4e24a361
 		conn.term = buf->term;
949c6fe5
 
5d34634e
 		/* Parse & dispatch command */
 		cmd = parse_dispatch_cmd(&conn, buf, &pos, &error, opts, readtimeout);
949c6fe5
 
 		if (conn.mode == MODE_COMMAND && !cmd)
 		    break;
5d34634e
 		if (!error) {
 		    if (buf->mode == MODE_WAITREPLY && buf->off) {
 			/* Client is not supposed to send anything more */
 			logg("^Client sent garbage after last command: %lu bytes\n", (unsigned long)buf->off);
 			buf->buffer[buf->off] = '\0';
 			logg("$Garbage: %s\n", buf->buffer);
949c6fe5
 			error = 1;
5d34634e
 		    } else if (buf->mode == MODE_STREAM) {
b6de553d
 			rc = handle_stream(&conn, buf, opts, &error, &pos, readtimeout);
5d34634e
 			if (rc == -1)
 			    break;
 			else
 			    continue;
949c6fe5
 		    }
 		}
0378a9ab
 		if (error && error != CL_ETIMEOUT) {
949c6fe5
 		    conn_reply_error(&conn, "Error processing command.");
 		}
 	    }
 	    if (error) {
 		if (buf->dumpfd != -1) {
 		    close(buf->dumpfd);
 		    if (buf->dumpname) {
 			cli_unlink(buf->dumpname);
 			free(buf->dumpname);
 		    }
 		    buf->dumpfd = -1;
 		}
0378a9ab
 		thrmgr_group_terminate(buf->group);
 		if (thrmgr_group_finished(buf->group, EXIT_ERROR)) {
4d26118b
 		    logg("$Shutting down socket after error (FD %d)\n", buf->fd);
949c6fe5
 		    shutdown(buf->fd, 2);
 		    closesocket(buf->fd);
 		} else
fb6fe4f5
 		    logg("$Socket not shut down due to active tasks\n");
949c6fe5
 		buf->fd = -1;
 	    }
506a6176
 	}
7660b7cb
 	pthread_mutex_unlock(fds->buf_mutex);
c238ac42
 
949c6fe5
 	/* handle progexit */
c238ac42
 	pthread_mutex_lock(&exit_mutex);
949c6fe5
 	if (progexit) {
c238ac42
 	    pthread_mutex_unlock(&exit_mutex);
7660b7cb
 	    pthread_mutex_lock(fds->buf_mutex);
949c6fe5
 	    for (i=0;i < fds->nfds; i++) {
 		if (fds->buf[i].fd == -1)
 		    continue;
0378a9ab
 		thrmgr_group_terminate(fds->buf[i].group);
 		if (thrmgr_group_finished(fds->buf[i].group, EXIT_ERROR)) {
4d26118b
 		    logg("$Shutdown closed fd %d\n", fds->buf[i].fd);
949c6fe5
 		    shutdown(fds->buf[i].fd, 2);
 		    closesocket(fds->buf[i].fd);
 		    fds->buf[i].fd = -1;
 		}
 	    }
7660b7cb
 	    pthread_mutex_unlock(fds->buf_mutex);
c238ac42
 	    break;
e3aaff8e
 	}
c238ac42
 	pthread_mutex_unlock(&exit_mutex);
e3aaff8e
 
949c6fe5
 	/* SIGHUP */
 	if (sighup) {
 	    logg("SIGHUP caught: re-opening log file.\n");
 	    logg_close();
 	    sighup = 0;
 	    if(!logg_file && (opt = optget(opts, "LogFile"))->enabled)
 		logg_file = opt->strarg;
 	}
 
 	/* SelfCheck */
c238ac42
 	if(selfchk) {
 	    time(&current_time);
f8e8ab4f
 	    if((current_time - start_time) >= (time_t)selfchk) {
064b4a0c
 		if(reload_db(engine, dboptions, opts, TRUE, &ret)) {
c238ac42
 		    pthread_mutex_lock(&reload_mutex);
 		    reload = 1;
 		    pthread_mutex_unlock(&reload_mutex);
 		}
 		time(&start_time);
 	    }
0f387b1b
 	}
e3aaff8e
 
949c6fe5
 	/* DB reload */
c238ac42
 	pthread_mutex_lock(&reload_mutex);
 	if(reload) {
 	    pthread_mutex_unlock(&reload_mutex);
09196f45
 #ifdef CLAMUKO
 	    if(optget(opts, "ClamukoScanOnAccess")->enabled && tharg) {
 		logg("Stopping and restarting Clamuko.\n");
 		pthread_kill(clamuko_pid, SIGUSR1);
 		pthread_join(clamuko_pid, NULL);
 	    }
 #endif
064b4a0c
 	    engine = reload_db(engine, dboptions, opts, FALSE, &ret);
2dab0a15
 	    if(ret) {
b6e75665
 		logg("Terminating because of a fatal error.\n");
2dab0a15
 		if(new_sd >= 0)
949c6fe5
 		    closesocket(new_sd);
2dab0a15
 		break;
 	    }
04ba76d2
 
45905a4a
 	    pthread_mutex_lock(&reload_mutex);
 	    reload = 0;
ae12e285
 	    time(&reloaded_time);
45905a4a
 	    pthread_mutex_unlock(&reload_mutex);
c238ac42
 #ifdef CLAMUKO
064b4a0c
 	    if(optget(opts, "ClamukoScanOnAccess")->enabled && tharg) {
a57e3d41
 		tharg->engine = engine;
32fc1d7b
 		pthread_create(&clamuko_pid, &clamuko_attr, clamukoth, tharg);
 	    }
2d70a403
 #endif
c238ac42
 	} else {
 	    pthread_mutex_unlock(&reload_mutex);
 	}
     }
e3aaff8e
 
949c6fe5
     pthread_mutex_lock(&exit_mutex);
     progexit = 1;
     pthread_mutex_unlock(&exit_mutex);
     if (write(acceptdata.syncpipe_wake_accept[1], "", 1) < 0) {
 	logg("^Write to syncpipe failed\n");
     }
40107990
     /* Destroy the thread manager.
      * This waits for all current tasks to end
      */
949c6fe5
     logg("*Waiting for all threads to finish\n");
40107990
     thrmgr_destroy(thr_pool);
c238ac42
 #ifdef CLAMUKO
064b4a0c
     if(optget(opts, "ClamukoScanOnAccess")->enabled) {
32fc1d7b
 	logg("Stopping Clamuko.\n");
 	pthread_kill(clamuko_pid, SIGUSR1);
 	pthread_join(clamuko_pid, NULL);
     }
6a2532ca
 #endif
deb30312
     if(engine) {
 	thrmgr_setactiveengine(NULL);
370892d0
 	cl_engine_free(engine);
deb30312
     }
2dab0a15
 
949c6fe5
     pthread_join(accept_th, NULL);
     fds_free(fds);
     close(acceptdata.syncpipe_wake_accept[1]);
     close(acceptdata.syncpipe_wake_recv[1]);
     if(dbstat.entries)
 	cl_statfree(&dbstat);
57358cc8
     logg("*Shutting down the main socket%s.\n", (nsockets > 1) ? "s" : "");
     for (i = 0; i < nsockets; i++)
 	shutdown(socketds[i], 2);
a3e7b8a1
 
064b4a0c
     if((opt = optget(opts, "PidFile"))->enabled) {
 	if(unlink(opt->strarg) == -1)
 	    logg("!Can't unlink the pid file %s\n", opt->strarg);
a3e7b8a1
 	else
 	    logg("Pid file removed.\n");
     }
 
520cf7eb
     time(&current_time);
9e751804
     logg("--- Stopped at %s", cli_ctime(&current_time, timestr, sizeof(timestr)));
520cf7eb
 
c95ab85e
     return ret;
b4912e71
 }