9c4570a9 |
package libcontainerd
import (
"fmt"
"os"
"strings"
"sync"
"syscall" |
d705dab1 |
"time" |
9c4570a9 |
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types" |
6f2658fb |
"github.com/docker/docker/pkg/ioutils" |
9c4570a9 |
"github.com/docker/docker/pkg/mount" |
51f21a16 |
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp" |
041e5a21 |
specs "github.com/opencontainers/runtime-spec/specs-go" |
9c4570a9 |
"golang.org/x/net/context"
)
type client struct {
clientCommon
// Platform specific properties below here.
remote *remote
q queue
exitNotifiers map[string]*exitNotifier |
d705dab1 |
liveRestore bool |
9c4570a9 |
}
|
2790ac68 |
// GetServerVersion asks containerd for its version information and returns
// the response wrapped in a ServerVersion. Any RPC error is returned as is.
func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
	req := &containerd.GetServerVersionRequest{}
	version, err := clnt.remote.apiClient.GetServerVersion(ctx, req)
	if err != nil {
		return nil, err
	}
	return &ServerVersion{GetServerVersionResponse: *version}, nil
}
|
18083481 |
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process. |
37a3be24 |
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) { |
9c4570a9 |
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
if err != nil { |
18083481 |
return -1, err |
9c4570a9 |
}
spec, err := container.spec()
if err != nil { |
18083481 |
return -1, err |
9c4570a9 |
}
sp := spec.Process
sp.Args = specp.Args
sp.Terminal = specp.Terminal |
e03bf122 |
if len(specp.Env) > 0 { |
9c4570a9 |
sp.Env = specp.Env
}
if specp.Cwd != nil {
sp.Cwd = *specp.Cwd
}
if specp.User != nil {
sp.User = specs.User{
UID: specp.User.UID,
GID: specp.User.GID,
AdditionalGids: specp.User.AdditionalGids,
}
}
if specp.Capabilities != nil {
sp.Capabilities = specp.Capabilities
}
p := container.newProcess(processFriendlyName)
r := &containerd.AddProcessRequest{
Args: sp.Args,
Cwd: sp.Cwd,
Terminal: sp.Terminal,
Id: containerID,
Env: sp.Env,
User: &containerd.User{
Uid: sp.User.UID,
Gid: sp.User.GID,
AdditionalGids: sp.User.AdditionalGids,
},
Pid: processFriendlyName,
Stdin: p.fifo(syscall.Stdin),
Stdout: p.fifo(syscall.Stdout),
Stderr: p.fifo(syscall.Stderr),
Capabilities: sp.Capabilities,
ApparmorProfile: sp.ApparmorProfile,
SelinuxLabel: sp.SelinuxLabel,
NoNewPrivileges: sp.NoNewPrivileges, |
8891afd8 |
Rlimits: convertRlimits(sp.Rlimits), |
9c4570a9 |
}
iopipe, err := p.openFifos(sp.Terminal)
if err != nil { |
18083481 |
return -1, err |
9c4570a9 |
}
|
18083481 |
resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
if err != nil { |
9c4570a9 |
p.closeFifos(iopipe) |
18083481 |
return -1, err |
9c4570a9 |
}
|
6f2658fb |
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
if err2 := p.sendCloseStdin(); err == nil {
err = err2
}
})
return err
})
|
9c4570a9 |
container.processes[processFriendlyName] = p
|
37a3be24 |
if err := attachStdio(*iopipe); err != nil { |
6f2658fb |
p.closeFifos(iopipe) |
18083481 |
return -1, err |
9c4570a9 |
}
|
18083481 |
return int(resp.SystemPid), nil |
9c4570a9 |
}
|
b6c7becb |
// SignalProcess asks containerd to deliver signal sig to the process
// identified by pid in the given container. The container lock is held for
// the duration of the call.
func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	req := &containerd.SignalRequest{
		Id:     containerID,
		Pid:    pid,
		Signal: uint32(sig),
	}
	_, err := clnt.remote.apiClient.Signal(context.Background(), req)
	return err
}
|
9c4570a9 |
// Resize asks containerd to set the terminal dimensions of the named process
// in the given container. It fails if the container is not known to this
// client.
func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	_, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}

	req := &containerd.UpdateProcessRequest{
		Id:     containerID,
		Pid:    processFriendlyName,
		Width:  uint32(width),
		Height: uint32(height),
	}
	_, err = clnt.remote.apiClient.UpdateProcess(context.Background(), req)
	return err
}
// Pause moves the container to the StatePause state via setState, which
// blocks until the state change has been signalled.
func (clnt *client) Pause(containerID string) error {
	err := clnt.setState(containerID, StatePause)
	return err
}
// setState asks containerd to pause (state == StatePause) or otherwise resume
// the container's init process, then blocks until the channel registered with
// the container's pause monitor is signalled.
//
// It returns an error if the container is unknown, has no system pid, or the
// update request fails.
func (clnt *client) setState(containerID, state string) error {
	clnt.lock(containerID)

	// The container lock cannot be released with defer: it has to be dropped
	// before blocking on the channel at the end of the function.
	release := func(err error) error {
		clnt.unlock(containerID)
		return err
	}

	container, err := clnt.getContainer(containerID)
	if err != nil {
		return release(err)
	}
	if container.systemPid == 0 {
		return release(fmt.Errorf("No active process for container %s", containerID))
	}

	status := "paused"
	if state != StatePause {
		status = "running"
	}

	done := make(chan struct{})
	req := &containerd.UpdateContainerRequest{
		Id:     containerID,
		Pid:    InitFriendlyName,
		Status: status,
	}
	if _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), req); err != nil {
		return release(err)
	}

	// Register the waiter while still holding the lock, then wait unlocked.
	container.pauseMonitor.append(state, done)
	clnt.unlock(containerID)

	<-done
	return nil
}
// Resume moves the container to the StateResume state via setState, which
// blocks until the state change has been signalled.
func (clnt *client) Resume(containerID string) error {
	err := clnt.setState(containerID, StateResume)
	return err
}
// Stats returns the statistics containerd reports for the given container.
// Any RPC error is returned as is.
func (clnt *client) Stats(containerID string) (*Stats, error) {
	// Use a keyed literal so the request keeps compiling (and stays vet-clean)
	// if fields are ever added to the generated StatsRequest type.
	req := &containerd.StatsRequest{Id: containerID}
	resp, err := clnt.remote.apiClient.Stats(context.Background(), req)
	if err != nil {
		return nil, err
	}
	return (*Stats)(resp), nil
}
|
31358745 |
// Take care of the old 1.11.0 behavior in case the version upgrade |
e3490cdc |
// happened without a clean daemon shutdown |
31358745 |
func (clnt *client) cleanupOldRootfs(containerID string) {
// Unmount and delete the bundle folder
if mts, err := mount.GetMounts(); err == nil {
for _, mts := range mts {
if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
}
break
}
}
}
}
|
64483c3b |
func (clnt *client) setExited(containerID string, exitCode uint32) error { |
9c4570a9 |
clnt.lock(containerID)
defer clnt.unlock(containerID)
err := clnt.backend.StateChanged(containerID, StateInfo{ |
818a5198 |
CommonStateInfo: CommonStateInfo{
State: StateExit,
ExitCode: exitCode,
}}) |
9c4570a9 |
|
31358745 |
clnt.cleanupOldRootfs(containerID) |
9c4570a9 |
return err
}
// GetPidsForContainer returns the pids containerd reports for the given
// container, converted to int. It returns an error if the container's state
// cannot be retrieved.
func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
	info, err := clnt.getContainerdContainer(containerID)
	if err != nil {
		return nil, err
	}

	pids := make([]int, 0, len(info.Pids))
	for _, pid := range info.Pids {
		pids = append(pids, int(pid))
	}
	return pids, nil
}
|
52237787 |
// Summary returns a summary of the processes running in a container.
// This is a no-op on Linux: it always yields a nil slice and a nil error.
func (clnt *client) Summary(containerID string) ([]Summary, error) {
	var none []Summary
	return none, nil
}
|
9c4570a9 |
// getContainerdContainer fetches the state of the given container from
// containerd. It returns an error if the request fails or if the response
// does not contain an entry whose Id matches containerID.
func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
	req := &containerd.StateRequest{Id: containerID}
	state, err := clnt.remote.apiClient.State(context.Background(), req)
	if err != nil {
		return nil, err
	}

	for i := range state.Containers {
		if candidate := state.Containers[i]; candidate.Id == containerID {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("invalid state response")
}
// UpdateResources asks containerd to apply the given resource settings to the
// container's init process. It fails if the container is unknown to this
// client or has no system pid.
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	container, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}
	if container.systemPid == 0 {
		return fmt.Errorf("No active process for container %s", containerID)
	}

	req := &containerd.UpdateContainerRequest{
		Id:        containerID,
		Pid:       InitFriendlyName,
		Resources: (*containerd.UpdateResource)(&resources),
	}
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), req)
	return err
}
// getExitNotifier returns the exit notifier registered for the container, or
// nil if there is none. The lookup is done under the read lock of mapMutex.
func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
	clnt.mapMutex.RLock()
	notifier := clnt.exitNotifiers[containerID]
	clnt.mapMutex.RUnlock()
	return notifier
}
// getOrCreateExitNotifier returns the exit notifier registered for the
// container, creating and registering a new one if none exists yet. The whole
// operation runs under the write lock of mapMutex.
func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
	clnt.mapMutex.Lock()
	defer clnt.mapMutex.Unlock()

	if existing, found := clnt.exitNotifiers[containerID]; found {
		return existing
	}

	created := &exitNotifier{c: make(chan struct{}), client: clnt}
	clnt.exitNotifiers[containerID] = created
	return created
}
|
37a3be24 |
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) { |
31e903b0 |
clnt.lock(cont.Id)
defer clnt.unlock(cont.Id)
|
5231c553 |
logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status) |
31e903b0 |
containerID := cont.Id
if _, err := clnt.getContainer(containerID); err == nil {
return fmt.Errorf("container %s is already active", containerID)
}
defer func() {
if err != nil {
clnt.deleteContainer(cont.Id)
}
}()
container := clnt.newContainer(cont.BundlePath, options...)
container.systemPid = systemPid(cont)
var terminal bool
for _, p := range cont.Processes {
if p.Pid == InitFriendlyName {
terminal = p.Terminal
}
}
iopipe, err := container.openFifos(terminal)
if err != nil {
return err
} |
6f2658fb |
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
})
return err
}) |
31e903b0 |
|
37a3be24 |
if err := attachStdio(*iopipe); err != nil { |
6f2658fb |
container.closeFifos(iopipe) |
31e903b0 |
return err
}
clnt.appendContainer(container)
err = clnt.backend.StateChanged(containerID, StateInfo{
CommonStateInfo: CommonStateInfo{
State: StateRestore,
Pid: container.systemPid,
}})
if err != nil { |
6f2658fb |
container.closeFifos(iopipe) |
31e903b0 |
return err
}
|
64483c3b |
if lastEvent != nil { |
31e903b0 |
// This should only be a pause or resume event |
64483c3b |
if lastEvent.Type == StatePause || lastEvent.Type == StateResume { |
31e903b0 |
return clnt.backend.StateChanged(containerID, StateInfo{
CommonStateInfo: CommonStateInfo{ |
64483c3b |
State: lastEvent.Type, |
31e903b0 |
Pid: container.systemPid,
}})
}
|
5231c553 |
logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent) |
31e903b0 |
}
return nil
}
|
51f21a16 |
func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) { |
64483c3b |
er := &containerd.EventsRequest{ |
51f21a16 |
Timestamp: tsp, |
64483c3b |
StoredOnly: true, |
51f21a16 |
Id: id, |
64483c3b |
}
events, err := clnt.remote.apiClient.Events(context.Background(), er)
if err != nil {
logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
return nil, err
}
var ev *containerd.Event
for {
e, err := events.Recv()
if err != nil {
if err.Error() == "EOF" {
break |
d705dab1 |
} |
51f21a16 |
logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err) |
64483c3b |
return nil, err
} |
5e5d02b9 |
ev = e
logrus.Debugf("libcontainerd: received past event %#v", ev) |
d705dab1 |
}
|
64483c3b |
return ev, nil
}
|
51f21a16 |
// getContainerLastEvent returns the last stored event for container id. It
// first asks for events since the remote's restoreFromTimestamp; if that
// succeeds without yielding any event, it asks again for all events since the
// Unix epoch.
func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
	last, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
	if err != nil || last != nil {
		return last, err
	}

	// No event and no error while the container is running in containerd:
	// every event of the container, the "exit" one included, was already
	// consumed. Ask for everything containerd still has in memory for this
	// container in order to get the last one (which should be an exit
	// event).
	logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)

	// Request all events since beginning of time
	epoch, err := ptypes.TimestampProto(time.Unix(0, 0))
	if err != nil {
		logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
		return nil, err
	}
	return clnt.getContainerLastEventSinceTime(id, epoch)
}
|
37a3be24 |
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error { |
64483c3b |
// Synchronize with live events
clnt.remote.Lock()
defer clnt.remote.Unlock()
// Check that containerd still knows this container.
//
// In the unlikely event that Restore for this container process
// the its past event before the main loop, the event will be
// processed twice. However, this is not an issue as all those
// events will do is change the state of the container to be
// exactly the same. |
31e903b0 |
cont, err := clnt.getContainerdContainer(containerID) |
64483c3b |
// Get its last event
ev, eerr := clnt.getContainerLastEvent(containerID)
if err != nil || cont.Status == "Stopped" { |
5e5d02b9 |
if err != nil {
logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err) |
64483c3b |
} |
32e0ea34 |
if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) { |
5e5d02b9 |
// Wait a while for the exit event
timeout := time.NewTimer(10 * time.Second)
tick := time.NewTicker(100 * time.Millisecond)
stop:
for {
select {
case <-timeout.C:
break stop
case <-tick.C:
ev, eerr = clnt.getContainerLastEvent(containerID)
if eerr != nil {
break stop
}
if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
break stop
}
} |
2650b1b6 |
} |
5e5d02b9 |
timeout.Stop()
tick.Stop() |
64483c3b |
}
|
5e5d02b9 |
// get the exit status for this container, if we don't have
// one, indicate an error
ec := uint32(255)
if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit { |
64483c3b |
ec = ev.Status
}
clnt.setExited(containerID, ec)
return nil
}
// container is still alive
if clnt.liveRestore { |
37a3be24 |
if err := clnt.restore(cont, ev, attachStdio, options...); err != nil { |
5231c553 |
logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err) |
64483c3b |
}
return nil
}
// Kill the container if liveRestore == false
w := clnt.getOrCreateExitNotifier(containerID)
clnt.lock(cont.Id)
container := clnt.newContainer(cont.BundlePath)
container.systemPid = systemPid(cont)
clnt.appendContainer(container)
clnt.unlock(cont.Id)
container.discardFifos()
if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { |
5231c553 |
logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err) |
64483c3b |
} |
ac068a1f |
// Let the main loop handle the exit event
clnt.remote.Unlock() |
64483c3b |
select {
case <-time.After(10 * time.Second):
if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { |
5231c553 |
logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err) |
d705dab1 |
}
select { |
64483c3b |
case <-time.After(2 * time.Second): |
d705dab1 |
case <-w.wait(): |
ac068a1f |
// relock because of the defer
clnt.remote.Lock() |
d705dab1 |
return nil |
31e903b0 |
} |
64483c3b |
case <-w.wait(): |
ac068a1f |
// relock because of the defer
clnt.remote.Lock() |
64483c3b |
return nil |
31e903b0 |
} |
c75de8e3 |
// relock because of the defer
clnt.remote.Lock() |
d705dab1 |
clnt.deleteContainer(containerID)
|
64483c3b |
return clnt.setExited(containerID, uint32(255)) |
31e903b0 |
}
|
d8fef66b |
// CreateCheckpoint asks containerd to create a checkpoint named checkpointID
// for the container under checkpointDir. exit is passed through as the
// checkpoint's Exit setting. It fails if the container is unknown to this
// client.
func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	_, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}

	checkpoint := &containerd.Checkpoint{
		Name:        checkpointID,
		Exit:        exit,
		Tcp:         true,
		UnixSockets: true,
		Shell:       false,
		EmptyNS:     []string{"network"},
	}
	req := &containerd.CreateCheckpointRequest{
		Id:            containerID,
		Checkpoint:    checkpoint,
		CheckpointDir: checkpointDir,
	}
	_, err = clnt.remote.apiClient.CreateCheckpoint(context.Background(), req)
	return err
}
// DeleteCheckpoint asks containerd to delete the checkpoint named
// checkpointID of the container from checkpointDir. It fails if the container
// is unknown to this client.
func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	_, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}

	req := &containerd.DeleteCheckpointRequest{
		Id:            containerID,
		Name:          checkpointID,
		CheckpointDir: checkpointDir,
	}
	_, err = clnt.remote.apiClient.DeleteCheckpoint(context.Background(), req)
	return err
}
func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)
if _, err := clnt.getContainer(containerID); err != nil {
return nil, err
}
resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
Id: containerID,
CheckpointDir: checkpointDir,
})
if err != nil {
return nil, err
}
return (*Checkpoints)(resp), nil
} |