libcontainerd/remote_unix.go
934328d8
 // +build linux solaris
 
9c4570a9
 package libcontainerd
 
 import (
 	"fmt"
 	"io"
b0280c37
 	"io/ioutil"
 	"log"
9c4570a9
 	"net"
 	"os"
 	"os/exec"
 	"path/filepath"
934328d8
 	goruntime "runtime"
9c4570a9
 	"strconv"
36540170
 	"strings"
9c4570a9
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 	containerd "github.com/docker/containerd/api/grpc/types"
a7851e25
 	"github.com/docker/docker/pkg/locker"
8c1ac816
 	"github.com/docker/docker/pkg/system"
29b27145
 	"github.com/golang/protobuf/ptypes"
64483c3b
 	"github.com/golang/protobuf/ptypes/timestamp"
9c4570a9
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
b0280c37
 	"google.golang.org/grpc/grpclog"
e4ddcb37
 	"google.golang.org/grpc/health/grpc_health_v1"
a02ae66d
 	"google.golang.org/grpc/transport"
9c4570a9
 )
 
 // Constants used to launch, health-check and stop the containerd daemon,
 // and the names of the files this package keeps in the remote's stateDir.
 const (
e4ddcb37
 	// Number of failed health checks counted by handleConnectionChange
 	// before a containerd daemon with a known pid is killed and restarted.
 	maxConnectionRetryCount      = 3
 	// Timeout applied to each individual containerd health check request.
 	containerdHealthCheckTimeout = 3 * time.Second
 	// How long Cleanup waits after SIGTERM before escalating to SIGKILL.
 	containerdShutdownTimeout    = 15 * time.Second
 	// Executable started by runContainerdDaemon (looked up by os/exec).
 	containerdBinary             = "docker-containerd"
 	// File inside stateDir that holds the containerd daemon's pid.
 	containerdPidFilename        = "docker-containerd.pid"
 	// Socket file inside stateDir used when no remote address was given.
 	containerdSockFilename       = "docker-containerd.sock"
 	// Subdirectory of stateDir passed to containerd via --state-dir.
 	containerdStateDir           = "containerd"
 	// File inside stateDir recording the timestamp of the last handled event.
 	eventTimestampFilename       = "event.ts"
9c4570a9
 )
 
 // remote is the Remote implementation returned by New. It talks to
 // containerd over a gRPC unix-socket connection and can optionally start
 // and supervise the containerd daemon itself.
 type remote struct {
 	// The embedded RWMutex guards clients.
 	sync.RWMutex
64483c3b
 	// containerd API client built on rpcConn in New.
 	apiClient            containerd.APIClient
 	// Pid of the containerd daemon started or adopted by runContainerdDaemon;
 	// -1 (the value set in New) when none is known, in which case Cleanup
 	// does nothing.
 	daemonPid            int
 	// Directory holding the pid file, default socket, event timestamp file
 	// and containerd's own state directory.
 	stateDir             string
 	// Unix socket path to dial; also containerd's listen address when the
 	// daemon is started here.
 	rpcAddr              string
 	// Whether New starts the containerd daemon (WithStartDaemon).
 	startDaemon          bool
 	// Set by Cleanup before closing rpcConn so handleEventStream treats the
 	// resulting connection-closing error as an intentional shutdown.
 	closeManually        bool
 	// Whether containerd is started with --debug.
 	debugLog             bool
 	// gRPC connection to containerd.
 	rpcConn              *grpc.ClientConn
 	// Clients registered through Client; guarded by the embedded RWMutex.
 	clients              []*client
 	// Path of the file that persists the last handled event timestamp.
 	eventTsPath          string
 	// Value for containerd's --runtime flag (only used on non-Solaris).
 	runtime              string
 	// Each entry is passed to containerd as a separate --runtime-args flag.
 	runtimeArgs          []string
 	// Closed when the containerd child started by runContainerdDaemon exits;
 	// nil while no child has been started by this process.
 	daemonWaitCh         chan struct{}
 	// Live-restore setting, propagated to clients.
 	liveRestore          bool
 	// Value passed to setOOMScore for the containerd process.
 	oomScore             int
 	// Timestamp read from eventTsPath when New ran. Not read in this part of
 	// the file; presumably consumed by client restore code elsewhere — confirm.
 	restoreFromTimestamp *timestamp.Timestamp
9c4570a9
 }
 
 // New creates a fresh instance of libcontainerd remote.
 //
 // It applies options, makes sure stateDir exists, optionally starts a
 // containerd daemon (WithStartDaemon), and dials containerd's gRPC endpoint
 // at rpcAddr (by default the socket file inside stateDir). It then starts
 // the health-check goroutine and the event monitor. Every error returned
 // is wrapped with a hint about how containerd is located.
 func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
 	defer func() {
 		if err != nil {
 			err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specified the correct address. Got error: %v", err)
 		}
 	}()
 	r := &remote{
 		stateDir:    stateDir,
 		daemonPid:   -1,
 		eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
 	}
 	for _, option := range options {
 		if err := option.Apply(r); err != nil {
 			return nil, err
 		}
 	}

 	if err := system.MkdirAll(stateDir, 0700); err != nil {
 		return nil, err
 	}

 	if r.rpcAddr == "" {
 		r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
 	}

 	if r.startDaemon {
 		if err := r.runContainerdDaemon(); err != nil {
 			return nil, err
 		}
 	}

 	// don't output the grpc reconnect logging
 	grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
 	dialOpts := []grpc.DialOption{
 		grpc.WithInsecure(),
 		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
 			return net.DialTimeout("unix", addr, timeout)
 		}),
 	}
 	conn, err := grpc.Dial(r.rpcAddr, dialOpts...)
 	if err != nil {
 		return nil, fmt.Errorf("error connecting to containerd: %v", err)
 	}

 	r.rpcConn = conn
 	r.apiClient = containerd.NewAPIClient(conn)

 	// Get the timestamp to restore from. A conversion failure is only
 	// logged; restoreFromTimestamp then holds whatever ptypes returned.
 	tsp, err := ptypes.TimestampProto(r.getLastEventTimestamp())
 	if err != nil {
 		logrus.Errorf("libcontainerd: failed to convert timestamp: %v", err)
 	}
 	r.restoreFromTimestamp = tsp

 	go r.handleConnectionChange()

 	if err := r.startEventsMonitor(); err != nil {
 		// Don't leak the connection. Closing it also lets the health-check
 		// goroutine started above return: it stops on an "is closing"
 		// error when the daemon pid is known. Otherwise it could later kill
 		// and restart containerd for a remote never handed to the caller.
 		r.rpcConn.Close()
 		return nil, err
 	}

 	return r, nil
 }
 
