Browse code

Merge pull request #7918 from smarterclayton/improve_lease_message

Merged by openshift-bot

OpenShift Bot authored on 2016/06/16 06:30:47
Showing 7 changed files
... ...
@@ -85,7 +85,7 @@ function exectest() {
85 85
 		exit 0
86 86
 	else
87 87
 		os::text::print_red "failed  $1"
88
-		echo "${out}"
88
+		echo "${out:-}"
89 89
 
90 90
 		exit 1
91 91
 	fi
... ...
@@ -48,7 +48,7 @@ func initControllerRoutes(root *restful.WebService, path string, canStart bool,
48 48
 	root.Route(root.DELETE(path).To(func(req *restful.Request, resp *restful.Response) {
49 49
 		resp.ResponseWriter.WriteHeader(http.StatusAccepted)
50 50
 		fmt.Fprintf(resp, "terminating")
51
-		plug.Stop()
51
+		plug.Stop(nil)
52 52
 	}).Doc("Stop the master").
53 53
 		Returns(http.StatusAccepted, "if the master will stop", nil).
54 54
 		Produces(restful.MIME_JSON))
... ...
@@ -520,8 +520,10 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
520 520
 		// when a manual shutdown (DELETE /controllers) or lease lost occurs, the process should exit
521 521
 		// this ensures no code is still running as a controller, and allows a process manager to reset
522 522
 		// the controller to come back into a candidate state and compete for the lease
523
-		oc.ControllerPlug.WaitForStop()
524
-		glog.Fatalf("Controller shutdown requested")
523
+		if err := oc.ControllerPlug.WaitForStop(); err != nil {
524
+			glog.Fatalf("Controller shutdown due to lease being lost: %v", err)
525
+		}
526
+		glog.Fatalf("Controller graceful shutdown requested")
525 527
 	}()
526 528
 
527 529
 	oc.ControllerPlug.WaitForStart()
... ...
@@ -13,12 +13,13 @@ type Plug interface {
13 13
 	Start()
14 14
 	// Ends operation of the plug and unblocks WaitForStop()
15 15
 	// May be invoked multiple times but only the first invocation has
16
-	// an effect. Calling Stop() before Start() is undefined.
17
-	Stop()
16
+	// an effect. Calling Stop() before Start() is undefined. An error
17
+	// may be returned with the stop.
18
+	Stop(err error)
18 19
 	// Blocks until Start() is invoked
19 20
 	WaitForStart()
20 21
 	// Blocks until Stop() is invoked
21
-	WaitForStop()
22
+	WaitForStop() error
22 23
 	// Returns true if Start() has been invoked
23 24
 	IsStarted() bool
24 25
 }
... ...
@@ -28,14 +29,14 @@ type plug struct {
28 28
 	start   sync.Once
29 29
 	stop    sync.Once
30 30
 	startCh chan struct{}
31
-	stopCh  chan struct{}
31
+	stopCh  chan error
32 32
 }
33 33
 
34 34
 // New returns a new plug that can begin in the Started state.
35 35
 func New(started bool) Plug {
36 36
 	p := &plug{
37 37
 		startCh: make(chan struct{}),
38
-		stopCh:  make(chan struct{}),
38
+		stopCh:  make(chan error, 1),
39 39
 	}
40 40
 	if started {
41 41
 		p.Start()
... ...
@@ -47,8 +48,13 @@ func (p *plug) Start() {
47 47
 	p.start.Do(func() { close(p.startCh) })
48 48
 }
49 49
 
50
-func (p *plug) Stop() {
51
-	p.stop.Do(func() { close(p.stopCh) })
50
+func (p *plug) Stop(err error) {
51
+	p.stop.Do(func() {
52
+		if err != nil {
53
+			p.stopCh <- err
54
+		}
55
+		close(p.stopCh)
56
+	})
52 57
 }
53 58
 
54 59
 func (p *plug) IsStarted() bool {
... ...
@@ -64,16 +70,21 @@ func (p *plug) WaitForStart() {
64 64
 	<-p.startCh
65 65
 }
66 66
 
67
-func (p *plug) WaitForStop() {
68
-	<-p.stopCh
67
+func (p *plug) WaitForStop() error {
68
+	err, ok := <-p.stopCh
69
+	if !ok {
70
+		return nil
71
+	}
72
+	return err
69 73
 }
70 74
 
71 75
 // Leaser controls access to a lease
72 76
 type Leaser interface {
73 77
 	// AcquireAndHold tries to acquire the lease and hold it until it expires, the lease is deleted,
74
-	// or we observe another party take the lease. The notify channel will be sent a value
75
-	// when the lease is held, and closed when the lease is lost.
76
-	AcquireAndHold(chan struct{})
78
+	// or we observe another party take the lease. The notify channel will be sent a nil value
79
+	// when the lease is held, and closed when the lease is lost. If an error is sent the lease
80
+	// is also considered lost.
81
+	AcquireAndHold(chan error)
77 82
 	Release()
78 83
 }
79 84
 
... ...
@@ -96,22 +107,31 @@ func NewLeased(leaser Leaser) *Leased {
96 96
 }
97 97
 
98 98
 // Stop releases the acquired lease
99
-func (l *Leased) Stop() {
99
+func (l *Leased) Stop(err error) {
100 100
 	l.leaser.Release()
101
-	l.Plug.Stop()
101
+	l.Plug.Stop(err)
102 102
 }
103 103
 
104 104
 // Run tries to acquire and hold a lease, invoking Start()
105 105
 // when the lease is held and invoking Stop() when the lease
106
-// is lost.
107
-func (l *Leased) Run() {
108
-	ch := make(chan struct{}, 1)
106
+// is lost. If the lease was lost gracefully, nil is returned.
107
+// If the lease was lost due to an error, the error is returned.
108
+func (l *Leased) Run() error {
109
+	ch := make(chan error, 1)
109 110
 	go l.leaser.AcquireAndHold(ch)
110
-	defer l.Stop()
111
+	var err error
112
+	defer l.Stop(err)
111 113
 	for {
112
-		_, ok := <-ch
114
+		var ok bool
115
+		err, ok = <-ch
113 116
 		if !ok {
114
-			return
117
+			return nil
118
+		}
119
+		if err != nil {
120
+			for range ch {
121
+				// read the rest of the channel
122
+			}
123
+			return err
115 124
 		}
116 125
 		l.Start()
117 126
 	}
... ...
@@ -19,7 +19,7 @@ type Leaser interface {
19 19
 	// lease is acquired, and the provided channel will be closed when the lease is lost. If the
20 20
 	// function returns true, the lease will be released on exit. If the function returns false,
21 21
 	// the lease will be held.
22
-	AcquireAndHold(chan struct{})
22
+	AcquireAndHold(chan error)
23 23
 	// Release returns any active leases
24 24
 	Release()
25 25
 }
... ...
@@ -54,7 +54,7 @@ func NewEtcd(client *etcdclient.Client, key, value string, ttl uint64) Leaser {
54 54
 		value:  value,
55 55
 		ttl:    ttl,
56 56
 
57
-		waitFraction:         0.75,
57
+		waitFraction:         0.66,
58 58
 		pauseInterval:        time.Second,
59 59
 		maxRetries:           10,
60 60
 		minimumRetryInterval: 100 * time.Millisecond,
... ...
@@ -62,7 +62,7 @@ func NewEtcd(client *etcdclient.Client, key, value string, ttl uint64) Leaser {
62 62
 }
63 63
 
64 64
 // AcquireAndHold implements an acquire and release of a lease.
65
-func (e *Etcd) AcquireAndHold(notify chan struct{}) {
65
+func (e *Etcd) AcquireAndHold(notify chan error) {
66 66
 	for {
67 67
 		ok, ttl, index, err := e.tryAcquire()
68 68
 		if err != nil {
... ...
@@ -76,12 +76,12 @@ func (e *Etcd) AcquireAndHold(notify chan struct{}) {
76 76
 		}
77 77
 
78 78
 		// notify
79
-		notify <- struct{}{}
79
+		notify <- nil
80 80
 		defer close(notify)
81 81
 
82 82
 		// hold the lease
83 83
 		if err := e.tryHold(ttl, index); err != nil {
84
-			utilruntime.HandleError(err)
84
+			notify <- err
85 85
 		}
86 86
 		break
87 87
 	}
... ...
@@ -3,6 +3,7 @@
3 3
 package integration
4 4
 
5 5
 import (
6
+	"strings"
6 7
 	"testing"
7 8
 	"time"
8 9
 
... ...
@@ -27,12 +28,15 @@ func TestLeaderLeaseAcquire(t *testing.T) {
27 27
 	}()
28 28
 
29 29
 	lease := leaderlease.NewEtcd(client, key, "holder", 10)
30
-	ch := make(chan struct{})
30
+	ch := make(chan error, 1)
31 31
 	go lease.AcquireAndHold(ch)
32 32
 
33 33
 	<-ch
34 34
 	glog.Infof("Lease acquired")
35 35
 	close(held)
36
+	if err, ok := <-ch; err == nil || !ok || !strings.Contains(err.Error(), "the lease has been lost") {
37
+		t.Errorf("Expected error and open channel when lease was swapped: %v %t", err, ok)
38
+	}
36 39
 	<-ch
37 40
 	glog.Infof("Lease lost")
38 41
 
... ...
@@ -65,12 +69,15 @@ func TestLeaderLeaseWait(t *testing.T) {
65 65
 	}()
66 66
 
67 67
 	lease := leaderlease.NewEtcd(client, key, "holder", 10)
68
-	ch := make(chan struct{})
68
+	ch := make(chan error, 1)
69 69
 	go lease.AcquireAndHold(ch)
70 70
 
71 71
 	<-ch
72 72
 	glog.Infof("Lease acquired")
73 73
 	close(held)
74
+	if err, ok := <-ch; err == nil || !ok || !strings.Contains(err.Error(), "the lease has been lost") {
75
+		t.Errorf("Expected error and open channel when lease was swapped: %v %t", err, ok)
76
+	}
74 77
 	<-ch
75 78
 	glog.Infof("Lease lost")
76 79
 
... ...
@@ -102,12 +109,15 @@ func TestLeaderLeaseSwapWhileWaiting(t *testing.T) {
102 102
 	}()
103 103
 
104 104
 	lease := leaderlease.NewEtcd(client, key, "other", 10)
105
-	ch := make(chan struct{})
105
+	ch := make(chan error, 1)
106 106
 	go lease.AcquireAndHold(ch)
107 107
 
108 108
 	<-ch
109 109
 	glog.Infof("Lease acquired")
110 110
 	lease.Release()
111
+	if err, ok := <-ch; err == nil || !ok || !strings.Contains(err.Error(), "the lease has been lost") {
112
+		t.Errorf("Expected error and open channel when lease was swapped: %v %t", err, ok)
113
+	}
111 114
 	<-ch
112 115
 	glog.Infof("Lease gone")
113 116
 }
... ...
@@ -131,7 +141,7 @@ func TestLeaderLeaseReacquire(t *testing.T) {
131 131
 	}()
132 132
 
133 133
 	lease := leaderlease.NewEtcd(client, key, "holder", 1)
134
-	ch := make(chan struct{})
134
+	ch := make(chan error, 1)
135 135
 	go lease.AcquireAndHold(ch)
136 136
 
137 137
 	<-ch
... ...
@@ -7,7 +7,6 @@ import (
7 7
 	"net/url"
8 8
 	"os"
9 9
 	"path"
10
-	"testing"
11 10
 	"time"
12 11
 
13 12
 	"github.com/golang/glog"
... ...
@@ -41,15 +40,6 @@ const ServiceAccountWaitTimeout = 30 * time.Second
41 41
 // is available for the admission control cache to catch up and allow pod creation
42 42
 const PodCreationWaitTimeout = 10 * time.Second
43 43
 
44
-// RequireServer verifies if the etcd and the OpenShift server are
45
-// available and you can successfully connect to them.
46
-func RequireServer(t *testing.T) {
47
-	util.RequireEtcd(t)
48
-	if _, err := util.GetClusterAdminClient(util.KubeConfigPath()); err != nil {
49
-		os.Exit(1)
50
-	}
51
-}
52
-
53 44
 // FindAvailableBindAddress returns a bind address on 127.0.0.1 with a free port in the low-high range.
54 45
 // If lowPort is 0, an ephemeral port is allocated.
55 46
 func FindAvailableBindAddress(lowPort, highPort int) (string, error) {