Browse code

Libnetwork revendoring

Diff:
https://github.com/docker/libnetwork/compare/5ab4ab830062fe8a30a44b75b0bda6b1f4f166a4...20dd462e0a0e883437a274bd61df4bc4de980830

- Memberlist revendor (fix for deadlock on exit)
- Network diagnostic client
- Fix for ndots configuration

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2018/01/30 04:19:37
Showing 24 changed files
... ...
@@ -312,17 +312,19 @@ func (daemon *Daemon) reloadLiveRestore(conf *config.Config, attributes map[stri
312 312
 	return nil
313 313
 }
314 314
 
315
-// reloadNetworkDiagnosticPort updates the network controller starting the diagnose mode if the config is valid
315
+// reloadNetworkDiagnosticPort updates the network controller starting the diagnostic if the config is valid
316 316
 func (daemon *Daemon) reloadNetworkDiagnosticPort(conf *config.Config, attributes map[string]string) error {
317
-	if conf == nil || daemon.netController == nil {
317
+	if conf == nil || daemon.netController == nil || !conf.IsValueSet("network-diagnostic-port") ||
318
+		conf.NetworkDiagnosticPort < 1 || conf.NetworkDiagnosticPort > 65535 {
319
+		// If there is no config, or the port is unset or out of range, make sure that the diagnostic is off
320
+		if daemon.netController != nil {
321
+			daemon.netController.StopDiagnostic()
322
+		}
318 323
 		return nil
319 324
 	}
320
-	// Enable the network diagnose if the flag is set with a valid port withing the range
321
-	if conf.IsValueSet("network-diagnostic-port") && conf.NetworkDiagnosticPort > 0 && conf.NetworkDiagnosticPort < 65536 {
322
-		logrus.Warnf("Calling the diagnostic start with %d", conf.NetworkDiagnosticPort)
323
-		daemon.netController.StartDiagnose(conf.NetworkDiagnosticPort)
324
-	} else {
325
-		daemon.netController.StopDiagnose()
326
-	}
325
+	// Enable the network diagnostic if the flag is set with a valid port within the range
326
+	logrus.WithFields(logrus.Fields{"port": conf.NetworkDiagnosticPort, "ip": "127.0.0.1"}).Warn("Starting network diagnostic server")
327
+	daemon.netController.StartDiagnostic(conf.NetworkDiagnosticPort)
328
+
327 329
 	return nil
328 330
 }
... ...
@@ -513,18 +513,18 @@ func TestDaemonReloadNetworkDiagnosticPort(t *testing.T) {
513 513
 		if err := daemon.Reload(enableConfig); err != nil {
514 514
 			t.Fatal(err)
515 515
 		}
516
-		// Check that the diagnose is enabled
517
-		if !daemon.netController.IsDiagnoseEnabled() {
518
-			t.Fatalf("diagnosed should be enable")
516
+		// Check that the diagnostic is enabled
517
+		if !daemon.netController.IsDiagnosticEnabled() {
518
+			t.Fatalf("diagnostic should be enable")
519 519
 		}
520 520
 
521 521
 		// Reload
522 522
 		if err := daemon.Reload(disableConfig); err != nil {
523 523
 			t.Fatal(err)
524 524
 		}
525
-		// Check that the diagnose is disabled
526
-		if daemon.netController.IsDiagnoseEnabled() {
527
-			t.Fatalf("diagnosed should be disable")
525
+		// Check that the diagnostic is disabled
526
+		if daemon.netController.IsDiagnosticEnabled() {
527
+			t.Fatalf("diagnostic should be disable")
528 528
 		}
529 529
 	}
530 530
 
... ...
@@ -533,18 +533,18 @@ func TestDaemonReloadNetworkDiagnosticPort(t *testing.T) {
533 533
 	if err := daemon.Reload(enableConfig); err != nil {
534 534
 		t.Fatal(err)
535 535
 	}
536
-	// Check that the diagnose is enabled
537
-	if !daemon.netController.IsDiagnoseEnabled() {
538
-		t.Fatalf("diagnosed should be enable")
536
+	// Check that the diagnostic is enabled
537
+	if !daemon.netController.IsDiagnosticEnabled() {
538
+		t.Fatalf("diagnostic should be enable")
539 539
 	}
540 540
 
541 541
 	// Check that another reload does not cause issues
542 542
 	if err := daemon.Reload(enableConfig); err != nil {
543 543
 		t.Fatal(err)
544 544
 	}
545
-	// Check that the diagnose is enable
546
-	if !daemon.netController.IsDiagnoseEnabled() {
547
-		t.Fatalf("diagnosed should be enable")
545
+	// Check that the diagnostic is still enabled
546
+	if !daemon.netController.IsDiagnosticEnabled() {
547
+		t.Fatalf("diagnostic should be enable")
548 548
 	}
549 549
 
550 550
 }
... ...
@@ -31,12 +31,12 @@ github.com/moby/buildkit aaff9d591ef128560018433fe61beb802e149de8
31 31
 github.com/tonistiigi/fsutil dea3a0da73aee887fc02142d995be764106ac5e2
32 32
 
33 33
 #get libnetwork packages
34
-github.com/docker/libnetwork 5ab4ab830062fe8a30a44b75b0bda6b1f4f166a4
34
+github.com/docker/libnetwork 20dd462e0a0e883437a274bd61df4bc4de980830
35 35
 github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
36 36
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
37 37
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
38 38
 github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
39
-github.com/hashicorp/memberlist v0.1.0
39
+github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c
40 40
 github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372
41 41
 github.com/hashicorp/go-sockaddr acd314c5781ea706c710d9ea70069fd2e110d61d
42 42
 github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e
... ...
@@ -297,8 +297,8 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
297 297
 		return err
298 298
 	}
299 299
 
300
-	// Register the diagnose handlers
301
-	c.DiagnoseServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
300
+	// Register the diagnostic handlers
301
+	c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
302 302
 
303 303
 	var cancelList []func()
304 304
 	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
... ...
@@ -61,7 +61,7 @@ import (
61 61
 	"github.com/docker/libnetwork/cluster"
62 62
 	"github.com/docker/libnetwork/config"
63 63
 	"github.com/docker/libnetwork/datastore"
64
-	"github.com/docker/libnetwork/diagnose"
64
+	"github.com/docker/libnetwork/diagnostic"
65 65
 	"github.com/docker/libnetwork/discoverapi"
66 66
 	"github.com/docker/libnetwork/driverapi"
67 67
 	"github.com/docker/libnetwork/drvregistry"
... ...
@@ -136,12 +136,12 @@ type NetworkController interface {
136 136
 	// SetKeys configures the encryption key for gossip and overlay data path
137 137
 	SetKeys(keys []*types.EncryptionKey) error
138 138
 
139
-	// StartDiagnose start the network diagnose mode
140
-	StartDiagnose(port int)
141
-	// StopDiagnose start the network diagnose mode
142
-	StopDiagnose()
143
-	// IsDiagnoseEnabled returns true if the diagnose is enabled
144
-	IsDiagnoseEnabled() bool
139
+	// StartDiagnostic starts the network diagnostic mode
140
+	StartDiagnostic(port int)
141
+	// StopDiagnostic stops the network diagnostic mode
142
+	StopDiagnostic()
143
+	// IsDiagnosticEnabled returns true if the diagnostic is enabled
144
+	IsDiagnosticEnabled() bool
145 145
 }
146 146
 
147 147
 // NetworkWalker is a client provided function which will be used to walk the Networks.
... ...
@@ -176,7 +176,7 @@ type controller struct {
176 176
 	agentStopDone          chan struct{}
177 177
 	keys                   []*types.EncryptionKey
178 178
 	clusterConfigAvailable bool
179
-	DiagnoseServer         *diagnose.Server
179
+	DiagnosticServer       *diagnostic.Server
180 180
 	sync.Mutex
181 181
 }
182 182
 
... ...
@@ -188,16 +188,16 @@ type initializer struct {
188 188
 // New creates a new instance of network controller.
189 189
 func New(cfgOptions ...config.Option) (NetworkController, error) {
190 190
 	c := &controller{
191
-		id:              stringid.GenerateRandomID(),
192
-		cfg:             config.ParseConfigOptions(cfgOptions...),
193
-		sandboxes:       sandboxTable{},
194
-		svcRecords:      make(map[string]svcInfo),
195
-		serviceBindings: make(map[serviceKey]*service),
196
-		agentInitDone:   make(chan struct{}),
197
-		networkLocker:   locker.New(),
198
-		DiagnoseServer:  diagnose.New(),
191
+		id:               stringid.GenerateRandomID(),
192
+		cfg:              config.ParseConfigOptions(cfgOptions...),
193
+		sandboxes:        sandboxTable{},
194
+		svcRecords:       make(map[string]svcInfo),
195
+		serviceBindings:  make(map[serviceKey]*service),
196
+		agentInitDone:    make(chan struct{}),
197
+		networkLocker:    locker.New(),
198
+		DiagnosticServer: diagnostic.New(),
199 199
 	}
200
-	c.DiagnoseServer.Init()
200
+	c.DiagnosticServer.Init()
201 201
 
202 202
 	if err := c.initStores(); err != nil {
203 203
 		return nil, err
... ...
@@ -1307,27 +1307,27 @@ func (c *controller) Stop() {
1307 1307
 	osl.GC()
1308 1308
 }
1309 1309
 
1310
-// StartDiagnose start the network diagnose mode
1311
-func (c *controller) StartDiagnose(port int) {
1310
+// StartDiagnostic starts the network diagnostic mode
1311
+func (c *controller) StartDiagnostic(port int) {
1312 1312
 	c.Lock()
1313
-	if !c.DiagnoseServer.IsDebugEnable() {
1314
-		c.DiagnoseServer.EnableDebug("127.0.0.1", port)
1313
+	if !c.DiagnosticServer.IsDiagnosticEnabled() {
1314
+		c.DiagnosticServer.EnableDiagnostic("127.0.0.1", port)
1315 1315
 	}
1316 1316
 	c.Unlock()
1317 1317
 }
1318 1318
 
1319
-// StopDiagnose start the network diagnose mode
1320
-func (c *controller) StopDiagnose() {
1319
+// StopDiagnostic stops the network diagnostic mode
1320
+func (c *controller) StopDiagnostic() {
1321 1321
 	c.Lock()
1322
-	if c.DiagnoseServer.IsDebugEnable() {
1323
-		c.DiagnoseServer.DisableDebug()
1322
+	if c.DiagnosticServer.IsDiagnosticEnabled() {
1323
+		c.DiagnosticServer.DisableDiagnostic()
1324 1324
 	}
1325 1325
 	c.Unlock()
1326 1326
 }
1327 1327
 
1328
-// IsDiagnoseEnabled returns true if the diagnose is enabled
1329
-func (c *controller) IsDiagnoseEnabled() bool {
1328
+// IsDiagnosticEnabled returns true if the diagnostic is enabled
1329
+func (c *controller) IsDiagnosticEnabled() bool {
1330 1330
 	c.Lock()
1331 1331
 	defer c.Unlock()
1332
-	return c.DiagnoseServer.IsDebugEnable()
1332
+	return c.DiagnosticServer.IsDiagnosticEnabled()
1333 1333
 }
1334 1334
deleted file mode 100644
... ...
@@ -1,227 +0,0 @@
1
-package diagnose
2
-
3
-import (
4
-	"context"
5
-	"encoding/json"
6
-	"fmt"
7
-	"net/http"
8
-	"sync"
9
-	"sync/atomic"
10
-
11
-	stackdump "github.com/docker/docker/pkg/signal"
12
-	"github.com/docker/libnetwork/common"
13
-	"github.com/sirupsen/logrus"
14
-)
15
-
16
-// HTTPHandlerFunc TODO
17
-type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
18
-
19
-type httpHandlerCustom struct {
20
-	ctx interface{}
21
-	F   func(interface{}, http.ResponseWriter, *http.Request)
22
-}
23
-
24
-// ServeHTTP TODO
25
-func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
26
-	h.F(h.ctx, w, r)
27
-}
28
-
29
-var diagPaths2Func = map[string]HTTPHandlerFunc{
30
-	"/":          notImplemented,
31
-	"/help":      help,
32
-	"/ready":     ready,
33
-	"/stackdump": stackTrace,
34
-}
35
-
36
-// Server when the debug is enabled exposes a
37
-// This data structure is protected by the Agent mutex so does not require and additional mutex here
38
-type Server struct {
39
-	enable            int32
40
-	srv               *http.Server
41
-	port              int
42
-	mux               *http.ServeMux
43
-	registeredHanders map[string]bool
44
-	sync.Mutex
45
-}
46
-
47
-// New creates a new diagnose server
48
-func New() *Server {
49
-	return &Server{
50
-		registeredHanders: make(map[string]bool),
51
-	}
52
-}
53
-
54
-// Init initialize the mux for the http handling and register the base hooks
55
-func (s *Server) Init() {
56
-	s.mux = http.NewServeMux()
57
-
58
-	// Register local handlers
59
-	s.RegisterHandler(s, diagPaths2Func)
60
-}
61
-
62
-// RegisterHandler allows to register new handlers to the mux and to a specific path
63
-func (s *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
64
-	s.Lock()
65
-	defer s.Unlock()
66
-	for path, fun := range hdlrs {
67
-		if _, ok := s.registeredHanders[path]; ok {
68
-			continue
69
-		}
70
-		s.mux.Handle(path, httpHandlerCustom{ctx, fun})
71
-		s.registeredHanders[path] = true
72
-	}
73
-}
74
-
75
-// ServeHTTP this is the method called bu the ListenAndServe, and is needed to allow us to
76
-// use our custom mux
77
-func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
78
-	s.mux.ServeHTTP(w, r)
79
-}
80
-
81
-// EnableDebug opens a TCP socket to debug the passed network DB
82
-func (s *Server) EnableDebug(ip string, port int) {
83
-	s.Lock()
84
-	defer s.Unlock()
85
-
86
-	s.port = port
87
-
88
-	if s.enable == 1 {
89
-		logrus.Info("The server is already up and running")
90
-		return
91
-	}
92
-
93
-	logrus.Infof("Starting the diagnose server listening on %d for commands", port)
94
-	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
95
-	s.srv = srv
96
-	s.enable = 1
97
-	go func(n *Server) {
98
-		// Ingore ErrServerClosed that is returned on the Shutdown call
99
-		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
100
-			logrus.Errorf("ListenAndServe error: %s", err)
101
-			atomic.SwapInt32(&n.enable, 0)
102
-		}
103
-	}(s)
104
-}
105
-
106
-// DisableDebug stop the dubug and closes the tcp socket
107
-func (s *Server) DisableDebug() {
108
-	s.Lock()
109
-	defer s.Unlock()
110
-
111
-	s.srv.Shutdown(context.Background())
112
-	s.srv = nil
113
-	s.enable = 0
114
-	logrus.Info("Disabling the diagnose server")
115
-}
116
-
117
-// IsDebugEnable returns true when the debug is enabled
118
-func (s *Server) IsDebugEnable() bool {
119
-	s.Lock()
120
-	defer s.Unlock()
121
-	return s.enable == 1
122
-}
123
-
124
-func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
125
-	r.ParseForm()
126
-	_, json := ParseHTTPFormOptions(r)
127
-	rsp := WrongCommand("not implemented", fmt.Sprintf("URL path: %s no method implemented check /help\n", r.URL.Path))
128
-
129
-	// audit logs
130
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
131
-	log.Info("command not implemented done")
132
-
133
-	HTTPReply(w, rsp, json)
134
-}
135
-
136
-func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
137
-	r.ParseForm()
138
-	_, json := ParseHTTPFormOptions(r)
139
-
140
-	// audit logs
141
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
142
-	log.Info("help done")
143
-
144
-	n, ok := ctx.(*Server)
145
-	var result string
146
-	if ok {
147
-		for path := range n.registeredHanders {
148
-			result += fmt.Sprintf("%s\n", path)
149
-		}
150
-		HTTPReply(w, CommandSucceed(&StringCmd{Info: result}), json)
151
-	}
152
-}
153
-
154
-func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
155
-	r.ParseForm()
156
-	_, json := ParseHTTPFormOptions(r)
157
-
158
-	// audit logs
159
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
160
-	log.Info("ready done")
161
-	HTTPReply(w, CommandSucceed(&StringCmd{Info: "OK"}), json)
162
-}
163
-
164
-func stackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
165
-	r.ParseForm()
166
-	_, json := ParseHTTPFormOptions(r)
167
-
168
-	// audit logs
169
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
170
-	log.Info("stack trace")
171
-
172
-	path, err := stackdump.DumpStacks("/tmp/")
173
-	if err != nil {
174
-		log.WithError(err).Error("failed to write goroutines dump")
175
-		HTTPReply(w, FailCommand(err), json)
176
-	} else {
177
-		log.Info("stack trace done")
178
-		HTTPReply(w, CommandSucceed(&StringCmd{Info: fmt.Sprintf("goroutine stacks written to %s", path)}), json)
179
-	}
180
-}
181
-
182
-// DebugHTTPForm helper to print the form url parameters
183
-func DebugHTTPForm(r *http.Request) {
184
-	for k, v := range r.Form {
185
-		logrus.Debugf("Form[%q] = %q\n", k, v)
186
-	}
187
-}
188
-
189
-// JSONOutput contains details on JSON output printing
190
-type JSONOutput struct {
191
-	enable      bool
192
-	prettyPrint bool
193
-}
194
-
195
-// ParseHTTPFormOptions easily parse the JSON printing options
196
-func ParseHTTPFormOptions(r *http.Request) (bool, *JSONOutput) {
197
-	_, unsafe := r.Form["unsafe"]
198
-	v, json := r.Form["json"]
199
-	var pretty bool
200
-	if len(v) > 0 {
201
-		pretty = v[0] == "pretty"
202
-	}
203
-	return unsafe, &JSONOutput{enable: json, prettyPrint: pretty}
204
-}
205
-
206
-// HTTPReply helper function that takes care of sending the message out
207
-func HTTPReply(w http.ResponseWriter, r *HTTPResult, j *JSONOutput) (int, error) {
208
-	var response []byte
209
-	if j.enable {
210
-		w.Header().Set("Content-Type", "application/json")
211
-		var err error
212
-		if j.prettyPrint {
213
-			response, err = json.MarshalIndent(r, "", "  ")
214
-			if err != nil {
215
-				response, _ = json.MarshalIndent(FailCommand(err), "", "  ")
216
-			}
217
-		} else {
218
-			response, err = json.Marshal(r)
219
-			if err != nil {
220
-				response, _ = json.Marshal(FailCommand(err))
221
-			}
222
-		}
223
-	} else {
224
-		response = []byte(r.String())
225
-	}
226
-	return fmt.Fprint(w, string(response))
227
-}
228 1
deleted file mode 100644
... ...
@@ -1,122 +0,0 @@
1
-package diagnose
2
-
3
-import "fmt"
4
-
5
-// StringInterface interface that has to be implemented by messages
6
-type StringInterface interface {
7
-	String() string
8
-}
9
-
10
-// CommandSucceed creates a success message
11
-func CommandSucceed(result StringInterface) *HTTPResult {
12
-	return &HTTPResult{
13
-		Message: "OK",
14
-		Details: result,
15
-	}
16
-}
17
-
18
-// FailCommand creates a failure message with error
19
-func FailCommand(err error) *HTTPResult {
20
-	return &HTTPResult{
21
-		Message: "FAIL",
22
-		Details: &ErrorCmd{Error: err.Error()},
23
-	}
24
-}
25
-
26
-// WrongCommand creates a wrong command response
27
-func WrongCommand(message, usage string) *HTTPResult {
28
-	return &HTTPResult{
29
-		Message: message,
30
-		Details: &UsageCmd{Usage: usage},
31
-	}
32
-}
33
-
34
-// HTTPResult Diagnose Server HTTP result operation
35
-type HTTPResult struct {
36
-	Message string          `json:"message"`
37
-	Details StringInterface `json:"details"`
38
-}
39
-
40
-func (h *HTTPResult) String() string {
41
-	rsp := h.Message
42
-	if h.Details != nil {
43
-		rsp += "\n" + h.Details.String()
44
-	}
45
-	return rsp
46
-}
47
-
48
-// UsageCmd command with usage field
49
-type UsageCmd struct {
50
-	Usage string `json:"usage"`
51
-}
52
-
53
-func (u *UsageCmd) String() string {
54
-	return "Usage: " + u.Usage
55
-}
56
-
57
-// StringCmd command with info string
58
-type StringCmd struct {
59
-	Info string `json:"info"`
60
-}
61
-
62
-func (s *StringCmd) String() string {
63
-	return s.Info
64
-}
65
-
66
-// ErrorCmd command with error
67
-type ErrorCmd struct {
68
-	Error string `json:"error"`
69
-}
70
-
71
-func (e *ErrorCmd) String() string {
72
-	return "Error: " + e.Error
73
-}
74
-
75
-// TableObj network db table object
76
-type TableObj struct {
77
-	Length   int               `json:"size"`
78
-	Elements []StringInterface `json:"entries"`
79
-}
80
-
81
-func (t *TableObj) String() string {
82
-	output := fmt.Sprintf("total entries: %d\n", t.Length)
83
-	for _, e := range t.Elements {
84
-		output += e.String()
85
-	}
86
-	return output
87
-}
88
-
89
-// PeerEntryObj entry in the networkdb peer table
90
-type PeerEntryObj struct {
91
-	Index int    `json:"-"`
92
-	Name  string `json:"-=name"`
93
-	IP    string `json:"ip"`
94
-}
95
-
96
-func (p *PeerEntryObj) String() string {
97
-	return fmt.Sprintf("%d) %s -> %s\n", p.Index, p.Name, p.IP)
98
-}
99
-
100
-// TableEntryObj network db table entry object
101
-type TableEntryObj struct {
102
-	Index int    `json:"-"`
103
-	Key   string `json:"key"`
104
-	Value string `json:"value"`
105
-	Owner string `json:"owner"`
106
-}
107
-
108
-func (t *TableEntryObj) String() string {
109
-	return fmt.Sprintf("%d) k:`%s` -> v:`%s` owner:`%s`\n", t.Index, t.Key, t.Value, t.Owner)
110
-}
111
-
112
-// TableEndpointsResult fully typed message for proper unmarshaling on the client side
113
-type TableEndpointsResult struct {
114
-	TableObj
115
-	Elements []TableEntryObj `json:"entries"`
116
-}
117
-
118
-// TablePeersResult fully typed message for proper unmarshaling on the client side
119
-type TablePeersResult struct {
120
-	TableObj
121
-	Elements []PeerEntryObj `json:"entries"`
122
-}
123 1
new file mode 100644
... ...
@@ -0,0 +1,227 @@
0
+package diagnostic
1
+
2
+import (
3
+	"context"
4
+	"encoding/json"
5
+	"fmt"
6
+	"net/http"
7
+	"sync"
8
+	"sync/atomic"
9
+
10
+	stackdump "github.com/docker/docker/pkg/signal"
11
+	"github.com/docker/libnetwork/common"
12
+	"github.com/sirupsen/logrus"
13
+)
14
+
15
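+// HTTPHandlerFunc is the signature of a diagnostic handler: it receives the context registered with it plus the HTTP response writer and request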
16
+type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
17
+
18
+type httpHandlerCustom struct {
19
+	ctx interface{}
20
+	F   func(interface{}, http.ResponseWriter, *http.Request)
21
+}
22
+
23
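+// ServeHTTP calls the wrapped handler function passing the stored context together with the response writer and the request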
24
+func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
25
+	h.F(h.ctx, w, r)
26
+}
27
+
28
+var diagPaths2Func = map[string]HTTPHandlerFunc{
29
+	"/":          notImplemented,
30
+	"/help":      help,
31
+	"/ready":     ready,
32
+	"/stackdump": stackTrace,
33
+}
34
+
35
+// Server when the diagnostic is enabled exposes an HTTP server that serves the registered handlers
36
+// This data structure is protected by the Agent mutex so does not require an additional mutex here
37
+type Server struct {
38
+	enable            int32
39
+	srv               *http.Server
40
+	port              int
41
+	mux               *http.ServeMux
42
+	registeredHanders map[string]bool
43
+	sync.Mutex
44
+}
45
+
46
+// New creates a new diagnostic server
47
+func New() *Server {
48
+	return &Server{
49
+		registeredHanders: make(map[string]bool),
50
+	}
51
+}
52
+
53
+// Init initialize the mux for the http handling and register the base hooks
54
+func (s *Server) Init() {
55
+	s.mux = http.NewServeMux()
56
+
57
+	// Register local handlers
58
+	s.RegisterHandler(s, diagPaths2Func)
59
+}
60
+
61
+// RegisterHandler allows to register new handlers to the mux and to a specific path
62
+func (s *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
63
+	s.Lock()
64
+	defer s.Unlock()
65
+	for path, fun := range hdlrs {
66
+		if _, ok := s.registeredHanders[path]; ok {
67
+			continue
68
+		}
69
+		s.mux.Handle(path, httpHandlerCustom{ctx, fun})
70
+		s.registeredHanders[path] = true
71
+	}
72
+}
73
+
74
+// ServeHTTP this is the method called by the ListenAndServe, and is needed to allow us to
75
+// use our custom mux
76
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
77
+	s.mux.ServeHTTP(w, r)
78
+}
79
+
80
+// EnableDiagnostic opens a TCP socket to debug the passed network DB
81
+func (s *Server) EnableDiagnostic(ip string, port int) {
82
+	s.Lock()
83
+	defer s.Unlock()
84
+
85
+	s.port = port
86
+
87
+	if s.enable == 1 {
88
+		logrus.Info("The server is already up and running")
89
+		return
90
+	}
91
+
92
+	logrus.Infof("Starting the diagnostic server listening on %d for commands", port)
93
+	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
94
+	s.srv = srv
95
+	s.enable = 1
96
+	go func(n *Server) {
97
+		// Ignore ErrServerClosed that is returned on the Shutdown call
98
+		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
99
+			logrus.Errorf("ListenAndServe error: %s", err)
100
+			atomic.SwapInt32(&n.enable, 0)
101
+		}
102
+	}(s)
103
+}
104
+
105
+// DisableDiagnostic stops the diagnostic server and closes the tcp socket
106
+func (s *Server) DisableDiagnostic() {
107
+	s.Lock()
108
+	defer s.Unlock()
109
+
110
+	s.srv.Shutdown(context.Background())
111
+	s.srv = nil
112
+	s.enable = 0
113
+	logrus.Info("Disabling the diagnostic server")
114
+}
115
+
116
+// IsDiagnosticEnabled returns true when the debug is enabled
117
+func (s *Server) IsDiagnosticEnabled() bool {
118
+	s.Lock()
119
+	defer s.Unlock()
120
+	return s.enable == 1
121
+}
122
+
123
+func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
124
+	r.ParseForm()
125
+	_, json := ParseHTTPFormOptions(r)
126
+	rsp := WrongCommand("not implemented", fmt.Sprintf("URL path: %s no method implemented check /help\n", r.URL.Path))
127
+
128
+	// audit logs
129
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
130
+	log.Info("command not implemented done")
131
+
132
+	HTTPReply(w, rsp, json)
133
+}
134
+
135
+func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
136
+	r.ParseForm()
137
+	_, json := ParseHTTPFormOptions(r)
138
+
139
+	// audit logs
140
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
141
+	log.Info("help done")
142
+
143
+	n, ok := ctx.(*Server)
144
+	var result string
145
+	if ok {
146
+		for path := range n.registeredHanders {
147
+			result += fmt.Sprintf("%s\n", path)
148
+		}
149
+		HTTPReply(w, CommandSucceed(&StringCmd{Info: result}), json)
150
+	}
151
+}
152
+
153
+func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
154
+	r.ParseForm()
155
+	_, json := ParseHTTPFormOptions(r)
156
+
157
+	// audit logs
158
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
159
+	log.Info("ready done")
160
+	HTTPReply(w, CommandSucceed(&StringCmd{Info: "OK"}), json)
161
+}
162
+
163
+func stackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
164
+	r.ParseForm()
165
+	_, json := ParseHTTPFormOptions(r)
166
+
167
+	// audit logs
168
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
169
+	log.Info("stack trace")
170
+
171
+	path, err := stackdump.DumpStacks("/tmp/")
172
+	if err != nil {
173
+		log.WithError(err).Error("failed to write goroutines dump")
174
+		HTTPReply(w, FailCommand(err), json)
175
+	} else {
176
+		log.Info("stack trace done")
177
+		HTTPReply(w, CommandSucceed(&StringCmd{Info: fmt.Sprintf("goroutine stacks written to %s", path)}), json)
178
+	}
179
+}
180
+
181
+// DebugHTTPForm helper to print the form url parameters
182
+func DebugHTTPForm(r *http.Request) {
183
+	for k, v := range r.Form {
184
+		logrus.Debugf("Form[%q] = %q\n", k, v)
185
+	}
186
+}
187
+
188
+// JSONOutput contains details on JSON output printing
189
+type JSONOutput struct {
190
+	enable      bool
191
+	prettyPrint bool
192
+}
193
+
194
+// ParseHTTPFormOptions easily parse the JSON printing options
195
+func ParseHTTPFormOptions(r *http.Request) (bool, *JSONOutput) {
196
+	_, unsafe := r.Form["unsafe"]
197
+	v, json := r.Form["json"]
198
+	var pretty bool
199
+	if len(v) > 0 {
200
+		pretty = v[0] == "pretty"
201
+	}
202
+	return unsafe, &JSONOutput{enable: json, prettyPrint: pretty}
203
+}
204
+
205
+// HTTPReply helper function that takes care of sending the message out
206
+func HTTPReply(w http.ResponseWriter, r *HTTPResult, j *JSONOutput) (int, error) {
207
+	var response []byte
208
+	if j.enable {
209
+		w.Header().Set("Content-Type", "application/json")
210
+		var err error
211
+		if j.prettyPrint {
212
+			response, err = json.MarshalIndent(r, "", "  ")
213
+			if err != nil {
214
+				response, _ = json.MarshalIndent(FailCommand(err), "", "  ")
215
+			}
216
+		} else {
217
+			response, err = json.Marshal(r)
218
+			if err != nil {
219
+				response, _ = json.Marshal(FailCommand(err))
220
+			}
221
+		}
222
+	} else {
223
+		response = []byte(r.String())
224
+	}
225
+	return fmt.Fprint(w, string(response))
226
+}
0 227
new file mode 100644
... ...
@@ -0,0 +1,122 @@
0
+package diagnostic
1
+
2
+import "fmt"
3
+
4
+// StringInterface interface that has to be implemented by messages
5
+type StringInterface interface {
6
+	String() string
7
+}
8
+
9
+// CommandSucceed creates a success message
10
+func CommandSucceed(result StringInterface) *HTTPResult {
11
+	return &HTTPResult{
12
+		Message: "OK",
13
+		Details: result,
14
+	}
15
+}
16
+
17
+// FailCommand creates a failure message with error
18
+func FailCommand(err error) *HTTPResult {
19
+	return &HTTPResult{
20
+		Message: "FAIL",
21
+		Details: &ErrorCmd{Error: err.Error()},
22
+	}
23
+}
24
+
25
+// WrongCommand creates a wrong command response
26
+func WrongCommand(message, usage string) *HTTPResult {
27
+	return &HTTPResult{
28
+		Message: message,
29
+		Details: &UsageCmd{Usage: usage},
30
+	}
31
+}
32
+
33
+// HTTPResult Diagnostic Server HTTP result operation
34
+type HTTPResult struct {
35
+	Message string          `json:"message"`
36
+	Details StringInterface `json:"details"`
37
+}
38
+
39
+func (h *HTTPResult) String() string {
40
+	rsp := h.Message
41
+	if h.Details != nil {
42
+		rsp += "\n" + h.Details.String()
43
+	}
44
+	return rsp
45
+}
46
+
47
+// UsageCmd command with usage field
48
+type UsageCmd struct {
49
+	Usage string `json:"usage"`
50
+}
51
+
52
+func (u *UsageCmd) String() string {
53
+	return "Usage: " + u.Usage
54
+}
55
+
56
+// StringCmd command with info string
57
+type StringCmd struct {
58
+	Info string `json:"info"`
59
+}
60
+
61
+func (s *StringCmd) String() string {
62
+	return s.Info
63
+}
64
+
65
+// ErrorCmd command with error
66
+type ErrorCmd struct {
67
+	Error string `json:"error"`
68
+}
69
+
70
+func (e *ErrorCmd) String() string {
71
+	return "Error: " + e.Error
72
+}
73
+
74
+// TableObj network db table object
75
+type TableObj struct {
76
+	Length   int               `json:"size"`
77
+	Elements []StringInterface `json:"entries"`
78
+}
79
+
80
+func (t *TableObj) String() string {
81
+	output := fmt.Sprintf("total entries: %d\n", t.Length)
82
+	for _, e := range t.Elements {
83
+		output += e.String()
84
+	}
85
+	return output
86
+}
87
+
88
+// PeerEntryObj entry in the networkdb peer table
89
+type PeerEntryObj struct {
90
+	Index int    `json:"-"`
91
+	Name  string `json:"-=name"`
92
+	IP    string `json:"ip"`
93
+}
94
+
95
+func (p *PeerEntryObj) String() string {
96
+	return fmt.Sprintf("%d) %s -> %s\n", p.Index, p.Name, p.IP)
97
+}
98
+
99
+// TableEntryObj network db table entry object
100
+type TableEntryObj struct {
101
+	Index int    `json:"-"`
102
+	Key   string `json:"key"`
103
+	Value string `json:"value"`
104
+	Owner string `json:"owner"`
105
+}
106
+
107
+func (t *TableEntryObj) String() string {
108
+	return fmt.Sprintf("%d) k:`%s` -> v:`%s` owner:`%s`\n", t.Index, t.Key, t.Value, t.Owner)
109
+}
110
+
111
+// TableEndpointsResult fully typed message for proper unmarshaling on the client side
112
+type TableEndpointsResult struct {
113
+	TableObj
114
+	Elements []TableEntryObj `json:"entries"`
115
+}
116
+
117
+// TablePeersResult fully typed message for proper unmarshaling on the client side
118
+type TablePeersResult struct {
119
+	TableObj
120
+	Elements []PeerEntryObj `json:"entries"`
121
+}
... ...
@@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
165 165
 		}
166 166
 	}
167 167
 	nDB.RUnlock()
168
+
168 169
 	if !ok || network.leaving || !nodePresent {
169 170
 		// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
170 171
 		return false
171 172
 	}
172 173
 
174
+	nDB.Lock()
173 175
 	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
174 176
 	if err == nil {
175 177
 		// We have the latest state. Ignore the event
176 178
 		// since it is stale.
177 179
 		if e.ltime >= tEvent.LTime {
180
+			nDB.Unlock()
178 181
 			return false
179 182
 		}
180 183
 	}
... ...
@@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
195 195
 			nDB.config.Hostname, nDB.config.NodeID, tEvent)
196 196
 		e.reapTime = nDB.config.reapEntryInterval
197 197
 	}
198
-
199
-	nDB.Lock()
200 198
 	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
201 199
 	nDB.Unlock()
202 200
 
... ...
@@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
26 26
 	e.broadcastNodeEvent(mn.Addr, opCreate)
27 27
 	e.nDB.Lock()
28 28
 	defer e.nDB.Unlock()
29
+
29 30
 	// In case the node is rejoining after a failure or leave,
30
-	// wait until an explicit join message arrives before adding
31
-	// it to the nodes just to make sure this is not a stale
32
-	// join. If you don't know about this node add it immediately.
33
-	_, fOk := e.nDB.failedNodes[mn.Name]
34
-	_, lOk := e.nDB.leftNodes[mn.Name]
35
-	if fOk || lOk {
31
+	// just add the node back to active
32
+	if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
36 33
 		return
37 34
 	}
38 35
 
... ...
@@ -322,18 +322,20 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
322 322
 // GetEntry retrieves the value of a table entry in a given (network,
323 323
 // table, key) tuple
324 324
 func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
325
+	nDB.RLock()
326
+	defer nDB.RUnlock()
325 327
 	entry, err := nDB.getEntry(tname, nid, key)
326 328
 	if err != nil {
327 329
 		return nil, err
328 330
 	}
331
+	if entry != nil && entry.deleting {
332
+		return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
333
+	}
329 334
 
330 335
 	return entry.value, nil
331 336
 }
332 337
 
333 338
 func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
334
-	nDB.RLock()
335
-	defer nDB.RUnlock()
336
-
337 339
 	e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
338 340
 	if !ok {
339 341
 		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
... ...
@@ -348,13 +350,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
348 348
 // entry for the same tuple for which there is already an existing
349 349
 // entry unless the current entry is in deleting state.
350 350
 func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
351
+	nDB.Lock()
351 352
 	oldEntry, err := nDB.getEntry(tname, nid, key)
352
-	if err != nil {
353
-		if _, ok := err.(types.NotFoundError); !ok {
354
-			return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
355
-		}
356
-	}
357
-	if oldEntry != nil && !oldEntry.deleting {
353
+	if err == nil || (oldEntry != nil && !oldEntry.deleting) {
354
+		nDB.Unlock()
358 355
 		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
359 356
 	}
360 357
 
... ...
@@ -364,14 +363,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
364 364
 		value: value,
365 365
 	}
366 366
 
367
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
368
+	nDB.Unlock()
369
+
367 370
 	if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
368 371
 		return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
369 372
 	}
370 373
 
371
-	nDB.Lock()
372
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
373
-	nDB.Unlock()
374
-
375 374
 	return nil
376 375
 }
377 376
 
... ...
@@ -380,7 +378,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
380 380
 // propagates this event to the cluster. It is an error to update a
381 381
 // non-existent entry.
382 382
 func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
383
-	if _, err := nDB.GetEntry(tname, nid, key); err != nil {
383
+	nDB.Lock()
384
+	if _, err := nDB.getEntry(tname, nid, key); err != nil {
385
+		nDB.Unlock()
384 386
 		return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
385 387
 	}
386 388
 
... ...
@@ -390,14 +390,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
390 390
 		value: value,
391 391
 	}
392 392
 
393
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
394
+	nDB.Unlock()
395
+
393 396
 	if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
394 397
 		return fmt.Errorf("cannot send table update event: %v", err)
395 398
 	}
396 399
 
397
-	nDB.Lock()
398
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
399
-	nDB.Unlock()
400
-
401 400
 	return nil
402 401
 }
403 402
 
... ...
@@ -427,27 +426,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem
427 427
 // table, key) tuple and if the NetworkDB is part of the cluster
428 428
 // propagates this event to the cluster.
429 429
 func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
430
-	value, err := nDB.GetEntry(tname, nid, key)
431
-	if err != nil {
432
-		return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
430
+	nDB.Lock()
431
+	oldEntry, err := nDB.getEntry(tname, nid, key)
432
+	if err != nil || oldEntry == nil || oldEntry.deleting {
433
+		nDB.Unlock()
434
+		return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
435
+			"does not exist or is already being deleted", tname, nid, key)
433 436
 	}
434 437
 
435 438
 	entry := &entry{
436 439
 		ltime:    nDB.tableClock.Increment(),
437 440
 		node:     nDB.config.NodeID,
438
-		value:    value,
441
+		value:    oldEntry.value,
439 442
 		deleting: true,
440 443
 		reapTime: nDB.config.reapEntryInterval,
441 444
 	}
442 445
 
446
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
447
+	nDB.Unlock()
448
+
443 449
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
444 450
 		return fmt.Errorf("cannot send table delete event: %v", err)
445 451
 	}
446 452
 
447
-	nDB.Lock()
448
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
449
-	nDB.Unlock()
450
-
451 453
 	return nil
452 454
 }
453 455
 
454 456
deleted file mode 100644
... ...
@@ -1,409 +0,0 @@
1
-package networkdb
2
-
3
-import (
4
-	"encoding/base64"
5
-	"fmt"
6
-	"net/http"
7
-	"strings"
8
-
9
-	"github.com/docker/libnetwork/common"
10
-	"github.com/docker/libnetwork/diagnose"
11
-	"github.com/sirupsen/logrus"
12
-)
13
-
14
-const (
15
-	missingParameter = "missing parameter"
16
-	dbNotAvailable   = "database not available"
17
-)
18
-
19
-// NetDbPaths2Func TODO
20
-var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
21
-	"/join":         dbJoin,
22
-	"/networkpeers": dbPeers,
23
-	"/clusterpeers": dbClusterPeers,
24
-	"/joinnetwork":  dbJoinNetwork,
25
-	"/leavenetwork": dbLeaveNetwork,
26
-	"/createentry":  dbCreateEntry,
27
-	"/updateentry":  dbUpdateEntry,
28
-	"/deleteentry":  dbDeleteEntry,
29
-	"/getentry":     dbGetEntry,
30
-	"/gettable":     dbGetTable,
31
-}
32
-
33
-func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
34
-	r.ParseForm()
35
-	diagnose.DebugHTTPForm(r)
36
-	_, json := diagnose.ParseHTTPFormOptions(r)
37
-
38
-	// audit logs
39
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
40
-	log.Info("join cluster")
41
-
42
-	if len(r.Form["members"]) < 1 {
43
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
44
-		log.Error("join cluster failed, wrong input")
45
-		diagnose.HTTPReply(w, rsp, json)
46
-		return
47
-	}
48
-
49
-	nDB, ok := ctx.(*NetworkDB)
50
-	if ok {
51
-		err := nDB.Join(strings.Split(r.Form["members"][0], ","))
52
-		if err != nil {
53
-			rsp := diagnose.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
54
-			log.WithError(err).Error("join cluster failed")
55
-			diagnose.HTTPReply(w, rsp, json)
56
-			return
57
-		}
58
-
59
-		log.Info("join cluster done")
60
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
61
-		return
62
-	}
63
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
64
-}
65
-
66
-func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
67
-	r.ParseForm()
68
-	diagnose.DebugHTTPForm(r)
69
-	_, json := diagnose.ParseHTTPFormOptions(r)
70
-
71
-	// audit logs
72
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
73
-	log.Info("network peers")
74
-
75
-	if len(r.Form["nid"]) < 1 {
76
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
77
-		log.Error("network peers failed, wrong input")
78
-		diagnose.HTTPReply(w, rsp, json)
79
-		return
80
-	}
81
-
82
-	nDB, ok := ctx.(*NetworkDB)
83
-	if ok {
84
-		peers := nDB.Peers(r.Form["nid"][0])
85
-		rsp := &diagnose.TableObj{Length: len(peers)}
86
-		for i, peerInfo := range peers {
87
-			rsp.Elements = append(rsp.Elements, &diagnose.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
88
-		}
89
-		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
90
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
91
-		return
92
-	}
93
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
94
-}
95
-
96
-func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
97
-	r.ParseForm()
98
-	diagnose.DebugHTTPForm(r)
99
-	_, json := diagnose.ParseHTTPFormOptions(r)
100
-
101
-	// audit logs
102
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
103
-	log.Info("cluster peers")
104
-
105
-	nDB, ok := ctx.(*NetworkDB)
106
-	if ok {
107
-		peers := nDB.ClusterPeers()
108
-		rsp := &diagnose.TableObj{Length: len(peers)}
109
-		for i, peerInfo := range peers {
110
-			rsp.Elements = append(rsp.Elements, &diagnose.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
111
-		}
112
-		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
113
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
114
-		return
115
-	}
116
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
117
-}
118
-
119
-func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
120
-	r.ParseForm()
121
-	diagnose.DebugHTTPForm(r)
122
-	unsafe, json := diagnose.ParseHTTPFormOptions(r)
123
-
124
-	// audit logs
125
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
126
-	log.Info("create entry")
127
-
128
-	if len(r.Form["tname"]) < 1 ||
129
-		len(r.Form["nid"]) < 1 ||
130
-		len(r.Form["key"]) < 1 ||
131
-		len(r.Form["value"]) < 1 {
132
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
133
-		log.Error("create entry failed, wrong input")
134
-		diagnose.HTTPReply(w, rsp, json)
135
-		return
136
-	}
137
-
138
-	tname := r.Form["tname"][0]
139
-	nid := r.Form["nid"][0]
140
-	key := r.Form["key"][0]
141
-	value := r.Form["value"][0]
142
-	decodedValue := []byte(value)
143
-	if !unsafe {
144
-		var err error
145
-		decodedValue, err = base64.StdEncoding.DecodeString(value)
146
-		if err != nil {
147
-			log.WithError(err).Error("create entry failed")
148
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
149
-			return
150
-		}
151
-	}
152
-
153
-	nDB, ok := ctx.(*NetworkDB)
154
-	if ok {
155
-		if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
156
-			rsp := diagnose.FailCommand(err)
157
-			diagnose.HTTPReply(w, rsp, json)
158
-			log.WithError(err).Error("create entry failed")
159
-			return
160
-		}
161
-		log.Info("create entry done")
162
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
163
-		return
164
-	}
165
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
166
-}
167
-
168
-func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
169
-	r.ParseForm()
170
-	diagnose.DebugHTTPForm(r)
171
-	unsafe, json := diagnose.ParseHTTPFormOptions(r)
172
-
173
-	// audit logs
174
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
175
-	log.Info("update entry")
176
-
177
-	if len(r.Form["tname"]) < 1 ||
178
-		len(r.Form["nid"]) < 1 ||
179
-		len(r.Form["key"]) < 1 ||
180
-		len(r.Form["value"]) < 1 {
181
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
182
-		log.Error("update entry failed, wrong input")
183
-		diagnose.HTTPReply(w, rsp, json)
184
-		return
185
-	}
186
-
187
-	tname := r.Form["tname"][0]
188
-	nid := r.Form["nid"][0]
189
-	key := r.Form["key"][0]
190
-	value := r.Form["value"][0]
191
-	decodedValue := []byte(value)
192
-	if !unsafe {
193
-		var err error
194
-		decodedValue, err = base64.StdEncoding.DecodeString(value)
195
-		if err != nil {
196
-			log.WithError(err).Error("update entry failed")
197
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
198
-			return
199
-		}
200
-	}
201
-
202
-	nDB, ok := ctx.(*NetworkDB)
203
-	if ok {
204
-		if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
205
-			log.WithError(err).Error("update entry failed")
206
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
207
-			return
208
-		}
209
-		log.Info("update entry done")
210
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
211
-		return
212
-	}
213
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
214
-}
215
-
216
-func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
217
-	r.ParseForm()
218
-	diagnose.DebugHTTPForm(r)
219
-	_, json := diagnose.ParseHTTPFormOptions(r)
220
-
221
-	// audit logs
222
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
223
-	log.Info("delete entry")
224
-
225
-	if len(r.Form["tname"]) < 1 ||
226
-		len(r.Form["nid"]) < 1 ||
227
-		len(r.Form["key"]) < 1 {
228
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
229
-		log.Error("delete entry failed, wrong input")
230
-		diagnose.HTTPReply(w, rsp, json)
231
-		return
232
-	}
233
-
234
-	tname := r.Form["tname"][0]
235
-	nid := r.Form["nid"][0]
236
-	key := r.Form["key"][0]
237
-
238
-	nDB, ok := ctx.(*NetworkDB)
239
-	if ok {
240
-		err := nDB.DeleteEntry(tname, nid, key)
241
-		if err != nil {
242
-			log.WithError(err).Error("delete entry failed")
243
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
244
-			return
245
-		}
246
-		log.Info("delete entry done")
247
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
248
-		return
249
-	}
250
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
251
-}
252
-
253
-func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
254
-	r.ParseForm()
255
-	diagnose.DebugHTTPForm(r)
256
-	unsafe, json := diagnose.ParseHTTPFormOptions(r)
257
-
258
-	// audit logs
259
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
260
-	log.Info("get entry")
261
-
262
-	if len(r.Form["tname"]) < 1 ||
263
-		len(r.Form["nid"]) < 1 ||
264
-		len(r.Form["key"]) < 1 {
265
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
266
-		log.Error("get entry failed, wrong input")
267
-		diagnose.HTTPReply(w, rsp, json)
268
-		return
269
-	}
270
-
271
-	tname := r.Form["tname"][0]
272
-	nid := r.Form["nid"][0]
273
-	key := r.Form["key"][0]
274
-
275
-	nDB, ok := ctx.(*NetworkDB)
276
-	if ok {
277
-		value, err := nDB.GetEntry(tname, nid, key)
278
-		if err != nil {
279
-			log.WithError(err).Error("get entry failed")
280
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
281
-			return
282
-		}
283
-
284
-		var encodedValue string
285
-		if unsafe {
286
-			encodedValue = string(value)
287
-		} else {
288
-			encodedValue = base64.StdEncoding.EncodeToString(value)
289
-		}
290
-
291
-		rsp := &diagnose.TableEntryObj{Key: key, Value: encodedValue}
292
-		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("update entry done")
293
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
294
-		return
295
-	}
296
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
297
-}
298
-
299
-func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
300
-	r.ParseForm()
301
-	diagnose.DebugHTTPForm(r)
302
-	_, json := diagnose.ParseHTTPFormOptions(r)
303
-
304
-	// audit logs
305
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
306
-	log.Info("join network")
307
-
308
-	if len(r.Form["nid"]) < 1 {
309
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
310
-		log.Error("join network failed, wrong input")
311
-		diagnose.HTTPReply(w, rsp, json)
312
-		return
313
-	}
314
-
315
-	nid := r.Form["nid"][0]
316
-
317
-	nDB, ok := ctx.(*NetworkDB)
318
-	if ok {
319
-		if err := nDB.JoinNetwork(nid); err != nil {
320
-			log.WithError(err).Error("join network failed")
321
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
322
-			return
323
-		}
324
-		log.Info("join network done")
325
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
326
-		return
327
-	}
328
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
329
-}
330
-
331
-func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
332
-	r.ParseForm()
333
-	diagnose.DebugHTTPForm(r)
334
-	_, json := diagnose.ParseHTTPFormOptions(r)
335
-
336
-	// audit logs
337
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
338
-	log.Info("leave network")
339
-
340
-	if len(r.Form["nid"]) < 1 {
341
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
342
-		log.Error("leave network failed, wrong input")
343
-		diagnose.HTTPReply(w, rsp, json)
344
-		return
345
-	}
346
-
347
-	nid := r.Form["nid"][0]
348
-
349
-	nDB, ok := ctx.(*NetworkDB)
350
-	if ok {
351
-		if err := nDB.LeaveNetwork(nid); err != nil {
352
-			log.WithError(err).Error("leave network failed")
353
-			diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
354
-			return
355
-		}
356
-		log.Info("leave network done")
357
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
358
-		return
359
-	}
360
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
361
-}
362
-
363
-func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
364
-	r.ParseForm()
365
-	diagnose.DebugHTTPForm(r)
366
-	unsafe, json := diagnose.ParseHTTPFormOptions(r)
367
-
368
-	// audit logs
369
-	log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
370
-	log.Info("get table")
371
-
372
-	if len(r.Form["tname"]) < 1 ||
373
-		len(r.Form["nid"]) < 1 {
374
-		rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
375
-		log.Error("get table failed, wrong input")
376
-		diagnose.HTTPReply(w, rsp, json)
377
-		return
378
-	}
379
-
380
-	tname := r.Form["tname"][0]
381
-	nid := r.Form["nid"][0]
382
-
383
-	nDB, ok := ctx.(*NetworkDB)
384
-	if ok {
385
-		table := nDB.GetTableByNetwork(tname, nid)
386
-		rsp := &diagnose.TableObj{Length: len(table)}
387
-		var i = 0
388
-		for k, v := range table {
389
-			var encodedValue string
390
-			if unsafe {
391
-				encodedValue = string(v.Value)
392
-			} else {
393
-				encodedValue = base64.StdEncoding.EncodeToString(v.Value)
394
-			}
395
-			rsp.Elements = append(rsp.Elements,
396
-				&diagnose.TableEntryObj{
397
-					Index: i,
398
-					Key:   k,
399
-					Value: encodedValue,
400
-					Owner: v.owner,
401
-				})
402
-			i++
403
-		}
404
-		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
405
-		diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
406
-		return
407
-	}
408
-	diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
409
-}
410 1
new file mode 100644
... ...
@@ -0,0 +1,409 @@
0
+package networkdb
1
+
2
+import (
3
+	"encoding/base64"
4
+	"fmt"
5
+	"net/http"
6
+	"strings"
7
+
8
+	"github.com/docker/libnetwork/common"
9
+	"github.com/docker/libnetwork/diagnostic"
10
+	"github.com/sirupsen/logrus"
11
+)
12
+
13
+const (
14
+	missingParameter = "missing parameter"
15
+	dbNotAvailable   = "database not available"
16
+)
17
+
18
+// NetDbPaths2Func maps each diagnostic HTTP path to the NetworkDB handler function that serves it.
19
+var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
20
+	"/join":         dbJoin,
21
+	"/networkpeers": dbPeers,
22
+	"/clusterpeers": dbClusterPeers,
23
+	"/joinnetwork":  dbJoinNetwork,
24
+	"/leavenetwork": dbLeaveNetwork,
25
+	"/createentry":  dbCreateEntry,
26
+	"/updateentry":  dbUpdateEntry,
27
+	"/deleteentry":  dbDeleteEntry,
28
+	"/getentry":     dbGetEntry,
29
+	"/gettable":     dbGetTable,
30
+}
31
+
32
+func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
33
+	r.ParseForm()
34
+	diagnostic.DebugHTTPForm(r)
35
+	_, json := diagnostic.ParseHTTPFormOptions(r)
36
+
37
+	// audit logs
38
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
39
+	log.Info("join cluster")
40
+
41
+	if len(r.Form["members"]) < 1 {
42
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
43
+		log.Error("join cluster failed, wrong input")
44
+		diagnostic.HTTPReply(w, rsp, json)
45
+		return
46
+	}
47
+
48
+	nDB, ok := ctx.(*NetworkDB)
49
+	if ok {
50
+		err := nDB.Join(strings.Split(r.Form["members"][0], ","))
51
+		if err != nil {
52
+			rsp := diagnostic.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
53
+			log.WithError(err).Error("join cluster failed")
54
+			diagnostic.HTTPReply(w, rsp, json)
55
+			return
56
+		}
57
+
58
+		log.Info("join cluster done")
59
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
60
+		return
61
+	}
62
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
63
+}
64
+
65
+func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
66
+	r.ParseForm()
67
+	diagnostic.DebugHTTPForm(r)
68
+	_, json := diagnostic.ParseHTTPFormOptions(r)
69
+
70
+	// audit logs
71
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
72
+	log.Info("network peers")
73
+
74
+	if len(r.Form["nid"]) < 1 {
75
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
76
+		log.Error("network peers failed, wrong input")
77
+		diagnostic.HTTPReply(w, rsp, json)
78
+		return
79
+	}
80
+
81
+	nDB, ok := ctx.(*NetworkDB)
82
+	if ok {
83
+		peers := nDB.Peers(r.Form["nid"][0])
84
+		rsp := &diagnostic.TableObj{Length: len(peers)}
85
+		for i, peerInfo := range peers {
86
+			rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
87
+		}
88
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
89
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
90
+		return
91
+	}
92
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
93
+}
94
+
95
+func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
96
+	r.ParseForm()
97
+	diagnostic.DebugHTTPForm(r)
98
+	_, json := diagnostic.ParseHTTPFormOptions(r)
99
+
100
+	// audit logs
101
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
102
+	log.Info("cluster peers")
103
+
104
+	nDB, ok := ctx.(*NetworkDB)
105
+	if ok {
106
+		peers := nDB.ClusterPeers()
107
+		rsp := &diagnostic.TableObj{Length: len(peers)}
108
+		for i, peerInfo := range peers {
109
+			rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
110
+		}
111
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
112
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
113
+		return
114
+	}
115
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
116
+}
117
+
118
+func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
119
+	r.ParseForm()
120
+	diagnostic.DebugHTTPForm(r)
121
+	unsafe, json := diagnostic.ParseHTTPFormOptions(r)
122
+
123
+	// audit logs
124
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
125
+	log.Info("create entry")
126
+
127
+	if len(r.Form["tname"]) < 1 ||
128
+		len(r.Form["nid"]) < 1 ||
129
+		len(r.Form["key"]) < 1 ||
130
+		len(r.Form["value"]) < 1 {
131
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
132
+		log.Error("create entry failed, wrong input")
133
+		diagnostic.HTTPReply(w, rsp, json)
134
+		return
135
+	}
136
+
137
+	tname := r.Form["tname"][0]
138
+	nid := r.Form["nid"][0]
139
+	key := r.Form["key"][0]
140
+	value := r.Form["value"][0]
141
+	decodedValue := []byte(value)
142
+	if !unsafe {
143
+		var err error
144
+		decodedValue, err = base64.StdEncoding.DecodeString(value)
145
+		if err != nil {
146
+			log.WithError(err).Error("create entry failed")
147
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
148
+			return
149
+		}
150
+	}
151
+
152
+	nDB, ok := ctx.(*NetworkDB)
153
+	if ok {
154
+		if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
155
+			rsp := diagnostic.FailCommand(err)
156
+			diagnostic.HTTPReply(w, rsp, json)
157
+			log.WithError(err).Error("create entry failed")
158
+			return
159
+		}
160
+		log.Info("create entry done")
161
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
162
+		return
163
+	}
164
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
165
+}
166
+
167
+func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
168
+	r.ParseForm()
169
+	diagnostic.DebugHTTPForm(r)
170
+	unsafe, json := diagnostic.ParseHTTPFormOptions(r)
171
+
172
+	// audit logs
173
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
174
+	log.Info("update entry")
175
+
176
+	if len(r.Form["tname"]) < 1 ||
177
+		len(r.Form["nid"]) < 1 ||
178
+		len(r.Form["key"]) < 1 ||
179
+		len(r.Form["value"]) < 1 {
180
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
181
+		log.Error("update entry failed, wrong input")
182
+		diagnostic.HTTPReply(w, rsp, json)
183
+		return
184
+	}
185
+
186
+	tname := r.Form["tname"][0]
187
+	nid := r.Form["nid"][0]
188
+	key := r.Form["key"][0]
189
+	value := r.Form["value"][0]
190
+	decodedValue := []byte(value)
191
+	if !unsafe {
192
+		var err error
193
+		decodedValue, err = base64.StdEncoding.DecodeString(value)
194
+		if err != nil {
195
+			log.WithError(err).Error("update entry failed")
196
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
197
+			return
198
+		}
199
+	}
200
+
201
+	nDB, ok := ctx.(*NetworkDB)
202
+	if ok {
203
+		if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
204
+			log.WithError(err).Error("update entry failed")
205
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
206
+			return
207
+		}
208
+		log.Info("update entry done")
209
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
210
+		return
211
+	}
212
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
213
+}
214
+
215
+func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
216
+	r.ParseForm()
217
+	diagnostic.DebugHTTPForm(r)
218
+	_, json := diagnostic.ParseHTTPFormOptions(r)
219
+
220
+	// audit logs
221
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
222
+	log.Info("delete entry")
223
+
224
+	if len(r.Form["tname"]) < 1 ||
225
+		len(r.Form["nid"]) < 1 ||
226
+		len(r.Form["key"]) < 1 {
227
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
228
+		log.Error("delete entry failed, wrong input")
229
+		diagnostic.HTTPReply(w, rsp, json)
230
+		return
231
+	}
232
+
233
+	tname := r.Form["tname"][0]
234
+	nid := r.Form["nid"][0]
235
+	key := r.Form["key"][0]
236
+
237
+	nDB, ok := ctx.(*NetworkDB)
238
+	if ok {
239
+		err := nDB.DeleteEntry(tname, nid, key)
240
+		if err != nil {
241
+			log.WithError(err).Error("delete entry failed")
242
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
243
+			return
244
+		}
245
+		log.Info("delete entry done")
246
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
247
+		return
248
+	}
249
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
250
+}
251
+
252
+func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
253
+	r.ParseForm()
254
+	diagnostic.DebugHTTPForm(r)
255
+	unsafe, json := diagnostic.ParseHTTPFormOptions(r)
256
+
257
+	// audit logs
258
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
259
+	log.Info("get entry")
260
+
261
+	if len(r.Form["tname"]) < 1 ||
262
+		len(r.Form["nid"]) < 1 ||
263
+		len(r.Form["key"]) < 1 {
264
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
265
+		log.Error("get entry failed, wrong input")
266
+		diagnostic.HTTPReply(w, rsp, json)
267
+		return
268
+	}
269
+
270
+	tname := r.Form["tname"][0]
271
+	nid := r.Form["nid"][0]
272
+	key := r.Form["key"][0]
273
+
274
+	nDB, ok := ctx.(*NetworkDB)
275
+	if ok {
276
+		value, err := nDB.GetEntry(tname, nid, key)
277
+		if err != nil {
278
+			log.WithError(err).Error("get entry failed")
279
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
280
+			return
281
+		}
282
+
283
+		var encodedValue string
284
+		if unsafe {
285
+			encodedValue = string(value)
286
+		} else {
287
+			encodedValue = base64.StdEncoding.EncodeToString(value)
288
+		}
289
+
290
+		rsp := &diagnostic.TableEntryObj{Key: key, Value: encodedValue}
291
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get entry done")
292
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
293
+		return
294
+	}
295
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
296
+}
297
+
298
+func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
299
+	r.ParseForm()
300
+	diagnostic.DebugHTTPForm(r)
301
+	_, json := diagnostic.ParseHTTPFormOptions(r)
302
+
303
+	// audit logs
304
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
305
+	log.Info("join network")
306
+
307
+	if len(r.Form["nid"]) < 1 {
308
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
309
+		log.Error("join network failed, wrong input")
310
+		diagnostic.HTTPReply(w, rsp, json)
311
+		return
312
+	}
313
+
314
+	nid := r.Form["nid"][0]
315
+
316
+	nDB, ok := ctx.(*NetworkDB)
317
+	if ok {
318
+		if err := nDB.JoinNetwork(nid); err != nil {
319
+			log.WithError(err).Error("join network failed")
320
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
321
+			return
322
+		}
323
+		log.Info("join network done")
324
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
325
+		return
326
+	}
327
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
328
+}
329
+
330
+func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
331
+	r.ParseForm()
332
+	diagnostic.DebugHTTPForm(r)
333
+	_, json := diagnostic.ParseHTTPFormOptions(r)
334
+
335
+	// audit logs
336
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
337
+	log.Info("leave network")
338
+
339
+	if len(r.Form["nid"]) < 1 {
340
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
341
+		log.Error("leave network failed, wrong input")
342
+		diagnostic.HTTPReply(w, rsp, json)
343
+		return
344
+	}
345
+
346
+	nid := r.Form["nid"][0]
347
+
348
+	nDB, ok := ctx.(*NetworkDB)
349
+	if ok {
350
+		if err := nDB.LeaveNetwork(nid); err != nil {
351
+			log.WithError(err).Error("leave network failed")
352
+			diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
353
+			return
354
+		}
355
+		log.Info("leave network done")
356
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
357
+		return
358
+	}
359
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
360
+}
361
+
362
+func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
363
+	r.ParseForm()
364
+	diagnostic.DebugHTTPForm(r)
365
+	unsafe, json := diagnostic.ParseHTTPFormOptions(r)
366
+
367
+	// audit logs
368
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
369
+	log.Info("get table")
370
+
371
+	if len(r.Form["tname"]) < 1 ||
372
+		len(r.Form["nid"]) < 1 {
373
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
374
+		log.Error("get table failed, wrong input")
375
+		diagnostic.HTTPReply(w, rsp, json)
376
+		return
377
+	}
378
+
379
+	tname := r.Form["tname"][0]
380
+	nid := r.Form["nid"][0]
381
+
382
+	nDB, ok := ctx.(*NetworkDB)
383
+	if ok {
384
+		table := nDB.GetTableByNetwork(tname, nid)
385
+		rsp := &diagnostic.TableObj{Length: len(table)}
386
+		var i = 0
387
+		for k, v := range table {
388
+			var encodedValue string
389
+			if unsafe {
390
+				encodedValue = string(v.Value)
391
+			} else {
392
+				encodedValue = base64.StdEncoding.EncodeToString(v.Value)
393
+			}
394
+			rsp.Elements = append(rsp.Elements,
395
+				&diagnostic.TableEntryObj{
396
+					Index: i,
397
+					Key:   k,
398
+					Value: encodedValue,
399
+					Owner: v.owner,
400
+				})
401
+			i++
402
+		}
403
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
404
+		diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
405
+		return
406
+	}
407
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
408
+}
... ...
@@ -362,7 +362,7 @@ func (sb *sandbox) rebuildDNS() error {
362 362
 dnsOpt:
363 363
 	for _, resOpt := range resOptions {
364 364
 		if strings.Contains(resOpt, "ndots") {
365
-			for i, option := range dnsOptionsList {
365
+			for _, option := range dnsOptionsList {
366 366
 				if strings.Contains(option, "ndots") {
367 367
 					parts := strings.Split(option, ":")
368 368
 					if len(parts) != 2 {
... ...
@@ -371,10 +371,8 @@ dnsOpt:
371 371
 					if num, err := strconv.Atoi(parts[1]); err != nil {
372 372
 						return fmt.Errorf("invalid number for ndots option %v", option)
373 373
 					} else if num > 0 {
374
-						// if the user sets ndots, we mark it as set but we remove the option to guarantee
375
-						// that into the container land only ndots:0
374
+						// if the user sets ndots, use the user setting
376 375
 						sb.ndotsSet = true
377
-						dnsOptionsList = append(dnsOptionsList[:i], dnsOptionsList[i+1:]...)
378 376
 						break dnsOpt
379 377
 					}
380 378
 				}
... ...
@@ -382,7 +380,11 @@ dnsOpt:
382 382
 		}
383 383
 	}
384 384
 
385
-	dnsOptionsList = append(dnsOptionsList, resOptions...)
385
+	if !sb.ndotsSet {
386
+		// if the user did not set the ndots, set it to 0 to prioritize the service name resolution
387
+		// Ref: https://linux.die.net/man/5/resolv.conf
388
+		dnsOptionsList = append(dnsOptionsList, resOptions...)
389
+	}
386 390
 
387 391
 	_, err = resolvconf.Build(sb.config.resolvConfPath, dnsList, dnsSearchList, dnsOptionsList)
388 392
 	return err
... ...
@@ -27,7 +27,7 @@ github.com/gorilla/mux v1.1
27 27
 github.com/hashicorp/consul v0.5.2
28 28
 github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
29 29
 github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e
30
-github.com/hashicorp/memberlist v0.1.0
30
+github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c
31 31
 github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372
32 32
 github.com/hashicorp/go-sockaddr acd314c5781ea706c710d9ea70069fd2e110d61d
33 33
 github.com/hashicorp/serf 598c54895cc5a7b1a24a398d635e8c0ea0959870
... ...
@@ -23,6 +23,8 @@ Please check your installation with:
23 23
 go version
24 24
 ```
25 25
 
26
+Run `make deps` to fetch dependencies before building
27
+
26 28
 ## Usage
27 29
 
28 30
 Memberlist is surprisingly simple to use. An example is shown below:
... ...
@@ -63,82 +65,11 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c
63 63
 
64 64
 ## Protocol
65 65
 
66
-memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf),
67
-with a few minor adaptations, mostly to increase propagation speed and
66
+memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways:
67
+
68
+* Several extensions are made to increase propagation speed and
68 69
 convergence rate.
70
+* Another set of extensions, which we call Lifeguard, makes memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss).
69 71
 
70
-A high level overview of the memberlist protocol (based on SWIM) is
71
-described below, but for details please read the full
72
-[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
73
-followed by the memberlist source. We welcome any questions related
72
+For details on all of these extensions, please read our paper "[Lifeguard: SWIM-ing with Situational Awareness](https://arxiv.org/abs/1707.00788)", along with the memberlist source. We welcome any questions related
74 73
 to the protocol on our issue tracker.
75
-
76
-### Protocol Description
77
-
78
-memberlist begins by joining an existing cluster or starting a new
79
-cluster. If starting a new cluster, additional nodes are expected to join
80
-it. New nodes in an existing cluster must be given the address of at
81
-least one existing member in order to join the cluster. The new member
82
-does a full state sync with the existing member over TCP and begins gossiping its
83
-existence to the cluster.
84
-
85
-Gossip is done over UDP with a configurable but fixed fanout and interval.
86
-This ensures that network usage is constant with regard to the number of nodes, as opposed to
87
-exponential growth that can occur with traditional heartbeat mechanisms.
88
-Complete state exchanges with a random node are done periodically over
89
-TCP, but much less often than gossip messages. This increases the likelihood
90
-that the membership list converges properly since the full state is exchanged
91
-and merged. The interval between full state exchanges is configurable or can
92
-be disabled entirely.
93
-
94
-Failure detection is done by periodic random probing using a configurable interval.
95
-If the node fails to ack within a reasonable time (typically some multiple
96
-of RTT), then an indirect probe as well as a direct TCP probe are attempted. An
97
-indirect probe asks a configurable number of random nodes to probe the same node,
98
-in case there are network issues causing our own node to fail the probe. The direct
99
-TCP probe is used to help identify the common situation where networking is
100
-misconfigured to allow TCP but not UDP. Without the TCP probe, a UDP-isolated node
101
-would think all other nodes were suspect and could cause churn in the cluster when
102
-it attempts a TCP-based state exchange with another node. It is not desirable to
103
-operate with only TCP connectivity because convergence will be much slower, but it
104
-is enabled so that memberlist can detect this situation and alert operators.
105
-
106
-If our probe, the indirect probes, and the direct TCP probe all fail within a
107
-configurable time, then the node is marked "suspicious" and this knowledge is
108
-gossiped to the cluster. A suspicious node is still considered a member of
109
-the cluster. If the suspect member of the cluster does not dispute the suspicion
110
-within a configurable period of time, the node is finally considered dead,
111
-and this state is then gossiped to the cluster.
112
-
113
-This is a brief and incomplete description of the protocol. For a better idea,
114
-please read the
115
-[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
116
-in its entirety, along with the memberlist source code.
117
-
118
-### Changes from SWIM
119
-
120
-As mentioned earlier, the memberlist protocol is based on SWIM but includes
121
-minor changes, mostly to increase propagation speed and convergence rates.
122
-
123
-The changes from SWIM are noted here:
124
-
125
-* memberlist does a full state sync over TCP periodically. SWIM only propagates
126
-  changes over gossip. While both eventually reach convergence, the full state
127
-  sync increases the likelihood that nodes are fully converged more quickly,
128
-  at the expense of more bandwidth usage. This feature can be totally disabled
129
-  if you wish.
130
-
131
-* memberlist has a dedicated gossip layer separate from the failure detection
132
-  protocol. SWIM only piggybacks gossip messages on top of probe/ack messages.
133
-  memberlist also piggybacks gossip messages on top of probe/ack messages, but
134
-  also will periodically send out dedicated gossip messages on their own. This
135
-  feature lets you have a higher gossip rate (for example once per 200ms)
136
-  and a slower failure detection rate (such as once per second), resulting
137
-  in overall faster convergence rates and data propagation speeds. This feature
138
-  can be totally disabled as well, if you wish.
139
-
140
-* memberlist keeps the state of dead nodes around for a set amount of time,
141
-  so that when full syncs are requested, the requester also receives information
142
-  about dead nodes. Because SWIM doesn't do full syncs, SWIM deletes dead node
143
-  state immediately upon learning that the node is dead. This change again helps
144
-  the cluster converge more quickly.
... ...
@@ -141,6 +141,16 @@ type Config struct {
141 141
 	GossipNodes         int
142 142
 	GossipToTheDeadTime time.Duration
143 143
 
144
+	// GossipVerifyIncoming controls whether to enforce encryption for incoming
145
+	// gossip. It is used for upshifting from unencrypted to encrypted gossip on
146
+	// a running cluster.
147
+	GossipVerifyIncoming bool
148
+
149
+	// GossipVerifyOutgoing controls whether to enforce encryption for outgoing
150
+	// gossip. It is used for upshifting from unencrypted to encrypted gossip on
151
+	// a running cluster.
152
+	GossipVerifyOutgoing bool
153
+
144 154
 	// EnableCompression is used to control message compression. This can
145 155
 	// be used to reduce bandwidth usage at the cost of slightly more CPU
146 156
 	// utilization. This is only available starting at protocol version 1.
... ...
@@ -225,7 +235,7 @@ func DefaultLANConfig() *Config {
225 225
 		TCPTimeout:              10 * time.Second,       // Timeout after 10 seconds
226 226
 		IndirectChecks:          3,                      // Use 3 nodes for the indirect ping
227 227
 		RetransmitMult:          4,                      // Retransmit a message 4 * log(N+1) nodes
228
-		SuspicionMult:           5,                      // Suspect a node for 5 * log(N+1) * Interval
228
+		SuspicionMult:           4,                      // Suspect a node for 4 * log(N+1) * Interval
229 229
 		SuspicionMaxTimeoutMult: 6,                      // For 10k nodes this will give a max timeout of 120 seconds
230 230
 		PushPullInterval:        30 * time.Second,       // Low frequency
231 231
 		ProbeTimeout:            500 * time.Millisecond, // Reasonable RTT time for LAN
... ...
@@ -233,9 +243,11 @@ func DefaultLANConfig() *Config {
233 233
 		DisableTcpPings:         false,                  // TCP pings are safe, even with mixed versions
234 234
 		AwarenessMaxMultiplier:  8,                      // Probe interval backs off to 8 seconds
235 235
 
236
-		GossipNodes:         3,                      // Gossip to 3 nodes
237
-		GossipInterval:      200 * time.Millisecond, // Gossip more rapidly
238
-		GossipToTheDeadTime: 30 * time.Second,       // Same as push/pull
236
+		GossipNodes:          3,                      // Gossip to 3 nodes
237
+		GossipInterval:       200 * time.Millisecond, // Gossip more rapidly
238
+		GossipToTheDeadTime:  30 * time.Second,       // Same as push/pull
239
+		GossipVerifyIncoming: true,
240
+		GossipVerifyOutgoing: true,
239 241
 
240 242
 		EnableCompression: true, // Enable compression by default
241 243
 
... ...
@@ -22,9 +22,10 @@ import (
22 22
 	"strconv"
23 23
 	"strings"
24 24
 	"sync"
25
+	"sync/atomic"
25 26
 	"time"
26 27
 
27
-	"github.com/hashicorp/go-multierror"
28
+	multierror "github.com/hashicorp/go-multierror"
28 29
 	sockaddr "github.com/hashicorp/go-sockaddr"
29 30
 	"github.com/miekg/dns"
30 31
 )
... ...
@@ -35,11 +36,14 @@ type Memberlist struct {
35 35
 	numNodes    uint32 // Number of known nodes (estimate)
36 36
 
37 37
 	config         *Config
38
-	shutdown       bool
38
+	shutdown       int32 // Used as an atomic boolean value
39 39
 	shutdownCh     chan struct{}
40
-	leave          bool
40
+	leave          int32 // Used as an atomic boolean value
41 41
 	leaveBroadcast chan struct{}
42 42
 
43
+	shutdownLock sync.Mutex // Serializes calls to Shutdown
44
+	leaveLock    sync.Mutex // Serializes calls to Leave
45
+
43 46
 	transport Transport
44 47
 	handoff   chan msgHandoff
45 48
 
... ...
@@ -113,15 +117,44 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
113 113
 			BindPort:  conf.BindPort,
114 114
 			Logger:    logger,
115 115
 		}
116
-		nt, err := NewNetTransport(nc)
116
+
117
+		// See comment below for details about the retry in here.
118
+		makeNetRetry := func(limit int) (*NetTransport, error) {
119
+			var err error
120
+			for try := 0; try < limit; try++ {
121
+				var nt *NetTransport
122
+				if nt, err = NewNetTransport(nc); err == nil {
123
+					return nt, nil
124
+				}
125
+				if strings.Contains(err.Error(), "address already in use") {
126
+					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
127
+					continue
128
+				}
129
+			}
130
+
131
+			return nil, fmt.Errorf("failed to obtain an address: %v", err)
132
+		}
133
+
134
+		// The dynamic bind port operation is inherently racy because
135
+		// even though we are using the kernel to find a port for us, we
136
+		// are attempting to bind multiple protocols (and potentially
137
+		// multiple addresses) with the same port number. We build in a
138
+		// few retries here since this often gets transient errors in
139
+		// busy unit tests.
140
+		limit := 1
141
+		if conf.BindPort == 0 {
142
+			limit = 10
143
+		}
144
+
145
+		nt, err := makeNetRetry(limit)
117 146
 		if err != nil {
118 147
 			return nil, fmt.Errorf("Could not set up network transport: %v", err)
119 148
 		}
120
-
121 149
 		if conf.BindPort == 0 {
122 150
 			port := nt.GetAutoBindPort()
123 151
 			conf.BindPort = port
124
-			logger.Printf("[DEBUG] Using dynamic bind port %d", port)
152
+			conf.AdvertisePort = port
153
+			logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
125 154
 		}
126 155
 		transport = nt
127 156
 	}
... ...
@@ -275,23 +308,17 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
275 275
 // resolveAddr is used to resolve the address into an address,
276 276
 // port, and error. If no port is given, use the default
277 277
 func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
278
-	// Normalize the incoming string to host:port so we can apply Go's
279
-	// parser to it.
280
-	port := uint16(0)
281
-	if !hasPort(hostStr) {
282
-		hostStr += ":" + strconv.Itoa(m.config.BindPort)
283
-	}
278
+	// This captures the supplied port, or the default one.
279
+	hostStr = ensurePort(hostStr, m.config.BindPort)
284 280
 	host, sport, err := net.SplitHostPort(hostStr)
285 281
 	if err != nil {
286 282
 		return nil, err
287 283
 	}
288
-
289
-	// This will capture the supplied port, or the default one added above.
290 284
 	lport, err := strconv.ParseUint(sport, 10, 16)
291 285
 	if err != nil {
292 286
 		return nil, err
293 287
 	}
294
-	port = uint16(lport)
288
+	port := uint16(lport)
295 289
 
296 290
 	// If it looks like an IP address we are done. The SplitHostPort() above
297 291
 	// will make sure the host part is in good shape for parsing, even for
... ...
@@ -525,18 +552,17 @@ func (m *Memberlist) NumMembers() (alive int) {
525 525
 // This method is safe to call multiple times, but must not be called
526 526
 // after the cluster is already shut down.
527 527
 func (m *Memberlist) Leave(timeout time.Duration) error {
528
-	m.nodeLock.Lock()
529
-	// We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
530
-	// acquire a lock so we need to Unlock before that.
528
+	m.leaveLock.Lock()
529
+	defer m.leaveLock.Unlock()
531 530
 
532
-	if m.shutdown {
533
-		m.nodeLock.Unlock()
531
+	if m.hasShutdown() {
534 532
 		panic("leave after shutdown")
535 533
 	}
536 534
 
537
-	if !m.leave {
538
-		m.leave = true
535
+	if !m.hasLeft() {
536
+		atomic.StoreInt32(&m.leave, 1)
539 537
 
538
+		m.nodeLock.Lock()
540 539
 		state, ok := m.nodeMap[m.config.Name]
541 540
 		m.nodeLock.Unlock()
542 541
 		if !ok {
... ...
@@ -562,8 +588,6 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
562 562
 				return fmt.Errorf("timeout waiting for leave broadcast")
563 563
 			}
564 564
 		}
565
-	} else {
566
-		m.nodeLock.Unlock()
567 565
 	}
568 566
 
569 567
 	return nil
... ...
@@ -605,21 +629,31 @@ func (m *Memberlist) ProtocolVersion() uint8 {
605 605
 //
606 606
 // This method is safe to call multiple times.
607 607
 func (m *Memberlist) Shutdown() error {
608
-	m.nodeLock.Lock()
609
-	defer m.nodeLock.Unlock()
608
+	m.shutdownLock.Lock()
609
+	defer m.shutdownLock.Unlock()
610 610
 
611
-	if m.shutdown {
611
+	if m.hasShutdown() {
612 612
 		return nil
613 613
 	}
614 614
 
615 615
 	// Shut down the transport first, which should block until it's
616 616
 	// completely torn down. If we kill the memberlist-side handlers
617 617
 	// those I/O handlers might get stuck.
618
-	m.transport.Shutdown()
618
+	if err := m.transport.Shutdown(); err != nil {
619
+		m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
620
+	}
619 621
 
620 622
 	// Now tear down everything else.
621
-	m.shutdown = true
623
+	atomic.StoreInt32(&m.shutdown, 1)
622 624
 	close(m.shutdownCh)
623 625
 	m.deschedule()
624 626
 	return nil
625 627
 }
628
+
629
+func (m *Memberlist) hasShutdown() bool {
630
+	return atomic.LoadInt32(&m.shutdown) == 1
631
+}
632
+
633
+func (m *Memberlist) hasLeft() bool {
634
+	return atomic.LoadInt32(&m.leave) == 1
635
+}
... ...
@@ -55,6 +55,7 @@ const (
55 55
 	encryptMsg
56 56
 	nackRespMsg
57 57
 	hasCrcMsg
58
+	errMsg
58 59
 )
59 60
 
60 61
 // compressionType is used to specify the compression algorithm
... ...
@@ -105,6 +106,11 @@ type nackResp struct {
105 105
 	SeqNo uint32
106 106
 }
107 107
 
108
+// err response is sent to relay the error from the remote end
109
+type errResp struct {
110
+	Error string
111
+}
112
+
108 113
 // suspect is broadcast when we suspect a node is dead
109 114
 type suspect struct {
110 115
 	Incarnation uint32
... ...
@@ -209,6 +215,19 @@ func (m *Memberlist) handleConn(conn net.Conn) {
209 209
 	if err != nil {
210 210
 		if err != io.EOF {
211 211
 			m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
212
+
213
+			resp := errResp{err.Error()}
214
+			out, err := encode(errMsg, &resp)
215
+			if err != nil {
216
+				m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
217
+				return
218
+			}
219
+
220
+			err = m.rawSendMsgStream(conn, out.Bytes())
221
+			if err != nil {
222
+				m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
223
+				return
224
+			}
212 225
 		}
213 226
 		return
214 227
 	}
... ...
@@ -283,8 +302,13 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
283 283
 		// Decrypt the payload
284 284
 		plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
285 285
 		if err != nil {
286
-			m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
287
-			return
286
+			if !m.config.GossipVerifyIncoming {
287
+				// Treat the message as plaintext
288
+				plain = buf
289
+			} else {
290
+				m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
291
+				return
292
+			}
288 293
 		}
289 294
 
290 295
 		// Continue processing the plaintext buffer
... ...
@@ -557,7 +581,7 @@ func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg inte
557 557
 func (m *Memberlist) sendMsg(addr string, msg []byte) error {
558 558
 	// Check if we can piggy back any messages
559 559
 	bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
560
-	if m.config.EncryptionEnabled() {
560
+	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
561 561
 		bytesAvail -= encryptOverhead(m.encryptionVersion())
562 562
 	}
563 563
 	extra := m.getBroadcasts(compoundOverhead, bytesAvail)
... ...
@@ -621,7 +645,7 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error
621 621
 	}
622 622
 
623 623
 	// Check if we have encryption enabled
624
-	if m.config.EncryptionEnabled() {
624
+	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
625 625
 		// Encrypt the payload
626 626
 		var buf bytes.Buffer
627 627
 		primaryKey := m.config.Keyring.GetPrimaryKey()
... ...
@@ -652,7 +676,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
652 652
 	}
653 653
 
654 654
 	// Check if encryption is enabled
655
-	if m.config.EncryptionEnabled() {
655
+	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
656 656
 		crypt, err := m.encryptLocalState(sendBuf)
657 657
 		if err != nil {
658 658
 			m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
... ...
@@ -721,6 +745,14 @@ func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeStat
721 721
 		return nil, nil, err
722 722
 	}
723 723
 
724
+	if msgType == errMsg {
725
+		var resp errResp
726
+		if err := dec.Decode(&resp); err != nil {
727
+			return nil, nil, err
728
+		}
729
+		return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
730
+	}
731
+
724 732
 	// Quit if not push/pull
725 733
 	if msgType != pushPullMsg {
726 734
 		err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
... ...
@@ -876,7 +908,7 @@ func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.D
876 876
 		// Reset message type and bufConn
877 877
 		msgType = messageType(plain[0])
878 878
 		bufConn = bytes.NewReader(plain[1:])
879
-	} else if m.config.EncryptionEnabled() {
879
+	} else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming {
880 880
 		return 0, nil, nil,
881 881
 			fmt.Errorf("Encryption is configured but remote state is not encrypted")
882 882
 	}
... ...
@@ -1027,7 +1059,7 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
1027 1027
 // operations, given the deadline. The bool return parameter is true if we
1028 1028
 // were able to round trip a ping to the other node.
1029 1029
 func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
1030
-	conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
1030
+	conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
1031 1031
 	if err != nil {
1032 1032
 		// If the node is actually dead we expect this to fail, so we
1033 1033
 		// shouldn't spam the logs with it. After this point, errors
... ...
@@ -40,6 +40,11 @@ func (n *Node) Address() string {
40 40
 	return joinHostPort(n.Addr.String(), n.Port)
41 41
 }
42 42
 
43
+// String returns the node name
44
+func (n *Node) String() string {
45
+	return n.Name
46
+}
47
+
43 48
 // NodeState is used to manage our state view of another node
44 49
 type nodeState struct {
45 50
 	Node
... ...
@@ -246,10 +251,17 @@ func (m *Memberlist) probeNode(node *nodeState) {
246 246
 	nackCh := make(chan struct{}, m.config.IndirectChecks+1)
247 247
 	m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
248 248
 
249
+	// Mark the sent time here, which should be after any pre-processing but
250
+	// before system calls to do the actual send. This probably over-reports
251
+	// a bit, but it's the best we can do. We had originally put this right
252
+	// after the I/O, but that would sometimes give negative RTT measurements
253
+	// which was not desirable.
254
+	sent := time.Now()
255
+
249 256
 	// Send a ping to the node. If this node looks like it's suspect or dead,
250 257
 	// also tack on a suspect message so that it has a chance to refute as
251 258
 	// soon as possible.
252
-	deadline := time.Now().Add(probeInterval)
259
+	deadline := sent.Add(probeInterval)
253 260
 	addr := node.Address()
254 261
 	if node.State == stateAlive {
255 262
 		if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
... ...
@@ -279,11 +291,6 @@ func (m *Memberlist) probeNode(node *nodeState) {
279 279
 		}
280 280
 	}
281 281
 
282
-	// Mark the sent time here, which should be after any pre-processing and
283
-	// system calls to do the actual send. This probably under-reports a bit,
284
-	// but it's the best we can do.
285
-	sent := time.Now()
286
-
287 282
 	// Arrange for our self-awareness to get updated. At this point we've
288 283
 	// sent the ping, so any return statement means the probe succeeded
289 284
 	// which will improve our health until we get to the failure scenarios
... ...
@@ -830,7 +837,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
830 830
 	// in-queue to be processed but blocked by the locks above. If we let
831 831
 	// that aliveMsg process, it'll cause us to re-join the cluster. This
832 832
 	// ensures that we don't.
833
-	if m.leave && a.Node == m.config.Name {
833
+	if m.hasLeft() && a.Node == m.config.Name {
834 834
 		return
835 835
 	}
836 836
 
... ...
@@ -1106,7 +1113,7 @@ func (m *Memberlist) deadNode(d *dead) {
1106 1106
 	// Check if this is us
1107 1107
 	if state.Name == m.config.Name {
1108 1108
 		// If we are not leaving we need to refute
1109
-		if !m.leave {
1109
+		if !m.hasLeft() {
1110 1110
 			m.refute(state, d.Incarnation)
1111 1111
 			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
1112 1112
 			return // Do not mark ourself dead
... ...
@@ -117,7 +117,7 @@ func (s *suspicion) Confirm(from string) bool {
117 117
 	// stop the timer then we will call the timeout function directly from
118 118
 	// here.
119 119
 	n := atomic.AddInt32(&s.n, 1)
120
-	elapsed := time.Now().Sub(s.start)
120
+	elapsed := time.Since(s.start)
121 121
 	remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max)
122 122
 	if s.timer.Stop() {
123 123
 		if remaining > 0 {
... ...
@@ -17,7 +17,7 @@ type Packet struct {
17 17
 
18 18
 	// Timestamp is the time when the packet was received. This should be
19 19
 	// taken as close as possible to the actual receipt time to help make an
20
-	// accurate RTT measurements during probes.
20
+	// accurate RTT measurement during probes.
21 21
 	Timestamp time.Time
22 22
 }
23 23
 
... ...
@@ -217,20 +217,6 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
217 217
 	return
218 218
 }
219 219
 
220
-// Given a string of the form "host", "host:port",
221
-// "ipv6::addr" or "[ipv6::address]:port",
222
-// return true if the string includes a port.
223
-func hasPort(s string) bool {
224
-	last := strings.LastIndex(s, ":")
225
-	if last == -1 {
226
-		return false
227
-	}
228
-	if s[0] == '[' {
229
-		return s[last-1] == ']'
230
-	}
231
-	return strings.Index(s, ":") == last
232
-}
233
-
234 220
 // compressPayload takes an opaque input buffer, compresses it
235 221
 // and wraps it in a compress{} message that is encoded.
236 222
 func compressPayload(inp []byte) (*bytes.Buffer, error) {
... ...
@@ -294,3 +280,31 @@ func decompressBuffer(c *compress) ([]byte, error) {
294 294
 func joinHostPort(host string, port uint16) string {
295 295
 	return net.JoinHostPort(host, strconv.Itoa(int(port)))
296 296
 }
297
+
298
+// hasPort is given a string of the form "host", "host:port", "ipv6::address",
299
+// or "[ipv6::address]:port", and returns true if the string includes a port.
300
+func hasPort(s string) bool {
301
+	// IPv6 address in brackets.
302
+	if strings.LastIndex(s, "[") == 0 {
303
+		return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
304
+	}
305
+
306
+	// Otherwise the presence of a single colon determines if there's a port
307
+	// since IPv6 addresses outside of brackets (count > 1) can't have a
308
+	// port.
309
+	return strings.Count(s, ":") == 1
310
+}
311
+
312
+// ensurePort makes sure the given string has a port number on it, otherwise it
313
+// appends the given port as a default.
314
+func ensurePort(s string, port int) string {
315
+	if hasPort(s) {
316
+		return s
317
+	}
318
+
319
+	// If this is an IPv6 address, the join call will add another set of
320
+	// brackets, so we have to trim before we add the default port.
321
+	s = strings.Trim(s, "[]")
322
+	s = net.JoinHostPort(s, strconv.Itoa(port))
323
+	return s
324
+}