Browse code

Decouple daemon and container to log events.

Create a supervisor interface to let the container monitor to emit events.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/11/04 02:33:13
Showing 19 changed files
... ...
@@ -219,7 +219,7 @@ func (daemon *Daemon) containerArchivePath(container *Container, path string) (c
219 219
 		return err
220 220
 	})
221 221
 
222
-	container.logEvent("archive-path")
222
+	daemon.LogContainerEvent(container, "archive-path")
223 223
 
224 224
 	return content, stat, nil
225 225
 }
... ...
@@ -318,7 +318,7 @@ func (daemon *Daemon) containerExtractToDir(container *Container, path string, n
318 318
 		return err
319 319
 	}
320 320
 
321
-	container.logEvent("extract-to-dir")
321
+	daemon.LogContainerEvent(container, "extract-to-dir")
322 322
 
323 323
 	return nil
324 324
 }
... ...
@@ -384,6 +384,6 @@ func (daemon *Daemon) containerCopy(container *Container, resource string) (rc i
384 384
 		container.Unlock()
385 385
 		return err
386 386
 	})
387
-	daemon.logContainerEvent(container, "copy")
387
+	daemon.LogContainerEvent(container, "copy")
388 388
 	return reader, nil
389 389
 }
... ...
@@ -2,7 +2,10 @@ package daemon
2 2
 
3 3
 import (
4 4
 	"io"
5
+	"time"
5 6
 
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/docker/daemon/logger"
6 9
 	"github.com/docker/docker/pkg/stdcopy"
7 10
 )
8 11
 
... ...
@@ -43,7 +46,7 @@ func (daemon *Daemon) ContainerAttachWithLogs(prefixOrName string, c *ContainerA
43 43
 		stderr = errStream
44 44
 	}
45 45
 
46
-	return container.attachWithLogs(stdin, stdout, stderr, c.Logs, c.Stream)
46
+	return daemon.attachWithLogs(container, stdin, stdout, stderr, c.Logs, c.Stream)
47 47
 }
48 48
 
49 49
 // ContainerWsAttachWithLogsConfig attach with websockets, since all
... ...
@@ -60,5 +63,61 @@ func (daemon *Daemon) ContainerWsAttachWithLogs(prefixOrName string, c *Containe
60 60
 	if err != nil {
61 61
 		return err
62 62
 	}
63
-	return container.attachWithLogs(c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream)
63
+	return daemon.attachWithLogs(container, c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream)
64
+}
65
+
66
+func (daemon *Daemon) attachWithLogs(container *Container, stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
67
+	if logs {
68
+		logDriver, err := container.getLogger()
69
+		if err != nil {
70
+			return err
71
+		}
72
+		cLog, ok := logDriver.(logger.LogReader)
73
+		if !ok {
74
+			return logger.ErrReadLogsNotSupported
75
+		}
76
+		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
77
+
78
+	LogLoop:
79
+		for {
80
+			select {
81
+			case msg, ok := <-logs.Msg:
82
+				if !ok {
83
+					break LogLoop
84
+				}
85
+				if msg.Source == "stdout" && stdout != nil {
86
+					stdout.Write(msg.Line)
87
+				}
88
+				if msg.Source == "stderr" && stderr != nil {
89
+					stderr.Write(msg.Line)
90
+				}
91
+			case err := <-logs.Err:
92
+				logrus.Errorf("Error streaming logs: %v", err)
93
+				break LogLoop
94
+			}
95
+		}
96
+	}
97
+
98
+	daemon.LogContainerEvent(container, "attach")
99
+
100
+	//stream
101
+	if stream {
102
+		var stdinPipe io.ReadCloser
103
+		if stdin != nil {
104
+			r, w := io.Pipe()
105
+			go func() {
106
+				defer w.Close()
107
+				defer logrus.Debugf("Closing buffered stdin pipe")
108
+				io.Copy(w, stdin)
109
+			}()
110
+			stdinPipe = r
111
+		}
112
+		<-container.Attach(stdinPipe, stdout, stderr)
113
+		// If we are in stdinonce mode, wait for the process to end
114
+		// otherwise, simply return
115
+		if container.Config.StdinOnce && !container.Config.Tty {
116
+			container.WaitStop(-1 * time.Second)
117
+		}
118
+	}
119
+	return nil
64 120
 }
