libcontainerd/remote_daemon.go
ddae20c0
 // +build !windows
 
 package libcontainerd
 
 import (
 	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/BurntSushi/toml"
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/server"
 	"github.com/docker/docker/pkg/system"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )
 
 const (
 	maxConnectionRetryCount = 3
 	healthCheckTimeout      = 3 * time.Second
 	shutdownTimeout         = 15 * time.Second
 	configFile              = "containerd.toml"
 	binaryName              = "docker-containerd"
 	pidFile                 = "docker-containerd.pid"
 )
 
 type pluginConfigs struct {
 	Plugins map[string]interface{} `toml:"plugins"`
 }
 
 type remote struct {
 	sync.RWMutex
 	server.Config
 
 	daemonPid int
 	logger    *logrus.Entry
 
 	daemonWaitCh    chan struct{}
 	clients         []*client
 	shutdownContext context.Context
 	shutdownCancel  context.CancelFunc
 	shutdown        bool
 
 	// Options
 	startDaemon bool
 	rootDir     string
 	stateDir    string
 	snapshotter string
 	pluginConfs pluginConfigs
 }
 
 // New creates a fresh instance of libcontainerd remote.
 func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
 	defer func() {
 		if err != nil {
 			err = errors.Wrap(err, "Failed to connect to containerd")
 		}
 	}()
 
 	r := &remote{
 		rootDir:  rootDir,
 		stateDir: stateDir,
 		Config: server.Config{
 			Root:  filepath.Join(rootDir, "daemon"),
 			State: filepath.Join(stateDir, "daemon"),
 		},
 		pluginConfs: pluginConfigs{make(map[string]interface{})},
 		daemonPid:   -1,
 		logger:      logrus.WithField("module", "libcontainerd"),
 	}
 	r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())
 
 	rem = r
 	for _, option := range options {
 		if err = option.Apply(r); err != nil {
 			return
 		}
 	}
 	r.setDefaults()
 
 	if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
 		return
 	}
 
 	if r.startDaemon {
 		os.Remove(r.GRPC.Address)
 		if err = r.startContainerd(); err != nil {
 			return
 		}
 		defer func() {
 			if err != nil {
 				r.Cleanup()
 			}
 		}()
 	}
 
 	// This connection is just used to monitor the connection
 	client, err := containerd.New(r.GRPC.Address)
 	if err != nil {
 		return
 	}
 	if _, err := client.Version(context.Background()); err != nil {
 		system.KillProcess(r.daemonPid)
 		return nil, errors.Wrapf(err, "unable to get containerd version")
 	}
 
 	go r.monitorConnection(client)
 
 	return r, nil
 }
 
 func (r *remote) NewClient(ns string, b Backend) (Client, error) {
 	c := &client{
 		stateDir:   r.stateDir,
 		logger:     r.logger.WithField("namespace", ns),
 		namespace:  ns,
 		backend:    b,
 		containers: make(map[string]*container),
 	}
 
 	rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
 	if err != nil {
 		return nil, err
 	}
 	c.remote = rclient
 
 	go c.processEventStream(r.shutdownContext)
 
 	r.Lock()
 	r.clients = append(r.clients, c)
 	r.Unlock()
 	return c, nil
 }
 
 func (r *remote) Cleanup() {
 	if r.daemonPid != -1 {
 		r.shutdownCancel()
 		r.stopDaemon()
 	}
 
 	// cleanup some files
 	os.Remove(filepath.Join(r.stateDir, pidFile))
 
 	r.platformCleanup()
 }
 
 func (r *remote) getContainerdPid() (int, error) {
 	pidFile := filepath.Join(r.stateDir, pidFile)
 	f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return -1, nil
 		}
 		return -1, err
 	}
 	defer f.Close()
 
 	b := make([]byte, 8)
 	n, err := f.Read(b)
 	if err != nil && err != io.EOF {
 		return -1, err
 	}
 
 	if n > 0 {
 		pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
 		if err != nil {
 			return -1, err
 		}
 		if system.IsProcessAlive(int(pid)) {
 			return int(pid), nil
 		}
 	}
 
 	return -1, nil
 }
 
 func (r *remote) getContainerdConfig() (string, error) {
 	path := filepath.Join(r.stateDir, configFile)
 	f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
 	if err != nil {
 		return "", errors.Wrapf(err, "failed to open containerd config file at %s", path)
 	}
 	defer f.Close()
 
 	enc := toml.NewEncoder(f)
 	if err = enc.Encode(r.Config); err != nil {
 		return "", errors.Wrapf(err, "failed to encode general config")
 	}
 	if err = enc.Encode(r.pluginConfs); err != nil {
 		return "", errors.Wrapf(err, "failed to encode plugin configs")
 	}
 
 	return path, nil
 }
 
 func (r *remote) startContainerd() error {
 	pid, err := r.getContainerdPid()
 	if err != nil {
 		return err
 	}
 
 	if pid != -1 {
 		r.daemonPid = pid
 		logrus.WithField("pid", pid).
 			Infof("libcontainerd: %s is still running", binaryName)
 		return nil
 	}
 
 	configFile, err := r.getContainerdConfig()
 	if err != nil {
 		return err
 	}
 
 	args := []string{"--config", configFile}
 	cmd := exec.Command(binaryName, args...)
 	// redirect containerd logs to docker logs
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	cmd.SysProcAttr = containerdSysProcAttr()
 	// clear the NOTIFY_SOCKET from the env when starting containerd
 	cmd.Env = nil
 	for _, e := range os.Environ() {
 		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
 			cmd.Env = append(cmd.Env, e)
 		}
 	}
 	if err := cmd.Start(); err != nil {
 		return err
 	}
 
 	r.daemonWaitCh = make(chan struct{})
 	go func() {
 		// Reap our child when needed
 		if err := cmd.Wait(); err != nil {
 			r.logger.WithError(err).Errorf("containerd did not exit successfully")
 		}
 		close(r.daemonWaitCh)
 	}()
 
 	r.daemonPid = cmd.Process.Pid
 
 	err = ioutil.WriteFile(filepath.Join(r.stateDir, pidFile), []byte(fmt.Sprintf("%d", r.daemonPid)), 0660)
 	if err != nil {
 		system.KillProcess(r.daemonPid)
 		return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
 	}
 
 	logrus.WithField("pid", r.daemonPid).
 		Infof("libcontainerd: started new %s process", binaryName)
 
 	return nil
 }
 
 func (r *remote) monitorConnection(client *containerd.Client) {
 	var transientFailureCount = 0
 
 	ticker := time.NewTicker(500 * time.Millisecond)
 	defer ticker.Stop()
 
 	for {
 		<-ticker.C
 		ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
 		_, err := client.IsServing(ctx)
 		cancel()
 		if err == nil {
 			transientFailureCount = 0
 			continue
 		}
 
 		select {
 		case <-r.shutdownContext.Done():
5c3418e3
 			r.logger.Info("stopping healthcheck following graceful shutdown")
ddae20c0
 			client.Close()
 			return
 		default:
 		}
 
 		r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
 
 		if r.daemonPid != -1 {
 			transientFailureCount++
 			if transientFailureCount >= maxConnectionRetryCount || !system.IsProcessAlive(r.daemonPid) {
 				transientFailureCount = 0
 				if system.IsProcessAlive(r.daemonPid) {
 					r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
 					// Try to get a stack trace
 					syscall.Kill(r.daemonPid, syscall.SIGUSR1)
 					<-time.After(100 * time.Millisecond)
 					system.KillProcess(r.daemonPid)
 				}
 				<-r.daemonWaitCh
 				var err error
 				client.Close()
 				os.Remove(r.GRPC.Address)
 				if err = r.startContainerd(); err != nil {
 					r.logger.WithError(err).Error("failed restarting containerd")
 				} else {
 					newClient, err := containerd.New(r.GRPC.Address)
 					if err != nil {
 						r.logger.WithError(err).Error("failed connect to containerd")
 					} else {
 						client = newClient
 					}
 				}
 			}
 		}
 	}
 }