Contains Service Discovery hardening fixes via
https://github.com/docker/libnetwork/pull/1796
Fixes multiple issues such as #32830
Signed-off-by: Madhu Venugopal <madhu@docker.com>
| ... | ... |
@@ -26,7 +26,7 @@ github.com/imdario/mergo 0.2.1 |
| 26 | 26 |
golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 |
| 27 | 27 |
|
| 28 | 28 |
#get libnetwork packages |
| 29 |
-github.com/docker/libnetwork eb57059e91bc54c9da23c5a633b75b3faf910a68 |
|
| 29 |
+github.com/docker/libnetwork f4a15a0890383619ad797b3bd2481cc6f46a978d |
|
| 30 | 30 |
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 |
| 31 | 31 |
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 |
| 32 | 32 |
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec |
| ... | ... |
@@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
|
| 583 | 583 |
return nil |
| 584 | 584 |
} |
| 585 | 585 |
|
| 586 |
-func (ep *endpoint) addServiceInfoToCluster() error {
|
|
| 586 |
+func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
|
|
| 587 | 587 |
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
|
| 588 | 588 |
return nil |
| 589 | 589 |
} |
| ... | ... |
@@ -593,24 +593,49 @@ func (ep *endpoint) addServiceInfoToCluster() error {
|
| 593 | 593 |
return nil |
| 594 | 594 |
} |
| 595 | 595 |
|
| 596 |
+ sb.Service.Lock() |
|
| 597 |
+ defer sb.Service.Unlock() |
|
| 598 |
+ logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
|
|
| 599 |
+ |
|
| 600 |
+ // Check that the endpoint is still present on the sandbox before adding it to the service discovery. |
|
| 601 |
+ // This is to handle a race between the EnableService and the sbLeave |
|
| 602 |
+ // It is possible that the EnableService starts, fetches the list of the endpoints and |
|
| 603 |
+ // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox |
|
| 604 |
+ // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster. |
|
| 605 |
+ // This check under the Service lock of the sandbox ensure the correct behavior. |
|
| 606 |
+ // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit |
|
| 607 |
+ // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed. |
|
| 608 |
+ // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is |
|
| 609 |
+ // removed from the list, in this situation the delete will bail out not finding any data to cleanup |
|
| 610 |
+ // and the add will bail out not finding the endpoint on the sandbox. |
|
| 611 |
+ if e := sb.getEndpoint(ep.ID()); e == nil {
|
|
| 612 |
+ logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
|
|
| 613 |
+ return nil |
|
| 614 |
+ } |
|
| 615 |
+ |
|
| 596 | 616 |
c := n.getController() |
| 597 | 617 |
agent := c.getAgent() |
| 598 | 618 |
|
| 619 |
+ name := ep.Name() |
|
| 620 |
+ if ep.isAnonymous() {
|
|
| 621 |
+ name = ep.MyAliases()[0] |
|
| 622 |
+ } |
|
| 623 |
+ |
|
| 599 | 624 |
var ingressPorts []*PortConfig |
| 600 | 625 |
if ep.svcID != "" {
|
| 626 |
+ // This is a task part of a service |
|
| 601 | 627 |
// Gossip ingress ports only in ingress network. |
| 602 | 628 |
if n.ingress {
|
| 603 | 629 |
ingressPorts = ep.ingressPorts |
| 604 | 630 |
} |
| 605 |
- |
|
| 606 |
- if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
|
|
| 631 |
+ if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
|
|
| 632 |
+ return err |
|
| 633 |
+ } |
|
| 634 |
+ } else {
|
|
| 635 |
+ // This is a container simply attached to an attachable network |
|
| 636 |
+ if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
|
|
| 607 | 637 |
return err |
| 608 | 638 |
} |
| 609 |
- } |
|
| 610 |
- |
|
| 611 |
- name := ep.Name() |
|
| 612 |
- if ep.isAnonymous() {
|
|
| 613 |
- name = ep.MyAliases()[0] |
|
| 614 | 639 |
} |
| 615 | 640 |
|
| 616 | 641 |
buf, err := proto.Marshal(&EndpointRecord{
|
| ... | ... |
@@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
|
| 634 | 634 |
} |
| 635 | 635 |
} |
| 636 | 636 |
|
| 637 |
+ logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
|
|
| 638 |
+ |
|
| 637 | 639 |
return nil |
| 638 | 640 |
} |
| 639 | 641 |
|
| 640 |
-func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
|
| 642 |
+func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
|
|
| 641 | 643 |
if ep.isAnonymous() && len(ep.myAliases) == 0 {
|
| 642 | 644 |
return nil |
| 643 | 645 |
} |
| ... | ... |
@@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
| 647 | 647 |
return nil |
| 648 | 648 |
} |
| 649 | 649 |
|
| 650 |
+ sb.Service.Lock() |
|
| 651 |
+ defer sb.Service.Unlock() |
|
| 652 |
+ logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
|
|
| 653 |
+ |
|
| 650 | 654 |
c := n.getController() |
| 651 | 655 |
agent := c.getAgent() |
| 652 | 656 |
|
| 653 |
- if ep.svcID != "" && ep.Iface().Address() != nil {
|
|
| 654 |
- var ingressPorts []*PortConfig |
|
| 655 |
- if n.ingress {
|
|
| 656 |
- ingressPorts = ep.ingressPorts |
|
| 657 |
- } |
|
| 657 |
+ name := ep.Name() |
|
| 658 |
+ if ep.isAnonymous() {
|
|
| 659 |
+ name = ep.MyAliases()[0] |
|
| 660 |
+ } |
|
| 658 | 661 |
|
| 659 |
- if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
|
|
| 660 |
- return err |
|
| 662 |
+ if ep.Iface().Address() != nil {
|
|
| 663 |
+ if ep.svcID != "" {
|
|
| 664 |
+ // This is a task part of a service |
|
| 665 |
+ var ingressPorts []*PortConfig |
|
| 666 |
+ if n.ingress {
|
|
| 667 |
+ ingressPorts = ep.ingressPorts |
|
| 668 |
+ } |
|
| 669 |
+ if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
|
|
| 670 |
+ return err |
|
| 671 |
+ } |
|
| 672 |
+ } else {
|
|
| 673 |
+ // This is a container simply attached to an attachable network |
|
| 674 |
+ if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
|
|
| 675 |
+ return err |
|
| 676 |
+ } |
|
| 661 | 677 |
} |
| 662 | 678 |
} |
| 663 | 679 |
|
| ... | ... |
@@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
| 667 | 667 |
} |
| 668 | 668 |
} |
| 669 | 669 |
|
| 670 |
+ logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
|
|
| 671 |
+ |
|
| 670 | 672 |
return nil |
| 671 | 673 |
} |
| 672 | 674 |
|
| ... | ... |
@@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
|
| 814 | 814 |
value = event.Value |
| 815 | 815 |
case networkdb.UpdateEvent: |
| 816 | 816 |
logrus.Errorf("Unexpected update service table event = %#v", event)
|
| 817 |
- } |
|
| 818 |
- |
|
| 819 |
- nw, err := c.NetworkByID(nid) |
|
| 820 |
- if err != nil {
|
|
| 821 |
- logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
|
|
| 822 | 817 |
return |
| 823 | 818 |
} |
| 824 |
- n := nw.(*network) |
|
| 825 | 819 |
|
| 826 |
- err = proto.Unmarshal(value, &epRec) |
|
| 820 |
+ err := proto.Unmarshal(value, &epRec) |
|
| 827 | 821 |
if err != nil {
|
| 828 | 822 |
logrus.Errorf("Failed to unmarshal service table value: %v", err)
|
| 829 | 823 |
return |
| 830 | 824 |
} |
| 831 | 825 |
|
| 832 |
- name := epRec.Name |
|
| 826 |
+ containerName := epRec.Name |
|
| 833 | 827 |
svcName := epRec.ServiceName |
| 834 | 828 |
svcID := epRec.ServiceID |
| 835 | 829 |
vip := net.ParseIP(epRec.VirtualIP) |
| 836 | 830 |
ip := net.ParseIP(epRec.EndpointIP) |
| 837 | 831 |
ingressPorts := epRec.IngressPorts |
| 838 |
- aliases := epRec.Aliases |
|
| 839 |
- taskaliases := epRec.TaskAliases |
|
| 832 |
+ serviceAliases := epRec.Aliases |
|
| 833 |
+ taskAliases := epRec.TaskAliases |
|
| 840 | 834 |
|
| 841 |
- if name == "" || ip == nil {
|
|
| 835 |
+ if containerName == "" || ip == nil {
|
|
| 842 | 836 |
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
|
| 843 | 837 |
return |
| 844 | 838 |
} |
| 845 | 839 |
|
| 846 | 840 |
if isAdd {
|
| 841 |
+ logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
|
|
| 847 | 842 |
if svcID != "" {
|
| 848 |
- if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
|
|
| 849 |
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
|
|
| 843 |
+ // This is a remote task part of a service |
|
| 844 |
+ if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
|
| 845 |
+ logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
|
| 850 | 846 |
return |
| 851 | 847 |
} |
| 852 |
- } |
|
| 853 |
- |
|
| 854 |
- n.addSvcRecords(name, ip, nil, true) |
|
| 855 |
- for _, alias := range taskaliases {
|
|
| 856 |
- n.addSvcRecords(alias, ip, nil, true) |
|
| 848 |
+ } else {
|
|
| 849 |
+ // This is a remote container simply attached to an attachable network |
|
| 850 |
+ if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
|
| 851 |
+ logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
|
| 852 |
+ } |
|
| 857 | 853 |
} |
| 858 | 854 |
} else {
|
| 855 |
+ logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
|
|
| 859 | 856 |
if svcID != "" {
|
| 860 |
- if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
|
|
| 861 |
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
|
|
| 857 |
+ // This is a remote task part of a service |
|
| 858 |
+ if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
|
| 859 |
+ logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
|
|
| 862 | 860 |
return |
| 863 | 861 |
} |
| 864 |
- } |
|
| 865 |
- |
|
| 866 |
- n.deleteSvcRecords(name, ip, nil, true) |
|
| 867 |
- for _, alias := range taskaliases {
|
|
| 868 |
- n.deleteSvcRecords(alias, ip, nil, true) |
|
| 862 |
+ } else {
|
|
| 863 |
+ // This is a remote container simply attached to an attachable network |
|
| 864 |
+ if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
|
| 865 |
+ logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
|
| 866 |
+ } |
|
| 869 | 867 |
} |
| 870 | 868 |
} |
| 871 | 869 |
} |
| ... | ... |
@@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false; |
| 14 | 14 |
// EndpointRecord specifies all the endpoint specific information that |
| 15 | 15 |
// needs to gossiped to nodes participating in the network. |
| 16 | 16 |
message EndpointRecord {
|
| 17 |
- // Name of the endpoint |
|
| 17 |
+ // Name of the container |
|
| 18 | 18 |
string name = 1; |
| 19 | 19 |
|
| 20 | 20 |
// Service name of the service to which this endpoint belongs. |
| 21 | 21 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,123 @@ |
| 0 |
+package common |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "sync" |
|
| 4 |
+ |
|
| 5 |
+ mapset "github.com/deckarep/golang-set" |
|
| 6 |
+) |
|
| 7 |
+ |
|
| 8 |
+// SetMatrix is a map of Sets |
|
| 9 |
+type SetMatrix interface {
|
|
| 10 |
+ // Get returns the members of the set for a specific key as a slice. |
|
| 11 |
+ Get(key string) ([]interface{}, bool)
|
|
| 12 |
+ // Contains is used to verify is an element is in a set for a specific key |
|
| 13 |
+ // returns true if the element is in the set |
|
| 14 |
+ // returns true if there is a set for the key |
|
| 15 |
+ Contains(key string, value interface{}) (bool, bool)
|
|
| 16 |
+ // Insert inserts the mapping between the IP and the endpoint identifier |
|
| 17 |
+ // returns true if the mapping was not present, false otherwise |
|
| 18 |
+ // returns also the number of endpoints associated to the IP |
|
| 19 |
+ Insert(key string, value interface{}) (bool, int)
|
|
| 20 |
+ // Remove removes the mapping between the IP and the endpoint identifier |
|
| 21 |
+ // returns true if the mapping was deleted, false otherwise |
|
| 22 |
+ // returns also the number of endpoints associated to the IP |
|
| 23 |
+ Remove(key string, value interface{}) (bool, int)
|
|
| 24 |
+ // Cardinality returns the number of elements in the set of a specfic key |
|
| 25 |
+ // returns false if the key is not in the map |
|
| 26 |
+ Cardinality(key string) (int, bool) |
|
| 27 |
+ // String returns the string version of the set, empty otherwise |
|
| 28 |
+ // returns false if the key is not in the map |
|
| 29 |
+ String(key string) (string, bool) |
|
| 30 |
+} |
|
| 31 |
+ |
|
| 32 |
+type setMatrix struct {
|
|
| 33 |
+ matrix map[string]mapset.Set |
|
| 34 |
+ |
|
| 35 |
+ sync.Mutex |
|
| 36 |
+} |
|
| 37 |
+ |
|
| 38 |
+// NewSetMatrix creates a new set matrix object |
|
| 39 |
+func NewSetMatrix() SetMatrix {
|
|
| 40 |
+ s := &setMatrix{}
|
|
| 41 |
+ s.init() |
|
| 42 |
+ return s |
|
| 43 |
+} |
|
| 44 |
+ |
|
| 45 |
+func (s *setMatrix) init() {
|
|
| 46 |
+ s.matrix = make(map[string]mapset.Set) |
|
| 47 |
+} |
|
| 48 |
+ |
|
| 49 |
+func (s *setMatrix) Get(key string) ([]interface{}, bool) {
|
|
| 50 |
+ s.Lock() |
|
| 51 |
+ defer s.Unlock() |
|
| 52 |
+ set, ok := s.matrix[key] |
|
| 53 |
+ if !ok {
|
|
| 54 |
+ return nil, ok |
|
| 55 |
+ } |
|
| 56 |
+ return set.ToSlice(), ok |
|
| 57 |
+} |
|
| 58 |
+ |
|
| 59 |
+func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) {
|
|
| 60 |
+ s.Lock() |
|
| 61 |
+ defer s.Unlock() |
|
| 62 |
+ set, ok := s.matrix[key] |
|
| 63 |
+ if !ok {
|
|
| 64 |
+ return false, ok |
|
| 65 |
+ } |
|
| 66 |
+ return set.Contains(value), ok |
|
| 67 |
+} |
|
| 68 |
+ |
|
| 69 |
+func (s *setMatrix) Insert(key string, value interface{}) (bool, int) {
|
|
| 70 |
+ s.Lock() |
|
| 71 |
+ defer s.Unlock() |
|
| 72 |
+ set, ok := s.matrix[key] |
|
| 73 |
+ if !ok {
|
|
| 74 |
+ s.matrix[key] = mapset.NewSet() |
|
| 75 |
+ s.matrix[key].Add(value) |
|
| 76 |
+ return true, 1 |
|
| 77 |
+ } |
|
| 78 |
+ |
|
| 79 |
+ return set.Add(value), set.Cardinality() |
|
| 80 |
+} |
|
| 81 |
+ |
|
| 82 |
+func (s *setMatrix) Remove(key string, value interface{}) (bool, int) {
|
|
| 83 |
+ s.Lock() |
|
| 84 |
+ defer s.Unlock() |
|
| 85 |
+ set, ok := s.matrix[key] |
|
| 86 |
+ if !ok {
|
|
| 87 |
+ return false, 0 |
|
| 88 |
+ } |
|
| 89 |
+ |
|
| 90 |
+ var removed bool |
|
| 91 |
+ if set.Contains(value) {
|
|
| 92 |
+ set.Remove(value) |
|
| 93 |
+ removed = true |
|
| 94 |
+ // If the set is empty remove it from the matrix |
|
| 95 |
+ if set.Cardinality() == 0 {
|
|
| 96 |
+ delete(s.matrix, key) |
|
| 97 |
+ } |
|
| 98 |
+ } |
|
| 99 |
+ |
|
| 100 |
+ return removed, set.Cardinality() |
|
| 101 |
+} |
|
| 102 |
+ |
|
| 103 |
+func (s *setMatrix) Cardinality(key string) (int, bool) {
|
|
| 104 |
+ s.Lock() |
|
| 105 |
+ defer s.Unlock() |
|
| 106 |
+ set, ok := s.matrix[key] |
|
| 107 |
+ if !ok {
|
|
| 108 |
+ return 0, ok |
|
| 109 |
+ } |
|
| 110 |
+ |
|
| 111 |
+ return set.Cardinality(), ok |
|
| 112 |
+} |
|
| 113 |
+ |
|
| 114 |
+func (s *setMatrix) String(key string) (string, bool) {
|
|
| 115 |
+ s.Lock() |
|
| 116 |
+ defer s.Unlock() |
|
| 117 |
+ set, ok := s.matrix[key] |
|
| 118 |
+ if !ok {
|
|
| 119 |
+ return "", ok |
|
| 120 |
+ } |
|
| 121 |
+ return set.String(), ok |
|
| 122 |
+} |
| ... | ... |
@@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error {
|
| 597 | 597 |
|
| 598 | 598 |
c := n.getController() |
| 599 | 599 |
|
| 600 |
+ sb, ok := ep.getSandbox() |
|
| 601 |
+ if !ok {
|
|
| 602 |
+ logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID)
|
|
| 603 |
+ return nil |
|
| 604 |
+ } |
|
| 605 |
+ |
|
| 600 | 606 |
if c.isAgent() {
|
| 601 |
- if err = ep.deleteServiceInfoFromCluster(); err != nil {
|
|
| 607 |
+ if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
|
|
| 602 | 608 |
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
|
| 603 | 609 |
} |
| 604 | 610 |
} else {
|
| ... | ... |
@@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error {
|
| 617 | 617 |
ep.anonymous = false |
| 618 | 618 |
|
| 619 | 619 |
if c.isAgent() {
|
| 620 |
- if err = ep.addServiceInfoToCluster(); err != nil {
|
|
| 620 |
+ if err = ep.addServiceInfoToCluster(sb); err != nil {
|
|
| 621 | 621 |
return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
|
| 622 | 622 |
} |
| 623 | 623 |
defer func() {
|
| 624 | 624 |
if err != nil {
|
| 625 |
- ep.deleteServiceInfoFromCluster() |
|
| 625 |
+ ep.deleteServiceInfoFromCluster(sb, "rename") |
|
| 626 | 626 |
ep.name = oldName |
| 627 | 627 |
ep.anonymous = oldAnonymous |
| 628 |
- ep.addServiceInfoToCluster() |
|
| 628 |
+ ep.addServiceInfoToCluster(sb) |
|
| 629 | 629 |
} |
| 630 | 630 |
}() |
| 631 | 631 |
} else {
|
| ... | ... |
@@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) |
| 746 | 746 |
return err |
| 747 | 747 |
} |
| 748 | 748 |
|
| 749 |
- if e := ep.deleteServiceInfoFromCluster(); e != nil {
|
|
| 749 |
+ if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
|
|
| 750 | 750 |
logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
|
| 751 | 751 |
} |
| 752 | 752 |
|
| ... | ... |
@@ -10,6 +10,7 @@ import ( |
| 10 | 10 |
|
| 11 | 11 |
"github.com/Sirupsen/logrus" |
| 12 | 12 |
"github.com/docker/docker/pkg/stringid" |
| 13 |
+ "github.com/docker/libnetwork/common" |
|
| 13 | 14 |
"github.com/docker/libnetwork/config" |
| 14 | 15 |
"github.com/docker/libnetwork/datastore" |
| 15 | 16 |
"github.com/docker/libnetwork/driverapi" |
| ... | ... |
@@ -97,7 +98,7 @@ type ipInfo struct {
|
| 97 | 97 |
type svcInfo struct {
|
| 98 | 98 |
svcMap map[string][]net.IP |
| 99 | 99 |
svcIPv6Map map[string][]net.IP |
| 100 |
- ipMap map[string]*ipInfo |
|
| 100 |
+ ipMap common.SetMatrix |
|
| 101 | 101 |
service map[string][]servicePorts |
| 102 | 102 |
} |
| 103 | 103 |
|
| ... | ... |
@@ -990,6 +991,12 @@ func (n *network) delete(force bool) error {
|
| 990 | 990 |
|
| 991 | 991 |
c.cleanupServiceBindings(n.ID()) |
| 992 | 992 |
|
| 993 |
+ // The network had been left, the service discovery can be cleaned up |
|
| 994 |
+ c.Lock() |
|
| 995 |
+ logrus.Debugf("network %s delete, clean svcRecords", n.id)
|
|
| 996 |
+ delete(c.svcRecords, n.id) |
|
| 997 |
+ c.Unlock() |
|
| 998 |
+ |
|
| 993 | 999 |
removeFromStore: |
| 994 | 1000 |
// deleteFromStore performs an atomic delete operation and the |
| 995 | 1001 |
// network.epCnt will help prevent any possible |
| ... | ... |
@@ -1227,36 +1234,34 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool |
| 1227 | 1227 |
// breaks some apps |
| 1228 | 1228 |
if ep.isAnonymous() {
|
| 1229 | 1229 |
if len(myAliases) > 0 {
|
| 1230 |
- n.addSvcRecords(myAliases[0], iface.Address().IP, ipv6, true) |
|
| 1230 |
+ n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") |
|
| 1231 | 1231 |
} |
| 1232 | 1232 |
} else {
|
| 1233 |
- n.addSvcRecords(epName, iface.Address().IP, ipv6, true) |
|
| 1233 |
+ n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") |
|
| 1234 | 1234 |
} |
| 1235 | 1235 |
for _, alias := range myAliases {
|
| 1236 |
- n.addSvcRecords(alias, iface.Address().IP, ipv6, false) |
|
| 1236 |
+ n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") |
|
| 1237 | 1237 |
} |
| 1238 | 1238 |
} else {
|
| 1239 | 1239 |
if ep.isAnonymous() {
|
| 1240 | 1240 |
if len(myAliases) > 0 {
|
| 1241 |
- n.deleteSvcRecords(myAliases[0], iface.Address().IP, ipv6, true) |
|
| 1241 |
+ n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") |
|
| 1242 | 1242 |
} |
| 1243 | 1243 |
} else {
|
| 1244 |
- n.deleteSvcRecords(epName, iface.Address().IP, ipv6, true) |
|
| 1244 |
+ n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") |
|
| 1245 | 1245 |
} |
| 1246 | 1246 |
for _, alias := range myAliases {
|
| 1247 |
- n.deleteSvcRecords(alias, iface.Address().IP, ipv6, false) |
|
| 1247 |
+ n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") |
|
| 1248 | 1248 |
} |
| 1249 | 1249 |
} |
| 1250 | 1250 |
} |
| 1251 | 1251 |
} |
| 1252 | 1252 |
|
| 1253 |
-func addIPToName(ipMap map[string]*ipInfo, name string, ip net.IP) {
|
|
| 1253 |
+func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) {
|
|
| 1254 | 1254 |
reverseIP := netutils.ReverseIP(ip.String()) |
| 1255 |
- if _, ok := ipMap[reverseIP]; !ok {
|
|
| 1256 |
- ipMap[reverseIP] = &ipInfo{
|
|
| 1257 |
- name: name, |
|
| 1258 |
- } |
|
| 1259 |
- } |
|
| 1255 |
+ ipMap.Insert(reverseIP, ipInfo{
|
|
| 1256 |
+ name: name, |
|
| 1257 |
+ }) |
|
| 1260 | 1258 |
} |
| 1261 | 1259 |
|
| 1262 | 1260 |
func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
|
| ... | ... |
@@ -1284,24 +1289,25 @@ func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
|
| 1284 | 1284 |
} |
| 1285 | 1285 |
} |
| 1286 | 1286 |
|
| 1287 |
-func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
|
|
| 1287 |
+func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
|
|
| 1288 | 1288 |
// Do not add service names for ingress network as this is a |
| 1289 | 1289 |
// routing only network |
| 1290 | 1290 |
if n.ingress {
|
| 1291 | 1291 |
return |
| 1292 | 1292 |
} |
| 1293 | 1293 |
|
| 1294 |
- logrus.Debugf("(%s).addSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
|
|
| 1294 |
+ logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
|
|
| 1295 | 1295 |
|
| 1296 | 1296 |
c := n.getController() |
| 1297 | 1297 |
c.Lock() |
| 1298 | 1298 |
defer c.Unlock() |
| 1299 |
+ |
|
| 1299 | 1300 |
sr, ok := c.svcRecords[n.ID()] |
| 1300 | 1301 |
if !ok {
|
| 1301 | 1302 |
sr = svcInfo{
|
| 1302 | 1303 |
svcMap: make(map[string][]net.IP), |
| 1303 | 1304 |
svcIPv6Map: make(map[string][]net.IP), |
| 1304 |
- ipMap: make(map[string]*ipInfo), |
|
| 1305 |
+ ipMap: common.NewSetMatrix(), |
|
| 1305 | 1306 |
} |
| 1306 | 1307 |
c.svcRecords[n.ID()] = sr |
| 1307 | 1308 |
} |
| ... | ... |
@@ -1319,28 +1325,33 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp |
| 1319 | 1319 |
} |
| 1320 | 1320 |
} |
| 1321 | 1321 |
|
| 1322 |
-func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
|
|
| 1322 |
+func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
|
|
| 1323 | 1323 |
// Do not delete service names from ingress network as this is a |
| 1324 | 1324 |
// routing only network |
| 1325 | 1325 |
if n.ingress {
|
| 1326 | 1326 |
return |
| 1327 | 1327 |
} |
| 1328 | 1328 |
|
| 1329 |
- logrus.Debugf("(%s).deleteSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
|
|
| 1329 |
+ logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
|
|
| 1330 | 1330 |
|
| 1331 | 1331 |
c := n.getController() |
| 1332 | 1332 |
c.Lock() |
| 1333 | 1333 |
defer c.Unlock() |
| 1334 |
+ |
|
| 1334 | 1335 |
sr, ok := c.svcRecords[n.ID()] |
| 1335 | 1336 |
if !ok {
|
| 1336 | 1337 |
return |
| 1337 | 1338 |
} |
| 1338 | 1339 |
|
| 1339 | 1340 |
if ipMapUpdate {
|
| 1340 |
- delete(sr.ipMap, netutils.ReverseIP(epIP.String())) |
|
| 1341 |
+ sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{
|
|
| 1342 |
+ name: name, |
|
| 1343 |
+ }) |
|
| 1341 | 1344 |
|
| 1342 | 1345 |
if epIPv6 != nil {
|
| 1343 |
- delete(sr.ipMap, netutils.ReverseIP(epIPv6.String())) |
|
| 1346 |
+ sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{
|
|
| 1347 |
+ name: name, |
|
| 1348 |
+ }) |
|
| 1344 | 1349 |
} |
| 1345 | 1350 |
} |
| 1346 | 1351 |
|
| ... | ... |
@@ -1868,9 +1879,11 @@ func (n *network) HandleQueryResp(name string, ip net.IP) {
|
| 1868 | 1868 |
} |
| 1869 | 1869 |
|
| 1870 | 1870 |
ipStr := netutils.ReverseIP(ip.String()) |
| 1871 |
- |
|
| 1872 |
- if ipInfo, ok := sr.ipMap[ipStr]; ok {
|
|
| 1873 |
- ipInfo.extResolver = true |
|
| 1871 |
+ // If an object with extResolver == true is already in the set this call will fail |
|
| 1872 |
+ // but anyway it means that has already been inserted before |
|
| 1873 |
+ if ok, _ := sr.ipMap.Contains(ipStr, ipInfo{name: name}); ok {
|
|
| 1874 |
+ sr.ipMap.Remove(ipStr, ipInfo{name: name})
|
|
| 1875 |
+ sr.ipMap.Insert(ipStr, ipInfo{name: name, extResolver: true})
|
|
| 1874 | 1876 |
} |
| 1875 | 1877 |
} |
| 1876 | 1878 |
|
| ... | ... |
@@ -1886,13 +1899,27 @@ func (n *network) ResolveIP(ip string) string {
|
| 1886 | 1886 |
|
| 1887 | 1887 |
nwName := n.Name() |
| 1888 | 1888 |
|
| 1889 |
- ipInfo, ok := sr.ipMap[ip] |
|
| 1889 |
+ elemSet, ok := sr.ipMap.Get(ip) |
|
| 1890 |
+ if !ok || len(elemSet) == 0 {
|
|
| 1891 |
+ return "" |
|
| 1892 |
+ } |
|
| 1893 |
+ // NOTE it is possible to have more than one element in the Set, this will happen |
|
| 1894 |
+ // because of interleave of diffent events from differnt sources (local container create vs |
|
| 1895 |
+ // network db notifications) |
|
| 1896 |
+ // In such cases the resolution will be based on the first element of the set, and can vary |
|
| 1897 |
+ // during the system stabilitation |
|
| 1898 |
+ elem, ok := elemSet[0].(ipInfo) |
|
| 1899 |
+ if !ok {
|
|
| 1900 |
+ setStr, b := sr.ipMap.String(ip) |
|
| 1901 |
+ logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr)
|
|
| 1902 |
+ return "" |
|
| 1903 |
+ } |
|
| 1890 | 1904 |
|
| 1891 |
- if !ok || ipInfo.extResolver {
|
|
| 1905 |
+ if elem.extResolver {
|
|
| 1892 | 1906 |
return "" |
| 1893 | 1907 |
} |
| 1894 | 1908 |
|
| 1895 |
- return ipInfo.name + "." + nwName |
|
| 1909 |
+ return elem.name + "." + nwName |
|
| 1896 | 1910 |
} |
| 1897 | 1911 |
|
| 1898 | 1912 |
func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) {
|
| ... | ... |
@@ -285,7 +285,6 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
|
| 285 | 285 |
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
| 286 | 286 |
nDB.Unlock() |
| 287 | 287 |
|
| 288 |
- nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value)) |
|
| 289 | 288 |
return nil |
| 290 | 289 |
} |
| 291 | 290 |
|
| ... | ... |
@@ -313,7 +312,6 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
|
| 313 | 313 |
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
| 314 | 314 |
nDB.Unlock() |
| 315 | 315 |
|
| 316 |
- nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value)) |
|
| 317 | 316 |
return nil |
| 318 | 317 |
} |
| 319 | 318 |
|
| ... | ... |
@@ -359,7 +357,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
|
| 359 | 359 |
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
| 360 | 360 |
nDB.Unlock() |
| 361 | 361 |
|
| 362 |
- nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value)) |
|
| 363 | 362 |
return nil |
| 364 | 363 |
} |
| 365 | 364 |
|
| ... | ... |
@@ -86,6 +86,9 @@ type sandbox struct {
|
| 86 | 86 |
ingress bool |
| 87 | 87 |
ndotsSet bool |
| 88 | 88 |
sync.Mutex |
| 89 |
+ // This mutex is used to serialize service related operation for an endpoint |
|
| 90 |
+ // The lock is here because the endpoint is saved into the store so is not unique |
|
| 91 |
+ Service sync.Mutex |
|
| 89 | 92 |
} |
| 90 | 93 |
|
| 91 | 94 |
// These are the container configs used to customize container /etc/hosts file. |
| ... | ... |
@@ -668,26 +671,25 @@ func (sb *sandbox) SetKey(basePath string) error {
|
| 668 | 668 |
} |
| 669 | 669 |
|
| 670 | 670 |
func (sb *sandbox) EnableService() error {
|
| 671 |
+ logrus.Debugf("EnableService %s START", sb.containerID)
|
|
| 671 | 672 |
for _, ep := range sb.getConnectedEndpoints() {
|
| 672 | 673 |
if ep.enableService(true) {
|
| 673 |
- if err := ep.addServiceInfoToCluster(); err != nil {
|
|
| 674 |
+ if err := ep.addServiceInfoToCluster(sb); err != nil {
|
|
| 674 | 675 |
ep.enableService(false) |
| 675 | 676 |
return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
|
| 676 | 677 |
} |
| 677 | 678 |
} |
| 678 | 679 |
} |
| 680 |
+ logrus.Debugf("EnableService %s DONE", sb.containerID)
|
|
| 679 | 681 |
return nil |
| 680 | 682 |
} |
| 681 | 683 |
|
| 682 | 684 |
func (sb *sandbox) DisableService() error {
|
| 685 |
+ logrus.Debugf("DisableService %s START", sb.containerID)
|
|
| 683 | 686 |
for _, ep := range sb.getConnectedEndpoints() {
|
| 684 |
- if ep.enableService(false) {
|
|
| 685 |
- if err := ep.deleteServiceInfoFromCluster(); err != nil {
|
|
| 686 |
- ep.enableService(true) |
|
| 687 |
- return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
|
|
| 688 |
- } |
|
| 689 |
- } |
|
| 687 |
+ ep.enableService(false) |
|
| 690 | 688 |
} |
| 689 |
+ logrus.Debugf("DisableService %s DONE", sb.containerID)
|
|
| 691 | 690 |
return nil |
| 692 | 691 |
} |
| 693 | 692 |
|
| ... | ... |
@@ -4,6 +4,8 @@ import ( |
| 4 | 4 |
"fmt" |
| 5 | 5 |
"net" |
| 6 | 6 |
"sync" |
| 7 |
+ |
|
| 8 |
+ "github.com/docker/libnetwork/common" |
|
| 7 | 9 |
) |
| 8 | 10 |
|
| 9 | 11 |
var ( |
| ... | ... |
@@ -48,17 +50,49 @@ type service struct {
|
| 48 | 48 |
// Service aliases |
| 49 | 49 |
aliases []string |
| 50 | 50 |
|
| 51 |
+ // This maps tracks for each IP address the list of endpoints ID |
|
| 52 |
+ // associated with it. At stable state the endpoint ID expected is 1 |
|
| 53 |
+ // but during transition and service change it is possible to have |
|
| 54 |
+ // temporary more than 1 |
|
| 55 |
+ ipToEndpoint common.SetMatrix |
|
| 56 |
+ |
|
| 57 |
+ deleted bool |
|
| 58 |
+ |
|
| 51 | 59 |
sync.Mutex |
| 52 | 60 |
} |
| 53 | 61 |
|
| 62 |
+// assignIPToEndpoint inserts the mapping between the IP and the endpoint identifier |
|
| 63 |
+// returns true if the mapping was not present, false otherwise |
|
| 64 |
+// returns also the number of endpoints associated to the IP |
|
| 65 |
+func (s *service) assignIPToEndpoint(ip, eID string) (bool, int) {
|
|
| 66 |
+ return s.ipToEndpoint.Insert(ip, eID) |
|
| 67 |
+} |
|
| 68 |
+ |
|
| 69 |
+// removeIPToEndpoint removes the mapping between the IP and the endpoint identifier |
|
| 70 |
+// returns true if the mapping was deleted, false otherwise |
|
| 71 |
+// returns also the number of endpoints associated to the IP |
|
| 72 |
+func (s *service) removeIPToEndpoint(ip, eID string) (bool, int) {
|
|
| 73 |
+ return s.ipToEndpoint.Remove(ip, eID) |
|
| 74 |
+} |
|
| 75 |
+ |
|
| 76 |
+func (s *service) printIPToEndpoint(ip string) (string, bool) {
|
|
| 77 |
+ return s.ipToEndpoint.String(ip) |
|
| 78 |
+} |
|
| 79 |
+ |
|
| 54 | 80 |
type loadBalancer struct {
|
| 55 | 81 |
vip net.IP |
| 56 | 82 |
fwMark uint32 |
| 57 | 83 |
|
| 58 | 84 |
// Map of backend IPs backing this loadbalancer on this |
| 59 | 85 |
// network. It is keyed with endpoint ID. |
| 60 |
- backEnds map[string]net.IP |
|
| 86 |
+ backEnds map[string]loadBalancerBackend |
|
| 61 | 87 |
|
| 62 | 88 |
// Back pointer to service to which the loadbalancer belongs. |
| 63 | 89 |
service *service |
| 64 | 90 |
} |
| 91 |
+ |
|
| 92 |
+type loadBalancerBackend struct {
|
|
| 93 |
+ ip net.IP |
|
| 94 |
+ containerName string |
|
| 95 |
+ taskAliases []string |
|
| 96 |
+} |
| ... | ... |
@@ -6,15 +6,126 @@ import ( |
| 6 | 6 |
"net" |
| 7 | 7 |
|
| 8 | 8 |
"github.com/Sirupsen/logrus" |
| 9 |
+ "github.com/docker/libnetwork/common" |
|
| 9 | 10 |
) |
| 10 | 11 |
|
| 11 |
-func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
|
|
| 12 |
+func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error {
|
|
| 13 |
+ n, err := c.NetworkByID(nID) |
|
| 14 |
+ if err != nil {
|
|
| 15 |
+ return err |
|
| 16 |
+ } |
|
| 17 |
+ |
|
| 18 |
+ logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService)
|
|
| 19 |
+ |
|
| 20 |
+ // Add container resolution mappings |
|
| 21 |
+ c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) |
|
| 22 |
+ |
|
| 23 |
+ // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. |
|
| 24 |
+ n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) |
|
| 25 |
+ for _, alias := range serviceAliases {
|
|
| 26 |
+ n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method) |
|
| 27 |
+ } |
|
| 28 |
+ |
|
| 29 |
+ // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR |
|
| 30 |
+ if len(vip) == 0 {
|
|
| 31 |
+ n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method) |
|
| 32 |
+ for _, alias := range serviceAliases {
|
|
| 33 |
+ n.(*network).addSvcRecords(eID, alias, ip, nil, false, method) |
|
| 34 |
+ } |
|
| 35 |
+ } |
|
| 36 |
+ |
|
| 37 |
+ if addService && len(vip) != 0 {
|
|
| 38 |
+ n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method) |
|
| 39 |
+ for _, alias := range serviceAliases {
|
|
| 40 |
+ n.(*network).addSvcRecords(eID, alias, vip, nil, false, method) |
|
| 41 |
+ } |
|
| 42 |
+ } |
|
| 43 |
+ |
|
| 44 |
+ return nil |
|
| 45 |
+} |
|
| 46 |
+ |
|
| 47 |
+func (c *controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
|
|
| 48 |
+ n, err := c.NetworkByID(nID) |
|
| 49 |
+ if err != nil {
|
|
| 50 |
+ return err |
|
| 51 |
+ } |
|
| 52 |
+ logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
|
|
| 53 |
+ |
|
| 54 |
+ // Add resolution for container name |
|
| 55 |
+ n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method) |
|
| 56 |
+ |
|
| 57 |
+ // Add resolution for taskaliases |
|
| 58 |
+ for _, alias := range taskAliases {
|
|
| 59 |
+ n.(*network).addSvcRecords(eID, alias, ip, nil, true, method) |
|
| 60 |
+ } |
|
| 61 |
+ |
|
| 62 |
+ return nil |
|
| 63 |
+} |
|
| 64 |
+ |
|
| 65 |
+func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error {
|
|
| 66 |
+ n, err := c.NetworkByID(nID) |
|
| 67 |
+ if err != nil {
|
|
| 68 |
+ return err |
|
| 69 |
+ } |
|
| 70 |
+ |
|
| 71 |
+ logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries)
|
|
| 72 |
+ |
|
| 73 |
+ // Delete container resolution mappings |
|
| 74 |
+ c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) |
|
| 75 |
+ |
|
| 76 |
+ // Delete the special "tasks.svc_name" backend record. |
|
| 77 |
+ if !multipleEntries {
|
|
| 78 |
+ n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) |
|
| 79 |
+ for _, alias := range serviceAliases {
|
|
| 80 |
+ n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method) |
|
| 81 |
+ } |
|
| 82 |
+ } |
|
| 83 |
+ |
|
| 84 |
+ // If we are doing DNS RR delete the endpoint IP from DNS record right away. |
|
| 85 |
+ if !multipleEntries && len(vip) == 0 {
|
|
| 86 |
+ n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method) |
|
| 87 |
+ for _, alias := range serviceAliases {
|
|
| 88 |
+ n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method) |
|
| 89 |
+ } |
|
| 90 |
+ } |
|
| 91 |
+ |
|
| 92 |
+ // Remove the DNS record for VIP only if we are removing the service |
|
| 93 |
+ if rmService && len(vip) != 0 && !multipleEntries {
|
|
| 94 |
+ n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method) |
|
| 95 |
+ for _, alias := range serviceAliases {
|
|
| 96 |
+ n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method) |
|
| 97 |
+ } |
|
| 98 |
+ } |
|
| 99 |
+ |
|
| 100 |
+ return nil |
|
| 101 |
+} |
|
| 102 |
+ |
|
| 103 |
+func (c *controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
|
|
| 104 |
+ n, err := c.NetworkByID(nID) |
|
| 105 |
+ if err != nil {
|
|
| 106 |
+ return err |
|
| 107 |
+ } |
|
| 108 |
+ logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
|
|
| 109 |
+ |
|
| 110 |
+ // Delete resolution for container name |
|
| 111 |
+ n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method) |
|
| 112 |
+ |
|
| 113 |
+ // Delete resolution for taskaliases |
|
| 114 |
+ for _, alias := range taskAliases {
|
|
| 115 |
+ n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method) |
|
| 116 |
+ } |
|
| 117 |
+ |
|
| 118 |
+ return nil |
|
| 119 |
+} |
|
| 120 |
+ |
|
| 121 |
+func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service {
|
|
| 12 | 122 |
return &service{
|
| 13 | 123 |
name: name, |
| 14 | 124 |
id: id, |
| 15 | 125 |
ingressPorts: ingressPorts, |
| 16 | 126 |
loadBalancers: make(map[string]*loadBalancer), |
| 17 |
- aliases: aliases, |
|
| 127 |
+ aliases: serviceAliases, |
|
| 128 |
+ ipToEndpoint: common.NewSetMatrix(), |
|
| 18 | 129 |
} |
| 19 | 130 |
} |
| 20 | 131 |
|
| ... | ... |
@@ -50,21 +161,26 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
|
| 50 | 50 |
|
| 51 | 51 |
for _, s := range services {
|
| 52 | 52 |
s.Lock() |
| 53 |
+ // Skip the serviceBindings that got deleted |
|
| 54 |
+ if s.deleted {
|
|
| 55 |
+ s.Unlock() |
|
| 56 |
+ continue |
|
| 57 |
+ } |
|
| 53 | 58 |
for nid, lb := range s.loadBalancers {
|
| 54 | 59 |
if cleanupNID != "" && nid != cleanupNID {
|
| 55 | 60 |
continue |
| 56 | 61 |
} |
| 57 | 62 |
|
| 58 |
- for eid, ip := range lb.backEnds {
|
|
| 63 |
+ for eid, be := range lb.backEnds {
|
|
| 59 | 64 |
service := s |
| 60 | 65 |
loadBalancer := lb |
| 61 | 66 |
networkID := nid |
| 62 | 67 |
epID := eid |
| 63 |
- epIP := ip |
|
| 68 |
+ epIP := be.ip |
|
| 64 | 69 |
|
| 65 | 70 |
cleanupFuncs = append(cleanupFuncs, func() {
|
| 66 |
- if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip, |
|
| 67 |
- service.ingressPorts, service.aliases, epIP); err != nil {
|
|
| 71 |
+ if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip, |
|
| 72 |
+ service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil {
|
|
| 68 | 73 |
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
|
| 69 | 74 |
service.id, networkID, epID, err) |
| 70 | 75 |
} |
| ... | ... |
@@ -80,67 +196,72 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
|
| 80 | 80 |
|
| 81 | 81 |
} |
| 82 | 82 |
|
| 83 |
-func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
|
|
| 84 |
- n, err := c.NetworkByID(nid) |
|
| 83 |
+func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
|
|
| 84 |
+ var addService bool |
|
| 85 |
+ |
|
| 86 |
+ n, err := c.NetworkByID(nID) |
|
| 85 | 87 |
if err != nil {
|
| 86 | 88 |
return err |
| 87 | 89 |
} |
| 88 | 90 |
|
| 89 | 91 |
skey := serviceKey{
|
| 90 |
- id: sid, |
|
| 92 |
+ id: svcID, |
|
| 91 | 93 |
ports: portConfigs(ingressPorts).String(), |
| 92 | 94 |
} |
| 93 | 95 |
|
| 94 |
- c.Lock() |
|
| 95 |
- s, ok := c.serviceBindings[skey] |
|
| 96 |
- if !ok {
|
|
| 97 |
- // Create a new service if we are seeing this service |
|
| 98 |
- // for the first time. |
|
| 99 |
- s = newService(name, sid, ingressPorts, aliases) |
|
| 100 |
- c.serviceBindings[skey] = s |
|
| 101 |
- } |
|
| 102 |
- c.Unlock() |
|
| 103 |
- |
|
| 104 |
- // Add endpoint IP to special "tasks.svc_name" so that the |
|
| 105 |
- // applications have access to DNS RR. |
|
| 106 |
- n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
|
|
| 107 |
- for _, alias := range aliases {
|
|
| 108 |
- n.(*network).addSvcRecords("tasks."+alias, ip, nil, false)
|
|
| 109 |
- } |
|
| 110 |
- |
|
| 111 |
- // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR |
|
| 112 |
- svcIP := vip |
|
| 113 |
- if len(svcIP) == 0 {
|
|
| 114 |
- svcIP = ip |
|
| 115 |
- } |
|
| 116 |
- n.(*network).addSvcRecords(name, svcIP, nil, false) |
|
| 117 |
- for _, alias := range aliases {
|
|
| 118 |
- n.(*network).addSvcRecords(alias, svcIP, nil, false) |
|
| 96 |
+ var s *service |
|
| 97 |
+ for {
|
|
| 98 |
+ c.Lock() |
|
| 99 |
+ var ok bool |
|
| 100 |
+ s, ok = c.serviceBindings[skey] |
|
| 101 |
+ if !ok {
|
|
| 102 |
+ // Create a new service if we are seeing this service |
|
| 103 |
+ // for the first time. |
|
| 104 |
+ s = newService(svcName, svcID, ingressPorts, serviceAliases) |
|
| 105 |
+ c.serviceBindings[skey] = s |
|
| 106 |
+ } |
|
| 107 |
+ c.Unlock() |
|
| 108 |
+ s.Lock() |
|
| 109 |
+ if !s.deleted {
|
|
| 110 |
+ // ok the object is good to be used |
|
| 111 |
+ break |
|
| 112 |
+ } |
|
| 113 |
+ s.Unlock() |
|
| 119 | 114 |
} |
| 115 |
+ logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID)
|
|
| 120 | 116 |
|
| 121 |
- s.Lock() |
|
| 122 | 117 |
defer s.Unlock() |
| 123 | 118 |
|
| 124 |
- lb, ok := s.loadBalancers[nid] |
|
| 119 |
+ lb, ok := s.loadBalancers[nID] |
|
| 125 | 120 |
if !ok {
|
| 126 | 121 |
// Create a new load balancer if we are seeing this |
| 127 | 122 |
// network attachment on the service for the first |
| 128 | 123 |
// time. |
| 124 |
+ fwMarkCtrMu.Lock() |
|
| 125 |
+ |
|
| 129 | 126 |
lb = &loadBalancer{
|
| 130 | 127 |
vip: vip, |
| 131 | 128 |
fwMark: fwMarkCtr, |
| 132 |
- backEnds: make(map[string]net.IP), |
|
| 129 |
+ backEnds: make(map[string]loadBalancerBackend), |
|
| 133 | 130 |
service: s, |
| 134 | 131 |
} |
| 135 | 132 |
|
| 136 |
- fwMarkCtrMu.Lock() |
|
| 137 | 133 |
fwMarkCtr++ |
| 138 | 134 |
fwMarkCtrMu.Unlock() |
| 139 | 135 |
|
| 140 |
- s.loadBalancers[nid] = lb |
|
| 136 |
+ s.loadBalancers[nID] = lb |
|
| 137 |
+ addService = true |
|
| 141 | 138 |
} |
| 142 | 139 |
|
| 143 |
- lb.backEnds[eid] = ip |
|
| 140 |
+ lb.backEnds[eID] = loadBalancerBackend{ip: ip,
|
|
| 141 |
+ containerName: containerName, |
|
| 142 |
+ taskAliases: taskAliases} |
|
| 143 |
+ |
|
| 144 |
+ ok, entries := s.assignIPToEndpoint(ip.String(), eID) |
|
| 145 |
+ if !ok || entries > 1 {
|
|
| 146 |
+ setStr, b := s.printIPToEndpoint(ip.String()) |
|
| 147 |
+ logrus.Warnf("addServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
|
|
| 148 |
+ } |
|
| 144 | 149 |
|
| 145 | 150 |
// Add loadbalancer service and backend in all sandboxes in |
| 146 | 151 |
// the network only if vip is valid. |
| ... | ... |
@@ -148,89 +269,87 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i |
| 148 | 148 |
n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts) |
| 149 | 149 |
} |
| 150 | 150 |
|
| 151 |
+ // Add the appropriate name resolutions |
|
| 152 |
+ c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding") |
|
| 153 |
+ |
|
| 154 |
+ logrus.Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID)
|
|
| 155 |
+ |
|
| 151 | 156 |
return nil |
| 152 | 157 |
} |
| 153 | 158 |
|
| 154 |
-func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
|
|
| 159 |
+func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error {
|
|
| 160 |
+ |
|
| 155 | 161 |
var rmService bool |
| 156 | 162 |
|
| 157 |
- n, err := c.NetworkByID(nid) |
|
| 163 |
+ n, err := c.NetworkByID(nID) |
|
| 158 | 164 |
if err != nil {
|
| 159 | 165 |
return err |
| 160 | 166 |
} |
| 161 | 167 |
|
| 162 | 168 |
skey := serviceKey{
|
| 163 |
- id: sid, |
|
| 169 |
+ id: svcID, |
|
| 164 | 170 |
ports: portConfigs(ingressPorts).String(), |
| 165 | 171 |
} |
| 166 | 172 |
|
| 167 | 173 |
c.Lock() |
| 168 | 174 |
s, ok := c.serviceBindings[skey] |
| 169 | 175 |
c.Unlock() |
| 176 |
+ logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
|
|
| 170 | 177 |
if !ok {
|
| 178 |
+ logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
|
|
| 171 | 179 |
return nil |
| 172 | 180 |
} |
| 173 | 181 |
|
| 174 | 182 |
s.Lock() |
| 175 |
- lb, ok := s.loadBalancers[nid] |
|
| 183 |
+ defer s.Unlock() |
|
| 184 |
+ lb, ok := s.loadBalancers[nID] |
|
| 176 | 185 |
if !ok {
|
| 177 |
- s.Unlock() |
|
| 186 |
+ logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
|
|
| 178 | 187 |
return nil |
| 179 | 188 |
} |
| 180 | 189 |
|
| 181 |
- _, ok = lb.backEnds[eid] |
|
| 190 |
+ _, ok = lb.backEnds[eID] |
|
| 182 | 191 |
if !ok {
|
| 183 |
- s.Unlock() |
|
| 192 |
+ logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID)
|
|
| 184 | 193 |
return nil |
| 185 | 194 |
} |
| 186 | 195 |
|
| 187 |
- delete(lb.backEnds, eid) |
|
| 196 |
+ delete(lb.backEnds, eID) |
|
| 188 | 197 |
if len(lb.backEnds) == 0 {
|
| 189 | 198 |
// All the backends for this service have been |
| 190 | 199 |
// removed. Time to remove the load balancer and also |
| 191 | 200 |
// remove the service entry in IPVS. |
| 192 | 201 |
rmService = true |
| 193 | 202 |
|
| 194 |
- delete(s.loadBalancers, nid) |
|
| 203 |
+ delete(s.loadBalancers, nID) |
|
| 195 | 204 |
} |
| 196 | 205 |
|
| 197 | 206 |
if len(s.loadBalancers) == 0 {
|
| 198 | 207 |
// All loadbalancers for the service removed. Time to |
| 199 | 208 |
// remove the service itself. |
| 200 | 209 |
c.Lock() |
| 210 |
+ |
|
| 211 |
+ // Mark the object as deleted so that the add won't use it wrongly |
|
| 212 |
+ s.deleted = true |
|
| 201 | 213 |
delete(c.serviceBindings, skey) |
| 202 | 214 |
c.Unlock() |
| 203 | 215 |
} |
| 204 | 216 |
|
| 217 |
+ ok, entries := s.removeIPToEndpoint(ip.String(), eID) |
|
| 218 |
+ if !ok || entries > 0 {
|
|
| 219 |
+ setStr, b := s.printIPToEndpoint(ip.String()) |
|
| 220 |
+ logrus.Warnf("rmServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
|
|
| 221 |
+ } |
|
| 222 |
+ |
|
| 205 | 223 |
// Remove loadbalancer service(if needed) and backend in all |
| 206 | 224 |
// sandboxes in the network only if the vip is valid. |
| 207 |
- if len(vip) != 0 {
|
|
| 225 |
+ if len(vip) != 0 && entries == 0 {
|
|
| 208 | 226 |
n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService) |
| 209 | 227 |
} |
| 210 |
- s.Unlock() |
|
| 211 | 228 |
|
| 212 |
- // Delete the special "tasks.svc_name" backend record. |
|
| 213 |
- n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
|
|
| 214 |
- for _, alias := range aliases {
|
|
| 215 |
- n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false)
|
|
| 216 |
- } |
|
| 217 |
- |
|
| 218 |
- // If we are doing DNS RR add the endpoint IP to DNS record |
|
| 219 |
- // right away. |
|
| 220 |
- if len(vip) == 0 {
|
|
| 221 |
- n.(*network).deleteSvcRecords(name, ip, nil, false) |
|
| 222 |
- for _, alias := range aliases {
|
|
| 223 |
- n.(*network).deleteSvcRecords(alias, ip, nil, false) |
|
| 224 |
- } |
|
| 225 |
- } |
|
| 226 |
- |
|
| 227 |
- // Remove the DNS record for VIP only if we are removing the service |
|
| 228 |
- if rmService && len(vip) != 0 {
|
|
| 229 |
- n.(*network).deleteSvcRecords(name, vip, nil, false) |
|
| 230 |
- for _, alias := range aliases {
|
|
| 231 |
- n.(*network).deleteSvcRecords(alias, vip, nil, false) |
|
| 232 |
- } |
|
| 233 |
- } |
|
| 229 |
+ // Delete the name resolutions |
|
| 230 |
+ c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") |
|
| 234 | 231 |
|
| 232 |
+ logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
|
|
| 235 | 233 |
return nil |
| 236 | 234 |
} |
| ... | ... |
@@ -44,6 +44,11 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
|
| 44 | 44 |
var lbs []*loadBalancer |
| 45 | 45 |
for _, s := range serviceBindings {
|
| 46 | 46 |
s.Lock() |
| 47 |
+ // Skip the serviceBindings that got deleted |
|
| 48 |
+ if s.deleted {
|
|
| 49 |
+ s.Unlock() |
|
| 50 |
+ continue |
|
| 51 |
+ } |
|
| 47 | 52 |
if lb, ok := s.loadBalancers[n.ID()]; ok {
|
| 48 | 53 |
lbs = append(lbs, lb) |
| 49 | 54 |
} |
| ... | ... |
@@ -97,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
|
| 97 | 97 |
} |
| 98 | 98 |
|
| 99 | 99 |
lb.service.Lock() |
| 100 |
- for _, ip := range lb.backEnds {
|
|
| 101 |
- sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) |
|
| 100 |
+ for _, l := range lb.backEnds {
|
|
| 101 |
+ sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) |
|
| 102 | 102 |
} |
| 103 | 103 |
lb.service.Unlock() |
| 104 | 104 |
} |