Browse code

cluster executor: Logs retrieving support.

Plumbed the executor to the container logs backend.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>

Andrea Luzzardi authored on 2016/10/26 17:14:15
Showing 3 changed files
... ...
@@ -6,6 +6,7 @@ import (
6 6
 
7 7
 	"github.com/docker/distribution"
8 8
 	"github.com/docker/docker/api/types"
9
+	"github.com/docker/docker/api/types/backend"
9 10
 	"github.com/docker/docker/api/types/container"
10 11
 	"github.com/docker/docker/api/types/events"
11 12
 	"github.com/docker/docker/api/types/filters"
... ...
@@ -28,6 +29,7 @@ type Backend interface {
28 28
 	CreateManagedContainer(config types.ContainerCreateConfig, validateHostname bool) (container.ContainerCreateCreatedBody, error)
29 29
 	ContainerStart(name string, hostConfig *container.HostConfig, validateHostname bool, checkpoint string, checkpointDir string) error
30 30
 	ContainerStop(name string, seconds *int) error
31
+	ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
31 32
 	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
32 33
 	ActivateContainerServiceBinding(containerName string) error
33 34
 	DeactivateContainerServiceBinding(containerName string) error
... ...
@@ -12,6 +12,7 @@ import (
12 12
 	"github.com/Sirupsen/logrus"
13 13
 	"github.com/docker/docker/api/server/httputils"
14 14
 	"github.com/docker/docker/api/types"
15
+	"github.com/docker/docker/api/types/backend"
15 16
 	containertypes "github.com/docker/docker/api/types/container"
16 17
 	"github.com/docker/docker/api/types/events"
17 18
 	"github.com/docker/docker/api/types/versions"
... ...
@@ -20,6 +21,7 @@ import (
20 20
 	"github.com/docker/swarmkit/agent/exec"
21 21
 	"github.com/docker/swarmkit/api"
22 22
 	"github.com/docker/swarmkit/log"
23
+	"github.com/docker/swarmkit/protobuf/ptypes"
23 24
 	"golang.org/x/net/context"
24 25
 	"golang.org/x/time/rate"
25 26
 )
... ...
@@ -376,6 +378,56 @@ func (c *containerAdapter) deactivateServiceBinding() error {
376 376
 	return c.backend.DeactivateContainerServiceBinding(c.container.name())
377 377
 }
378 378
 
379
+func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
380
+	reader, writer := io.Pipe()
381
+
382
+	apiOptions := &backend.ContainerLogsConfig{
383
+		ContainerLogsOptions: types.ContainerLogsOptions{
384
+			Follow: options.Follow,
385
+
386
+			// TODO(stevvooe): Parse timestamp out of message. This
387
+			// absolutely needs to be done before going to production with
388
+			// this, at it is completely redundant.
389
+			Timestamps: true,
390
+			Details:    false, // no clue what to do with this, let's just deprecate it.
391
+		},
392
+		OutStream: writer,
393
+	}
394
+
395
+	if options.Since != nil {
396
+		since, err := ptypes.Timestamp(options.Since)
397
+		if err != nil {
398
+			return nil, err
399
+		}
400
+		apiOptions.Since = since.Format(time.RFC3339Nano)
401
+	}
402
+
403
+	if options.Tail < 0 {
404
+		// See protobuf documentation for details of how this works.
405
+		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
406
+	} else if options.Tail > 0 {
407
+		return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
408
+	}
409
+
410
+	if len(options.Streams) == 0 {
411
+		// empty == all
412
+		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
413
+	} else {
414
+		for _, stream := range options.Streams {
415
+			switch stream {
416
+			case api.LogStreamStdout:
417
+				apiOptions.ShowStdout = true
418
+			case api.LogStreamStderr:
419
+				apiOptions.ShowStderr = true
420
+			}
421
+		}
422
+	}
423
+
424
+	chStarted := make(chan struct{})
425
+	go c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
426
+	return reader, nil
427
+}
428
+
379 429
 // todo: typed/wrapped errors
380 430
 func isContainerCreateNameConflict(err error) bool {
381 431
 	return strings.Contains(err.Error(), "Conflict. The name")
... ...
@@ -1,8 +1,13 @@
1 1
 package container
2 2
 
3 3
 import (
4
+	"bufio"
5
+	"bytes"
6
+	"encoding/binary"
4 7
 	"fmt"
8
+	"io"
5 9
 	"os"
10
+	"time"
6 11
 
7 12
 	"github.com/docker/docker/api/types"
8 13
 	"github.com/docker/docker/api/types/events"
... ...
@@ -11,8 +16,10 @@ import (
11 11
 	"github.com/docker/swarmkit/agent/exec"
12 12
 	"github.com/docker/swarmkit/api"
13 13
 	"github.com/docker/swarmkit/log"
14
+	"github.com/docker/swarmkit/protobuf/ptypes"
14 15
 	"github.com/pkg/errors"
15 16
 	"golang.org/x/net/context"
17
+	"golang.org/x/time/rate"
16 18
 )
17 19
 
18 20
 // controller implements agent.Controller against docker's API.
... ...
@@ -374,6 +381,128 @@ func (r *controller) Remove(ctx context.Context) error {
374 374
 	return nil
375 375
 }
376 376
 
377
+// waitReady waits for a container to be "ready".
378
+// Ready means it's past the started state.
379
+func (r *controller) waitReady(pctx context.Context) error {
380
+	if err := r.checkClosed(); err != nil {
381
+		return err
382
+	}
383
+
384
+	ctx, cancel := context.WithCancel(pctx)
385
+	defer cancel()
386
+
387
+	eventq := r.adapter.events(ctx)
388
+
389
+	ctnr, err := r.adapter.inspect(ctx)
390
+	if err != nil {
391
+		if !isUnknownContainer(err) {
392
+			return errors.Wrap(err, "inspect container failed")
393
+		}
394
+	} else {
395
+		switch ctnr.State.Status {
396
+		case "running", "exited", "dead":
397
+			return nil
398
+		}
399
+	}
400
+
401
+	for {
402
+		select {
403
+		case event := <-eventq:
404
+			if !r.matchevent(event) {
405
+				continue
406
+			}
407
+
408
+			switch event.Action {
409
+			case "start":
410
+				return nil
411
+			}
412
+		case <-ctx.Done():
413
+			return ctx.Err()
414
+		case <-r.closed:
415
+			return r.err
416
+		}
417
+	}
418
+}
419
+
420
+func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
421
+	if err := r.checkClosed(); err != nil {
422
+		return err
423
+	}
424
+
425
+	if err := r.waitReady(ctx); err != nil {
426
+		return errors.Wrap(err, "container not ready for logs")
427
+	}
428
+
429
+	rc, err := r.adapter.logs(ctx, options)
430
+	if err != nil {
431
+		return errors.Wrap(err, "failed getting container logs")
432
+	}
433
+	defer rc.Close()
434
+
435
+	var (
436
+		// use a rate limiter to keep things under control but also provides some
437
+		// ability coalesce messages.
438
+		limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
439
+		msgctx  = api.LogContext{
440
+			NodeID:    r.task.NodeID,
441
+			ServiceID: r.task.ServiceID,
442
+			TaskID:    r.task.ID,
443
+		}
444
+	)
445
+
446
+	brd := bufio.NewReader(rc)
447
+	for {
448
+		// so, message header is 8 bytes, treat as uint64, pull stream off MSB
449
+		var header uint64
450
+		if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
451
+			if err == io.EOF {
452
+				return nil
453
+			}
454
+
455
+			return errors.Wrap(err, "failed reading log header")
456
+		}
457
+
458
+		stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))
459
+
460
+		// limit here to decrease allocation back pressure.
461
+		if err := limiter.WaitN(ctx, int(size)); err != nil {
462
+			return errors.Wrap(err, "failed rate limiter")
463
+		}
464
+
465
+		buf := make([]byte, size)
466
+		_, err := io.ReadFull(brd, buf)
467
+		if err != nil {
468
+			return errors.Wrap(err, "failed reading buffer")
469
+		}
470
+
471
+		// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
472
+		parts := bytes.SplitN(buf, []byte(" "), 2)
473
+		if len(parts) != 2 {
474
+			return fmt.Errorf("invalid timestamp in log message: %v", buf)
475
+		}
476
+
477
+		ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
478
+		if err != nil {
479
+			return errors.Wrap(err, "failed to parse timestamp")
480
+		}
481
+
482
+		tsp, err := ptypes.TimestampProto(ts)
483
+		if err != nil {
484
+			return errors.Wrap(err, "failed to convert timestamp")
485
+		}
486
+
487
+		if err := publisher.Publish(ctx, api.LogMessage{
488
+			Context:   msgctx,
489
+			Timestamp: tsp,
490
+			Stream:    api.LogStream(stream),
491
+
492
+			Data: parts[1],
493
+		}); err != nil {
494
+			return errors.Wrap(err, "failed to publish log message")
495
+		}
496
+	}
497
+}
498
+
377 499
 // Close the runner and clean up any ephemeral resources.
378 500
 func (r *controller) Close() error {
379 501
 	select {