Support the same options as before.
| ... | ... |
@@ -232,7 +232,8 @@ function start_etcd {
|
| 232 | 232 |
etcd -name test -data-dir ${ETCD_DIR} -bind-addr ${host}:${port} ${initial_cluster} >/dev/null 2>/dev/null &
|
| 233 | 233 |
export ETCD_PID=$! |
| 234 | 234 |
|
| 235 |
- wait_for_url "http://${host}:${port}/version" "etcd: "
|
|
| 235 |
+ wait_for_url "http://${host}:${port}/version" "etcd: " 0.25 80
|
|
| 236 |
+ curl -X PUT "http://${host}:${port}/v2/keys/_test"
|
|
| 236 | 237 |
} |
| 237 | 238 |
|
| 238 | 239 |
# stop_openshift_server utility function to terminate an |
| ... | ... |
@@ -4,13 +4,10 @@ import ( |
| 4 | 4 |
"fmt" |
| 5 | 5 |
"time" |
| 6 | 6 |
|
| 7 |
- //etcdconfig "github.com/coreos/etcd/config" |
|
| 8 |
- _ "github.com/coreos/etcd/etcdmain" |
|
| 9 | 7 |
etcdclient "github.com/coreos/go-etcd/etcd" |
| 10 | 8 |
"github.com/golang/glog" |
| 11 | 9 |
|
| 12 | 10 |
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" |
| 13 |
- "github.com/GoogleCloudPlatform/kubernetes/pkg/util" |
|
| 14 | 11 |
|
| 15 | 12 |
"github.com/openshift/origin/pkg/api/latest" |
| 16 | 13 |
configapi "github.com/openshift/origin/pkg/cmd/server/api" |
| ... | ... |
@@ -18,40 +15,60 @@ import ( |
| 18 | 18 |
|
| 19 | 19 |
// RunEtcd starts an etcd server and runs it forever |
| 20 | 20 |
func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
|
| 21 |
+ cfg := &config{
|
|
| 22 |
+ name: defaultName, |
|
| 23 |
+ dir: etcdServerConfig.StorageDir, |
|
| 21 | 24 |
|
| 22 |
- /*config := etcdconfig.New() |
|
| 23 |
- |
|
| 24 |
- config.Addr = etcdServerConfig.Address |
|
| 25 |
- config.BindAddr = etcdServerConfig.ServingInfo.BindAddress |
|
| 25 |
+ TickMs: 100, |
|
| 26 |
+ ElectionMs: 1000, |
|
| 27 |
+ maxSnapFiles: 5, |
|
| 28 |
+ maxWalFiles: 5, |
|
| 26 | 29 |
|
| 30 |
+ initialClusterToken: "etcd-cluster", |
|
| 31 |
+ } |
|
| 32 |
+ var err error |
|
| 27 | 33 |
if configapi.UseTLS(etcdServerConfig.ServingInfo) {
|
| 28 |
- config.CAFile = etcdServerConfig.ServingInfo.ClientCA |
|
| 29 |
- config.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile |
|
| 30 |
- config.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile |
|
| 34 |
+ cfg.clientTLSInfo.CAFile = etcdServerConfig.ServingInfo.ClientCA |
|
| 35 |
+ cfg.clientTLSInfo.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile |
|
| 36 |
+ cfg.clientTLSInfo.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile |
|
| 37 |
+ } |
|
| 38 |
+ if cfg.lcurls, err = urlsFromStrings(etcdServerConfig.ServingInfo.BindAddress, cfg.clientTLSInfo); err != nil {
|
|
| 39 |
+ glog.Fatalf("Unable to build etcd client URLs: %v", err)
|
|
| 31 | 40 |
} |
| 32 |
- |
|
| 33 |
- config.Peer.Addr = etcdServerConfig.PeerAddress |
|
| 34 |
- config.Peer.BindAddr = etcdServerConfig.PeerServingInfo.BindAddress |
|
| 35 | 41 |
|
| 36 | 42 |
if configapi.UseTLS(etcdServerConfig.PeerServingInfo) {
|
| 37 |
- config.Peer.CAFile = etcdServerConfig.PeerServingInfo.ClientCA |
|
| 38 |
- config.Peer.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile |
|
| 39 |
- config.Peer.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile |
|
| 43 |
+ cfg.peerTLSInfo.CAFile = etcdServerConfig.PeerServingInfo.ClientCA |
|
| 44 |
+ cfg.peerTLSInfo.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile |
|
| 45 |
+ cfg.peerTLSInfo.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile |
|
| 46 |
+ } |
|
| 47 |
+ if cfg.lpurls, err = urlsFromStrings(etcdServerConfig.PeerServingInfo.BindAddress, cfg.peerTLSInfo); err != nil {
|
|
| 48 |
+ glog.Fatalf("Unable to build etcd peer URLs: %v", err)
|
|
| 49 |
+ } |
|
| 50 |
+ |
|
| 51 |
+ if cfg.acurls, err = urlsFromStrings(etcdServerConfig.Address, cfg.clientTLSInfo); err != nil {
|
|
| 52 |
+ glog.Fatalf("Unable to build etcd announce client URLs: %v", err)
|
|
| 53 |
+ } |
|
| 54 |
+ if cfg.apurls, err = urlsFromStrings(etcdServerConfig.PeerAddress, cfg.peerTLSInfo); err != nil {
|
|
| 55 |
+ glog.Fatalf("Unable to build etcd announce peer URLs: %v", err)
|
|
| 56 |
+ } |
|
| 57 |
+ |
|
| 58 |
+ if err := cfg.resolveUrls(); err != nil {
|
|
| 59 |
+ glog.Fatalf("Unable to resolve etcd URLs: %v", err)
|
|
| 40 | 60 |
} |
| 41 | 61 |
|
| 42 |
- config.DataDir = etcdServerConfig.StorageDir |
|
| 43 |
- config.Name = "openshift.local" |
|
| 62 |
+ cfg.initialCluster = fmt.Sprintf("%s=%s", cfg.name, cfg.apurls[0].String())
|
|
| 44 | 63 |
|
| 45 |
- server := etcd.New(config) |
|
| 46 |
- go util.Forever(func() {
|
|
| 47 |
- glog.Infof("Started etcd at %s", config.Addr)
|
|
| 48 |
- server.Run() |
|
| 49 |
- glog.Fatalf("etcd died, exiting.")
|
|
| 50 |
- }, 500*time.Millisecond) |
|
| 51 |
- <-server.ReadyNotify()*/ |
|
| 64 |
+ stopped, err := startEtcd(cfg) |
|
| 65 |
+ if err != nil {
|
|
| 66 |
+ glog.Fatalf("Unable to start etcd: %v", err)
|
|
| 67 |
+ } |
|
| 68 |
+ go func() {
|
|
| 69 |
+ glog.Infof("Started etcd at %s", etcdServerConfig.Address)
|
|
| 70 |
+ <-stopped |
|
| 71 |
+ }() |
|
| 52 | 72 |
} |
| 53 | 73 |
|
| 54 |
-// getAndTestEtcdClient creates an etcd client based on the provided config and waits |
|
| 74 |
+// GetAndTestEtcdClient creates an etcd client based on the provided config and waits |
|
| 55 | 75 |
// until etcd server is reachable. It errors out and exits if the server cannot |
| 56 | 76 |
// be reached for a certain amount of time. |
| 57 | 77 |
func GetAndTestEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
|
| ... | ... |
@@ -93,7 +110,7 @@ func GetAndTestEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdcli |
| 93 | 93 |
return etcdClient, nil |
| 94 | 94 |
} |
| 95 | 95 |
|
| 96 |
-// newOpenShiftEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version |
|
| 96 |
+// NewOpenShiftEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version |
|
| 97 | 97 |
// is incorrect. |
| 98 | 98 |
func NewOpenShiftEtcdHelper(etcdClientInfo configapi.EtcdConnectionInfo) (helper tools.EtcdHelper, err error) {
|
| 99 | 99 |
// Connect and setup etcd interfaces |
| 100 | 100 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,209 @@ |
| 0 |
+// This is a somewhat faithful reproduction of etcdmain/etcd.go |
|
| 1 |
+package etcd |
|
| 2 |
+ |
|
| 3 |
+import ( |
|
| 4 |
+ "fmt" |
|
| 5 |
+ "io/ioutil" |
|
| 6 |
+ "log" |
|
| 7 |
+ "net" |
|
| 8 |
+ "net/http" |
|
| 9 |
+ "net/url" |
|
| 10 |
+ "strings" |
|
| 11 |
+ "time" |
|
| 12 |
+ |
|
| 13 |
+ "github.com/golang/glog" |
|
| 14 |
+ |
|
| 15 |
+ "github.com/coreos/etcd/etcdserver" |
|
| 16 |
+ "github.com/coreos/etcd/etcdserver/etcdhttp" |
|
| 17 |
+ "github.com/coreos/etcd/pkg/netutil" |
|
| 18 |
+ "github.com/coreos/etcd/pkg/osutil" |
|
| 19 |
+ "github.com/coreos/etcd/pkg/transport" |
|
| 20 |
+ "github.com/coreos/etcd/pkg/types" |
|
| 21 |
+ "github.com/coreos/etcd/rafthttp" |
|
| 22 |
+) |
|
| 23 |
+ |
|
| 24 |
+type config struct {
|
|
| 25 |
+ // member |
|
| 26 |
+ dir string |
|
| 27 |
+ lpurls, lcurls []url.URL |
|
| 28 |
+ maxSnapFiles uint |
|
| 29 |
+ maxWalFiles uint |
|
| 30 |
+ name string |
|
| 31 |
+ snapCount uint64 |
|
| 32 |
+ // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). |
|
| 33 |
+ // make ticks a cluster wide configuration. |
|
| 34 |
+ TickMs uint |
|
| 35 |
+ ElectionMs uint |
|
| 36 |
+ |
|
| 37 |
+ // clustering |
|
| 38 |
+ apurls, acurls []url.URL |
|
| 39 |
+ initialCluster string |
|
| 40 |
+ initialClusterToken string |
|
| 41 |
+ |
|
| 42 |
+ // security |
|
| 43 |
+ clientTLSInfo, peerTLSInfo transport.TLSInfo |
|
| 44 |
+} |
|
| 45 |
+ |
|
| 46 |
+const ( |
|
| 47 |
+ // the owner can make/remove files inside the directory |
|
| 48 |
+ defaultName = "openshift.local" |
|
| 49 |
+) |
|
| 50 |
+ |
|
| 51 |
+// startEtcd launches the etcd server and HTTP handlers for client/server communication. |
|
| 52 |
+func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|
| 53 |
+ cls, err := setupCluster(cfg) |
|
| 54 |
+ if err != nil {
|
|
| 55 |
+ return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
|
| 56 |
+ } |
|
| 57 |
+ |
|
| 58 |
+ pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) |
|
| 59 |
+ if err != nil {
|
|
| 60 |
+ return nil, err |
|
| 61 |
+ } |
|
| 62 |
+ |
|
| 63 |
+ if !cfg.peerTLSInfo.Empty() {
|
|
| 64 |
+ glog.V(2).Infof("etcd: peerTLS: %s", cfg.peerTLSInfo)
|
|
| 65 |
+ } |
|
| 66 |
+ plns := make([]net.Listener, 0) |
|
| 67 |
+ for _, u := range cfg.lpurls {
|
|
| 68 |
+ var l net.Listener |
|
| 69 |
+ l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) |
|
| 70 |
+ if err != nil {
|
|
| 71 |
+ return nil, err |
|
| 72 |
+ } |
|
| 73 |
+ |
|
| 74 |
+ urlStr := u.String() |
|
| 75 |
+ glog.V(2).Info("etcd: listening for peers on ", urlStr)
|
|
| 76 |
+ defer func() {
|
|
| 77 |
+ if err != nil {
|
|
| 78 |
+ l.Close() |
|
| 79 |
+ glog.V(2).Info("etcd: stopping listening for peers on ", urlStr)
|
|
| 80 |
+ } |
|
| 81 |
+ }() |
|
| 82 |
+ plns = append(plns, l) |
|
| 83 |
+ } |
|
| 84 |
+ |
|
| 85 |
+ if !cfg.clientTLSInfo.Empty() {
|
|
| 86 |
+ glog.V(2).Infof("etcd: clientTLS: %s", cfg.clientTLSInfo)
|
|
| 87 |
+ } |
|
| 88 |
+ clns := make([]net.Listener, 0) |
|
| 89 |
+ for _, u := range cfg.lcurls {
|
|
| 90 |
+ var l net.Listener |
|
| 91 |
+ l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo) |
|
| 92 |
+ if err != nil {
|
|
| 93 |
+ return nil, err |
|
| 94 |
+ } |
|
| 95 |
+ |
|
| 96 |
+ urlStr := u.String() |
|
| 97 |
+ glog.V(2).Info("etcd: listening for client requests on ", urlStr)
|
|
| 98 |
+ defer func() {
|
|
| 99 |
+ if err != nil {
|
|
| 100 |
+ l.Close() |
|
| 101 |
+ glog.V(2).Info("etcd: stopping listening for client requests on ", urlStr)
|
|
| 102 |
+ } |
|
| 103 |
+ }() |
|
| 104 |
+ clns = append(clns, l) |
|
| 105 |
+ } |
|
| 106 |
+ |
|
| 107 |
+ srvcfg := &etcdserver.ServerConfig{
|
|
| 108 |
+ Name: cfg.name, |
|
| 109 |
+ ClientURLs: cfg.acurls, |
|
| 110 |
+ PeerURLs: cfg.apurls, |
|
| 111 |
+ DataDir: cfg.dir, |
|
| 112 |
+ SnapCount: cfg.snapCount, |
|
| 113 |
+ MaxSnapFiles: cfg.maxSnapFiles, |
|
| 114 |
+ MaxWALFiles: cfg.maxWalFiles, |
|
| 115 |
+ Cluster: cls, |
|
| 116 |
+ NewCluster: true, |
|
| 117 |
+ ForceNewCluster: false, |
|
| 118 |
+ Transport: pt, |
|
| 119 |
+ TickMs: cfg.TickMs, |
|
| 120 |
+ ElectionTicks: cfg.electionTicks(), |
|
| 121 |
+ } |
|
| 122 |
+ var s *etcdserver.EtcdServer |
|
| 123 |
+ s, err = etcdserver.NewServer(srvcfg) |
|
| 124 |
+ if err != nil {
|
|
| 125 |
+ return nil, err |
|
| 126 |
+ } |
|
| 127 |
+ osutil.HandleInterrupts() |
|
| 128 |
+ s.Start() |
|
| 129 |
+ osutil.RegisterInterruptHandler(s.Stop) |
|
| 130 |
+ |
|
| 131 |
+ ch := etcdhttp.NewClientHandler(s) |
|
| 132 |
+ ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler()) |
|
| 133 |
+ // Start the peer server in a goroutine |
|
| 134 |
+ for _, l := range plns {
|
|
| 135 |
+ go func(l net.Listener) {
|
|
| 136 |
+ glog.Fatal(serveHTTP(l, ph, 5*time.Minute)) |
|
| 137 |
+ }(l) |
|
| 138 |
+ } |
|
| 139 |
+ // Start a client server goroutine for each listen address |
|
| 140 |
+ for _, l := range clns {
|
|
| 141 |
+ go func(l net.Listener) {
|
|
| 142 |
+ // read timeout does not work with http close notify |
|
| 143 |
+ // TODO: https://github.com/golang/go/issues/9524 |
|
| 144 |
+ glog.Fatal(serveHTTP(l, ch, 0)) |
|
| 145 |
+ }(l) |
|
| 146 |
+ } |
|
| 147 |
+ return s.StopNotify(), nil |
|
| 148 |
+} |
|
| 149 |
+ |
|
| 150 |
+// setupCluster sets up an initial cluster definition for bootstrap or discovery. |
|
| 151 |
+func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
|
| 152 |
+ var cls *etcdserver.Cluster |
|
| 153 |
+ var err error |
|
| 154 |
+ switch {
|
|
| 155 |
+ default: |
|
| 156 |
+ // We're statically configured, and cluster has appropriately been set. |
|
| 157 |
+ cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster) |
|
| 158 |
+ } |
|
| 159 |
+ return cls, err |
|
| 160 |
+} |
|
| 161 |
+ |
|
| 162 |
+func genClusterString(name string, urls types.URLs) string {
|
|
| 163 |
+ addrs := make([]string, 0) |
|
| 164 |
+ for _, u := range urls {
|
|
| 165 |
+ addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
|
|
| 166 |
+ } |
|
| 167 |
+ return strings.Join(addrs, ",") |
|
| 168 |
+} |
|
| 169 |
+ |
|
| 170 |
+func initialClusterFromName(name string) string {
|
|
| 171 |
+ n := name |
|
| 172 |
+ if name == "" {
|
|
| 173 |
+ n = defaultName |
|
| 174 |
+ } |
|
| 175 |
+ return fmt.Sprintf("%s=http://localhost:7001", n)
|
|
| 176 |
+} |
|
| 177 |
+ |
|
| 178 |
+func urlsFromStrings(input string, tlsInfo transport.TLSInfo) ([]url.URL, error) {
|
|
| 179 |
+ urls := []url.URL{}
|
|
| 180 |
+ for _, addr := range strings.Split(input, ",") {
|
|
| 181 |
+ addrURL := url.URL{Scheme: "http", Host: addr}
|
|
| 182 |
+ if !tlsInfo.Empty() {
|
|
| 183 |
+ addrURL.Scheme = "https" |
|
| 184 |
+ } |
|
| 185 |
+ urls = append(urls, addrURL) |
|
| 186 |
+ } |
|
| 187 |
+ return urls, nil |
|
| 188 |
+} |
|
| 189 |
+ |
|
| 190 |
+// serveHTTP accepts incoming HTTP connections on the listener l, |
|
| 191 |
+// creating a new service goroutine for each. The service goroutines |
|
| 192 |
+// read requests and then call handler to reply to them. |
|
| 193 |
+func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
|
|
| 194 |
+ logger := log.New(ioutil.Discard, "etcdhttp", 0) |
|
| 195 |
+ // TODO: add debug flag; enable logging when debug flag is set |
|
| 196 |
+ srv := &http.Server{
|
|
| 197 |
+ Handler: handler, |
|
| 198 |
+ ReadTimeout: readTimeout, |
|
| 199 |
+ ErrorLog: logger, // do not log user error |
|
| 200 |
+ } |
|
| 201 |
+ return srv.Serve(l) |
|
| 202 |
+} |
|
| 203 |
+ |
|
| 204 |
+func (cfg *config) resolveUrls() error {
|
|
| 205 |
+ return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls) |
|
| 206 |
+} |
|
| 207 |
+ |
|
| 208 |
+func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|