d705dab1
 // UpdateOptions applies options to an already constructed remote in the
 // order given. It stops at the first option that fails and returns that
 // error; options applied before it stay in effect.
 func (r *remote) UpdateOptions(options ...RemoteOption) error {
 	for i := range options {
 		if applyErr := options[i].Apply(r); applyErr != nil {
 			return applyErr
 		}
 	}
 	return nil
 }
 
9c4570a9
 func (r *remote) handleConnectionChange() {
 	var transientFailureCount = 0
e4ddcb37
 
 	ticker := time.NewTicker(500 * time.Millisecond)
 	defer ticker.Stop()
 	healthClient := grpc_health_v1.NewHealthClient(r.rpcConn)
 
9c4570a9
 	for {
e4ddcb37
 		<-ticker.C
 		ctx, cancel := context.WithTimeout(context.Background(), containerdHealthCheckTimeout)
 		_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
 		cancel()
 		if err == nil {
 			continue
9c4570a9
 		}
e4ddcb37
 
 		logrus.Debugf("libcontainerd: containerd health check returned error: %v", err)
9c4570a9
 
 		if r.daemonPid != -1 {
e4ddcb37
 			if strings.Contains(err.Error(), "is closing") {
9c4570a9
 				// Well, we asked for it to stop, just return
 				return
 			}
e4ddcb37
 			// all other errors are transient
 			// Reset state to be notified of next failure
 			transientFailureCount++
 			if transientFailureCount >= maxConnectionRetryCount {
 				transientFailureCount = 0
8c1ac816
 				if system.IsProcessAlive(r.daemonPid) {
 					system.KillProcess(r.daemonPid)
e4ddcb37
 				}
 				<-r.daemonWaitCh
 				if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error
 					logrus.Errorf("libcontainerd: error restarting containerd: %v", err)
 				}
 				continue
 			}
9c4570a9
 		}
 	}
 }
 
 // Cleanup stops the containerd daemon known to this remote, if any.
 //
 // It marks the shutdown as intentional so handleEventStream ignores the
 // resulting connection-closing error, then closes the gRPC connection and
 // sends SIGTERM. It waits up to containerdShutdownTimeout for the process
 // to exit and sends SIGKILL if it is still alive. Finally it removes the
 // pid file and default socket file from stateDir. It does nothing when
 // r.daemonPid is -1.
 func (r *remote) Cleanup() {
 	if r.daemonPid == -1 {
 		return
 	}
 	r.closeManually = true
 	r.rpcConn.Close()
 	// Ask the daemon to quit
 	syscall.Kill(r.daemonPid, syscall.SIGTERM)

 	// Wait up to containerdShutdownTimeout for it to stop
 	for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second {
 		if !system.IsProcessAlive(r.daemonPid) {
 			break
 		}
 		time.Sleep(time.Second)
 	}

 	if system.IsProcessAlive(r.daemonPid) {
 		// Derive the reported duration from the constant so the message
 		// cannot drift from the actual timeout.
 		logrus.Warnf("libcontainerd: containerd (%d) didn't stop within %v, killing it", r.daemonPid, containerdShutdownTimeout)
 		syscall.Kill(r.daemonPid, syscall.SIGKILL)
 	}

 	// cleanup some files
 	os.Remove(filepath.Join(r.stateDir, containerdPidFilename))
 	os.Remove(filepath.Join(r.stateDir, containerdSockFilename))
 }
 
 func (r *remote) Client(b Backend) (Client, error) {
 	c := &client{
 		clientCommon: clientCommon{
a7851e25
 			backend:    b,
 			containers: make(map[string]*container),
 			locker:     locker.New(),
9c4570a9
 		},
 		remote:        r,
 		exitNotifiers: make(map[string]*exitNotifier),
d705dab1
 		liveRestore:   r.liveRestore,
9c4570a9
 	}
 
 	r.Lock()
 	r.clients = append(r.clients, c)
 	r.Unlock()
 	return c, nil
 }
 
 func (r *remote) updateEventTimestamp(t time.Time) {
 	f, err := os.OpenFile(r.eventTsPath, syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC, 0600)
 	if err != nil {
 		logrus.Warnf("libcontainerd: failed to open event timestamp file: %v", err)
 		return
 	}
0ead6244
 	defer f.Close()
9c4570a9
 
 	b, err := t.MarshalText()
 	if err != nil {
 		logrus.Warnf("libcontainerd: failed to encode timestamp: %v", err)
 		return
 	}
 
 	n, err := f.Write(b)
 	if err != nil || n != len(b) {
 		logrus.Warnf("libcontainerd: failed to update event timestamp file: %v", err)
 		f.Truncate(0)
 		return
 	}
 }
 
