package fluentd

import (
	"bufio"
	"context"
	"net"
	"path/filepath"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/moby/moby/v2/daemon/logger"
	"gotest.tools/v3/assert"
)

// TestValidateLogOptReconnectInterval checks that ValidateLogOpt rejects every
// value in invalidIntervals for the async reconnect interval option and accepts
// every value in validIntervals.
func TestValidateLogOptReconnectInterval(t *testing.T) {
	invalidIntervals := []string{"-1", "1", "-1s", "99ms", "11s"}
	for _, v := range invalidIntervals {
		t.Run("invalid "+v, func(t *testing.T) {
			err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
			assert.ErrorContains(t, err, "invalid value for fluentd-async-reconnect-interval:")
		})
	}

	validIntervals := []string{"100ms", "10s"}
	for _, v := range validIntervals {
		t.Run("valid "+v, func(t *testing.T) {
			err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
			assert.NilError(t, err)
		})
	}
}

// TestValidateLogOptAddress runs ValidateLogOpt against a matrix of addresses.
// Each base address is combined with every port suffix and every path suffix
// configured for the test case; any non-empty path suffix is expected to be
// rejected, and accepted addresses are additionally checked via parseAddress.
func TestValidateLogOptAddress(t *testing.T) {
	// ports to try, and their results
	validPorts := map[string]int{
		"":       defaultPort,
		":":      defaultPort,
		":123":   123,
		":65535": 65535,
	}
	// paths to try, which should result in an error
	paths := []string{"/", "/some-path"}

	tests := []struct {
		addr        string
		ports       map[string]int // combinations of addr + port -> expected port
		paths       []string       // paths to append to addr, should be an error for tcp/udp
		expected    location
		expectedErr string
	}{
		{
			addr: "",
			expected: location{
				protocol: defaultProtocol,
				host:     defaultHost,
				port:     defaultPort,
			},
		},
		{
			addr:  "192.168.1.1",
			ports: validPorts,
			paths: paths,
			expected: location{
				protocol: defaultProtocol,
				host:     "192.168.1.1",
			},
		},
		{
			addr:  "[::1]",
			ports: validPorts,
			paths: paths,
			expected: location{
				protocol: defaultProtocol,
				host:     "::1",
			},
		},
		{
			addr:  "example.com",
			ports: validPorts,
			paths: paths,
			expected: location{
				protocol: defaultProtocol,
				host:     "example.com",
			},
		},
		{
			addr:  "tcp://",
			paths: paths,
			expected: location{
				protocol: "tcp",
				host:     defaultHost,
				port:     defaultPort,
			},
		},
		{
			addr:  "tcp://example.com",
			ports: validPorts,
			paths: paths,
			expected: location{
				protocol: "tcp",
				host:     "example.com",
			},
		},
		{
			addr:  "tls://",
			paths: paths,
			expected: location{
				protocol: "tls",
				host:     defaultHost,
				port:     defaultPort,
			},
		},
		{
			addr:  "tls://example.com",
			ports: validPorts,
			paths: paths,
			expected: location{
				protocol: "tls",
				host:     "example.com",
			},
		},
		{
			addr:        "://",
			expectedErr: "missing protocol scheme",
		},
		{
			addr:        "something://",
			expectedErr: "unsupported scheme: 'something'",
		},
		{
			addr:        "udp://",
			expectedErr: "unsupported scheme: 'udp'",
		},
		{
			addr:        "unixgram://",
			expectedErr: "unsupported scheme: 'unixgram'",
		},
		{
			addr:        "tcp+tls://",
			expectedErr: "unsupported scheme: 'tcp+tls'",
		},
		{
			addr:        "corrupted:c",
			expectedErr: "invalid port",
		},
		{
			addr:        "tcp://example.com:port",
			expectedErr: "invalid port",
		},
		{
			addr:        "tcp://example.com:-1",
			expectedErr: "invalid port",
		},
		{
			addr:        "tcp://example.com:65536",
			expectedErr: "invalid port",
		},
		{
			addr:        "unix://",
			expectedErr: "path is empty",
		},
		{
			addr: "unix:///some/socket.sock",
			expected: location{
				protocol: "unix",
				path:     "/some/socket.sock",
			},
		},
		{
			addr: "unix:///some/socket.sock:80", // unusual, but technically valid
			expected: location{
				protocol: "unix",
				path:     "/some/socket.sock:80",
			},
		},
	}
	for _, tc := range tests {
		// Without explicit port variants, test the bare address only and expect
		// whatever port the test case itself declares.
		if len(tc.ports) == 0 {
			tc.ports = map[string]int{"": tc.expected.port}
		}

		// always try empty paths; add paths to try if the test specifies it
		tc.paths = append([]string{""}, tc.paths...)
		for port, expectedPort := range tc.ports {
			for _, path := range tc.paths {
				address := tc.addr + port + path
				expected := tc.expected
				expected.port = expectedPort
				t.Run(address, func(t *testing.T) {
					err := ValidateLogOpt(map[string]string{addressKey: address})
					if path != "" {
						assert.ErrorContains(t, err, "should not contain a path element")
						return
					}
					if tc.expectedErr != "" {
						assert.ErrorContains(t, err, tc.expectedErr)
						return
					}

					assert.NilError(t, err)
					// Check the parse error before dereferencing the result so that a
					// regression fails the test cleanly instead of panicking on nil.
					addr, err := parseAddress(address)
					assert.NilError(t, err)
					assert.DeepEqual(t, expected, *addr, cmp.AllowUnexported(location{}))
				})
			}
		}
	}
}

