Browse code

Switch to using the new 'embed' package to launch etcd

Clayton Coleman authored on 2016/09/18 13:24:30
Showing 3 changed files
... ...
@@ -1,64 +1,98 @@
1 1
 package etcdserver
2 2
 
3 3
 import (
4
-	"fmt"
4
+	"net/url"
5
+	"strings"
6
+	"time"
5 7
 
8
+	"github.com/coreos/etcd/embed"
9
+	"github.com/coreos/etcd/pkg/osutil"
10
+	"github.com/coreos/etcd/pkg/types"
6 11
 	"github.com/golang/glog"
7 12
 
8 13
 	configapi "github.com/openshift/origin/pkg/cmd/server/api"
9 14
 )
10 15
 
16
+const defaultName = "openshift.local"
17
+
11 18
 // RunEtcd starts an etcd server and runs it forever
12 19
 func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
13
-	cfg := &config{
14
-		name: defaultName,
15
-		dir:  etcdServerConfig.StorageDir,
16
-
17
-		TickMs:       100,
18
-		ElectionMs:   1000,
19
-		maxSnapFiles: 5,
20
-		maxWalFiles:  5,
20
+	cfg := embed.NewConfig()
21
+	cfg.Debug = true
22
+	cfg.Name = defaultName
23
+	cfg.Dir = etcdServerConfig.StorageDir
21 24
 
22
-		initialClusterToken: "etcd-cluster",
25
+	clientTLS := configapi.UseTLS(etcdServerConfig.ServingInfo)
26
+	if clientTLS {
27
+		cfg.ClientTLSInfo.CAFile = etcdServerConfig.ServingInfo.ClientCA
28
+		cfg.ClientTLSInfo.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile
29
+		cfg.ClientTLSInfo.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile
30
+		cfg.ClientTLSInfo.ClientCertAuth = len(cfg.ClientTLSInfo.CAFile) > 0
23 31
 	}
24
-	var err error
25
-	if configapi.UseTLS(etcdServerConfig.ServingInfo) {
26
-		cfg.clientTLSInfo.CAFile = etcdServerConfig.ServingInfo.ClientCA
27
-		cfg.clientTLSInfo.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile
28
-		cfg.clientTLSInfo.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile
29
-	}
30
-	if cfg.lcurls, err = urlsFromStrings(etcdServerConfig.ServingInfo.BindAddress, cfg.clientTLSInfo); err != nil {
31
-		glog.Fatalf("Unable to build etcd client URLs: %v", err)
32
+	u, err := types.NewURLs(addressToURLs(etcdServerConfig.ServingInfo.BindAddress, clientTLS))
33
+	if err != nil {
34
+		glog.Fatalf("Unable to build etcd peer URLs: %v", err)
32 35
 	}
36
+	cfg.LCUrls = []url.URL(u)
33 37
 
34
-	if configapi.UseTLS(etcdServerConfig.PeerServingInfo) {
35
-		cfg.peerTLSInfo.CAFile = etcdServerConfig.PeerServingInfo.ClientCA
36
-		cfg.peerTLSInfo.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile
37
-		cfg.peerTLSInfo.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile
38
+	peerTLS := configapi.UseTLS(etcdServerConfig.PeerServingInfo)
39
+	if peerTLS {
40
+		cfg.PeerTLSInfo.CAFile = etcdServerConfig.PeerServingInfo.ClientCA
41
+		cfg.PeerTLSInfo.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile
42
+		cfg.PeerTLSInfo.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile
43
+		cfg.PeerTLSInfo.ClientCertAuth = len(cfg.PeerTLSInfo.CAFile) > 0
38 44
 	}
39
-	if cfg.lpurls, err = urlsFromStrings(etcdServerConfig.PeerServingInfo.BindAddress, cfg.peerTLSInfo); err != nil {
45
+	u, err = types.NewURLs(addressToURLs(etcdServerConfig.PeerServingInfo.BindAddress, peerTLS))
46
+	if err != nil {
40 47
 		glog.Fatalf("Unable to build etcd peer URLs: %v", err)
41 48
 	}
49
+	cfg.LPUrls = []url.URL(u)
42 50
 
43
-	if cfg.acurls, err = urlsFromStrings(etcdServerConfig.Address, cfg.clientTLSInfo); err != nil {
51
+	u, err = types.NewURLs(addressToURLs(etcdServerConfig.Address, clientTLS))
52
+	if err != nil {
44 53
 		glog.Fatalf("Unable to build etcd announce client URLs: %v", err)
45 54
 	}
46
-	if cfg.apurls, err = urlsFromStrings(etcdServerConfig.PeerAddress, cfg.peerTLSInfo); err != nil {
55
+	cfg.ACUrls = []url.URL(u)
56
+
57
+	u, err = types.NewURLs(addressToURLs(etcdServerConfig.PeerAddress, peerTLS))
58
+	if err != nil {
47 59
 		glog.Fatalf("Unable to build etcd announce peer URLs: %v", err)
48 60
 	}
61
+	cfg.APUrls = []url.URL(u)
49 62
 
50
-	if err := cfg.resolveUrls(); err != nil {
51
-		glog.Fatalf("Unable to resolve etcd URLs: %v", err)
52
-	}
63
+	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
53 64
 
54
-	cfg.initialCluster = fmt.Sprintf("%s=%s", cfg.name, cfg.apurls[0].String())
65
+	osutil.HandleInterrupts()
55 66
 
56
-	stopped, err := startEtcd(cfg)
67
+	e, err := embed.StartEtcd(cfg)
57 68
 	if err != nil {
58 69
 		glog.Fatalf("Unable to start etcd: %v", err)
59 70
 	}
71
+
60 72
 	go func() {
61
-		glog.Infof("Started etcd at %s", etcdServerConfig.Address)
62
-		<-stopped
73
+		defer e.Close()
74
+
75
+		select {
76
+		case <-e.Server.ReadyNotify():
77
+			glog.Infof("Started etcd at %s", etcdServerConfig.Address)
78
+		case <-time.After(60 * time.Second):
79
+			glog.Warning("etcd took too long to start, stopped")
80
+			e.Server.Stop() // trigger a shutdown
81
+		}
82
+		glog.Fatalf("etcd has returned an error: %v", <-e.Err())
63 83
 	}()
64 84
 }
85
+
86
+// addressToURLs turns a host:port comma delimited list into an array valid
87
+// URL strings with the appropriate prefix for the TLS mode.
88
+func addressToURLs(addr string, isTLS bool) []string {
89
+	addrs := strings.Split(addr, ",")
90
+	for i := range addrs {
91
+		if isTLS {
92
+			addrs[i] = "https://" + addrs[i]
93
+		} else {
94
+			addrs[i] = "http://" + addrs[i]
95
+		}
96
+	}
97
+	return addrs
98
+}
65 99
deleted file mode 100644
... ...
@@ -1,255 +0,0 @@
1
-// This is a somewhat faithful reproduction of etcdmain/etcd.go
2
-package etcdserver
3
-
4
-import (
5
-	"fmt"
6
-	"io/ioutil"
7
-	"log"
8
-	"net"
9
-	"net/http"
10
-	"net/url"
11
-	"strings"
12
-	"time"
13
-
14
-	"github.com/golang/glog"
15
-
16
-	"github.com/coreos/etcd/etcdserver"
17
-	etcdhttp "github.com/coreos/etcd/etcdserver/api/v2http"
18
-	"github.com/coreos/etcd/pkg/osutil"
19
-	"github.com/coreos/etcd/pkg/transport"
20
-	"github.com/coreos/etcd/pkg/types"
21
-	"github.com/coreos/etcd/rafthttp"
22
-)
23
-
24
-type config struct {
25
-	// member
26
-	dir            string
27
-	lpurls, lcurls []url.URL
28
-	maxSnapFiles   uint
29
-	maxWalFiles    uint
30
-	name           string
31
-	snapCount      uint64
32
-	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
33
-	// make ticks a cluster wide configuration.
34
-	TickMs     uint
35
-	ElectionMs uint
36
-
37
-	// clustering
38
-	apurls, acurls      []url.URL
39
-	initialCluster      string
40
-	initialClusterToken string
41
-
42
-	// security
43
-	clientTLSInfo, peerTLSInfo transport.TLSInfo
44
-}
45
-
46
-const (
47
-	// the owner can make/remove files inside the directory
48
-	defaultName = "openshift.local"
49
-)
50
-
51
-// startEtcd launches the etcd server and HTTP handlers for client/server communication.
52
-func startEtcd(cfg *config) (<-chan struct{}, error) {
53
-	initialPeers, token, err := setupCluster(cfg)
54
-	if err != nil {
55
-		return nil, fmt.Errorf("error setting up initial cluster: %v", err)
56
-	}
57
-
58
-	if !cfg.peerTLSInfo.Empty() {
59
-		glog.V(2).Infof("etcd: peerTLS: %s", cfg.peerTLSInfo)
60
-	}
61
-	plns := make([]net.Listener, 0)
62
-	for _, u := range cfg.lpurls {
63
-		var l net.Listener
64
-		peerTLSConfig, err := cfg.peerTLSInfo.ServerConfig()
65
-		if err != nil {
66
-			return nil, err
67
-		}
68
-		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, peerTLSConfig, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
69
-		if err != nil {
70
-			return nil, err
71
-		}
72
-
73
-		urlStr := u.String()
74
-		glog.V(2).Info("etcd: listening for peers on ", urlStr)
75
-		defer func() {
76
-			if err != nil {
77
-				l.Close()
78
-				glog.V(2).Info("etcd: stopping listening for peers on ", urlStr)
79
-			}
80
-		}()
81
-		plns = append(plns, l)
82
-	}
83
-
84
-	if !cfg.clientTLSInfo.Empty() {
85
-		glog.V(2).Infof("etcd: clientTLS: %s", cfg.clientTLSInfo)
86
-	}
87
-	clns := make([]net.Listener, 0)
88
-	for _, u := range cfg.lcurls {
89
-		l, err := net.Listen("tcp", u.Host)
90
-		if err != nil {
91
-			return nil, err
92
-		}
93
-		clientTLSConfig, err := cfg.clientTLSInfo.ServerConfig()
94
-		if err != nil {
95
-			return nil, err
96
-		}
97
-		l, err = transport.NewKeepAliveListener(l, u.Scheme, clientTLSConfig)
98
-		if err != nil {
99
-			return nil, err
100
-		}
101
-
102
-		urlStr := u.String()
103
-		glog.V(2).Info("etcd: listening for client requests on ", urlStr)
104
-		defer func() {
105
-			if err != nil {
106
-				l.Close()
107
-				glog.V(2).Info("etcd: stopping listening for client requests on ", urlStr)
108
-			}
109
-		}()
110
-		clns = append(clns, l)
111
-	}
112
-
113
-	srvcfg := &etcdserver.ServerConfig{
114
-		Name:                cfg.name,
115
-		ClientURLs:          cfg.acurls,
116
-		PeerURLs:            cfg.apurls,
117
-		DataDir:             cfg.dir,
118
-		SnapCount:           cfg.snapCount,
119
-		MaxSnapFiles:        cfg.maxSnapFiles,
120
-		InitialPeerURLsMap:  initialPeers,
121
-		InitialClusterToken: token,
122
-		MaxWALFiles:         cfg.maxWalFiles,
123
-		NewCluster:          true,
124
-		ForceNewCluster:     false,
125
-		TickMs:              cfg.TickMs,
126
-		ElectionTicks:       cfg.electionTicks(),
127
-
128
-		PeerTLSInfo: cfg.peerTLSInfo,
129
-	}
130
-	var s *etcdserver.EtcdServer
131
-	s, err = etcdserver.NewServer(srvcfg)
132
-	if err != nil {
133
-		return nil, err
134
-	}
135
-	osutil.HandleInterrupts()
136
-	s.Start()
137
-	osutil.RegisterInterruptHandler(s.Stop)
138
-
139
-	ch := etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout())
140
-	ph := etcdhttp.NewPeerHandler(s)
141
-	// Start the peer server in a goroutine
142
-	for _, l := range plns {
143
-		go func(l net.Listener) {
144
-			glog.Fatal(serveHTTP(l, ph, 5*time.Minute))
145
-		}(l)
146
-	}
147
-	// Start a client server goroutine for each listen address
148
-	for _, l := range clns {
149
-		go func(l net.Listener) {
150
-			// read timeout does not work with http close notify
151
-			// TODO: https://github.com/golang/go/issues/9524
152
-			glog.Fatal(serveHTTP(l, ch, 0))
153
-		}(l)
154
-	}
155
-	return s.StopNotify(), nil
156
-}
157
-
158
-// setupCluster sets up an initial cluster definition for bootstrap or discovery.
159
-func setupCluster(cfg *config) (types.URLsMap, string, error) {
160
-	// We're statically configured, and cluster has appropriately been set.
161
-	m, err := types.NewURLsMap(cfg.initialCluster)
162
-	return m, cfg.initialClusterToken, err
163
-}
164
-
165
-func genClusterString(name string, urls types.URLs) string {
166
-	addrs := make([]string, 0)
167
-	for _, u := range urls {
168
-		addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
169
-	}
170
-	return strings.Join(addrs, ",")
171
-}
172
-
173
-func initialClusterFromName(name string) string {
174
-	n := name
175
-	if name == "" {
176
-		n = defaultName
177
-	}
178
-	return fmt.Sprintf("%s=http://localhost:7001", n)
179
-}
180
-
181
-func urlsFromStrings(input string, tlsInfo transport.TLSInfo) ([]url.URL, error) {
182
-	urls := []url.URL{}
183
-	for _, addr := range strings.Split(input, ",") {
184
-		addrURL := url.URL{Scheme: "http", Host: addr}
185
-		if !tlsInfo.Empty() {
186
-			addrURL.Scheme = "https"
187
-		}
188
-		urls = append(urls, addrURL)
189
-	}
190
-	return urls, nil
191
-}
192
-
193
-// serveHTTP accepts incoming HTTP connections on the listener l,
194
-// creating a new service goroutine for each. The service goroutines
195
-// read requests and then call handler to reply to them.
196
-func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
197
-	logger := log.New(ioutil.Discard, "etcdhttp", 0)
198
-	// TODO: add debug flag; enable logging when debug flag is set
199
-	srv := &http.Server{
200
-		Handler:     handler,
201
-		ReadTimeout: readTimeout,
202
-		ErrorLog:    logger, // do not log user error
203
-	}
204
-	return srv.Serve(l)
205
-}
206
-
207
-func (cfg *config) resolveUrls() error {
208
-	out, err := resolveTCPAddrs([][]url.URL{cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls})
209
-	if err != nil {
210
-		return err
211
-	}
212
-	cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls = out[0], out[1], out[2], out[3]
213
-	return nil
214
-}
215
-
216
-func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
217
-
218
-// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
219
-// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
220
-// are resolved.
221
-func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
222
-	newurls := make([][]url.URL, 0)
223
-	for _, us := range urls {
224
-		nus := make([]url.URL, len(us))
225
-		for i, u := range us {
226
-			nu, err := url.Parse(u.String())
227
-			if err != nil {
228
-				return nil, err
229
-			}
230
-			nus[i] = *nu
231
-		}
232
-		for i, u := range nus {
233
-			host, _, err := net.SplitHostPort(u.Host)
234
-			if err != nil {
235
-				glog.Errorf("could not parse url %s during tcp resolving", u.Host)
236
-				return nil, err
237
-			}
238
-			if host == "localhost" {
239
-				continue
240
-			}
241
-			if net.ParseIP(host) != nil {
242
-				continue
243
-			}
244
-			tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
245
-			if err != nil {
246
-				glog.Errorf("could not resolve host %s", u.Host)
247
-				return nil, err
248
-			}
249
-			glog.V(4).Infof("resolving %s to %s", u.Host, tcpAddr.String())
250
-			nus[i].Host = tcpAddr.String()
251
-		}
252
-		newurls = append(newurls, nus)
253
-	}
254
-	return newurls, nil
255
-}
... ...
@@ -30,12 +30,12 @@ type EtcdOptions struct {
30 30
 const etcdLong = `Start an etcd server for testing.
31 31
 
32 32
 This command starts an etcd server based on the config for testing.  It is not
33
-Intended for production use.  Running
33
+intended for production use.  Running
34 34
 
35 35
   %[1]s start %[2]s
36 36
 
37
-will start the server listening for incoming requests. The server
38
-will run in the foreground until you terminate the process.`
37
+will start the server listening for incoming requests. The server will run in
38
+the foreground until you terminate the process.`
39 39
 
40 40
 // NewCommandStartEtcdServer starts only the etcd server
41 41
 func NewCommandStartEtcdServer(name, basename string, out io.Writer) (*cobra.Command, *EtcdOptions) {