Browse code

Add --until flag for docker logs; closes #32807

Signed-off-by: Jamie Hannaford <jamie.hannaford@rackspace.com>

Jamie Hannaford authored on 2017/04/28 20:53:00
Showing 12 changed files
... ...
@@ -96,6 +96,7 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
96 96
 		Follow:     httputils.BoolValue(r, "follow"),
97 97
 		Timestamps: httputils.BoolValue(r, "timestamps"),
98 98
 		Since:      r.Form.Get("since"),
99
+		Until:      r.Form.Get("until"),
99 100
 		Tail:       r.Form.Get("tail"),
100 101
 		ShowStdout: stdout,
101 102
 		ShowStderr: stderr,
... ...
@@ -4955,6 +4955,11 @@ paths:
4955 4955
           description: "Only return logs since this time, as a UNIX timestamp"
4956 4956
           type: "integer"
4957 4957
           default: 0
4958
+        - name: "until"
4959
+          in: "query"
4960
+          description: "Only return logs before this time, as a UNIX timestamp"
4961
+          type: "integer"
4962
+          default: 0
4958 4963
         - name: "timestamps"
4959 4964
           in: "query"
4960 4965
           description: "Add timestamps to every log line"
... ...
@@ -74,6 +74,7 @@ type ContainerLogsOptions struct {
74 74
 	ShowStdout bool
75 75
 	ShowStderr bool
76 76
 	Since      string
77
+	Until      string
77 78
 	Timestamps bool
78 79
 	Follow     bool
79 80
 	Tail       string
... ...
@@ -51,6 +51,14 @@ func (cli *Client) ContainerLogs(ctx context.Context, container string, options
51 51
 		query.Set("since", ts)
52 52
 	}
53 53
 
54
+	if options.Until != "" {
55
+		ts, err := timetypes.GetTimestamp(options.Until, time.Now())
56
+		if err != nil {
57
+			return nil, err
58
+		}
59
+		query.Set("until", ts)
60
+	}
61
+
54 62
 	if options.Timestamps {
55 63
 		query.Set("timestamps", "1")
56 64
 	}
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"time"
14 14
 
15 15
 	"github.com/docker/docker/api/types"
16
+	"github.com/docker/docker/internal/testutil"
16 17
 
17 18
 	"golang.org/x/net/context"
18 19
 )
... ...
@@ -28,9 +29,11 @@ func TestContainerLogsError(t *testing.T) {
28 28
 	_, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{
29 29
 		Since: "2006-01-02TZ",
30 30
 	})
31
-	if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) {
32
-		t.Fatalf("expected a 'parsing time' error, got %v", err)
33
-	}
31
+	testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`)
32
+	_, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{
33
+		Until: "2006-01-02TZ",
34
+	})
35
+	testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`)
34 36
 }
35 37
 
36 38
 func TestContainerLogs(t *testing.T) {
... ...
@@ -80,6 +83,17 @@ func TestContainerLogs(t *testing.T) {
80 80
 				"since": "invalid but valid",
81 81
 			},
82 82
 		},
83
+		{
84
+			options: types.ContainerLogsOptions{
85
+				// An complete invalid date, timestamp or go duration will be
86
+				// passed as is
87
+				Until: "invalid but valid",
88
+			},
89
+			expectedQueryParams: map[string]string{
90
+				"tail":  "",
91
+				"until": "invalid but valid",
92
+			},
93
+		},
83 94
 	}
84 95
 	for _, logCase := range cases {
85 96
 		client := &Client{
... ...
@@ -122,6 +122,9 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
122 122
 			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
123 123
 				continue
124 124
 			}
125
+			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
126
+				return
127
+			}
125 128
 
126 129
 			select {
127 130
 			case watcher.Msg <- msg:
... ...
@@ -171,13 +171,15 @@ func (s *journald) Close() error {
171 171
 	return nil
172 172
 }
173 173
 
174
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
174
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
175 175
 	var msg, data, cursor *C.char
176 176
 	var length C.size_t
177 177
 	var stamp C.uint64_t
178 178
 	var priority, partial C.int
179
+	var done bool
179 180
 
180
-	// Walk the journal from here forward until we run out of new entries.
181
+	// Walk the journal from here forward until we run out of new entries
182
+	// or we reach the until value (if provided).
181 183
 drain:
182 184
 	for {
183 185
 		// Try not to send a given entry twice.
... ...
@@ -195,6 +197,12 @@ drain:
195 195
 			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
196 196
 				break
197 197
 			}
198
+			// Break if the timestamp exceeds any provided until flag.
199
+			if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) {
200
+				done = true
201
+				break
202
+			}
203
+
198 204
 			// Set up the time and text of the entry.
199 205
 			timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
200 206
 			line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
