Browse code

bb10979: clamd multi-threaded database reload

Offload the DB load to a separate thread and only replace the current
engine instance afterwards.

While reload is pending:
- existing scan requests use the old db (this is unchanged)
- new scan requests are honored instead of blocked and they also use
the old db (this is new)

After the reload is complete:
- existing scan requests use the old db (this is unchanged)
- new scan requests use the new db (this is unchanged)

The existing engine is refcounted so it'll be eventually freed when no
longer in use.

Reload requests while reload is pending are silently ignored (i.e. It
never forks more than a single reload thread).

Patch courtesy of Alberto Wu. We would also like to thank Julius Plenz
for original work on this issue, as well as Alexander Sulfrian,
Arjen de Korte, David Heidelberg, and Ged Haywood for their work
updating and testing these patches.

Micah Snyder authored on 2020/06/03 04:46:31
Showing 5 changed files
... ...
@@ -552,7 +552,7 @@ int main(int argc, char **argv)
552 552
             break;
553 553
         }
554 554
 
555
-        if ((ret = statinidir_th(dbdir))) {
555
+        if ((ret = statinidir(dbdir))) {
556 556
             logg("!%s\n", cl_strerror(ret));
557 557
             ret = 1;
558 558
             break;
... ...
@@ -744,7 +744,7 @@ int main(int argc, char **argv)
744 744
             break;
745 745
         }
746 746
 
747
-        ret = recvloop_th(lsockets, nlsockets, engine, dboptions, opts);
747
+        ret = recvloop(lsockets, nlsockets, engine, dboptions, opts);
748 748
 
749 749
     } while (0);
750 750
 
... ...
@@ -63,12 +63,33 @@
63 63
 
64 64
 #define BUFFSIZE 1024
65 65
 
66
+typedef enum {
67
+    RELOAD_STAGE__IDLE,
68
+    RELOAD_STAGE__RELOADING,
69
+    RELOAD_STAGE__NEW_DB_AVAILABLE,
70
+} reload_stage_t;
71
+
72
+struct reload_th_t {
73
+    struct cl_settings *settings;
74
+    char *dbdir;
75
+    int dboptions;
76
+};
77
+
78
+/*
79
+ * Global variables
80
+ */
81
+
66 82
 int progexit                 = 0;
67 83
 pthread_mutex_t exit_mutex   = PTHREAD_MUTEX_INITIALIZER;
68 84
 int reload                   = 0;
69 85
 time_t reloaded_time         = 0;
70 86
 pthread_mutex_t reload_mutex = PTHREAD_MUTEX_INITIALIZER;
71 87
 int sighup                   = 0;
88
+
89
+static pthread_mutex_t rldstage_mutex = PTHREAD_MUTEX_INITIALIZER;
90
+static reload_stage_t reload_stage    = RELOAD_STAGE__IDLE; /* protected by rldstage_mutex */
91
+struct cl_engine *g_newengine         = NULL;               /* protected by rldstage_mutex */
92
+
72 93
 extern pthread_mutex_t logg_mutex;
73 94
 static struct cl_stat dbstat;
74 95
 
... ...
@@ -160,89 +181,199 @@ void sighandler_th(int sig)
160 160
             logg("$Failed to write to syncpipe\n");
161 161
 }
162 162
 
