934328d8 |
// +build linux solaris
|
9c4570a9 |
package libcontainerd
import (
"fmt"
"io" |
b0280c37 |
"io/ioutil"
"log" |
9c4570a9 |
"net"
"os"
"os/exec"
"path/filepath" |
934328d8 |
goruntime "runtime" |
9c4570a9 |
"strconv" |
36540170 |
"strings" |
9c4570a9 |
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types" |
a7851e25 |
"github.com/docker/docker/pkg/locker" |
8c1ac816 |
"github.com/docker/docker/pkg/system" |
29b27145 |
"github.com/golang/protobuf/ptypes" |
64483c3b |
"github.com/golang/protobuf/ptypes/timestamp" |
9c4570a9 |
"golang.org/x/net/context"
"google.golang.org/grpc" |
b0280c37 |
"google.golang.org/grpc/grpclog" |
e4ddcb37 |
"google.golang.org/grpc/health/grpc_health_v1" |
a02ae66d |
"google.golang.org/grpc/transport" |
9c4570a9 |
)
// Settings and well-known names used to supervise the containerd daemon.
const (
	// maxConnectionRetryCount is the number of failed health checks that
	// triggers a restart of a managed containerd.
	maxConnectionRetryCount = 3
	// containerdHealthCheckTimeout bounds a single health check request.
	containerdHealthCheckTimeout = 3 * time.Second
	// containerdShutdownTimeout is how long Cleanup waits after SIGTERM
	// before resorting to SIGKILL.
	containerdShutdownTimeout = 15 * time.Second

	// containerdBinary is the executable started by runContainerdDaemon.
	containerdBinary = "docker-containerd"

	// The names below are all joined onto the remote's state directory.
	containerdPidFilename  = "docker-containerd.pid"
	containerdSockFilename = "docker-containerd.sock"
	containerdStateDir     = "containerd"
	eventTimestampFilename = "event.ts"
)
type remote struct {
sync.RWMutex |
64483c3b |
apiClient containerd.APIClient
daemonPid int
stateDir string
rpcAddr string
startDaemon bool
closeManually bool
debugLog bool
rpcConn *grpc.ClientConn
clients []*client
eventTsPath string
runtime string
runtimeArgs []string
daemonWaitCh chan struct{}
liveRestore bool
oomScore int
restoreFromTimestamp *timestamp.Timestamp |
9c4570a9 |
}
// New creates a fresh instance of libcontainerd remote.
func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
defer func() {
if err != nil { |
2d08a764 |
err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specified the correct address. Got error: %v", err) |
9c4570a9 |
}
}()
r := &remote{
stateDir: stateDir,
daemonPid: -1,
eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
}
for _, option := range options {
if err := option.Apply(r); err != nil {
return nil, err
}
}
|
8c1ac816 |
if err := system.MkdirAll(stateDir, 0700); err != nil { |
9c4570a9 |
return nil, err
}
if r.rpcAddr == "" {
r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
}
if r.startDaemon {
if err := r.runContainerdDaemon(); err != nil {
return nil, err
}
}
|
b0280c37 |
// don't output the grpc reconnect logging
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) |
9c4570a9 |
dialOpts := append([]grpc.DialOption{grpc.WithInsecure()},
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
conn, err := grpc.Dial(r.rpcAddr, dialOpts...)
if err != nil {
return nil, fmt.Errorf("error connecting to containerd: %v", err)
}
r.rpcConn = conn
r.apiClient = containerd.NewAPIClient(conn)
|
64483c3b |
// Get the timestamp to restore from
t := r.getLastEventTimestamp()
tsp, err := ptypes.TimestampProto(t)
if err != nil {
logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
}
r.restoreFromTimestamp = tsp
|
9c4570a9 |
go r.handleConnectionChange()
if err := r.startEventsMonitor(); err != nil {
return nil, err
}
return r, nil
}
|
d705dab1 |
// UpdateOptions applies the given options to r in order. It stops at the
// first option that fails and returns its error; options applied before the
// failure stay in effect.
func (r *remote) UpdateOptions(options ...RemoteOption) error {
	for i := range options {
		if err := options[i].Apply(r); err != nil {
			return err
		}
	}
	return nil
}
|
9c4570a9 |
func (r *remote) handleConnectionChange() {
var transientFailureCount = 0 |
e4ddcb37 |
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
healthClient := grpc_health_v1.NewHealthClient(r.rpcConn)
|
9c4570a9 |
for { |
e4ddcb37 |
<-ticker.C
ctx, cancel := context.WithTimeout(context.Background(), containerdHealthCheckTimeout)
_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
cancel()
if err == nil {
continue |
9c4570a9 |
} |
e4ddcb37 |
logrus.Debugf("libcontainerd: containerd health check returned error: %v", err) |
9c4570a9 |
if r.daemonPid != -1 { |
e4ddcb37 |
if strings.Contains(err.Error(), "is closing") { |
9c4570a9 |
// Well, we asked for it to stop, just return
return
} |
e4ddcb37 |
// all other errors are transient
// Reset state to be notified of next failure
transientFailureCount++
if transientFailureCount >= maxConnectionRetryCount {
transientFailureCount = 0 |
8c1ac816 |
if system.IsProcessAlive(r.daemonPid) {
system.KillProcess(r.daemonPid) |
e4ddcb37 |
}
<-r.daemonWaitCh
if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error
logrus.Errorf("libcontainerd: error restarting containerd: %v", err)
}
continue
} |
9c4570a9 |
}
}
}
func (r *remote) Cleanup() {
if r.daemonPid == -1 {
return
} |
a02ae66d |
r.closeManually = true |
9c4570a9 |
r.rpcConn.Close()
// Ask the daemon to quit
syscall.Kill(r.daemonPid, syscall.SIGTERM)
// Wait up to 15secs for it to stop
for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second { |
8c1ac816 |
if !system.IsProcessAlive(r.daemonPid) { |
9c4570a9 |
break
}
time.Sleep(time.Second)
}
|
8c1ac816 |
if system.IsProcessAlive(r.daemonPid) { |
9c4570a9 |
logrus.Warnf("libcontainerd: containerd (%d) didn't stop within 15 secs, killing it\n", r.daemonPid)
syscall.Kill(r.daemonPid, syscall.SIGKILL)
}
// cleanup some files
os.Remove(filepath.Join(r.stateDir, containerdPidFilename))
os.Remove(filepath.Join(r.stateDir, containerdSockFilename))
}
func (r *remote) Client(b Backend) (Client, error) {
c := &client{
clientCommon: clientCommon{ |
a7851e25 |
backend: b,
containers: make(map[string]*container),
locker: locker.New(), |
9c4570a9 |
},
remote: r,
exitNotifiers: make(map[string]*exitNotifier), |
d705dab1 |
liveRestore: r.liveRestore, |
9c4570a9 |
}
r.Lock()
r.clients = append(r.clients, c)
r.Unlock()
return c, nil
}
func (r *remote) updateEventTimestamp(t time.Time) {
f, err := os.OpenFile(r.eventTsPath, syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC, 0600)
if err != nil {
logrus.Warnf("libcontainerd: failed to open event timestamp file: %v", err)
return
} |
0ead6244 |
defer f.Close() |
9c4570a9 |
b, err := t.MarshalText()
if err != nil {
logrus.Warnf("libcontainerd: failed to encode timestamp: %v", err)
return
}
n, err := f.Write(b)
if err != nil || n != len(b) {
logrus.Warnf("libcontainerd: failed to update event timestamp file: %v", err)
f.Truncate(0)
return
}
}
|
29b27145 |
func (r *remote) getLastEventTimestamp() time.Time { |
9c4570a9 |
t := time.Now()
fi, err := os.Stat(r.eventTsPath) |
0d9b94c4 |
if os.IsNotExist(err) || fi.Size() == 0 { |
29b27145 |
return t |
9c4570a9 |
}
f, err := os.Open(r.eventTsPath)
if err != nil { |
44ccbb31 |
logrus.Warnf("libcontainerd: Unable to access last event ts: %v", err) |
29b27145 |
return t |
9c4570a9 |
} |
0ead6244 |
defer f.Close() |
9c4570a9 |
b := make([]byte, fi.Size())
n, err := f.Read(b)
if err != nil || n != len(b) { |
44ccbb31 |
logrus.Warnf("libcontainerd: Unable to read last event ts: %v", err) |
29b27145 |
return t |
9c4570a9 |
}
t.UnmarshalText(b)
|
29b27145 |
return t |
9c4570a9 |
}
func (r *remote) startEventsMonitor() error {
// First, get past events |
64483c3b |
t := r.getLastEventTimestamp()
tsp, err := ptypes.TimestampProto(t) |
29b27145 |
if err != nil {
logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
} |
9c4570a9 |
er := &containerd.EventsRequest{ |
29b27145 |
Timestamp: tsp, |
9c4570a9 |
} |
5fb38324 |
events, err := r.apiClient.Events(context.Background(), er, grpc.FailFast(false)) |
9c4570a9 |
if err != nil {
return err
}
go r.handleEventStream(events)
return nil
}
func (r *remote) handleEventStream(events containerd.API_EventsClient) {
for {
e, err := events.Recv()
if err != nil { |
a02ae66d |
if grpc.ErrorDesc(err) == transport.ErrConnClosing.Desc &&
r.closeManually {
// ignore error if grpc remote connection is closed manually
return
} |
5231c553 |
logrus.Errorf("libcontainerd: failed to receive event from containerd: %v", err) |
9c4570a9 |
go r.startEventsMonitor()
return
}
|
5231c553 |
logrus.Debugf("libcontainerd: received containerd event: %#v", e) |
9c4570a9 |
|
29b27145 |
var container *container
var c *client
r.RLock()
for _, c = range r.clients {
container, err = c.getContainer(e.Id)
if err == nil {
break |
9c4570a9 |
} |
29b27145 |
}
r.RUnlock()
if container == nil { |
64483c3b |
logrus.Warnf("libcontainerd: unknown container %s", e.Id) |
29b27145 |
continue
}
if err := container.handleEvent(e); err != nil {
logrus.Errorf("libcontainerd: error processing state change for %s: %v", e.Id, err)
} |
9c4570a9 |
|
29b27145 |
tsp, err := ptypes.Timestamp(e.Timestamp)
if err != nil {
logrus.Errorf("libcontainerd: failed to convert event timestamp: %q", err)
continue |
9c4570a9 |
} |
29b27145 |
r.updateEventTimestamp(tsp) |
9c4570a9 |
}
}
func (r *remote) runContainerdDaemon() error {
pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
} |
0ead6244 |
defer f.Close() |
9c4570a9 |
// File exist, check if the daemon is alive
b := make([]byte, 8)
n, err := f.Read(b)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
if err != nil {
return err
} |
8c1ac816 |
if system.IsProcessAlive(int(pid)) { |
5231c553 |
logrus.Infof("libcontainerd: previous instance of containerd still alive (%d)", pid) |
9c4570a9 |
r.daemonPid = int(pid)
return nil
}
}
// rewind the file
_, err = f.Seek(0, os.SEEK_SET)
if err != nil {
return err
}
// Truncate it
err = f.Truncate(0)
if err != nil {
return err
}
// Start a new instance |
6889c327 |
args := []string{
"-l", fmt.Sprintf("unix://%s", r.rpcAddr),
"--metrics-interval=0", |
64a91ee7 |
"--start-timeout", "2m", |
8b5e5c61 |
"--state-dir", filepath.Join(r.stateDir, containerdStateDir), |
6889c327 |
} |
934328d8 |
if goruntime.GOOS == "solaris" {
args = append(args, "--shim", "containerd-shim", "--runtime", "runc")
} else {
args = append(args, "--shim", "docker-containerd-shim")
if r.runtime != "" {
args = append(args, "--runtime")
args = append(args, r.runtime)
} |
7b2e5216 |
} |
9c4570a9 |
if r.debugLog { |
42f9c25b |
args = append(args, "--debug") |
7ed3d265 |
}
if len(r.runtimeArgs) > 0 {
for _, v := range r.runtimeArgs {
args = append(args, "--runtime-args")
args = append(args, v)
} |
5231c553 |
logrus.Debugf("libcontainerd: runContainerdDaemon: runtimeArgs: %s", args) |
9c4570a9 |
} |
36540170 |
|
9c4570a9 |
cmd := exec.Command(containerdBinary, args...) |
d9c3b653 |
// redirect containerd logs to docker logs
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr |
934328d8 |
cmd.SysProcAttr = setSysProcAttr(true) |
36540170 |
cmd.Env = nil
// clear the NOTIFY_SOCKET from the env when starting containerd
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
cmd.Env = append(cmd.Env, e)
}
} |
9c4570a9 |
if err := cmd.Start(); err != nil {
return err
} |
5231c553 |
logrus.Infof("libcontainerd: new containerd process, pid: %d", cmd.Process.Pid) |
a894aec8 |
if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil { |
8c1ac816 |
system.KillProcess(cmd.Process.Pid) |
a894aec8 |
return err
} |
9c4570a9 |
if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil { |
8c1ac816 |
system.KillProcess(cmd.Process.Pid) |
9c4570a9 |
return err
}
|
ce160b37 |
r.daemonWaitCh = make(chan struct{})
go func() {
cmd.Wait()
close(r.daemonWaitCh)
}() // Reap our child when needed |
9c4570a9 |
r.daemonPid = cmd.Process.Pid
return nil
}
// WithRemoteAddr sets the external containerd socket to connect to.
func WithRemoteAddr(addr string) RemoteOption {
	return rpcAddr(addr)
}

