Browse code

Merge pull request #37604 from dperny/task-wait-for-attachments

Block task starting until node attachments are ready

Tibor Vass authored on 2018/08/21 10:14:44
Showing 4 changed files
... ...
@@ -32,6 +32,9 @@ import (
32 32
 	"golang.org/x/time/rate"
33 33
 )
34 34
 
35
+// nodeAttachmentReadyInterval is the interval to poll
36
+const nodeAttachmentReadyInterval = 100 * time.Millisecond
37
+
35 38
 // containerAdapter conducts remote operations for a container. All calls
36 39
 // are mostly naked calls to the client API, seeded with information from
37 40
 // containerConfig.
... ...
@@ -146,6 +149,55 @@ func (c *containerAdapter) pullImage(ctx context.Context) error {
146 146
 	return nil
147 147
 }
148 148
 
149
+// waitNodeAttachments validates that NetworkAttachments exist on this node
150
+// for every network in use by this task. It blocks until the network
151
+// attachments are ready, or the context times out. If it returns nil, then the
152
+// node's network attachments are all there.
153
+func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
154
+	// to do this, we're going to get the attachment store and try getting the
155
+	// IP address for each network. if any network comes back not existing,
156
+	// we'll wait and try again.
157
+	attachmentStore := c.backend.GetAttachmentStore()
158
+	if attachmentStore == nil {
159
+		return fmt.Errorf("error getting attachment store")
160
+	}
161
+
162
+	// essentially, we're long-polling here. this is really sub-optimal, but a
163
+	// better solution based off signaling channels would require a more
164
+	// substantial rearchitecture and probably not be worth our time in terms
165
+	// of performance gains.
166
+	poll := time.NewTicker(nodeAttachmentReadyInterval)
167
+	defer poll.Stop()
168
+	for {
169
+		// set a flag ready to true. if we try to get a network IP that doesn't
170
+		// exist yet, we will set this flag to "false"
171
+		ready := true
172
+		for _, attachment := range c.container.networksAttachments {
173
+			// we only need node attachments (IP address) for overlay networks
174
+			// TODO(dperny): unsure if this will work with other network
175
+			// drivers, but i also don't think other network drivers use the
176
+			// node attachment IP address.
177
+			if attachment.Network.DriverState.Name == "overlay" {
178
+				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
179
+					ready = false
180
+				}
181
+			}
182
+		}
183
+
184
+		// if everything is ready here, then we can just return no error
185
+		if ready {
186
+			return nil
187
+		}
188
+
189
+		// otherwise, try polling again, or wait for context canceled.
190
+		select {
191
+		case <-ctx.Done():
192
+			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
193
+		case <-poll.C:
194
+		}
195
+	}
196
+}
197
+
149 198
 func (c *containerAdapter) createNetworks(ctx context.Context) error {
150 199
 	for name := range c.container.networksAttachments {
151 200
 		ncr, err := c.container.networkCreateRequest(name)
152 201
new file mode 100644
... ...
@@ -0,0 +1,139 @@
0
+package container // import "github.com/docker/docker/daemon/cluster/executor/container"
1
+
2
+import (
3
+	"testing"
4
+
5
+	"context"
6
+	"time"
7
+
8
+	"github.com/docker/docker/daemon"
9
+	"github.com/docker/swarmkit/api"
10
+)
11
+
12
+// TestWaitNodeAttachment tests that the waitNodeAttachment method successfully
13
+// blocks until the required node attachment becomes available.
14
+func TestWaitNodeAttachment(t *testing.T) {
15
+	emptyDaemon := &daemon.Daemon{}
16
+
17
+	// the daemon creates an attachment store as an object, which means it's
18
+	// initialized to an empty store by default. get that attachment store here
19
+	// and add some attachments to it
20
+	attachmentStore := emptyDaemon.GetAttachmentStore()
21
+
22
+	// create a set of attachments to put into the attahcment store
23
+	attachments := map[string]string{
24
+		"network1": "10.1.2.3/24",
25
+	}
26
+
27
+	// this shouldn't fail, but check it anyway just in case
28
+	err := attachmentStore.ResetAttachments(attachments)
29
+	if err != nil {
30
+		t.Fatalf("error resetting attachments: %v", err)
31
+	}
32
+
33
+	// create a containerConfig to put in the adapter. we don't need the task,
34
+	// actually; only the networkAttachments are needed.
35
+	container := &containerConfig{
36
+		task: nil,
37
+		networksAttachments: map[string]*api.NetworkAttachment{
38
+			// network1 is already present in the attachment store.
39
+			"network1": {
40
+				Network: &api.Network{
41
+					ID: "network1",
42
+					DriverState: &api.Driver{
43
+						Name: "overlay",
44
+					},
45
+				},
46
+			},
47
+			// network2 is not yet present in the attachment store, and we
48
+			// should block while waiting for it.
49
+			"network2": {
50
+				Network: &api.Network{
51
+					ID: "network2",
52
+					DriverState: &api.Driver{
53
+						Name: "overlay",
54
+					},
55
+				},
56
+			},
57
+			// localnetwork is not and will never be in the attachment store,
58
+			// but we should not block on it, because it is not an overlay
59
+			// network
60
+			"localnetwork": {
61
+				Network: &api.Network{
62
+					ID: "localnetwork",
63
+					DriverState: &api.Driver{
64
+						Name: "bridge",
65
+					},
66
+				},
67
+			},
68
+		},
69
+	}
70
+
71
+	// we don't create an adapter using the newContainerAdapter package,
72
+	// because it does a bunch of checks and validations. instead, create one
73
+	// "from scratch" so we only have the fields we need.
74
+	adapter := &containerAdapter{
75
+		backend:   emptyDaemon,
76
+		container: container,
77
+	}
78
+
79
+	// create a context to do call the method with
80
+	ctx, cancel := context.WithCancel(context.Background())
81
+	defer cancel()
82
+
83
+	// create a channel to allow the goroutine that we run the method call in
84
+	// to signal that it's done.
85
+	doneChan := make(chan struct{})
86
+
87
+	// store the error return value of waitNodeAttachments in this variable
88
+	var waitNodeAttachmentsErr error
89
+	// NOTE(dperny): be careful running goroutines in test code. if a test
90
+	// terminates with ie t.Fatalf or a failed requirement, runtime.Goexit gets
91
+	// called, which does run defers but does not clean up child goroutines.
92
+	// we defer canceling the context here, which should stop this goroutine
93
+	// from leaking
94
+	go func() {
95
+		waitNodeAttachmentsErr = adapter.waitNodeAttachments(ctx)
96
+		// signal that we've completed
97
+		close(doneChan)
98
+	}()
99
+
100
+	// wait 200ms to allow the waitNodeAttachments call to spin for a bit
101
+	time.Sleep(200 * time.Millisecond)
102
+	select {
103
+	case <-doneChan:
104
+		if waitNodeAttachmentsErr == nil {
105
+			t.Fatalf("waitNodeAttachments exited early with no error")
106
+		} else {
107
+			t.Fatalf(
108
+				"waitNodeAttachments exited early with an error: %v",
109
+				waitNodeAttachmentsErr,
110
+			)
111
+		}
112
+	default:
113
+		// allow falling through; this is the desired case
114
+	}
115
+
116
+	// now update the node attachments to include another network attachment
117
+	attachments["network2"] = "10.3.4.5/24"
118
+	err = attachmentStore.ResetAttachments(attachments)
119
+	if err != nil {
120
+		t.Fatalf("error resetting attachments: %v", err)
121
+	}
122
+
123
+	// now wait 200 ms for waitNodeAttachments to pick up the change
124
+	time.Sleep(200 * time.Millisecond)
125
+
126
+	// and check that waitNodeAttachments has exited with no error
127
+	select {
128
+	case <-doneChan:
129
+		if waitNodeAttachmentsErr != nil {
130
+			t.Fatalf(
131
+				"waitNodeAttachments returned an error: %v",
132
+				waitNodeAttachmentsErr,
133
+			)
134
+		}
135
+	default:
136
+		t.Fatalf("waitNodeAttachments did not exit yet, but should have")
137
+	}
138
+}
... ...
@@ -23,6 +23,10 @@ import (
23 23
 
24 24
 const defaultGossipConvergeDelay = 2 * time.Second
25 25
 
26
+// waitNodeAttachmentsTimeout defines the total period of time we should wait
27
+// for node attachments to be ready before giving up on starting a task
28
+const waitNodeAttachmentsTimeout = 30 * time.Second
29
+
26 30
 // controller implements agent.Controller against docker's API.
27 31
 //
28 32
 // Most operations against docker's API are done through the container name,
... ...
@@ -98,6 +102,25 @@ func (r *controller) Prepare(ctx context.Context) error {
98 98
 		return err
99 99
 	}
100 100
 
101
+	// Before we create networks, we need to make sure that the node has all of
102
+	// the network attachments that the task needs. This will block until that
103
+	// is the case or the context has expired.
104
+	// NOTE(dperny): Prepare doesn't time out on its own (that is, the context
105
+	// passed in does not expire after any period of time), which means if the
106
+	// node attachment never arrives (for example, if the network's IP address
107
+	// space is exhausted), then the tasks on the node will park in PREPARING
108
+	// forever (or until the node dies). To avoid this case, we create a new
109
+	// context with a fixed deadline, and give up. In normal operation, a node
110
+	// update with the node IP address should come in hot on the tail of the
111
+	// task being assigned to the node, and this should exit on the order of
112
+	// milliseconds, but to be extra conservative we'll give it 30 seconds to
113
+	// time out before giving up.
114
+	waitNodeAttachmentsContext, waitCancel := context.WithTimeout(ctx, waitNodeAttachmentsTimeout)
115
+	defer waitCancel()
116
+	if err := r.adapter.waitNodeAttachments(waitNodeAttachmentsContext); err != nil {
117
+		return err
118
+	}
119
+
101 120
 	// Make sure all the networks that the task needs are created.
102 121
 	if err := r.adapter.createNetworks(ctx); err != nil {
103 122
 		return err
... ...
@@ -2,6 +2,7 @@ package network // import "github.com/docker/docker/daemon/network"
2 2
 
3 3
 import (
4 4
 	"net"
5
+	"sync"
5 6
 
6 7
 	networktypes "github.com/docker/docker/api/types/network"
7 8
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
... ...
@@ -37,6 +38,7 @@ type EndpointSettings struct {
37 37
 
38 38
 // AttachmentStore stores the load balancer IP address for a network id.
39 39
 type AttachmentStore struct {
40
+	sync.Mutex
40 41
 	//key: networkd id
41 42
 	//value: load balancer ip address
42 43
 	networkToNodeLBIP map[string]net.IP
... ...
@@ -45,7 +47,9 @@ type AttachmentStore struct {
45 45
 // ResetAttachments clears any existing load balancer IP to network mapping and
46 46
 // sets the mapping to the given attachments.
47 47
 func (store *AttachmentStore) ResetAttachments(attachments map[string]string) error {
48
-	store.ClearAttachments()
48
+	store.Lock()
49
+	defer store.Unlock()
50
+	store.clearAttachments()
49 51
 	for nid, nodeIP := range attachments {
50 52
 		ip, _, err := net.ParseCIDR(nodeIP)
51 53
 		if err != nil {
... ...
@@ -59,11 +63,19 @@ func (store *AttachmentStore) ResetAttachments(attachments map[string]string) er
59 59
 
60 60
 // ClearAttachments clears all the mappings of network to load balancer IP Address.
61 61
 func (store *AttachmentStore) ClearAttachments() {
62
+	store.Lock()
63
+	defer store.Unlock()
64
+	store.clearAttachments()
65
+}
66
+
67
+func (store *AttachmentStore) clearAttachments() {
62 68
 	store.networkToNodeLBIP = make(map[string]net.IP)
63 69
 }
64 70
 
65 71
 // GetIPForNetwork return the load balancer IP address for the given network.
66 72
 func (store *AttachmentStore) GetIPForNetwork(networkID string) (net.IP, bool) {
73
+	store.Lock()
74
+	defer store.Unlock()
67 75
 	ip, exists := store.networkToNodeLBIP[networkID]
68 76
 	return ip, exists
69 77
 }