daemon/cluster/executor/container/adapter.go
534a90a9
 package container
 
 import (
 	"encoding/base64"
 	"encoding/json"
514adcf4
 	"errors"
534a90a9
 	"fmt"
 	"io"
92899ffa
 	"os"
a1fe1dc7
 	"runtime"
534a90a9
 	"strings"
 	"syscall"
1ded1f26
 	"time"
534a90a9
 
3a127939
 	"github.com/docker/distribution/reference"
7c36a1af
 	"github.com/docker/docker/api/types"
0ec68657
 	"github.com/docker/docker/api/types/backend"
bad849fc
 	containertypes "github.com/docker/docker/api/types/container"
7c36a1af
 	"github.com/docker/docker/api/types/events"
cfdf84d5
 	containerpkg "github.com/docker/docker/container"
bebd472e
 	"github.com/docker/docker/daemon/cluster/convert"
534a90a9
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/libnetwork"
3716ec25
 	"github.com/docker/swarmkit/agent/exec"
534a90a9
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/log"
3e987e17
 	gogotypes "github.com/gogo/protobuf/types"
7a855799
 	"github.com/opencontainers/go-digest"
1009e6a4
 	"github.com/sirupsen/logrus"
534a90a9
 	"golang.org/x/net/context"
fa0054a3
 	"golang.org/x/time/rate"
534a90a9
 )
 
 // containerAdapter conducts remote operations for a container. All calls
 // are mostly naked calls to the client API, seeded with information from
 // containerConfig.
 type containerAdapter struct {
9e9fc7b5
 	backend      executorpkg.Backend
 	container    *containerConfig
 	dependencies exec.DependencyGetter
534a90a9
 }
 
e2f09fa6
 func newContainerAdapter(b executorpkg.Backend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
 	ctnr, err := newContainerConfig(task, node)
534a90a9
 	if err != nil {
 		return nil, err
 	}
 
 	return &containerAdapter{
9e9fc7b5
 		container:    ctnr,
 		backend:      b,
 		dependencies: dependencies,
534a90a9
 	}, nil
 }
 
 func (c *containerAdapter) pullImage(ctx context.Context) error {
64a567d2
 	spec := c.container.spec()
 
089842c4
 	// Skip pulling if the image is referenced by image ID.
7a855799
 	if _, err := digest.Parse(spec.Image); err == nil {
089842c4
 		return nil
 	}
 
f69e5c18
 	// Skip pulling if the image is referenced by digest and already
 	// exists locally.
3a127939
 	named, err := reference.ParseNormalizedNamed(spec.Image)
f69e5c18
 	if err == nil {
 		if _, ok := named.(reference.Canonical); ok {
 			_, err := c.backend.LookupImage(spec.Image)
 			if err == nil {
 				return nil
 			}
 		}
 	}
 
534a90a9
 	// if the image needs to be pulled, the auth config will be retrieved and updated
64a567d2
 	var encodedAuthConfig string
 	if spec.PullOptions != nil {
 		encodedAuthConfig = spec.PullOptions.RegistryAuth
 	}
534a90a9
 
 	authConfig := &types.AuthConfig{}
 	if encodedAuthConfig != "" {
 		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
 			logrus.Warnf("invalid authconfig: %v", err)
 		}
 	}
 
 	pr, pw := io.Pipe()
 	metaHeaders := map[string][]string{}
 	go func() {
a1fe1dc7
 		// TODO @jhowardmsft LCOW Support: This will need revisiting as
 		// the stack is built up to include LCOW support for swarm.
 		platform := runtime.GOOS
 		err := c.backend.PullImage(ctx, c.container.image(), "", platform, metaHeaders, authConfig, pw)
534a90a9
 		pw.CloseWithError(err)
 	}()
 
 	dec := json.NewDecoder(pr)
fa0054a3
 	dec.UseNumber()
534a90a9
 	m := map[string]interface{}{}
fa0054a3
 	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
 
 	lastStatus := ""