... ...
@@ -48,7 +48,8 @@ func (daemon *Daemon) Commit(container *Container, c *ContainerCommitConfig) (*i
48 48
 			return img, err
49 49
 		}
50 50
 	}
51
-	container.logEvent("commit")
51
+
52
+	daemon.LogContainerEvent(container, "commit")
52 53
 	return img, nil
53 54
 }
54 55
 
... ...
@@ -172,15 +172,6 @@ func (container *Container) writeHostConfig() error {
172 172
 	return json.NewEncoder(f).Encode(&container.hostConfig)
173 173
 }
174 174
 
175
-func (container *Container) logEvent(action string) {
176
-	d := container.daemon
177
-	d.EventsService.Log(
178
-		action,
179
-		container.ID,
180
-		container.Config.Image,
181
-	)
182
-}
183
-
184 175
 // GetResourcePath evaluates `path` in the scope of the container's basefs, with proper path
185 176
 // sanitisation. Symlinks are all scoped to the basefs of the container, as
186 177
 // though the container's basefs was `/`.
... ...
@@ -278,7 +269,6 @@ func (container *Container) Resize(h, w int) error {
278 278
 	if err := container.command.ProcessConfig.Terminal.Resize(h, w); err != nil {
279 279
 		return err
280 280
 	}
281
-	container.logEvent("resize")
282 281
 	return nil
283 282
 }
284 283
 
... ...
@@ -380,20 +370,6 @@ func (container *Container) startLogging() error {
380 380
 	return nil
381 381
 }
382 382
 
383
-func (container *Container) waitForStart() error {
384
-	container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy)
385
-
386
-	// block until we either receive an error from the initial start of the container's
387
-	// process or until the process is running in the container
388
-	select {
389
-	case <-container.monitor.startSignal:
390
-	case err := <-promise.Go(container.monitor.Start):
391
-		return err
392
-	}
393
-
394
-	return nil
395
-}
396
-
397 383
 func (container *Container) getProcessLabel() string {
398 384
 	// even if we have a process label return "" if we are running
399 385
 	// in privileged mode
... ...
@@ -424,62 +400,6 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr
424 424
 	return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
425 425
 }
426 426
 
427
-func (container *Container) attachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
428
-	if logs {
429
-		logDriver, err := container.getLogger()
430
-		if err != nil {
431
-			return err
432
-		}
433
-		cLog, ok := logDriver.(logger.LogReader)
434
-		if !ok {
435
-			return logger.ErrReadLogsNotSupported
436
-		}
437
-		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
438
-
439
-	LogLoop:
440
-		for {
441
-			select {
442
-			case msg, ok := <-logs.Msg:
443
-				if !ok {
444
-					break LogLoop
445
-				}
446
-				if msg.Source == "stdout" && stdout != nil {
447
-					stdout.Write(msg.Line)
448
-				}
449
-				if msg.Source == "stderr" && stderr != nil {
450
-					stderr.Write(msg.Line)
451
-				}
452
-			case err := <-logs.Err:
453
-				logrus.Errorf("Error streaming logs: %v", err)
454
-				break LogLoop
455
-			}
456
-		}
457
-	}
458
-
459
-	container.logEvent("attach")
460
-
461
-	//stream
462
-	if stream {
463
-		var stdinPipe io.ReadCloser
464
-		if stdin != nil {
465
-			r, w := io.Pipe()
466
-			go func() {
467
-				defer w.Close()
468
-				defer logrus.Debugf("Closing buffered stdin pipe")
469
-				io.Copy(w, stdin)
470
-			}()
471
-			stdinPipe = r
472
-		}
473
-		<-container.Attach(stdinPipe, stdout, stderr)
474
-		// If we are in stdinonce mode, wait for the process to end
475
-		// otherwise, simply return
476
-		if container.Config.StdinOnce && !container.Config.Tty {
477
-			container.WaitStop(-1 * time.Second)
478
-		}
479
-	}
480
-	return nil
481
-}
482
-
483 427
 func attach(streamConfig *streamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
484 428
 	var (
485 429
 		cStdout, cStderr io.ReadCloser
... ...
@@ -127,7 +127,7 @@ func (daemon *Daemon) create(params *ContainerCreateConfig) (retC *Container, re
127 127
 		logrus.Errorf("Error saving new container to disk: %v", err)
128 128
 		return nil, err
129 129
 	}
130
-	daemon.logContainerEvent(container, "create")
130
+	daemon.LogContainerEvent(container, "create")
131 131
 	return container, nil
132 132
 }
133 133
 
... ...
@@ -111,7 +111,7 @@ func (daemon *Daemon) rm(container *Container, forceRemove bool) (err error) {
111 111
 			daemon.idIndex.Delete(container.ID)
112 112
 			daemon.containers.Delete(container.ID)
113 113
 			os.RemoveAll(container.root)
114
-			container.logEvent("destroy")
114
+			daemon.LogContainerEvent(container, "destroy")
115 115
 		}
116 116
 	}()