// rpcAddr is the RemoteOption behind WithRemoteAddr.
type rpcAddr string

// Apply stores the address on r, which must be a *remote.
func (a rpcAddr) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithRemoteAddr option not supported for this remote")
	}
	rem.rpcAddr = string(a)
	return nil
}
|
7b2e5216 |
// WithRuntimePath sets the path of the runtime to be used as the
// default by containerd
func WithRuntimePath(rt string) RemoteOption {
	return runtimePath(rt)
}

// runtimePath is the RemoteOption behind WithRuntimePath.
type runtimePath string

// Apply stores the runtime path on r, which must be a *remote. For any
// other Remote it returns an error naming the option.
func (rt runtimePath) Apply(r Remote) error {
	if remote, ok := r.(*remote); ok {
		remote.runtime = string(rt)
		return nil
	}
	// Name the option as callers spell it (WithRuntimePath), in line with
	// every other option's error message.
	return fmt.Errorf("WithRuntimePath option not supported for this remote")
}
|
7ed3d265 |
// WithRuntimeArgs sets the list of runtime args passed to containerd
func WithRuntimeArgs(args []string) RemoteOption {
	return runtimeArgs(args)
}

// runtimeArgs is the RemoteOption behind WithRuntimeArgs.
type runtimeArgs []string

