Browse code

clamonacc - add consumer queue; add thread pool library; add thread pool support in consumer queue; flesh out consumer queue code; refactor scan functions into thread pool worker functions; refactor scan functions to work off slimmed down params and event metadata instead of a single, giant context; sundry fixups

Mickey Sola authored on 2019/05/10 01:42:33
Showing 16 changed files
... ...
@@ -54,7 +54,12 @@ clamonacc_SOURCES = \
54 54
     ./misc/onaccess_others.h \
55 55
     ./misc/priv_fts.h \
56 56
     ./scan/onaccess_scth.c \
57
-    ./scan/onaccess_scth.h
57
+    ./scan/onaccess_scth.h \
58
+    ./scan/onaccess_scque.c \
59
+    ./scan/onaccess_scque.h \
60
+    ./c-thread-pool/thpool.c \
61
+    ./c-thread-pool/thpool.h
62
+
58 63
 
59 64
 if !SYSTEM_LFS_FTS
60 65
 clamonacc_SOURCES += ./misc/fts.c
61 66
new file mode 100644
... ...
@@ -0,0 +1,21 @@
0
+The MIT License (MIT)
1
+
2
+Copyright (c) 2016 Johan Hanssen Seferidis
3
+
4
+Permission is hereby granted, free of charge, to any person obtaining a copy
5
+of this software and associated documentation files (the "Software"), to deal
6
+in the Software without restriction, including without limitation the rights
7
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
+copies of the Software, and to permit persons to whom the Software is
9
+furnished to do so, subject to the following conditions:
10
+
11
+The above copyright notice and this permission notice shall be included in all
12
+copies or substantial portions of the Software.
13
+
14
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
+SOFTWARE.
0 21
new file mode 100644
... ...
@@ -0,0 +1,551 @@
0
+/* ********************************
1
+ * Author:       Johan Hanssen Seferidis
2
+ * License:	     MIT
3
+ * Description:  Library providing a threading pool where you can add
4
+ *               work. For usage, check the thpool.h file or README.md
5
+ *
6
+ *//** @file thpool.h *//*
7
+ *
8
+ ********************************/
9
+
10
+#define _POSIX_C_SOURCE 200809L
11
+#include <unistd.h>
12
+#include <signal.h>
13
+#include <stdio.h>
14
+#include <stdlib.h>
15
+#include <pthread.h>
16
+#include <errno.h>
17
+#include <time.h>
18
+#if defined(__linux__)
19
+#include <sys/prctl.h>
20
+#endif
21
+
22
+#include "thpool.h"
23
+
24
+#ifdef THPOOL_DEBUG
25
+#define THPOOL_DEBUG 1
26
+#else
27
+#define THPOOL_DEBUG 0
28
+#endif
29
+
30
+#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
31
+#define err(str) fprintf(stderr, str)
32
+#else
33
+#define err(str)
34
+#endif
35
+
36
+static volatile int threads_keepalive;
37
+static volatile int threads_on_hold;
38
+
39
+
40
+
41
+/* ========================== STRUCTURES ============================ */
42
+
43
+
44
+/* Binary semaphore */
45
+typedef struct bsem {
46
+	pthread_mutex_t mutex;
47
+	pthread_cond_t   cond;
48
+	int v;
49
+} bsem;
50
+
51
+
52
+/* Job */
53
+typedef struct job{
54
+	struct job*  prev;                   /* pointer to previous job   */
55
+	void   (*function)(void* arg);       /* function pointer          */
56
+	void*  arg;                          /* function's argument       */
57
+} job;
58
+
59
+
60
+/* Job queue */
61
+typedef struct jobqueue{
62
+	pthread_mutex_t rwmutex;             /* used for queue r/w access */
63
+	job  *front;                         /* pointer to front of queue */
64
+	job  *rear;                          /* pointer to rear  of queue */
65
+	bsem *has_jobs;                      /* flag as binary semaphore  */
66
+	int   len;                           /* number of jobs in queue   */
67
+} jobqueue;
68
+
69
+
70
+/* Thread */
71
+typedef struct thread{
72
+	int       id;                        /* friendly id               */
73
+	pthread_t pthread;                   /* pointer to actual thread  */
74
+	struct thpool_* thpool_p;            /* access to thpool          */
75
+} thread;
76
+
77
+
78
+/* Threadpool */
79
+typedef struct thpool_{
80
+	thread**   threads;                  /* pointer to threads        */
81
+	volatile int num_threads_alive;      /* threads currently alive   */
82
+	volatile int num_threads_working;    /* threads currently working */
83
+	pthread_mutex_t  thcount_lock;       /* used for thread count etc */
84
+	pthread_cond_t  threads_all_idle;    /* signal to thpool_wait     */
85
+	jobqueue  jobqueue;                  /* job queue                 */
86
+} thpool_;
87
+
88
+
89
+
90
+
91
+
92
+/* ========================== PROTOTYPES ============================ */
93
+
94
+
95
+static int  thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
96
+static void* thread_do(struct thread* thread_p);
97
+static void  thread_hold(int sig_id);
98
+static void  thread_destroy(struct thread* thread_p);
99
+
100
+static int   jobqueue_init(jobqueue* jobqueue_p);
101
+static void  jobqueue_clear(jobqueue* jobqueue_p);
102
+static void  jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
103
+static struct job* jobqueue_pull(jobqueue* jobqueue_p);
104
+static void  jobqueue_destroy(jobqueue* jobqueue_p);
105
+
106
+static void  bsem_init(struct bsem *bsem_p, int value);
107
+static void  bsem_reset(struct bsem *bsem_p);
108
+static void  bsem_post(struct bsem *bsem_p);
109
+static void  bsem_post_all(struct bsem *bsem_p);
110
+static void  bsem_wait(struct bsem *bsem_p);
111
+
112
+
113
+
114
+
115
+
116
+/* ========================== THREADPOOL ============================ */
117
+
118
+
119
+/* Initialise thread pool */
120
+struct thpool_* thpool_init(int num_threads){
121
+
122
+	threads_on_hold   = 0;
123
+	threads_keepalive = 1;
124
+
125
+	if (num_threads < 0){
126
+		num_threads = 0;
127
+	}
128
+
129
+	/* Make new thread pool */
130
+	thpool_* thpool_p;
131
+	thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
132
+	if (thpool_p == NULL){
133
+		err("thpool_init(): Could not allocate memory for thread pool\n");
134
+		return NULL;
135
+	}
136
+	thpool_p->num_threads_alive   = 0;
137
+	thpool_p->num_threads_working = 0;
138
+
139
+	/* Initialise the job queue */
140
+	if (jobqueue_init(&thpool_p->jobqueue) == -1){
141
+		err("thpool_init(): Could not allocate memory for job queue\n");
142
+		free(thpool_p);
143
+		return NULL;
144
+	}
145
+
146
+	/* Make threads in pool */
147
+	thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
148
+	if (thpool_p->threads == NULL){
149
+		err("thpool_init(): Could not allocate memory for threads\n");
150
+		jobqueue_destroy(&thpool_p->jobqueue);
151
+		free(thpool_p);
152
+		return NULL;
153
+	}
154
+
155
+	pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
156
+	pthread_cond_init(&thpool_p->threads_all_idle, NULL);
157
+
158
+	/* Thread init */
159
+	int n;
160
+	for (n=0; n<num_threads; n++){
161
+		thread_init(thpool_p, &thpool_p->threads[n], n);
162
+#if THPOOL_DEBUG
163
+			printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
164
+#endif
165
+	}
166
+
167
+	/* Wait for threads to initialize */
168
+	while (thpool_p->num_threads_alive != num_threads) {}
169
+
170
+	return thpool_p;
171
+}
172
+
173
+
174
+/* Add work to the thread pool */
175
+int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
176
+	job* newjob;
177
+
178
+	newjob=(struct job*)malloc(sizeof(struct job));
179
+	if (newjob==NULL){
180
+		err("thpool_add_work(): Could not allocate memory for new job\n");
181
+		return -1;
182
+	}
183
+
184
+	/* add function and argument */
185
+	newjob->function=function_p;
186
+	newjob->arg=arg_p;
187
+
188
+	/* add job to queue */
189
+	jobqueue_push(&thpool_p->jobqueue, newjob);
190
+
191
+	return 0;
192
+}
193
+
194
+
195
+/* Wait until all jobs have finished */
196
+void thpool_wait(thpool_* thpool_p){
197
+	pthread_mutex_lock(&thpool_p->thcount_lock);
198
+	while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
199
+		pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
200
+	}
201
+	pthread_mutex_unlock(&thpool_p->thcount_lock);
202
+}
203
+
204
+
205
+/* Destroy the threadpool */
206
+void thpool_destroy(thpool_* thpool_p){
207
+	/* No need to destory if it's NULL */
208
+	if (thpool_p == NULL) return ;
209
+
210
+	volatile int threads_total = thpool_p->num_threads_alive;
211
+
212
+	/* End each thread 's infinite loop */
213
+	threads_keepalive = 0;
214
+
215
+	/* Give one second to kill idle threads */
216
+	double TIMEOUT = 1.0;
217
+	time_t start, end;
218
+	double tpassed = 0.0;
219
+	time (&start);
220
+	while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
221
+		bsem_post_all(thpool_p->jobqueue.has_jobs);
222
+		time (&end);
223
+		tpassed = difftime(end,start);
224
+	}
225
+
226
+	/* Poll remaining threads */
227
+	while (thpool_p->num_threads_alive){
228
+		bsem_post_all(thpool_p->jobqueue.has_jobs);
229
+		sleep(1);
230
+	}
231
+
232
+	/* Job queue cleanup */
233
+	jobqueue_destroy(&thpool_p->jobqueue);
234
+	/* Deallocs */
235
+	int n;
236
+	for (n=0; n < threads_total; n++){
237
+		thread_destroy(thpool_p->threads[n]);
238
+	}
239
+	free(thpool_p->threads);
240
+	free(thpool_p);
241
+}
242
+
243
+
244
+/* Pause all threads in threadpool */
245
+void thpool_pause(thpool_* thpool_p) {
246
+	int n;
247
+	for (n=0; n < thpool_p->num_threads_alive; n++){
248
+		pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
249
+	}
250
+}
251
+
252
+
253
+/* Resume all threads in threadpool */
254
+void thpool_resume(thpool_* thpool_p) {
255
+    // resuming a single threadpool hasn't been
256
+    // implemented yet, meanwhile this suppresses
257
+    // the warnings
258
+    (void)thpool_p;
259
+
260
+	threads_on_hold = 0;
261
+}
262
+
263
+
264
+int thpool_num_threads_working(thpool_* thpool_p){
265
+	return thpool_p->num_threads_working;
266
+}
267
+
268
+
269
+
270
+
271
+
272
+/* ============================ THREAD ============================== */
273
+
274
+
275
+/* Initialize a thread in the thread pool
276
+ *
277
+ * @param thread        address to the pointer of the thread to be created
278
+ * @param id            id to be given to the thread
279
+ * @return 0 on success, -1 otherwise.
280
+ */
281
+static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
282
+
283
+	*thread_p = (struct thread*)malloc(sizeof(struct thread));
284
+	if (thread_p == NULL){
285
+		err("thread_init(): Could not allocate memory for thread\n");
286
+		return -1;
287
+	}
288
+
289
+	(*thread_p)->thpool_p = thpool_p;
290
+	(*thread_p)->id       = id;
291
+
292
+	pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p));
293
+	pthread_detach((*thread_p)->pthread);
294
+	return 0;
295
+}
296
+
297
+
298
+/* Sets the calling thread on hold */
299
+static void thread_hold(int sig_id) {
300
+    (void)sig_id;
301
+	threads_on_hold = 1;
302
+	while (threads_on_hold){
303
+		sleep(1);
304
+	}
305
+}
306
+
307
+
308
+/* What each thread is doing
309
+*
310
+* In principle this is an endless loop. The only time this loop gets interrupted is once
311
+* thpool_destroy() is invoked or the program exits.
312
+*
313
+* @param  thread        thread that will run this function
314
+* @return nothing
315
+*/
316
+static void* thread_do(struct thread* thread_p){
317
+
318
+	/* Set thread name for profiling and debuging */
319
+	char thread_name[128] = {0};
320
+	sprintf(thread_name, "thread-pool-%d", thread_p->id);
321
+
322
+#if defined(__linux__)
323
+	/* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
324
+	prctl(PR_SET_NAME, thread_name);
325
+#elif defined(__APPLE__) && defined(__MACH__)
326
+	pthread_setname_np(thread_name);
327
+#else
328
+	err("thread_do(): pthread_setname_np is not supported on this system");
329
+#endif
330
+
331
+	/* Assure all threads have been created before starting serving */
332
+	thpool_* thpool_p = thread_p->thpool_p;
333
+
334
+	/* Register signal handler */
335
+	struct sigaction act;
336
+	sigemptyset(&act.sa_mask);
337
+	act.sa_flags = 0;
338
+	act.sa_handler = thread_hold;
339
+	if (sigaction(SIGUSR1, &act, NULL) == -1) {
340
+		err("thread_do(): cannot handle SIGUSR1");
341
+	}
342
+
343
+	/* Mark thread as alive (initialized) */
344
+	pthread_mutex_lock(&thpool_p->thcount_lock);
345
+	thpool_p->num_threads_alive += 1;
346
+	pthread_mutex_unlock(&thpool_p->thcount_lock);
347
+
348
+	while(threads_keepalive){
349
+
350
+		bsem_wait(thpool_p->jobqueue.has_jobs);
351
+
352
+		if (threads_keepalive){
353
+
354
+			pthread_mutex_lock(&thpool_p->thcount_lock);
355
+			thpool_p->num_threads_working++;
356
+			pthread_mutex_unlock(&thpool_p->thcount_lock);
357
+
358
+			/* Read job from queue and execute it */
359
+			void (*func_buff)(void*);
360
+			void*  arg_buff;
361
+			job* job_p = jobqueue_pull(&thpool_p->jobqueue);
362
+			if (job_p) {
363
+				func_buff = job_p->function;
364
+				arg_buff  = job_p->arg;
365
+				func_buff(arg_buff);
366
+				free(job_p);
367
+			}
368
+
369
+			pthread_mutex_lock(&thpool_p->thcount_lock);
370
+			thpool_p->num_threads_working--;
371
+			if (!thpool_p->num_threads_working) {
372
+				pthread_cond_signal(&thpool_p->threads_all_idle);
373
+			}
374
+			pthread_mutex_unlock(&thpool_p->thcount_lock);
375
+
376
+		}
377
+	}
378
+	pthread_mutex_lock(&thpool_p->thcount_lock);
379
+	thpool_p->num_threads_alive --;
380
+	pthread_mutex_unlock(&thpool_p->thcount_lock);
381
+
382
+	return NULL;
383
+}
384
+
385
+
386
+/* Frees a thread  */
387
+static void thread_destroy (thread* thread_p){
388
+	free(thread_p);
389
+}
390
+
391
+
392
+
393
+
394
+
395
+/* ============================ JOB QUEUE =========================== */
396
+
397
+
398
+/* Initialize queue */
399
+static int jobqueue_init(jobqueue* jobqueue_p){
400
+	jobqueue_p->len = 0;
401
+	jobqueue_p->front = NULL;
402
+	jobqueue_p->rear  = NULL;
403
+
404
+	jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
405
+	if (jobqueue_p->has_jobs == NULL){
406
+		return -1;
407
+	}
408
+
409
+	pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
410
+	bsem_init(jobqueue_p->has_jobs, 0);
411
+
412
+	return 0;
413
+}
414
+
415
+
416
+/* Clear the queue */
417
+static void jobqueue_clear(jobqueue* jobqueue_p){
418
+
419
+	while(jobqueue_p->len){
420
+		free(jobqueue_pull(jobqueue_p));
421
+	}
422
+
423
+	jobqueue_p->front = NULL;
424
+	jobqueue_p->rear  = NULL;
425
+	bsem_reset(jobqueue_p->has_jobs);
426
+	jobqueue_p->len = 0;
427
+
428
+}
429
+
430
+
431
+/* Add (allocated) job to queue
432
+ */
433
+static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
434
+
435
+	pthread_mutex_lock(&jobqueue_p->rwmutex);
436
+	newjob->prev = NULL;
437
+
438
+	switch(jobqueue_p->len){
439
+
440
+		case 0:  /* if no jobs in queue */
441
+					jobqueue_p->front = newjob;
442
+					jobqueue_p->rear  = newjob;
443
+					break;
444
+
445
+		default: /* if jobs in queue */
446
+					jobqueue_p->rear->prev = newjob;
447
+					jobqueue_p->rear = newjob;
448
+
449
+	}
450
+	jobqueue_p->len++;
451
+
452
+	bsem_post(jobqueue_p->has_jobs);
453
+	pthread_mutex_unlock(&jobqueue_p->rwmutex);
454
+}
455
+
456
+
457
+/* Get first job from queue (removes it from queue)
458
+ */
464
+static struct job* jobqueue_pull(jobqueue* jobqueue_p){
465
+
466
+	pthread_mutex_lock(&jobqueue_p->rwmutex);
467
+	job* job_p = jobqueue_p->front;
468
+
469
+	switch(jobqueue_p->len){
470
+
471
+		case 0:  /* if no jobs in queue */
472
+		  			break;
473
+
474
+		case 1:  /* if one job in queue */
475
+					jobqueue_p->front = NULL;
476
+					jobqueue_p->rear  = NULL;
477
+					jobqueue_p->len = 0;
478
+					break;
479
+
480
+		default: /* if >1 jobs in queue */
481
+					jobqueue_p->front = job_p->prev;
482
+					jobqueue_p->len--;
483
+					/* more than one job in queue -> post it */
484
+					bsem_post(jobqueue_p->has_jobs);
485
+
486
+	}
487
+
488
+	pthread_mutex_unlock(&jobqueue_p->rwmutex);
489
+	return job_p;
490
+}
491
+
492
+
493
+/* Free all queue resources back to the system */
494
+static void jobqueue_destroy(jobqueue* jobqueue_p){
495
+	jobqueue_clear(jobqueue_p);
496
+	free(jobqueue_p->has_jobs);
497
+}
498
+
499
+
500
+
501
+
502
+
503
+/* ======================== SYNCHRONISATION ========================= */
504
+
505
+
506
+/* Init semaphore to 1 or 0 */
507
+static void bsem_init(bsem *bsem_p, int value) {
508
+	if (value < 0 || value > 1) {
509
+		err("bsem_init(): Binary semaphore can take only values 1 or 0");
510
+		exit(1);
511
+	}
512
+	pthread_mutex_init(&(bsem_p->mutex), NULL);
513
+	pthread_cond_init(&(bsem_p->cond), NULL);
514
+	bsem_p->v = value;
515
+}
516
+
517
+
518
+/* Reset semaphore to 0 */
519
+static void bsem_reset(bsem *bsem_p) {
520
+	bsem_init(bsem_p, 0);
521
+}
522
+
523
+
524
+/* Post to at least one thread */
525
+static void bsem_post(bsem *bsem_p) {
526
+	pthread_mutex_lock(&bsem_p->mutex);
527
+	bsem_p->v = 1;
528
+	pthread_cond_signal(&bsem_p->cond);
529
+	pthread_mutex_unlock(&bsem_p->mutex);
530
+}
531
+
532
+
533
+/* Post to all threads */
534
+static void bsem_post_all(bsem *bsem_p) {
535
+	pthread_mutex_lock(&bsem_p->mutex);
536
+	bsem_p->v = 1;
537
+	pthread_cond_broadcast(&bsem_p->cond);
538
+	pthread_mutex_unlock(&bsem_p->mutex);
539
+}
540
+
541
+
542
+/* Wait on semaphore until semaphore has value 0 */
543
+static void bsem_wait(bsem* bsem_p) {
544
+	pthread_mutex_lock(&bsem_p->mutex);
545
+	while (bsem_p->v != 1) {
546
+		pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
547
+	}
548
+	bsem_p->v = 0;
549
+	pthread_mutex_unlock(&bsem_p->mutex);
550
+}
0 551
new file mode 100644
... ...
@@ -0,0 +1,187 @@
0
+/**********************************
1
+ * @author      Johan Hanssen Seferidis
2
+ * License:     MIT
3
+ *
4
+ **********************************/
5
+
6
+#ifndef _THPOOL_
7
+#define _THPOOL_
8
+
9
+#ifdef __cplusplus
10
+extern "C" {
11
+#endif
12
+
13
+/* =================================== API ======================================= */
14
+
15
+
16
+typedef struct thpool_* threadpool;
17
+
18
+
19
+/**
20
+ * @brief  Initialize threadpool
21
+ *
22
+ * Initializes a threadpool. This function will not return untill all
23
+ * threads have initialized successfully.
24
+ *
25
+ * @example
26
+ *
27
+ *    ..
28
+ *    threadpool thpool;                     //First we declare a threadpool
29
+ *    thpool = thpool_init(4);               //then we initialize it to 4 threads
30
+ *    ..
31
+ *
32
+ * @param  num_threads   number of threads to be created in the threadpool
33
+ * @return threadpool    created threadpool on success,
34
+ *                       NULL on error
35
+ */
36
+threadpool thpool_init(int num_threads);
37
+
38
+
39
+/**
40
+ * @brief Add work to the job queue
41
+ *
42
+ * Takes an action and its argument and adds it to the threadpool's job queue.
43
+ * If you want to add to work a function with more than one arguments then
44
+ * a way to implement this is by passing a pointer to a structure.
45
+ *
46
+ * NOTICE: You have to cast both the function and argument to not get warnings.
47
+ *
48
+ * @example
49
+ *
50
+ *    void print_num(int num){
51
+ *       printf("%d\n", num);
52
+ *    }
53
+ *
54
+ *    int main() {
55
+ *       ..
56
+ *       int a = 10;
57
+ *       thpool_add_work(thpool, (void*)print_num, (void*)a);
58
+ *       ..
59
+ *    }
60
+ *
61
+ * @param  threadpool    threadpool to which the work will be added
62
+ * @param  function_p    pointer to function to add as work
63
+ * @param  arg_p         pointer to an argument
64
+ * @return 0 on success, -1 otherwise.
65
+ */
66
+int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
67
+
68
+
69
+/**
70
+ * @brief Wait for all queued jobs to finish
71
+ *
72
+ * Will wait for all jobs - both queued and currently running to finish.
73
+ * Once the queue is empty and all work has completed, the calling thread
74
+ * (probably the main program) will continue.
75
+ *
76
+ * Smart polling is used in wait. The polling is initially 0 - meaning that
77
+ * there is virtually no polling at all. If after 1 second the threads
78
+ * haven't finished, the polling interval starts growing exponentially
79
+ * until it reaches max_secs seconds. Then it jumps down to a maximum polling
80
+ * interval assuming that heavy processing is being used in the threadpool.
81
+ *
82
+ * @example
83
+ *
84
+ *    ..
85
+ *    threadpool thpool = thpool_init(4);
86
+ *    ..
87
+ *    // Add a bunch of work
88
+ *    ..
89
+ *    thpool_wait(thpool);
90
+ *    puts("All added work has finished");
91
+ *    ..
92
+ *
93
+ * @param threadpool     the threadpool to wait for
94
+ * @return nothing
95
+ */
96
+void thpool_wait(threadpool);
97
+
98
+
99
+/**
100
+ * @brief Pauses all threads immediately
101
+ *
102
+ * The threads will be paused no matter if they are idle or working.
103
+ * The threads return to their previous states once thpool_resume
104
+ * is called.
105
+ *
106
+ * While the thread is being paused, new work can be added.
107
+ *
108
+ * @example
109
+ *
110
+ *    threadpool thpool = thpool_init(4);
111
+ *    thpool_pause(thpool);
112
+ *    ..
113
+ *    // Add a bunch of work
114
+ *    ..
115
+ *    thpool_resume(thpool); // Let the threads start their magic
116
+ *
117
+ * @param threadpool    the threadpool where the threads should be paused
118
+ * @return nothing
119
+ */
120
+void thpool_pause(threadpool);
121
+
122
+
123
+/**
124
+ * @brief Unpauses all threads if they are paused
125
+ *
126
+ * @example
127
+ *    ..
128
+ *    thpool_pause(thpool);
129
+ *    sleep(10);              // Delay execution 10 seconds
130
+ *    thpool_resume(thpool);
131
+ *    ..
132
+ *
133
+ * @param threadpool     the threadpool where the threads should be unpaused
134
+ * @return nothing
135
+ */
136
+void thpool_resume(threadpool);
137
+
138
+
139
+/**
140
+ * @brief Destroy the threadpool
141
+ *
142
+ * This will wait for the currently active threads to finish and then 'kill'
143
+ * the whole threadpool to free up memory.
144
+ *
145
+ * @example
146
+ * int main() {
147
+ *    threadpool thpool1 = thpool_init(2);
148
+ *    threadpool thpool2 = thpool_init(2);
149
+ *    ..
150
+ *    thpool_destroy(thpool1);
151
+ *    ..
152
+ *    return 0;
153
+ * }
154
+ *
155
+ * @param threadpool     the threadpool to destroy
156
+ * @return nothing
157
+ */
158
+void thpool_destroy(threadpool);
159
+
160
+
161
+/**
162
+ * @brief Show currently working threads
163
+ *
164
+ * Working threads are the threads that are performing work (not idle).
165
+ *
166
+ * @example
167
+ * int main() {
168
+ *    threadpool thpool1 = thpool_init(2);
169
+ *    threadpool thpool2 = thpool_init(2);
170
+ *    ..
171
+ *    printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
172
+ *    ..
173
+ *    return 0;
174
+ * }
175
+ *
176
+ * @param threadpool     the threadpool of interest
177
+ * @return integer       number of threads working
178
+ */
179
+int thpool_num_threads_working(threadpool);
180
+
181
+
182
+#ifdef __cplusplus
183
+}
184
+#endif
185
+
186
+#endif
... ...
@@ -47,6 +47,7 @@
47 47
 #include "./client/onaccess_client.h"
