Browse code

Split reader interface from logger interface

Implement new reader interface on jsonfile.
Moves jsonlog decoding from daemon to jsonfile logger.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2015/07/03 22:50:06
Showing 13 changed files
... ...
@@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
19 19
 	follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
20 20
 	since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
21 21
 	times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
22
-	tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
22
+	tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
23 23
 	cmd.Require(flag.Exact, 1)
24 24
 
25 25
 	cmd.ParseFlags(args, true)
... ...
@@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
629 629
 		closeNotifier = notifier.CloseNotify()
630 630
 	}
631 631
 
632
+	c, err := s.daemon.Get(vars["name"])
633
+	if err != nil {
634
+		return err
635
+	}
636
+
637
+	outStream := ioutils.NewWriteFlusher(w)
638
+	// write an empty chunk of data (this is to ensure that the
639
+	// HTTP Response is sent immediatly, even if the container has
640
+	// not yet produced any data)
641
+	outStream.Write(nil)
642
+
632 643
 	logsConfig := &daemon.ContainerLogsConfig{
633 644
 		Follow:     boolValue(r, "follow"),
634 645
 		Timestamps: boolValue(r, "timestamps"),
... ...
@@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
636 636
 		Tail:       r.Form.Get("tail"),
637 637
 		UseStdout:  stdout,
638 638
 		UseStderr:  stderr,
639
-		OutStream:  ioutils.NewWriteFlusher(w),
639
+		OutStream:  outStream,
640 640
 		Stop:       closeNotifier,
641 641
 	}
642 642
 
643
-	if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
643
+	if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
644 644
 		fmt.Fprintf(w, "Error running logs job: %s\n", err)
645 645
 	}
646 646
 
... ...
@@ -25,7 +25,6 @@ import (
25 25
 	"github.com/docker/docker/pkg/broadcastwriter"
26 26
 	"github.com/docker/docker/pkg/fileutils"
27 27
 	"github.com/docker/docker/pkg/ioutils"
28
-	"github.com/docker/docker/pkg/jsonlog"
29 28
 	"github.com/docker/docker/pkg/mount"
30 29
 	"github.com/docker/docker/pkg/nat"
31 30
 	"github.com/docker/docker/pkg/promise"
... ...
@@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
721 721
 }
722 722
 
723 723
 func (container *Container) getLogger() (logger.Logger, error) {
724
+	if container.logDriver != nil && container.IsRunning() {
725
+		return container.logDriver, nil
726
+	}
724 727
 	cfg := container.getLogConfig()
725 728
 	if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
726 729
 		return nil, err
... ...
@@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ
894 894
 }
895 895
 
896 896
 func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