29b27145
 // getLastEventTimestamp returns the timestamp last written by
 // updateEventTimestamp to r.eventTsPath.
 //
 // It falls back to the current time in these cases:
 //   - the file does not exist, is empty, or cannot be read;
 //   - the file does not parse as a time.Time text encoding.
 // A damaged file therefore never yields the zero time. The zero time would
 // make the event monitor replay everything.
 func (r *remote) getLastEventTimestamp() time.Time {
 	now := time.Now()

 	b, err := ioutil.ReadFile(r.eventTsPath)
 	if err != nil {
 		// A missing file simply means no event was recorded yet; any other
 		// error is reported (and must not be followed by using file info).
 		if !os.IsNotExist(err) {
 			logrus.Warnf("libcontainerd: Unable to read last event ts: %v", err)
 		}
 		return now
 	}
 	if len(b) == 0 {
 		return now
 	}

 	// UnmarshalText overwrites its receiver with the zero time on failure,
 	// so decode into a separate value and only use it on success.
 	var t time.Time
 	if err := t.UnmarshalText(b); err != nil {
 		logrus.Warnf("libcontainerd: Unable to parse last event ts: %v", err)
 		return now
 	}
 	return t
 }
 
 func (r *remote) startEventsMonitor() error {
 	// First, get past events
64483c3b
 	t := r.getLastEventTimestamp()
 	tsp, err := ptypes.TimestampProto(t)
29b27145
 	if err != nil {
 		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
 	}
9c4570a9
 	er := &containerd.EventsRequest{
29b27145
 		Timestamp: tsp,
9c4570a9
 	}
5fb38324
 	events, err := r.apiClient.Events(context.Background(), er, grpc.FailFast(false))
9c4570a9
 	if err != nil {
 		return err
 	}
 	go r.handleEventStream(events)
 	return nil
 }
 
 func (r *remote) handleEventStream(events containerd.API_EventsClient) {
 	for {
 		e, err := events.Recv()
 		if err != nil {
a02ae66d
 			if grpc.ErrorDesc(err) == transport.ErrConnClosing.Desc &&
 				r.closeManually {
 				// ignore error if grpc remote connection is closed manually
 				return
 			}
5231c553
 			logrus.Errorf("libcontainerd: failed to receive event from containerd: %v", err)
9c4570a9
 			go r.startEventsMonitor()
 			return
 		}
 
5231c553
 		logrus.Debugf("libcontainerd: received containerd event: %#v", e)
9c4570a9
 
29b27145
 		var container *container
 		var c *client
 		r.RLock()
 		for _, c = range r.clients {
 			container, err = c.getContainer(e.Id)
 			if err == nil {
 				break
9c4570a9
 			}
29b27145
 		}
 		r.RUnlock()
 		if container == nil {
64483c3b
 			logrus.Warnf("libcontainerd: unknown container %s", e.Id)
29b27145
 			continue
 		}
 
 		if err := container.handleEvent(e); err != nil {
 			logrus.Errorf("libcontainerd: error processing state change for %s: %v", e.Id, err)
 		}
9c4570a9
 
29b27145
 		tsp, err := ptypes.Timestamp(e.Timestamp)
 		if err != nil {
 			logrus.Errorf("libcontainerd: failed to convert event timestamp: %q", err)
 			continue
9c4570a9
 		}
29b27145
 
 		r.updateEventTimestamp(tsp)
9c4570a9
 	}
 }
 
 func (r *remote) runContainerdDaemon() error {
 	pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
 	f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
 	if err != nil {
 		return err
 	}
0ead6244
 	defer f.Close()
9c4570a9
 
 	// File exist, check if the daemon is alive
 	b := make([]byte, 8)
 	n, err := f.Read(b)
 	if err != nil && err != io.EOF {
 		return err
 	}
 
 	if n > 0 {
 		pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
 		if err != nil {
 			return err
 		}
8c1ac816
 		if system.IsProcessAlive(int(pid)) {
5231c553
 			logrus.Infof("libcontainerd: previous instance of containerd still alive (%d)", pid)
9c4570a9
 			r.daemonPid = int(pid)
 			return nil
 		}
 	}
 
 	// rewind the file
 	_, err = f.Seek(0, os.SEEK_SET)
 	if err != nil {
 		return err
 	}
 
 	// Truncate it
 	err = f.Truncate(0)
 	if err != nil {
 		return err
 	}
 
 	// Start a new instance
6889c327
 	args := []string{
 		"-l", fmt.Sprintf("unix://%s", r.rpcAddr),
 		"--metrics-interval=0",
64a91ee7
 		"--start-timeout", "2m",
8b5e5c61
 		"--state-dir", filepath.Join(r.stateDir, containerdStateDir),
6889c327
 	}
934328d8
 	if goruntime.GOOS == "solaris" {
 		args = append(args, "--shim", "containerd-shim", "--runtime", "runc")
 	} else {
 		args = append(args, "--shim", "docker-containerd-shim")
 		if r.runtime != "" {
 			args = append(args, "--runtime")
 			args = append(args, r.runtime)
 		}
7b2e5216
 	}
9c4570a9
 	if r.debugLog {
42f9c25b
 		args = append(args, "--debug")
7ed3d265
 	}
 	if len(r.runtimeArgs) > 0 {
 		for _, v := range r.runtimeArgs {
 			args = append(args, "--runtime-args")
 			args = append(args, v)
 		}
5231c553
 		logrus.Debugf("libcontainerd: runContainerdDaemon: runtimeArgs: %s", args)
9c4570a9
 	}
36540170
 
9c4570a9
 	cmd := exec.Command(containerdBinary, args...)
d9c3b653
 	// redirect containerd logs to docker logs
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
934328d8
 	cmd.SysProcAttr = setSysProcAttr(true)
36540170
 	cmd.Env = nil
 	// clear the NOTIFY_SOCKET from the env when starting containerd
 	for _, e := range os.Environ() {
 		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
 			cmd.Env = append(cmd.Env, e)
 		}
 	}