48 48
 #include "./fanotif/onaccess_fan.h"
49 49
 #include "./inotif/onaccess_ddd.h"
50
+#include "./scan/onaccess_scque.h"
50 51
 
51 52
 
52 53
 pthread_t ddd_pid = 0;
... ...
@@ -80,6 +81,20 @@ int main(int argc, char **argv)
80 80
 	}
81 81
 	ctx->clamdopts = clamdopts;
82 82
 
83
+        /* Setup our event queue */
84
+        switch(onas_scanque_start(&ctx)) {
85
+            case CL_SUCCESS:
86
+                break;
87
+            case CL_BREAK:
88
+            case CL_EARG:
89
+            case CL_ECREAT:
90
+            default:
91
+                ret = 2;
92
+                logg("!Clamonacc: can't setup event consumer queue\n");
93
+                goto clean_up;
94
+                break;
95
+        }
96
+
83 97
 	/* Setup our client */
84 98
 	switch(onas_setup_client(&ctx)) {
85 99
 		case CL_SUCCESS:
... ...
@@ -45,9 +45,9 @@ struct onas_context {
45 45
 
46 46
         int fan_fd;
47 47
         uint64_t fan_mask;
48
-        int retry_on_error;
49
-        int retry_attempts;
50
-        int deny_on_error;
48
+        uint8_t retry_on_error;
49
+        uint8_t retry_attempts;
50
+        uint8_t deny_on_error;
51 51
 
52 52
         uint64_t sizelimit;
53 53
         uint64_t extinfo;
... ...
@@ -55,6 +55,7 @@ struct onas_context {
55 55
         int scantype;
56 56
         int isremote;
57 57
         int session;
58
+	int timeout;
58 59
 
59 60
         int64_t portnum;
60 61
 } __attribute__((packed));
... ...
@@ -160,7 +160,7 @@ int onas_check_remote(struct onas_context  **ctx, cl_error_t *err) {
160 160
 	}
161 161
 }
162 162
 
163
-CURLcode onas_curl_init(CURL **curl, char *ipaddr, int64_t port, int64_t timeout) {
163
+CURLcode onas_curl_init(CURL **curl, const char *ipaddr, int64_t port, int64_t timeout) {
164 164
 
165 165
 	CURLcode curlcode = CURLE_OK;
166 166
 
... ...
@@ -281,6 +281,8 @@ cl_error_t onas_setup_client (struct onas_context **ctx) {
281 281
         return CL_EARG;
282 282
     }
283 283
 
284
+    (*ctx)->timeout = optget((*ctx)->clamdopts, "OnAccessCurlTimeout")->numarg;
285
+
284 286
     (*ctx)->isremote = onas_check_remote(ctx, &err);
285 287
     if (err) {
286 288
         return CL_EARG;
... ...
@@ -406,30 +408,24 @@ int onas_get_clamd_version(struct onas_context **ctx)
406 406
     return 0;
407 407
 }
408 408
 
409
-int onas_client_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code)
409
+int onas_client_scan(const char *tcpaddr, int64_t portnum, int32_t scantype, uint64_t maxstream, const char *fname, int64_t timeout, STATBUF sb, int *infected, int *err, cl_error_t *ret_code)
410 410
 {
411 411
 	CURL *curl = NULL;
412 412
 	CURLcode curlcode = CURLE_OK;
413
-	int scantype, errors = 0;
413
+	int errors = 0;
414 414
 	int sockd, ret;
415
-    int64_t timeout;
416
-
417
-    timeout = optget((*ctx)->clamdopts, "OnAccessCurlTimeout")->numarg;
418
-
419 415
 
420 416
 	*infected = 0;
421 417
 
422 418
 	if((sb.st_mode & S_IFMT) != S_IFREG) {
423 419
 		scantype = STREAM;
424
-	} else {
425
-		scantype = (*ctx)->scantype;
426
-        }
420
+	}
427 421
 
428
-	curlcode = onas_curl_init(&curl, optget((*ctx)->clamdopts, "TCPAddr")->strarg, (*ctx)->portnum, timeout);
422
+	curlcode = onas_curl_init(&curl, tcpaddr, portnum, timeout);
429 423
 	if (CURLE_OK != curlcode) {
430 424
 		logg("!ClamClient: could not setup curl with tcp address and port, %s\n", curl_easy_strerror(curlcode));
431
-		/* curl cleanup done in ons_curl_init on error */
432
-		return 2;
425
+		/* curl cleanup done in onas_curl_init on error */
426
+		return CL_ECREAT;
433 427
 	}
434 428
 
435 429
 	/* logg here is noisy even for debug, enable only for dev work if something has gone very wrong. */
... ...
@@ -437,11 +433,11 @@ int onas_client_scan(struct onas_context **ctx, const char *fname, STATBUF sb, i
437 437
 	curlcode = curl_easy_perform(curl);
438 438
 	if (CURLE_OK != curlcode) {
439 439
 		logg("!ClamClient: could not establish connection, %s\n", curl_easy_strerror(curlcode));
440
-		return 2;
440
+		return CL_ECREAT;
441 441
 	}
442 442
 
443 443
 
444
-	if((ret = onas_dsresult(ctx, curl, scantype, fname, &ret, err, ret_code)) >= 0) {
444
+	if((ret = onas_dsresult(curl, scantype, maxstream, fname, timeout, &ret, err, ret_code)) >= 0) {
445 445
 		*infected = ret;
446 446
 	} else {
447 447
 		logg("*ClamClient: connection could not be established ... return code %d\n", *ret_code);
... ...
@@ -451,6 +447,6 @@ int onas_client_scan(struct onas_context **ctx, const char *fname, STATBUF sb, i
451 451
 	//logg("*ClamClient: done, closing connection ...\n");
452 452
 
453 453
 	curl_easy_cleanup(curl);
454
-	return *infected ? 1 : (errors ? 2 : 0);
454
+	return *infected ? CL_VIRUS : (errors ? CL_ECREAT : CL_CLEAN);
455 455
 }
456 456
 
... ...
@@ -37,8 +37,8 @@ enum {
37 37
 };
38 38
 
39 39
 
40
-int onas_client_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
41
-CURLcode onas_curl_init(CURL **curl, char *ipaddr, int64_t port, int64_t timeout);
40
+int onas_client_scan(const char *tcpaddr, int64_t portnum, int32_t scantype, uint64_t maxstream, const char *fname, int64_t timeout, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
41
+CURLcode onas_curl_init(CURL **curl, const char *ipaddr, int64_t port, int64_t timeout);
42 42
 int onas_get_clamd_version(struct onas_context **ctx);
43 43
 cl_error_t onas_setup_client(struct onas_context **ctx);
44 44
 int onas_check_remote(struct onas_context  **ctx, cl_error_t *err);
... ...
@@ -73,13 +73,10 @@ static const char *scancmd[] = { "CONTSCAN", "MULTISCAN", "INSTREAM", "FILDES",
73 73
 
74 74
 /* Issues an INSTREAM command to clamd and streams the given file
75 75
  * Returns >0 on success, 0 soft fail, -1 hard fail */
76
-static int onas_send_stream(struct onas_context **ctx, CURL *curl, const char *filename) {
76
+static int onas_send_stream(CURL *curl, const char *filename, int64_t timeout, uint64_t maxstream) {
77 77
 	uint32_t buf[BUFSIZ/sizeof(uint32_t)];
78
-	int fd, len;
79
-	unsigned long int todo = (*ctx)->maxstream;
80
-        int64_t timeout;
81
-
82
-        timeout = optget((*ctx)->clamdopts, "OnAccessCurlTimeout")->numarg;
78
+    	uint64_t fd, len;
79
+        uint64_t todo = maxstream;
83 80
 
84 81
 	if(filename) {
85 82
 		if((fd = safe_open(filename, O_RDONLY | O_BINARY))<0) {
... ...
@@ -94,7 +91,7 @@ static int onas_send_stream(struct onas_context **ctx, CURL *curl, const char *f
94 94
 	}
95 95
 
96 96
 	while((len = read(fd, &buf[1], sizeof(buf) - sizeof(uint32_t))) > 0) {
97
-		if((unsigned int)len > todo) len = todo;
97
+		if((uint64_t)len > todo) len = todo;
98 98
 		buf[0] = htonl(len);
99 99
 		if (onas_sendln(curl, (const char *)buf, len+sizeof(uint32_t), timeout)) {
100 100
 			close(fd);
... ...
@@ -162,39 +159,15 @@ static int onas_send_fdpass(CURL *curl, const char *filename, int64_t timeout) {
162 162
 }
163 163
 #endif
164 164
 
165
-/* 0: scan, 1: skip */
166
-static int chkpath(struct onas_context **ctx, const char *path)
167
-{
168
-	const struct optstruct *opt;
169
-
170
-	if((opt = optget((*ctx)->clamdopts, "ExcludePath"))->enabled) {
171
-		while(opt) {
172
-			if(match_regex(path, opt->strarg) == 1) {
173
-				if ((*ctx)->printinfected != 1)
174
-					logg("~%s: Excluded\n", path);
175
-				return 1;
176
-			}
177
-			opt = opt->nextarg;
178
-		}
179
-	}
180
-	return 0;
181
-}
182
-
183 165
 /* Sends a proper scan request to clamd and parses its replies
184 166
  * This is used only in non IDSESSION mode
185 167
  * Returns the number of infected files or -1 on error
186 168
  * NOTE: filename may be NULL for STREAM scantype. */
187
-int onas_dsresult(struct onas_context **ctx, CURL *curl, int scantype, const char *filename, int *printok, int *errors, cl_error_t *ret_code) {
169
+int onas_dsresult(CURL *curl, int scantype, uint64_t maxstream, const char *filename, int64_t timeout, int *printok, int *errors, cl_error_t *ret_code) {
188 170
 	int infected = 0, len = 0, beenthere = 0;
189 171
 	char *bol, *eol;
190 172
 	struct RCVLN rcv;
191 173
 	STATBUF sb;
192
-        int64_t timeout;
193
-
194
-        timeout = optget((*ctx)->clamdopts, "OnAccessCurlTimeout")->numarg;
195
-
196
-	if(filename && chkpath(ctx, filename))
197
-		return 0;
198 174
 
199 175
 	onas_recvlninit(&rcv, curl);
200 176
 
... ...
@@ -234,7 +207,7 @@ int onas_dsresult(struct onas_context **ctx, CURL *curl, int scantype, const cha
234 234
 
235 235
 		case STREAM:
236 236
 			/* NULL filename safe in send_stream() */
237
-			len = onas_send_stream(ctx, curl, filename);
237
+			len = onas_send_stream(curl, filename, timeout, maxstream);
238 238
 			break;
239 239
 #ifdef HAVE_FD_PASSING
240 240
 		case FILDES:
... ...
@@ -2,7 +2,7 @@
2 2
  *  Copyright (C) 2015 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
3 3
  *  Copyright (C) 2009 Sourcefire, Inc.
4 4
  *
5
- *  Authors: Tomasz Kojm, aCaB
5
+ *  Authors: Tomasz Kojm, aCaB, Mickey Sola
6 6
  *
7 7
  *  This program is free software; you can redistribute it and/or modify
8 8
  *  it under the terms of the GNU General Public License version 2 as
... ...
@@ -27,6 +27,5 @@
27 27
 #include "shared/misc.h"
28 28
 #include "../clamonacc.h"
29 29
 
30
-/*int onas_dconnect(struct onas_context **ctx);*/
31
-int onas_dsresult(struct onas_context **ctx, CURL *curl, int scantype, const char *filename, int *printok, int *errors, cl_error_t *ret_code);
30
+int onas_dsresult(CURL *curl, int scantype, uint64_t maxstream, const char *filename, int64_t timeout, int *printok, int *errors, cl_error_t *ret_code);
32 31
 #endif
33 32
new file mode 100644
... ...
@@ -0,0 +1,22 @@
0
+COPYRIGHT AND PERMISSION NOTICE
1
+
2
+Copyright (c) 1996 - 2019, Daniel Stenberg, <daniel@haxx.se>, and many
3
+contributors, see the THANKS file.
4
+
5
+All rights reserved.
6
+
7
+Permission to use, copy, modify, and distribute this software for any purpose
8
+with or without fee is hereby granted, provided that the above copyright
9
+notice and this permission notice appear in all copies.
10
+
11
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS. IN
14
+NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
15
+DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
16
+OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
17
+OR OTHER DEALINGS IN THE SOFTWARE.
18
+
19
+Except as contained in this notice, the name of a copyright holder shall not
20
+be used in advertising or otherwise to promote the sale, use or other dealings
21
+in this Software without prior written authorization of the copyright holder.
... ...
@@ -54,8 +54,10 @@
54 54
 #include "../client/onaccess_client.h"
55 55
 
56 56
 #include "../scan/onaccess_scth.h"
57
+#include "../scan/onaccess_scque.h"
57 58
 
58 59
 extern pthread_t ddd_pid;
60
+extern pthread_t scque_pid;
59 61
 
60 62
 /*static void onas_fan_exit(int sig)
61 63
 {
... ...
@@ -68,42 +70,13 @@ extern pthread_t ddd_pid;
68 68
         pthread_join(ddd_pid, NULL);
69 69
     }
70 70
 
71
-    pthread_exit(NULL);
72
-	logg("ClamFanotif: stopped\n");
73
-}*/
74
-
75
-/* TODO: rework this to feed multithreading consumer queue
76
- * static int onas_fan_scanfile(const char *fname, struct fanotify_event_metadata *fmd, STATBUF sb, int scan, struct onas_context **ctx)
77
-{
78
-    struct fanotify_response res;
79
-        int infected = 0;
80
-        int err = 0;
81
-    int ret             = 0;
82
-	int i = 0;
83
-	cl_error_t ret_code = 0;
84
-
85
-    res.fd       = fmd->fd;
86
-    res.response = FAN_ALLOW;
87
-
88
-    if (scan) {
89
-		ret = onas_scan(ctx, fname, sb, &infected, &err, &ret_code);
90
-
91
-		if (err && ret_code != CL_SUCCESS) {
92
-			logg("*ClamFanotif: scan failed with error code %d\n", ret_code);
93
-		}
94
-
95
-		if ((err && ret_code && (*ctx)->deny_on_error) || infected) {
96
-            res.response = FAN_DENY;
97
-        }
71
+        if (scque_pid > 0) {
72
+		pthread_kill(scque_pid, SIGUSR1);
73
+		pthread_join(scque_pid, NULL);
98 74
     }
99 75
 
100
-    if (fmd->mask & FAN_ALL_PERM_EVENTS) {
101
-		ret = write((*ctx)->fan_fd, &res, sizeof(res));
102
-        if (ret == -1)
103
-			logg("!ClamFanotif: internal error (can't write to fanotify)\n");
104
-    }
105
-
106
-    return ret;
76
+	pthread_exit(NULL);
77
+	logg("ClamFanotif: stopped\n");
107 78
 }*/
108 79
 
109 80
 cl_error_t onas_setup_fanotif(struct onas_context **ctx) {
... ...
@@ -186,9 +159,6 @@ cl_error_t onas_setup_fanotif(struct onas_context **ctx) {
186 186
 
187 187
 	extinfo = optget((*ctx)->clamdopts, "ExtendedDetectionInfo")->enabled;
188 188
 
189
-	//(*ctx)->sizelimit = sizelimit;
190
-	//(*ctx)->extinfo = extinfo;
191
-
192 189
 	return CL_SUCCESS;
193 190
 }
194 191
 
... ...
@@ -239,12 +209,7 @@ int onas_fan_eloop(struct onas_context **ctx) {
239 239
 
240 240
 				if((check = onas_fan_checkowner(fmd->pid, (*ctx)->clamdopts))) {
241 241
                     scan = 0;
242
-/* TODO: Re-enable OnAccessExtraScanning once the thread resource consumption issue is resolved. */
243
-#if 0
244
-					if ((check != CHK_SELF) || !(optget(tharg->opts, "OnAccessExtraScanning")->enabled))
245
-#else
246 242
                     if (check != CHK_SELF) {
247
-#endif
248 243
 							logg("*ClamFanotif: %s skipped (excluded UID)\n", fname);
249 244
                 }
250 245
             }
... ...
@@ -253,23 +218,26 @@ int onas_fan_eloop(struct onas_context **ctx) {
253 253
 					struct onas_scan_event *event_data;
254 254
 
255 255
 					event_data = cli_calloc(1, sizeof(struct onas_scan_event));
256
+                                        if (NULL == event_data) {
257
+					    logg("!ClamFanotif: could not allocate memory for event data struct\n");
258
+                                            return 2;
259
+                                        }
256 260
 
257
-					event_data->b_fanotify = 1;
261
+                                        /* general mapping */
262
+                                        onas_map_context_info_to_event_data(*ctx, &event_data);
263
+					scan ? event_data->bool_opts |= ONAS_SCTH_B_SCAN : scan;
264
+
265
+                                        /* fanotify specific stuffs */
266
+					event_data->bool_opts |= ONAS_SCTH_B_FANOTIFY;
258 267
 					event_data->fmd = fmd;
259
-					event_data->b_scan = scan;
260 268
 
261
-					/* TODO: rework to feed consumer queue */
262
-					if (onas_scth_handle_file(ctx, fname, event_data) == -1) {
269
+					/* feed consumer queue */
270
+					if (CL_SUCCESS != onas_queue_event(event_data)) {
263 271
                 close(fmd->fd);
264
-						logg("!ClamFanotif: unrecoverable fanotify error occurred :(\n");
272
+						logg("!ClamFanotif: error occurred while feeding consumer queue :(\n");
265 273
 						return 2;
266 274
 					}
267 275
                                 }
268
-
269
-            if (close(fmd->fd) == -1) {
270
-					printf("!ClamFanotif: internal error (close(%d) failed)\n", fmd->fd);
271
-						return 2;
272
-            }
273 276
         }
274 277
         fmd = FAN_EVENT_NEXT(fmd, bread);
275 278
     }
276 279
new file mode 100644
... ...
@@ -0,0 +1,290 @@
0
+/*
1
+ *  Copyright (C) 2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
2
+ *
3
+ *  Authors: Mickey Sola
4
+ *
5
+ *  This program is free software; you can redistribute it and/or modify
6
+ *  it under the terms of the GNU General Public License version 2 as
7
+ *  published by the Free Software Foundation.
8
+ *
9
+ *  This program is distributed in the hope that it will be useful,
10
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
+ *  GNU General Public License for more details.
13
+ *
14
+ *  You should have received a copy of the GNU General Public License
15
+ *  along with this program; if not, write to the Free Software
16
+ *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17
+ *  MA 02110-1301, USA.
18
+ */
19
+
20
+#if HAVE_CONFIG_H
21
+#include "clamav-config.h"
22
+#endif
23
+
24
+#if defined(FANOTIFY)
25
+
26
+#include <stdio.h>
27
+#include <stdlib.h>
28
+#include <unistd.h>
29
+#include <sys/types.h>
30
+#include <sys/stat.h>
31
+#include <fcntl.h>
32
+#include <signal.h>
33
+#include <pthread.h>
34
+#include <string.h>
35
+#include <errno.h>
36
+#include <stdbool.h>
37
+
38
+#include <sys/fanotify.h>
39
+#include <sys/inotify.h>
40
+
41
+#include "../fanotif/onaccess_fan.h"
42
+#include "onaccess_hash.h"
43
+#include "onaccess_ddd.h"
44
+#include "../scan/onaccess_scth.h"
45
+#include "../misc/onaccess_others.h"
46
+
47
+#include "libclamav/clamav.h"
48
+#include "libclamav/scanners.h"
49
+
50
+#include "shared/optparser.h"
51
+#include "shared/output.h"
52
+
53
+#include "clamd/server.h"
54
+#include "clamd/others.h"
55
+#include "clamd/scanner.h"
56
+
57
+#include "../c-thread-pool/thpool.h"
58
+
59
+static void onas_scanque_exit(int sig);
60
+static int onas_consume_event(struct *event_data);
61
+
62
+static pthread_mutex_t onas_queue_lock = PTHREAD_MUTEX_INITIALIZER;
63
+
64
+static threadpool g_thpool;
65
+
66
+static struct onas_event_queue_node *g_onas_event_queue_head = NULL;
67
+static struct onas_event_queue_node *g_onas_event_queue_tail = NULL;
68
+
69
+static struct onas_event_queue g_onas_event_queue = {
70
+    head = g_onas_event_queue_head,
71
+    tail = g_onas_event_queue_tail,
72
+
73
+    size = 0;
74
+};
75
+
76
+static void *onas_init_event_queue() {
77
+    *g_onas_event_queue_head = (struct event_queue_node) {
78
+        .next = NULL,
79
+            .prev = NULL,
80
+
81
+            .data = NULL
82
+    };
83
+
84
+    *g_onas_event_queue_tail = &(struct event_queue_node) {
85
+        .next = NULL,
86
+            .prev = NULL,
87
+
88
+            .data = NULL
89
+    };
90
+
91
+    g_onas_event_queue_tail->prev = g_onas_event_queue_head;
92
+    g_onas_event_queue_head->next = g_onas_event_queue_tail;
93
+}
94
+
95
+extern pthread_t scque_pid;
96
+
97
+static cl_error_t onas_new_event_queue_node(struct event_queue_node **node) {
98
+
99
+	*node = malloc(sizeof(struct onas_event_queue));
100
+	if (NULL == *node) {
101
+		return CL_EMEM;
102
+	}
103
+
104
+
105
+	**node = (struct event_queue_node) {
106
+		.next = NULL,
107
+		.prev = NULL,
108
+
109
+		.data = NULL
110
+	};
111
+
112
+	return CL_SUCCESS;
113
+}
114
+
115
+static void onas_destroy_event_queue_node(struct event_queue_node *node) {
116
+
117
+	if (NULL == node) {
118
+		return;
119
+	}
120
+
121
+	node->next = NULL;
122
+	node->prev = NULL;
123
+	node->data = NULL;
124
+
125
+	free(node);
126
+	node = NULL;
127
+
128
+	return;
129
+}
130
+
131
+static void onas_destroy_event_queue() {
132
+
133
+	struct onas_event_queue_node *curr = g_onas_event_queue_head;
134
+	struct onas_event_queue_node *next = curr->next;
135
+
136
+	do {
137
+		onas_destroy_event_queue_node(curr);
138
+		curr = next;
139
+		if (curr) {
140
+			next = curr->next;
141
+		}
142
+	} while (curr);
143
+
144
+	return;
145
+}
146
+
147
+
148
+void *onas_scanque_th(void *arg) {
149
+
150
+	/* not a ton of use for context right now, but perhaps in the future we can pass in more options */
151
+	struct onas_context *ctx = (struct onas_context *) arg;
152
+	sigset_t sigset;
153
+	struct sigaction act;
154
+	const struct optstruct *pt;
155
+	int ret, len, idx;
156
+
157
+        cl_error_t err;
158
+
159
+	/* ignore all signals except SIGUSR1 */
160
+	sigfillset(&sigset);
161
+	sigdelset(&sigset, SIGUSR1);
162
+	/* The behavior of a process is undefined after it ignores a
163
+	 * SIGFPE, SIGILL, SIGSEGV, or SIGBUS signal */
164
+	sigdelset(&sigset, SIGFPE);
165
+	sigdelset(&sigset, SIGILL);
166
+	sigdelset(&sigset, SIGSEGV);
167
+#ifdef SIGBUS
168
+	sigdelset(&sigset, SIGBUS);
169
+#endif
170
+	pthread_sigmask(SIG_SETMASK, &sigset, NULL);
171
+	memset(&act, 0, sizeof(struct sigaction));
172
+	act.sa_handler = onas_scanque_exit;
173
+	sigfillset(&(act.sa_mask));
174
+	sigaction(SIGUSR1, &act, NULL);
175
+	sigaction(SIGSEGV, &act, NULL);
176
+
177
+        onas_init_event_queue();
178
+        threadpool thpool = thpool_init(ctx->maxthreads);
179
+	g_thpool = thpool;
180
+
181
+        /* loop w/ onas_consume_event until we die */
182
+	do {
183
+		/* if there's no event to consume ... */
184
+		if (!onas_consume_event(thpool)) {
185
+			/* sleep for a bit */
186
+			usleep(500);
187
+		}
188
+	} while(1);
189
+
190
+	return;
191
+}
192
+
193
+static int onas_queue_is_b_empty() {
194
+
195
+    if (g_onas_event_queue->head->next == g_onas_event_queue->tail) {
196
+        return 1;
197
+    }
198
+
199
+    return 0;
200
+}
201
+
202
+static int onas_consume_event(threadpool thpool) {
203
+
204
+    pthread_mutex_lock(&onas_queue_lock);
205
+
206
+    struct onas_event_queue_node *popped_node = g_onas_event_queue_head->next;
207
+
208
+    /* TODO: create scth arg using head event data, use get queue head here before lock*/
209
+    if (onas_queue_is_b_empty()) {
210
+        return 1;
211
+    }
212
+
213
+    thpool_add_work(thpool, (void *) onas_scan_worker, (void *) popped_node->data);
214
+
215
+    g_onas_event_queue_head->next = g_onas_event_queue_head->next->next;
216
+    g_onas_event_queue_head->next->prev = g_onas_event_head;
217
+
218
+    onas_destroy_event_queue_node(popped_node);
219
+
220
+    g_onas_event_queue->size--;
221
+
222
+    pthread_mutex_unlock(&onas_queue_lock);
223
+    return 0;
224
+}
225
+
226
+cl_error_t onas_queue_event(struct onas_scan_event *event_data) {
227
+
228
+    pthread_mutex_lock(&onas_queue_lock);
229
+
230
+    struct onas_event_queue_node *node = NULL;
231
+
232
+    if (CL_EMEM == onas_new_event_queue_node(&node)) {
233
+	    return CL_EMEM;
234
+    }
235
+
236
+    node->next = g_onas_event_queue_tail;
237
+    node->prev = g_onas_event_queue_tail->prev;
238
+
239
+    node->data = event_data;
240
+
241
+    /* tail will always have a .prev */
242
+    ((struct onas_event_queue_node *) g_onas_event_queue_tail->prev)->next = node;
243
+    g_onas_event_queue_tail->prev = node;
244
+
245
+    g_onas_event_queue->size++;
246
+
247
+    pthread_mutex_unlock(&onas_queue_lock);
248
+
249
+    return CL_SUCCESS;
250
+}
251
+
252
+cl_error_t onas_scanque_start(struct onas_context **ctx) {
253
+
254
+	pthread_attr_t scque_attr;
255
+	int32_t thread_started = 1;
256
+
257
+	if (!ctx || !*ctx) {
258
+		logg("*ClamQueue: unable to start clamonacc. (bad context)\n");
259
+		return CL_EARG;
260
+	}
261
+
262
+        if(pthread_attr_init(&scque_attr)) {
263
+            return CL_BREAK;
264
+        }
265
+        pthread_attr_setdetachstate(&scque_attr, PTHREAD_CREATE_JOINABLE);
266
+	thread_started = pthread_create(&scque_pid, &scque_attr, onas_scanque_th, *ctx);
267
+
268
+	if (0 != thread_started) {
269
+		/* Failed to create thread */
270
+		logg("*ClamQueue: Unable to start event consumer queue thread ... \n");
271
+		return CL_ECREAT;
272
+	}
273
+
274
+	return CL_SUCCESS;
275
+}
276
+
277
+static void onas_scanque_exit(int sig) {
278
+
279
+	logg("*ClamScanque: onas_scanque_exit(), signal %d\n", sig);
280
+
281
+        /* TODO: cleanup queue struct */
282
+	onas_destroy_event_queue();
283
+	thpool_destroy(g_thpool);
284
+
285
+	pthread_exit(NULL);
286
+	logg("ClamScanque: stopped\n");
287
+}
288
+
289
+#endif
0 290
new file mode 100644
... ...
@@ -0,0 +1,43 @@
0
+/*
1
+ *  Copyright (C) 2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
2
+ *
3
+ *  Authors: Mickey Sola
4
+ *
5
+ *  This program is free software; you can redistribute it and/or modify
6
+ *  it under the terms of the GNU General Public License version 2 as
7
+ *  published by the Free Software Foundation.
8
+ *
9
+ *  This program is distributed in the hope that it will be useful,
10
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
+ *  GNU General Public License for more details.
13
+ *
14
+ *  You should have received a copy of the GNU General Public License
15
+ *  along with this program; if not, write to the Free Software
16
+ *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17
+ *  MA 02110-1301, USA.
18
+ */
19
+
20
+#ifndef __ONAS_SCQUE_H
21
+#define __ONAS_SCQUE_H
22
+
23
+/* extremely simple event queue implmentation w/ obj number tracking in case we want to place limits later */
24
+struct onas_event_queue {
25
+    struct onas_event_queue_node *head;
26
+    struct onas_event_queue_node *tail;
27
+    uint64_t size;
28
+};
29
+
30
+struct onas_event_queue_node {
31
+    struct onas_event_queue_node *next;
32
+    struct onas_event_queue_node *prev;
33
+
34
+    struct onas_scan_event *data;
35
+};
36
+
37
+void *onas_scanque_th(void *arg);
38
+
39
+cl_error_t onas_queue_event(struct onas_scan_event *event_data);
40
+cl_error_t onas_scanque_start(struct onas_context **ctx);
41
+
42
+#endif
... ...
@@ -47,10 +47,10 @@
47 47
 static pthread_mutex_t onas_scan_lock = PTHREAD_MUTEX_INITIALIZER;
48 48
 
49 49
 //static int onas_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
50
-static int onas_scan_safe(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
51
-static int onas_scth_scanfile(struct onas_context **ctx, const char *fname, STATBUF sb, struct onas_scan_event *event_data, int *infected, int *err, cl_error_t *ret_code);
52
-static int onas_scth_handle_dir(struct onas_context **ctx, const char *pathname, struct onas_scan_event *event_data);
53
-//static int onas_scth_handle_file(struct onas_context **ctx, const char *pathname, struct onas_scan_event *event_data);
50
+static cl_error_t onas_scan_safe(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
51
+static cl_error_t onas_scth_scanfile(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
52
+static cl_error_t onas_scth_handle_dir(struct onas_scan_event *event_data, const char *pathname);
53
+static cl_error_t onas_scth_handle_file(struct onas_scan_event *event_data, const char *pathname);
54 54
 
55 55
 static void onas_scth_exit(int sig);
56 56
 
... ...
@@ -65,12 +65,13 @@ static void onas_scth_exit(int sig)
65 65
  * Scan wrapper, used by both inotify and fanotify threads. Owned by scanthread to force multithreaded client archtiecture
66 66
  * which better avoids kernel level deadlocks from fanotify blocking/prevention
67 67
  */
68
-int onas_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code)
68
+int onas_scan(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code)
69 69
 {
70 70
     int ret             = 0;
71 71
     int i = 0;
72
+    uint8_t retry_on_error = event_data->bool_opts & ONAS_SCTH_B_RETRY_ON_E;
72 73
 
73
-    ret = onas_scan_safe(ctx, fname, sb, infected, err, ret_code);
74
+    ret = onas_scan_safe(event_data, fname, sb, infected, err, ret_code);
74 75
 
75 76
     if (*err) {
76 77
         switch (*ret_code) {
... ...
@@ -88,13 +89,13 @@ int onas_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *inf
88 88
             default:
89 89
                 logg("~ClamMisc: internal issue (client failed to scan)\n");
90 90
         }
91
-	    if ((*ctx)->retry_on_error) {
91
+	    if (retry_on_error) {
92 92
 		    logg("*ClamMisc: reattempting scan ... \n");
93 93
 		    while (err) {
94
-			    ret = onas_scan_safe(ctx, fname, sb, infected, err, ret_code);
94
+			    ret = onas_scan_safe(event_data, fname, sb, infected, err, ret_code);
95 95
 
96 96
 			    i++;
97
-			    if (*err && i == (*ctx)->retry_attempts) {
97
+			    if (*err && i == event_data->retry_attempts) {
98 98
 				    *err = 0;
99 99
 			    }
100 100
 		    }
... ...
@@ -107,185 +108,232 @@ int onas_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *inf
107 107
 /**
108 108
  * Thread-safe scan wrapper to ensure there's no processs contention over use of the socket.
109 109
  */
110
-static int onas_scan_safe(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code)
111
-{
110
+static cl_error_t onas_scan_safe(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code) {
111
+
112 112
 	int ret = 0;
113 113
 
114 114
 	pthread_mutex_lock(&onas_scan_lock);
115 115
 
116
-	ret = onas_client_scan(ctx, fname, sb, infected, err, ret_code);
116
+	ret = onas_client_scan(event_data->tcpaddr, event_data->portnum, event_data->scantype, event_data->maxstream,
117
+                                    fname, event_data->timeout, sb, infected, err, ret_code);
117 118
 
118 119
 	pthread_mutex_unlock(&onas_scan_lock);
119 120
 
120 121
 	return ret;
121 122
 }
122 123
 
123
-int onas_scth_scanfile(struct onas_context **ctx, const char *fname, STATBUF sb, struct onas_scan_event *event_data, int *infected, int *err, cl_error_t *ret_code)
124
-{
124
+static cl_error_t onas_scth_scanfile(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code) {
125
+
125 126
 	struct fanotify_response res;
126 127
 	int ret = 0;
127
-	int i = 0;
128 128
 
129
-	if (event_data->b_fanotify) {
129
+	uint8_t b_scan;
130
+	uint8_t b_fanotify;
131
+	uint8_t b_deny_on_error;
132
+
133
+	if (NULL == event_data || NULL == fname || NULL == infected || NULL == err || NULL == ret_code) {
134
+		/* TODO: log */
135
+		return CL_ENULLARG;
136
+	}
137
+
138
+	b_scan = event_data->bool_opts & ONAS_SCTH_B_SCAN ? 1 : 0;
139
+        b_fanotify = event_data->bool_opts & ONAS_SCTH_B_FANOTIFY ? 1 : 0;
140
+        b_deny_on_error = event_data->bool_opts & ONAS_SCTH_B_DENY_ON_E ? 1 : 0;
141
+
142
+	if (b_fanotify) {
130 143
 		res.fd = event_data->fmd->fd;
131 144
 		res.response = FAN_ALLOW;
132 145
 	}
133 146
 
134
-	if (event_data->b_scan) {
135
-		ret = onas_scan(ctx, fname, sb, infected, err, ret_code);
147
+	if (b_scan) {
148
+		ret = onas_scan(event_data, fname, sb, infected, err, ret_code);
136 149
 
137 150
 		if (*err && *ret_code != CL_SUCCESS) {
138
-			logg("*Clamonacc: scan failed with error code %d\n", *ret_code);
151
+			logg("*ClamWorker: scan failed with error code %d\n", *ret_code);
139 152
 		}
140 153
 
141 154
 
142
-		if (event_data->b_fanotify) {
143
-			if ((*err && *ret_code && (*ctx)->deny_on_error) || *infected) {
155
+		if (b_fanotify) {
156
+			if ((*err && *ret_code && b_deny_on_error) || *infected) {
144 157
 				res.response = FAN_DENY;
145 158
 			}
146 159
 		}
147 160
 	}
148 161
 
149 162
 
150
-	if (event_data->b_fanotify) {
163
+	if (b_fanotify) {
151 164
 		if(event_data->fmd->mask & FAN_ALL_PERM_EVENTS) {
152
-			ret = write((*ctx)->fan_fd, &res, sizeof(res));
153
-			if(ret == -1)
154
-				logg("!Clamonacc: internal error (can't write to fanotify)\n");
165
+			ret = write(event_data->fan_fd, &res, sizeof(res));
166
+			if(ret == -1) {
167
+				logg("!ClamWorker: internal error (can't write to fanotify)\n");
168
+				ret = CL_EWRITE;
169
+			}
170
+		}
171
+	}
172
+
173
+        if (b_fanotify) {
174
+            if (-1 == close(event_data->fmd->fd) ) {
175
+                logg("!ClamWorker: internal error (can't close fanotify meta fd)\n");
176
+                ret = CL_EUNLINK;
155 177
 		}
156 178
 	}
157 179
 
158 180
 	return ret;
159 181
 }
160 182
 
161
-static int onas_scth_handle_dir(struct onas_context **ctx, const char *pathname, struct onas_scan_event *event_data) {
183
+static cl_error_t onas_scth_handle_dir(struct onas_scan_event *event_data, const char *pathname) {
162 184
     FTS *ftsp = NULL;
163 185
 	int32_t ftspopts = FTS_PHYSICAL | FTS_XDEV;
186
+	FTSENT *curr = NULL;
187
+
164 188
 	int32_t infected = 0;
165 189
 	int32_t err = 0;
166 190
         cl_error_t ret_code = CL_SUCCESS;
167
-	int32_t ret = 0;
191
+	cl_error_t ret = CL_SUCCESS;
192
+
168 193
 	int32_t fres = 0;
169
-    FTSENT *curr = NULL;
170 194
         STATBUF sb;
171 195
 
172 196
     char *const pathargv[] = {(char *)pathname, NULL};
173
-    if (!(ftsp = _priv_fts_open(pathargv, ftspopts, NULL))) return CL_EOPEN;
197
+
198
+	if (!(ftsp = _priv_fts_open(pathargv, ftspopts, NULL))) {
199
+		return CL_EOPEN;
200
+	}
174 201
 
175 202
     while ((curr = _priv_fts_read(ftsp))) {
176 203
         if (curr->fts_info != FTS_D) {
177 204
 
178 205
 			fres = CLAMSTAT(curr->fts_path, &sb);
179 206
 
180
-			if ((*ctx)->sizelimit) {
181
-				if (fres != 0 || sb.st_size > (*ctx)->sizelimit)  {
182
-					//okay to skip, directory from inotify events (probably) won't block w/ protection enabled
183
-                                        //log here later
207
+			if (event_data->sizelimit) {
208
+				if (fres != 0 || sb.st_size > event_data->sizelimit)  {
209
+					/* okay to skip w/o allow/deny since dir comes from inotify
210
+					 * events and (probably) won't block w/ protection enabled */
211
+                                        // TODO: log here later ??
184 212
 					continue;
185 213
 				}
186 214
 			}
187 215
 
188
-                        ret = onas_scth_scanfile(ctx, curr->fts_path, sb, event_data, &infected, &err, &ret_code);
189
-                        // probs need to error check here later, or at least log
216
+                        ret = onas_scth_scanfile(event_data, curr->fts_path, sb, &infected, &err, &ret_code);
217
+                        // TODO: probs need to error check here later, or at least log
190 218
         }
191 219
     }
192 220
 
193 221
     return ret;
194 222
 }
195 223
 
196
-int onas_scth_handle_file(struct onas_context **ctx, const char *pathname, struct onas_scan_event *event_data) {
224
+static cl_error_t onas_scth_handle_file(struct onas_scan_event *event_data, const char *pathname) {
197 225
 
198 226
 	STATBUF sb;
199 227
 	int32_t infected = 0;
200 228
 	int32_t err = 0;
201 229
 	cl_error_t ret_code = CL_SUCCESS;
202 230
 	int fres = 0;
203
-	int ret = 0;
231
+	cl_error_t ret = 0;
204 232
 
205
-	if (!pathname) return CL_ENULLARG;
233
+	if (NULL == pathname || NULL == event_data) {
234
+            return CL_ENULLARG;
235
+        }
206 236
 
207 237
 	fres = CLAMSTAT(pathname, &sb);
208
-	if ((*ctx)->sizelimit) {
209
-		if (fres != 0 || sb.st_size > (*ctx)->sizelimit)  {
210
-			/* don't skip so we avoid lockups, but don't scan either */
211
-			event_data->b_scan = 0;
238
+	if (event_data->sizelimit) {
239
+		if (fres != 0 || sb.st_size > event_data->sizelimit)  {
240
+			/* don't skip so we avoid lockups, but don't scan either;
241
+			 * while it should be obvious, this will unconditionally set
242
+			 * the bit in the map to 0 regardless of original orientation */
243
+                        event_data->bool_opts &= ((uint16_t) ~ONAS_SCTH_B_SCAN);
212 244
 		}
213 245
 	}
214 246
 
215
-	ret = onas_scth_scanfile(ctx, pathname, sb, event_data, &infected, &err, &ret_code);
247
+	ret = onas_scth_scanfile(event_data, pathname, sb, &infected, &err, &ret_code);
216 248
 	// probs need to error check here later, or at least log
217 249
 
218 250
     return ret;
219 251
 }
220 252
 
221
-void *onas_scan_th(void *arg) {
222
-
223
-    struct scth_thrarg *tharg = (struct scth_thrarg *)arg;
224
-	struct onas_scan_event *event_data = NULL;
225
-	struct onas_context **ctx = NULL;
226
-    sigset_t sigset;
227
-    struct sigaction act;
228
-
229
-    /* ignore all signals except SIGUSR1 */
230
-    sigfillset(&sigset);
231
-    sigdelset(&sigset, SIGUSR1);
232
-    /* The behavior of a process is undefined after it ignores a
233
-	 * SIGFPE, SIGILL, SIGSEGV, or SIGBUS signal */
234
-    sigdelset(&sigset, SIGFPE);
235
-    sigdelset(&sigset, SIGILL);
236
-	//sigdelset(&sigset, SIGSEGV);
237
-#ifdef SIGBUS
238
-    sigdelset(&sigset, SIGBUS);
239
-#endif
240
-    pthread_sigmask(SIG_SETMASK, &sigset, NULL);
241
-    memset(&act, 0, sizeof(struct sigaction));
242
-    act.sa_handler = onas_scth_exit;
243
-    sigfillset(&(act.sa_mask));
244
-    sigaction(SIGUSR1, &act, NULL);
245
-    sigaction(SIGSEGV, &act, NULL);
246
-
247
-	if (NULL == tharg || NULL == tharg->ctx || NULL == tharg->event_data || NULL == tharg->event_data->pathname || NULL == (*(tharg->ctx))->opts) {
248
-        logg("ScanOnAccess: Invalid thread arguments for extra scanning\n");
253
+void *onas_scan_worker(void *arg) {
254
+
255
+	struct onas_scan_event *event_data = (struct onas_scan_event *) arg;
256
+
257
+	uint8_t b_dir;
258
+	uint8_t b_file;
259
+	uint8_t b_inotify;
260
+	uint8_t b_fanotify;
261
+
262
+	if (NULL == event_data || NULL == event_data->pathname) {
263
+		logg("ClamWorker: invalid worker arguments for scanning thread\n");
249 264
         goto done;
250 265
     }
251 266
 
252
-        /* this event_data is ours and ours alone */
253
-	event_data = tharg->event_data;
267
+	/* load in boolean info from event struct; makes for easier reading--you're welcome */
268
+	b_dir = event_data->bool_opts & ONAS_SCTH_B_DIR ? 1 : 0;
269
+	b_file = event_data->bool_opts & ONAS_SCTH_B_FILE ? 1 : 0;
270
+	b_inotify = event_data->bool_opts & ONAS_SCTH_B_INOTIFY ? 1 : 0;
271
+	b_fanotify = event_data->bool_opts & ONAS_SCTH_B_FANOTIFY ? 1 : 0;
272
+
273
+
274
+	if (b_inotify) {
275
+		logg("*ClamWorker: handling inotify event ...\n");
254 276
 
255
-        /* we share this context globally--it's not ours to touch/edit */
256
-	ctx = tharg->ctx;
277
+		if (b_dir) {
278
+			logg("*ClamWorker: performing (extra) scanning on directory '%s'\n", event_data->pathname);
279
+			onas_scth_handle_dir(event_data, event_data->pathname);
280
+
281
+		} else if (b_file) {
282
+			logg("*ClamWorker: performing (extra) scanning on file '%s'\n", event_data->pathname);
283
+			onas_scth_handle_file(event_data, event_data->pathname);
257 284
 
258
-        if (event_data->b_inotify) {
259
-            if (event_data->extra_options & ONAS_SCTH_ISDIR) {
260
-                logg("*ScanOnAccess: Performing additional scanning on directory '%s'\n", event_data->pathname);
261
-                onas_scth_handle_dir(ctx, event_data->pathname, event_data);
262
-            } else if (event_data->extra_options & ONAS_SCTH_ISFILE) {
263
-                logg("*ScanOnAccess: Performing additional scanning on file '%s'\n", event_data->pathname);
264
-                onas_scth_handle_file(ctx, event_data->pathname, event_data);
265 285
             }
266
-        } else if (event_data->b_fanotify) {
267
-            logg("*ScanOnAccess: Performing scanning on file '%s'\n", event_data->pathname);
268
-            onas_scth_handle_file(ctx, event_data->pathname, event_data);
286
+
287
+	} else if (b_fanotify) {
288
+
289
+		logg("*ClamWorker: performing scanning on file '%s'\n", event_data->pathname);
290
+		onas_scth_handle_file(event_data, event_data->pathname);
269 291
     }
270
-        /* TODO: else something went wrong and we should error out here */
292
+	/* TODO: else something went wrong and we should probably error out here, maybe try to recover somehow */
271 293
 
272 294
 done:
273
-        /* our job to cleanup event data: worker queue just kicks us off, drops the event object
274
-         * from the queue and forgets about us. */
295
+	/* our job to cleanup event data: worker queue just kicks us off in a thread pool, drops the event object
296
+	 * from the queue and forgets about us */
275 297
 
276
-	if (NULL != tharg) {
277
-		if (NULL != tharg->event_data) {
278
-			if (NULL != tharg->event_data->pathname) {
279
-				free(tharg->event_data->pathname);
298
+	if (NULL != event_data) {
299
+		if (NULL != event_data->pathname) {
300
+			free(event_data->pathname);
280 301
 				event_data->pathname = NULL;
281 302
     }
282
-			free(tharg->event_data);
283
-			tharg->event_data = NULL;
284
-    }
285
-		/* don't free context, cleanup for context is handled at the highest layer */
286
-        free(tharg);
303
+		free(event_data);
304
+		event_data = NULL;
287 305
     }
288 306
 
289 307
     return NULL;
290 308
 }
309
+
310
+/* Simple utility function for external interfaces to add relevant context information to scan_event struct;
311
+ * doing this mapping cuts down significantly on memory overhead when queueing hundreds of these scan_event structs */
312
+cl_error_t onas_map_context_info_to_event_data(struct onas_context *ctx, struct onas_scan_event **event_data) {
313
+
314
+    if(NULL == ctx || NULL == event_data || NULL == *event_data) {
315
+        logg("*ClamScThread: context and scan event struct are null ...\n");
316
+        return CL_ENULLARG;
317
+    }
318
+
319
+    (*event_data)->scantype = ctx->scantype;
320
+    (*event_data)->timeout = ctx->timeout;
321
+    (*event_data)->maxstream = ctx->maxstream;
322
+    (*event_data)->tcpaddr = optget(ctx->clamdopts, "TCPAddr")->strarg;
323
+    (*event_data)->portnum = ctx->portnum;
324
+    (*event_data)->fan_fd = ctx->fan_fd;
325
+    (*event_data)->sizelimit = ctx->sizelimit;
326
+    (*event_data)->retry_attempts = ctx->retry_attempts;
327
+
328
+    if (ctx->retry_on_error) {
329
+        (*event_data)->bool_opts |= ONAS_SCTH_B_RETRY_ON_E;
330
+    }
331
+
332
+    if (ctx->deny_on_error) {
333
+        (*event_data)->bool_opts |= ONAS_SCTH_B_DENY_ON_E;
334
+    }
335
+
336
+    return CL_SUCCESS;
337
+}
291 338
 #endif
... ...
@@ -26,26 +26,33 @@
26 26
 #include "shared/optparser.h"
27 27
 #include "libclamav/clamav.h"
28 28
 
29
-#define ONAS_SCTH_ISDIR 0x01
30
-#define ONAS_SCTH_ISFILE 0x02
29
+#define ONAS_SCTH_B_DIR         0x01
30
+#define ONAS_SCTH_B_FILE        0x02
31
+#define ONAS_SCTH_B_INOTIFY     0x04
32
+#define ONAS_SCTH_B_FANOTIFY    0x08
33
+#define ONAS_SCTH_B_SCAN        0x10
34
+#define ONAS_SCTH_B_RETRY_ON_E  0x20
35
+#define ONAS_SCTH_B_DENY_ON_E   0x40
31 36
 
32 37
 struct onas_scan_event {
38
+        const char *tcpaddr;
39
+        int64_t portnum;
33 40
     char *pathname;
41
+        int fan_fd;
34 42
         struct fanotify_event_metadata *fmd;
35
-	int16_t b_inotify;
36
-	int16_t b_fanotify;
37
-        int16_t b_scan;
38
-	uint32_t extra_options;
39
-};
40
-
41
-struct scth_thrarg {
42
-	struct onas_scan_event *event_data;
43
-	struct onas_context **ctx;
43
+        uint8_t retry_attempts;
44
+        uint64_t sizelimit;
45
+        int32_t scantype;
46
+        int64_t maxstream;
47
+        int64_t timeout;
48
+	uint8_t bool_opts;
44 49
 };
45 50
 
46 51
 void *onas_scan_th(void *arg);
47 52
 
48
-int onas_scan(struct onas_context **ctx, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
49
-int onas_scth_handle_file(struct onas_context **ctx, const char *pathname, struct onas_scan_event *event_data);
53
+void *onas_scan_worker(void *arg);
54
+
55
+int onas_scan(struct onas_scan_event *event_data, const char *fname, STATBUF sb, int *infected, int *err, cl_error_t *ret_code);
56
+cl_error_t onas_map_context_info_to_event_data(struct onas_context *ctx, struct onas_scan_event **event_data);
50 57
 
51 58
 #endif