// TestValidateWriteTimeoutDuration checks that ValidateLogOpt rejects the listed
// invalid write-timeout values, and that the listed valid values are accepted
// and surface in the parsed config as the expected time.Duration.
func TestValidateWriteTimeoutDuration(t *testing.T) {
	invalidDurations := []string{"-1", "1", "-1s"}
	for _, d := range invalidDurations {
		t.Run("invalid "+d, func(t *testing.T) {
			err := ValidateLogOpt(map[string]string{writeTimeoutKey: d})
			assert.ErrorContains(t, err, "invalid value for fluentd-write-timeout:")
		})
	}

	validDurations := map[string]time.Duration{
		"100ms": 100 * time.Millisecond,
		"10s":   10 * time.Second,
		"":      0,
	}
	for k, v := range validDurations {
		t.Run("valid "+k, func(t *testing.T) {
			err := ValidateLogOpt(map[string]string{writeTimeoutKey: k})
			assert.NilError(t, err)
			cfg, err := parseConfig(map[string]string{writeTimeoutKey: k})
			// This check is mostly redundant since it's checked in ValidateLogOpt as well.
			// This is here to guard against potential regressions in the future.
			assert.NilError(t, err)
			assert.Equal(t, cfg.WriteTimeout, v)
		})
	}
}

// TestReadWriteTimeoutsAreEffective tests that read and write timeout values are effective
// for fluentd.
//
// Each case starts a unix-socket listener with a misbehaving connection handler,
// creates a logger pointed at it, and runs a validator that expects Log to fail
// rather than hang.
func TestReadWriteTimeoutsAreEffective(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("windows not supported")
	}

	for _, tc := range []struct {
		name              string
		cfg               map[string]string
		connectionHandler connHandler
		// rwValidator receives the *testing.T of the subtest it runs in. Assertions
		// must not use the parent test's t: calling FailNow on a parent test from a
		// subtest goroutine is not supported by the testing package.
		rwValidator func(t *testing.T, ctx context.Context, f logger.Logger)
	}{
		{
			// This test case tests that writes timeout when the server is unresponsive.
			// The test ensures that instead of hanging forever, the fluentd write operation
			// returns an error when writes cannot be completed within the specified duration.
			name: "write timeout",
			cfg: map[string]string{
				"fluentd-write-timeout": "1ms",
			},
			connectionHandler: blackholeConnectionHandler,
			rwValidator: func(t *testing.T, _ context.Context, f logger.Logger) {
				// Attempt writing 1MiB worth of log data (all 0's) repeatedly. We should see a failure
				// after the 1st or the 2nd attempt depending on when the connection's write buffer gets
				// filled up.
				// If we don't set a write timeout on the connection, this will hang forever. But, because
				// we have a write timeout, we expect the `Log` method to return an error.
				data := make([]byte, 1024*1024)
				var err error
				for range 10 {
					err = f.Log(&logger.Message{
						Line:      data,
						Timestamp: time.Now(),
					})
					if err != nil {
						break
					}
				}
				// Checks if the error contains the expected message. The full message is of the format:
				// "fluent#write: failed to write after %d attempts".
				assert.ErrorContains(t, err, "fluent#write: failed to write after")
			},
		},
		{
			// This test case tests that reads timeout when the server is unresponsive and unable to
			// send acks back.
			name: "read timeout",
			cfg: map[string]string{
				"fluentd-read-timeout": "1ms",
				"fluentd-request-ack":  "true",
			},
			connectionHandler: noAckConnectionHandler,
			rwValidator: func(t *testing.T, ctx context.Context, f logger.Logger) {
				data := make([]byte, 1024*1024)
				// Buffered so the goroutine below can finish even if nobody receives
				// the result after the context has expired.
				done := make(chan error, 1)
				go func() {
					// Log will hang forever if the read timeout is not set. Hence, we invoke that
					// asynchronously so that we can timeout the test if something goes wrong.
					done <- f.Log(&logger.Message{
						Line:      data,
						Timestamp: time.Now(),
					})
				}()
				select {
				case err := <-done:
					// In an ideal world, we would expect the following error to be returned by fluentd:
					// "fluent#write: error reading message response ack"
					// (Ref: https://github.com/fluent/fluent-logger-golang/blob/6b31033c91e794274fd6b77c692412ae945d7d67/fluent/fluent.go#L686)
					// However, the write method returns a generic message back to the caller and consumes
					// the original error message without returning it to the caller.
					// (Ref: https://github.com/fluent/fluent-logger-golang/blob/6b31033c91e794274fd6b77c692412ae945d7d67/fluent/fluent.go#L597)
					assert.ErrorContains(t, err, "fluent#write: failed to write after 1 attempts")
				case <-ctx.Done():
					t.Error("Test timed out, which is unexpected")
				}
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// Create a temporary directory for the socket file.
			tmpDir := t.TempDir()
			socketFile := filepath.Join(tmpDir, "fluent-logger-golang.sock")
			l, err := net.Listen("unix", socketFile)
			assert.NilError(t, err, "unable to create listener for socket %s", socketFile)
			defer l.Close()

			// This is to guard against potential run-away test scenario so that a future change
			// doesn't cause the tests suite to timeout.
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			var connectedWG sync.WaitGroup
			connectedWG.Add(1)

			// Start accepting connections.
			go acceptConnection(t, ctx, l, tc.connectionHandler, &connectedWG)

			// Create a base configuration for fluentd logger, agnostic of the test.
			cfg := map[string]string{
				"fluentd-address": "unix://" + socketFile,
				"tag":             "{{.Name}}/{{.FullID}}",
				// Disabling async behavior with limited retries and buffer size lets us test this in a more
				// predictable manner for failures. The "fluentd-read-timeout" flag should be equally effective
				// regardless of async being enabled/disabled.
				"fluentd-async":        "false",
				"fluentd-max-retries":  "1",
				"fluentd-retry-wait":   "10ms",
				"fluentd-buffer-limit": "1",
			}
			// Update the config with test specific configs.
			for k, v := range tc.cfg {
				cfg[k] = v
			}

			f, err := New(logger.Info{
				ContainerName: "/test-container",
				ContainerID:   "container-abcdefghijklmnopqrstuvwxyz01234567890",
				Config:        cfg,
			})
			assert.NilError(t, err)
			defer closeLoggerWithContext(ctx, f)

			// Ensure that the server is ready to accept connections since we have disabled async mode
			// in fluentd options.
			connectedWG.Wait()

			tc.rwValidator(t, ctx, f)
		})
	}
}