534a90a9
 	for {
 		if err := dec.Decode(&m); err != nil {
 			if err == io.EOF {
 				break
 			}
 			return err
 		}
fa0054a3
 		l := log.G(ctx)
 		// limit pull progress logs unless the status changes
 		if spamLimiter.Allow() || lastStatus != m["status"] {
 			// if we have progress details, we have everything we need
 			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
 				// first, log the image and status
 				l = l.WithFields(logrus.Fields{
 					"image":  c.container.image(),
 					"status": m["status"],
 				})
 				// then, if we have progress, log the progress
 				if progress["current"] != nil && progress["total"] != nil {
 					l = l.WithFields(logrus.Fields{
 						"current": progress["current"],
 						"total":   progress["total"],
 					})
 				}
 			}
 			l.Debug("pull in progress")
 		}
 		// sometimes, we get no useful information at all, and add no fields
 		if status, ok := m["status"].(string); ok {
 			lastStatus = status
 		}
534a90a9
 	}
fa0054a3
 
534a90a9
 	// if the final stream object contained an error, return it
 	if errMsg, ok := m["error"]; ok {
 		return fmt.Errorf("%v", errMsg)
 	}
 	return nil
 }
 
 func (c *containerAdapter) createNetworks(ctx context.Context) error {
cafed80c
 	for name := range c.container.networksAttachments {
 		ncr, err := c.container.networkCreateRequest(name)
534a90a9
 		if err != nil {
 			return err
 		}
 
 		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
 			if _, ok := err.(libnetwork.NetworkNameError); ok {
 				continue
 			}
 
 			return err
 		}
 	}
 
 	return nil
 }
 
 func (c *containerAdapter) removeNetworks(ctx context.Context) error {
cafed80c
 	for name, v := range c.container.networksAttachments {
 		if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
434eae7d
 			switch err.(type) {
 			case *libnetwork.ActiveEndpointsError:
534a90a9
 				continue
434eae7d
 			case libnetwork.ErrNoSuchNetwork:
 				continue
 			default:
cafed80c
 				log.G(ctx).Errorf("network %s remove failed: %v", name, err)
434eae7d
 				return err
534a90a9
 			}
 		}
 	}
 
 	return nil
 }
 
99a98ccc
 func (c *containerAdapter) networkAttach(ctx context.Context) error {
b34d3e73
 	config := c.container.createNetworkingConfig(c.backend)
99a98ccc
 
 	var (
 		networkName string
 		networkID   string
 	)
 
 	if config != nil {
 		for n, epConfig := range config.EndpointsConfig {
 			networkName = n
 			networkID = epConfig.NetworkID
 			break
 		}
 	}
 
27c0131a
 	return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
99a98ccc
 }
 
 func (c *containerAdapter) waitForDetach(ctx context.Context) error {
b34d3e73
 	config := c.container.createNetworkingConfig(c.backend)
99a98ccc
 
 	var (
 		networkName string
 		networkID   string
 	)
 
 	if config != nil {
 		for n, epConfig := range config.EndpointsConfig {
 			networkName = n
 			networkID = epConfig.NetworkID
 			break
 		}
 	}
 
27c0131a
 	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
99a98ccc
 }
 
96a27cf0
 func (c *containerAdapter) create(ctx context.Context) error {
bad849fc
 	var cr containertypes.ContainerCreateCreatedBody
534a90a9
 	var err error
96a27cf0
 	if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
534a90a9
 		Name:       c.container.name(),
 		Config:     c.container.config(),
 		HostConfig: c.container.hostConfig(),
 		// Use the first network in container create
b34d3e73
 		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
ef39256d
 	}); err != nil {
534a90a9
 		return err
 	}
 
7135afa7
 	// Docker daemon currently doesn't support multiple networks in container create
534a90a9
 	// Connect to all other networks
b34d3e73
 	nc := c.container.connectNetworkingConfig(c.backend)
