Browse code

add health check in docker cluster

Signed-off-by: runshenzhu <runshen.zhu@gmail.com>

runshenzhu authored on 2016/06/28 10:08:56
Showing 6 changed files
... ...
@@ -2,10 +2,13 @@ package executor
2 2
 
3 3
 import (
4 4
 	"io"
5
+	"time"
5 6
 
6 7
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
7 8
 	"github.com/docker/engine-api/types"
8 9
 	"github.com/docker/engine-api/types/container"
10
+	"github.com/docker/engine-api/types/events"
11
+	"github.com/docker/engine-api/types/filters"
9 12
 	"github.com/docker/engine-api/types/network"
10 13
 	"github.com/docker/libnetwork/cluster"
11 14
 	networktypes "github.com/docker/libnetwork/types"
... ...
@@ -33,4 +36,6 @@ type Backend interface {
33 33
 	SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
34 34
 	SetClusterProvider(provider cluster.Provider)
35 35
 	IsSwarmCompatible() error
36
+	SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{})
37
+	UnsubscribeFromEvents(listener chan interface{})
36 38
 }
... ...
@@ -7,11 +7,13 @@ import (
7 7
 	"io"
8 8
 	"strings"
9 9
 	"syscall"
10
+	"time"
10 11
 
11 12
 	"github.com/Sirupsen/logrus"
12 13
 	"github.com/docker/docker/api/server/httputils"
13 14
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
14 15
 	"github.com/docker/engine-api/types"
16
+	"github.com/docker/engine-api/types/events"
15 17
 	"github.com/docker/engine-api/types/versions"
16 18
 	"github.com/docker/libnetwork"
17 19
 	"github.com/docker/swarmkit/api"
... ...
@@ -168,9 +170,40 @@ func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, er
168 168
 
169 169
 // events issues a call to the events API and returns a channel with all
170 170
 // events. The stream of events can be shutdown by cancelling the context.
171
-//
172
-// A chan struct{} is returned that will be closed if the event processing
173
-// fails and needs to be restarted.
171
+func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
172
+	log.G(ctx).Debugf("waiting on events")
173
+	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
174
+	eventsq := make(chan events.Message, len(buffer))
175
+
176
+	for _, event := range buffer {
177
+		eventsq <- event
178
+	}
179
+
180
+	go func() {
181
+		defer c.backend.UnsubscribeFromEvents(l)
182
+
183
+		for {
184
+			select {
185
+			case ev := <-l:
186
+				jev, ok := ev.(events.Message)
187
+				if !ok {
188
+					log.G(ctx).Warnf("unexpected event message: %q", ev)
189
+					continue
190
+				}
191
+				select {
192
+				case eventsq <- jev:
193
+				case <-ctx.Done():
194
+					return
195
+				}
196
+			case <-ctx.Done():
197
+				return
198
+			}
199
+		}
200
+	}()
201
+
202
+	return eventsq
203
+}
204
+
174 205
 func (c *containerAdapter) wait(ctx context.Context) error {
175 206
 	return c.backend.ContainerWaitWithContext(ctx, c.container.name())
176 207
 }