163
-static struct cl_engine *reload_db(struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts, int do_check, int *ret)
163
+static int need_db_reload(void)
164 164
 {
165
-    const char *dbdir;
165
+    if (!dbstat.entries) {
166
+        logg("No stats for Database check - forcing reload\n");
167
+        return TRUE;
168
+    }
169
+    if (cl_statchkdir(&dbstat) == 1) {
170
+        logg("SelfCheck: Database modification detected. Forcing reload.\n");
171
+        return TRUE;
172
+    }
173
+    logg("SelfCheck: Database status OK.\n");
174
+    return FALSE;
175
+}
176
+
177
+/**
178
+ * @brief Thread entry point to load the signature databases & compile a new scanning engine.
179
+ *
180
+ * Once loaded, an event will be set to indicate that the new engine is ready.
181
+ *
182
+ * @param arg   A reload_th_t structure defining the db directory, db settings, engine settings.
183
+ * @return void*
184
+ */
185
+static void *reload_th(void *arg)
186
+{
187
+    cl_error_t status = CL_EMALFDB;
188
+
189
+    struct reload_th_t *rldata = arg;
190
+    struct cl_engine *engine;
191
+    unsigned int sigs = 0;
166 192
     int retval;
167
-    unsigned int sigs            = 0;
168
-    struct cl_settings *settings = NULL;
169
-
170
-    *ret = 0;
171
-    if (do_check) {
172
-        if (!dbstat.entries) {
173
-            logg("No stats for Database check - forcing reload\n");
174
-            return engine;
193
+
194
+    if (NULL == rldata || NULL == rldata->dbdir || NULL == rldata->settings) {
195
+        logg("!reload_th: Invalid arguments, unable to load signature databases.\n");
196
+        status = CL_EARG;
197
+        goto done;
198
+    }
199
+
200
+    logg("Reading databases from %s\n", rldata->dbdir);
201
+
202
+    if (NULL == (engine = cl_engine_new())) {
203
+        logg("!Can't initialize antivirus engine\n");
204
+        goto done;
205
+    }
206
+
207
+    retval = cl_engine_settings_apply(engine, rldata->settings);
208
+    if (CL_SUCCESS != retval) {
209
+        logg("^Can't apply previous engine settings: %s\n", cl_strerror(retval));
210
+        logg("^Using default engine settings\n");
211
+    }
212
+
213
+    retval = cl_load(rldata->dbdir, engine, &sigs, rldata->dboptions);
214
+    if (CL_SUCCESS != retval) {
215
+        logg("!reload_th: database load failed: %s\n", cl_strerror(retval));
216
+        goto done;
217
+    }
218
+
219
+    retval = cl_engine_compile(engine);
220
+    if (CL_SUCCESS != retval) {
221
+        logg("!reload_th: Database initialization error: can't compile engine: %s\n", cl_strerror(retval));
222
+        goto done;
223
+    }
224
+
225
+    logg("Database correctly reloaded (%u signatures)\n", sigs);
226
+    status = CL_SUCCESS;
227
+
228
+done:
229
+
230
+    if (NULL != rldata) {
231
+        if (NULL != rldata->settings) {
232
+            cl_engine_settings_free(rldata->settings);
233
+        }
234
+        if (NULL != rldata->dbdir) {
235
+            free(rldata->dbdir);
175 236
         }
237
+        free(rldata);
238
+    }
176 239
 
177
-        if (cl_statchkdir(&dbstat) == 1) {
178
-            logg("SelfCheck: Database modification detected. Forcing reload.\n");
179
-            return engine;
180
-        } else {
181
-            logg("SelfCheck: Database status OK.\n");
182
-            return NULL;
240
+    if (CL_SUCCESS != status) {
241
+        if (NULL != engine) {
242
+            cl_engine_free(engine);
243
+            engine = NULL;
183 244
         }
184 245
     }
185 246
 
186
-    /* release old structure */
247
+    pthread_mutex_lock(&rldstage_mutex);
248
+    reload_stage = RELOAD_STAGE__NEW_DB_AVAILABLE; /* New DB available */
249
+    g_newengine  = engine;
250
+    pthread_mutex_unlock(&rldstage_mutex);
251
+
252
+#ifdef _WIN32
253
+    SetEvent(event_wake_recv);
254
+#else
255
+    if (syncpipe_wake_recv_w != -1)
256
+        if (write(syncpipe_wake_recv_w, "", 1) != 1)
257
+            logg("$Failed to write to syncpipe\n");
258
+#endif
259
+
260
+    return NULL;
261
+}
262
+
263
+/**
264
+ * @brief Reload the database.
265
+ *
266
+ * @param engine        The current scan engine, used to copy the settings.
267
+ * @param dboptions     The current database options, used to copy the options.
268
+ * @param opts          The command line options, used to get the database directory.
269
+ * @return cl_error_t   CL_SUCCESS if the reload thread was successfully started. This does not mean that the database has reloaded successfully.
270
+ */
271
+static cl_error_t reload_db(struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
272
+{
273
+    cl_error_t status = CL_EMALFDB;
274
+    cl_error_t retval;
275
+    struct reload_th_t *rldata;
276
+    pthread_t th;
277
+    pthread_attr_t th_attr;
278
+
279
+    if (NULL == opts) {
280
+        logg("!reload_db: Invalid arguments, unable to load signature databases.\n");
281
+        status = CL_EARG;
282
+        goto done;
283
+    }
284
+
285
+    rldata = malloc(sizeof(struct reload_th_t));
286
+    if (!rldata) {
287
+        logg("!Failed to allocate reload context\n");
288
+        status = CL_EMEM;
289
+        goto done;
290
+    }
291
+    memset(rldata, 0, sizeof(struct reload_th_t));
292
+
293
+    rldata->dboptions = dboptions;
294
+
187 295
     if (engine) {
188 296
         /* copy current settings */
189
-        settings = cl_engine_settings_copy(engine);
190
-        if (!settings)
191
-            logg("^Can't make a copy of the current engine settings\n");
192
-
193
-        thrmgr_setactiveengine(NULL);
194
-        cl_engine_free(engine);
297
+        rldata->settings = cl_engine_settings_copy(engine);
298
+        if (!rldata->settings) {
299
+            logg("!Can't make a copy of the current engine settings\n");
300
+            goto done;
301
+        }
195 302
     }
196 303
 
197
-    dbdir = optget(opts, "DatabaseDirectory")->strarg;
198
-    logg("Reading databases from %s\n", dbdir);
304
+    rldata->dbdir = strdup(optget(opts, "DatabaseDirectory")->strarg);
305
+    if (!rldata->dbdir) {
306
+        logg("!Can't duplicate the database directory path\n");
307
+        goto done;
308
+    }
199 309
 
200
-    if (dbstat.entries)
310
+    if (dbstat.entries) {
201 311
         cl_statfree(&dbstat);
202
-
312
+    }
203 313
     memset(&dbstat, 0, sizeof(struct cl_stat));
204
-    if ((retval = cl_statinidir(dbdir, &dbstat))) {
314
+
315
+    retval = cl_statinidir(rldata->dbdir, &dbstat);
316
+    if (CL_SUCCESS != retval) {
205 317
         logg("!cl_statinidir() failed: %s\n", cl_strerror(retval));
206
-        *ret = 1;
207
-        if (settings)
208
-            cl_engine_settings_free(settings);
209
-        return NULL;
318
+        goto done;
210 319
     }
211 320
 
212
-    if (!(engine = cl_engine_new())) {
213
-        logg("!Can't initialize antivirus engine\n");
214
-        *ret = 1;
215
-        if (settings)
216
-            cl_engine_settings_free(settings);
217
-        return NULL;
321
+    if (pthread_attr_init(&th_attr)) {
322
+        logg("!Failed to init reload thread attributes\n");
323
+        goto done;
218 324
     }
219 325
 
220
-    if (settings) {
221
-        retval = cl_engine_settings_apply(engine, settings);
222
-        if (retval != CL_SUCCESS) {
223
-            logg("^Can't apply previous engine settings: %s\n", cl_strerror(retval));
224
-            logg("^Using default engine settings\n");
225
-        }
226
-        cl_engine_settings_free(settings);
326
+    pthread_attr_setdetachstate(&th_attr, PTHREAD_CREATE_DETACHED);
327
+    retval = pthread_create(&th, &th_attr, reload_th, rldata);
328
+    if (pthread_attr_destroy(&th_attr))
329
+        logg("^Failed to release reload thread attributes\n");
330
+    if (retval) {
331
+        logg("!Failed to spawn reload thread\n");
332
+        goto done;
227 333
     }
228 334
 
229
-    if ((retval = cl_load(dbdir, engine, &sigs, dboptions))) {
230
-        logg("!reload db failed: %s\n", cl_strerror(retval));
231
-        cl_engine_free(engine);
232
-        *ret = 1;
233
-        return NULL;
234
-    }
335
+    status = CL_SUCCESS;
235 336
 
236
-    if ((retval = cl_engine_compile(engine)) != 0) {
237
-        logg("!Database initialization error: can't compile engine: %s\n", cl_strerror(retval));
238
-        cl_engine_free(engine);
239
-        *ret = 1;
240
-        return NULL;
337
+done:
338
+
339
+    if (CL_SUCCESS != status) {
340
+        /*
341
+         * Failed to spawn reload thread, so we're responsible for cleaning up
342
+         * the rldata structure.
343
+         */
344
+        if (NULL != rldata) {
345
+            if (NULL != rldata->settings) {
346
+                cl_engine_settings_free(rldata->settings);
347
+            }
348
+            if (NULL != rldata->dbdir) {
349
+                free(rldata->dbdir);
350
+            }
351
+            free(rldata);
352
+        }
241 353
     }
242
-    logg("Database correctly reloaded (%u signatures)\n", sigs);
243 354
 
244
-    thrmgr_setactiveengine(engine);
245
-    return engine;
355
+    return status;
246 356
 }
247 357
 
248 358
 /*
... ...
@@ -297,7 +428,7 @@ static const char *get_cmd(struct fd_buf *buf, size_t off, size_t *len, char *te
297 297
     }
298 298
 }
299 299
 
300
-int statinidir_th(const char *dirname)
300
+int statinidir(const char *dirname)
301 301
 {
302 302
     if (!dbstat.entries) {
303 303
         memset(&dbstat, 0, sizeof(dbstat));
... ...
@@ -710,7 +841,7 @@ static int handle_stream(client_conn_t *conn, struct fd_buf *buf, const struct o
710 710
     return 0;
711 711
 }
712 712
 
713
-int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
713
+int recvloop(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts)
714 714
 {
715 715
     int max_threads, max_queue, readtimeout, ret = 0;
716 716
     struct cl_scan_options options;
... ...
@@ -1532,7 +1663,7 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
1532 1532
         if (selfchk) {
1533 1533
             time(&current_time);
1534 1534
             if ((current_time - start_time) >= (time_t)selfchk) {
1535
-                if (reload_db(engine, dboptions, opts, TRUE, &ret)) {
1535
+                if (need_db_reload()) {
1536 1536
                     pthread_mutex_lock(&reload_mutex);
1537 1537
                     reload = 1;
1538 1538
                     pthread_mutex_unlock(&reload_mutex);
... ...
@@ -1544,24 +1675,35 @@ int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsi
1544 1544
         /* DB reload */
1545 1545
         pthread_mutex_lock(&reload_mutex);
1546 1546
         if (reload) {
1547
-            pthread_mutex_unlock(&reload_mutex);
1548
-
1549
-            engine = reload_db(engine, dboptions, opts, FALSE, &ret);
1550
-            if (ret) {
1551
-                logg("Terminating because of a fatal error.\n");
1552
-                if (new_sd >= 0)
1553
-                    closesocket(new_sd);
1554
-                break;
1547
+            /* Reload was requested */
1548
+            pthread_mutex_lock(&rldstage_mutex);
1549
+            if (reload_stage == RELOAD_STAGE__IDLE) {
1550
+                /* Reloading not already taking place */
1551
+                reload_stage = RELOAD_STAGE__RELOADING;
1552
+                if (CL_SUCCESS != reload_db(engine, dboptions, opts)) {
1553
+                    logg("^Database reload setup failed, keeping the previous instance\n");
1554
+                    reload       = 0;
1555
+                    reload_stage = RELOAD_STAGE__IDLE;
1556
+                }
1557
+            } else if (reload_stage == RELOAD_STAGE__NEW_DB_AVAILABLE) {
1558
+                /* New database available */
1559
+                if (g_newengine) {
1560
+                    /* Reload succeeded */
1561
+                    logg("Activating the newly loaded database...\n");
1562
+                    thrmgr_setactiveengine(g_newengine);
1563
+                    cl_engine_free(engine);
1564
+                    engine      = g_newengine;
1565
+                    g_newengine = NULL;
1566
+                } else {
1567
+                    logg("^Database reload failed, keeping the previous instance\n");
1568
+                }
1569
+                reload_stage = RELOAD_STAGE__IDLE;
1570
+                reload       = 0;
1571
+                time(&reloaded_time);
1555 1572
             }
1556
-
1557
-            pthread_mutex_lock(&reload_mutex);
1558
-            reload = 0;
1559
-            time(&reloaded_time);
1560
-            pthread_mutex_unlock(&reload_mutex);
1561
-            time(&start_time);
1562
-        } else {
1563
-            pthread_mutex_unlock(&reload_mutex);
1573
+            pthread_mutex_unlock(&rldstage_mutex);
1564 1574
         }
1575
+        pthread_mutex_unlock(&reload_mutex);
1565 1576
     }
1566 1577
 
1567 1578
     pthread_mutex_lock(&exit_mutex);
... ...
@@ -37,8 +37,8 @@ struct thrarg {
37 37
     const struct cl_engine *engine;
38 38
 };
39 39
 
40
-int recvloop_th(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts);
41
-int statinidir_th(const char *dirname);
40
+int recvloop(int *socketds, unsigned nsockets, struct cl_engine *engine, unsigned int dboptions, const struct optstruct *opts);
41
+int statinidir(const char *dirname);
42 42
 void sighandler(int sig);
43 43
 void sighandler_th(int sig);
44 44
 void sigsegv(int sig);
... ...
@@ -312,6 +312,7 @@ run_reload_test()
312 312
 	grep "ClamAV-RELOAD-TestFile" clamdscan.log >/dev/null 2>/dev/null && die "RELOAD test(1) failed!"
313 313
 	echo "ClamAV-RELOAD-TestFile:0:0:436c616d41562d52454c4f41442d54657374" >test-db/new.ndb
314 314
 	$CLAMDSCAN --reload --config-file=test-clamd.conf || die "clamdscan says reload failed!"
315
+	sleep 1
315 316
 	run_clamdscan reload-testfile
316 317
 	failed=0
317 318
 	grep "ClamAV-RELOAD-TestFile" clamdscan.log >/dev/null 2>/dev/null || die "RELOAD test failed! (after reload)"
... ...
@@ -196,7 +196,7 @@
196 196
    obj:/usr/local/lib/valgrind/memcheck-x86-freebsd
197 197
    fun:send
198 198
    fun:fds_poll_recv
199
-   fun:recvloop_th
199
+   fun:recvloop
200 200
    obj:/lib/libthr.so.3
201 201
 }
202 202
 {
... ...
@@ -207,7 +207,7 @@
207 207
    obj:/usr/local/lib/valgrind/memcheck-x86-freebsd
208 208
    fun:poll
209 209
    fun:fds_poll_recv
210
-   fun:recvloop_th
210
+   fun:recvloop
211 211
    ...
212 212
 }
213 213
 {