Browse code

Lock all calls to hcsshim to prevent close races

Signed-off-by: Darren Stahl <darst@microsoft.com>

Darren Stahl authored on 2016/08/16 08:51:45
Showing 7 changed files
... ...
@@ -121,11 +121,8 @@ func (daemon *Daemon) Kill(container *container.Container) error {
121 121
 			return nil
122 122
 		}
123 123
 
124
-		if container.IsRunning() {
125
-			container.WaitStop(2 * time.Second)
126
-			if container.IsRunning() {
127
-				return err
128
-			}
124
+		if _, err2 := container.WaitStop(2 * time.Second); err2 != nil {
125
+			return err
129 126
 		}
130 127
 	}
131 128
 
... ...
@@ -162,7 +162,9 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
162 162
 		if iop.Stdin != nil {
163 163
 			go func() {
164 164
 				io.Copy(iop.Stdin, stdin)
165
-				iop.Stdin.Close()
165
+				if err := iop.Stdin.Close(); err != nil {
166
+					logrus.Error(err)
167
+				}
166 168
 			}()
167 169
 		}
168 170
 	} else {
... ...
@@ -172,7 +174,9 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
172 172
 		if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") {
173 173
 			// tty is enabled, so don't close containerd's iopipe stdin.
174 174
 			if iop.Stdin != nil {
175
-				iop.Stdin.Close()
175
+				if err := iop.Stdin.Close(); err != nil {
176
+					logrus.Error(err)
177
+				}
176 178
 			}
177 179
 		}
178 180
 	}
... ...
@@ -46,9 +46,21 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
46 46
 	stopSignal := container.StopSignal()
47 47
 	// 1. Send a stop signal
48 48
 	if err := daemon.killPossiblyDeadProcess(container, stopSignal); err != nil {
49
-		logrus.Infof("Failed to send signal %d to the process, force killing", stopSignal)
50
-		if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
51
-			return err
49
+		// While normally we might "return err" here we're not going to
50
+		// because if we can't stop the container by this point then
51
+		// it's probably because it's already stopped. Meaning, between
52
+		// the time of the IsRunning() call above and now it stopped.
53
+		// Also, since the err return will be environment specific we can't
54
+		// look for any particular (common) error that would indicate
55
+		// that the process is already dead vs something else going wrong.
56
+		// So, instead we'll give it up to 2 more seconds to complete and if
57
+		// by that time the container is still running, then the error
58
+		// we got is probably valid and so we force kill it.
59
+		if _, err := container.WaitStop(2 * time.Second); err != nil {
60
+			logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", stopSignal)
61
+			if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
62
+				return err
63
+			}
52 64
 		}
53 65
 	}
54 66
 
... ...
@@ -29,9 +29,9 @@ func (clnt *client) appendContainer(cont *container) {
29 29
 	clnt.containers[cont.containerID] = cont
30 30
 	clnt.mapMutex.Unlock()
31 31
 }