117 117
 
... ...
@@ -140,7 +140,7 @@ func (daemon *Daemon) rm(container *Container, forceRemove bool) (err error) {
140 140
 	daemon.idIndex.Delete(container.ID)
141 141
 	daemon.containers.Delete(container.ID)
142 142
 
143
-	container.logEvent("destroy")
143
+	daemon.LogContainerEvent(container, "destroy")
144 144
 	return nil
145 145
 }
146 146
 
... ...
@@ -1,7 +1,7 @@
1 1
 package daemon
2 2
 
3
-// logContainerEvent generates an event related to a container.
4
-func (daemon *Daemon) logContainerEvent(container *Container, action string) {
3
+// LogContainerEvent generates an event related to a container.
4
+func (daemon *Daemon) LogContainerEvent(container *Container, action string) {
5 5
 	daemon.EventsService.Log(
6 6
 		action,
7 7
 		container.ID,
... ...
@@ -188,7 +188,7 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro
188 188
 
189 189
 	d.registerExecCommand(ExecConfig)
190 190
 
191
-	container.logEvent("exec_create: " + ExecConfig.ProcessConfig.Entrypoint + " " + strings.Join(ExecConfig.ProcessConfig.Arguments, " "))
191
+	d.LogContainerEvent(container, "exec_create: "+ExecConfig.ProcessConfig.Entrypoint+" "+strings.Join(ExecConfig.ProcessConfig.Arguments, " "))
192 192
 
193 193
 	return ExecConfig.ID, nil
194 194
 }
... ...
@@ -216,7 +216,7 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
216 216
 
217 217
 	logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID)
218 218
 	container := ec.Container
219
-	container.logEvent("exec_start: " + ec.ProcessConfig.Entrypoint + " " + strings.Join(ec.ProcessConfig.Arguments, " "))
219
+	d.LogContainerEvent(container, "exec_start: "+ec.ProcessConfig.Entrypoint+" "+strings.Join(ec.ProcessConfig.Arguments, " "))
220 220
 
221 221
 	if ec.OpenStdin {
222 222
 		r, w := io.Pipe()
... ...
@@ -49,6 +49,6 @@ func (daemon *Daemon) containerExport(container *Container) (archive.Archive, er
49 49
 		daemon.Unmount(container)
50 50
 		return err
51 51
 	})
52
-	daemon.logContainerEvent(container, "export")
52
+	daemon.LogContainerEvent(container, "export")
53 53
 	return arch, err
54 54
 }
... ...
@@ -71,7 +71,7 @@ func (daemon *Daemon) killWithSignal(container *Container, sig int) error {
71 71
 		return err
72 72
 	}
