Browse code

Splunk Logging Driver performance improvements

New driver options:

- `splunk-gzip` - gzip compress all requests to Splunk HEC
(enabled by default)
- `splunk-gzip-level` - change compression level.

Messages are sent in batches by 1000, with frequency of 5 seconds.
Maximum buffer is 10,000 events. If HEC will not be available, Splunk
Logging Driver will keep retrying while it can hold messages in buffer.

Added unit tests for driver.

Signed-off-by: Denis Gladkikh <denis@gladkikh.email>

Denis Gladkikh authored on 2016/08/26 03:27:02
Showing 6 changed files
... ...
@@ -520,7 +520,7 @@ __docker_complete_log_options() {
520 520
 	local journald_options="env labels tag"
521 521
 	local json_file_options="env labels max-file max-size"
522 522
 	local syslog_options="env labels syslog-address syslog-facility syslog-format syslog-tls-ca-cert syslog-tls-cert syslog-tls-key syslog-tls-skip-verify tag"
523
-	local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag"
523
+	local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-gzip splunk-gzip-level splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag"
524 524
 
525 525
 	local all_options="$fluentd_options $gcplogs_options $gelf_options $journald_options $json_file_options $syslog_options $splunk_options"
526 526
 
... ...
@@ -629,7 +629,7 @@ __docker_complete_log_driver_options() {
629 629
 			__ltrim_colon_completions "${cur}"
630 630
 			return
631 631
 			;;
632
-		splunk-insecureskipverify|splunk-verify-connection)
632
+		splunk-gzip|splunk-insecureskipverify|splunk-verify-connection)
633 633
 			COMPREPLY=( $( compgen -W "false true" -- "${cur##*=}" ) )
634 634
 			return
635 635
 			;;
... ...
@@ -228,7 +228,7 @@ __docker_get_log_options() {
228 228
     journald_options=("env" "labels" "tag")
229 229
     json_file_options=("env" "labels" "max-file" "max-size")
230 230
     syslog_options=("env" "labels" "syslog-address" "syslog-facility" "syslog-format" "syslog-tls-ca-cert" "syslog-tls-cert" "syslog-tls-key" "syslog-tls-skip-verify" "tag")
231
-    splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag")
231
+    splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-gzip" "splunk-gzip-level" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag")
232 232
 
233 233
     [[ $log_driver = (awslogs|all) ]] && _describe -t awslogs-options "awslogs options" awslogs_options "$@" && ret=0
234 234
     [[ $log_driver = (fluentd|all) ]] && _describe -t fluentd-options "fluentd options" fluentd_options "$@" && ret=0
... ...
@@ -4,6 +4,7 @@ package splunk
4 4
 