32
-func (clnt *client) deleteContainer(friendlyName string) {
32
+func (clnt *client) deleteContainer(containerID string) {
33 33
 	clnt.mapMutex.Lock()
34
-	delete(clnt.containers, friendlyName)
34
+	delete(clnt.containers, containerID)
35 35
 	clnt.mapMutex.Unlock()
36 36
 }
37 37
 
... ...
@@ -38,6 +38,8 @@ const defaultOwner = "docker"
38 38
 // Create is the entrypoint to create a container from a spec, and if successfully
39 39
 // created, start it too.
40 40
 func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec Spec, options ...CreateOption) error {
41
+	clnt.lock(containerID)
42
+	defer clnt.unlock(containerID)
41 43
 	logrus.Debugln("libcontainerd: client.Create() with spec", spec)
42 44
 
43 45
 	configuration := &hcsshim.ContainerConfig{
... ...
@@ -220,6 +222,13 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
220 220
 		return err
221 221
 	}
222 222
 
223
+	pid := newProcess.Pid()
224
+	openedProcess, err := container.hcsContainer.OpenProcess(pid)
225
+	if err != nil {
226
+		logrus.Errorf("AddProcess %s OpenProcess() failed %s", containerID, err)
227
+		return err
228
+	}
229
+
223 230
 	stdin, stdout, stderr, err = newProcess.Stdio()
224 231
 	if err != nil {
225 232
 		logrus.Errorf("libcontainerd: %s getting std pipes failed %s", containerID, err)
... ...
@@ -237,8 +246,6 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
237 237
 		iopipe.Stderr = openReaderFromPipe(stderr)
238 238
 	}
239 239
 
240
-	pid := newProcess.Pid()
241
-
242 240
 	proc := &process{
243 241
 		processCommon: processCommon{
244 242
 			containerID:  containerID,
... ...
@@ -247,7 +254,7 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
247 247
 			systemPid:    uint32(pid),
248 248
 		},
249 249
 		commandLine: createProcessParms.CommandLine,
250
-		hcsProcess:  newProcess,
250
+		hcsProcess:  openedProcess,
251 251
 	}
252 252
 
253 253
 	// Add the process to the container's list of processes
... ...
@@ -279,7 +286,7 @@ func (clnt *client) Signal(containerID string, sig int) error {
279 279
 		err  error
280 280
 	)
281 281
 
282
-	// Get the container as we need it to find the pid of the process.
282
+	// Get the container as we need it to get the container handle.
283 283
 	clnt.lock(containerID)
284 284
 	defer clnt.unlock(containerID)
285 285
 	if cont, err = clnt.getContainer(containerID); err != nil {
... ...
@@ -35,6 +35,8 @@ func (ctr *container) newProcess(friendlyName string) *process {
35 35
 	}
36 36
 }
37 37
 
38
+// start starts a created container.
39
+// Caller needs to lock container ID before calling this method.
38 40
 func (ctr *container) start() error {
39 41
 	var err error
40 42
 	isServicing := false
... ...
@@ -77,7 +79,7 @@ func (ctr *container) start() error {
77 77
 	createProcessParms.CommandLine = strings.Join(ctr.ociSpec.Process.Args, " ")
78 78
 
79 79
 	// Start the command running in the container.
80
-	hcsProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
80
+	newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
81 81
 	if err != nil {
82 82
 		logrus.Errorf("libcontainerd: CreateProcess() failed %s", err)
83 83
 		if err := ctr.terminate(); err != nil {
... ...
@@ -89,10 +91,21 @@ func (ctr *container) start() error {
89 89
 	}
90 90
 	ctr.startedAt = time.Now()
91 91
 
92
+	pid := newProcess.Pid()
93
+	openedProcess, err := ctr.hcsContainer.OpenProcess(pid)
94
+	if err != nil {
95
+		logrus.Errorf("OpenProcess() failed %s", err)
96
+		if err := ctr.terminate(); err != nil {
97
+			logrus.Errorf("Failed to cleanup after a failed OpenProcess. %s", err)
98
+		} else {
99
+			logrus.Debugln("Cleaned up after failed OpenProcess by calling Terminate")
100
+		}
101
+		return err
102
+	}
103
+
92 104
 	// Save the hcs Process and PID
93 105
 	ctr.process.friendlyName = InitFriendlyName
94
-	pid := hcsProcess.Pid()
95
-	ctr.process.hcsProcess = hcsProcess
106
+	ctr.process.hcsProcess = openedProcess
96 107
 
97 108
 	// If this is a servicing container, wait on the process synchronously here and
98 109
 	// if it succeeds, wait for it cleanly shutdown and merge into the parent container.
... ...
@@ -109,7 +122,7 @@ func (ctr *container) start() error {
109 109
 
110 110
 	var stdout, stderr io.ReadCloser
111 111
 	var stdin io.WriteCloser
112
-	stdin, stdout, stderr, err = hcsProcess.Stdio()
112
+	stdin, stdout, stderr, err = newProcess.Stdio()
113 113
 	if err != nil {
114 114
 		logrus.Errorf("libcontainerd: failed to get stdio pipes: %s", err)
115 115
 		if err := ctr.terminate(); err != nil {
... ...
@@ -120,7 +133,7 @@ func (ctr *container) start() error {
120 120
 
121 121
 	iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal}
122 122
 
123
-	iopipe.Stdin = createStdInCloser(stdin, hcsProcess)
123
+	iopipe.Stdin = createStdInCloser(stdin, newProcess)
124 124
 
125 125
 	// Convert io.ReadClosers to io.Readers
126 126
 	if stdout != nil {
... ...
@@ -150,6 +163,7 @@ func (ctr *container) start() error {
150 150
 			State: StateStart,
151 151
 			Pid:   ctr.systemPid, // Not sure this is needed? Double-check monitor.go in daemon BUGBUG @jhowardmsft
152 152
 		}}
153
+	logrus.Debugf("libcontainerd: start() completed OK, %+v", si)
153 154
 	return ctr.client.backend.StateChanged(ctr.containerID, si)
154 155
 
155 156
 }
... ...
@@ -181,10 +195,6 @@ func (ctr *container) waitProcessExitCode(process *process) int {
181 181
 		// has exited to avoid a container being dropped on the floor.
182 182
 	}
183 183
 
184
-	if err := process.hcsProcess.Close(); err != nil {
185
-		logrus.Errorf("libcontainerd: hcsProcess.Close(): %v", err)
186
-	}
187
-
188 184
 	return exitCode
189 185
 }
190 186
 
... ...
@@ -196,6 +206,8 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
196 196
 	logrus.Debugln("libcontainerd: waitExit() on pid", process.systemPid)
197 197
 
198 198
 	exitCode := ctr.waitProcessExitCode(process)
199
+	// Lock the container while shutting down
200
+	ctr.client.lock(ctr.containerID)
199 201
 
200 202
 	// Assume the container has exited
201 203
 	si := StateInfo{
... ...
@@ -211,6 +223,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
211 211
 	// But it could have been an exec'd process which exited
212 212
 	if !isFirstProcessToStart {
213 213
 		si.State = StateExitProcess
214
+		ctr.cleanProcess(process.friendlyName)
214 215
 	} else {
215 216
 		updatePending, err := ctr.hcsContainer.HasPendingUpdates()
216 217
 		if err != nil {
... ...
@@ -236,6 +249,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
236 236
 			} else if restart {
237 237
 				si.State = StateRestart
238 238
 				ctr.restarting = true
239
+				ctr.client.deleteContainer(ctr.containerID)
239 240
 				waitRestart = wait
240 241
 			}
241 242
 		}
... ...
@@ -243,10 +257,17 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
243 243
 		// Remove process from list if we have exited
244 244
 		// We need to do so here in case the Message Handler decides to restart it.
245 245
 		if si.State == StateExit {
246
-			ctr.client.deleteContainer(ctr.friendlyName)
246
+			ctr.client.deleteContainer(ctr.containerID)
247 247
 		}
248 248
 	}
249 249
 
250
+	if err := process.hcsProcess.Close(); err != nil {
251
+		logrus.Errorf("libcontainerd: hcsProcess.Close(): %v", err)
252
+	}
253
+
254
+	// Unlock here before we call back into the daemon to update state
255
+	ctr.client.unlock(ctr.containerID)
256
+
250 257
 	// Call into the backend to notify it of the state change.
251 258
 	logrus.Debugf("libcontainerd: waitExit() calling backend.StateChanged %+v", si)
252 259
 	if err := ctr.client.backend.StateChanged(ctr.containerID, si); err != nil {
... ...
@@ -256,7 +277,6 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
256 256
 		go func() {
257 257
 			err := <-waitRestart
258 258
 			ctr.restarting = false
259
-			ctr.client.deleteContainer(ctr.friendlyName)
260 259
 			if err == nil {
261 260
 				if err = ctr.client.Create(ctr.containerID, "", "", ctr.ociSpec, ctr.options...); err != nil {
262 261
 					logrus.Errorf("libcontainerd: error restarting %v", err)
... ...
@@ -275,6 +295,14 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
275 275
 	return nil
276 276
 }
277 277
 
278
+// cleanProcess removes process from the map.
279
+// Caller needs to lock container ID before calling this method.
280
+func (ctr *container) cleanProcess(id string) {
281
+	delete(ctr.processes, id)
282
+}
283
+
284
+// shutdown shuts down the container in HCS
285
+// Caller needs to lock container ID before calling this method.
278 286
 func (ctr *container) shutdown() error {
279 287
 	const shutdownTimeout = time.Minute * 5
280 288
 	err := ctr.hcsContainer.Shutdown()
... ...
@@ -296,6 +324,8 @@ func (ctr *container) shutdown() error {
296 296
 	return nil
297 297
 }
298 298
 
299
+// terminate terminates the container in HCS
300
+// Caller needs to lock container ID before calling this method.
299 301
 func (ctr *container) terminate() error {
300 302
 	const terminateTimeout = time.Minute * 5
301 303
 	err := ctr.hcsContainer.Terminate()
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"io"
5 5
 
6 6
 	"github.com/Microsoft/hcsshim"
7
+	"github.com/docker/docker/pkg/ioutils"
7 8
 )
8 9
 
9 10
 // process keeps the state for both main container process and exec process.
... ...
@@ -29,26 +30,23 @@ func openReaderFromPipe(p io.ReadCloser) io.Reader {
29 29
 	return r
30 30
 }
31 31
 
32
-type stdInCloser struct {
33
-	io.WriteCloser
34
-	hcsshim.Process
35
-}
36
-
37
-func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) *stdInCloser {
38
-	return &stdInCloser{
39
-		WriteCloser: pipe,
40
-		Process:     process,
41
-	}
42
-}
43
-
44
-func (stdin *stdInCloser) Close() error {
45
-	if err := stdin.WriteCloser.Close(); err != nil {
46
-		return err
47
-	}
32
+func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {
33
+	return ioutils.NewWriteCloserWrapper(pipe, func() error {
34
+		if err := pipe.Close(); err != nil {
35
+			return err
36
+		}
48 37
 
49
-	return stdin.Process.CloseStdin()
50
-}
38
+		// We do not need to lock container ID here, even though
39
+		// we are calling into hcsshim. This is safe, because the
40
+		// only place that closes this process handle is this method.
41
+		err := process.CloseStdin()
42
+		if err != nil && !hcsshim.IsNotExist(err) {
43
+			// This error will occur if the compute system is currently shutting down
44
+			if perr, ok := err.(*hcsshim.ProcessError); ok && perr.Err != hcsshim.ErrVmcomputeOperationInvalidState {
45
+				return err
46
+			}
47
+		}
51 48
 
52
-func (stdin *stdInCloser) Write(p []byte) (n int, err error) {
53
-	return stdin.WriteCloser.Write(p)
49
+		return process.Close()
50
+	})
54 51
 }