Browse code

add etcdserver launch mechanism

deads2k authored on 2016/01/14 03:59:41
Showing 9 changed files
... ...
@@ -291,6 +291,27 @@ _openshift_start_node()
291 291
     must_have_one_noun=()
292 292
 }
293 293
 
294
+_openshift_start_etcd()
295
+{
296
+    last_command="openshift_start_etcd"
297
+    commands=()
298
+
299
+    flags=()
300
+    two_word_flags=()
301
+    flags_with_completion=()
302
+    flags_completion=()
303
+
304
+    flags+=("--config=")
305
+    flags_with_completion+=("--config")
306
+    flags_completion+=("__handle_filename_extension_flag yaml|yml")
307
+    flags+=("--google-json-key=")
308
+    flags+=("--log-flush-frequency=")
309
+
310
+    must_have_one_flag=()
311
+    must_have_one_flag+=("--config=")
312
+    must_have_one_noun=()
313
+}
314
+
294 315
 _openshift_start_kubernetes_apiserver()
295 316
 {
296 317
     last_command="openshift_start_kubernetes_apiserver"
... ...
@@ -743,6 +764,7 @@ _openshift_start()
743 743
     commands=()
744 744
     commands+=("master")
745 745
     commands+=("node")
746
+    commands+=("etcd")
746 747
     commands+=("kubernetes")
747 748
 
748 749
     flags=()
... ...
@@ -16,61 +16,6 @@ import (
16 16
 	configapi "github.com/openshift/origin/pkg/cmd/server/api"
17 17
 )
18 18
 
19
-// RunEtcd starts an etcd server and runs it forever
20
-func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
21
-	cfg := &config{
22
-		name: defaultName,
23
-		dir:  etcdServerConfig.StorageDir,
24
-
25
-		TickMs:       100,
26
-		ElectionMs:   1000,
27
-		maxSnapFiles: 5,
28
-		maxWalFiles:  5,
29
-
30
-		initialClusterToken: "etcd-cluster",
31
-	}
32
-	var err error
33
-	if configapi.UseTLS(etcdServerConfig.ServingInfo) {
34
-		cfg.clientTLSInfo.CAFile = etcdServerConfig.ServingInfo.ClientCA
35
-		cfg.clientTLSInfo.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile
36
-		cfg.clientTLSInfo.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile
37
-	}
38
-	if cfg.lcurls, err = urlsFromStrings(etcdServerConfig.ServingInfo.BindAddress, cfg.clientTLSInfo); err != nil {
39
-		glog.Fatalf("Unable to build etcd client URLs: %v", err)
40
-	}
41
-
42
-	if configapi.UseTLS(etcdServerConfig.PeerServingInfo) {
43
-		cfg.peerTLSInfo.CAFile = etcdServerConfig.PeerServingInfo.ClientCA
44
-		cfg.peerTLSInfo.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile
45
-		cfg.peerTLSInfo.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile
46
-	}
47
-	if cfg.lpurls, err = urlsFromStrings(etcdServerConfig.PeerServingInfo.BindAddress, cfg.peerTLSInfo); err != nil {
48
-		glog.Fatalf("Unable to build etcd peer URLs: %v", err)
49
-	}
50
-
51
-	if cfg.acurls, err = urlsFromStrings(etcdServerConfig.Address, cfg.clientTLSInfo); err != nil {
52
-		glog.Fatalf("Unable to build etcd announce client URLs: %v", err)
53
-	}
54
-	if cfg.apurls, err = urlsFromStrings(etcdServerConfig.PeerAddress, cfg.peerTLSInfo); err != nil {
55
-		glog.Fatalf("Unable to build etcd announce peer URLs: %v", err)
56
-	}
57
-
58
-	if err := cfg.resolveUrls(); err != nil {
59
-		glog.Fatalf("Unable to resolve etcd URLs: %v", err)
60
-	}
61
-
62
-	cfg.initialCluster = fmt.Sprintf("%s=%s", cfg.name, cfg.apurls[0].String())
63
-
64
-	stopped, err := startEtcd(cfg)
65
-	if err != nil {
66
-		glog.Fatalf("Unable to start etcd: %v", err)
67
-	}
68
-	go func() {
69
-		glog.Infof("Started etcd at %s", etcdServerConfig.Address)
70
-		<-stopped
71
-	}()
72
-}
73
-
74 19
 // GetAndTestEtcdClient creates an etcd client based on the provided config. It will attempt to
75 20
 // connect to the etcd server and block until the server responds at least once, or return an
76 21
 // error if the server never responded.