73 73
 
74
-	daemon.logContainerEvent(container, "kill")
74
+	daemon.LogContainerEvent(container, "kill")
75 75
 	return nil
76 76
 }
77 77
 
... ...
@@ -17,6 +17,12 @@ const (
17 17
 	loggerCloseTimeout   = 10 * time.Second
18 18
 )
19 19
 
20
+// containerSupervisor defines the interface that a supervisor must implement
21
+type containerSupervisor interface {
22
+	// LogContainerEvent generates events related to a given container
23
+	LogContainerEvent(*Container, string)
24
+}
25
+
20 26
 // containerMonitor monitors the execution of a container's main process.
21 27
 // If a restart policy is specified for the container the monitor will ensure that the
22 28
 // process is restarted based on the rules of the policy.  When the container is finally stopped
... ...
@@ -25,6 +31,9 @@ const (
25 25
 type containerMonitor struct {
26 26
 	mux sync.Mutex
27 27
 
28
+	// supervisor keeps track of the container and the events it generates
29
+	supervisor containerSupervisor
30
+
28 31
 	// container is the container being monitored
29 32
 	container *Container
30 33
 
... ...
@@ -57,8 +66,9 @@ type containerMonitor struct {
57 57
 
58 58
 // newContainerMonitor returns an initialized containerMonitor for the provided container
59 59
 // honoring the provided restart policy
60
-func newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor {
60
+func (daemon *Daemon) newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor {
61 61
 	return &containerMonitor{
62
+		supervisor:    daemon,
62 63
 		container:     container,
63 64
 		restartPolicy: policy,
64 65
 		timeIncrement: defaultTimeIncrement,
... ...
@@ -138,7 +148,7 @@ func (m *containerMonitor) Start() error {
138 138
 
139 139
 		pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin)
140 140
 
141
-		m.container.logEvent("start")
141
+		m.logEvent("start")
142 142
 
143 143
 		m.lastStartTime = time.Now()
144 144
 
... ...
@@ -162,7 +172,7 @@ func (m *containerMonitor) Start() error {
162 162
 
163 163
 		if m.shouldRestart(exitStatus.ExitCode) {
164 164
 			m.container.setRestarting(&exitStatus)
165
-			m.container.logEvent("die")
165
+			m.logEvent("die")
166 166
 			m.resetContainer(true)
167 167
 
168 168
 			// sleep with a small time increment between each restart to help avoid issues cased by quickly
... ...
@@ -177,7 +187,7 @@ func (m *containerMonitor) Start() error {
177 177
 			continue
178 178
 		}
179 179
 
180
-		m.container.logEvent("die")
180
+		m.logEvent("die")
181 181
 		m.resetContainer(true)
182 182
 		return err
183 183
 	}
... ...
@@ -249,7 +259,7 @@ func (m *containerMonitor) callback(processConfig *execdriver.ProcessConfig, pid
249 249
 	go func() {
250 250
 		_, ok := <-chOOM
251 251
 		if ok {
252
-			m.container.logEvent("oom")
252
+			m.logEvent("oom")
253 253
 		}
254 254
 	}()
255 255
 
... ...
@@ -345,3 +355,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
345 345
 		SysProcAttr: c.SysProcAttr,
346 346
 	}
347 347
 }
348
+
349
+func (m *containerMonitor) logEvent(action string) {
350
+	m.supervisor.LogContainerEvent(m.container, action)
351
+}
... ...
@@ -38,6 +38,6 @@ func (daemon *Daemon) containerPause(container *Container) error {
38 38
 		return err
39 39
 	}
40 40
 	container.Paused = true
41
-	daemon.logContainerEvent(container, "pause")
41
+	daemon.LogContainerEvent(container, "pause")
42 42
 	return nil
43 43
 }
... ...
@@ -1,10 +1,11 @@
1 1
 package daemon
2 2
 
3 3
 import (
4
+	"strings"
5
+
4 6
 	"github.com/Sirupsen/logrus"
5 7
 	derr "github.com/docker/docker/errors"
6 8
 	"github.com/docker/libnetwork"
7
-	"strings"
8 9
 )
9 10
 
10 11
 // ContainerRename changes the name of a container, using the oldName
... ...
@@ -54,7 +55,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
54 54
 	}
55 55
 
56 56
 	if !container.Running {
57
-		container.logEvent("rename")
57
+		daemon.LogContainerEvent(container, "rename")
58 58
 		return nil
59 59
 	}
60 60
 
... ...
@@ -78,6 +79,6 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
78 78
 		return err
79 79
 	}
80 80
 
81
-	container.logEvent("rename")
81
+	daemon.LogContainerEvent(container, "rename")
82 82
 	return nil
83 83
 }
... ...
@@ -8,7 +8,10 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error {
8 8
 		return err
9 9
 	}
10 10
 
11
-	return container.Resize(height, width)
11
+	if err = container.Resize(height, width); err == nil {
12
+		daemon.LogContainerEvent(container, "resize")
13
+	}
14
+	return err
12 15
 }
13 16
 
14 17
 // ContainerExecResize changes the size of the TTY of the process
... ...
@@ -41,6 +41,6 @@ func (daemon *Daemon) containerRestart(container *Container, seconds int) error
41 41
 		return err
42 42
 	}