897
-
898 897
 	if logs {
899 898
 		logDriver, err := c.getLogger()
900 899
 		if err != nil {
901
-			logrus.Errorf("Error obtaining the logger %v", err)
902 900
 			return err
903 901
 		}
904
-		if _, ok := logDriver.(logger.Reader); !ok {
905
-			logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
906
-		} else {
907
-			if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
908
-				logrus.Errorf("Error reading logs %v", err)
909
-			} else {
910
-				dec := json.NewDecoder(cLog)
911
-				for {
912
-					l := &jsonlog.JSONLog{}
913
-
914
-					if err := dec.Decode(l); err == io.EOF {
915
-						break
916
-					} else if err != nil {
917
-						logrus.Errorf("Error streaming logs: %s", err)
918
-						break
919
-					}
920
-					if l.Stream == "stdout" && stdout != nil {
921
-						io.WriteString(stdout, l.Log)
922
-					}
923
-					if l.Stream == "stderr" && stderr != nil {
924
-						io.WriteString(stderr, l.Log)
925
-					}
902
+		cLog, ok := logDriver.(logger.LogReader)
903
+		if !ok {
904
+			return logger.ErrReadLogsNotSupported
905
+		}
906
+		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
907
+
908
+	LogLoop:
909
+		for {
910
+			select {
911
+			case msg, ok := <-logs.Msg:
912
+				if !ok {
913
+					break LogLoop
914
+				}
915
+				if msg.Source == "stdout" && stdout != nil {
916
+					stdout.Write(msg.Line)
917
+				}
918
+				if msg.Source == "stderr" && stderr != nil {
919
+					stderr.Write(msg.Line)
926 920
 				}
921
+			case err := <-logs.Err:
922
+				logrus.Errorf("Error streaming logs: %v", err)
923
+				break LogLoop
927 924
 			}
928 925
 		}
929 926
 	}
... ...
@@ -27,6 +27,7 @@ type Context struct {
27 27
 	LogPath             string
28 28
 }
29 29
 
30
+// Hostname returns the hostname from the underlying OS
30 31
 func (ctx *Context) Hostname() (string, error) {
31 32
 	hostname, err := os.Hostname()
32 33
 	if err != nil {
... ...
@@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
35 35
 	return hostname, nil
36 36
 }
37 37
 
38
+// Command returns the command that the container being logged was started with
38 39
 func (ctx *Context) Command() string {
39 40
 	terms := []string{ctx.ContainerEntrypoint}
40 41
 	for _, arg := range ctx.ContainerArgs {
... ...
@@ -2,32 +2,42 @@ package jsonfilelog
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"encoding/json"
5 6
 	"fmt"
6 7
 	"io"
7 8
 	"os"
8 9
 	"strconv"
9 10
 	"sync"
11
+	"time"
12
+
13
+	"gopkg.in/fsnotify.v1"
10 14
 
11 15
 	"github.com/Sirupsen/logrus"
12 16
 	"github.com/docker/docker/daemon/logger"
17
+	"github.com/docker/docker/pkg/ioutils"
13 18
 	"github.com/docker/docker/pkg/jsonlog"
19
+	"github.com/docker/docker/pkg/pubsub"
20
+	"github.com/docker/docker/pkg/tailfile"
14 21
 	"github.com/docker/docker/pkg/timeutils"
15 22
 	"github.com/docker/docker/pkg/units"
16 23
 )
17 24
 
18 25
 const (
19
-	Name = "json-file"
26
+	Name               = "json-file"
27
+	maxJSONDecodeRetry = 10
20 28
 )
21 29
 
22 30
 // JSONFileLogger is Logger implementation for default docker logging:
23 31
 // JSON objects to file
24 32
 type JSONFileLogger struct {
25
-	buf      *bytes.Buffer
26
-	f        *os.File   // store for closing
27
-	mu       sync.Mutex // protects buffer
28
-	capacity int64      //maximum size of each file
29
-	n        int        //maximum number of files
30
-	ctx      logger.Context
33
+	buf          *bytes.Buffer
34
+	f            *os.File   // store for closing
35
+	mu           sync.Mutex // protects buffer
36
+	capacity     int64      //maximum size of each file
37
+	n            int        //maximum number of files
38
+	ctx          logger.Context
39
+	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
40
+	notifyRotate *pubsub.Publisher
31 41
 }
32 42
 
33 43
 func init() {
... ...
@@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
64 64
 		}
65 65
 	}
66 66
 	return &JSONFileLogger{
67
-		f:        log,
68
-		buf:      bytes.NewBuffer(nil),
69
-		ctx:      ctx,
70
-		capacity: capval,
71
-		n:        maxFiles,
67
+		f:            log,
68
+		buf:          bytes.NewBuffer(nil),
69
+		ctx:          ctx,
70
+		capacity:     capval,
71
+		n:            maxFiles,
72
+		readers:      make(map[*logger.LogWatcher]struct{}),
73
+		notifyRotate: pubsub.NewPublisher(0, 1),
72 74
 	}, nil
73 75
 }
74 76
 
... ...
@@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
111 111
 			return -1, err
112 112
 		}
113 113
 		l.f = file
114
+		l.notifyRotate.Publish(struct{}{})
114 115
 	}
115 116
 	return writeToBuf(l)
116 117
 }
... ...
@@ -148,11 +161,11 @@ func backup(old, curr string) error {
148 148
 		}
149 149
 	}
150 150
 	if _, err := os.Stat(curr); os.IsNotExist(err) {
151
-		if f, err := os.Create(curr); err != nil {
151
+		f, err := os.Create(curr)
152
+		if err != nil {
152 153
 			return err
153
-		} else {
154
-			f.Close()
155 154
 		}
155
+		f.Close()
156 156
 	}
157 157
 	return os.Rename(curr, old)
158 158
 }
... ...
@@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
169 169
 	return nil
170 170
 }
171 171
 
172
-func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
173
-	pth := l.ctx.LogPath
174
-	if len(args) > 0 {
175
-		//check if args[0] is an integer index
176
-		index, err := strconv.ParseInt(args[0], 0, 0)
177
-		if err != nil {
178
-			return nil, err
179
-		}
180
-		if index > 0 {
181
-			pth = pth + "." + args[0]
182
-		}
183
-	}
184
-	return os.Open(pth)
185
-}
186
-
187 172
 func (l *JSONFileLogger) LogPath() string {
188 173
 	return l.ctx.LogPath
189 174
 }
190 175
 
191
-// Close closes underlying file
176
+// Close closes underlying file and signals all readers to stop
192 177
 func (l *JSONFileLogger) Close() error {
193
-	return l.f.Close()
178
+	l.mu.Lock()
179
+	err := l.f.Close()
180
+	for r := range l.readers {
181
+		r.Close()
182
+		delete(l.readers, r)
183
+	}
184
+	l.mu.Unlock()
185
+	return err
194 186
 }
195 187
 
196 188
 // Name returns name of this logger
197 189
 func (l *JSONFileLogger) Name() string {
198 190
 	return Name
199 191
 }