77 22
new file mode 100644
... ...
@@ -0,0 +1,64 @@
0
+package etcdserver
1
+
2
+import (
3
+	"fmt"
4
+
5
+	"github.com/golang/glog"
6
+
7
+	configapi "github.com/openshift/origin/pkg/cmd/server/api"
8
+)
9
+
10
+// RunEtcd starts an etcd server and runs it forever
11
+func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
12
+	cfg := &config{
13
+		name: defaultName,
14
+		dir:  etcdServerConfig.StorageDir,
15
+
16
+		TickMs:       100,
17
+		ElectionMs:   1000,
18
+		maxSnapFiles: 5,
19
+		maxWalFiles:  5,
20
+
21
+		initialClusterToken: "etcd-cluster",
22
+	}
23
+	var err error
24
+	if configapi.UseTLS(etcdServerConfig.ServingInfo) {
25
+		cfg.clientTLSInfo.CAFile = etcdServerConfig.ServingInfo.ClientCA
26
+		cfg.clientTLSInfo.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile
27
+		cfg.clientTLSInfo.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile
28
+	}
29
+	if cfg.lcurls, err = urlsFromStrings(etcdServerConfig.ServingInfo.BindAddress, cfg.clientTLSInfo); err != nil {
30
+		glog.Fatalf("Unable to build etcd client URLs: %v", err)
31
+	}
32
+
33
+	if configapi.UseTLS(etcdServerConfig.PeerServingInfo) {
34
+		cfg.peerTLSInfo.CAFile = etcdServerConfig.PeerServingInfo.ClientCA
35
+		cfg.peerTLSInfo.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile
36
+		cfg.peerTLSInfo.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile
37
+	}
38
+	if cfg.lpurls, err = urlsFromStrings(etcdServerConfig.PeerServingInfo.BindAddress, cfg.peerTLSInfo); err != nil {
39
+		glog.Fatalf("Unable to build etcd peer URLs: %v", err)
40
+	}
41
+
42
+	if cfg.acurls, err = urlsFromStrings(etcdServerConfig.Address, cfg.clientTLSInfo); err != nil {
43
+		glog.Fatalf("Unable to build etcd announce client URLs: %v", err)
44
+	}
45
+	if cfg.apurls, err = urlsFromStrings(etcdServerConfig.PeerAddress, cfg.peerTLSInfo); err != nil {
46
+		glog.Fatalf("Unable to build etcd announce peer URLs: %v", err)
47
+	}
48
+
49
+	if err := cfg.resolveUrls(); err != nil {
50
+		glog.Fatalf("Unable to resolve etcd URLs: %v", err)
51
+	}
52
+
53
+	cfg.initialCluster = fmt.Sprintf("%s=%s", cfg.name, cfg.apurls[0].String())
54
+
55
+	stopped, err := startEtcd(cfg)
56
+	if err != nil {
57
+		glog.Fatalf("Unable to start etcd: %v", err)
58
+	}
59
+	go func() {
60
+		glog.Infof("Started etcd at %s", etcdServerConfig.Address)
61
+		<-stopped
62
+	}()
63
+}
0 64
new file mode 100644
... ...
@@ -0,0 +1,282 @@
0
+// This is a somewhat faithful reproduction of etcdmain/etcd.go
1
+package etcdserver
2
+
3
+import (
4
+	"fmt"
5
+	"io/ioutil"
6
+	"log"
7
+	"net"
8
+	"net/http"
9
+	"net/url"
10
+	"reflect"
11
+	"sort"
12
+	"strings"
13
+	"time"
14
+
15
+	"github.com/golang/glog"
16
+
17
+	"github.com/coreos/etcd/etcdserver"
18
+	"github.com/coreos/etcd/etcdserver/etcdhttp"
19
+	"github.com/coreos/etcd/pkg/osutil"
20
+	"github.com/coreos/etcd/pkg/transport"
21
+	"github.com/coreos/etcd/pkg/types"
22
+	"github.com/coreos/etcd/rafthttp"
23
+)
24
+
25
+type config struct {
26
+	// member
27
+	dir            string
28
+	lpurls, lcurls []url.URL
29
+	maxSnapFiles   uint
30
+	maxWalFiles    uint
31
+	name           string
32
+	snapCount      uint64
33
+	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
34
+	// make ticks a cluster wide configuration.
35
+	TickMs     uint
36
+	ElectionMs uint
37
+
38
+	// clustering
39
+	apurls, acurls      []url.URL
40
+	initialCluster      string
41
+	initialClusterToken string
42
+
43
+	// security
44
+	clientTLSInfo, peerTLSInfo transport.TLSInfo
45
+}
46
+
47
+const (
48
+	// the owner can make/remove files inside the directory
49
+	defaultName = "openshift.local"
50
+)
51
+
52
+// startEtcd launches the etcd server and HTTP handlers for client/server communication.
53
+func startEtcd(cfg *config) (<-chan struct{}, error) {
54
+	initialPeers, token, err := setupCluster(cfg)
55
+	if err != nil {
56
+		return nil, fmt.Errorf("error setting up initial cluster: %v", err)
57
+	}
58
+
59
+	pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, peerDialTimeout(cfg.ElectionMs), rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
60
+	if err != nil {
61
+		return nil, err
62
+	}
63
+
64
+	if !cfg.peerTLSInfo.Empty() {
65
+		glog.V(2).Infof("etcd: peerTLS: %s", cfg.peerTLSInfo)
66
+	}
67
+	plns := make([]net.Listener, 0)
68
+	for _, u := range cfg.lpurls {
69
+		var l net.Listener
70
+		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
71
+		if err != nil {
72
+			return nil, err
73
+		}
74
+
75
+		urlStr := u.String()
76
+		glog.V(2).Info("etcd: listening for peers on ", urlStr)
77
+		defer func() {
78
+			if err != nil {
79
+				l.Close()
80
+				glog.V(2).Info("etcd: stopping listening for peers on ", urlStr)
81
+			}
82
+		}()
83
+		plns = append(plns, l)
84
+	}
85
+
86
+	if !cfg.clientTLSInfo.Empty() {
87
+		glog.V(2).Infof("etcd: clientTLS: %s", cfg.clientTLSInfo)
88
+	}
89
+	clns := make([]net.Listener, 0)
90
+	for _, u := range cfg.lcurls {
91
+		var l net.Listener
92
+		l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
93
+		if err != nil {
94
+			return nil, err
95
+		}
96
+
97
+		urlStr := u.String()
98
+		glog.V(2).Info("etcd: listening for client requests on ", urlStr)
99
+		defer func() {
100
+			if err != nil {
101
+				l.Close()
102
+				glog.V(2).Info("etcd: stopping listening for client requests on ", urlStr)
103
+			}
104
+		}()
105
+		clns = append(clns, l)
106
+	}
107
+
108
+	srvcfg := &etcdserver.ServerConfig{
109
+		Name:                cfg.name,
110
+		ClientURLs:          cfg.acurls,
111
+		PeerURLs:            cfg.apurls,
112
+		DataDir:             cfg.dir,
113
+		SnapCount:           cfg.snapCount,
114
+		MaxSnapFiles:        cfg.maxSnapFiles,
115
+		InitialPeerURLsMap:  initialPeers,
116
+		InitialClusterToken: token,
117
+		MaxWALFiles:         cfg.maxWalFiles,
118
+		NewCluster:          true,
119
+		ForceNewCluster:     false,
120
+		Transport:           pt,
121
+		TickMs:              cfg.TickMs,
122
+		ElectionTicks:       cfg.electionTicks(),
123
+	}
124
+	var s *etcdserver.EtcdServer
125
+	s, err = etcdserver.NewServer(srvcfg)
126
+	if err != nil {
127
+		return nil, err
128
+	}
129
+	osutil.HandleInterrupts()
130
+	s.Start()
131
+	osutil.RegisterInterruptHandler(s.Stop)
132
+
133
+	ch := etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout())
134
+	ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
135
+	// Start the peer server in a goroutine
136
+	for _, l := range plns {
137
+		go func(l net.Listener) {
138
+			glog.Fatal(serveHTTP(l, ph, 5*time.Minute))
139
+		}(l)
140
+	}
141
+	// Start a client server goroutine for each listen address
142
+	for _, l := range clns {
143
+		go func(l net.Listener) {
144
+			// read timeout does not work with http close notify
145
+			// TODO: https://github.com/golang/go/issues/9524
146
+			glog.Fatal(serveHTTP(l, ch, 0))
147
+		}(l)
148
+	}
149
+	return s.StopNotify(), nil
150
+}
151
+
152
+// setupCluster sets up an initial cluster definition for bootstrap or discovery.
153
+func setupCluster(cfg *config) (types.URLsMap, string, error) {
154
+	// We're statically configured, and cluster has appropriately been set.
155
+	m, err := types.NewURLsMap(cfg.initialCluster)
156
+	return m, cfg.initialClusterToken, err
157
+}
158
+
159
+func genClusterString(name string, urls types.URLs) string {
160
+	addrs := make([]string, 0)
161
+	for _, u := range urls {
162
+		addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
163
+	}
164
+	return strings.Join(addrs, ",")
165
+}
166
+
167
+func initialClusterFromName(name string) string {
168
+	n := name
169
+	if name == "" {
170
+		n = defaultName
171
+	}
172
+	return fmt.Sprintf("%s=http://localhost:7001", n)
173
+}
174
+
175
+func urlsFromStrings(input string, tlsInfo transport.TLSInfo) ([]url.URL, error) {
176
+	urls := []url.URL{}
177
+	for _, addr := range strings.Split(input, ",") {
178
+		addrURL := url.URL{Scheme: "http", Host: addr}
179
+		if !tlsInfo.Empty() {
180
+			addrURL.Scheme = "https"
181
+		}
182
+		urls = append(urls, addrURL)
183
+	}
184
+	return urls, nil
185
+}
186
+
187
+// serveHTTP accepts incoming HTTP connections on the listener l,
188
+// creating a new service goroutine for each. The service goroutines
189
+// read requests and then call handler to reply to them.
190
+func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
191
+	logger := log.New(ioutil.Discard, "etcdhttp", 0)
192
+	// TODO: add debug flag; enable logging when debug flag is set
193
+	srv := &http.Server{
194
+		Handler:     handler,
195
+		ReadTimeout: readTimeout,
196
+		ErrorLog:    logger, // do not log user error
197
+	}
198
+	return srv.Serve(l)
199
+}
200
+
201
+func (cfg *config) resolveUrls() error {
202
+	out, err := resolveTCPAddrs([][]url.URL{cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls})
203
+	if err != nil {
204
+		return err
205
+	}
206
+	cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls = out[0], out[1], out[2], out[3]
207
+	return nil
208
+}
209
+
210
+func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
211
+
212
+// private in etcdmain
213
+
214
+func peerDialTimeout(electionMs uint) time.Duration {
215
+	// 1s for queue wait and system delay
216
+	// + one RTT, which is smaller than 1/5 election timeout
217
+	return time.Second + time.Duration(electionMs)*time.Millisecond/5
218
+}
219
+
220
+// made private in netutil
221
+
222
+// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
223
+// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
224
+// are resolved.
225
+func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
226
+	newurls := make([][]url.URL, 0)
227
+	for _, us := range urls {
228
+		nus := make([]url.URL, len(us))
229
+		for i, u := range us {
230
+			nu, err := url.Parse(u.String())
231
+			if err != nil {
232
+				return nil, err
233
+			}
234
+			nus[i] = *nu
235
+		}
236
+		for i, u := range nus {
237
+			host, _, err := net.SplitHostPort(u.Host)
238
+			if err != nil {
239
+				glog.Errorf("could not parse url %s during tcp resolving", u.Host)
240
+				return nil, err
241
+			}
242
+			if host == "localhost" {
243
+				continue
244
+			}
245
+			if net.ParseIP(host) != nil {
246
+				continue
247
+			}
248
+			tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
249
+			if err != nil {
250
+				glog.Errorf("could not resolve host %s", u.Host)
251
+				return nil, err
252
+			}
253
+			glog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
254
+			nus[i].Host = tcpAddr.String()
255
+		}
256
+		newurls = append(newurls, nus)
257
+	}
258
+	return newurls, nil
259
+}
260
+
261
+// urlsEqual checks equality of url.URLS between two arrays.
262
+// This check pass even if an URL is in hostname and opposite is in IP address.
263
+func urlsEqual(a []url.URL, b []url.URL) bool {
264
+	if len(a) != len(b) {
265
+		return false
266
+	}
267
+	urls, err := resolveTCPAddrs([][]url.URL{a, b})
268
+	if err != nil {
269
+		return false
270
+	}
271
+	a, b = urls[0], urls[1]
272
+	sort.Sort(types.URLs(a))
273
+	sort.Sort(types.URLs(b))
274
+	for i := range a {
275
+		if !reflect.DeepEqual(a[i], b[i]) {
276
+			return false
277
+		}
278
+	}
279
+
280
+	return true
281
+}
0 282
deleted file mode 100644
... ...
@@ -1,282 +0,0 @@
1
-// This is a somewhat faithful reproduction of etcdmain/etcd.go
2
-package etcd
3
-
4
-import (
5
-	"fmt"
6
-	"io/ioutil"
7
-	"log"
8
-	"net"
9
-	"net/http"
10
-	"net/url"
11
-	"reflect"
12
-	"sort"
13
-	"strings"
14
-	"time"
15
-
16
-	"github.com/golang/glog"
17
-
18
-	"github.com/coreos/etcd/etcdserver"
19
-	"github.com/coreos/etcd/etcdserver/etcdhttp"
20
-	"github.com/coreos/etcd/pkg/osutil"
21
-	"github.com/coreos/etcd/pkg/transport"
22
-	"github.com/coreos/etcd/pkg/types"
23
-	"github.com/coreos/etcd/rafthttp"
24
-)
25
-
26
-type config struct {
27
-	// member
28
-	dir            string
29
-	lpurls, lcurls []url.URL
30
-	maxSnapFiles   uint
31
-	maxWalFiles    uint
32
-	name           string
33
-	snapCount      uint64
34
-	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
35
-	// make ticks a cluster wide configuration.
36
-	TickMs     uint
37
-	ElectionMs uint
38
-
39
-	// clustering
40
-	apurls, acurls      []url.URL
41
-	initialCluster      string
42
-	initialClusterToken string
43
-
44
-	// security
45
-	clientTLSInfo, peerTLSInfo transport.TLSInfo
46
-}
47
-
48
-const (
49
-	// the owner can make/remove files inside the directory
50
-	defaultName = "openshift.local"
51
-)
52
-
53
-// startEtcd launches the etcd server and HTTP handlers for client/server communication.
54
-func startEtcd(cfg *config) (<-chan struct{}, error) {
55
-	initialPeers, token, err := setupCluster(cfg)
56
-	if err != nil {
57
-		return nil, fmt.Errorf("error setting up initial cluster: %v", err)
58
-	}
59
-
60
-	pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, peerDialTimeout(cfg.ElectionMs), rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
61
-	if err != nil {
62
-		return nil, err
63
-	}
64
-
65
-	if !cfg.peerTLSInfo.Empty() {
66
-		glog.V(2).Infof("etcd: peerTLS: %s", cfg.peerTLSInfo)
67
-	}
68
-	plns := make([]net.Listener, 0)
69
-	for _, u := range cfg.lpurls {
70
-		var l net.Listener
71
-		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
72
-		if err != nil {
73
-			return nil, err
74
-		}
75
-
76
-		urlStr := u.String()
77
-		glog.V(2).Info("etcd: listening for peers on ", urlStr)
78
-		defer func() {
79
-			if err != nil {
80
-				l.Close()
81
-				glog.V(2).Info("etcd: stopping listening for peers on ", urlStr)
82
-			}
83
-		}()
84
-		plns = append(plns, l)
85
-	}
86
-
87
-	if !cfg.clientTLSInfo.Empty() {
88
-		glog.V(2).Infof("etcd: clientTLS: %s", cfg.clientTLSInfo)
89
-	}
90
-	clns := make([]net.Listener, 0)
91
-	for _, u := range cfg.lcurls {
92
-		var l net.Listener
93
-		l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
94
-		if err != nil {
95
-			return nil, err
96
-		}
97
-
98
-		urlStr := u.String()
99
-		glog.V(2).Info("etcd: listening for client requests on ", urlStr)
100
-		defer func() {
101
-			if err != nil {
102
-				l.Close()
103
-				glog.V(2).Info("etcd: stopping listening for client requests on ", urlStr)
104
-			}
105
-		}()
106
-		clns = append(clns, l)
107
-	}
108
-
109
-	srvcfg := &etcdserver.ServerConfig{
110
-		Name:                cfg.name,
111
-		ClientURLs:          cfg.acurls,
112
-		PeerURLs:            cfg.apurls,
113
-		DataDir:             cfg.dir,
114
-		SnapCount:           cfg.snapCount,
115
-		MaxSnapFiles:        cfg.maxSnapFiles,
116
-		InitialPeerURLsMap:  initialPeers,
117
-		InitialClusterToken: token,
118
-		MaxWALFiles:         cfg.maxWalFiles,
119
-		NewCluster:          true,
120
-		ForceNewCluster:     false,
121
-		Transport:           pt,
122
-		TickMs:              cfg.TickMs,
123
-		ElectionTicks:       cfg.electionTicks(),
124
-	}
125
-	var s *etcdserver.EtcdServer
126
-	s, err = etcdserver.NewServer(srvcfg)
127
-	if err != nil {
128
-		return nil, err
129
-	}
130
-	osutil.HandleInterrupts()
131
-	s.Start()
132
-	osutil.RegisterInterruptHandler(s.Stop)
133
-
134
-	ch := etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout())
135
-	ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
136
-	// Start the peer server in a goroutine
137
-	for _, l := range plns {
138
-		go func(l net.Listener) {
139
-			glog.Fatal(serveHTTP(l, ph, 5*time.Minute))
140
-		}(l)
141
-	}
142
-	// Start a client server goroutine for each listen address
143
-	for _, l := range clns {
144
-		go func(l net.Listener) {
145
-			// read timeout does not work with http close notify
146
-			// TODO: https://github.com/golang/go/issues/9524
147
-			glog.Fatal(serveHTTP(l, ch, 0))
148
-		}(l)
149
-	}
150
-	return s.StopNotify(), nil
151
-}
152
-
153
-// setupCluster sets up an initial cluster definition for bootstrap or discovery.
154
-func setupCluster(cfg *config) (types.URLsMap, string, error) {
155
-	// We're statically configured, and cluster has appropriately been set.
156
-	m, err := types.NewURLsMap(cfg.initialCluster)
157
-	return m, cfg.initialClusterToken, err
158
-}
159
-
160
-func genClusterString(name string, urls types.URLs) string {
161
-	addrs := make([]string, 0)
162
-	for _, u := range urls {
163
-		addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
164
-	}
165
-	return strings.Join(addrs, ",")
166
-}
167
-
168
-func initialClusterFromName(name string) string {
169
-	n := name
170
-	if name == "" {
171
-		n = defaultName
172
-	}
173
-	return fmt.Sprintf("%s=http://localhost:7001", n)
174
-}
175
-
176
-func urlsFromStrings(input string, tlsInfo transport.TLSInfo) ([]url.URL, error) {
177
-	urls := []url.URL{}
178
-	for _, addr := range strings.Split(input, ",") {
179
-		addrURL := url.URL{Scheme: "http", Host: addr}
180
-		if !tlsInfo.Empty() {
181
-			addrURL.Scheme = "https"
182
-		}
183
-		urls = append(urls, addrURL)
184
-	}
185
-	return urls, nil
186
-}
187
-
188
-// serveHTTP accepts incoming HTTP connections on the listener l,
189
-// creating a new service goroutine for each. The service goroutines
190
-// read requests and then call handler to reply to them.
191
-func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
192
-	logger := log.New(ioutil.Discard, "etcdhttp", 0)
193
-	// TODO: add debug flag; enable logging when debug flag is set
194
-	srv := &http.Server{
195
-		Handler:     handler,
196
-		ReadTimeout: readTimeout,
197
-		ErrorLog:    logger, // do not log user error
198
-	}
199
-	return srv.Serve(l)
200
-}
201
-
202
-func (cfg *config) resolveUrls() error {
203
-	out, err := resolveTCPAddrs([][]url.URL{cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls})
204
-	if err != nil {
205
-		return err
206
-	}
207
-	cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls = out[0], out[1], out[2], out[3]
208
-	return nil
209
-}
210
-
211
-func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
212
-
213
-// private in etcdmain
214
-
215
-func peerDialTimeout(electionMs uint) time.Duration {
216
-	// 1s for queue wait and system delay
217
-	// + one RTT, which is smaller than 1/5 election timeout
218
-	return time.Second + time.Duration(electionMs)*time.Millisecond/5
219
-}
220
-
221
-// made private in netutil
222
-
223
-// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
224
-// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
225
-// are resolved.
226
-func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
227
-	newurls := make([][]url.URL, 0)
228
-	for _, us := range urls {
229
-		nus := make([]url.URL, len(us))
230
-		for i, u := range us {
231
-			nu, err := url.Parse(u.String())
232
-			if err != nil {
233
-				return nil, err
234
-			}
235
-			nus[i] = *nu
236
-		}
237
-		for i, u := range nus {
238
-			host, _, err := net.SplitHostPort(u.Host)
239
-			if err != nil {
240
-				glog.Errorf("could not parse url %s during tcp resolving", u.Host)
241
-				return nil, err
242
-			}
243
-			if host == "localhost" {
244
-				continue
245
-			}
246
-			if net.ParseIP(host) != nil {
247
-				continue
248
-			}
249
-			tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
250
-			if err != nil {
251
-				glog.Errorf("could not resolve host %s", u.Host)
252
-				return nil, err
253
-			}
254
-			glog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
255
-			nus[i].Host = tcpAddr.String()
256
-		}
257
-		newurls = append(newurls, nus)
258
-	}
259
-	return newurls, nil
260
-}
261
-
262
-// urlsEqual checks equality of url.URLS between two arrays.
263
-// This check pass even if an URL is in hostname and opposite is in IP address.
264
-func urlsEqual(a []url.URL, b []url.URL) bool {
265
-	if len(a) != len(b) {
266
-		return false
267
-	}
268
-	urls, err := resolveTCPAddrs([][]url.URL{a, b})
269
-	if err != nil {
270
-		return false
271
-	}
272
-	a, b = urls[0], urls[1]
273
-	sort.Sort(types.URLs(a))
274
-	sort.Sort(types.URLs(b))
275
-	for i := range a {
276
-		if !reflect.DeepEqual(a[i], b[i]) {
277
-			return false
278
-		}
279
-	}
280
-
281
-	return true
282
-}
... ...
@@ -114,8 +114,10 @@ func NewCommandStartAllInOne(basename string, out io.Writer) (*cobra.Command, *A
114 114
 
115 115
 	startMaster, _ := NewCommandStartMaster(basename, out)
116 116
 	startNode, _ := NewCommandStartNode(basename, out)
117
+	startEtcdServer, _ := NewCommandStartEtcdServer(RecommendedStartEtcdServerName, basename, out)
117 118
 	cmds.AddCommand(startMaster)
118 119
 	cmds.AddCommand(startNode)
120
+	cmds.AddCommand(startEtcdServer)
119 121
 
120 122
 	startKube := kubernetes.NewCommand("kubernetes", basename, out)
121 123
 	cmds.AddCommand(startKube)
122 124
new file mode 100644
... ...
@@ -0,0 +1,117 @@
0
+package start
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"io"
6
+	"os"
7
+
8
+	"github.com/coreos/go-systemd/daemon"
9
+	"github.com/golang/glog"
10
+	"github.com/spf13/cobra"
11
+
12
+	kerrors "k8s.io/kubernetes/pkg/api/errors"
13
+	kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
14
+	"k8s.io/kubernetes/pkg/util/fielderrors"
15
+
16
+	configapilatest "github.com/openshift/origin/pkg/cmd/server/api/latest"
17
+	"github.com/openshift/origin/pkg/cmd/server/api/validation"
18
+	"github.com/openshift/origin/pkg/cmd/server/etcd/etcdserver"
19
+)
20
+
21
+const RecommendedStartEtcdServerName = "etcd"
22
+
23
+type EtcdOptions struct {
24
+	ConfigFile string
25
+	Output     io.Writer
26
+}
27
+
28
+const etcdLong = `Start an etcd server for testing.
29
+
30
+This command starts an etcd server based on the config for testing.  It is not 
31
+Intended for production use.  Running
32
+
33
+  $ %[1]s start %[2]s
34
+
35
+will start the server listening for incoming requests. The server
36
+will run in the foreground until you terminate the process.`
37
+
38
+// NewCommandStartEtcdServer starts only the etcd server
39
+func NewCommandStartEtcdServer(name, basename string, out io.Writer) (*cobra.Command, *EtcdOptions) {
40
+	options := &EtcdOptions{Output: out}
41
+
42
+	cmd := &cobra.Command{
43
+		Use:   name,
44
+		Short: "Launch etcd server",
45
+		Long:  fmt.Sprintf(etcdLong, basename, name),
46
+		Run: func(c *cobra.Command, args []string) {
47
+			kcmdutil.CheckErr(options.Validate())
48
+
49
+			startProfiler()
50
+
51
+			if err := options.StartEtcdServer(); err != nil {
52
+				if kerrors.IsInvalid(err) {
53
+					if details := err.(*kerrors.StatusError).ErrStatus.Details; details != nil {
54
+						fmt.Fprintf(c.Out(), "Invalid %s %s\n", details.Kind, details.Name)
55
+						for _, cause := range details.Causes {
56
+							fmt.Fprintf(c.Out(), "  %s: %s\n", cause.Field, cause.Message)
57
+						}
58
+						os.Exit(255)
59
+					}
60
+				}
61
+				glog.Fatal(err)
62
+			}
63
+		},
64
+	}
65
+
66
+	flags := cmd.Flags()
67
+	// This command only supports reading from config
68
+	flags.StringVar(&options.ConfigFile, "config", "", "Location of the master configuration file to run from.")
69
+	cmd.MarkFlagFilename("config", "yaml", "yml")
70
+	cmd.MarkFlagRequired("config")
71
+
72
+	return cmd, options
73
+}
74
+
75
+func (o *EtcdOptions) Validate() error {
76
+	if len(o.ConfigFile) == 0 {
77
+		return errors.New("--config is required for this command")
78
+	}
79
+
80
+	return nil
81
+}
82
+
83
+// StartEtcdServer calls RunEtcdServer and then waits forever
84
+func (o *EtcdOptions) StartEtcdServer() error {
85
+	if err := o.RunEtcdServer(); err != nil {
86
+		return err
87
+	}
88
+
89
+	go daemon.SdNotify("READY=1")
90
+	select {}
91
+}
92
+
93
+// RunEtcdServer takes the options and starts the etcd server
94
+func (o *EtcdOptions) RunEtcdServer() error {
95
+	masterConfig, err := configapilatest.ReadAndResolveMasterConfig(o.ConfigFile)
96
+	if err != nil {
97
+		return err
98
+	}
99
+
100
+	validationResults := validation.ValidateMasterConfig(masterConfig)
101
+	if len(validationResults.Warnings) != 0 {
102
+		for _, warning := range validationResults.Warnings {
103
+			glog.Warningf("%v", warning)
104
+		}
105
+	}
106
+	if len(validationResults.Errors) != 0 {
107
+		return kerrors.NewInvalid("MasterConfig", o.ConfigFile, validationResults.Errors)
108
+	}
109
+
110
+	if masterConfig.EtcdConfig == nil {
111
+		return kerrors.NewInvalid("MasterConfig.EtcConfig", o.ConfigFile, fielderrors.ValidationErrorList{fielderrors.NewFieldRequired("etcdConfig")})
112
+	}
113
+
114
+	etcdserver.RunEtcd(masterConfig.EtcdConfig)
115
+	return nil
116
+}
... ...
@@ -26,6 +26,7 @@ import (
26 26
 	"github.com/openshift/origin/pkg/cmd/server/api/validation"
27 27
 	"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
28 28
 	"github.com/openshift/origin/pkg/cmd/server/etcd"
29
+	"github.com/openshift/origin/pkg/cmd/server/etcd/etcdserver"
29 30
 	"github.com/openshift/origin/pkg/cmd/server/kubernetes"
30 31
 	"github.com/openshift/origin/pkg/cmd/server/origin"
31 32
 	cmdutil "github.com/openshift/origin/pkg/cmd/util"
... ...
@@ -424,7 +425,7 @@ func startHealth(openshiftConfig *origin.MasterConfig) error {
424 424
 func startAPI(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) error {
425 425
 	// start etcd
426 426
 	if oc.Options.EtcdConfig != nil {
427
-		etcd.RunEtcd(oc.Options.EtcdConfig)
427
+		etcdserver.RunEtcd(oc.Options.EtcdConfig)
428 428
 	}
429 429
 
430 430
 	// verify we can connect to etcd with the provided config
431 431
new file mode 100755
... ...
@@ -0,0 +1,94 @@
0
+#!/bin/bash
1
+#
2
+# This scripts starts the OpenShift server with a default configuration.
3
+# The OpenShift Docker registry and router are installed.
4
+# It will run all tests that are imported into test/extended.
5
+
6
+set -o errexit
7
+set -o nounset
8
+set -o pipefail
9
+
10
+OS_ROOT=$(dirname "${BASH_SOURCE}")/../..
11
+source "${OS_ROOT}/hack/util.sh"
12
+source "${OS_ROOT}/hack/common.sh"
13
+os::log::install_errexit
14
+cd "${OS_ROOT}"
15
+
16
+os::build::setup_env
17
+
18
+export TMPDIR="${TMPDIR:-"/tmp"}"
19
+export BASETMPDIR="${TMPDIR}/openshift-extended-tests/alternate_launches"
20
+export EXTENDED_TEST_PATH="${OS_ROOT}/test/extended"
21
+
22
+function cleanup()
23
+{
24
+	out=$?
25
+	cleanup_openshift
26
+	echo "[INFO] Exiting"
27
+	exit $out
28
+}
29
+
30
+trap "exit" INT TERM
31
+trap "cleanup" EXIT
32
+
33
+
34
+echo "[INFO] Starting server as distinct processes"
35
+ensure_iptables_or_die
36
+setup_env_vars
37
+reset_tmp_dir
38
+configure_os_server
39
+
40
+echo "[INFO] `openshift version`"
41
+echo "[INFO] Server logs will be at:    ${LOG_DIR}/openshift.log"
42
+echo "[INFO] Test artifacts will be in: ${ARTIFACT_DIR}"
43
+echo "[INFO] Volumes dir is:            ${VOLUME_DIR}"
44
+echo "[INFO] Config dir is:             ${SERVER_CONFIG_DIR}"
45
+echo "[INFO] Using images:              ${USE_IMAGES}"
46
+echo "[INFO] MasterIP is:               ${MASTER_ADDR}"
47
+
48
+mkdir -p ${LOG_DIR}
49
+
50
+echo "[INFO] Scan of OpenShift related processes already up via ps -ef	| grep openshift : "
51
+ps -ef | grep openshift
52
+echo "[INFO] Starting etcdserver"
53
+sudo env "PATH=${PATH}" OPENSHIFT_ON_PANIC=crash openshift start etcd \
54
+ --config=${MASTER_CONFIG_DIR}/master-config.yaml \
55
+ --loglevel=4 \
56
+&>"${LOG_DIR}/os-etcdserver.log" &
57
+
58
+echo "[INFO] Starting api server"
59
+sudo env "PATH=${PATH}" OPENSHIFT_PROFILE=web OPENSHIFT_ON_PANIC=crash openshift start master api \
60
+ --config=${MASTER_CONFIG_DIR}/master-config.yaml \
61
+ --loglevel=4 \
62
+&>"${LOG_DIR}/os-apiserver.log" &
63
+
64
+wait_for_url "${API_SCHEME}://${API_HOST}:${API_PORT}/healthz" "apiserver: " 0.25 80
65
+wait_for_url "${API_SCHEME}://${API_HOST}:${API_PORT}/healthz/ready" "apiserver(ready): " 0.25 80
66
+echo "[INFO] OpenShift API server up at: "
67
+date
68
+
69
+echo "[INFO] Starting controllers"
70
+sudo env "PATH=${PATH}"  OPENSHIFT_ON_PANIC=crash openshift start master controllers \
71
+ --config=${MASTER_CONFIG_DIR}/master-config.yaml \
72
+ --loglevel=4 \
73
+&>"${LOG_DIR}/os-controllers.log" &
74
+
75
+echo "[INFO] Starting node"
76
+sudo env "PATH=${PATH}"  OPENSHIFT_ON_PANIC=crash openshift start node \
77
+ --config=${NODE_CONFIG_DIR}/node-config.yaml \
78
+ --loglevel=4 \
79
+&>"${LOG_DIR}/os-node.log" &
80
+export OS_PID=$!
81
+
82
+echo "[INFO] OpenShift server start at: "
83
+date
84
+
85
+wait_for_url "${KUBELET_SCHEME}://${KUBELET_HOST}:${KUBELET_PORT}/healthz" "[INFO] kubelet: " 0.5 60
86
+wait_for_url "${API_SCHEME}://${API_HOST}:${API_PORT}/api/v1/nodes/${KUBELET_HOST}" "apiserver(nodes): " 0.25 80
87
+echo "[INFO] OpenShift node health checks done at: "
88
+date
89
+
90
+# set our default KUBECONFIG location
91
+export KUBECONFIG="${ADMIN_KUBECONFIG}"
92
+
93
+${OS_ROOT}/test/end-to-end/core.sh