Browse code

Add new `local` log driver

This driver uses protobuf to store log messages and has better defaults
for log file handling (e.g. compression and file rotation enabled by
default).

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2018/04/06 01:41:35
Showing 7 changed files
... ...
@@ -22,7 +22,9 @@ import (
22 22
 	"github.com/docker/docker/daemon/exec"
23 23
 	"github.com/docker/docker/daemon/logger"
24 24
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
25
+	"github.com/docker/docker/daemon/logger/local"
25 26
 	"github.com/docker/docker/daemon/network"
27
+	"github.com/docker/docker/errdefs"
26 28
 	"github.com/docker/docker/image"
27 29
 	"github.com/docker/docker/layer"
28 30
 	"github.com/docker/docker/pkg/containerfs"
... ...
@@ -375,13 +377,27 @@ func (container *Container) StartLogger() (logger.Logger, error) {
375 375
 	}
376 376
 
377 377
 	// Set logging file for "json-logger"
378
-	if cfg.Type == jsonfilelog.Name {
378
+	// TODO(@cpuguy83): Setup here based on log driver is a little weird.
379
+	switch cfg.Type {
380
+	case jsonfilelog.Name:
379 381
 		info.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
380 382
 		if err != nil {
381 383
 			return nil, err
382 384
 		}
383 385
 
384 386
 		container.LogPath = info.LogPath
387
+	case local.Name:
388
+		// Do not set container.LogPath for the local driver
389
+		// This would expose the value to the API, which should not be done as it means
390
+		// that the log file implementation would become a stable API that cannot change.
391
+		logDir, err := container.GetRootResourcePath("local-logs")
392
+		if err != nil {
393
+			return nil, err
394
+		}
395
+		if err := os.MkdirAll(logDir, 0700); err != nil {
396
+			return nil, errdefs.System(errors.Wrap(err, "error creating local logs dir"))
397
+		}
398
+		info.LogPath = filepath.Join(logDir, "container.log")
385 399
 	}
386 400
 
387 401
 	l, err := initDriver(info)
... ...
@@ -9,6 +9,7 @@ import (
9 9
 	_ "github.com/docker/docker/daemon/logger/gelf"
10 10
 	_ "github.com/docker/docker/daemon/logger/journald"
11 11
 	_ "github.com/docker/docker/daemon/logger/jsonfilelog"
12
+	_ "github.com/docker/docker/daemon/logger/local"
12 13
 	_ "github.com/docker/docker/daemon/logger/logentries"
13 14
 	_ "github.com/docker/docker/daemon/logger/splunk"
14 15
 	_ "github.com/docker/docker/daemon/logger/syslog"
15 16
new file mode 100644
... ...
@@ -0,0 +1,36 @@
0
+package local
1
+
2
+import (
3
+	"github.com/pkg/errors"
4
+)
5
+
6
// CreateConfig is used to configure new instances of driver
type CreateConfig struct {
	// DisableCompression, when true, turns off compression of rotated files.
	DisableCompression bool
	// MaxFileSize is the size limit (in bytes) of a single log file; see
	// the driver's use of loggerutils.NewLogFile for rotation behavior.
	MaxFileSize int64
	// MaxFileCount is the number of log files to keep.
	MaxFileCount int
}
12
+
13
+func newDefaultConfig() *CreateConfig {
14
+	return &CreateConfig{
15
+		MaxFileSize:        defaultMaxFileSize,
16
+		MaxFileCount:       defaultMaxFileCount,
17
+		DisableCompression: !defaultCompressLogs,
18
+	}
19
+}
20
+
21
+func validateConfig(cfg *CreateConfig) error {
22
+	if cfg.MaxFileSize < 0 {
23
+		return errors.New("max size should be a positive number")
24
+	}
25
+	if cfg.MaxFileCount < 0 {
26
+		return errors.New("max file count cannot be less than 0")
27
+	}
28
+
29
+	if !cfg.DisableCompression {
30
+		if cfg.MaxFileCount <= 1 {
31
+			return errors.New("compression cannot be enabled when max file count is 1")
32
+		}
33
+	}
34
+	return nil
35
+}
0 36
new file mode 100644
... ...
@@ -0,0 +1,9 @@
0
+// Package local provides a logger implementation that stores logs on disk.
1
+//
2
+// Log messages are encoded as protobufs with a header and footer for each message.
3
+// The header and footer are big-endian binary encoded uint32 values which indicate the size of the log message.
4
+// The header and footer of each message allow you to efficiently read through a file either forwards or
5
+// backwards (such as is the case when tailing a file).
6
+//
7
+// Example log message format: [22][This is a log message.][22][28][This is another log message.][28]
8
+package local // import "github.com/docker/docker/daemon/logger/local"
0 9
new file mode 100644
... ...
@@ -0,0 +1,218 @@
0
+package local // import "github.com/docker/docker/daemon/logger/local"
1
+
2
+import (
3
+	"encoding/binary"
4
+	"io"
5
+	"strconv"
6
+	"sync"
7
+	"time"
8
+
9
+	"github.com/docker/docker/api/types/backend"
10
+	"github.com/docker/docker/api/types/plugins/logdriver"
11
+	"github.com/docker/docker/daemon/logger"
12
+	"github.com/docker/docker/daemon/logger/loggerutils"
13
+	"github.com/docker/docker/errdefs"
14
+	"github.com/docker/go-units"
15
+	"github.com/pkg/errors"
16
+	"github.com/sirupsen/logrus"
17
+)
18
+
19
+const (
20
+	// Name is the name of the driver
21
+	Name = "local"
22
+
23
+	encodeBinaryLen = 4
24
+	initialBufSize  = 2048
25
+	maxDecodeRetry  = 20000
26
+
27
+	defaultMaxFileSize  int64 = 20 * 1024 * 1024
28
+	defaultMaxFileCount       = 5
29
+	defaultCompressLogs       = true
30
+)
31
+
32
+// LogOptKeys are the keys names used for log opts passed in to initialize the driver.
33
+var LogOptKeys = map[string]bool{
34
+	"max-file": true,
35
+	"max-size": true,
36
+	"compress": true,
37
+}
38
+
39
+// ValidateLogOpt looks for log driver specific options.
40
+func ValidateLogOpt(cfg map[string]string) error {
41
+	for key := range cfg {
42
+		if !LogOptKeys[key] {
43
+			return errors.Errorf("unknown log opt '%s' for log driver %s", key, Name)
44
+		}
45
+	}
46
+	return nil
47
+}
48
+
49
// init registers the driver constructor and its log-opt validator with the
// logger registry. Registration failure indicates a programming error
// (e.g. a duplicate driver name), so it is fatal.
func init() {
	if err := logger.RegisterLogDriver(Name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
57
+
58
// driver is a per-container instance of the local log driver.
// mu guards all other fields.
type driver struct {
	mu sync.Mutex
	// closed is set by Close. NOTE(review): nothing in this file reads it
	// back (e.g. Log does not check it) — confirm whether writes after
	// Close should be rejected.
	closed  bool
	logfile *loggerutils.LogFile
	readers map[*logger.LogWatcher]struct{} // stores the active log followers
}
64
+
65
+// New creates a new local logger
66
+// You must provide the `LogPath` in the passed in info argument, this is the file path that logs are written to.
67
+func New(info logger.Info) (logger.Logger, error) {
68
+	if info.LogPath == "" {
69
+		return nil, errdefs.System(errors.New("log path is missing -- this is a bug and should not happen"))
70
+	}
71
+
72
+	cfg := newDefaultConfig()
73
+	if capacity, ok := info.Config["max-size"]; ok {
74
+		var err error
75
+		cfg.MaxFileSize, err = units.FromHumanSize(capacity)
76
+		if err != nil {
77
+			return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-size: %s", capacity))
78
+		}
79
+	}
80
+
81
+	if userMaxFileCount, ok := info.Config["max-file"]; ok {
82
+		var err error
83
+		cfg.MaxFileCount, err = strconv.Atoi(userMaxFileCount)
84
+		if err != nil {
85
+			return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-file: %s", userMaxFileCount))
86
+		}
87
+	}
88
+
89
+	if userCompress, ok := info.Config["compress"]; ok {
90
+		compressLogs, err := strconv.ParseBool(userCompress)
91
+		if err != nil {
92
+			return nil, errdefs.InvalidParameter(errors.Wrap(err, "error reading compress log option"))
93
+		}
94
+		cfg.DisableCompression = !compressLogs
95
+	}
96
+	return newDriver(info.LogPath, cfg)
97
+}
98
+
99
+func makeMarshaller() func(m *logger.Message) ([]byte, error) {
100
+	buf := make([]byte, initialBufSize)
101
+
102
+	// allocate the partial log entry separately, which allows for easier re-use
103
+	proto := &logdriver.LogEntry{}
104
+	md := &logdriver.PartialLogEntryMetadata{}
105
+
106
+	return func(m *logger.Message) ([]byte, error) {
107
+		resetProto(proto)
108
+
109
+		messageToProto(m, proto, md)
110
+		protoSize := proto.Size()
111
+		writeLen := protoSize + (2 * encodeBinaryLen) //+ len(messageDelimiter)
112
+
113
+		if writeLen > len(buf) {
114
+			buf = make([]byte, writeLen)
115
+		} else {
116
+			// shrink the buffer back down
117
+			if writeLen <= initialBufSize {
118
+				buf = buf[:initialBufSize]
119
+			} else {
120
+				buf = buf[:writeLen]
121
+			}
122
+		}
123
+
124
+		binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
125
+		n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
126
+		if err != nil {
127
+			return nil, errors.Wrap(err, "error marshaling log entry")
128
+		}
129
+		if n+(encodeBinaryLen*2) != writeLen {
130
+			return nil, io.ErrShortWrite
131
+		}
132
+		binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
133
+		return buf[:writeLen], nil
134
+	}
135
+}
136
+
137
+func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
138
+	if err := validateConfig(cfg); err != nil {
139
+		return nil, errdefs.InvalidParameter(err)
140
+	}
141
+
142
+	lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader)
143
+	if err != nil {
144
+		return nil, err
145
+	}
146
+	return &driver{
147
+		logfile: lf,
148
+		readers: make(map[*logger.LogWatcher]struct{}),
149
+	}, nil
150
+}
151
+
152
+func (d *driver) Name() string {
153
+	return Name
154
+}
155
+
156
+func (d *driver) Log(msg *logger.Message) error {
157
+	d.mu.Lock()
158
+	err := d.logfile.WriteLogEntry(msg)
159
+	d.mu.Unlock()
160
+	return err
161
+}
162
+
163
+func (d *driver) Close() error {
164
+	d.mu.Lock()
165
+	d.closed = true
166
+	err := d.logfile.Close()
167
+	for r := range d.readers {
168
+		r.Close()
169
+		delete(d.readers, r)
170
+	}
171
+	d.mu.Unlock()
172
+	return err
173
+}
174
+
175
+func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) {
176
+	proto.Source = msg.Source
177
+	proto.TimeNano = msg.Timestamp.UnixNano()
178
+	proto.Line = append(proto.Line[:0], msg.Line...)
179
+	proto.Partial = msg.PLogMetaData != nil
180
+	if proto.Partial {
181
+		partial.Ordinal = int32(msg.PLogMetaData.Ordinal)
182
+		partial.Last = msg.PLogMetaData.Last
183
+		partial.Id = msg.PLogMetaData.ID
184
+		proto.PartialLogMetadata = partial
185
+	} else {
186
+		proto.PartialLogMetadata = nil
187
+	}
188
+}
189
+
190
+func protoToMessage(proto *logdriver.LogEntry) *logger.Message {
191
+	msg := &logger.Message{
192
+		Source:    proto.Source,
193
+		Timestamp: time.Unix(0, proto.TimeNano),
194
+	}
195
+	if proto.Partial {
196
+		var md backend.PartialLogMetaData
197
+		md.Last = proto.GetPartialLogMetadata().GetLast()
198
+		md.ID = proto.GetPartialLogMetadata().GetId()
199
+		md.Ordinal = int(proto.GetPartialLogMetadata().GetOrdinal())
200
+		msg.PLogMetaData = &md
201
+	}
202
+	msg.Line = append(msg.Line[:0], proto.Line...)
203
+	return msg
204
+}
205
+
206
+func resetProto(proto *logdriver.LogEntry) {
207
+	proto.Source = ""
208
+	proto.Line = proto.Line[:0]
209
+	proto.TimeNano = 0
210
+	proto.Partial = false
211
+	if proto.PartialLogMetadata != nil {
212
+		proto.PartialLogMetadata.Id = ""
213
+		proto.PartialLogMetadata.Last = false
214
+		proto.PartialLogMetadata.Ordinal = 0
215
+	}
216
+	proto.PartialLogMetadata = nil
217
+}
0 218
new file mode 100644
... ...
@@ -0,0 +1,220 @@
0
+package local
1
+
2
+import (
3
+	"context"
4
+	"encoding/binary"
5
+	"io/ioutil"
6
+	"os"
7
+	"path/filepath"
8
+	"testing"
9
+	"time"
10
+
11
+	"bytes"
12
+	"fmt"
13
+
14
+	"strings"
15
+
16
+	"io"
17
+
18
+	"github.com/docker/docker/api/types/backend"
19
+	"github.com/docker/docker/api/types/plugins/logdriver"
20
+	"github.com/docker/docker/daemon/logger"
21
+	protoio "github.com/gogo/protobuf/io"
22
+	"gotest.tools/assert"
23
+	is "gotest.tools/assert/cmp"
24
+)
25
+
26
// TestWriteLog verifies that messages written through the driver land on
// disk as size-delimited protobuf LogEntry records matching the input.
func TestWriteLog(t *testing.T) {
	t.Parallel()

	dir, err := ioutil.TempDir("", t.Name())
	assert.Assert(t, err)
	defer os.RemoveAll(dir)

	logPath := filepath.Join(dir, "test.log")

	l, err := New(logger.Info{LogPath: logPath})
	assert.Assert(t, err)
	defer l.Close()

	m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("message 1")}
	m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("message 2"), PLogMetaData: &backend.PartialLogMetaData{Last: true, ID: "0001", Ordinal: 1}}
	m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("message 3")}

	// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
	err = l.Log(copyLogMessage(&m1))
	assert.Assert(t, err)
	err = l.Log(copyLogMessage(&m2))
	assert.Assert(t, err)
	err = l.Log(copyLogMessage(&m3))
	assert.Assert(t, err)

	f, err := os.Open(logPath)
	assert.Assert(t, err)
	defer f.Close()
	// The delimited reader consumes the uint32 size header before each proto.
	dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)

	var (
		proto     logdriver.LogEntry
		testProto logdriver.LogEntry
		partial   logdriver.PartialLogEntryMetadata
	)

	lenBuf := make([]byte, encodeBinaryLen)
	// seekMsgLen skips over the uint32 footer that trails each message,
	// which the delimited reader does not consume.
	seekMsgLen := func() {
		io.ReadFull(f, lenBuf)
	}

	err = dec.ReadMsg(&proto)
	assert.Assert(t, err)
	messageToProto(&m1, &testProto, &partial)
	assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
	seekMsgLen()

	err = dec.ReadMsg(&proto)
	assert.Assert(t, err)
	messageToProto(&m2, &testProto, &partial)
	assert.Check(t, is.DeepEqual(testProto, proto))
	seekMsgLen()

	err = dec.ReadMsg(&proto)
	assert.Assert(t, err)
	messageToProto(&m3, &testProto, &partial)
	assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
}
84
+
85
// TestReadLog verifies tail reads through the LogReader interface: exact
// count, fewer than available, and more than available, including a message
// larger than the encoder's initial buffer.
func TestReadLog(t *testing.T) {
	t.Parallel()

	dir, err := ioutil.TempDir("", t.Name())
	assert.Assert(t, err)
	defer os.RemoveAll(dir)

	logPath := filepath.Join(dir, "test.log")
	l, err := New(logger.Info{LogPath: logPath})
	assert.Assert(t, err)
	defer l.Close()

	m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
	m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
	// longMessage exceeds initialBufSize to exercise buffer growth paths.
	longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
	m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
	m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}

	// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
	err = l.Log(copyLogMessage(&m1))
	assert.Assert(t, err)
	err = l.Log(copyLogMessage(&m2))
	assert.Assert(t, err)
	err = l.Log(copyLogMessage(&m3))
	assert.Assert(t, err)
	err = l.Log(copyLogMessage(&m4))
	assert.Assert(t, err)

	lr := l.(logger.LogReader)

	// testMessage reads one message from lw (with a timeout) and compares it
	// to m; a nil m asserts that the stream has ended cleanly.
	testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
		t.Helper()
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		select {
		case <-ctx.Done():
			assert.Assert(t, ctx.Err())
		case err := <-lw.Err:
			assert.Assert(t, err)
		case msg, open := <-lw.Msg:
			if !open {
				select {
				case err := <-lw.Err:
					assert.Assert(t, err)
				default:
					assert.Assert(t, m == nil)
					return
				}
			}
			assert.Assert(t, m != nil)
			if m.PLogMetaData == nil {
				// a `\n` is appended on read to make this work with the existing API's when the message is not a partial.
				// make sure it's the last entry in the line, and then truncate it for the deep equal below.
				assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
				msg.Line = msg.Line[:len(msg.Line)-1]
			}
			assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
		}
	}

	t.Run("tail exact", func(t *testing.T) {
		lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})

		testMessage(t, lw, &m1)
		testMessage(t, lw, &m2)
		testMessage(t, lw, &m3)
		testMessage(t, lw, &m4)
		testMessage(t, lw, nil) // no more messages
	})

	t.Run("tail less than available", func(t *testing.T) {
		lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})

		testMessage(t, lw, &m3)
		testMessage(t, lw, &m4)
		testMessage(t, lw, nil) // no more messages
	})

	t.Run("tail more than available", func(t *testing.T) {
		lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})

		testMessage(t, lw, &m1)
		testMessage(t, lw, &m2)
		testMessage(t, lw, &m3)
		testMessage(t, lw, &m4)
		testMessage(t, lw, nil) // no more messages
	})
}
173
+
174
// BenchmarkLogWrite measures write throughput for several payload sizes.
// SetBytes accounts for the proto size plus the uint32 header and footer so
// the reported MB/s reflects bytes actually written to disk.
func BenchmarkLogWrite(b *testing.B) {
	f, err := ioutil.TempFile("", b.Name())
	assert.Assert(b, err)
	defer os.Remove(f.Name())
	f.Close()

	local, err := New(logger.Info{LogPath: f.Name()})
	assert.Assert(b, err)
	defer local.Close()

	t := time.Now().UTC()
	for _, data := range [][]byte{
		[]byte(""),
		[]byte("a short string"),
		bytes.Repeat([]byte("a long string"), 100),
		bytes.Repeat([]byte("a really long string"), 10000),
	} {
		b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
			entry := &logdriver.LogEntry{Line: data, Source: "stdout", TimeNano: t.UnixNano()}
			b.SetBytes(int64(entry.Size() + encodeBinaryLen + encodeBinaryLen))
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// a pooled message is allocated per iteration because Log
				// resets the message and returns it to the pool.
				msg := logger.NewMessage()
				msg.Line = data
				msg.Timestamp = t
				msg.Source = "stdout"
				if err := local.Log(msg); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}
207
+
208
+func copyLogMessage(src *logger.Message) *logger.Message {
209
+	dst := logger.NewMessage()
210
+	dst.Source = src.Source
211
+	dst.Timestamp = src.Timestamp
212
+	dst.Attrs = src.Attrs
213
+	dst.Err = src.Err
214
+	dst.Line = append(dst.Line, src.Line...)
215
+	if src.PLogMetaData != nil {
216
+		dst.PLogMetaData = &(*src.PLogMetaData)
217
+	}
218
+	return dst
219
+}
0 220
new file mode 100644
... ...
@@ -0,0 +1,174 @@
0
+package local
1
+
2
+import (
3
+	"context"
4
+	"encoding/binary"
5
+	"io"
6
+
7
+	"bytes"
8
+
9
+	"github.com/docker/docker/api/types/plugins/logdriver"
10
+	"github.com/docker/docker/daemon/logger"
11
+	"github.com/docker/docker/daemon/logger/loggerutils"
12
+	"github.com/docker/docker/errdefs"
13
+	"github.com/pkg/errors"
14
+)
15
+
16
// ReadLogs implements the logger.LogReader interface. It starts a goroutine
// that streams messages to the returned watcher according to config.
func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	logWatcher := logger.NewLogWatcher()

	go d.readLogs(logWatcher, config)
	return logWatcher
}
22
+
23
// readLogs registers watcher as an active follower (so Close can shut it
// down), delegates the read loop to the underlying LogFile, and unregisters
// the watcher when the read finishes. Msg is closed on return to signal
// end-of-stream to the consumer.
func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
	defer close(watcher.Msg)

	d.mu.Lock()
	d.readers[watcher] = struct{}{}
	d.mu.Unlock()

	d.logfile.ReadLogs(config, watcher)

	d.mu.Lock()
	delete(d.readers, watcher)
	d.mu.Unlock()
}
36
+
37
+func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
38
+	size := r.Size()
39
+	if req < 0 {
40
+		return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
41
+	}
42
+
43
+	if size < (encodeBinaryLen*2)+1 {
44
+		return bytes.NewReader(nil), 0, nil
45
+	}
46
+
47
+	const encodeBinaryLen64 = int64(encodeBinaryLen)
48
+	var found int
49
+
50
+	buf := make([]byte, encodeBinaryLen)
51
+
52
+	offset := size
53
+	for {
54
+		select {
55
+		case <-ctx.Done():
56
+			return nil, 0, ctx.Err()
57
+		default:
58
+		}
59
+
60
+		n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
61
+		if err != nil && err != io.EOF {
62
+			return nil, 0, errors.Wrap(err, "error reading log message footer")
63
+		}
64
+
65
+		if n != encodeBinaryLen {
66
+			return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
67
+		}
68
+
69
+		msgLen := binary.BigEndian.Uint32(buf)
70
+
71
+		n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
72
+		if err != nil && err != io.EOF {
73
+			return nil, 0, errors.Wrap(err, "error reading log message header")
74
+		}
75
+
76
+		if n != encodeBinaryLen {
77
+			return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
78
+		}
79
+
80
+		if msgLen != binary.BigEndian.Uint32(buf) {
81
+			return nil, 0, errdefs.DataLoss(errors.Wrap(err, "log message header and footer indicate different message sizes"))
82
+		}
83
+
84
+		found++
85
+		offset -= int64(msgLen)
86
+		offset -= encodeBinaryLen64 * 2
87
+		if found == req {
88
+			break
89
+		}
90
+		if offset <= 0 {
91
+			break
92
+		}
93
+	}
94
+
95
+	return io.NewSectionReader(r, offset, size), found, nil
96
+}
97
+
98
// decodeFunc returns a closure that decodes one log message per call from
// rdr. The closure retries short reads up to maxDecodeRetry times, which
// tolerates reading a file that is concurrently being appended to.
// The returned messages reuse a shared proto and buffer; callers must not
// retain them across calls.
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
	proto := &logdriver.LogEntry{}
	buf := make([]byte, initialBufSize)

	return func() (*logger.Message, error) {
		var (
			read int
			err  error
		)

		resetProto(proto)

		// Read the 4-byte big-endian length header, retrying partial reads
		// (io.ErrUnexpectedEOF) until the full header is available.
		for i := 0; i < maxDecodeRetry; i++ {
			var n int
			n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen])
			if err != nil {
				if err != io.ErrUnexpectedEOF {
					return nil, errors.Wrap(err, "error reading log message length")
				}
				read += n
				continue
			}
			read += n
			break
		}
		if err != nil {
			return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
		}

		msgLen := int(binary.BigEndian.Uint32(buf[:read]))

		// Size the buffer for the payload plus the trailing footer,
		// shrinking back to initialBufSize after a large message.
		if len(buf) < msgLen+encodeBinaryLen {
			buf = make([]byte, msgLen+encodeBinaryLen)
		} else {
			if msgLen <= initialBufSize {
				buf = buf[:initialBufSize]
			} else {
				buf = buf[:msgLen+encodeBinaryLen]
			}
		}

		return decodeLogEntry(rdr, proto, buf, msgLen)
	}
}
142
+
143
// decodeLogEntry reads a message payload plus its 4-byte footer from rdr
// into buf, unmarshals the payload into proto, and converts it to a
// logger.Message. Short reads are retried up to maxDecodeRetry times.
// A trailing '\n' is appended to non-partial messages for compatibility
// with the existing log-reading APIs.
func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) {
	var (
		read int
		err  error
	)
	for i := 0; i < maxDecodeRetry; i++ {
		var n int
		n, err = io.ReadFull(rdr, buf[read:msgLen+encodeBinaryLen])
		if err != nil {
			if err != io.ErrUnexpectedEOF {
				return nil, errors.Wrap(err, "could not decode log entry")
			}
			read += n
			continue
		}
		break
	}
	if err != nil {
		return nil, errors.Wrapf(err, "could not decode entry: read %d, expected: %d", read, msgLen)
	}

	// buf holds payload followed by the footer; only the payload is proto.
	if err := proto.Unmarshal(buf[:msgLen]); err != nil {
		return nil, errors.Wrap(err, "error unmarshalling log entry")
	}

	msg := protoToMessage(proto)
	if msg.PLogMetaData == nil {
		msg.Line = append(msg.Line, '\n')
	}
	return msg, nil
}