Merge pull request #38522 from cpuguy83/fix_timers

Make sure timers are stopped after use.

Sebastiaan van Stijn authored on 2019/06/07 20:16:46
Showing 15 changed files
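The change applied across all 15 files is the same: time.After allocates a Timer that is not reclaimed until it fires, even when the surrounding select has long since returned, so each call site switches to an explicit time.NewTimer that is stopped once it is no longer needed. A minimal sketch of the resulting shape, with hypothetical waitForResult/result names that are not taken from the diff:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForResult waits for a value or a deadline. Stopping the timer on return
// lets the runtime reclaim it immediately instead of keeping it alive until
// the duration elapses, which is what time.After would do.
func waitForResult(result <-chan string, d time.Duration) (string, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case r := <-result:
		return r, nil
	case <-timer.C:
		return "", errors.New("timed out waiting for result")
	}
}

func main() {
	result := make(chan string, 1)
	result <- "done"
	fmt.Println(waitForResult(result, 50*time.Millisecond))
}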
... ...
@@ -174,7 +174,9 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
 
 		if !onlyPastEvents {
 			dur := until.Sub(now)
-			timeout = time.After(dur)
+			timer := time.NewTimer(dur)
+			defer timer.Stop()
+			timeout = timer.C
 		}
 	}
 
... ...
@@ -400,10 +400,14 @@ func shutdownDaemon(d *daemon.Daemon) {
 		logrus.Debug("Clean shutdown succeeded")
 		return
 	}
+
+	timeout := time.NewTimer(time.Duration(shutdownTimeout) * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ch:
 		logrus.Debug("Clean shutdown succeeded")
-	case <-time.After(time.Duration(shutdownTimeout) * time.Second):
+	case <-timeout.C:
 		logrus.Error("Force shutdown daemon")
 	}
 }
... ...
@@ -33,8 +33,11 @@ func (container *Container) Reset(lock bool) {
 				container.LogCopier.Wait()
 				close(exit)
 			}()
+
+			timer := time.NewTimer(loggerCloseTimeout)
+			defer timer.Stop()
 			select {
-			case <-time.After(loggerCloseTimeout):
+			case <-timer.C:
 				logrus.Warn("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			}
... ...
@@ -186,8 +186,11 @@ func (c *Cluster) Start() error {
 	}
 	c.nr = nr
 
+	timer := time.NewTimer(swarmConnectTimeout)
+	defer timer.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timer.C:
 		logrus.Error("swarm component could not be started before timeout was reached")
 	case err := <-nr.Ready():
 		if err != nil {
... ...
@@ -194,8 +194,11 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 	c.nr = nr
 	c.mu.Unlock()
 
+	timeout := time.NewTimer(swarmConnectTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timeout.C:
 		return errSwarmJoinTimeoutReached
 	case err := <-nr.Ready():
 		if err != nil {
... ...
@@ -486,12 +486,14 @@ func (daemon *Daemon) restore() error {
 			// ignore errors here as this is a best effort to wait for children to be
 			//   running before we try to start the container
 			children := daemon.children(c)
-			timeout := time.After(5 * time.Second)
+			timeout := time.NewTimer(5 * time.Second)
+			defer timeout.Stop()
+
 			for _, child := range children {
 				if notifier, exists := restartContainers[child]; exists {
 					select {
 					case <-notifier:
-					case <-timeout:
+					case <-timeout.C:
 					}
 				}
 			}
... ...
@@ -609,6 +611,7 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 	if daemon.discoveryWatcher == nil {
 		return
 	}
+
 	// Make sure if the container has a network that requires discovery that the discovery service is available before starting
 	for netName := range c.NetworkSettings.Networks {
 		// If we get `ErrNoSuchNetwork` here, we can assume that it is due to discovery not being ready
... ...
@@ -617,13 +620,19 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 			if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
 				continue
 			}
+
 			// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
 			// FIXME: why is this slow???
+			dur := 60 * time.Second
+			timer := time.NewTimer(dur)
+
 			logrus.Debugf("Container %s waiting for network to be ready", c.Name)
 			select {
 			case <-daemon.discoveryWatcher.ReadyCh():
-			case <-time.After(60 * time.Second):
+			case <-timer.C:
 			}
+			timer.Stop()
+
 			return
 		}
 	}
... ...
@@ -673,10 +682,14 @@ func (daemon *Daemon) DaemonLeavesCluster() {
 	// This is called also on graceful daemon shutdown. We need to
 	// wait, because the ingress release has to happen before the
 	// network controller is stopped.
+
 	if done, err := daemon.ReleaseIngress(); err == nil {
+		timeout := time.NewTimer(5 * time.Second)
+		defer timeout.Stop()
+
 		select {
 		case <-done:
-		case <-time.After(5 * time.Second):
+		case <-timeout.C:
 			logrus.Warn("timeout while waiting for ingress network removal")
 		}
 	} else {
... ...
@@ -148,12 +148,14 @@ func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
 	// Setup a short ticker until the first heartbeat has succeeded
 	t := time.NewTicker(500 * time.Millisecond)
 	defer t.Stop()
+
 	// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
-	timeout := time.After(60 * time.Second)
+	timeout := time.NewTimer(60 * time.Second)
+	defer timeout.Stop()
 
 	for {
 		select {
-		case <-timeout:
+		case <-timeout.C:
 			return errors.New("timeout waiting for initial discovery")
 		case <-d.term:
 			return errors.New("terminated")
... ...
@@ -23,7 +23,7 @@ import (
 )
 
 // Seconds to wait after sending TERM before trying KILL
-const termProcessTimeout = 10
+const termProcessTimeout = 10 * time.Second
 
 func (d *Daemon) registerExecCommand(container *container.Container, config *exec.Config) {
 	// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
... ...
@@ -277,9 +277,13 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 	case <-ctx.Done():
 		logrus.Debugf("Sending TERM signal to process %v in container %v", name, c.ID)
 		d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["TERM"]))
+
+		timeout := time.NewTimer(termProcessTimeout)
+		defer timeout.Stop()
+
 		select {
-		case <-time.After(termProcessTimeout * time.Second):
-			logrus.Infof("Container %v, process %v failed to exit within %d seconds of signal TERM - using the force", c.ID, name, termProcessTimeout)
+		case <-timeout.C:
+			logrus.Infof("Container %v, process %v failed to exit within %v of signal TERM - using the force", c.ID, name, termProcessTimeout)
 			d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["KILL"]))
 		case <-attachErr:
 			// TERM signal worked
... ...
@@ -187,12 +187,18 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch
 func monitor(d *Daemon, c *container.Container, stop chan struct{}, probe probe) {
 	probeTimeout := timeoutWithDefault(c.Config.Healthcheck.Timeout, defaultProbeTimeout)
 	probeInterval := timeoutWithDefault(c.Config.Healthcheck.Interval, defaultProbeInterval)
+
+	intervalTimer := time.NewTimer(probeInterval)
+	defer intervalTimer.Stop()
+
 	for {
+		intervalTimer.Reset(probeInterval)
+
 		select {
 		case <-stop:
			logrus.Debugf("Stop healthcheck monitoring for container %s (received while idle)", c.ID)
 			return
-		case <-time.After(probeInterval):
+		case <-intervalTimer.C:
 			logrus.Debugf("Running health check for container %s ...", c.ID)
 			startTime := time.Now()
 			ctx, cancelProbe := context.WithTimeout(context.Background(), probeTimeout)
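The health-check hunk above also moves the timer out of the loop: one timer is allocated up front and Reset re-arms it at the top of each iteration, rather than calling time.After on every pass. That is safe in this shape because every iteration either receives from the timer channel or returns. A rough sketch of the pattern, with hypothetical pollLoop/tick names that are not from the diff:

package main

import "time"

// pollLoop reuses a single timer across iterations instead of allocating a
// new one with time.After each time around. Reset at the top of the loop is
// safe here because the previous iteration either drained timer.C or exited.
func pollLoop(interval time.Duration, stop <-chan struct{}, tick func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		timer.Reset(interval)

		select {
		case <-stop:
			return
		case <-timer.C:
			tick()
		}
	}
}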
... ...
@@ -38,13 +38,16 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
 	if err != nil {
 		return err
 	}
+
 	// TODO: the timeout is hardcoded here, it would be more flexible to make it
 	// a parameter in resize request context, which would need API changes.
-	timeout := 10 * time.Second
+	timeout := time.NewTimer(10 * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ec.Started:
 		return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height)
-	case <-time.After(timeout):
+	case <-timeout.C:
 		return fmt.Errorf("timeout waiting for exec session ready")
 	}
 }
... ...
@@ -89,8 +89,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 
 	go r.monitorDaemon(ctx)
 
+	timeout := time.NewTimer(startupTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(startupTimeout):
+	case <-timeout.C:
 		return nil, errors.New("timeout waiting for containerd to start")
 	case err := <-r.daemonStartCh:
 		if err != nil {
... ...
@@ -101,8 +104,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 	return r, nil
 }
 func (r *remote) WaitTimeout(d time.Duration) error {
+	timeout := time.NewTimer(d)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(d):
+	case <-timeout.C:
 		return errors.New("timeout waiting for containerd to stop")
 	case <-r.daemonStopCh:
 	}
... ...
@@ -230,7 +236,8 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		transientFailureCount = 0
 		client                *containerd.Client
 		err                   error
-		delay                 <-chan time.Time
+		delay                 time.Duration
+		timer                 = time.NewTimer(0)
 		started               bool
 	)
 
... ...
@@ -245,19 +252,25 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		r.platformCleanup()
 
 		close(r.daemonStopCh)
+		timer.Stop()
 	}()
 
+	// ensure no races on sending to timer.C even though there is a 0 duration.
+	if !timer.Stop() {
+		<-timer.C
+	}
+
 	for {
-		if delay != nil {
-			select {
-			case <-ctx.Done():
-				r.logger.Info("stopping healthcheck following graceful shutdown")
-				if client != nil {
-					client.Close()
-				}
-				return
-			case <-delay:
+		timer.Reset(delay)
+
+		select {
+		case <-ctx.Done():
+			r.logger.Info("stopping healthcheck following graceful shutdown")
+			if client != nil {
+				client.Close()
 			}
+			return
+		case <-timer.C:
 		}
 
 		if r.daemonPid == -1 {
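The monitorDaemon changes above create the reusable timer with a zero duration and then immediately stop and drain it, because Timer.Reset is only documented as safe on a timer that is stopped or expired and whose channel has been drained; the drain guarantees the later Reset calls in the loop start from a clean state. A small sketch of that stop-and-drain step, using an illustrative helper name that does not appear in the diff:

package main

import "time"

// newDrainedTimer returns a timer that is stopped and whose channel is empty,
// so callers can Reset it later without racing against a stale value in C.
func newDrainedTimer() *time.Timer {
	t := time.NewTimer(0)
	if !t.Stop() {
		// The zero-duration timer may already have fired; drain the channel
		// so the next Reset starts from a clean state.
		<-t.C
	}
	return t
}

The same stop-and-drain appears in the filePoller hunk further below before its timer is first reused.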
... ...
@@ -277,14 +290,14 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 					return
 				}
 				r.logger.WithError(err).Error("failed restarting containerd")
-				delay = time.After(50 * time.Millisecond)
+				delay = 50 * time.Millisecond
 				continue
 			}
 
 			client, err = containerd.New(r.GRPC.Address, containerd.WithTimeout(60*time.Second))
 			if err != nil {
 				r.logger.WithError(err).Error("failed connecting to containerd")
-				delay = time.After(100 * time.Millisecond)
+				delay = 100 * time.Millisecond
 				continue
 			}
 		}
... ...
@@ -300,7 +313,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 				}
 
 				transientFailureCount = 0
-				delay = time.After(500 * time.Millisecond)
+				delay = 500 * time.Millisecond
 				continue
 			}
 
... ...
@@ -308,7 +321,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 
 			transientFailureCount++
 			if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
-				delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond)
+				delay = time.Duration(transientFailureCount) * 200 * time.Millisecond
 				continue
 			}
 			client.Close()
... ...
@@ -321,7 +334,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		}
 
 		r.daemonPid = -1
-		delay = nil
+		delay = 0
 		transientFailureCount = 0
 	}
 }
... ...
@@ -146,9 +146,18 @@ func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
 // upon finding changes to a file or errors, sendEvent/sendErr is called
 func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
 	defer f.Close()
+
+	timer := time.NewTimer(watchWaitTime)
+	if !timer.Stop() {
+		<-timer.C
+	}
+	defer timer.Stop()
+
 	for {
+		timer.Reset(watchWaitTime)
+
 		select {
-		case <-time.After(watchWaitTime):
+		case <-timer.C:
 		case <-chClose:
 			logrus.Debugf("watch for %s closed", f.Name())
 			return
... ...
@@ -107,9 +107,12 @@ func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg
 
 	// send under a select as to not block if the receiver is unavailable
 	if p.timeout > 0 {
+		timeout := time.NewTimer(p.timeout)
+		defer timeout.Stop()
+
 		select {
 		case sub <- v:
-		case <-time.After(p.timeout):
+		case <-timeout.C:
 		}
 		return
 	}
... ...
@@ -146,6 +146,8 @@ func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
 	return nil
 }
 
+const shutdownTimeout = 10 * time.Second
+
 func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	pluginID := p.GetID()
 
... ...
@@ -153,19 +155,26 @@ func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	if err != nil {
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 	} else {
+
+		timeout := time.NewTimer(shutdownTimeout)
+		defer timeout.Stop()
+
 		select {
 		case <-ec:
 			logrus.Debug("Clean shutdown of plugin")
-		case <-time.After(time.Second * 10):
+		case <-timeout.C:
 			logrus.Debug("Force shutdown plugin")
 			if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil {
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
 			}
+
+			timeout.Reset(shutdownTimeout)
+
 			select {
 			case <-ec:
 				logrus.Debug("SIGKILL plugin shutdown")
-			case <-time.After(time.Second * 10):
-				logrus.Debug("Force shutdown plugin FAILED")
+			case <-timeout.C:
+				logrus.WithField("plugin", p.Name).Warn("Force shutdown plugin FAILED")
 			}
 		}
 	}
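In the plugin shutdown above the same timer serves both waits: once the first <-timeout.C fires, its channel is drained, so Reset can re-arm it for the post-SIGKILL wait without allocating a second timer. A condensed sketch of that two-stage wait, with hypothetical twoStageWait/escalate names:

package main

import "time"

// twoStageWait waits for done, escalates once on timeout, then reuses the
// same timer for a second wait. Reset is safe here because the first receive
// from timeout.C leaves the channel empty.
func twoStageWait(done <-chan bool, d time.Duration, escalate func()) {
	timeout := time.NewTimer(d)
	defer timeout.Stop()

	select {
	case <-done:
		return
	case <-timeout.C:
		escalate()
		timeout.Reset(d)
		select {
		case <-done:
		case <-timeout.C:
		}
	}
}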
... ...
@@ -107,11 +107,14 @@ func (rm *restartManager) ShouldRestart(exitCode uint32, hasBeenManuallyStopped
 
 	ch := make(chan error)
 	go func() {
+		timeout := time.NewTimer(rm.timeout)
+		defer timeout.Stop()
+
 		select {
 		case <-rm.cancel:
 			ch <- ErrRestartCanceled
 			close(ch)
-		case <-time.After(rm.timeout):
+		case <-timeout.C:
 			rm.Lock()
 			close(ch)
 			rm.active = false