43 43
 
44
-	daemon.logContainerEvent(container, "restart")
44
+	daemon.LogContainerEvent(container, "restart")
45 45
 	return nil
46 46
 }
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"runtime"
5 5
 
6 6
 	derr "github.com/docker/docker/errors"
7
+	"github.com/docker/docker/pkg/promise"
7 8
 	"github.com/docker/docker/runconfig"
8 9
 	"github.com/docker/docker/utils"
9 10
 )
... ...
@@ -83,7 +84,7 @@ func (daemon *Daemon) containerStart(container *Container) (err error) {
83 83
 			}
84 84
 			container.toDisk()
85 85
 			container.cleanup()
86
-			daemon.logContainerEvent(container, "die")
86
+			daemon.LogContainerEvent(container, "die")
87 87
 		}
88 88
 	}()
89 89
 
... ...
@@ -123,5 +124,19 @@ func (daemon *Daemon) containerStart(container *Container) (err error) {
123 123
 	mounts = append(mounts, container.ipcMounts()...)
124 124
 
125 125
 	container.command.Mounts = mounts
126
-	return container.waitForStart()
126
+	return daemon.waitForStart(container)
127
+}
128
+
129
+func (daemon *Daemon) waitForStart(container *Container) error {
130
+	container.monitor = daemon.newContainerMonitor(container, container.hostConfig.RestartPolicy)
131
+
132
+	// block until we either receive an error from the initial start of the container's
133
+	// process or until the process is running in the container
134
+	select {
135
+	case <-container.monitor.startSignal:
136
+	case err := <-promise.Go(container.monitor.Start):
137
+		return err
138
+	}
139
+
140
+	return nil
127 141
 }
... ...
@@ -55,6 +55,6 @@ func (daemon *Daemon) containerStop(container *Container, seconds int) error {
55 55
 		}
56 56
 	}
57 57
 
58
-	daemon.logContainerEvent(container, "stop")
58
+	daemon.LogContainerEvent(container, "stop")
59 59
 	return nil
60 60
 }
... ...
@@ -76,6 +76,6 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*types.Container
76 76
 			}
77 77
 		}
78 78
 	}
79
-	container.logEvent("top")
79
+	daemon.LogContainerEvent(container, "top")
80 80
 	return procList, nil
81 81
 }
... ...
@@ -38,6 +38,6 @@ func (daemon *Daemon) containerUnpause(container *Container) error {
38 38
 	}
39 39
 
40 40
 	container.Paused = false
41
-	daemon.logContainerEvent(container, "unpause")
41
+	daemon.LogContainerEvent(container, "unpause")
42 42
 	return nil
43 43
 }