192
+
193
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
194
+	l.Reset()
195
+	if err := dec.Decode(l); err != nil {
196
+		return nil, err
197
+	}
198
+	msg := &logger.Message{
199
+		Source:    l.Stream,
200
+		Timestamp: l.Created,
201
+		Line:      []byte(l.Log),
202
+	}
203
+	return msg, nil
204
+}
205
+
206
+// Reads from the log file
207
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
208
+	logWatcher := logger.NewLogWatcher()
209
+
210
+	go l.readLogs(logWatcher, config)
211
+	return logWatcher
212
+}
213
+
214
+func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
215
+	defer close(logWatcher.Msg)
216
+
217
+	pth := l.ctx.LogPath
218
+	var files []io.ReadSeeker
219
+	for i := l.n; i > 1; i-- {
220
+		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
221
+		if err != nil {
222
+			if !os.IsNotExist(err) {
223
+				logWatcher.Err <- err
224
+				break
225
+			}
226
+			continue
227
+		}
228
+		defer f.Close()
229
+		files = append(files, f)
230
+	}
231
+
232
+	latestFile, err := os.Open(pth)
233
+	if err != nil {
234
+		logWatcher.Err <- err
235
+		return
236
+	}
237
+	defer latestFile.Close()
238
+
239
+	files = append(files, latestFile)
240
+	tailer := ioutils.MultiReadSeeker(files...)
241
+
242
+	if config.Tail != 0 {
243
+		tailFile(tailer, logWatcher, config.Tail, config.Since)
244
+	}
245
+
246
+	if !config.Follow {
247
+		return
248
+	}
249
+	if config.Tail == 0 {
250
+		latestFile.Seek(0, os.SEEK_END)
251
+	}
252
+
253
+	l.mu.Lock()
254
+	l.readers[logWatcher] = struct{}{}
255
+	l.mu.Unlock()
256
+
257
+	notifyRotate := l.notifyRotate.Subscribe()
258
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
259
+
260
+	l.mu.Lock()
261
+	delete(l.readers, logWatcher)
262
+	l.mu.Unlock()
263
+
264
+	l.notifyRotate.Evict(notifyRotate)
265
+}
266
+
267
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
268
+	var rdr io.Reader = f
269
+	if tail > 0 {
270
+		ls, err := tailfile.TailFile(f, tail)
271
+		if err != nil {
272
+			logWatcher.Err <- err
273
+			return
274
+		}
275
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
276
+	}
277
+	dec := json.NewDecoder(rdr)
278
+	l := &jsonlog.JSONLog{}
279
+	for {
280
+		msg, err := decodeLogLine(dec, l)
281
+		if err != nil {
282
+			if err != io.EOF {
283
+				logWatcher.Err <- err
284
+			}
285
+			return
286
+		}
287
+		if !since.IsZero() && msg.Timestamp.Before(since) {
288
+			continue
289
+		}
290
+		logWatcher.Msg <- msg
291
+	}
292
+}
293
+
294
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
295
+	dec := json.NewDecoder(f)
296
+	l := &jsonlog.JSONLog{}
297
+	fileWatcher, err := fsnotify.NewWatcher()
298
+	if err != nil {
299
+		logWatcher.Err <- err
300
+		return
301
+	}
302
+	defer fileWatcher.Close()
303
+	if err := fileWatcher.Add(f.Name()); err != nil {
304
+		logWatcher.Err <- err
305
+		return
306
+	}
307
+
308
+	var retries int
309
+	for {
310
+		msg, err := decodeLogLine(dec, l)
311
+		if err != nil {
312
+			if err != io.EOF {
313
+				// try again because this shouldn't happen
314
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
315
+					dec = json.NewDecoder(f)
316
+					retries += 1
317
+					continue
318
+				}
319
+				logWatcher.Err <- err
320
+				return
321
+			}
322
+
323
+			select {
324
+			case <-fileWatcher.Events:
325
+				dec = json.NewDecoder(f)
326
+				continue
327
+			case <-fileWatcher.Errors:
328
+				logWatcher.Err <- err
329
+				return
330
+			case <-logWatcher.WatchClose():
331
+				return
332
+			case <-notifyRotate:
333
+				fileWatcher.Remove(f.Name())
334
+
335
+				f, err = os.Open(f.Name())
336
+				if err != nil {
337
+					logWatcher.Err <- err
338
+					return
339
+				}
340
+				if err := fileWatcher.Add(f.Name()); err != nil {
341
+					logWatcher.Err <- err
342
+				}
343
+				dec = json.NewDecoder(f)
344
+				continue
345
+			}
346
+		}
347
+
348
+		retries = 0 // reset retries since we've succeeded
349
+		if !since.IsZero() && msg.Timestamp.Before(since) {
350
+			continue
351
+		}
352
+		select {
353
+		case logWatcher.Msg <- msg:
354
+		case <-logWatcher.WatchClose():
355
+			logWatcher.Msg <- msg
356
+			for {
357
+				msg, err := decodeLogLine(dec, l)
358
+				if err != nil {
359
+					return
360
+				}
361
+				if !since.IsZero() && msg.Timestamp.Before(since) {
362
+					continue
363
+				}
364
+				logWatcher.Msg <- msg
365
+			}
366
+		}
367
+	}
368
+}
... ...
@@ -2,11 +2,19 @@ package logger
2 2
 
3 3
 import (
4 4
 	"errors"
5
-	"io"
6 5
 	"time"
6
+
7
+	"github.com/docker/docker/pkg/timeutils"
7 8
 )
8 9
 
9
-var ReadLogsNotSupported = errors.New("configured logging reader does not support reading")
10
+// ErrReadLogsNotSupported is returned when the logger does not support reading logs
11
+var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading")
12
+
13
+const (
14
+	// TimeFormat is the time format used for timestamps sent to log readers
15
+	TimeFormat           = timeutils.RFC3339NanoFixed
16
+	logWatcherBufferSize = 4096
17
+)
10 18
 