... ...
@@ -240,10 +248,10 @@ drain:
240 240
 		// ensure that we won't be freeing an address that's invalid
241 241
 		cursor = nil
242 242
 	}
243
-	return cursor
243
+	return cursor, done
244 244
 }
245 245
 
246
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
246
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
247 247
 	s.mu.Lock()
248 248
 	s.readers.readers[logWatcher] = logWatcher
249 249
 	if s.closed {
... ...
@@ -270,9 +278,10 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
270 270
 				break
271 271
 			}
272 272
 
273
-			cursor = s.drainJournal(logWatcher, config, j, cursor)
273
+			var done bool
274
+			cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
274 275
 
275
-			if status != 1 {
276
+			if status != 1 || done {
276 277
 				// We were notified to stop
277 278
 				break
278 279
 			}
... ...
@@ -304,6 +313,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
304 304
 	var cmatch, cursor *C.char
305 305
 	var stamp C.uint64_t
306 306
 	var sinceUnixMicro uint64
307
+	var untilUnixMicro uint64
307 308
 	var pipes [2]C.int
308 309
 
309 310
 	// Get a handle to the journal.
... ...
@@ -343,10 +353,19 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
343 343
 		nano := config.Since.UnixNano()
344 344
 		sinceUnixMicro = uint64(nano / 1000)
345 345
 	}
346
+	// If we have an until value, convert it too
347
+	if !config.Until.IsZero() {
348
+		nano := config.Until.UnixNano()
349
+		untilUnixMicro = uint64(nano / 1000)
350
+	}
346 351
 	if config.Tail > 0 {
347 352
 		lines := config.Tail
348
-		// Start at the end of the journal.
349
-		if C.sd_journal_seek_tail(j) < 0 {
353
+		// If until time provided, start from there.
354
+		// Otherwise start at the end of the journal.
355
+		if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 {
356
+			logWatcher.Err <- fmt.Errorf("error seeking provided until value")
357
+			return
358
+		} else if C.sd_journal_seek_tail(j) < 0 {
350 359
 			logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
351 360
 			return
352 361
 		}
... ...
@@ -362,8 +381,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
362 362
 			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
363 363
 				break
364 364
 			} else {
365
-				// Compare the timestamp on the entry
366
-				// to our threshold value.
365
+				// Compare the timestamp on the entry to our threshold value.
367 366
 				if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
368 367
 					break
369 368
 				}
... ...
@@ -392,7 +410,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
392 392
 			return
393 393
 		}
394 394
 	}
