libcontainerd/client_linux.go

 package libcontainerd
 
 import (
 	"fmt"
 	"os"
 	"strings"
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 	containerd "github.com/docker/containerd/api/grpc/types"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/mount"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/timestamp"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 	"golang.org/x/net/context"
 )
 
 type client struct {
 	clientCommon
 
 	// Platform specific properties below here.
 	remote        *remote
 	q             queue
 	exitNotifiers map[string]*exitNotifier
 	liveRestore   bool
 }
 
 // GetServerVersion returns the connected server version information
 func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
 	resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
 	if err != nil {
 		return nil, err
 	}
 
 	sv := &ServerVersion{
 		GetServerVersionResponse: *resp,
 	}
 
 	return sv, nil
 }
 
 // AddProcess is the handler for adding a process to an already running
 // container. It's called through docker exec. It returns the system pid of the
 // exec'd process.
 func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
 	if err != nil {
 		return -1, err
 	}
 
 	spec, err := container.spec()
 	if err != nil {
 		return -1, err
 	}
 	sp := spec.Process
 	sp.Args = specp.Args
 	sp.Terminal = specp.Terminal
 	if len(specp.Env) > 0 {
 		sp.Env = specp.Env
 	}
 	if specp.Cwd != nil {
 		sp.Cwd = *specp.Cwd
 	}
 	if specp.User != nil {
 		sp.User = specs.User{
 			UID:            specp.User.UID,
 			GID:            specp.User.GID,
 			AdditionalGids: specp.User.AdditionalGids,
 		}
 	}
 	if specp.Capabilities != nil {
 		sp.Capabilities = specp.Capabilities
 	}
 
 	p := container.newProcess(processFriendlyName)
 
 	r := &containerd.AddProcessRequest{
 		Args:     sp.Args,
 		Cwd:      sp.Cwd,
 		Terminal: sp.Terminal,
 		Id:       containerID,
 		Env:      sp.Env,
 		User: &containerd.User{
 			Uid:            sp.User.UID,
 			Gid:            sp.User.GID,
 			AdditionalGids: sp.User.AdditionalGids,
 		},
 		Pid:             processFriendlyName,
 		Stdin:           p.fifo(syscall.Stdin),
 		Stdout:          p.fifo(syscall.Stdout),
 		Stderr:          p.fifo(syscall.Stderr),
 		Capabilities:    sp.Capabilities,
 		ApparmorProfile: sp.ApparmorProfile,
 		SelinuxLabel:    sp.SelinuxLabel,
 		NoNewPrivileges: sp.NoNewPrivileges,
 		Rlimits:         convertRlimits(sp.Rlimits),
 	}
 
 	iopipe, err := p.openFifos(sp.Terminal)
 	if err != nil {
 		return -1, err
 	}
 
 	resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
 	if err != nil {
 		p.closeFifos(iopipe)
 		return -1, err
 	}
 
 	var stdinOnce sync.Once
 	stdin := iopipe.Stdin
 	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
 		var err error
 		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
 			err = stdin.Close()
 			if err2 := p.sendCloseStdin(); err == nil {
 				err = err2
 			}
 		})
 		return err
 	})
 
 	container.processes[processFriendlyName] = p
 
 	if err := attachStdio(*iopipe); err != nil {
 		p.closeFifos(iopipe)
 		return -1, err
 	}
 
 	return int(resp.SystemPid), nil
 }
 
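 // SignalProcess sends the signal sig to the process pid running in the
 // given container.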
 func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
 		Id:     containerID,
 		Pid:    pid,
 		Signal: uint32(sig),
 	})
 	return err
 }
 
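 // Resize changes the terminal size of the process identified by
 // processFriendlyName in the given container.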
 func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	if _, err := clnt.getContainer(containerID); err != nil {
 		return err
 	}
 	_, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
 		Id:     containerID,
 		Pid:    processFriendlyName,
 		Width:  uint32(width),
 		Height: uint32(height),
 	})
 	return err
 }
 
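 // Pause pauses a running container.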
 func (clnt *client) Pause(containerID string) error {
 	return clnt.setState(containerID, StatePause)
 }
 
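 // setState asks containerd to move the container's init process to the given
 // state and blocks until the corresponding pause/resume event is received.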
 func (clnt *client) setState(containerID, state string) error {
 	clnt.lock(containerID)
 	container, err := clnt.getContainer(containerID)
 	if err != nil {
 		clnt.unlock(containerID)
 		return err
 	}
 	if container.systemPid == 0 {
 		clnt.unlock(containerID)
 		return fmt.Errorf("No active process for container %s", containerID)
 	}
 	st := "running"
 	if state == StatePause {
 		st = "paused"
 	}
 	chstate := make(chan struct{})
 	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
 		Id:     containerID,
 		Pid:    InitFriendlyName,
 		Status: st,
 	})
 	if err != nil {
 		clnt.unlock(containerID)
 		return err
 	}
 	container.pauseMonitor.append(state, chstate)
 	clnt.unlock(containerID)
 	<-chstate
 	return nil
 }
 
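 // Resume resumes a paused container.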
 func (clnt *client) Resume(containerID string) error {
 	return clnt.setState(containerID, StateResume)
 }
 
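 // Stats returns the usage statistics reported by containerd for the
 // container.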
 func (clnt *client) Stats(containerID string) (*Stats, error) {
 	resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{Id: containerID})
 	if err != nil {
 		return nil, err
 	}
 	return (*Stats)(resp), nil
 }
 
 // Take care of the old 1.11.0 behavior in case the version upgrade
 // happened without a clean daemon shutdown
 func (clnt *client) cleanupOldRootfs(containerID string) {
 	// Unmount and delete the bundle folder
 	if mts, err := mount.GetMounts(); err == nil {
 		for _, mnt := range mts {
 			if strings.HasSuffix(mnt.Mountpoint, containerID+"/rootfs") {
 				if err := syscall.Unmount(mnt.Mountpoint, syscall.MNT_DETACH); err == nil {
 					os.RemoveAll(strings.TrimSuffix(mnt.Mountpoint, "/rootfs"))
 				}
 				break
 			}
 		}
 	}
 }
 
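 // setExited notifies the backend that the container exited and cleans up
 // any leftover rootfs mount.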
 func (clnt *client) setExited(containerID string, exitCode uint32) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 
 	err := clnt.backend.StateChanged(containerID, StateInfo{
 		CommonStateInfo: CommonStateInfo{
 			State:    StateExit,
 			ExitCode: exitCode,
 		}})
 
 	clnt.cleanupOldRootfs(containerID)
 
 	return err
 }
 
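 // GetPidsForContainer returns the pids of all processes running in the
 // given container.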
 func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
 	cont, err := clnt.getContainerdContainer(containerID)
 	if err != nil {
 		return nil, err
 	}
 	pids := make([]int, len(cont.Pids))
 	for i, p := range cont.Pids {
 		pids[i] = int(p)
 	}
 	return pids, nil
 }
 
 // Summary returns a summary of the processes running in a container.
 // This is a no-op on Linux.
 func (clnt *client) Summary(containerID string) ([]Summary, error) {
 	return nil, nil
 }
 
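 // getContainerdContainer retrieves the container's state from containerd.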
 func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
 	resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
 	if err != nil {
 		return nil, err
 	}
 	for _, cont := range resp.Containers {
 		if cont.Id == containerID {
 			return cont, nil
 		}
 	}
 	return nil, fmt.Errorf("invalid state response")
 }
 
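 // UpdateResources updates the resource limits of the container's init
 // process.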
 func (clnt *client) UpdateResources(containerID string, resources Resources) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
 	if err != nil {
 		return err
 	}
 	if container.systemPid == 0 {
 		return fmt.Errorf("No active process for container %s", containerID)
 	}
 	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
 		Id:        containerID,
 		Pid:       InitFriendlyName,
 		Resources: (*containerd.UpdateResource)(&resources),
 	})
 	return err
 }
 
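 // getExitNotifier returns the exit notifier for the given container, if any.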
 func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
 	clnt.mapMutex.RLock()
 	defer clnt.mapMutex.RUnlock()
 	return clnt.exitNotifiers[containerID]
 }
 
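 // getOrCreateExitNotifier returns the exit notifier for the given container,
 // creating one if it does not exist yet.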
 func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
 	clnt.mapMutex.Lock()
 	defer clnt.mapMutex.Unlock()
 	w, ok := clnt.exitNotifiers[containerID]
 	if !ok {
 		w = &exitNotifier{c: make(chan struct{}), client: clnt}
 		clnt.exitNotifiers[containerID] = w
 	}
 	return w
 }
 
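 // restore reattaches to a container that is still running in containerd,
 // reopening its fifos and replaying any backlogged pause/resume event.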
 func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	clnt.lock(cont.Id)
 	defer clnt.unlock(cont.Id)
 
 	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
 
 	containerID := cont.Id
 	if _, err := clnt.getContainer(containerID); err == nil {
 		return fmt.Errorf("container %s is already active", containerID)
 	}
 
 	defer func() {
 		if err != nil {
 			clnt.deleteContainer(cont.Id)
 		}
 	}()
 
 	container := clnt.newContainer(cont.BundlePath, options...)
 	container.systemPid = systemPid(cont)
 
 	var terminal bool
 	for _, p := range cont.Processes {
 		if p.Pid == InitFriendlyName {
 			terminal = p.Terminal
 		}
 	}
 
 	iopipe, err := container.openFifos(terminal)
 	if err != nil {
 		return err
 	}
 	var stdinOnce sync.Once
 	stdin := iopipe.Stdin
 	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
 		var err error
 		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
 			err = stdin.Close()
 		})
 		return err
 	})
 
 	if err := attachStdio(*iopipe); err != nil {
 		container.closeFifos(iopipe)
 		return err
 	}
 
 	clnt.appendContainer(container)
 
 	err = clnt.backend.StateChanged(containerID, StateInfo{
 		CommonStateInfo: CommonStateInfo{
 			State: StateRestore,
 			Pid:   container.systemPid,
 		}})
 
 	if err != nil {
 		container.closeFifos(iopipe)
 		return err
 	}
 
 	if lastEvent != nil {
 		// This should only be a pause or resume event
 		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
 			return clnt.backend.StateChanged(containerID, StateInfo{
 				CommonStateInfo: CommonStateInfo{
 					State: lastEvent.Type,
 					Pid:   container.systemPid,
 				}})
 		}
 
 		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
 	}
 
 	return nil
 }
 
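 // getContainerLastEventSinceTime returns the most recent event stored by
 // containerd for the given container since the provided timestamp.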
 func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
 	er := &containerd.EventsRequest{
 		Timestamp:  tsp,
 		StoredOnly: true,
 		Id:         id,
 	}
 	events, err := clnt.remote.apiClient.Events(context.Background(), er)
 	if err != nil {
 		logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
 		return nil, err
 	}
 
 	var ev *containerd.Event
 	for {
 		e, err := events.Recv()
 		if err != nil {
 			if err.Error() == "EOF" {
 				break
 			}
 			logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
 			return nil, err
 		}
 		ev = e
 		logrus.Debugf("libcontainerd: received past event %#v", ev)
 	}
 
 	return ev, nil
 }
 
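 // getContainerLastEvent returns the last event containerd has for the given
 // container, falling back to the full event history when no event is found
 // after the restore timestamp.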
 func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
 	ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
 	if err == nil && ev == nil {
 		// If ev is nil and the container is running in containerd,
 		// we have already consumed all the events of the
 		// container, including the "exit" one.
 		// Thus, we request all events containerd has in memory for
 		// this container in order to get the last one (which should
 		// be an exit event).
 		logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
 		// Request all events since beginning of time
 		t := time.Unix(0, 0)
 		tsp, err := ptypes.TimestampProto(t)
 		if err != nil {
 			logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
 			return nil, err
 		}
 
 		return clnt.getContainerLastEventSinceTime(id, tsp)
 	}
 
 	return ev, err
 }
 
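 // Restore is the handler for restoring a container after a daemon restart.
 // With live restore enabled, it reattaches to the still-running container;
 // otherwise it terminates the container and reports its exit to the backend.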
 func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
 	// Synchronize with live events
 	clnt.remote.Lock()
 	defer clnt.remote.Unlock()
 	// Check that containerd still knows this container.
 	//
 	// In the unlikely event that Restore processes this container's
 	// past event before the main loop does, the event will be
 	// processed twice. However, this is not an issue, as all those
 	// events do is change the state of the container to exactly
 	// the same value.
 	cont, err := clnt.getContainerdContainer(containerID)
 	// Get its last event
 	ev, eerr := clnt.getContainerLastEvent(containerID)
 	if err != nil || cont.Status == "Stopped" {
 		if err != nil {
 			logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
 		}
 		if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
 			// Wait a while for the exit event
 			timeout := time.NewTimer(10 * time.Second)
 			tick := time.NewTicker(100 * time.Millisecond)
 		stop:
 			for {
 				select {
 				case <-timeout.C:
 					break stop
 				case <-tick.C:
 					ev, eerr = clnt.getContainerLastEvent(containerID)
 					if eerr != nil {
 						break stop
 					}
 					if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
 						break stop
 					}
 				}
 			}
 			timeout.Stop()
 			tick.Stop()
 		}
 
 		// Get the exit status for this container; if we don't have
 		// one, indicate an error.
 		ec := uint32(255)
 		if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
 			ec = ev.Status
 		}
 		clnt.setExited(containerID, ec)
 
 		return nil
 	}
 
 	// container is still alive
 	if clnt.liveRestore {
 		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
 			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
 		}
 		return nil
 	}
 
 	// Kill the container if liveRestore == false
 	w := clnt.getOrCreateExitNotifier(containerID)
 	clnt.lock(cont.Id)
 	container := clnt.newContainer(cont.BundlePath)
 	container.systemPid = systemPid(cont)
 	clnt.appendContainer(container)
 	clnt.unlock(cont.Id)
 
 	container.discardFifos()
 
 	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
 		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
 	}
 	// Let the main loop handle the exit event
 	clnt.remote.Unlock()
 	select {
 	case <-time.After(10 * time.Second):
 		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
 			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
 		}
 		select {
 		case <-time.After(2 * time.Second):
 		case <-w.wait():
 			// relock because of the defer
 			clnt.remote.Lock()
 			return nil
 		}
 	case <-w.wait():
 		// relock because of the defer
 		clnt.remote.Lock()
 		return nil
 	}
 	// relock because of the defer
 	clnt.remote.Lock()
 
 	clnt.deleteContainer(containerID)
 
 	return clnt.setExited(containerID, uint32(255))
 }
 
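 // CreateCheckpoint creates a checkpoint of the given container, optionally
 // stopping the container once the checkpoint is taken.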
 func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	if _, err := clnt.getContainer(containerID); err != nil {
 		return err
 	}
 
 	_, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
 		Id: containerID,
 		Checkpoint: &containerd.Checkpoint{
 			Name:        checkpointID,
 			Exit:        exit,
 			Tcp:         true,
 			UnixSockets: true,
 			Shell:       false,
 			EmptyNS:     []string{"network"},
 		},
 		CheckpointDir: checkpointDir,
 	})
 	return err
 }
 
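 // DeleteCheckpoint removes the named checkpoint of the given container.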
 func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	if _, err := clnt.getContainer(containerID); err != nil {
 		return err
 	}
 
 	_, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
 		Id:            containerID,
 		Name:          checkpointID,
 		CheckpointDir: checkpointDir,
 	})
 	return err
 }
 
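 // ListCheckpoints returns the checkpoints known for the given container.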
 func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	if _, err := clnt.getContainer(containerID); err != nil {
 		return nil, err
 	}
 
 	resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
 		Id:            containerID,
 		CheckpointDir: checkpointDir,
 	})
 	if err != nil {
 		return nil, err
 	}
 	return (*Checkpoints)(resp), nil
 }