Fixes #33415
Fixes #33346
Implemented a few additional IPVS APIs to be used by other projects
Signed-off-by: Madhu Venugopal <madhu@docker.com>
| ... | ... |
@@ -26,7 +26,7 @@ github.com/imdario/mergo 0.2.1 |
| 26 | 26 |
golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 |
| 27 | 27 |
|
| 28 | 28 |
#get libnetwork packages |
| 29 |
-github.com/docker/libnetwork 83e1e49475b88a9f1f8ba89a690a7d5de42e24b9 |
|
| 29 |
+github.com/docker/libnetwork 2e99f06621c23a5f4038968f1af1e28c84e4104e |
|
| 30 | 30 |
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 |
| 31 | 31 |
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 |
| 32 | 32 |
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec |
| ... | ... |
@@ -85,6 +85,23 @@ const ( |
| 85 | 85 |
ipvsDestAttrInactiveConnections |
| 86 | 86 |
ipvsDestAttrPersistentConnections |
| 87 | 87 |
ipvsDestAttrStats |
| 88 |
+ ipvsDestAttrAddressFamily |
|
| 89 |
+) |
|
| 90 |
+ |
|
| 91 |
+// IPVS Svc Statistics constants |
|
| 92 |
+ |
|
| 93 |
+const ( |
|
| 94 |
+ ipvsSvcStatsUnspec int = iota |
|
| 95 |
+ ipvsSvcStatsConns |
|
| 96 |
+ ipvsSvcStatsPktsIn |
|
| 97 |
+ ipvsSvcStatsPktsOut |
|
| 98 |
+ ipvsSvcStatsBytesIn |
|
| 99 |
+ ipvsSvcStatsBytesOut |
|
| 100 |
+ ipvsSvcStatsCPS |
|
| 101 |
+ ipvsSvcStatsPPSIn |
|
| 102 |
+ ipvsSvcStatsPPSOut |
|
| 103 |
+ ipvsSvcStatsBPSIn |
|
| 104 |
+ ipvsSvcStatsBPSOut |
|
| 88 | 105 |
) |
| 89 | 106 |
|
| 90 | 107 |
// Destination forwarding methods |
| ... | ... |
@@ -6,6 +6,7 @@ import ( |
| 6 | 6 |
"net" |
| 7 | 7 |
"syscall" |
| 8 | 8 |
|
| 9 |
+ "fmt" |
|
| 9 | 10 |
"github.com/vishvananda/netlink/nl" |
| 10 | 11 |
"github.com/vishvananda/netns" |
| 11 | 12 |
) |
| ... | ... |
@@ -25,6 +26,21 @@ type Service struct {
|
| 25 | 25 |
Netmask uint32 |
| 26 | 26 |
AddressFamily uint16 |
| 27 | 27 |
PEName string |
| 28 |
+ Stats SvcStats |
|
| 29 |
+} |
|
| 30 |
+ |
|
| 31 |
+// SvcStats defines the statistics of an IPVS service |
|
| 32 |
+type SvcStats struct {
|
|
| 33 |
+ Connections uint32 |
|
| 34 |
+ PacketsIn uint32 |
|
| 35 |
+ PacketsOut uint32 |
|
| 36 |
+ BytesIn uint64 |
|
| 37 |
+ BytesOut uint64 |
|
| 38 |
+ CPS uint32 |
|
| 39 |
+ BPSOut uint32 |
|
| 40 |
+ PPSIn uint32 |
|
| 41 |
+ PPSOut uint32 |
|
| 42 |
+ BPSIn uint32 |
|
| 28 | 43 |
} |
| 29 | 44 |
|
| 30 | 45 |
// Destination defines an IPVS destination (real server) in its |
| ... | ... |
@@ -117,3 +133,29 @@ func (i *Handle) UpdateDestination(s *Service, d *Destination) error {
|
| 117 | 117 |
func (i *Handle) DelDestination(s *Service, d *Destination) error {
|
| 118 | 118 |
return i.doCmd(s, d, ipvsCmdDelDest) |
| 119 | 119 |
} |
| 120 |
+ |
|
| 121 |
+// GetServices returns an array of services configured on the Node |
|
| 122 |
+func (i *Handle) GetServices() ([]*Service, error) {
|
|
| 123 |
+ return i.doGetServicesCmd(nil) |
|
| 124 |
+} |
|
| 125 |
+ |
|
| 126 |
+// GetDestinations returns an array of Destinations configured for this Service |
|
| 127 |
+func (i *Handle) GetDestinations(s *Service) ([]*Destination, error) {
|
|
| 128 |
+ return i.doGetDestinationsCmd(s, nil) |
|
| 129 |
+} |
|
| 130 |
+ |
|
| 131 |
+// GetService gets details of a specific IPVS service, useful for updating statistics etc. |
|
| 132 |
+func (i *Handle) GetService(s *Service) (*Service, error) {
|
|
| 133 |
+ |
|
| 134 |
+ res, err := i.doGetServicesCmd(s) |
|
| 135 |
+ if err != nil {
|
|
| 136 |
+ return nil, err |
|
| 137 |
+ } |
|
| 138 |
+ |
|
| 139 |
+ // We are looking for exactly one service otherwise error out |
|
| 140 |
+ if len(res) != 1 {
|
|
| 141 |
+ return nil, fmt.Errorf("Expected only one service obtained=%d", len(res))
|
|
| 142 |
+ } |
|
| 143 |
+ |
|
| 144 |
+ return res[0], nil |
|
| 145 |
+} |
| ... | ... |
@@ -19,6 +19,7 @@ import ( |
| 19 | 19 |
"github.com/vishvananda/netns" |
| 20 | 20 |
) |
| 21 | 21 |
|
| 22 |
+// For quick reference, the IPVS-related netlink message format is described at the end of this file. |
|
| 22 | 23 |
var ( |
| 23 | 24 |
native = nl.NativeEndian() |
| 24 | 25 |
ipvsFamily int |
| ... | ... |
@@ -89,7 +90,6 @@ func fillService(s *Service) nl.NetlinkRequestData {
|
| 89 | 89 |
if s.PEName != "" {
|
| 90 | 90 |
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName)) |
| 91 | 91 |
} |
| 92 |
- |
|
| 93 | 92 |
f := &ipvsFlags{
|
| 94 | 93 |
flags: s.Flags, |
| 95 | 94 |
mask: 0xFFFFFFFF, |
| ... | ... |
@@ -117,20 +117,38 @@ func fillDestinaton(d *Destination) nl.NetlinkRequestData {
|
| 117 | 117 |
return cmdAttr |
| 118 | 118 |
} |
| 119 | 119 |
|
| 120 |
-func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
|
|
| 120 |
+func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
|
|
| 121 | 121 |
req := newIPVSRequest(cmd) |
| 122 | 122 |
req.Seq = atomic.AddUint32(&i.seq, 1) |
| 123 |
- req.AddData(fillService(s)) |
|
| 124 | 123 |
|
| 125 |
- if d != nil {
|
|
| 124 |
+ if s == nil {
|
|
| 125 |
+ req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages |
|
| 126 |
+ req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute |
|
| 127 |
+ } else {
|
|
| 128 |
+ req.AddData(fillService(s)) |
|
| 129 |
+ } |
|
| 130 |
+ |
|
| 131 |
+ if d == nil {
|
|
| 132 |
+ if cmd == ipvsCmdGetDest {
|
|
| 133 |
+ req.Flags |= syscall.NLM_F_DUMP |
|
| 134 |
+ } |
|
| 135 |
+ |
|
| 136 |
+ } else {
|
|
| 126 | 137 |
req.AddData(fillDestinaton(d)) |
| 127 | 138 |
} |
| 128 | 139 |
|
| 129 |
- if _, err := execute(i.sock, req, 0); err != nil {
|
|
| 130 |
- return err |
|
| 140 |
+ res, err := execute(i.sock, req, 0) |
|
| 141 |
+ if err != nil {
|
|
| 142 |
+ return [][]byte{}, err
|
|
| 131 | 143 |
} |
| 132 | 144 |
|
| 133 |
- return nil |
|
| 145 |
+ return res, nil |
|
| 146 |
+} |
|
| 147 |
+ |
|
| 148 |
+func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
|
|
| 149 |
+ _, err := i.doCmdwithResponse(s, d, cmd) |
|
| 150 |
+ |
|
| 151 |
+ return err |
|
| 134 | 152 |
} |
| 135 | 153 |
|
| 136 | 154 |
func getIPVSFamily() (int, error) {
|
| ... | ... |
@@ -171,7 +189,6 @@ func rawIPData(ip net.IP) []byte {
|
| 171 | 171 |
if family == nl.FAMILY_V4 {
|
| 172 | 172 |
return ip.To4() |
| 173 | 173 |
} |
| 174 |
- |
|
| 175 | 174 |
return ip |
| 176 | 175 |
} |
| 177 | 176 |
|
| ... | ... |
@@ -235,3 +252,295 @@ done: |
| 235 | 235 |
} |
| 236 | 236 |
return res, nil |
| 237 | 237 |
} |
| 238 |
+ |
|
| 239 |
+func parseIP(ip []byte, family uint16) (net.IP, error) {
|
|
| 240 |
+ |
|
| 241 |
+ var resIP net.IP |
|
| 242 |
+ |
|
| 243 |
+ switch family {
|
|
| 244 |
+ case syscall.AF_INET: |
|
| 245 |
+ resIP = (net.IP)(ip[:4]) |
|
| 246 |
+ case syscall.AF_INET6: |
|
| 247 |
+ resIP = (net.IP)(ip[:16]) |
|
| 248 |
+ default: |
|
| 249 |
+ return nil, fmt.Errorf("parseIP Error ip=%v", ip)
|
|
| 250 |
+ |
|
| 251 |
+ } |
|
| 252 |
+ return resIP, nil |
|
| 253 |
+} |
|
| 254 |
+ |
|
| 255 |
+// assembleStats assembles IPVS service statistics from a chain of netlink attributes |
|
| 256 |
+func assembleStats(msg []byte) (SvcStats, error) {
|
|
| 257 |
+ |
|
| 258 |
+ var s SvcStats |
|
| 259 |
+ |
|
| 260 |
+ attrs, err := nl.ParseRouteAttr(msg) |
|
| 261 |
+ if err != nil {
|
|
| 262 |
+ return s, err |
|
| 263 |
+ } |
|
| 264 |
+ |
|
| 265 |
+ for _, attr := range attrs {
|
|
| 266 |
+ attrType := int(attr.Attr.Type) |
|
| 267 |
+ switch attrType {
|
|
| 268 |
+ case ipvsSvcStatsConns: |
|
| 269 |
+ s.Connections = native.Uint32(attr.Value) |
|
| 270 |
+ case ipvsSvcStatsPktsIn: |
|
| 271 |
+ s.PacketsIn = native.Uint32(attr.Value) |
|
| 272 |
+ case ipvsSvcStatsPktsOut: |
|
| 273 |
+ s.PacketsOut = native.Uint32(attr.Value) |
|
| 274 |
+ case ipvsSvcStatsBytesIn: |
|
| 275 |
+ s.BytesIn = native.Uint64(attr.Value) |
|
| 276 |
+ case ipvsSvcStatsBytesOut: |
|
| 277 |
+ s.BytesOut = native.Uint64(attr.Value) |
|
| 278 |
+ case ipvsSvcStatsCPS: |
|
| 279 |
+ s.CPS = native.Uint32(attr.Value) |
|
| 280 |
+ case ipvsSvcStatsPPSIn: |
|
| 281 |
+ s.PPSIn = native.Uint32(attr.Value) |
|
| 282 |
+ case ipvsSvcStatsPPSOut: |
|
| 283 |
+ s.PPSOut = native.Uint32(attr.Value) |
|
| 284 |
+ case ipvsSvcStatsBPSIn: |
|
| 285 |
+ s.BPSIn = native.Uint32(attr.Value) |
|
| 286 |
+ case ipvsSvcStatsBPSOut: |
|
| 287 |
+ s.BPSOut = native.Uint32(attr.Value) |
|
| 288 |
+ } |
|
| 289 |
+ } |
|
| 290 |
+ return s, nil |
|
| 291 |
+} |
|
| 292 |
+ |
|
| 293 |
+// assembleService assembles a service back from a chain of netlink attributes |
|
| 294 |
+func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) {
|
|
| 295 |
+ |
|
| 296 |
+ var s Service |
|
| 297 |
+ |
|
| 298 |
+ for _, attr := range attrs {
|
|
| 299 |
+ |
|
| 300 |
+ attrType := int(attr.Attr.Type) |
|
| 301 |
+ |
|
| 302 |
+ switch attrType {
|
|
| 303 |
+ |
|
| 304 |
+ case ipvsSvcAttrAddressFamily: |
|
| 305 |
+ s.AddressFamily = native.Uint16(attr.Value) |
|
| 306 |
+ case ipvsSvcAttrProtocol: |
|
| 307 |
+ s.Protocol = native.Uint16(attr.Value) |
|
| 308 |
+ case ipvsSvcAttrAddress: |
|
| 309 |
+ ip, err := parseIP(attr.Value, s.AddressFamily) |
|
| 310 |
+ if err != nil {
|
|
| 311 |
+ return nil, err |
|
| 312 |
+ } |
|
| 313 |
+ s.Address = ip |
|
| 314 |
+ case ipvsSvcAttrPort: |
|
| 315 |
+ s.Port = binary.BigEndian.Uint16(attr.Value) |
|
| 316 |
+ case ipvsSvcAttrFWMark: |
|
| 317 |
+ s.FWMark = native.Uint32(attr.Value) |
|
| 318 |
+ case ipvsSvcAttrSchedName: |
|
| 319 |
+ s.SchedName = nl.BytesToString(attr.Value) |
|
| 320 |
+ case ipvsSvcAttrFlags: |
|
| 321 |
+ s.Flags = native.Uint32(attr.Value) |
|
| 322 |
+ case ipvsSvcAttrTimeout: |
|
| 323 |
+ s.Timeout = native.Uint32(attr.Value) |
|
| 324 |
+ case ipvsSvcAttrNetmask: |
|
| 325 |
+ s.Netmask = native.Uint32(attr.Value) |
|
| 326 |
+ case ipvsSvcAttrStats: |
|
| 327 |
+ stats, err := assembleStats(attr.Value) |
|
| 328 |
+ if err != nil {
|
|
| 329 |
+ return nil, err |
|
| 330 |
+ } |
|
| 331 |
+ s.Stats = stats |
|
| 332 |
+ } |
|
| 333 |
+ |
|
| 334 |
+ } |
|
| 335 |
+ return &s, nil |
|
| 336 |
+} |
|
| 337 |
+ |
|
| 338 |
+// parseService, given an IPVS netlink response, returns a valid service entry, or an error otherwise |
|
| 339 |
+func (i *Handle) parseService(msg []byte) (*Service, error) {
|
|
| 340 |
+ |
|
| 341 |
+ var s *Service |
|
| 342 |
+ |
|
| 343 |
+ //Remove General header for this message and parse the NetLink message |
|
| 344 |
+ hdr := deserializeGenlMsg(msg) |
|
| 345 |
+ NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) |
|
| 346 |
+ if err != nil {
|
|
| 347 |
+ return nil, err |
|
| 348 |
+ } |
|
| 349 |
+ if len(NetLinkAttrs) == 0 {
|
|
| 350 |
+ return nil, fmt.Errorf("error no valid netlink message found while parsing service record")
|
|
| 351 |
+ } |
|
| 352 |
+ |
|
| 353 |
+ //Now Parse and get IPVS related attributes messages packed in this message. |
|
| 354 |
+ ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) |
|
| 355 |
+ if err != nil {
|
|
| 356 |
+ return nil, err |
|
| 357 |
+ } |
|
| 358 |
+ |
|
| 359 |
+ //Assemble all the IPVS related attribute messages and create a service record |
|
| 360 |
+ s, err = assembleService(ipvsAttrs) |
|
| 361 |
+ if err != nil {
|
|
| 362 |
+ return nil, err |
|
| 363 |
+ } |
|
| 364 |
+ |
|
| 365 |
+ return s, nil |
|
| 366 |
+} |
|
| 367 |
+ |
|
| 368 |
+// doGetServicesCmd is a wrapper used in common by both GetServices() and GetService(*Service) |
|
| 369 |
+func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) {
|
|
| 370 |
+ var res []*Service |
|
| 371 |
+ |
|
| 372 |
+ msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService) |
|
| 373 |
+ if err != nil {
|
|
| 374 |
+ return nil, err |
|
| 375 |
+ } |
|
| 376 |
+ |
|
| 377 |
+ for _, msg := range msgs {
|
|
| 378 |
+ srv, err := i.parseService(msg) |
|
| 379 |
+ if err != nil {
|
|
| 380 |
+ return nil, err |
|
| 381 |
+ } |
|
| 382 |
+ res = append(res, srv) |
|
| 383 |
+ } |
|
| 384 |
+ |
|
| 385 |
+ return res, nil |
|
| 386 |
+} |
|
| 387 |
+ |
|
| 388 |
+func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) {
|
|
| 389 |
+ |
|
| 390 |
+ var d Destination |
|
| 391 |
+ |
|
| 392 |
+ for _, attr := range attrs {
|
|
| 393 |
+ |
|
| 394 |
+ attrType := int(attr.Attr.Type) |
|
| 395 |
+ |
|
| 396 |
+ switch attrType {
|
|
| 397 |
+ case ipvsDestAttrAddress: |
|
| 398 |
+ ip, err := parseIP(attr.Value, syscall.AF_INET) |
|
| 399 |
+ if err != nil {
|
|
| 400 |
+ return nil, err |
|
| 401 |
+ } |
|
| 402 |
+ d.Address = ip |
|
| 403 |
+ case ipvsDestAttrPort: |
|
| 404 |
+ d.Port = binary.BigEndian.Uint16(attr.Value) |
|
| 405 |
+ case ipvsDestAttrForwardingMethod: |
|
| 406 |
+ d.ConnectionFlags = native.Uint32(attr.Value) |
|
| 407 |
+ case ipvsDestAttrWeight: |
|
| 408 |
+ d.Weight = int(native.Uint16(attr.Value)) |
|
| 409 |
+ case ipvsDestAttrUpperThreshold: |
|
| 410 |
+ d.UpperThreshold = native.Uint32(attr.Value) |
|
| 411 |
+ case ipvsDestAttrLowerThreshold: |
|
| 412 |
+ d.LowerThreshold = native.Uint32(attr.Value) |
|
| 413 |
+ case ipvsDestAttrAddressFamily: |
|
| 414 |
+ d.AddressFamily = native.Uint16(attr.Value) |
|
| 415 |
+ } |
|
| 416 |
+ } |
|
| 417 |
+ return &d, nil |
|
| 418 |
+} |
|
| 419 |
+ |
|
| 420 |
+// parseDestination, given an IPVS netlink response, returns a valid destination entry, or an error otherwise |
|
| 421 |
+func (i *Handle) parseDestination(msg []byte) (*Destination, error) {
|
|
| 422 |
+ var dst *Destination |
|
| 423 |
+ |
|
| 424 |
+ //Remove General header for this message |
|
| 425 |
+ hdr := deserializeGenlMsg(msg) |
|
| 426 |
+ NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) |
|
| 427 |
+ if err != nil {
|
|
| 428 |
+ return nil, err |
|
| 429 |
+ } |
|
| 430 |
+ if len(NetLinkAttrs) == 0 {
|
|
| 431 |
+ return nil, fmt.Errorf("error no valid netlink message found while parsing destination record")
|
|
| 432 |
+ } |
|
| 433 |
+ |
|
| 434 |
+ //Now Parse and get IPVS related attributes messages packed in this message. |
|
| 435 |
+ ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) |
|
| 436 |
+ if err != nil {
|
|
| 437 |
+ return nil, err |
|
| 438 |
+ } |
|
| 439 |
+ |
|
| 440 |
+ //Assemble netlink attributes and create a Destination record |
|
| 441 |
+ dst, err = assembleDestination(ipvsAttrs) |
|
| 442 |
+ if err != nil {
|
|
| 443 |
+ return nil, err |
|
| 444 |
+ } |
|
| 445 |
+ |
|
| 446 |
+ return dst, nil |
|
| 447 |
+} |
|
| 448 |
+ |
|
| 449 |
+// doGetDestinationsCmd is a wrapper function to be used by the GetDestinations and GetDestination(d) APIs |
|
| 450 |
+func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) {
|
|
| 451 |
+ |
|
| 452 |
+ var res []*Destination |
|
| 453 |
+ |
|
| 454 |
+ msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest) |
|
| 455 |
+ if err != nil {
|
|
| 456 |
+ return nil, err |
|
| 457 |
+ } |
|
| 458 |
+ |
|
| 459 |
+ for _, msg := range msgs {
|
|
| 460 |
+ dest, err := i.parseDestination(msg) |
|
| 461 |
+ if err != nil {
|
|
| 462 |
+ return res, err |
|
| 463 |
+ } |
|
| 464 |
+ res = append(res, dest) |
|
| 465 |
+ } |
|
| 466 |
+ return res, nil |
|
| 467 |
+} |
|
| 468 |
+ |
|
| 469 |
+// IPVS related netlink message format explained |
|
| 470 |
+ |
|
| 471 |
+/* EACH NETLINK MSG is of the format below; this is what we will receive from the execute() API. |
|
| 472 |
+ If we have multiple netlink objects to process like GetServices() etc., execute() will |
|
| 473 |
+ supply an array of the object below |
|
| 474 |
+ |
|
| 475 |
+ NETLINK MSG |
|
| 476 |
+|-----------------------------------| |
|
| 477 |
+ 0 1 2 3 |
|
| 478 |
+|--------|--------|--------|--------| - |
|
| 479 |
+| CMD ID | VER | RESERVED | |==> General Message Header represented by genlMsgHdr |
|
| 480 |
+|-----------------------------------| - |
|
| 481 |
+| ATTR LEN | ATTR TYPE | | |
|
| 482 |
+|-----------------------------------| | |
|
| 483 |
+| | | |
|
| 484 |
+| VALUE | | |
|
| 485 |
+| []byte Array of IPVS MSG | |==> Attribute Message represented by syscall.NetlinkRouteAttr |
|
| 486 |
+| PADDED BY 4 BYTES | | |
|
| 487 |
+| | | |
|
| 488 |
+|-----------------------------------| - |
|
| 489 |
+ |
|
| 490 |
+ |
|
| 491 |
+ Once we strip genlMsgHdr from the above NETLINK MSG, we should parse the VALUE. |
|
| 492 |
+ VALUE will have an array of netlink attributes (syscall.NetlinkRouteAttr) such that each attribute will |
|
| 493 |
+ represent a "Service" or "Destination" object's field. If we assemble these attributes we can construct |
|
| 494 |
+ Service or Destination. |
|
| 495 |
+ |
|
| 496 |
+ IPVS MSG |
|
| 497 |
+|-----------------------------------| |
|
| 498 |
+ 0 1 2 3 |
|
| 499 |
+|--------|--------|--------|--------| |
|
| 500 |
+| ATTR LEN | ATTR TYPE | |
|
| 501 |
+|-----------------------------------| |
|
| 502 |
+| | |
|
| 503 |
+| | |
|
| 504 |
+| []byte IPVS ATTRIBUTE BY 4 BYTES | |
|
| 505 |
+| | |
|
| 506 |
+| | |
|
| 507 |
+|-----------------------------------| |
|
| 508 |
+ NEXT ATTRIBUTE |
|
| 509 |
+|-----------------------------------| |
|
| 510 |
+| ATTR LEN | ATTR TYPE | |
|
| 511 |
+|-----------------------------------| |
|
| 512 |
+| | |
|
| 513 |
+| | |
|
| 514 |
+| []byte IPVS ATTRIBUTE BY 4 BYTES | |
|
| 515 |
+| | |
|
| 516 |
+| | |
|
| 517 |
+|-----------------------------------| |
|
| 518 |
+ NEXT ATTRIBUTE |
|
| 519 |
+|-----------------------------------| |
|
| 520 |
+| ATTR LEN | ATTR TYPE | |
|
| 521 |
+|-----------------------------------| |
|
| 522 |
+| | |
|
| 523 |
+| | |
|
| 524 |
+| []byte IPVS ATTRIBUTE BY 4 BYTES | |
|
| 525 |
+| | |
|
| 526 |
+| | |
|
| 527 |
+|-----------------------------------| |
|
| 528 |
+ |
|
| 529 |
+*/ |
| ... | ... |
@@ -412,6 +412,9 @@ func (n *network) applyConfigurationTo(to *network) error {
|
| 412 | 412 |
} |
| 413 | 413 |
} |
| 414 | 414 |
} |
| 415 |
+ if len(n.ipamType) != 0 {
|
|
| 416 |
+ to.ipamType = n.ipamType |
|
| 417 |
+ } |
|
| 415 | 418 |
if len(n.ipamOptions) > 0 {
|
| 416 | 419 |
to.ipamOptions = make(map[string]string, len(n.ipamOptions)) |
| 417 | 420 |
for k, v := range n.ipamOptions {
|
| ... | ... |
@@ -17,6 +17,25 @@ func (d *delegate) NodeMeta(limit int) []byte {
|
| 17 | 17 |
return []byte{}
|
| 18 | 18 |
} |
| 19 | 19 |
|
| 20 |
+func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
|
|
| 21 |
+ nDB.Lock() |
|
| 22 |
+ defer nDB.Unlock() |
|
| 23 |
+ |
|
| 24 |
+ for _, nodes := range []map[string]*node{
|
|
| 25 |
+ nDB.failedNodes, |
|
| 26 |
+ nDB.leftNodes, |
|
| 27 |
+ nDB.nodes, |
|
| 28 |
+ } {
|
|
| 29 |
+ if n, ok := nodes[nEvent.NodeName]; ok {
|
|
| 30 |
+ if n.ltime >= nEvent.LTime {
|
|
| 31 |
+ return nil |
|
| 32 |
+ } |
|
| 33 |
+ return n |
|
| 34 |
+ } |
|
| 35 |
+ } |
|
| 36 |
+ return nil |
|
| 37 |
+} |
|
| 38 |
+ |
|
| 20 | 39 |
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
|
| 21 | 40 |
nDB.Lock() |
| 22 | 41 |
defer nDB.Unlock() |
| ... | ... |
@@ -63,10 +82,28 @@ func (nDB *NetworkDB) purgeSameNode(n *node) {
|
| 63 | 63 |
} |
| 64 | 64 |
|
| 65 | 65 |
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
| 66 |
- n := nDB.checkAndGetNode(nEvent) |
|
| 66 |
+ // Update our local clock if the received message has a newer |
|
| 67 |
+ // time. |
|
| 68 |
+ nDB.networkClock.Witness(nEvent.LTime) |
|
| 69 |
+ |
|
| 70 |
+ n := nDB.getNode(nEvent) |
|
| 67 | 71 |
if n == nil {
|
| 68 | 72 |
return false |
| 69 | 73 |
} |
| 74 |
+ // If it's a node leave event for a manager and this is the only manager we |
|
| 75 |
+ // know of we want the reconnect logic to kick in. In a single manager |
|
| 76 |
+ // cluster, the manager's gossip can't be bootstrapped unless some other node |
|
| 77 |
+ // connects to it. |
|
| 78 |
+ if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
|
|
| 79 |
+ for _, ip := range nDB.bootStrapIP {
|
|
| 80 |
+ if ip.Equal(n.Addr) {
|
|
| 81 |
+ n.ltime = nEvent.LTime |
|
| 82 |
+ return true |
|
| 83 |
+ } |
|
| 84 |
+ } |
|
| 85 |
+ } |
|
| 86 |
+ |
|
| 87 |
+ n = nDB.checkAndGetNode(nEvent) |
|
| 70 | 88 |
|
| 71 | 89 |
nDB.purgeSameNode(n) |
| 72 | 90 |
n.ltime = nEvent.LTime |
| ... | ... |
@@ -76,11 +113,13 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
| 76 | 76 |
nDB.Lock() |
| 77 | 77 |
nDB.nodes[n.Name] = n |
| 78 | 78 |
nDB.Unlock() |
| 79 |
+ logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
|
|
| 79 | 80 |
return true |
| 80 | 81 |
case NodeEventTypeLeave: |
| 81 | 82 |
nDB.Lock() |
| 82 | 83 |
nDB.leftNodes[n.Name] = n |
| 83 | 84 |
nDB.Unlock() |
| 85 |
+ logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
|
|
| 84 | 86 |
return true |
| 85 | 87 |
} |
| 86 | 88 |
|
| ... | ... |
@@ -22,6 +22,7 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
|
| 22 | 22 |
} |
| 23 | 23 |
|
| 24 | 24 |
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
| 25 |
+ logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
|
|
| 25 | 26 |
e.broadcastNodeEvent(mn.Addr, opCreate) |
| 26 | 27 |
e.nDB.Lock() |
| 27 | 28 |
// In case the node is rejoining after a failure or leave, |
| ... | ... |
@@ -37,9 +38,12 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
| 37 | 37 |
|
| 38 | 38 |
e.nDB.nodes[mn.Name] = &node{Node: *mn}
|
| 39 | 39 |
e.nDB.Unlock() |
| 40 |
+ logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
|
|
| 40 | 41 |
} |
| 41 | 42 |
|
| 42 | 43 |
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
| 44 |
+ var failed bool |
|
| 45 |
+ logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
|
|
| 43 | 46 |
e.broadcastNodeEvent(mn.Addr, opDelete) |
| 44 | 47 |
e.nDB.deleteNodeTableEntries(mn.Name) |
| 45 | 48 |
e.nDB.deleteNetworkEntriesForNode(mn.Name) |
| ... | ... |
@@ -47,10 +51,17 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
| 47 | 47 |
if n, ok := e.nDB.nodes[mn.Name]; ok {
|
| 48 | 48 |
delete(e.nDB.nodes, mn.Name) |
| 49 | 49 |
|
| 50 |
- n.reapTime = reapInterval |
|
| 50 |
+ // In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h) |
|
| 51 |
+ // Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map |
|
| 52 |
+ n.reapTime = nodeReapInterval |
|
| 51 | 53 |
e.nDB.failedNodes[mn.Name] = n |
| 54 |
+ failed = true |
|
| 52 | 55 |
} |
| 53 | 56 |
e.nDB.Unlock() |
| 57 |
+ if failed {
|
|
| 58 |
+ logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
|
|
| 59 |
+ } |
|
| 60 |
+ |
|
| 54 | 61 |
} |
| 55 | 62 |
|
| 56 | 63 |
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
|
| ... | ... |
@@ -4,6 +4,7 @@ package networkdb |
| 4 | 4 |
|
| 5 | 5 |
import ( |
| 6 | 6 |
"fmt" |
| 7 |
+ "net" |
|
| 7 | 8 |
"strings" |
| 8 | 9 |
"sync" |
| 9 | 10 |
"time" |
| ... | ... |
@@ -88,6 +89,10 @@ type NetworkDB struct {
|
| 88 | 88 |
|
| 89 | 89 |
// Reference to the memberlist's keyring to add & remove keys |
| 90 | 90 |
keyring *memberlist.Keyring |
| 91 |
+ |
|
| 92 |
+ // bootStrapIP is the list of IPs that can be used to bootstrap |
|
| 93 |
+ // the gossip. |
|
| 94 |
+ bootStrapIP []net.IP |
|
| 91 | 95 |
} |
| 92 | 96 |
|
| 93 | 97 |
// PeerInfo represents the peer (gossip cluster) nodes of a network |
| ... | ... |
@@ -194,6 +199,11 @@ func New(c *Config) (*NetworkDB, error) {
|
| 194 | 194 |
// Join joins this NetworkDB instance with a list of peer NetworkDB |
| 195 | 195 |
// instances passed by the caller in the form of addr:port |
| 196 | 196 |
func (nDB *NetworkDB) Join(members []string) error {
|
| 197 |
+ nDB.Lock() |
|
| 198 |
+ for _, m := range members {
|
|
| 199 |
+ nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m)) |
|
| 200 |
+ } |
|
| 201 |
+ nDB.Unlock() |
|
| 197 | 202 |
return nDB.clusterJoin(members) |
| 198 | 203 |
} |
| 199 | 204 |
|