534a90a9
 
 	if nc != nil {
 		for n, ep := range nc.EndpointsConfig {
96a27cf0
 			if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
534a90a9
 				return err
 			}
 		}
 	}
 
b2e4c7f3
 	container := c.container.task.Spec.GetContainer()
 	if container == nil {
514adcf4
 		return errors.New("unable to get container from task spec")
b2e4c7f3
 	}
88dea0e0
 
9e9fc7b5
 	if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
 		return err
 	}
 
bebd472e
 	// configure secrets
9e9fc7b5
 	secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
 	if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
bebd472e
 		return err
3716ec25
 	}
 
9e9fc7b5
 	configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
 	if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
3716ec25
 		return err
 	}
 
b4a63139
 	return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
534a90a9
 }
 
92899ffa
 // checkMounts verifies that the mounts in the container spec will not hit
 // host-specific problems at start up. Currently it rejects bind mounts whose
 // source path does not exist on this host, which is slightly different from
 // the container API. Other os.Stat failures are not treated as errors.
 func (c *containerAdapter) checkMounts() error {
 	for _, m := range c.container.spec().Mounts {
 		if m.Type != api.MountTypeBind {
 			continue
 		}
 		if _, err := os.Stat(m.Source); os.IsNotExist(err) {
 			return fmt.Errorf("invalid bind mount source, source path not found: %s", m.Source)
 		}
 	}
 
 	return nil
 }
 
534a90a9
 func (c *containerAdapter) start(ctx context.Context) error {
92899ffa
 	if err := c.checkMounts(); err != nil {
 		return err
 	}
 
ef39256d
 	return c.backend.ContainerStart(c.container.name(), nil, "", "")
534a90a9
 }
 
 // inspect returns the backend's current inspect data for the container. If
 // ctx is already done once the backend call returns, the context error takes
 // precedence over whatever the backend reported.
 func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
 	info, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
 	switch {
 	case ctx.Err() != nil:
 		return types.ContainerJSON{}, ctx.Err()
 	case err != nil:
 		return types.ContainerJSON{}, err
 	}
 	return *info, nil
 }
 
 // events issues a call to the events API and returns a channel with all
 // events. The stream of events can be shutdown by cancelling the context.
1ded1f26
 func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
 	log.G(ctx).Debugf("waiting on events")
 	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
 	eventsq := make(chan events.Message, len(buffer))
 
 	for _, event := range buffer {
 		eventsq <- event
 	}
 
 	go func() {
 		defer c.backend.UnsubscribeFromEvents(l)
 
 		for {
 			select {
 			case ev := <-l:
 				jev, ok := ev.(events.Message)
 				if !ok {
 					log.G(ctx).Warnf("unexpected event message: %q", ev)
 					continue
 				}
 				select {
 				case eventsq <- jev:
 				case <-ctx.Done():
 					return
 				}
 			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 
 	return eventsq
 }
 
49211715
 func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
 	return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
534a90a9
 }
 
 func (c *containerAdapter) shutdown(ctx context.Context) error {
cc703784
 	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
 	var stopgrace *int
534a90a9
 	spec := c.container.spec()
 	if spec.StopGracePeriod != nil {
cc703784
 		stopgraceValue := int(spec.StopGracePeriod.Seconds)
 		stopgrace = &stopgraceValue
534a90a9
 	}
 	return c.backend.ContainerStop(c.container.name(), stopgrace)
 }
 
 // terminate asks the backend to send SIGKILL to the container.
 func (c *containerAdapter) terminate(ctx context.Context) error {
 	sig := uint64(syscall.SIGKILL)
 	return c.backend.ContainerKill(c.container.name(), sig)
 }
 
 // remove asks the backend to remove the container with both ForceRemove
 // and RemoveVolume set.
 func (c *containerAdapter) remove(ctx context.Context) error {
 	rmConfig := &types.ContainerRmConfig{
 		RemoveVolume: true,
 		ForceRemove:  true,
 	}
 	return c.backend.ContainerRm(c.container.name(), rmConfig)
 }
 
96a27cf0
 func (c *containerAdapter) createVolumes(ctx context.Context) error {
534a90a9
 	// Create plugin volumes that are embedded inside a Mount
 	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
 		if mount.Type != api.MountTypeVolume {
 			continue
 		}
 
ffeb9fcb
 		if mount.VolumeOptions == nil {
534a90a9
 			continue
 		}
 
 		if mount.VolumeOptions.DriverConfig == nil {
 			continue
 		}
 
 		req := c.container.volumeCreateRequest(&mount)
 
 		// Check if this volume exists on the engine
96a27cf0
 		if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
534a90a9
 			// TODO(amitshukla): Today, volume create through the engine api does not return an error
 			// when the named volume with the same parameters already exists.
 			// It returns an error if the driver name is different - that is a valid error
 			return err
 		}
 
 	}
 
 	return nil
 }
 