395
-	cursor = s.drainJournal(logWatcher, config, j, nil)
395
+	cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
396 396
 	if config.Follow {
397 397
 		// Allocate a descriptor for following the journal, if we'll
398 398
 		// need one.  Do it here so that we can report if it fails.
... ...
@@ -404,7 +422,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
404 404
 			if C.pipe(&pipes[0]) == C.int(-1) {
405 405
 				logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
406 406
 			} else {
407
-				cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
407
+				cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
408 408
 				// Let followJournal handle freeing the journal context
409 409
 				// object and closing the channel.
410 410
 				following = true
... ...
@@ -98,7 +98,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
98 98
 
99 99
 	if config.Tail != 0 {
100 100
 		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
101
-		tailFile(tailer, logWatcher, config.Tail, config.Since)
101
+		tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
102 102
 	}
103 103
 
104 104
 	// close all the rotated files
... ...
@@ -119,7 +119,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
119 119
 	l.readers[logWatcher] = struct{}{}
120 120
 	l.mu.Unlock()
121 121
 
122
-	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
122
+	followLogs(latestFile, logWatcher, notifyRotate, config)
123 123
 
124 124
 	l.mu.Lock()
125 125
 	delete(l.readers, logWatcher)
... ...
@@ -136,7 +136,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
136 136
 	return io.NewSectionReader(f, 0, size), nil
137 137
 }
138 138
 
139
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
139
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
140 140
 	rdr := io.Reader(f)
141 141
 	if tail > 0 {
142 142
 		ls, err := tailfile.TailFile(f, tail)
... ...
@@ -158,6 +158,9 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
158 158
 		if !since.IsZero() && msg.Timestamp.Before(since) {
159 159
 			continue
160 160
 		}
161
+		if !until.IsZero() && msg.Timestamp.After(until) {
162
+			return
163
+		}
161 164
 		select {
162 165
 		case <-logWatcher.WatchClose():
163 166
 			return
... ...
@@ -186,7 +189,7 @@ func watchFile(name string) (filenotify.FileWatcher, error) {
186 186
 	return fileWatcher, nil
187 187
 }
188 188
 
189
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
189
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
190 190
 	dec := json.NewDecoder(f)
191 191
 	l := &jsonlog.JSONLog{}
192 192
 
... ...
@@ -324,14 +327,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
324 324
 			continue
325 325
 		}
326 326
 
327
+		since := config.Since
328
+		until := config.Until
329
+
327 330
 		retries = 0 // reset retries since we've succeeded
328 331
 		if !since.IsZero() && msg.Timestamp.Before(since) {
329 332
 			continue
330 333
 		}
334
+		if !until.IsZero() && msg.Timestamp.After(until) {
335
+			return
336
+		}
331 337
 		select {
332 338
 		case logWatcher.Msg <- msg:
333 339
 		case <-ctx.Done():
334 340
 			logWatcher.Msg <- msg
341
+			// This for loop is used when the logger is closed (ie, container
342
+			// stopped) but the consumer is still waiting for logs.
335 343
 			for {
336 344
 				msg, err := decodeLogLine(dec, l)
337 345
 				if err != nil {
... ...
@@ -340,6 +351,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
340 340
 				if !since.IsZero() && msg.Timestamp.Before(since) {
341 341
 					continue
342 342
 				}
343
+				if !until.IsZero() && msg.Timestamp.After(until) {
344
+					return
345
+				}
343 346
 				logWatcher.Msg <- msg
344 347
 			}
345 348
 		}
... ...
@@ -81,6 +81,7 @@ type Logger interface {
81 81
 // ReadConfig is the configuration passed into ReadLogs.
82 82
 type ReadConfig struct {
83 83
 	Since  time.Time
84
+	Until  time.Time
84 85
 	Tail   int
85 86
 	Follow bool
86 87
 }
... ...
@@ -77,8 +77,18 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
77 77
 		since = time.Unix(s, n)
78 78
 	}
79 79
 
80
+	var until time.Time
81
+	if config.Until != "" && config.Until != "0" {
82
+		s, n, err := timetypes.ParseTimestamps(config.Until, 0)
83
+		if err != nil {
84
+			return nil, false, err
85
+		}
86
+		until = time.Unix(s, n)
87
+	}
88
+
80 89
 	readConfig := logger.ReadConfig{
81 90
 		Since:  since,
91
+		Until:  until,
82 92
 		Tail:   tailLines,
83 93
 		Follow: follow,
84 94
 	}
... ...
@@ -23,6 +23,7 @@ keywords: "API, Docker, rcli, REST, documentation"
23 23
   If `Error` is `null`, container removal has succeeded, otherwise
24 24
   the test of an error message indicating why container removal has failed
25 25
   is available from `Error.Message` field.
26
+* `GET /containers/(name)/logs` now supports an additional query parameter: `until`, which returns log lines that occurred before the specified timestamp.
26 27
 
27 28
 ## v1.33 API changes
28 29
 
... ...
@@ -93,7 +94,7 @@ keywords: "API, Docker, rcli, REST, documentation"
93 93
 * `POST /containers/(name)/wait` now accepts a `condition` query parameter to indicate which state change condition to wait for. Also, response headers are now returned immediately to acknowledge that the server has registered a wait callback for the client.
94 94
 * `POST /swarm/init` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
95 95
 * `POST /swarm/join` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
96
-* `GET /events` now supports service, node and secret events which are emitted when users create, update and remove service, node and secret 
96
+* `GET /events` now supports service, node and secret events which are emitted when users create, update and remove service, node and secret
97 97
 * `GET /events` now supports network remove event which is emitted when users remove a swarm scoped network
98 98
 * `GET /events` now supports a filter type `scope` in which supported value could be swarm and local
99 99
 
... ...
@@ -2,8 +2,12 @@ package main
2 2
 
3 3
 import (
4 4
 	"bufio"
5
+	"bytes"
5 6
 	"fmt"
7
+	"io"
8
+	"io/ioutil"
6 9
 	"net/http"
10
+	"strconv"
7 11
 	"strings"
8 12
 	"time"
9 13
 
... ...
@@ -11,6 +15,7 @@ import (
11 11
 	"github.com/docker/docker/client"
12 12
 	"github.com/docker/docker/integration-cli/checker"
13 13
 	"github.com/docker/docker/integration-cli/request"
14
+	"github.com/docker/docker/pkg/stdcopy"
14 15
 	"github.com/go-check/check"
15 16
 	"golang.org/x/net/context"
16 17
 )
... ...
@@ -85,3 +90,125 @@ func (s *DockerSuite) TestLogsAPIContainerNotFound(c *check.C) {
85 85
 	c.Assert(err, checker.IsNil)
86 86
 	c.Assert(resp.StatusCode, checker.Equals, http.StatusNotFound)
87 87
 }
88
+
89
+func (s *DockerSuite) TestLogsAPIUntilFutureFollow(c *check.C) {
90
+	testRequires(c, DaemonIsLinux)
91
+
92
+	name := "logsuntilfuturefollow"
93
+	dockerCmd(c, "run", "-d", "--name", name, "busybox", "/bin/sh", "-c", "while true; do date +%s; sleep 1; done")
94
+	c.Assert(waitRun(name), checker.IsNil)
95
+
96
+	untilSecs := 5
97
+	untilDur, err := time.ParseDuration(fmt.Sprintf("%ds", untilSecs))
98
+	c.Assert(err, checker.IsNil)
99
+	until := daemonTime(c).Add(untilDur)
100
+
101
+	client, err := request.NewClient()
102
+	if err != nil {
103
+		c.Fatal(err)
104
+	}
105
+
106
+	cfg := types.ContainerLogsOptions{Until: until.Format(time.RFC3339Nano), Follow: true, ShowStdout: true, Timestamps: true}
107
+	reader, err := client.ContainerLogs(context.Background(), name, cfg)
108
+	c.Assert(err, checker.IsNil)
109
+
110
+	type logOut struct {
111
+		out string
112
+		err error
113
+	}
114
+
115
+	chLog := make(chan logOut)
116
+
117
+	go func() {
118
+		bufReader := bufio.NewReader(reader)
119
+		defer reader.Close()
120
+		for i := 0; i < untilSecs; i++ {
121
+			out, _, err := bufReader.ReadLine()
122
+			if err != nil {
123
+				if err == io.EOF {
124
+					return
125
+				}
126
+				chLog <- logOut{"", err}
127
+				return
128
+			}
129
+
130
+			chLog <- logOut{strings.TrimSpace(string(out)), err}
131
+		}
132
+	}()
133
+
134
+	for i := 0; i < untilSecs; i++ {
135
+		select {
136
+		case l := <-chLog:
137
+			c.Assert(l.err, checker.IsNil)
138
+			i, err := strconv.ParseInt(strings.Split(l.out, " ")[1], 10, 64)
139
+			c.Assert(err, checker.IsNil)
140
+			c.Assert(time.Unix(i, 0).UnixNano(), checker.LessOrEqualThan, until.UnixNano())
141
+		case <-time.After(20 * time.Second):
142
+			c.Fatal("timeout waiting for logs to exit")
143
+		}
144
+	}
145
+}
146
+
147
+func (s *DockerSuite) TestLogsAPIUntil(c *check.C) {
148
+	name := "logsuntil"
149
+	dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; sleep 0.5; done")
150
+
151
+	client, err := request.NewClient()
152
+	if err != nil {
153
+		c.Fatal(err)
154
+	}
155
+
156
+	extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string {
157
+		reader, err := client.ContainerLogs(context.Background(), name, cfg)
158
+		c.Assert(err, checker.IsNil)
159
+
160
+		actualStdout := new(bytes.Buffer)
161
+		actualStderr := ioutil.Discard
162
+		_, err = stdcopy.StdCopy(actualStdout, actualStderr, reader)
163
+		c.Assert(err, checker.IsNil)
164
+
165
+		return strings.Split(actualStdout.String(), "\n")
166
+	}
167
+
168
+	// Get timestamp of second log line
169
+	allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true})
170
+	t, err := time.Parse(time.RFC3339Nano, strings.Split(allLogs[1], " ")[0])
171
+	c.Assert(err, checker.IsNil)
172
+	until := t.Format(time.RFC3339Nano)
173
+
174
+	// Get logs until the timestamp of second line, i.e. first two lines
175
+	logs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: until})
176
+
177
+	// Ensure log lines after cut-off are excluded
178
+	logsString := strings.Join(logs, "\n")
179
+	c.Assert(logsString, checker.Not(checker.Contains), "log3", check.Commentf("unexpected log message returned, until=%v", until))
180
+}
181
+
182
+func (s *DockerSuite) TestLogsAPIUntilDefaultValue(c *check.C) {
183
+	name := "logsuntildefaultval"
184
+	dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; done")
185
+
186
+	client, err := request.NewClient()
187
+	if err != nil {
188
+		c.Fatal(err)
189
+	}
190
+
191
+	extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string {
192
+		reader, err := client.ContainerLogs(context.Background(), name, cfg)
193
+		c.Assert(err, checker.IsNil)
194
+
195
+		actualStdout := new(bytes.Buffer)
196
+		actualStderr := ioutil.Discard
197
+		_, err = stdcopy.StdCopy(actualStdout, actualStderr, reader)
198
+		c.Assert(err, checker.IsNil)
199
+
200
+		return strings.Split(actualStdout.String(), "\n")
201
+	}
202
+
203
+	// Get timestamp of second log line
204
+	allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true})
205
+
206
+	// Test with default value specified and parameter omitted
207
+	defaultLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: "0"})
208
+	c.Assert(defaultLogs, checker.DeepEquals, allLogs)
209
+}