... ...
@@ -13,6 +13,8 @@ import (
13 13
 	"github.com/docker/docker/reference"
14 14
 	"github.com/docker/engine-api/types"
15 15
 	enginecontainer "github.com/docker/engine-api/types/container"
16
+	"github.com/docker/engine-api/types/events"
17
+	"github.com/docker/engine-api/types/filters"
16 18
 	"github.com/docker/engine-api/types/network"
17 19
 	"github.com/docker/swarmkit/agent/exec"
18 20
 	"github.com/docker/swarmkit/api"
... ...
@@ -420,3 +422,11 @@ func (c *containerConfig) networkCreateRequest(name string) (clustertypes.Networ
420 420
 
421 421
 	return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil
422 422
 }
423
+
424
+func (c containerConfig) eventFilter() filters.Args {
425
+	filter := filters.NewArgs()
426
+	filter.Add("type", events.ContainerEventType)
427
+	filter.Add("name", c.name())
428
+	filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
429
+	return filter
430
+}
... ...
@@ -6,6 +6,7 @@ import (
6 6
 
7 7
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
8 8
 	"github.com/docker/engine-api/types"
9
+	"github.com/docker/engine-api/types/events"
9 10
 	"github.com/docker/swarmkit/agent/exec"
10 11
 	"github.com/docker/swarmkit/api"
11 12
 	"github.com/docker/swarmkit/log"
... ...
@@ -153,20 +154,39 @@ func (r *controller) Wait(pctx context.Context) error {
153 153
 	ctx, cancel := context.WithCancel(pctx)
154 154
 	defer cancel()
155 155
 
156
+	healthErr := make(chan error, 1)
157
+	go func() {
158
+		ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
159
+		defer cancel()
160
+		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
161
+			healthErr <- ErrContainerUnhealthy
162
+			if err := r.Shutdown(ectx); err != nil {
163
+				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
164
+			}
165
+		}
166
+	}()
167
+
156 168
 	err := r.adapter.wait(ctx)
157 169
 	if ctx.Err() != nil {
158 170
 		return ctx.Err()
159 171
 	}
172
+
160 173
 	if err != nil {
161 174
 		ee := &exitError{}
162
-		if err.Error() != "" {
163
-			ee.cause = err
164
-		}
165 175
 		if ec, ok := err.(exec.ExitCoder); ok {
166 176
 			ee.code = ec.ExitCode()
167 177
 		}
178
+		select {
179
+		case e := <-healthErr:
180
+			ee.cause = e
181
+		default:
182
+			if err.Error() != "" {
183
+				ee.cause = err
184
+			}
185
+		}
168 186
 		return ee
169 187
 	}
188
+
170 189
 	return nil
171 190
 }
172 191
 
... ...
@@ -250,6 +270,21 @@ func (r *controller) Close() error {
250 250
 	return nil
251 251
 }
252 252
 
253
+func (r *controller) matchevent(event events.Message) bool {
254
+	if event.Type != events.ContainerEventType {
255
+		return false
256
+	}
257
+
258
+	// TODO(stevvooe): Filter based on ID matching, in addition to name.
259
+
260
+	// Make sure the events are for this container.
261
+	if event.Actor.Attributes["name"] != r.adapter.container.name() {
262
+		return false
263
+	}
264
+
265
+	return true
266
+}
267
+
253 268
 func (r *controller) checkClosed() error {
254 269
 	select {
255 270
 	case <-r.closed:
... ...
@@ -289,3 +324,26 @@ func (e *exitError) ExitCode() int {
289 289
 func (e *exitError) Cause() error {
290 290
 	return e.cause
291 291
 }
292
+
293
+// checkHealth blocks until an unhealthy container event is observed (returning ErrContainerUnhealthy), or until ctx is done or the controller is closed (returning nil).
294
+func (r *controller) checkHealth(ctx context.Context) error {
295
+	eventq := r.adapter.events(ctx)
296
+
297
+	for {
298
+		select {
299
+		case <-ctx.Done():
300
+			return nil
301
+		case <-r.closed:
302
+			return nil
303
+		case event := <-eventq:
304
+			if !r.matchevent(event) {
305
+				continue
306
+			}
307
+
308
+			switch event.Action {
309
+			case "health_status: unhealthy":
310
+				return ErrContainerUnhealthy
311
+			}
312
+		}
313
+	}
314
+}
... ...
@@ -9,4 +9,7 @@ var (
9 9
 	// ErrContainerDestroyed returned when a container is prematurely destroyed
10 10
 	// during a wait call.
11 11
 	ErrContainerDestroyed = fmt.Errorf("dockerexec: container destroyed")
12
+
13
+	// ErrContainerUnhealthy is returned when the controller detects a health check failure.
14
+	ErrContainerUnhealthy = fmt.Errorf("dockerexec: unhealthy container")
12 15
 )
13 16
new file mode 100644
... ...
@@ -0,0 +1,102 @@
0
+// +build !windows
1
+
2
+package container
3
+
4
+import (
5
+	"testing"
6
+	"time"
7
+
8
+	"github.com/docker/docker/container"
9
+	"github.com/docker/docker/daemon"
10
+	"github.com/docker/docker/daemon/events"
11
+	containertypes "github.com/docker/engine-api/types/container"
12
+	"github.com/docker/swarmkit/api"
13
+	"golang.org/x/net/context"
14
+)
15
+
16
+func TestHealthStates(t *testing.T) {
17
+
18
+	// set up environment: events, task, container ....
19
+	e := events.New()
20
+	_, l, _ := e.Subscribe()
21
+	defer e.Evict(l)
22
+
23
+	task := &api.Task{
24
+		ID:        "id",
25
+		ServiceID: "sid",
26
+		Spec: api.TaskSpec{
27
+			Runtime: &api.TaskSpec_Container{
28
+				Container: &api.ContainerSpec{
29
+					Image: "image_name",
30
+					Labels: map[string]string{
31
+						"com.docker.swarm.task.id": "id",
32
+					},
33
+				},
34
+			},
35
+		},
36
+		Annotations: api.Annotations{Name: "name"},
37
+	}
38
+
39
+	c := &container.Container{
40
+		CommonContainer: container.CommonContainer{
41
+			ID:   "id",
42
+			Name: "name",
43
+			Config: &containertypes.Config{
44
+				Image: "image_name",
45
+				Labels: map[string]string{
46
+					"com.docker.swarm.task.id": "id",
47
+				},
48
+			},
49
+		},
50
+	}
51
+
52
+	daemon := &daemon.Daemon{
53
+		EventsService: e,
54
+	}
55
+
56
+	controller, err := newController(daemon, task)
57
+	if err != nil {
58
+		t.Fatalf("create controller fail %v", err)
59
+	}
60
+
61
+	errChan := make(chan error, 1)
62
+	ctx, cancel := context.WithCancel(context.Background())
63
+	defer cancel()
64
+
65
+	// fire checkHealth
66
+	go func() {
67
+		err := controller.checkHealth(ctx)
68
+		select {
69
+		case errChan <- err:
70
+		case <-ctx.Done():
71
+		}
72
+	}()
73
+
74
+	// send an event and expect to get expectedErr
75
+	// if expectedErr is nil, no error is expected to arrive before the timeout
76
+	logAndExpect := func(msg string, expectedErr error) {
77
+		daemon.LogContainerEvent(c, msg)
78
+
79
+		timer := time.NewTimer(1 * time.Second)
80
+		defer timer.Stop()
81
+
82
+		select {
83
+		case err := <-errChan:
84
+			if err != expectedErr {
85
+				t.Fatalf("expect error %v, but get %v", expectedErr, err)
86
+			}
87
+		case <-timer.C:
88
+			if expectedErr != nil {
89
+				t.Fatalf("time limit exceeded, didn't get expected error")
90
+			}
91
+		}
92
+	}
93
+
94
+	// events that are ignored by checkHealth
95
+	logAndExpect("health_status: running", nil)
96
+	logAndExpect("health_status: healthy", nil)
97
+	logAndExpect("die", nil)
98
+
99
+	// unhealthy event will be caught by checkHealth
100
+	logAndExpect("health_status: unhealthy", ErrContainerUnhealthy)
101
+}