
Block task starting until node attachments are ready

Blocks the execution of tasks during the Prepare phase until there
exists an IP address for every overlay network in use by the task. This
prevents a task from starting before the NetworkAttachment containing
the IP address has been sent down to the node.

Includes a basic test for the correct use case.

Signed-off-by: Drew Erny <drew.erny@docker.com>

Drew Erny authored on 2018/08/08 03:09:04
Showing 4 changed files
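
At its core, the change is a bounded poll-until-ready loop: the container adapter re-checks the attachment store on a ticker, and the controller's Prepare wraps that call in a 30-second timeout context. Below is a minimal, self-contained Go sketch of that pattern; waitUntilReady and its ready callback are illustrative stand-ins, not code from this change.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntilReady re-checks a readiness condition on a ticker until it passes
// or the context expires, mirroring the shape of waitNodeAttachments below.
func waitUntilReady(ctx context.Context, interval time.Duration, ready func() bool) error {
	poll := time.NewTicker(interval)
	defer poll.Stop()
	for {
		if ready() {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for readiness")
		case <-poll.C:
		}
	}
}

func main() {
	// The caller bounds the wait with a deadline, as Prepare does with
	// waitNodeAttachmentsTimeout in the hunks below.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	start := time.Now()
	// Stand-in readiness check: pretend the attachment shows up after ~300ms.
	err := waitUntilReady(ctx, 100*time.Millisecond, func() bool {
		return time.Since(start) > 300*time.Millisecond
	})
	fmt.Println("wait returned:", err) // nil once the condition passes
}
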
... ...
@@ -32,6 +32,9 @@ import (
 	"golang.org/x/time/rate"
 )

+// nodeAttachmentReadyInterval is the interval at which we poll for node
+// attachment readiness.
+const nodeAttachmentReadyInterval = 100 * time.Millisecond
+
 // containerAdapter conducts remote operations for a container. All calls
 // are mostly naked calls to the client API, seeded with information from
 // containerConfig.
... ...
@@ -146,6 +149,55 @@ func (c *containerAdapter) pullImage(ctx context.Context) error {
 	return nil
 }

+// waitNodeAttachments validates that NetworkAttachments exist on this node
+// for every network in use by this task. It blocks until the network
+// attachments are ready, or the context times out. If it returns nil, then the
+// node's network attachments are all there.
+func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
+	// to do this, we're going to get the attachment store and try getting the
+	// IP address for each network. if any network comes back not existing,
+	// we'll wait and try again.
+	attachmentStore := c.backend.GetAttachmentStore()
+	if attachmentStore == nil {
+		return fmt.Errorf("error getting attachment store")
+	}
+
+	// essentially, we're long-polling here. this is really sub-optimal, but a
+	// better solution based off signaling channels would require a more
+	// substantial rearchitecture and probably not be worth our time in terms
+	// of performance gains.
+	poll := time.NewTicker(nodeAttachmentReadyInterval)
+	defer poll.Stop()
+	for {
+		// set a flag ready to true. if we try to get a network IP that doesn't
+		// exist yet, we will set this flag to "false"
+		ready := true
+		for _, attachment := range c.container.networksAttachments {
+			// we only need node attachments (IP address) for overlay networks
+			// TODO(dperny): unsure if this will work with other network
+			// drivers, but i also don't think other network drivers use the
+			// node attachment IP address.
+			if attachment.Network.DriverState.Name == "overlay" {
+				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
+					ready = false
+				}
+			}
+		}
+
+		// if everything is ready here, then we can just return no error
+		if ready {
+			return nil
+		}
+
+		// otherwise, try polling again, or wait for context canceled.
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
+		case <-poll.C:
+		}
+	}
+}
+
 func (c *containerAdapter) createNetworks(ctx context.Context) error {
 	for name := range c.container.networksAttachments {
 		ncr, err := c.container.networkCreateRequest(name)
new file mode 100644
... ...
@@ -0,0 +1,139 @@
+package container // import "github.com/docker/docker/daemon/cluster/executor/container"
+
+import (
+	"testing"
+
+	"context"
+	"time"
+
+	"github.com/docker/docker/daemon"
+	"github.com/docker/swarmkit/api"
+)
+
+// TestWaitNodeAttachment tests that the waitNodeAttachments method successfully
+// blocks until the required node attachment becomes available.
+func TestWaitNodeAttachment(t *testing.T) {
+	emptyDaemon := &daemon.Daemon{}
+
+	// the daemon creates an attachment store as an object, which means it's
+	// initialized to an empty store by default. get that attachment store here
+	// and add some attachments to it
+	attachmentStore := emptyDaemon.GetAttachmentStore()
+
+	// create a set of attachments to put into the attachment store
+	attachments := map[string]string{
+		"network1": "10.1.2.3/24",
+	}
+
+	// this shouldn't fail, but check it anyway just in case
+	err := attachmentStore.ResetAttachments(attachments)
+	if err != nil {
+		t.Fatalf("error resetting attachments: %v", err)
+	}
+
+	// create a containerConfig to put in the adapter. we don't need the task,
+	// actually; only the networkAttachments are needed.
+	container := &containerConfig{
+		task: nil,
+		networksAttachments: map[string]*api.NetworkAttachment{
+			// network1 is already present in the attachment store.
+			"network1": {
+				Network: &api.Network{
+					ID: "network1",
+					DriverState: &api.Driver{
+						Name: "overlay",
+					},
+				},
+			},
+			// network2 is not yet present in the attachment store, and we
+			// should block while waiting for it.
+			"network2": {
+				Network: &api.Network{
+					ID: "network2",
+					DriverState: &api.Driver{
+						Name: "overlay",
+					},
+				},
+			},
+			// localnetwork is not and will never be in the attachment store,
+			// but we should not block on it, because it is not an overlay
+			// network
+			"localnetwork": {
+				Network: &api.Network{
+					ID: "localnetwork",
+					DriverState: &api.Driver{
+						Name: "bridge",
+					},
+				},
+			},
+		},
+	}
+
+	// we don't create an adapter using the newContainerAdapter package,
+	// because it does a bunch of checks and validations. instead, create one
+	// "from scratch" so we only have the fields we need.
+	adapter := &containerAdapter{
+		backend:   emptyDaemon,
+		container: container,
+	}
+
+	// create a context to call the method with
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// create a channel to allow the goroutine that we run the method call in
+	// to signal that it's done.
+	doneChan := make(chan struct{})
+
+	// store the error return value of waitNodeAttachments in this variable
+	var waitNodeAttachmentsErr error
+	// NOTE(dperny): be careful running goroutines in test code. if a test
+	// terminates with e.g. t.Fatalf or a failed requirement, runtime.Goexit gets
+	// called, which does run defers but does not clean up child goroutines.
+	// we defer canceling the context here, which should stop this goroutine
+	// from leaking
+	go func() {
+		waitNodeAttachmentsErr = adapter.waitNodeAttachments(ctx)
+		// signal that we've completed
+		close(doneChan)
+	}()
+
+	// wait 200ms to allow the waitNodeAttachments call to spin for a bit
+	time.Sleep(200 * time.Millisecond)
+	select {
+	case <-doneChan:
+		if waitNodeAttachmentsErr == nil {
+			t.Fatalf("waitNodeAttachments exited early with no error")
+		} else {
+			t.Fatalf(
+				"waitNodeAttachments exited early with an error: %v",
+				waitNodeAttachmentsErr,
+			)
+		}
+	default:
+		// allow falling through; this is the desired case
+	}
+
+	// now update the node attachments to include another network attachment
+	attachments["network2"] = "10.3.4.5/24"
+	err = attachmentStore.ResetAttachments(attachments)
+	if err != nil {
+		t.Fatalf("error resetting attachments: %v", err)
+	}
+
+	// now wait 200 ms for waitNodeAttachments to pick up the change
+	time.Sleep(200 * time.Millisecond)
+
+	// and check that waitNodeAttachments has exited with no error
+	select {
+	case <-doneChan:
+		if waitNodeAttachmentsErr != nil {
+			t.Fatalf(
+				"waitNodeAttachments returned an error: %v",
+				waitNodeAttachmentsErr,
+			)
+		}
+	default:
+		t.Fatalf("waitNodeAttachments did not exit yet, but should have")
+	}
+}
... ...
@@ -23,6 +23,10 @@ import (

 const defaultGossipConvergeDelay = 2 * time.Second

+// waitNodeAttachmentsTimeout defines the total period of time we should wait
+// for node attachments to be ready before giving up on starting a task
+const waitNodeAttachmentsTimeout = 30 * time.Second
+
 // controller implements agent.Controller against docker's API.
 //
 // Most operations against docker's API are done through the container name,
... ...
@@ -98,6 +102,25 @@ func (r *controller) Prepare(ctx context.Context) error {
 		return err
 	}

+	// Before we create networks, we need to make sure that the node has all of
+	// the network attachments that the task needs. This will block until that
+	// is the case or the context has expired.
+	// NOTE(dperny): Prepare doesn't time out on its own (that is, the context
+	// passed in does not expire after any period of time), which means if the
+	// node attachment never arrives (for example, if the network's IP address
+	// space is exhausted), then the tasks on the node will park in PREPARING
+	// forever (or until the node dies). To avoid this case, we create a new
+	// context with a fixed deadline and give up when it expires. In normal
+	// operation, a node update with the node IP address should come in hot on
+	// the tail of the task being assigned to the node, and this should exit on
+	// the order of milliseconds, but to be extra conservative we'll give it 30
+	// seconds before giving up.
+	waitNodeAttachmentsContext, waitCancel := context.WithTimeout(ctx, waitNodeAttachmentsTimeout)
+	defer waitCancel()
+	if err := r.adapter.waitNodeAttachments(waitNodeAttachmentsContext); err != nil {
+		return err
+	}
+
 	// Make sure all the networks that the task needs are created.
 	if err := r.adapter.createNetworks(ctx); err != nil {
 		return err
... ...
@@ -2,6 +2,7 @@ package network // import "github.com/docker/docker/daemon/network"

 import (
 	"net"
+	"sync"

 	networktypes "github.com/docker/docker/api/types/network"
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
... ...
@@ -37,6 +38,7 @@ type EndpointSettings struct {

 // AttachmentStore stores the load balancer IP address for a network id.
 type AttachmentStore struct {
+	sync.Mutex
 	//key: networkd id
 	//value: load balancer ip address
 	networkToNodeLBIP map[string]net.IP
... ...
@@ -45,7 +47,9 @@ type AttachmentStore struct {
 // ResetAttachments clears any existing load balancer IP to network mapping and
 // sets the mapping to the given attachments.
 func (store *AttachmentStore) ResetAttachments(attachments map[string]string) error {
-	store.ClearAttachments()
+	store.Lock()
+	defer store.Unlock()
+	store.clearAttachments()
 	for nid, nodeIP := range attachments {
 		ip, _, err := net.ParseCIDR(nodeIP)
 		if err != nil {
... ...
@@ -59,11 +63,19 @@ func (store *AttachmentStore) ResetAttachments(attachments map[string]string) er

 // ClearAttachments clears all the mappings of network to load balancer IP Address.
 func (store *AttachmentStore) ClearAttachments() {
+	store.Lock()
+	defer store.Unlock()
+	store.clearAttachments()
+}
+
+func (store *AttachmentStore) clearAttachments() {
 	store.networkToNodeLBIP = make(map[string]net.IP)
 }

 // GetIPForNetwork return the load balancer IP address for the given network.
 func (store *AttachmentStore) GetIPForNetwork(networkID string) (net.IP, bool) {
+	store.Lock()
+	defer store.Unlock()
 	ip, exists := store.networkToNodeLBIP[networkID]
 	return ip, exists
 }
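
For reference, a minimal sketch of how the AttachmentStore API touched above fits together. The store type and its ResetAttachments/GetIPForNetwork methods come from the diff; the surrounding program is illustrative only.

package main

import (
	"fmt"

	"github.com/docker/docker/daemon/network"
)

func main() {
	var store network.AttachmentStore

	// ResetAttachments parses each network-ID -> CIDR pair and replaces any
	// existing mapping in the store.
	if err := store.ResetAttachments(map[string]string{
		"network1": "10.1.2.3/24",
	}); err != nil {
		fmt.Println("reset failed:", err)
		return
	}

	// GetIPForNetwork reports whether an attachment IP exists yet for the
	// network; waitNodeAttachments polls this lookup for each overlay network
	// until it succeeds.
	if ip, ok := store.GetIPForNetwork("network1"); ok {
		fmt.Println("node IP for network1:", ip)
	}
}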