5 5
 import (
6 6
 	"bytes"
7
+	"compress/gzip"
7 8
 	"crypto/tls"
8 9
 	"crypto/x509"
9 10
 	"encoding/json"
... ...
@@ -12,7 +13,9 @@ import (
12 12
 	"io/ioutil"
13 13
 	"net/http"
14 14
 	"net/url"
15
+	"os"
15 16
 	"strconv"
17
+	"sync"
16 18
 	"time"
17 19
 
18 20
 	"github.com/Sirupsen/logrus"
... ...
@@ -22,22 +25,47 @@ import (
22 22
 )
23 23
 
24 24
 const (
25
-	driverName                  = "splunk"
26
-	splunkURLKey                = "splunk-url"
27
-	splunkTokenKey              = "splunk-token"
28
-	splunkSourceKey             = "splunk-source"
29
-	splunkSourceTypeKey         = "splunk-sourcetype"
30
-	splunkIndexKey              = "splunk-index"
31
-	splunkCAPathKey             = "splunk-capath"
32
-	splunkCANameKey             = "splunk-caname"
33
-	splunkInsecureSkipVerifyKey = "splunk-insecureskipverify"
34
-	splunkFormatKey             = "splunk-format"
35
-	splunkVerifyConnectionKey   = "splunk-verify-connection"
36
-	envKey                      = "env"
37
-	labelsKey                   = "labels"
38
-	tagKey                      = "tag"
25
+	driverName                    = "splunk"
26
+	splunkURLKey                  = "splunk-url"
27
+	splunkTokenKey                = "splunk-token"
28
+	splunkSourceKey               = "splunk-source"
29
+	splunkSourceTypeKey           = "splunk-sourcetype"
30
+	splunkIndexKey                = "splunk-index"
31
+	splunkCAPathKey               = "splunk-capath"
32
+	splunkCANameKey               = "splunk-caname"
33
+	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
34
+	splunkFormatKey               = "splunk-format"
35
+	splunkVerifyConnectionKey     = "splunk-verify-connection"
36
+	splunkGzipCompressionKey      = "splunk-gzip"
37
+	splunkGzipCompressionLevelKey = "splunk-gzip-level"
38
+	envKey                        = "env"
39
+	labelsKey                     = "labels"
40
+	tagKey                        = "tag"
39 41
 )
40 42
 
43
+const (
44
+	// How often do we send messages (if we are not reaching batch size)
45
+	defaultPostMessagesFrequency = 5 * time.Second
46
+	// How big can be batch of messages
47
+	defaultPostMessagesBatchSize = 1000
48
+	// Maximum number of messages we can store in buffer
49
+	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
50
+	// Number of messages allowed to be queued in the channel
51
+	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
52
+)
53
+
54
+const (
55
+	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
56
+	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
57
+	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
58
+	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
59
+)
60
+
61
+type splunkLoggerInterface interface {
62
+	logger.Logger
63
+	worker()
64
+}
65
+
41 66
 type splunkLogger struct {
42 67
 	client    *http.Client
43 68
 	transport *http.Transport
... ...
@@ -45,6 +73,23 @@ type splunkLogger struct {
45 45
 	url         string
46 46
 	auth        string
47 47
 	nullMessage *splunkMessage
48
+
49
+	// http compression
50
+	gzipCompression      bool
51
+	gzipCompressionLevel int
52
+
53
+	// Advanced options
54
+	postMessagesFrequency time.Duration
55
+	postMessagesBatchSize int
56
+	bufferMaximum         int
57
+
58
+	// For synchronization between background worker and logger.
59
+	// We use channel to send messages to worker go routine.
60
+	// All other variables for blocking Close call before we flush all messages to HEC
61
+	stream     chan *splunkMessage
62
+	lock       sync.RWMutex
63
+	closed     bool
64
+	closedCond *sync.Cond
48 65
 }
49 66
 
50 67
 type splunkLoggerInline struct {
... ...
@@ -140,6 +185,29 @@ func New(ctx logger.Context) (logger.Logger, error) {
140 140
 		tlsConfig.ServerName = caName
141 141
 	}
142 142
 
143
+	gzipCompression := false
144
+	if gzipCompressionStr, ok := ctx.Config[splunkGzipCompressionKey]; ok {
145
+		gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
146
+		if err != nil {
147
+			return nil, err
148
+		}
149
+	}
150
+
151
+	gzipCompressionLevel := gzip.DefaultCompression
152
+	if gzipCompressionLevelStr, ok := ctx.Config[splunkGzipCompressionLevelKey]; ok {
153
+		var err error
154
+		gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
155
+		if err != nil {
156
+			return nil, err
157
+		}
158
+		gzipCompressionLevel = int(gzipCompressionLevel64)
159
+		if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
160
+			err := fmt.Errorf("Not supported level '%s' for %s (supported values between %d and %d).",
161
+				gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
162
+			return nil, err
163
+		}
164
+	}
165
+
143 166
 	transport := &http.Transport{
144 167
 		TLSClientConfig: tlsConfig,
145 168
 	}
... ...
@@ -158,19 +226,36 @@ func New(ctx logger.Context) (logger.Logger, error) {
158 158
 		Index:      index,
159 159
 	}
160 160
 
161
-	tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
162
-	if err != nil {
163
-		return nil, err
161
+	// Allow user to remove tag from the messages by setting tag to empty string
162
+	tag := ""
163
+	if tagTemplate, ok := ctx.Config[tagKey]; !ok || tagTemplate != "" {
164
+		tag, err = loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
165
+		if err != nil {
166
+			return nil, err
167
+		}
164 168
 	}
165 169
 
166 170
 	attrs := ctx.ExtraAttributes(nil)
167 171
 
172
+	var (
173
+		postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
174
+		postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
175
+		bufferMaximum         = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
176
+		streamChannelSize     = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
177
+	)
178
+
168 179
 	logger := &splunkLogger{
169
-		client:      client,
170
-		transport:   transport,
171
-		url:         splunkURL.String(),
172
-		auth:        "Splunk " + splunkToken,
173
-		nullMessage: nullMessage,
180
+		client:                client,
181
+		transport:             transport,
182
+		url:                   splunkURL.String(),
183
+		auth:                  "Splunk " + splunkToken,
184
+		nullMessage:           nullMessage,
185
+		gzipCompression:       gzipCompression,
186
+		gzipCompressionLevel:  gzipCompressionLevel,
187
+		stream:                make(chan *splunkMessage, streamChannelSize),
188
+		postMessagesFrequency: postMessagesFrequency,
189
+		postMessagesBatchSize: postMessagesBatchSize,
190
+		bufferMaximum:         bufferMaximum,
174 191
 	}
175 192
 
176 193
 	// By default we verify connection, but we allow use to skip that
... ...
@@ -203,6 +288,8 @@ func New(ctx logger.Context) (logger.Logger, error) {
203 203
 		splunkFormat = splunkFormatInline
204 204
 	}
205 205
 
206
+	var loggerWrapper splunkLoggerInterface
207
+
206 208
 	switch splunkFormat {
207 209
 	case splunkFormatInline:
208 210
 		nullEvent := &splunkMessageEvent{
... ...
@@ -210,18 +297,20 @@ func New(ctx logger.Context) (logger.Logger, error) {
210 210
 			Attrs: attrs,
211 211
 		}
212 212
 
213
-		return &splunkLoggerInline{logger, nullEvent}, nil
213
+		loggerWrapper = &splunkLoggerInline{logger, nullEvent}
214 214
 	case splunkFormatJSON:
215 215
 		nullEvent := &splunkMessageEvent{
216 216
 			Tag:   tag,
217 217
 			Attrs: attrs,
218 218
 		}
219 219
 
220
-		return &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}, nil
220
+		loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
221 221
 	case splunkFormatRaw:
222 222
 		var prefix bytes.Buffer
223
-		prefix.WriteString(tag)
224
-		prefix.WriteString(" ")
223
+		if tag != "" {
224
+			prefix.WriteString(tag)
225
+			prefix.WriteString(" ")
226
+		}
225 227
 		for key, value := range attrs {
226 228
 			prefix.WriteString(key)
227 229
 			prefix.WriteString("=")
... ...
@@ -229,10 +318,14 @@ func New(ctx logger.Context) (logger.Logger, error) {
229 229
 			prefix.WriteString(" ")
230 230
 		}
231 231
 
232
-		return &splunkLoggerRaw{logger, prefix.Bytes()}, nil
232
+		loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
233 233
 	default:
234 234
 		return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
235 235
 	}
236
+
237
+	go loggerWrapper.worker()
238
+
239
+	return loggerWrapper, nil
236 240
 }
237 241
 
238 242
 func (l *splunkLoggerInline) Log(msg *logger.Message) error {
... ...
@@ -244,7 +337,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error {
244 244
 
245 245
 	message.Event = &event
246 246
 
247
-	return l.postMessage(message)
247
+	return l.queueMessageAsync(message)
248 248
 }
249 249
 
250 250
 func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
... ...
@@ -262,7 +355,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
262 262
 
263 263
 	message.Event = &event
264 264
 
265
-	return l.postMessage(message)
265
+	return l.queueMessageAsync(message)
266 266
 }
267 267
 
268 268
 func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
... ...
@@ -270,19 +363,124 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
270 270
 
271 271
 	message.Event = string(append(l.prefix, msg.Line...))
272 272
 
273
-	return l.postMessage(message)
273
+	return l.queueMessageAsync(message)
274 274
 }
275 275
 
276
-func (l *splunkLogger) postMessage(message *splunkMessage) error {
277
-	jsonEvent, err := json.Marshal(message)
278
-	if err != nil {
279
-		return err
276
+func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
277
+	l.lock.RLock()
278
+	defer l.lock.RUnlock()
279
+	if l.closedCond != nil {
280
+		return fmt.Errorf("%s: driver is closed", driverName)
281
+	}
282
+	l.stream <- message
283
+	return nil
284
+}
285
+
286
+func (l *splunkLogger) worker() {
287
+	timer := time.NewTicker(l.postMessagesFrequency)
288
+	var messages []*splunkMessage
289
+	for {
290
+		select {
291
+		case message, open := <-l.stream:
292
+			if !open {
293
+				l.postMessages(messages, true)
294
+				l.lock.Lock()
295
+				defer l.lock.Unlock()
296
+				l.transport.CloseIdleConnections()
297
+				l.closed = true
298
+				l.closedCond.Signal()
299
+				return
300
+			}
301
+			messages = append(messages, message)
302
+			// Only sending when we get exactly to the batch size,
303
+			// This also helps not to fire postMessages on every new message,
304
+			// when previous try failed.
305
+			if len(messages)%l.postMessagesBatchSize == 0 {
306
+				messages = l.postMessages(messages, false)
307
+			}
308
+		case <-timer.C:
309
+			messages = l.postMessages(messages, false)
310
+		}
311
+	}
312
+}
313
+
314
+func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
315
+	messagesLen := len(messages)
316
+	for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
317
+		upperBound := i + l.postMessagesBatchSize
318
+		if upperBound > messagesLen {
319
+			upperBound = messagesLen
320
+		}
321
+		if err := l.tryPostMessages(messages[i:upperBound]); err != nil {
322
+			logrus.Error(err)
323
+			if messagesLen-i >= l.bufferMaximum || lastChance {
324
+				// If this is last chance - print them all to the daemon log
325
+				if lastChance {
326
+					upperBound = messagesLen
327
+				}
328
+				// Not all sent, but buffer has got to its maximum, let's log all messages
329
+				// we could not send and return buffer minus one batch size
330
+				for j := i; j < upperBound; j++ {
331
+					if jsonEvent, err := json.Marshal(messages[j]); err != nil {
332
+						logrus.Error(err)
333
+					} else {
334
+						logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
335
+					}
336
+				}
337
+				return messages[upperBound:messagesLen]
338
+			}
339
+			// Not all sent, returning buffer from where we have not sent messages
340
+			return messages[i:messagesLen]
341
+		}
342
+	}
343
+	// All sent, return empty buffer
344
+	return messages[:0]
345
+}
346
+
347
+func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error {
348
+	if len(messages) == 0 {
349
+		return nil
350
+	}
351
+	var buffer bytes.Buffer
352
+	var writer io.Writer
353
+	var gzipWriter *gzip.Writer
354
+	var err error
355
+	// If gzip compression is enabled - create gzip writer with specified compression
356
+	// level. If gzip compression is disabled, use standard buffer as a writer
357
+	if l.gzipCompression {
358
+		gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
359
+		if err != nil {
360
+			return err
361
+		}
362
+		writer = gzipWriter
363
+	} else {
364
+		writer = &buffer
365
+	}
366
+	for _, message := range messages {
367
+		jsonEvent, err := json.Marshal(message)
368
+		if err != nil {
369
+			return err
370
+		}
371
+		if _, err := writer.Write(jsonEvent); err != nil {
372
+			return err
373
+		}
374
+	}
375
+	// If gzip compression is enabled, tell it, that we are done
376
+	if l.gzipCompression {
377
+		err = gzipWriter.Close()
378
+		if err != nil {
379
+			return err
380
+		}
280 381
 	}
281
-	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(jsonEvent))
382
+	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
282 383
 	if err != nil {
283 384
 		return err
284 385
 	}
285 386
 	req.Header.Set("Authorization", l.auth)
387
+	// Tell if we are sending gzip compressed body
388
+	if l.gzipCompression {
389
+		req.Header.Set("Content-Encoding", "gzip")
390
+	}
286 391
 	res, err := l.client.Do(req)
287 392
 	if err != nil {
288 393
 		return err
... ...
@@ -301,7 +499,15 @@ func (l *splunkLogger) postMessage(message *splunkMessage) error {
301 301
 }
302 302
 
303 303
 func (l *splunkLogger) Close() error {
304
-	l.transport.CloseIdleConnections()
304
+	l.lock.Lock()
305
+	defer l.lock.Unlock()
306
+	if l.closedCond == nil {
307
+		l.closedCond = sync.NewCond(&l.lock)
308
+		close(l.stream)
309
+		for !l.closed {
310
+			l.closedCond.Wait()
311
+		}
312
+	}
305 313
 	return nil
306 314
 }
307 315
 
... ...
@@ -329,6 +535,8 @@ func ValidateLogOpt(cfg map[string]string) error {
329 329
 		case splunkInsecureSkipVerifyKey:
330 330
 		case splunkFormatKey:
331 331
 		case splunkVerifyConnectionKey:
332
+		case splunkGzipCompressionKey:
333
+		case splunkGzipCompressionLevelKey:
332 334
 		case envKey:
333 335
 		case labelsKey:
334 336
 		case tagKey:
... ...
@@ -364,7 +572,7 @@ func parseURL(ctx logger.Context) (*url.URL, error) {
364 364
 }
365 365
 
366 366
 func verifySplunkConnection(l *splunkLogger) error {
367
-	req, err := http.NewRequest("OPTIONS", l.url, nil)
367
+	req, err := http.NewRequest(http.MethodOptions, l.url, nil)
368 368
 	if err != nil {
369 369
 		return err
370 370
 	}
... ...
@@ -385,3 +593,29 @@ func verifySplunkConnection(l *splunkLogger) error {
385 385
 	}
386 386
 	return nil
387 387
 }
388
+
389
+func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
390
+	valueStr := os.Getenv(envName)
391
+	if valueStr == "" {
392
+		return defaultValue
393
+	}
394
+	parsedValue, err := time.ParseDuration(valueStr)
395
+	if err != nil {
396
+		logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
397
+		return defaultValue
398
+	}
399
+	return parsedValue
400
+}
401
+
402
+func getAdvancedOptionInt(envName string, defaultValue int) int {
403
+	valueStr := os.Getenv(envName)
404
+	if valueStr == "" {
405
+		return defaultValue
406
+	}
407
+	parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
408
+	if err != nil {
409
+		logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
410
+		return defaultValue
411
+	}
412
+	return int(parsedValue)
413
+}
388 414
new file mode 100644
... ...
@@ -0,0 +1,1302 @@
0
+package splunk
1
+
2
+import (
3
+	"compress/gzip"
4
+	"fmt"
5
+	"os"
6
+	"testing"
7
+	"time"
8
+
9
+	"github.com/docker/docker/daemon/logger"
10
+)
11
+
12
+// Validate options
13
+func TestValidateLogOpt(t *testing.T) {
14
+	err := ValidateLogOpt(map[string]string{
15
+		splunkURLKey:                  "http://127.0.0.1",
16
+		splunkTokenKey:                "2160C7EF-2CE9-4307-A180-F852B99CF417",
17
+		splunkSourceKey:               "mysource",
18
+		splunkSourceTypeKey:           "mysourcetype",
19
+		splunkIndexKey:                "myindex",
20
+		splunkCAPathKey:               "/usr/cert.pem",
21
+		splunkCANameKey:               "ca_name",
22
+		splunkInsecureSkipVerifyKey:   "true",
23
+		splunkFormatKey:               "json",
24
+		splunkVerifyConnectionKey:     "true",
25
+		splunkGzipCompressionKey:      "true",
26
+		splunkGzipCompressionLevelKey: "1",
27
+		envKey:    "a",
28
+		labelsKey: "b",
29
+		tagKey:    "c",
30
+	})
31
+	if err != nil {
32
+		t.Fatal(err)
33
+	}
34
+
35
+	err = ValidateLogOpt(map[string]string{
36
+		"not-supported-option": "a",
37
+	})
38
+	if err == nil {
39
+		t.Fatal("Expecting error on unsupported options")
40
+	}
41
+}
42
+
43
+// Driver require user to specify required options
44
+func TestNewMissedConfig(t *testing.T) {
45
+	ctx := logger.Context{
46
+		Config: map[string]string{},
47
+	}
48
+	_, err := New(ctx)
49
+	if err == nil {
50
+		t.Fatal("Logger driver should fail when no required parameters specified")
51
+	}
52
+}
53
+
54
+// Driver require user to specify splunk-url
55
+func TestNewMissedUrl(t *testing.T) {
56
+	ctx := logger.Context{
57
+		Config: map[string]string{
58
+			splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
59
+		},
60
+	}
61
+	_, err := New(ctx)
62
+	if err.Error() != "splunk: splunk-url is expected" {
63
+		t.Fatal("Logger driver should fail when no required parameters specified")
64
+	}
65
+}
66
+
67
+// Driver require user to specify splunk-token
68
+func TestNewMissedToken(t *testing.T) {
69
+	ctx := logger.Context{
70
+		Config: map[string]string{
71
+			splunkURLKey: "http://127.0.0.1:8088",
72
+		},
73
+	}
74
+	_, err := New(ctx)
75
+	if err.Error() != "splunk: splunk-token is expected" {
76
+		t.Fatal("Logger driver should fail when no required parameters specified")
77
+	}
78
+}
79
+
80
+// Test default settings
81
+func TestDefault(t *testing.T) {
82
+	hec := NewHTTPEventCollectorMock(t)
83
+
84
+	go hec.Serve()
85
+
86
+	ctx := logger.Context{
87
+		Config: map[string]string{
88
+			splunkURLKey:   hec.URL(),
89
+			splunkTokenKey: hec.token,
90
+		},
91
+		ContainerID:        "containeriid",
92
+		ContainerName:      "container_name",
93
+		ContainerImageID:   "contaimageid",
94
+		ContainerImageName: "container_image_name",
95
+	}
96
+
97
+	hostname, err := ctx.Hostname()
98
+	if err != nil {
99
+		t.Fatal(err)
100
+	}
101
+
102
+	loggerDriver, err := New(ctx)
103
+	if err != nil {
104
+		t.Fatal(err)
105
+	}
106
+
107
+	if loggerDriver.Name() != driverName {
108
+		t.Fatal("Unexpected logger driver name")
109
+	}
110
+
111
+	if !hec.connectionVerified {
112
+		t.Fatal("By default connection should be verified")
113
+	}
114
+
115
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
116
+	if !ok {
117
+		t.Fatal("Unexpected Splunk Logging Driver type")
118
+	}
119
+
120
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
121
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
122
+		splunkLoggerDriver.nullMessage.Host != hostname ||
123
+		splunkLoggerDriver.nullMessage.Source != "" ||
124
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
125
+		splunkLoggerDriver.nullMessage.Index != "" ||
126
+		splunkLoggerDriver.gzipCompression != false ||
127
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
128
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
129
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
130
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
131
+		t.Fatal("Found not default values setup in Splunk Logging Driver.")
132
+	}
133
+
134
+	message1Time := time.Now()
135
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
136
+		t.Fatal(err)
137
+	}
138
+	message2Time := time.Now()
139
+	if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil {
140
+		t.Fatal(err)
141
+	}
142
+
143
+	err = loggerDriver.Close()
144
+	if err != nil {
145
+		t.Fatal(err)
146
+	}
147
+
148
+	if len(hec.messages) != 2 {
149
+		t.Fatal("Expected two messages")
150
+	}
151
+
152
+	if *hec.gzipEnabled {
153
+		t.Fatal("Gzip should not be used")
154
+	}
155
+
156
+	message1 := hec.messages[0]
157
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
158
+		message1.Host != hostname ||
159
+		message1.Source != "" ||
160
+		message1.SourceType != "" ||
161
+		message1.Index != "" {
162
+		t.Fatalf("Unexpected values of message 1 %v", message1)
163
+	}
164
+
165
+	if event, err := message1.EventAsMap(); err != nil {
166
+		t.Fatal(err)
167
+	} else {
168
+		if event["line"] != "{\"a\":\"b\"}" ||
169
+			event["source"] != "stdout" ||
170
+			event["tag"] != "containeriid" ||
171
+			len(event) != 3 {
172
+			t.Fatalf("Unexpected event in message %v", event)
173
+		}
174
+	}
175
+
176
+	message2 := hec.messages[1]
177
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
178
+		message2.Host != hostname ||
179
+		message2.Source != "" ||
180
+		message2.SourceType != "" ||
181
+		message2.Index != "" {
182
+		t.Fatalf("Unexpected values of message 1 %v", message2)
183
+	}
184
+
185
+	if event, err := message2.EventAsMap(); err != nil {
186
+		t.Fatal(err)
187
+	} else {
188
+		if event["line"] != "notajson" ||
189
+			event["source"] != "stdout" ||
190
+			event["tag"] != "containeriid" ||
191
+			len(event) != 3 {
192
+			t.Fatalf("Unexpected event in message %v", event)
193
+		}
194
+	}
195
+
196
+	err = hec.Close()
197
+	if err != nil {
198
+		t.Fatal(err)
199
+	}
200
+}
201
+
202
+// Verify inline format with a not default settings for most of options
203
+func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
204
+	hec := NewHTTPEventCollectorMock(t)
205
+
206
+	go hec.Serve()
207
+
208
+	ctx := logger.Context{
209
+		Config: map[string]string{
210
+			splunkURLKey:             hec.URL(),
211
+			splunkTokenKey:           hec.token,
212
+			splunkSourceKey:          "mysource",
213
+			splunkSourceTypeKey:      "mysourcetype",
214
+			splunkIndexKey:           "myindex",
215
+			splunkFormatKey:          splunkFormatInline,
216
+			splunkGzipCompressionKey: "true",
217
+			tagKey:    "{{.ImageName}}/{{.Name}}",
218
+			labelsKey: "a",
219
+		},
220
+		ContainerID:        "containeriid",
221
+		ContainerName:      "/container_name",
222
+		ContainerImageID:   "contaimageid",
223
+		ContainerImageName: "container_image_name",
224
+		ContainerLabels: map[string]string{
225
+			"a": "b",
226
+		},
227
+	}
228
+
229
+	hostname, err := ctx.Hostname()
230
+	if err != nil {
231
+		t.Fatal(err)
232
+	}
233
+
234
+	loggerDriver, err := New(ctx)
235
+	if err != nil {
236
+		t.Fatal(err)
237
+	}
238
+
239
+	if !hec.connectionVerified {
240
+		t.Fatal("By default connection should be verified")
241
+	}
242
+
243
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
244
+	if !ok {
245
+		t.Fatal("Unexpected Splunk Logging Driver type")
246
+	}
247
+
248
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
249
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
250
+		splunkLoggerDriver.nullMessage.Host != hostname ||
251
+		splunkLoggerDriver.nullMessage.Source != "mysource" ||
252
+		splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
253
+		splunkLoggerDriver.nullMessage.Index != "myindex" ||
254
+		splunkLoggerDriver.gzipCompression != true ||
255
+		splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
256
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
257
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
258
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
259
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
260
+		t.Fatal("Values do not match configuration.")
261
+	}
262
+
263
+	messageTime := time.Now()
264
+	if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil {
265
+		t.Fatal(err)
266
+	}
267
+
268
+	err = loggerDriver.Close()
269
+	if err != nil {
270
+		t.Fatal(err)
271
+	}
272
+
273
+	if len(hec.messages) != 1 {
274
+		t.Fatal("Expected one message")
275
+	}
276
+
277
+	if !*hec.gzipEnabled {
278
+		t.Fatal("Gzip should be used")
279
+	}
280
+
281
+	message := hec.messages[0]
282
+	if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
283
+		message.Host != hostname ||
284
+		message.Source != "mysource" ||
285
+		message.SourceType != "mysourcetype" ||
286
+		message.Index != "myindex" {
287
+		t.Fatalf("Unexpected values of message %v", message)
288
+	}
289
+
290
+	if event, err := message.EventAsMap(); err != nil {
291
+		t.Fatal(err)
292
+	} else {
293
+		if event["line"] != "1" ||
294
+			event["source"] != "stdout" ||
295
+			event["tag"] != "container_image_name/container_name" ||
296
+			event["attrs"].(map[string]interface{})["a"] != "b" ||
297
+			len(event) != 4 {
298
+			t.Fatalf("Unexpected event in message %v", event)
299
+		}
300
+	}
301
+
302
+	err = hec.Close()
303
+	if err != nil {
304
+		t.Fatal(err)
305
+	}
306
+}
307
+
308
+// Verify JSON format
309
+func TestJsonFormat(t *testing.T) {
310
+	hec := NewHTTPEventCollectorMock(t)
311
+
312
+	go hec.Serve()
313
+
314
+	ctx := logger.Context{
315
+		Config: map[string]string{
316
+			splunkURLKey:                  hec.URL(),
317
+			splunkTokenKey:                hec.token,
318
+			splunkFormatKey:               splunkFormatJSON,
319
+			splunkGzipCompressionKey:      "true",
320
+			splunkGzipCompressionLevelKey: "1",
321
+		},
322
+		ContainerID:        "containeriid",
323
+		ContainerName:      "/container_name",
324
+		ContainerImageID:   "contaimageid",
325
+		ContainerImageName: "container_image_name",
326
+	}
327
+
328
+	hostname, err := ctx.Hostname()
329
+	if err != nil {
330
+		t.Fatal(err)
331
+	}
332
+
333
+	loggerDriver, err := New(ctx)
334
+	if err != nil {
335
+		t.Fatal(err)
336
+	}
337
+
338
+	if !hec.connectionVerified {
339
+		t.Fatal("By default connection should be verified")
340
+	}
341
+
342
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
343
+	if !ok {
344
+		t.Fatal("Unexpected Splunk Logging Driver type")
345
+	}
346
+
347
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
348
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
349
+		splunkLoggerDriver.nullMessage.Host != hostname ||
350
+		splunkLoggerDriver.nullMessage.Source != "" ||
351
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
352
+		splunkLoggerDriver.nullMessage.Index != "" ||
353
+		splunkLoggerDriver.gzipCompression != true ||
354
+		splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
355
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
356
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
357
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
358
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
359
+		t.Fatal("Values do not match configuration.")
360
+	}
361
+
362
+	message1Time := time.Now()
363
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
364
+		t.Fatal(err)
365
+	}
366
+	message2Time := time.Now()
367
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
368
+		t.Fatal(err)
369
+	}
370
+
371
+	err = loggerDriver.Close()
372
+	if err != nil {
373
+		t.Fatal(err)
374
+	}
375
+
376
+	if len(hec.messages) != 2 {
377
+		t.Fatal("Expected two messages")
378
+	}
379
+
380
+	message1 := hec.messages[0]
381
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
382
+		message1.Host != hostname ||
383
+		message1.Source != "" ||
384
+		message1.SourceType != "" ||
385
+		message1.Index != "" {
386
+		t.Fatalf("Unexpected values of message 1 %v", message1)
387
+	}
388
+
389
+	if event, err := message1.EventAsMap(); err != nil {
390
+		t.Fatal(err)
391
+	} else {
392
+		if event["line"].(map[string]interface{})["a"] != "b" ||
393
+			event["source"] != "stdout" ||
394
+			event["tag"] != "containeriid" ||
395
+			len(event) != 3 {
396
+			t.Fatalf("Unexpected event in message 1 %v", event)
397
+		}
398
+	}
399
+
400
+	message2 := hec.messages[1]
401
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
402
+		message2.Host != hostname ||
403
+		message2.Source != "" ||
404
+		message2.SourceType != "" ||
405
+		message2.Index != "" {
406
+		t.Fatalf("Unexpected values of message 2 %v", message2)
407
+	}
408
+
409
+	// If message cannot be parsed as JSON - it should be sent as a line
410
+	if event, err := message2.EventAsMap(); err != nil {
411
+		t.Fatal(err)
412
+	} else {
413
+		if event["line"] != "notjson" ||
414
+			event["source"] != "stdout" ||
415
+			event["tag"] != "containeriid" ||
416
+			len(event) != 3 {
417
+			t.Fatalf("Unexpected event in message 2 %v", event)
418
+		}
419
+	}
420
+
421
+	err = hec.Close()
422
+	if err != nil {
423
+		t.Fatal(err)
424
+	}
425
+}
426
+
427
+// Verify raw format
428
+func TestRawFormat(t *testing.T) {
429
+	hec := NewHTTPEventCollectorMock(t)
430
+
431
+	go hec.Serve()
432
+
433
+	ctx := logger.Context{
434
+		Config: map[string]string{
435
+			splunkURLKey:    hec.URL(),
436
+			splunkTokenKey:  hec.token,
437
+			splunkFormatKey: splunkFormatRaw,
438
+		},
439
+		ContainerID:        "containeriid",
440
+		ContainerName:      "/container_name",
441
+		ContainerImageID:   "contaimageid",
442
+		ContainerImageName: "container_image_name",
443
+	}
444
+
445
+	hostname, err := ctx.Hostname()
446
+	if err != nil {
447
+		t.Fatal(err)
448
+	}
449
+
450
+	loggerDriver, err := New(ctx)
451
+	if err != nil {
452
+		t.Fatal(err)
453
+	}
454
+
455
+	if !hec.connectionVerified {
456
+		t.Fatal("By default connection should be verified")
457
+	}
458
+
459
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
460
+	if !ok {
461
+		t.Fatal("Unexpected Splunk Logging Driver type")
462
+	}
463
+
464
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
465
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
466
+		splunkLoggerDriver.nullMessage.Host != hostname ||
467
+		splunkLoggerDriver.nullMessage.Source != "" ||
468
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
469
+		splunkLoggerDriver.nullMessage.Index != "" ||
470
+		splunkLoggerDriver.gzipCompression != false ||
471
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
472
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
473
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
474
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
475
+		string(splunkLoggerDriver.prefix) != "containeriid " {
476
+		t.Fatal("Values do not match configuration.")
477
+	}
478
+
479
+	message1Time := time.Now()
480
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
481
+		t.Fatal(err)
482
+	}
483
+	message2Time := time.Now()
484
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
485
+		t.Fatal(err)
486
+	}
487
+
488
+	err = loggerDriver.Close()
489
+	if err != nil {
490
+		t.Fatal(err)
491
+	}
492
+
493
+	if len(hec.messages) != 2 {
494
+		t.Fatal("Expected two messages")
495
+	}
496
+
497
+	message1 := hec.messages[0]
498
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
499
+		message1.Host != hostname ||
500
+		message1.Source != "" ||
501
+		message1.SourceType != "" ||
502
+		message1.Index != "" {
503
+		t.Fatalf("Unexpected values of message 1 %v", message1)
504
+	}
505
+
506
+	if event, err := message1.EventAsString(); err != nil {
507
+		t.Fatal(err)
508
+	} else {
509
+		if event != "containeriid {\"a\":\"b\"}" {
510
+			t.Fatalf("Unexpected event in message 1 %v", event)
511
+		}
512
+	}
513
+
514
+	message2 := hec.messages[1]
515
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
516
+		message2.Host != hostname ||
517
+		message2.Source != "" ||
518
+		message2.SourceType != "" ||
519
+		message2.Index != "" {
520
+		t.Fatalf("Unexpected values of message 2 %v", message2)
521
+	}
522
+
523
+	if event, err := message2.EventAsString(); err != nil {
524
+		t.Fatal(err)
525
+	} else {
526
+		if event != "containeriid notjson" {
527
+			t.Fatalf("Unexpected event in message 1 %v", event)
528
+		}
529
+	}
530
+
531
+	err = hec.Close()
532
+	if err != nil {
533
+		t.Fatal(err)
534
+	}
535
+}
536
+
537
+// Verify raw format with labels
538
+func TestRawFormatWithLabels(t *testing.T) {
539
+	hec := NewHTTPEventCollectorMock(t)
540
+
541
+	go hec.Serve()
542
+
543
+	ctx := logger.Context{
544
+		Config: map[string]string{
545
+			splunkURLKey:    hec.URL(),
546
+			splunkTokenKey:  hec.token,
547
+			splunkFormatKey: splunkFormatRaw,
548
+			labelsKey:       "a",
549
+		},
550
+		ContainerID:        "containeriid",
551
+		ContainerName:      "/container_name",
552
+		ContainerImageID:   "contaimageid",
553
+		ContainerImageName: "container_image_name",
554
+		ContainerLabels: map[string]string{
555
+			"a": "b",
556
+		},
557
+	}
558
+
559
+	hostname, err := ctx.Hostname()
560
+	if err != nil {
561
+		t.Fatal(err)
562
+	}
563
+
564
+	loggerDriver, err := New(ctx)
565
+	if err != nil {
566
+		t.Fatal(err)
567
+	}
568
+
569
+	if !hec.connectionVerified {
570
+		t.Fatal("By default connection should be verified")
571
+	}
572
+
573
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
574
+	if !ok {
575
+		t.Fatal("Unexpected Splunk Logging Driver type")
576
+	}
577
+
578
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
579
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
580
+		splunkLoggerDriver.nullMessage.Host != hostname ||
581
+		splunkLoggerDriver.nullMessage.Source != "" ||
582
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
583
+		splunkLoggerDriver.nullMessage.Index != "" ||
584
+		splunkLoggerDriver.gzipCompression != false ||
585
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
586
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
587
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
588
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
589
+		string(splunkLoggerDriver.prefix) != "containeriid a=b " {
590
+		t.Fatal("Values do not match configuration.")
591
+	}
592
+
593
+	message1Time := time.Now()
594
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
595
+		t.Fatal(err)
596
+	}
597
+	message2Time := time.Now()
598
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
599
+		t.Fatal(err)
600
+	}
601
+
602
+	err = loggerDriver.Close()
603
+	if err != nil {
604
+		t.Fatal(err)
605
+	}
606
+
607
+	if len(hec.messages) != 2 {
608
+		t.Fatal("Expected two messages")
609
+	}
610
+
611
+	message1 := hec.messages[0]
612
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
613
+		message1.Host != hostname ||
614
+		message1.Source != "" ||
615
+		message1.SourceType != "" ||
616
+		message1.Index != "" {
617
+		t.Fatalf("Unexpected values of message 1 %v", message1)
618
+	}
619
+
620
+	if event, err := message1.EventAsString(); err != nil {
621
+		t.Fatal(err)
622
+	} else {
623
+		if event != "containeriid a=b {\"a\":\"b\"}" {
624
+			t.Fatalf("Unexpected event in message 1 %v", event)
625
+		}
626
+	}
627
+
628
+	message2 := hec.messages[1]
629
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
630
+		message2.Host != hostname ||
631
+		message2.Source != "" ||
632
+		message2.SourceType != "" ||
633
+		message2.Index != "" {
634
+		t.Fatalf("Unexpected values of message 2 %v", message2)
635
+	}
636
+
637
+	if event, err := message2.EventAsString(); err != nil {
638
+		t.Fatal(err)
639
+	} else {
640
+		if event != "containeriid a=b notjson" {
641
+			t.Fatalf("Unexpected event in message 1 %v", event)
642
+		}
643
+	}
644
+
645
+	err = hec.Close()
646
+	if err != nil {
647
+		t.Fatal(err)
648
+	}
649
+}
650
+
651
+// Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
652
+// in the same way we get them in stdout/stderr
653
+func TestRawFormatWithoutTag(t *testing.T) {
654
+	hec := NewHTTPEventCollectorMock(t)
655
+
656
+	go hec.Serve()
657
+
658
+	ctx := logger.Context{
659
+		Config: map[string]string{
660
+			splunkURLKey:    hec.URL(),
661
+			splunkTokenKey:  hec.token,
662
+			splunkFormatKey: splunkFormatRaw,
663
+			tagKey:          "",
664
+		},
665
+		ContainerID:        "containeriid",
666
+		ContainerName:      "/container_name",
667
+		ContainerImageID:   "contaimageid",
668
+		ContainerImageName: "container_image_name",
669
+	}
670
+
671
+	hostname, err := ctx.Hostname()
672
+	if err != nil {
673
+		t.Fatal(err)
674
+	}
675
+
676
+	loggerDriver, err := New(ctx)
677
+	if err != nil {
678
+		t.Fatal(err)
679
+	}
680
+
681
+	if !hec.connectionVerified {
682
+		t.Fatal("By default connection should be verified")
683
+	}
684
+
685
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
686
+	if !ok {
687
+		t.Fatal("Unexpected Splunk Logging Driver type")
688
+	}
689
+
690
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
691
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
692
+		splunkLoggerDriver.nullMessage.Host != hostname ||
693
+		splunkLoggerDriver.nullMessage.Source != "" ||
694
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
695
+		splunkLoggerDriver.nullMessage.Index != "" ||
696
+		splunkLoggerDriver.gzipCompression != false ||
697
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
698
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
699
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
700
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
701
+		string(splunkLoggerDriver.prefix) != "" {
702
+		t.Log(string(splunkLoggerDriver.prefix) + "a")
703
+		t.Fatal("Values do not match configuration.")
704
+	}
705
+
706
+	message1Time := time.Now()
707
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
708
+		t.Fatal(err)
709
+	}
710
+	message2Time := time.Now()
711
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
712
+		t.Fatal(err)
713
+	}
714
+
715
+	err = loggerDriver.Close()
716
+	if err != nil {
717
+		t.Fatal(err)
718
+	}
719
+
720
+	if len(hec.messages) != 2 {
721
+		t.Fatal("Expected two messages")
722
+	}
723
+
724
+	message1 := hec.messages[0]
725
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
726
+		message1.Host != hostname ||
727
+		message1.Source != "" ||
728
+		message1.SourceType != "" ||
729
+		message1.Index != "" {
730
+		t.Fatalf("Unexpected values of message 1 %v", message1)
731
+	}
732
+
733
+	if event, err := message1.EventAsString(); err != nil {
734
+		t.Fatal(err)
735
+	} else {
736
+		if event != "{\"a\":\"b\"}" {
737
+			t.Fatalf("Unexpected event in message 1 %v", event)
738
+		}
739
+	}
740
+
741
+	message2 := hec.messages[1]
742
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
743
+		message2.Host != hostname ||
744
+		message2.Source != "" ||
745
+		message2.SourceType != "" ||
746
+		message2.Index != "" {
747
+		t.Fatalf("Unexpected values of message 2 %v", message2)
748
+	}
749
+
750
+	if event, err := message2.EventAsString(); err != nil {
751
+		t.Fatal(err)
752
+	} else {
753
+		if event != "notjson" {
754
+			t.Fatalf("Unexpected event in message 1 %v", event)
755
+		}
756
+	}
757
+
758
+	err = hec.Close()
759
+	if err != nil {
760
+		t.Fatal(err)
761
+	}
762
+}
763
+
764
+// Verify that we will send messages in batches with default batching parameters,
765
+// but change frequency to be sure that numOfRequests will match expected 17 requests
766
+func TestBatching(t *testing.T) {
767
+	if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
768
+		t.Fatal(err)
769
+	}
770
+
771
+	hec := NewHTTPEventCollectorMock(t)
772
+
773
+	go hec.Serve()
774
+
775
+	ctx := logger.Context{
776
+		Config: map[string]string{
777
+			splunkURLKey:   hec.URL(),
778
+			splunkTokenKey: hec.token,
779
+		},
780
+		ContainerID:        "containeriid",
781
+		ContainerName:      "/container_name",
782
+		ContainerImageID:   "contaimageid",
783
+		ContainerImageName: "container_image_name",
784
+	}
785
+
786
+	loggerDriver, err := New(ctx)
787
+	if err != nil {
788
+		t.Fatal(err)
789
+	}
790
+
791
+	for i := 0; i < defaultStreamChannelSize*4; i++ {
792
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
793
+			t.Fatal(err)
794
+		}
795
+	}
796
+
797
+	err = loggerDriver.Close()
798
+	if err != nil {
799
+		t.Fatal(err)
800
+	}
801
+
802
+	if len(hec.messages) != defaultStreamChannelSize*4 {
803
+		t.Fatal("Not all messages delivered")
804
+	}
805
+
806
+	for i, message := range hec.messages {
807
+		if event, err := message.EventAsMap(); err != nil {
808
+			t.Fatal(err)
809
+		} else {
810
+			if event["line"] != fmt.Sprintf("%d", i) {
811
+				t.Fatalf("Unexpected event in message %v", event)
812
+			}
813
+		}
814
+	}
815
+
816
+	// 1 to verify connection and 16 batches
817
+	if hec.numOfRequests != 17 {
818
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
819
+	}
820
+
821
+	err = hec.Close()
822
+	if err != nil {
823
+		t.Fatal(err)
824
+	}
825
+
826
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
827
+		t.Fatal(err)
828
+	}
829
+}
830
+
831
+// Verify that test is using time to fire events not rare than specified frequency
832
+func TestFrequency(t *testing.T) {
833
+	if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
834
+		t.Fatal(err)
835
+	}
836
+
837
+	hec := NewHTTPEventCollectorMock(t)
838
+
839
+	go hec.Serve()
840
+
841
+	ctx := logger.Context{
842
+		Config: map[string]string{
843
+			splunkURLKey:   hec.URL(),
844
+			splunkTokenKey: hec.token,
845
+		},
846
+		ContainerID:        "containeriid",
847
+		ContainerName:      "/container_name",
848
+		ContainerImageID:   "contaimageid",
849
+		ContainerImageName: "container_image_name",
850
+	}
851
+
852
+	loggerDriver, err := New(ctx)
853
+	if err != nil {
854
+		t.Fatal(err)
855
+	}
856
+
857
+	for i := 0; i < 10; i++ {
858
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
859
+			t.Fatal(err)
860
+		}
861
+		time.Sleep(15 * time.Millisecond)
862
+	}
863
+
864
+	err = loggerDriver.Close()
865
+	if err != nil {
866
+		t.Fatal(err)
867
+	}
868
+
869
+	if len(hec.messages) != 10 {
870
+		t.Fatal("Not all messages delivered")
871
+	}
872
+
873
+	for i, message := range hec.messages {
874
+		if event, err := message.EventAsMap(); err != nil {
875
+			t.Fatal(err)
876
+		} else {
877
+			if event["line"] != fmt.Sprintf("%d", i) {
878
+				t.Fatalf("Unexpected event in message %v", event)
879
+			}
880
+		}
881
+	}
882
+
883
+	// 1 to verify connection and 10 to verify that we have sent messages with required frequency,
884
+	// but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
885
+	if hec.numOfRequests < 9 {
886
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
887
+	}
888
+
889
+	err = hec.Close()
890
+	if err != nil {
891
+		t.Fatal(err)
892
+	}
893
+
894
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
895
+		t.Fatal(err)
896
+	}
897
+}
898
+
899
+// Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
900
+// per request
901
+func TestOneMessagePerRequest(t *testing.T) {
902
+	if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
903
+		t.Fatal(err)
904
+	}
905
+
906
+	if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
907
+		t.Fatal(err)
908
+	}
909
+
910
+	if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
911
+		t.Fatal(err)
912
+	}
913
+
914
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
915
+		t.Fatal(err)
916
+	}
917
+
918
+	hec := NewHTTPEventCollectorMock(t)
919
+
920
+	go hec.Serve()
921
+
922
+	ctx := logger.Context{
923
+		Config: map[string]string{
924
+			splunkURLKey:   hec.URL(),
925
+			splunkTokenKey: hec.token,
926
+		},
927
+		ContainerID:        "containeriid",
928
+		ContainerName:      "/container_name",
929
+		ContainerImageID:   "contaimageid",
930
+		ContainerImageName: "container_image_name",
931
+	}
932
+
933
+	loggerDriver, err := New(ctx)
934
+	if err != nil {
935
+		t.Fatal(err)
936
+	}
937
+
938
+	for i := 0; i < 10; i++ {
939
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
940
+			t.Fatal(err)
941
+		}
942
+	}
943
+
944
+	err = loggerDriver.Close()
945
+	if err != nil {
946
+		t.Fatal(err)
947
+	}
948
+
949
+	if len(hec.messages) != 10 {
950
+		t.Fatal("Not all messages delivered")
951
+	}
952
+
953
+	for i, message := range hec.messages {
954
+		if event, err := message.EventAsMap(); err != nil {
955
+			t.Fatal(err)
956
+		} else {
957
+			if event["line"] != fmt.Sprintf("%d", i) {
958
+				t.Fatalf("Unexpected event in message %v", event)
959
+			}
960
+		}
961
+	}
962
+
963
+	// 1 to verify connection and 10 messages
964
+	if hec.numOfRequests != 11 {
965
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
966
+	}
967
+
968
+	err = hec.Close()
969
+	if err != nil {
970
+		t.Fatal(err)
971
+	}
972
+
973
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
974
+		t.Fatal(err)
975
+	}
976
+
977
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
978
+		t.Fatal(err)
979
+	}
980
+
981
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
982
+		t.Fatal(err)
983
+	}
984
+
985
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
986
+		t.Fatal(err)
987
+	}
988
+}
989
+
990
+// Driver should not be created when HEC is unresponsive
991
+func TestVerify(t *testing.T) {
992
+	hec := NewHTTPEventCollectorMock(t)
993
+	hec.simulateServerError = true
994
+	go hec.Serve()
995
+
996
+	ctx := logger.Context{
997
+		Config: map[string]string{
998
+			splunkURLKey:   hec.URL(),
999
+			splunkTokenKey: hec.token,
1000
+		},
1001
+		ContainerID:        "containeriid",
1002
+		ContainerName:      "/container_name",
1003
+		ContainerImageID:   "contaimageid",
1004
+		ContainerImageName: "container_image_name",
1005
+	}
1006
+
1007
+	_, err := New(ctx)
1008
+	if err == nil {
1009
+		t.Fatal("Expecting driver to fail, when server is unresponsive")
1010
+	}
1011
+
1012
+	err = hec.Close()
1013
+	if err != nil {
1014
+		t.Fatal(err)
1015
+	}
1016
+}
1017
+
1018
+// Verify that user can specify to skip verification that Splunk HEC is working.
1019
+// Also in this test we verify retry logic.
1020
+func TestSkipVerify(t *testing.T) {
1021
+	hec := NewHTTPEventCollectorMock(t)
1022
+	hec.simulateServerError = true
1023
+	go hec.Serve()
1024
+
1025
+	ctx := logger.Context{
1026
+		Config: map[string]string{
1027
+			splunkURLKey:              hec.URL(),
1028
+			splunkTokenKey:            hec.token,
1029
+			splunkVerifyConnectionKey: "false",
1030
+		},
1031
+		ContainerID:        "containeriid",
1032
+		ContainerName:      "/container_name",
1033
+		ContainerImageID:   "contaimageid",
1034
+		ContainerImageName: "container_image_name",
1035
+	}
1036
+
1037
+	loggerDriver, err := New(ctx)
1038
+	if err != nil {
1039
+		t.Fatal(err)
1040
+	}
1041
+
1042
+	if hec.connectionVerified {
1043
+		t.Fatal("Connection should not be verified")
1044
+	}
1045
+
1046
+	for i := 0; i < defaultStreamChannelSize*2; i++ {
1047
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
1048
+			t.Fatal(err)
1049
+		}
1050
+	}
1051
+
1052
+	if len(hec.messages) != 0 {
1053
+		t.Fatal("No messages should be accepted at this point")
1054
+	}
1055
+
1056
+	hec.simulateServerError = false
1057
+
1058
+	for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
1059
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
1060
+			t.Fatal(err)
1061
+		}
1062
+	}
1063
+
1064
+	err = loggerDriver.Close()
1065
+	if err != nil {
1066
+		t.Fatal(err)
1067
+	}
1068
+
1069
+	if len(hec.messages) != defaultStreamChannelSize*4 {
1070
+		t.Fatal("Not all messages delivered")
1071
+	}
1072
+
1073
+	for i, message := range hec.messages {
1074
+		if event, err := message.EventAsMap(); err != nil {
1075
+			t.Fatal(err)
1076
+		} else {
1077
+			if event["line"] != fmt.Sprintf("%d", i) {
1078
+				t.Fatalf("Unexpected event in message %v", event)
1079
+			}
1080
+		}
1081
+	}
1082
+
1083
+	err = hec.Close()
1084
+	if err != nil {
1085
+		t.Fatal(err)
1086
+	}
1087
+}
1088
+
1089
+// Verify logic for when we filled whole buffer
1090
+func TestBufferMaximum(t *testing.T) {
1091
+	if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
1092
+		t.Fatal(err)
1093
+	}
1094
+
1095
+	if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
1096
+		t.Fatal(err)
1097
+	}
1098
+
1099
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
1100
+		t.Fatal(err)
1101
+	}
1102
+
1103
+	hec := NewHTTPEventCollectorMock(t)
1104
+	hec.simulateServerError = true
1105
+	go hec.Serve()
1106
+
1107
+	ctx := logger.Context{
1108
+		Config: map[string]string{
1109
+			splunkURLKey:              hec.URL(),
1110
+			splunkTokenKey:            hec.token,
1111
+			splunkVerifyConnectionKey: "false",
1112
+		},
1113
+		ContainerID:        "containeriid",
1114
+		ContainerName:      "/container_name",
1115
+		ContainerImageID:   "contaimageid",
1116
+		ContainerImageName: "container_image_name",
1117
+	}
1118
+
1119
+	loggerDriver, err := New(ctx)
1120
+	if err != nil {
1121
+		t.Fatal(err)
1122
+	}
1123
+
1124
+	if hec.connectionVerified {
1125
+		t.Fatal("Connection should not be verified")
1126
+	}
1127
+
1128
+	for i := 0; i < 11; i++ {
1129
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
1130
+			t.Fatal(err)
1131
+		}
1132
+	}
1133
+
1134
+	if len(hec.messages) != 0 {
1135
+		t.Fatal("No messages should be accepted at this point")
1136
+	}
1137
+
1138
+	hec.simulateServerError = false
1139
+
1140
+	err = loggerDriver.Close()
1141
+	if err != nil {
1142
+		t.Fatal(err)
1143
+	}
1144
+
1145
+	if len(hec.messages) != 9 {
1146
+		t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
1147
+	}
1148
+
1149
+	// First 1000 messages are written to daemon log when buffer was full
1150
+	for i, message := range hec.messages {
1151
+		if event, err := message.EventAsMap(); err != nil {
1152
+			t.Fatal(err)
1153
+		} else {
1154
+			if event["line"] != fmt.Sprintf("%d", i+2) {
1155
+				t.Fatalf("Unexpected event in message %v", event)
1156
+			}
1157
+		}
1158
+	}
1159
+
1160
+	err = hec.Close()
1161
+	if err != nil {
1162
+		t.Fatal(err)
1163
+	}
1164
+
1165
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
1166
+		t.Fatal(err)
1167
+	}
1168
+
1169
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
1170
+		t.Fatal(err)
1171
+	}
1172
+
1173
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
1174
+		t.Fatal(err)
1175
+	}
1176
+}
1177
+
1178
+// Verify that we are not blocking close when HEC is down for the whole time
1179
+func TestServerAlwaysDown(t *testing.T) {
1180
+	if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
1181
+		t.Fatal(err)
1182
+	}
1183
+
1184
+	if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
1185
+		t.Fatal(err)
1186
+	}
1187
+
1188
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
1189
+		t.Fatal(err)
1190
+	}
1191
+
1192
+	hec := NewHTTPEventCollectorMock(t)
1193
+	hec.simulateServerError = true
1194
+	go hec.Serve()
1195
+
1196
+	ctx := logger.Context{
1197
+		Config: map[string]string{
1198
+			splunkURLKey:              hec.URL(),
1199
+			splunkTokenKey:            hec.token,
1200
+			splunkVerifyConnectionKey: "false",
1201
+		},
1202
+		ContainerID:        "containeriid",
1203
+		ContainerName:      "/container_name",
1204
+		ContainerImageID:   "contaimageid",
1205
+		ContainerImageName: "container_image_name",
1206
+	}
1207
+
1208
+	loggerDriver, err := New(ctx)
1209
+	if err != nil {
1210
+		t.Fatal(err)
1211
+	}
1212
+
1213
+	if hec.connectionVerified {
1214
+		t.Fatal("Connection should not be verified")
1215
+	}
1216
+
1217
+	for i := 0; i < 5; i++ {
1218
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
1219
+			t.Fatal(err)
1220
+		}
1221
+	}
1222
+
1223
+	err = loggerDriver.Close()
1224
+	if err != nil {
1225
+		t.Fatal(err)
1226
+	}
1227
+
1228
+	if len(hec.messages) != 0 {
1229
+		t.Fatal("No messages should be sent")
1230
+	}
1231
+
1232
+	err = hec.Close()
1233
+	if err != nil {
1234
+		t.Fatal(err)
1235
+	}
1236
+
1237
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
1238
+		t.Fatal(err)
1239
+	}
1240
+
1241
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
1242
+		t.Fatal(err)
1243
+	}
1244
+
1245
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
1246
+		t.Fatal(err)
1247
+	}
1248
+}
1249
+
1250
+// Cannot send messages after we close driver
1251
+func TestCannotSendAfterClose(t *testing.T) {
1252
+	hec := NewHTTPEventCollectorMock(t)
1253
+	go hec.Serve()
1254
+
1255
+	ctx := logger.Context{
1256
+		Config: map[string]string{
1257
+			splunkURLKey:   hec.URL(),
1258
+			splunkTokenKey: hec.token,
1259
+		},
1260
+		ContainerID:        "containeriid",
1261
+		ContainerName:      "/container_name",
1262
+		ContainerImageID:   "contaimageid",
1263
+		ContainerImageName: "container_image_name",
1264
+	}
1265
+
1266
+	loggerDriver, err := New(ctx)
1267
+	if err != nil {
1268
+		t.Fatal(err)
1269
+	}
1270
+
1271
+	if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil {
1272
+		t.Fatal(err)
1273
+	}
1274
+
1275
+	err = loggerDriver.Close()
1276
+	if err != nil {
1277
+		t.Fatal(err)
1278
+	}
1279
+
1280
+	if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil {
1281
+		t.Fatal("Driver should not allow to send messages after close")
1282
+	}
1283
+
1284
+	if len(hec.messages) != 1 {
1285
+		t.Fatal("Only one message should be sent")
1286
+	}
1287
+
1288
+	message := hec.messages[0]
1289
+	if event, err := message.EventAsMap(); err != nil {
1290
+		t.Fatal(err)
1291
+	} else {
1292
+		if event["line"] != "message1" {
1293
+			t.Fatalf("Unexpected event in message %v", event)
1294
+		}
1295
+	}
1296
+
1297
+	err = hec.Close()
1298
+	if err != nil {
1299
+		t.Fatal(err)
1300
+	}
1301
+}
0 1302
new file mode 100644
... ...
@@ -0,0 +1,157 @@
0
+package splunk
1
+
2
+import (
3
+	"compress/gzip"
4
+	"encoding/json"
5
+	"fmt"
6
+	"io"
7
+	"io/ioutil"
8
+	"net"
9
+	"net/http"
10
+	"testing"
11
+)
12
+
13
+func (message *splunkMessage) EventAsString() (string, error) {
14
+	if val, ok := message.Event.(string); ok {
15
+		return val, nil
16
+	}
17
+	return "", fmt.Errorf("Cannot cast Event %v to string", message.Event)
18
+}
19
+
20
+func (message *splunkMessage) EventAsMap() (map[string]interface{}, error) {
21
+	if val, ok := message.Event.(map[string]interface{}); ok {
22
+		return val, nil
23
+	}
24
+	return nil, fmt.Errorf("Cannot cast Event %v to map", message.Event)
25
+}
26
+
27
+type HTTPEventCollectorMock struct {
28
+	tcpAddr     *net.TCPAddr
29
+	tcpListener *net.TCPListener
30
+
31
+	token               string
32
+	simulateServerError bool
33
+
34
+	test *testing.T
35
+
36
+	connectionVerified bool
37
+	gzipEnabled        *bool
38
+	messages           []*splunkMessage
39
+	numOfRequests      int
40
+}
41
+
42
+func NewHTTPEventCollectorMock(t *testing.T) *HTTPEventCollectorMock {
43
+	tcpAddr := &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0, Zone: ""}
44
+	tcpListener, err := net.ListenTCP("tcp", tcpAddr)
45
+	if err != nil {
46
+		t.Fatal(err)
47
+	}
48
+	return &HTTPEventCollectorMock{
49
+		tcpAddr:             tcpAddr,
50
+		tcpListener:         tcpListener,
51
+		token:               "4642492F-D8BD-47F1-A005-0C08AE4657DF",
52
+		simulateServerError: false,
53
+		test:                t,
54
+		connectionVerified:  false}
55
+}
56
+
57
+func (hec *HTTPEventCollectorMock) URL() string {
58
+	return "http://" + hec.tcpListener.Addr().String()
59
+}
60
+
61
+func (hec *HTTPEventCollectorMock) Serve() error {
62
+	return http.Serve(hec.tcpListener, hec)
63
+}
64
+
65
+func (hec *HTTPEventCollectorMock) Close() error {
66
+	return hec.tcpListener.Close()
67
+}
68
+
69
+func (hec *HTTPEventCollectorMock) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
70
+	var err error
71
+
72
+	hec.numOfRequests++
73
+
74
+	if hec.simulateServerError {
75
+		if request.Body != nil {
76
+			defer request.Body.Close()
77
+		}
78
+		writer.WriteHeader(http.StatusInternalServerError)
79
+		return
80
+	}
81
+
82
+	switch request.Method {
83
+	case http.MethodOptions:
84
+		// Verify that options method is getting called only once
85
+		if hec.connectionVerified {
86
+			hec.test.Errorf("Connection should not be verified more than once. Got second request with %s method.", request.Method)
87
+		}
88
+		hec.connectionVerified = true
89
+		writer.WriteHeader(http.StatusOK)
90
+	case http.MethodPost:
91
+		// Always verify that Driver is using correct path to HEC
92
+		if request.URL.String() != "/services/collector/event/1.0" {
93
+			hec.test.Errorf("Unexpected path %v", request.URL)
94
+		}
95
+		defer request.Body.Close()
96
+
97
+		if authorization, ok := request.Header["Authorization"]; !ok || authorization[0] != ("Splunk "+hec.token) {
98
+			hec.test.Error("Authorization header is invalid.")
99
+		}
100
+
101
+		gzipEnabled := false
102
+		if contentEncoding, ok := request.Header["Content-Encoding"]; ok && contentEncoding[0] == "gzip" {
103
+			gzipEnabled = true
104
+		}
105
+
106
+		if hec.gzipEnabled == nil {
107
+			hec.gzipEnabled = &gzipEnabled
108
+		} else if gzipEnabled != *hec.gzipEnabled {
109
+			// Nothing wrong with that, but we just know that Splunk Logging Driver does not do that
110
+			hec.test.Error("Driver should not change Content Encoding.")
111
+		}
112
+
113
+		var gzipReader *gzip.Reader
114
+		var reader io.Reader
115
+		if gzipEnabled {
116
+			gzipReader, err = gzip.NewReader(request.Body)
117
+			if err != nil {
118
+				hec.test.Fatal(err)
119
+			}
120
+			reader = gzipReader
121
+		} else {
122
+			reader = request.Body
123
+		}
124
+
125
+		// Read body
126
+		var body []byte
127
+		body, err = ioutil.ReadAll(reader)
128
+		if err != nil {
129
+			hec.test.Fatal(err)
130
+		}
131
+
132
+		// Parse message
133
+		messageStart := 0
134
+		for i := 0; i < len(body); i++ {
135
+			if i == len(body)-1 || (body[i] == '}' && body[i+1] == '{') {
136
+				var message splunkMessage
137
+				err = json.Unmarshal(body[messageStart:i+1], &message)
138
+				if err != nil {
139
+					hec.test.Log(string(body[messageStart : i+1]))
140
+					hec.test.Fatal(err)
141
+				}
142
+				hec.messages = append(hec.messages, &message)
143
+				messageStart = i + 1
144
+			}
145
+		}
146
+
147
+		if gzipEnabled {
148
+			gzipReader.Close()
149
+		}
150
+
151
+		writer.WriteHeader(http.StatusOK)
152
+	default:
153
+		hec.test.Errorf("Unexpected HTTP method %s", http.MethodOptions)
154
+		writer.WriteHeader(http.StatusBadRequest)
155
+	}
156
+}
... ...
@@ -32,21 +32,23 @@ You can set the logging driver for a specific container by using the
32 32
 You can use the `--log-opt NAME=VALUE` flag to specify these additional Splunk