ca81f6ee
 // activateServiceBinding asks the backend to activate the service binding
 // of the container, identified by name.
 func (c *containerAdapter) activateServiceBinding() error {
 	name := c.container.name()
 	return c.backend.ActivateContainerServiceBinding(name)
 }
 
 // deactivateServiceBinding asks the backend to deactivate the service
 // binding of the container, identified by name.
 func (c *containerAdapter) deactivateServiceBinding() error {
 	name := c.container.name()
 	return c.backend.DeactivateContainerServiceBinding(name)
 }
 
1044093b
 func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
 	apiOptions := &types.ContainerLogsOptions{
 		Follow: options.Follow,
 
68f21418
 		// Always say yes to Timestamps and Details. we make the decision
 		// of whether to return these to the user or not way higher up the
 		// stack.
1044093b
 		Timestamps: true,
68f21418
 		Details:    true,
0ec68657
 	}
 
 	if options.Since != nil {
3e987e17
 		since, err := gogotypes.TimestampFromProto(options.Since)
0ec68657
 		if err != nil {
 			return nil, err
 		}
8dc437bd
 		// print since as this formatted string because the docker container
 		// logs interface expects it like this.
 		// see github.com/docker/docker/api/types/time.ParseTimestamps
 		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
0ec68657
 	}
 
 	if options.Tail < 0 {
 		// See protobuf documentation for details of how this works.
 		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
 	} else if options.Tail > 0 {
514adcf4
 		return nil, errors.New("tail relative to start of logs not supported via docker API")
0ec68657
 	}
 
 	if len(options.Streams) == 0 {
 		// empty == all
 		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
 	} else {
 		for _, stream := range options.Streams {
 			switch stream {
 			case api.LogStreamStdout:
 				apiOptions.ShowStdout = true
 			case api.LogStreamStderr:
 				apiOptions.ShowStderr = true
 			}
 		}
 	}
ebcb7d6b
 	msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
1044093b
 	if err != nil {
 		return nil, err
 	}
 	return msgs, nil
0ec68657
 }
 
534a90a9
 // todo: typed/wrapped errors
 
 // isContainerCreateNameConflict reports whether err's message contains the
 // text "Conflict. The name". A nil err is reported as false instead of
 // panicking on err.Error().
 func isContainerCreateNameConflict(err error) bool {
 	if err == nil {
 		return false
 	}
 	return strings.Contains(err.Error(), "Conflict. The name")
 }
 
 // isUnknownContainer reports whether err's message contains the text
 // "No such container:". A nil err is reported as false instead of panicking
 // on err.Error().
 func isUnknownContainer(err error) bool {
 	if err == nil {
 		return false
 	}
 	return strings.Contains(err.Error(), "No such container:")
 }
 
 // isStoppedContainer reports whether err's message contains the text
 // "is already stopped". A nil err is reported as false instead of panicking
 // on err.Error().
 func isStoppedContainer(err error) bool {
 	if err == nil {
 		return false
 	}
 	return strings.Contains(err.Error(), "is already stopped")
 }