11 19
 // Message is datastructure that represents record from some container
12 20
 type Message struct {
... ...
@@ -16,14 +24,51 @@ type Message struct {
16 16
 	Timestamp   time.Time
17 17
 }
18 18
 
19
-// Logger is interface for docker logging drivers
19
+// Logger is the interface for docker logging drivers
20 20
 type Logger interface {
21 21
 	Log(*Message) error
22 22
 	Name() string
23 23
 	Close() error
24 24
 }
25 25
 
26
-//Reader is an interface for docker logging drivers that support reading
27
-type Reader interface {
28
-	ReadLog(args ...string) (io.Reader, error)
26
+// ReadConfig is the configuration passed into ReadLogs
27
+type ReadConfig struct {
28
+	Since  time.Time
29
+	Tail   int
30
+	Follow bool
31
+}
32
+
33
+// LogReader is the interface for reading log messages for loggers that support reading
34
+type LogReader interface {
35
+	// Read logs from underlying logging backend
36
+	ReadLogs(ReadConfig) *LogWatcher
37
+}
38
+
39
+// LogWatcher is used when consuming logs read from the LogReader interface
40
+type LogWatcher struct {
41
+	// For sending log messages to a reader
42
+	Msg chan *Message
43
+	// For sending error messages that occur while while reading logs
44
+	Err           chan error
45
+	closeNotifier chan struct{}
46
+}
47
+
48
+// NewLogWatcher returns a new LogWatcher.
49
+func NewLogWatcher() *LogWatcher {
50
+	return &LogWatcher{
51
+		Msg:           make(chan *Message, logWatcherBufferSize),
52
+		Err:           make(chan error, 1),
53
+		closeNotifier: make(chan struct{}),
54
+	}
55
+}
56
+
57
+// Close notifies the underlying log reader to stop
58
+func (w *LogWatcher) Close() {
59
+	close(w.closeNotifier)
60
+}
61
+
62
+// WatchClose returns a channel receiver that receives notification when the watcher has been closed
63
+// This should only be called from one goroutine
64
+func (w *LogWatcher) WatchClose() <-chan struct{} {
65
+	return w.closeNotifier
29 66
 }
... ...
@@ -1,23 +1,14 @@
1 1
 package daemon
2 2
 
3 3
 import (
4
-	"bytes"
5
-	"encoding/json"
6 4
 	"fmt"
7 5
 	"io"
8
-	"net"
9
-	"os"
10 6
 	"strconv"
11
-	"syscall"
12 7
 	"time"
13 8
 
14 9
 	"github.com/Sirupsen/logrus"
15 10
 	"github.com/docker/docker/daemon/logger"
16
-	"github.com/docker/docker/daemon/logger/jsonfilelog"
17
-	"github.com/docker/docker/pkg/jsonlog"
18 11
 	"github.com/docker/docker/pkg/stdcopy"
19
-	"github.com/docker/docker/pkg/tailfile"
20
-	"github.com/docker/docker/pkg/timeutils"
21 12
 )
22 13
 
23 14
 type ContainerLogsConfig struct {
... ...
@@ -29,209 +20,64 @@ type ContainerLogsConfig struct {
29 29
 	Stop                 <-chan bool
30 30
 }
31 31
 
32
-func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error {
33
-	var (
34
-		lines  = -1
35
-		format string
36
-	)
32
+func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error {
37 33
 	if !(config.UseStdout || config.UseStderr) {
38 34
 		return fmt.Errorf("You must choose at least one stream")
39 35
 	}
40
-	if config.Timestamps {
41
-		format = timeutils.RFC3339NanoFixed
42
-	}
43
-	if config.Tail == "" {
44
-		config.Tail = "latest"
45
-	}
46
-
47
-	container, err := daemon.Get(name)
48
-	if err != nil {
49
-		return err
50
-	}
51 36
 
52
-	var (
53
-		outStream = config.OutStream
54
-		errStream io.Writer
55
-	)
37
+	outStream := config.OutStream
38
+	errStream := outStream
56 39
 	if !container.Config.Tty {
57 40
 		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
58 41
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
59
-	} else {
60
-		errStream = outStream
61
-	}
62
-
63
-	if container.LogDriverType() != jsonfilelog.Name {
64
-		return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver")
65
-	}
66
-
67
-	maxFile := 1
68
-	container.readHostConfig()
69
-	cfg := container.getLogConfig()
70
-	conf := cfg.Config
71
-	if val, ok := conf["max-file"]; ok {
72
-		var err error
73
-		maxFile, err = strconv.Atoi(val)
74
-		if err != nil {
75
-			return fmt.Errorf("Error reading max-file value: %s", err)
76
-		}
77 42
 	}
78 43
 
79
-	logDriver, err := container.getLogger()
44
+	cLog, err := container.getLogger()
80 45
 	if err != nil {
81 46
 		return err
82 47
 	}
83
-	_, ok := logDriver.(logger.Reader)
48
+	logReader, ok := cLog.(logger.LogReader)
84 49
 	if !ok {
85
-		logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name())
86
-	} else {
87
-		// json-file driver
88
-		if config.Tail != "all" && config.Tail != "latest" {
89
-			var err error
90
-			lines, err = strconv.Atoi(config.Tail)
91
-			if err != nil {
92
-				logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err)
93
-				lines = -1
94
-			}
95
-		}
96
-
97
-		if lines != 0 {
98
-			n := maxFile
99
-			if config.Tail == "latest" && config.Since.IsZero() {
100
-				n = 1
101
-			}
102
-			before := false
103
-			for i := n; i > 0; i-- {
104
-				if before {
105
-					break
106
-				}
107
-				cLog, err := getReader(logDriver, i, n, lines)
108
-				if err != nil {
109
-					logrus.Debugf("Error reading %d log file: %v", i-1, err)
110
-					continue
111
-				}
112
-				//if lines are specified, then iterate only once
113
-				if lines > 0 {
114
-					i = 1
115
-				} else { // if lines are not specified, cLog is a file, It needs to be closed
116
-					defer cLog.(*os.File).Close()
117
-				}
118
-				dec := json.NewDecoder(cLog)
119
-				l := &jsonlog.JSONLog{}
120
-				for {
121
-					l.Reset()
122
-					if err := dec.Decode(l); err == io.EOF {
123
-						break
124
-					} else if err != nil {
125
-						logrus.Errorf("Error streaming logs: %s", err)
126
-						break
127
-					}
128
-					logLine := l.Log
129
-					if !config.Since.IsZero() && l.Created.Before(config.Since) {
130
-						continue
131
-					}
132
-					if config.Timestamps {
133
-						// format can be "" or time format, so here can't be error
134
-						logLine, _ = l.Format(format)
135
-					}
136
-					if l.Stream == "stdout" && config.UseStdout {
137
-						io.WriteString(outStream, logLine)
138
-					}
139
-					if l.Stream == "stderr" && config.UseStderr {
140
-						io.WriteString(errStream, logLine)
141
-					}
142
-				}
143
-			}
144
-		}
50
+		return logger.ErrReadLogsNotSupported
145 51
 	}
146 52
 
147
-	if config.Follow && container.IsRunning() {
148
-		chErrStderr := make(chan error)
149
-		chErrStdout := make(chan error)
150
-		var stdoutPipe, stderrPipe io.ReadCloser
151
-
152
-		// write an empty chunk of data (this is to ensure that the
153
-		// HTTP Response is sent immediatly, even if the container has
154
-		// not yet produced any data)
155
-		outStream.Write(nil)
53
+	follow := config.Follow && container.IsRunning()
54
+	tailLines, err := strconv.Atoi(config.Tail)
55
+	if err != nil {
56
+		tailLines = -1
57
+	}
156 58
 
157
-		if config.UseStdout {
158
-			stdoutPipe = container.StdoutLogPipe()
159
-			go func() {
160
-				logrus.Debug("logs: stdout stream begin")
161
-				chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
162
-				logrus.Debug("logs: stdout stream end")
163
-			}()
164
-		}
165
-		if config.UseStderr {
166
-			stderrPipe = container.StderrLogPipe()
167
-			go func() {
168
-				logrus.Debug("logs: stderr stream begin")
169
-				chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
170
-				logrus.Debug("logs: stderr stream end")
171
-			}()
172
-		}
59
+	logrus.Debug("logs: begin stream")
60
+	readConfig := logger.ReadConfig{
61
+		Since:  config.Since,
62
+		Tail:   tailLines,
63
+		Follow: follow,
64
+	}
65
+	logs := logReader.ReadLogs(readConfig)
173 66
 
67
+	for {
174 68
 		select {
175
-		case err = <-chErrStderr:
176
-			if stdoutPipe != nil {
177
-				stdoutPipe.Close()
178
-				<-chErrStdout
179
-			}
180
-		case err = <-chErrStdout:
181
-			if stderrPipe != nil {
182
-				stderrPipe.Close()
183
-				<-chErrStderr
184
-			}
69
+		case err := <-logs.Err:
70
+			logrus.Errorf("Error streaming logs: %v", err)
71
+			return nil
185 72
 		case <-config.Stop:
186
-			if stdoutPipe != nil {
187
-				stdoutPipe.Close()
188
-				<-chErrStdout
73
+			logs.Close()
74
+			return nil
75
+		case msg, ok := <-logs.Msg:
76
+			if !ok {
77
+				logrus.Debugf("logs: end stream")
78
+				return nil
189 79
 			}
190
-			if stderrPipe != nil {
191
-				stderrPipe.Close()
192
-				<-chErrStderr
80
+			logLine := msg.Line
81
+			if config.Timestamps {
82
+				logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
193 83
 			}
194
-			return nil
195
-		}
196
-
197
-		if err != nil && err != io.EOF && err != io.ErrClosedPipe {
198
-			if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
199
-				logrus.Errorf("error streaming logs: %v", err)
84
+			if msg.Source == "stdout" && config.UseStdout {
85
+				outStream.Write(logLine)
86
+			}
87
+			if msg.Source == "stderr" && config.UseStderr {
88
+				errStream.Write(logLine)
200 89
 			}
201 90
 		}
202 91
 	}
203
-	return nil
204
-}
205
-
206
-func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) {
207
-	if lines <= 0 {
208
-		index := strconv.Itoa(fileIndex - 1)
209
-		cLog, err := logDriver.(logger.Reader).ReadLog(index)
210
-		return cLog, err
211
-	}
212
-	buf := bytes.NewBuffer([]byte{})
213
-	remaining := lines
214
-	for i := 0; i < maxFiles; i++ {
215
-		index := strconv.Itoa(i)
216
-		cLog, err := logDriver.(logger.Reader).ReadLog(index)
217
-		if err != nil {
218
-			return buf, err
219
-		}
220
-		f := cLog.(*os.File)
221
-		ls, err := tailfile.TailFile(f, remaining)
222
-		if err != nil {
223
-			return buf, err
224
-		}
225
-		tmp := bytes.NewBuffer([]byte{})
226
-		for _, l := range ls {
227
-			fmt.Fprintf(tmp, "%s\n", l)
228
-		}
229
-		tmp.ReadFrom(buf)
230
-		buf = tmp
231
-		if len(ls) == remaining {
232
-			return buf, nil
233
-		}
234
-		remaining = remaining - len(ls)
235
-	}
236
-	return buf, nil
237 92
 }
... ...
@@ -12,7 +12,10 @@ import (
12 12
 	"github.com/docker/docker/runconfig"
13 13
 )
14 14
 
15
-const defaultTimeIncrement = 100
15
+const (
16
+	defaultTimeIncrement = 100
17
+	loggerCloseTimeout   = 10 * time.Second
18
+)
16 19
 
17 20
 // containerMonitor monitors the execution of a container's main process.
18 21
 // If a restart policy is specified for the container the monitor will ensure that the
... ...
@@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
310 310
 				close(exit)
311 311
 			}()
312 312
 			select {
313
-			case <-time.After(1 * time.Second):
313
+			case <-time.After(loggerCloseTimeout):
314 314
 				logrus.Warnf("Logger didn't exit in time: logs may be truncated")
315 315
 			case <-exit:
316 316
 			}
... ...
@@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from
29 29
 the container's `STDOUT` and `STDERR`.
30 30
 
31 31
 Passing a negative number or a non-integer to `--tail` is invalid and the
32
-value is set to `latest` in that case.
32
+value is set to `all` in that case.
33 33
 
34 34
 The `docker logs --timestamp` commands will add an RFC3339Nano
35 35
 timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each
... ...
@@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
250 250
 	}()
251 251
 
252 252
 	logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
253
-
254 253
 	stdout, err := logCmd.StdoutPipe()
255 254
 	c.Assert(err, check.IsNil)
256
-
257
-	if err := logCmd.Start(); err != nil {
258
-		c.Fatal(err)
259
-	}
255
+	c.Assert(logCmd.Start(), check.IsNil)
260 256
 
261 257
 	// First read slowly
262 258
 	bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)
263 259
new file mode 100644
... ...
@@ -0,0 +1,226 @@
0
+package ioutils
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"os"
7
+)
8
+
9
+type pos struct {
10
+	idx    int
11
+	offset int64
12
+}
13
+
14
+type multiReadSeeker struct {
15
+	readers []io.ReadSeeker
16
+	pos     *pos
17
+	posIdx  map[io.ReadSeeker]int
18
+}
19
+
20
+func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
21
+	var tmpOffset int64
22
+	switch whence {
23
+	case os.SEEK_SET:
24
+		for i, rdr := range r.readers {
25
+			// get size of the current reader
26
+			s, err := rdr.Seek(0, os.SEEK_END)
27
+			if err != nil {
28
+				return -1, err
29
+			}
30
+
31
+			if offset > tmpOffset+s {
32
+				if i == len(r.readers)-1 {
33
+					rdrOffset := s + (offset - tmpOffset)
34
+					if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
35
+						return -1, err
36
+					}
37
+					r.pos = &pos{i, rdrOffset}
38
+					return offset, nil
39
+				}
40
+
41
+				tmpOffset += s
42
+				continue
43
+			}
44
+
45
+			rdrOffset := offset - tmpOffset
46
+			idx := i
47
+
48
+			rdr.Seek(rdrOffset, os.SEEK_SET)
49
+			// make sure all following readers are at 0
50
+			for _, rdr := range r.readers[i+1:] {
51
+				rdr.Seek(0, os.SEEK_SET)
52
+			}
53
+
54
+			if rdrOffset == s && i != len(r.readers)-1 {
55
+				idx += 1
56
+				rdrOffset = 0
57
+			}
58
+			r.pos = &pos{idx, rdrOffset}
59
+			return offset, nil
60
+		}
61
+	case os.SEEK_END:
62
+		for _, rdr := range r.readers {
63
+			s, err := rdr.Seek(0, os.SEEK_END)
64
+			if err != nil {
65
+				return -1, err
66
+			}
67
+			tmpOffset += s
68
+		}
69
+		r.Seek(tmpOffset+offset, os.SEEK_SET)
70
+		return tmpOffset + offset, nil
71
+	case os.SEEK_CUR:
72
+		if r.pos == nil {
73
+			return r.Seek(offset, os.SEEK_SET)
74
+		}
75
+		// Just return the current offset
76
+		if offset == 0 {
77
+			return r.getCurOffset()
78
+		}
79
+
80
+		curOffset, err := r.getCurOffset()
81
+		if err != nil {
82
+			return -1, err
83
+		}
84
+		rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
85
+		if err != nil {
86
+			return -1, err
87
+		}
88
+
89
+		r.pos = &pos{r.posIdx[rdr], rdrOffset}
90
+		return curOffset + offset, nil
91
+	default:
92
+		return -1, fmt.Errorf("Invalid whence: %d", whence)
93
+	}
94
+
95
+	return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
96
+}
97
+
98
+func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
99
+	var rdr io.ReadSeeker
100
+	var rdrOffset int64
101
+
102
+	for i, rdr := range r.readers {
103
+		offsetTo, err := r.getOffsetToReader(rdr)
104
+		if err != nil {
105
+			return nil, -1, err
106
+		}
107
+		if offsetTo > offset {
108
+			rdr = r.readers[i-1]
109
+			rdrOffset = offsetTo - offset
110
+			break
111
+		}
112
+
113
+		if rdr == r.readers[len(r.readers)-1] {
114
+			rdrOffset = offsetTo + offset
115
+			break
116
+		}
117
+	}
118
+
119
+	return rdr, rdrOffset, nil
120
+}
121
+
122
+func (r *multiReadSeeker) getCurOffset() (int64, error) {
123
+	var totalSize int64
124
+	for _, rdr := range r.readers[:r.pos.idx+1] {
125
+		if r.posIdx[rdr] == r.pos.idx {
126
+			totalSize += r.pos.offset
127
+			break
128
+		}
129
+
130
+		size, err := getReadSeekerSize(rdr)
131
+		if err != nil {
132
+			return -1, fmt.Errorf("error getting seeker size: %v", err)
133
+		}
134
+		totalSize += size
135
+	}
136
+	return totalSize, nil
137
+}
138
+
139
+func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
140
+	var offset int64
141
+	for _, r := range r.readers {
142
+		if r == rdr {
143
+			break
144
+		}
145
+
146
+		size, err := getReadSeekerSize(rdr)
147
+		if err != nil {
148
+			return -1, err
149
+		}
150
+		offset += size
151
+	}
152
+	return offset, nil
153
+}
154
+
155
+func (r *multiReadSeeker) Read(b []byte) (int, error) {
156
+	if r.pos == nil {
157
+		r.pos = &pos{0, 0}
158
+	}
159
+
160
+	bCap := int64(cap(b))
161
+	buf := bytes.NewBuffer(nil)
162
+	var rdr io.ReadSeeker
163
+
164
+	for _, rdr = range r.readers[r.pos.idx:] {
165
+		readBytes, err := io.CopyN(buf, rdr, bCap)
166
+		if err != nil && err != io.EOF {
167
+			return -1, err
168
+		}
169
+		bCap -= readBytes
170
+
171
+		if bCap == 0 {
172
+			break
173
+		}
174
+	}
175
+
176
+	rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
177
+	if err != nil {
178
+		return -1, err
179
+	}
180
+	r.pos = &pos{r.posIdx[rdr], rdrPos}
181
+	return buf.Read(b)
182
+}
183
+
184
+func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
185
+	// save the current position
186
+	pos, err := rdr.Seek(0, os.SEEK_CUR)
187
+	if err != nil {
188
+		return -1, err
189
+	}
190
+
191
+	// get the size
192
+	size, err := rdr.Seek(0, os.SEEK_END)
193
+	if err != nil {
194
+		return -1, err
195
+	}
196
+
197
+	// reset the position
198
+	if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
199
+		return -1, err
200
+	}
201
+	return size, nil
202
+}
203
+
204
+// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
205
+// input readseekers. After calling this method the initial position is set to the
206
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
207
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
208
+// Seek can be used over the sum of lengths of all readseekers.
209
+//
210
+// When a MultiReadSeeker is used, no Read and Seek operations should be made on
211
+// its ReadSeeker components. Also, users should make no assumption on the state
212
+// of individual readseekers while the MultiReadSeeker is used.
213
+func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
214
+	if len(readers) == 1 {
215
+		return readers[0]
216
+	}
217
+	idx := make(map[io.ReadSeeker]int)
218
+	for i, rdr := range readers {
219
+		idx[rdr] = i
220
+	}
221
+	return &multiReadSeeker{
222
+		readers: readers,
223
+		posIdx:  idx,
224
+	}
225
+}
0 226
new file mode 100644
... ...
@@ -0,0 +1,149 @@
0
+package ioutils
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"io/ioutil"
7
+	"os"
8
+	"strings"
9
+	"testing"
10
+)
11
+
12
+func TestMultiReadSeekerReadAll(t *testing.T) {
13
+	str := "hello world"
14
+	s1 := strings.NewReader(str + " 1")
15
+	s2 := strings.NewReader(str + " 2")
16
+	s3 := strings.NewReader(str + " 3")
17
+	mr := MultiReadSeeker(s1, s2, s3)
18
+
19
+	expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
20
+
21
+	b, err := ioutil.ReadAll(mr)
22
+	if err != nil {
23
+		t.Fatal(err)
24
+	}
25
+
26
+	expected := "hello world 1hello world 2hello world 3"
27
+	if string(b) != expected {
28
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
29
+	}
30
+
31
+	size, err := mr.Seek(0, os.SEEK_END)
32
+	if err != nil {
33
+		t.Fatal(err)
34
+	}
35
+	if size != expectedSize {
36
+		t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
37
+	}
38
+
39
+	// Reset the position and read again
40
+	pos, err := mr.Seek(0, os.SEEK_SET)
41
+	if err != nil {
42
+		t.Fatal(err)
43
+	}
44
+	if pos != 0 {
45
+		t.Fatalf("expected position to be set to 0, got %d", pos)
46
+	}
47
+
48
+	b, err = ioutil.ReadAll(mr)
49
+	if err != nil {
50
+		t.Fatal(err)
51
+	}
52
+
53
+	if string(b) != expected {
54
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
55
+	}
56
+}
57
+
58
+func TestMultiReadSeekerReadEach(t *testing.T) {
59
+	str := "hello world"
60
+	s1 := strings.NewReader(str + " 1")
61
+	s2 := strings.NewReader(str + " 2")
62
+	s3 := strings.NewReader(str + " 3")
63
+	mr := MultiReadSeeker(s1, s2, s3)
64
+
65
+	var totalBytes int64
66
+	for i, s := range []*strings.Reader{s1, s2, s3} {
67
+		sLen := int64(s.Len())
68
+		buf := make([]byte, s.Len())
69
+		expected := []byte(fmt.Sprintf("%s %d", str, i+1))
70
+
71
+		if _, err := mr.Read(buf); err != nil && err != io.EOF {
72
+			t.Fatal(err)
73
+		}
74
+
75
+		if !bytes.Equal(buf, expected) {
76
+			t.Fatalf("expected %q to be %q", string(buf), string(expected))
77
+		}
78
+
79
+		pos, err := mr.Seek(0, os.SEEK_CUR)
80
+		if err != nil {
81
+			t.Fatalf("iteration: %d, error: %v", i+1, err)
82
+		}
83
+
84
+		// check that the total bytes read is the current position of the seeker
85
+		totalBytes += sLen
86
+		if pos != totalBytes {
87
+			t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
88
+		}
89
+
90
+		// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
91
+		newPos, err := mr.Seek(pos, os.SEEK_SET)
92
+		if err != nil {
93
+			t.Fatal(err)
94
+		}
95
+		if newPos != pos {
96
+			t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
97
+		}
98
+	}
99
+}
100
+
101
+func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
102
+	str := "hello world"
103
+	s1 := strings.NewReader(str + " 1")
104
+	s2 := strings.NewReader(str + " 2")
105
+	s3 := strings.NewReader(str + " 3")
106
+	mr := MultiReadSeeker(s1, s2, s3)
107
+
108
+	buf := make([]byte, s1.Len()+3)
109
+	_, err := mr.Read(buf)
110
+	if err != nil {
111
+		t.Fatal(err)
112
+	}
113
+
114
+	// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
115
+	expected := "hello world 1hel"
116
+	if string(buf) != expected {
117
+		t.Fatalf("expected %s to be %s", string(buf), expected)
118
+	}
119
+}
120
+
121
+func TestMultiReadSeekerNegativeSeek(t *testing.T) {
122
+	str := "hello world"
123
+	s1 := strings.NewReader(str + " 1")
124
+	s2 := strings.NewReader(str + " 2")
125
+	s3 := strings.NewReader(str + " 3")
126
+	mr := MultiReadSeeker(s1, s2, s3)
127
+
128
+	s1Len := s1.Len()
129
+	s2Len := s2.Len()
130
+	s3Len := s3.Len()
131
+
132
+	s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
133
+	if err != nil {
134
+		t.Fatal(err)
135
+	}
136
+	if s != int64(s1Len+s2Len) {
137
+		t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
138
+	}
139
+
140
+	buf := make([]byte, s3Len)
141
+	if _, err := mr.Read(buf); err != nil && err != io.EOF {
142
+		t.Fatal(err)
143
+	}
144
+	expected := fmt.Sprintf("%s %d", str, 3)
145
+	if string(buf) != fmt.Sprintf("%s %d", str, 3) {
146
+		t.Fatalf("expected %q to be %q", string(buf), expected)
147
+	}
148
+}
... ...
@@ -3,6 +3,7 @@ package tailfile
3 3
 import (
4 4
 	"bytes"
5 5
 	"errors"
6
+	"io"
6 7
 	"os"
7 8
 )
8 9
 
... ...
@@ -12,7 +13,7 @@ var eol = []byte("\n")
12 12
 var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")
13 13
 
14 14
 //TailFile returns last n lines of file f
15
-func TailFile(f *os.File, n int) ([][]byte, error) {
15
+func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
16 16
 	if n <= 0 {
17 17
 		return nil, ErrNonPositiveLinesNumber
18 18
 	}