9c4570a9
 	if err := cmd.Start(); err != nil {
 		return err
 	}
5231c553
 	logrus.Infof("libcontainerd: new containerd process, pid: %d", cmd.Process.Pid)
a894aec8
 	if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil {
8c1ac816
 		system.KillProcess(cmd.Process.Pid)
a894aec8
 		return err
 	}
9c4570a9
 	if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
8c1ac816
 		system.KillProcess(cmd.Process.Pid)
9c4570a9
 		return err
 	}
 
ce160b37
 	r.daemonWaitCh = make(chan struct{})
 	go func() {
 		cmd.Wait()
 		close(r.daemonWaitCh)
 	}() // Reap our child when needed
9c4570a9
 	r.daemonPid = cmd.Process.Pid
 	return nil
 }
 
 // WithRemoteAddr sets the external containerd socket to connect to.
 func WithRemoteAddr(addr string) RemoteOption {
 	return rpcAddr(addr)
 }
 
 type rpcAddr string
 
 func (a rpcAddr) Apply(r Remote) error {
 	if remote, ok := r.(*remote); ok {
 		remote.rpcAddr = string(a)
 		return nil
 	}
 	return fmt.Errorf("WithRemoteAddr option not supported for this remote")
 }
 
7b2e5216
 // WithRuntimePath sets the path of the runtime to be used as the
 // default by containerd. It is passed to containerd as --runtime on
 // non-Solaris platforms when non-empty.
 func WithRuntimePath(rt string) RemoteOption {
 	return runtimePath(rt)
 }

 // runtimePath is the RemoteOption produced by WithRuntimePath.
 type runtimePath string

 // Apply records the runtime path on a *remote; any other Remote is
 // rejected with an error naming the option's constructor.
 func (rt runtimePath) Apply(r Remote) error {
 	if remote, ok := r.(*remote); ok {
 		remote.runtime = string(rt)
 		return nil
 	}
 	return fmt.Errorf("WithRuntimePath option not supported for this remote")
 }
 