33 33
 logging driver options:
34 34
 
35
-| Option                      | Required | Description                                                                                                                                                                                                        |
36
-|-----------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
37
-| `splunk-token`              | required | Splunk HTTP Event Collector token.                                                                                                                                                                                 |
38
-| `splunk-url`                | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`.                                                              |
39
-| `splunk-source`             | optional | Event source.                                                                                                                                                                                                      |
40
-| `splunk-sourcetype`         | optional | Event source type.                                                                                                                                                                                                 |
41
-| `splunk-index`              | optional | Event index.                                                                                                                                                                                                       |
42
-| `splunk-capath`             | optional | Path to root certificate.                                                                                                                                                                                          |
43
-| `splunk-caname`             | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used.                                                                                                           |
44
-| `splunk-insecureskipverify` | optional | Ignore server certificate validation.                                                                                                                                                                              |
45
-| `splunk-format`             | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`.                                                                                                                                            |
46
-| `splunk-verify-connection`   | optional | Verify on start, that docker can connect to Splunk server. Defaults to true.                                                                                                                                       |
47
-| `tag`                       | optional | Specify tag for message, which interpret some markup. Default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format. |
48
-| `labels`                    | optional | Comma-separated list of keys of labels, which should be included in message, if these labels are specified for container.                                                                                          |
49
-| `env`                       | optional | Comma-separated list of keys of environment variables, which should be included in message, if these variables are specified for container.                                                                        |
35
+| Option                      | Required | Description                                                                                                                                                                                                             |
36
+|-----------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
37
+| `splunk-token`              | required | Splunk HTTP Event Collector token.                                                                                                                                                                                      |
38
+| `splunk-url`                | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`.                                                                   |
39
+| `splunk-source`             | optional | Event source.                                                                                                                                                                                                           |
40
+| `splunk-sourcetype`         | optional | Event source type.                                                                                                                                                                                                      |
41
+| `splunk-index`              | optional | Event index.                                                                                                                                                                                                            |
42
+| `splunk-capath`             | optional | Path to root certificate.                                                                                                                                                                                               |
43
+| `splunk-caname`             | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used.                                                                                                                |
44
+| `splunk-insecureskipverify` | optional | Ignore server certificate validation.                                                                                                                                                                                   |
45
+| `splunk-format`             | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`.                                                                                                                                                 |
46
+| `splunk-verify-connection`  | optional | Verify on start, that docker can connect to Splunk server. Defaults to true.                                                                                                                                            |
47
+| `splunk-gzip`               | optional | Enable/disable gzip compression to send events to Splunk Enterprise or Splunk Cloud instance. Defaults to false.                                                                                                         |
48
+| `splunk-gzip-level`         | optional | Set compression level for gzip. Valid values are -1 (default), 0 (no compression), 1 (best speed) ... 9 (best compression). Defaults to [DefaultCompression](https://golang.org/pkg/compress/gzip/#DefaultCompression). |
49
+| `tag`                       | optional | Specify tag for message, which interpret some markup. Default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format.      |
50
+| `labels`                    | optional | Comma-separated list of keys of labels, which should be included in message, if these labels are specified for container.                                                                                               |
51
+| `env`                       | optional | Comma-separated list of keys of environment variables, which should be included in message, if these variables are specified for container.                                                                             |
50 52
 
51 53
 If there is collision between `label` and `env` keys, the value of the `env` takes precedence.
52 54
 Both options add additional fields to the attributes of a logging message.
... ...
@@ -132,3 +134,14 @@ tag will be prefixed to the message. For example
132 132
 MyImage/MyContainer env1=val1 label1=label1 my message
133 133
 MyImage/MyContainer env1=val1 label1=label1 {"foo": "bar"}
134 134
 ```
135
+
136
+## Advanced options
137
+
138
+Splunk Logging Driver allows you to configure few advanced options by specifying next environment variables for the Docker daemon.
139
+
140
+| Environment variable name                        | Default value | Description                                                                                                                                        |
141
+|--------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
142
+| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY`  | `5s`          | If there is nothing to batch how often driver will post messages. You can think about this as the maximum time to wait for more messages to batch. |
143
+| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE` | `1000`        | How many messages driver should wait before sending them in one batch.                                                                             |
144
+| `SPLUNK_LOGGING_DRIVER_BUFFER_MAX`               | `10 * 1000`   | If driver cannot connect to remote server, what is the maximum amount of messages it can hold in buffer for retries.                               |
145
+| `SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE`             | `4 * 1000`    | How many pending messages can be in the channel which is used to send messages to background logger worker, which batches them.                    |