// closeLoggerWithContext enables the caller to return early if logger's Close() method is stuck.
// Fluentd's Close() will be stuck as long as there's a pending write in progress since the mutex
// associated with the connection will be locked.
// Ref: https://github.com/fluent/fluent-logger-golang/blob/6b31033c91e794274fd6b77c692412ae945d7d67/fluent/fluent.go#L600
func closeLoggerWithContext(ctx context.Context, f logger.Logger) {
	// Buffered so the closing goroutine can deliver its result and exit even
	// when this function has already returned because the context was done.
	doneClose := make(chan error, 1)
	go func() {
		doneClose <- f.Close()
	}()

	// Return if either Close() returns or if the context is done.
	select {
	case <-doneClose:
	case <-ctx.Done():
	}
}

// acceptConnection signals wg once and then accepts connections from l in a
// loop, handing each one to handleConnection on its own goroutine. It returns
// when Accept fails after ctx is done.
//
// NOTE(review): an Accept error while ctx is still live is only logged and
// retried immediately, without any delay.
func acceptConnection(
	t *testing.T,
	ctx context.Context,
	l net.Listener,
	handleConnection connHandler,
	wg *sync.WaitGroup,
) {
	wg.Done()
	for {
		conn, err := l.Accept()
		if err != nil {
			// Unable to process the connection. This can happen if the connection is closed.
			select {
			case <-ctx.Done():
				// If the context is canceled, there's nothing for us to do here.
				return
			default:
				t.Logf("Unable to accept connection: %v", err)
				continue
			}
		}

		// Handle an incoming connection.
		go handleConnection(ctx, conn)
	}
}

// connHandler is the server-side behavior applied to each accepted connection.
type connHandler func(ctx context.Context, conn net.Conn)

// blackholeConnectionHandler keeps the connection open without reading or
// writing until ctx is done, then closes it.
func blackholeConnectionHandler(ctx context.Context, conn net.Conn) {
	// Simulate unresponsive server: do nothing with the connection. We're essentially blackholing this
	// by not reading from or writing to the connection.
	<-ctx.Done()
	_ = conn.Close()
}

// noAckConnectionHandler reads from the connection up to the first newline and
// then never responds, holding the connection open until ctx is done. The
// connection is closed on every return path, including a failed read.
func noAckConnectionHandler(ctx context.Context, conn net.Conn) {
	// Best-effort close; the error is irrelevant for a test server that is shutting down.
	defer func() { _ = conn.Close() }()

	// Create a buffered reader to read from the connection.
	reader := bufio.NewReader(conn)

	// Read data from the connection.
	if _, err := reader.ReadString('\n'); err != nil {
		// A read error most likely means the peer closed the connection; either
		// way there is nothing left to do.
		return
	}

	// Don't write an ack back. The fluent configuration is set to expect an ack from the server.
	<-ctx.Done()
}