534a90a9 |
package container
import (
"encoding/base64"
"encoding/json" |
514adcf4 |
"errors" |
534a90a9 |
"fmt"
"io" |
92899ffa |
"os" |
a1fe1dc7 |
"runtime" |
534a90a9 |
"strings"
"syscall" |
1ded1f26 |
"time" |
534a90a9 |
|
3a127939 |
"github.com/docker/distribution/reference" |
7c36a1af |
"github.com/docker/docker/api/types" |
0ec68657 |
"github.com/docker/docker/api/types/backend" |
bad849fc |
containertypes "github.com/docker/docker/api/types/container" |
7c36a1af |
"github.com/docker/docker/api/types/events" |
cfdf84d5 |
containerpkg "github.com/docker/docker/container" |
bebd472e |
"github.com/docker/docker/daemon/cluster/convert" |
534a90a9 |
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/libnetwork" |
3716ec25 |
"github.com/docker/swarmkit/agent/exec" |
534a90a9 |
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log" |
3e987e17 |
gogotypes "github.com/gogo/protobuf/types" |
7a855799 |
"github.com/opencontainers/go-digest" |
1009e6a4 |
"github.com/sirupsen/logrus" |
534a90a9 |
"golang.org/x/net/context" |
fa0054a3 |
"golang.org/x/time/rate" |
534a90a9 |
)
// containerAdapter conducts remote operations for a container. All calls
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct { |
9e9fc7b5 |
backend executorpkg.Backend
container *containerConfig
dependencies exec.DependencyGetter |
534a90a9 |
}
|
e2f09fa6 |
func newContainerAdapter(b executorpkg.Backend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
ctnr, err := newContainerConfig(task, node) |
534a90a9 |
if err != nil {
return nil, err
}
return &containerAdapter{ |
9e9fc7b5 |
container: ctnr,
backend: b,
dependencies: dependencies, |
534a90a9 |
}, nil
}
func (c *containerAdapter) pullImage(ctx context.Context) error { |
64a567d2 |
spec := c.container.spec()
|
089842c4 |
// Skip pulling if the image is referenced by image ID. |
7a855799 |
if _, err := digest.Parse(spec.Image); err == nil { |
089842c4 |
return nil
}
|
f69e5c18 |
// Skip pulling if the image is referenced by digest and already
// exists locally. |
3a127939 |
named, err := reference.ParseNormalizedNamed(spec.Image) |
f69e5c18 |
if err == nil {
if _, ok := named.(reference.Canonical); ok {
_, err := c.backend.LookupImage(spec.Image)
if err == nil {
return nil
}
}
}
|
534a90a9 |
// if the image needs to be pulled, the auth config will be retrieved and updated |
64a567d2 |
var encodedAuthConfig string
if spec.PullOptions != nil {
encodedAuthConfig = spec.PullOptions.RegistryAuth
} |
534a90a9 |
authConfig := &types.AuthConfig{}
if encodedAuthConfig != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
pr, pw := io.Pipe()
metaHeaders := map[string][]string{}
go func() { |
a1fe1dc7 |
// TODO @jhowardmsft LCOW Support: This will need revisiting as
// the stack is built up to include LCOW support for swarm.
platform := runtime.GOOS
err := c.backend.PullImage(ctx, c.container.image(), "", platform, metaHeaders, authConfig, pw) |
534a90a9 |
pw.CloseWithError(err)
}()
dec := json.NewDecoder(pr) |
fa0054a3 |
dec.UseNumber() |
534a90a9 |
m := map[string]interface{}{} |
fa0054a3 |
spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
lastStatus := "" |
534a90a9 |
for {
if err := dec.Decode(&m); err != nil {
if err == io.EOF {
break
}
return err
} |
fa0054a3 |
l := log.G(ctx)
// limit pull progress logs unless the status changes
if spamLimiter.Allow() || lastStatus != m["status"] {
// if we have progress details, we have everything we need
if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
// first, log the image and status
l = l.WithFields(logrus.Fields{
"image": c.container.image(),
"status": m["status"],
})
// then, if we have progress, log the progress
if progress["current"] != nil && progress["total"] != nil {
l = l.WithFields(logrus.Fields{
"current": progress["current"],
"total": progress["total"],
})
}
}
l.Debug("pull in progress")
}
// sometimes, we get no useful information at all, and add no fields
if status, ok := m["status"].(string); ok {
lastStatus = status
} |
534a90a9 |
} |
fa0054a3 |
|
534a90a9 |
// if the final stream object contained an error, return it
if errMsg, ok := m["error"]; ok {
return fmt.Errorf("%v", errMsg)
}
return nil
}
func (c *containerAdapter) createNetworks(ctx context.Context) error { |
cafed80c |
for name := range c.container.networksAttachments {
ncr, err := c.container.networkCreateRequest(name) |
534a90a9 |
if err != nil {
return err
}
if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
if _, ok := err.(libnetwork.NetworkNameError); ok {
continue
}
return err
}
}
return nil
}
func (c *containerAdapter) removeNetworks(ctx context.Context) error { |
cafed80c |
for name, v := range c.container.networksAttachments {
if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil { |
434eae7d |
switch err.(type) {
case *libnetwork.ActiveEndpointsError: |
534a90a9 |
continue |
434eae7d |
case libnetwork.ErrNoSuchNetwork:
continue
default: |
cafed80c |
log.G(ctx).Errorf("network %s remove failed: %v", name, err) |
434eae7d |
return err |
534a90a9 |
}
}
}
return nil
}
|
99a98ccc |
func (c *containerAdapter) networkAttach(ctx context.Context) error { |
b34d3e73 |
config := c.container.createNetworkingConfig(c.backend) |
99a98ccc |
var (
networkName string
networkID string
)
if config != nil {
for n, epConfig := range config.EndpointsConfig {
networkName = n
networkID = epConfig.NetworkID
break
}
}
|
27c0131a |
return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config) |
99a98ccc |
}
func (c *containerAdapter) waitForDetach(ctx context.Context) error { |
b34d3e73 |
config := c.container.createNetworkingConfig(c.backend) |
99a98ccc |
var (
networkName string
networkID string
)
if config != nil {
for n, epConfig := range config.EndpointsConfig {
networkName = n
networkID = epConfig.NetworkID
break
}
}
|
27c0131a |
return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID()) |
99a98ccc |
}
|
96a27cf0 |
func (c *containerAdapter) create(ctx context.Context) error { |
bad849fc |
var cr containertypes.ContainerCreateCreatedBody |
534a90a9 |
var err error |
96a27cf0 |
if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{ |
534a90a9 |
Name: c.container.name(),
Config: c.container.config(),
HostConfig: c.container.hostConfig(),
// Use the first network in container create |
b34d3e73 |
NetworkingConfig: c.container.createNetworkingConfig(c.backend), |
ef39256d |
}); err != nil { |
534a90a9 |
return err
}
|
7135afa7 |
// Docker daemon currently doesn't support multiple networks in container create |
534a90a9 |
// Connect to all other networks |
b34d3e73 |
nc := c.container.connectNetworkingConfig(c.backend) |
534a90a9 |
if nc != nil {
for n, ep := range nc.EndpointsConfig { |
96a27cf0 |
if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil { |
534a90a9 |
return err
}
}
}
|
b2e4c7f3 |
container := c.container.task.Spec.GetContainer()
if container == nil { |
514adcf4 |
return errors.New("unable to get container from task spec") |
b2e4c7f3 |
} |
88dea0e0 |
|
9e9fc7b5 |
if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
return err
}
|
bebd472e |
// configure secrets |
9e9fc7b5 |
secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil { |
bebd472e |
return err |
3716ec25 |
}
|
9e9fc7b5 |
configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil { |
3716ec25 |
return err
}
|
b4a63139 |
return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()) |
534a90a9 |
}
|
92899ffa |
// checkMounts verifies that the task's mounts will not run into host-specific
// problems at start up. For example, a bind mount whose source path does not
// exist is rejected, which is slightly different from the container API.
// Only "not exist" failures from os.Stat are treated as errors.
func (c *containerAdapter) checkMounts() error {
	for _, m := range c.container.spec().Mounts {
		if m.Type != api.MountTypeBind {
			continue
		}
		if _, err := os.Stat(m.Source); os.IsNotExist(err) {
			return fmt.Errorf("invalid bind mount source, source path not found: %s", m.Source)
		}
	}
	return nil
}
|
534a90a9 |
func (c *containerAdapter) start(ctx context.Context) error { |
92899ffa |
if err := c.checkMounts(); err != nil {
return err
}
|
ef39256d |
return c.backend.ContainerStart(c.container.name(), nil, "", "") |
534a90a9 |
}
// inspect returns the backend's current inspect data for the container. If
// ctx has been cancelled or has expired by the time the backend call returns,
// the context error is reported in preference to the backend's own result.
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	info, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
	switch {
	case ctx.Err() != nil:
		return types.ContainerJSON{}, ctx.Err()
	case err != nil:
		return types.ContainerJSON{}, err
	}
	return *info, nil
}
// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shutdown by cancelling the context. |
1ded1f26 |
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
log.G(ctx).Debugf("waiting on events")
buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
eventsq := make(chan events.Message, len(buffer))
for _, event := range buffer {
eventsq <- event
}
go func() {
defer c.backend.UnsubscribeFromEvents(l)
for {
select {
case ev := <-l:
jev, ok := ev.(events.Message)
if !ok {
log.G(ctx).Warnf("unexpected event message: %q", ev)
continue
}
select {
case eventsq <- jev:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return eventsq
}
|
49211715 |
func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning) |
534a90a9 |
}
func (c *containerAdapter) shutdown(ctx context.Context) error { |
cc703784 |
// Default stop grace period to nil (daemon will use the stopTimeout of the container)
var stopgrace *int |
534a90a9 |
spec := c.container.spec()
if spec.StopGracePeriod != nil { |
cc703784 |
stopgraceValue := int(spec.StopGracePeriod.Seconds)
stopgrace = &stopgraceValue |
534a90a9 |
}
return c.backend.ContainerStop(c.container.name(), stopgrace)
}
// terminate asks the backend to send SIGKILL to the container.
func (c *containerAdapter) terminate(ctx context.Context) error {
	sig := uint64(syscall.SIGKILL)
	return c.backend.ContainerKill(c.container.name(), sig)
}
// remove asks the backend to remove the container, with both the
// ForceRemove and RemoveVolume options set.
func (c *containerAdapter) remove(ctx context.Context) error {
	opts := &types.ContainerRmConfig{
		ForceRemove:  true,
		RemoveVolume: true,
	}
	return c.backend.ContainerRm(c.container.name(), opts)
}
|
96a27cf0 |
func (c *containerAdapter) createVolumes(ctx context.Context) error { |
534a90a9 |
// Create plugin volumes that are embedded inside a Mount
for _, mount := range c.container.task.Spec.GetContainer().Mounts {
if mount.Type != api.MountTypeVolume {
continue
}
|
ffeb9fcb |
if mount.VolumeOptions == nil { |
534a90a9 |
continue
}
if mount.VolumeOptions.DriverConfig == nil {
continue
}
req := c.container.volumeCreateRequest(&mount)
// Check if this volume exists on the engine |
96a27cf0 |
if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil { |
534a90a9 |
// TODO(amitshukla): Today, volume create through the engine api does not return an error
// when the named volume with the same parameters already exists.
// It returns an error if the driver name is different - that is a valid error
return err
}
}
return nil
}
|
ca81f6ee |
// activateServiceBinding asks the backend to activate the service binding of
// the container, identified by name.
func (c *containerAdapter) activateServiceBinding() error {
	name := c.container.name()
	return c.backend.ActivateContainerServiceBinding(name)
}
// deactivateServiceBinding asks the backend to deactivate the service binding
// of the container, identified by name.
func (c *containerAdapter) deactivateServiceBinding() error {
	name := c.container.name()
	return c.backend.DeactivateContainerServiceBinding(name)
}
|
1044093b |
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
apiOptions := &types.ContainerLogsOptions{
Follow: options.Follow,
|
68f21418 |
// Always say yes to Timestamps and Details. we make the decision
// of whether to return these to the user or not way higher up the
// stack. |
1044093b |
Timestamps: true, |
68f21418 |
Details: true, |
0ec68657 |
}
if options.Since != nil { |
3e987e17 |
since, err := gogotypes.TimestampFromProto(options.Since) |
0ec68657 |
if err != nil {
return nil, err
} |
8dc437bd |
// print since as this formatted string because the docker container
// logs interface expects it like this.
// see github.com/docker/docker/api/types/time.ParseTimestamps
apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond())) |
0ec68657 |
}
if options.Tail < 0 {
// See protobuf documentation for details of how this works.
apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
} else if options.Tail > 0 { |
514adcf4 |
return nil, errors.New("tail relative to start of logs not supported via docker API") |
0ec68657 |
}
if len(options.Streams) == 0 {
// empty == all
apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
} else {
for _, stream := range options.Streams {
switch stream {
case api.LogStreamStdout:
apiOptions.ShowStdout = true
case api.LogStreamStderr:
apiOptions.ShowStderr = true
}
}
} |
ebcb7d6b |
msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions) |
1044093b |
if err != nil {
return nil, err
}
return msgs, nil |
0ec68657 |
}
|
534a90a9 |
// todo: typed/wrapped errors

// isContainerCreateNameConflict reports whether err's message contains the
// text "Conflict. The name". A nil err never matches.
func isContainerCreateNameConflict(err error) bool {
	return err != nil && strings.Contains(err.Error(), "Conflict. The name")
}
// isUnknownContainer reports whether err's message contains the text
// "No such container:". A nil err never matches.
func isUnknownContainer(err error) bool {
	return err != nil && strings.Contains(err.Error(), "No such container:")
}
// isStoppedContainer reports whether err's message contains the text
// "is already stopped". A nil err never matches.
func isStoppedContainer(err error) bool {
	return err != nil && strings.Contains(err.Error(), "is already stopped")
}