7ed3d265
 // WithRuntimeArgs sets the list of runtime args passed to containerd.
 // Each entry becomes its own --runtime-args flag when containerd starts.
 // The slice is stored as-is (not copied).
 func WithRuntimeArgs(args []string) RemoteOption {
 	opt := runtimeArgs(args)
 	return opt
 }

 // runtimeArgs is the RemoteOption produced by WithRuntimeArgs.
 type runtimeArgs []string

 // Apply stores the argument list on a *remote; any other Remote is rejected.
 func (rt runtimeArgs) Apply(r Remote) error {
 	target, ok := r.(*remote)
 	if !ok {
 		return fmt.Errorf("WithRuntimeArgs option not supported for this remote")
 	}
 	target.runtimeArgs = []string(rt)
 	return nil
 }
 
9c4570a9
 // WithStartDaemon defines if libcontainerd should also run containerd daemon.
 // New consults the stored flag to decide whether to launch containerd.
 func WithStartDaemon(start bool) RemoteOption {
 	return startDaemon(start)
 }

 // startDaemon is the RemoteOption produced by WithStartDaemon.
 type startDaemon bool

 // Apply sets the start-daemon flag on a *remote; any other Remote is rejected.
 func (s startDaemon) Apply(r Remote) error {
 	switch target := r.(type) {
 	case *remote:
 		target.startDaemon = bool(s)
 		return nil
 	default:
 		return fmt.Errorf("WithStartDaemon option not supported for this remote")
 	}
 }
 
 // WithDebugLog defines if containerd debug logs will be enabled for daemon.
 // When set, containerd is started with --debug.
 func WithDebugLog(debug bool) RemoteOption {
 	return debugLog(debug)
 }

 // debugLog is the RemoteOption produced by WithDebugLog.
 type debugLog bool

 // Apply sets the debug flag on a *remote; any other Remote is rejected.
 func (d debugLog) Apply(r Remote) error {
 	target, ok := r.(*remote)
 	if !ok {
 		return fmt.Errorf("WithDebugLog option not supported for this remote")
 	}
 	target.debugLog = bool(d)
 	return nil
 }
d705dab1
 
 // WithLiveRestore defines if containers are stopped on shutdown or restored.
 // Applying it updates the remote's own setting, which clients created later
 // through Client inherit, as well as every client already registered.
 func WithLiveRestore(v bool) RemoteOption {
 	return liveRestore(v)
 }

 // liveRestore is the RemoteOption produced by WithLiveRestore.
 type liveRestore bool

 // Apply sets the live-restore flag on a *remote and its registered clients;
 // any other Remote is rejected.
 func (l liveRestore) Apply(r Remote) error {
 	rem, ok := r.(*remote)
 	if !ok {
 		return fmt.Errorf("WithLiveRestore option not supported for this remote")
 	}
 	// rem.clients is appended to and iterated under the remote's lock
 	// elsewhere (Client, handleEventStream). This option can be re-applied
 	// at runtime via UpdateOptions, so take the same lock here.
 	rem.Lock()
 	defer rem.Unlock()
 	rem.liveRestore = bool(l)
 	for _, c := range rem.clients {
 		c.liveRestore = bool(l)
 	}
 	return nil
 }
a894aec8
 
 // WithOOMScore defines the oom_score_adj to set for the containerd process.
 func WithOOMScore(score int) RemoteOption {
 	return oomScore(score)
 }
 
 type oomScore int
 
 func (o oomScore) Apply(r Remote) error {
 	if remote, ok := r.(*remote); ok {
 		remote.oomScore = int(o)
 		return nil
 	}
 	return fmt.Errorf("WithOOMScore option not supported for this remote")
 }