// Apply stores the argument list on r, which must be a *remote. The slice
// is stored as is, not copied.
func (rt runtimeArgs) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithRuntimeArgs option not supported for this remote")
	}
	rem.runtimeArgs = rt
	return nil
}
|
9c4570a9 |
// WithStartDaemon defines if libcontainerd should also run containerd daemon.
func WithStartDaemon(start bool) RemoteOption {
	return startDaemon(start)
}

// startDaemon is the RemoteOption behind WithStartDaemon.
type startDaemon bool

// Apply stores the flag on r, which must be a *remote.
func (s startDaemon) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithStartDaemon option not supported for this remote")
	}
	rem.startDaemon = bool(s)
	return nil
}
// WithDebugLog defines if containerd debug logs will be enabled for daemon.
func WithDebugLog(debug bool) RemoteOption {
return debugLog(debug)
}
type debugLog bool
func (d debugLog) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.debugLog = bool(d)
return nil
}
return fmt.Errorf("WithDebugLog option not supported for this remote")
} |
d705dab1 |
// WithLiveRestore defines if containers are stopped on shutdown or restored.
func WithLiveRestore(v bool) RemoteOption {
return liveRestore(v)
}
type liveRestore bool
func (l liveRestore) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.liveRestore = bool(l)
for _, c := range remote.clients {
c.liveRestore = bool(l)
}
return nil
}
return fmt.Errorf("WithLiveRestore option not supported for this remote")
} |
a894aec8 |
// WithOOMScore defines the oom_score_adj to set for the containerd process.
func WithOOMScore(score int) RemoteOption {
return oomScore(score)
}
type oomScore int
func (o oomScore) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.oomScore = int(o)
return nil
}
return fmt.Errorf("WithOOMScore option not supported for this remote")
} |