Browse code

libnetwork: split programIngress() and dependent functions on Add and Del functions

- refactor programIngressPorts to use Rule.Insert/Append/Delete for improved rule management
- split programIngress() and dependent functions on Add and Del functions

Signed-off-by: Andrey Epifanov <aepifanov@mirantis.com>

Andrey Epifanov authored on 2025/06/02 20:55:28
Showing 1 changed files
... ...
@@ -127,7 +127,7 @@ func (n *Network) addLBBackend(ip net.IP, lb *loadBalancer) {
127 127
 			if gwEP, _ := sb.getGatewayEndpoint(); gwEP != nil {
128 128
 				gwIP = gwEP.Iface().Address().IP
129 129
 			}
130
-			if err := programIngress(gwIP, lb.service.ingressPorts, false); err != nil {
130
+			if err := addIngressPorts(gwIP, lb.service.ingressPorts); err != nil {
131 131
 				log.G(context.TODO()).Errorf("Failed to add ingress: %v", err)
132 132
 				return
133 133
 			}
... ...
@@ -230,7 +230,7 @@ func (n *Network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
230 230
 			if gwEP, _ := sb.getGatewayEndpoint(); gwEP != nil {
231 231
 				gwIP = gwEP.Iface().Address().IP
232 232
 			}
233
-			if err := programIngress(gwIP, lb.service.ingressPorts, true); err != nil {
233
+			if err := removeIngressPorts(gwIP, lb.service.ingressPorts); err != nil {
234 234
 				log.G(context.TODO()).Errorf("Failed to delete ingress: %v", err)
235 235
 			}
236 236
 		}
... ...
@@ -360,47 +360,84 @@ func initIngressConfiguration(gwIP net.IP, iptable *iptables.IPTable) error {
360 360
 	return nil
361 361
 }
362 362
 
363
-func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
363
+func removeIngressPorts(gwIP net.IP, ingressPorts []*PortConfig) error {
364
+	// TODO IPv6 support
365
+
366
+	ingressMu.Lock()
367
+	defer ingressMu.Unlock()
368
+
369
+	// Filter the ingress ports until port rules start to be added/deleted
370
+	filteredPorts := filterPortConfigs(ingressPorts, true)
371
+
372
+	if err := deleteIngressPortsRules(gwIP, filteredPorts); err != nil {
373
+		filterPortConfigs(ingressPorts, false)
374
+		return fmt.Errorf("failed to program ingress ports: %v", err)
375
+	}
376
+
377
+	closeIngressPortsProxy(filteredPorts)
378
+
379
+	return nil
380
+}
381
+
382
+func addIngressPorts(gwIP net.IP, ingressPorts []*PortConfig) error {
364 383
 	// TODO IPv6 support
365 384
 	iptable := iptables.GetIptable(iptables.IPv4)
366 385
 
367 386
 	ingressMu.Lock()
368 387
 	defer ingressMu.Unlock()
369 388
 
370
-	if !isDelete {
371
-		if err := initIngressConfiguration(gwIP, iptable); err != nil {
372
-			return err
373
-		}
389
+	if err := initIngressConfiguration(gwIP, iptable); err != nil {
390
+		return err
374 391
 	}
375 392
 
376 393
 	// Filter the ingress ports until port rules start to be added/deleted
377
-	filteredPorts := filterPortConfigs(ingressPorts, isDelete)
394
+	filteredPorts := filterPortConfigs(ingressPorts, false)
378 395
 
379
-	if err := programIngressPorts(gwIP, filteredPorts, iptable, isDelete); err != nil {
396
+	if err := programIngressPortsRules(gwIP, filteredPorts); err != nil {
397
+		filterPortConfigs(filteredPorts, true)
380 398
 		return fmt.Errorf("failed to program ingress ports: %v", err)
381 399
 	}
382 400
 
383
-	plumbIngressPortsProxy(filteredPorts, isDelete)
401
+	plumbIngressPortsProxy(filteredPorts)
384 402
 
385 403
 	return nil
386 404
 }
387 405
 
388
-func programIngressPorts(gwIP net.IP, filteredPorts []*PortConfig, iptable *iptables.IPTable, isDelete bool) error {
389
-
390
-	addDelOpt := "-I"
391
-	rollbackAddDelOpt := "-D"
392
-	if isDelete {
393
-		addDelOpt = "-D"
394
-		rollbackAddDelOpt = "-I"
406
+func generateIngressRules(port *PortConfig, destIP net.IP) []iptables.Rule {
407
+	var (
408
+		protocol      = strings.ToLower(port.Protocol.String())
409
+		publishedPort = strconv.FormatUint(uint64(port.PublishedPort), 10)
410
+		destination   = net.JoinHostPort(destIP.String(), publishedPort)
411
+	)
412
+	return []iptables.Rule{
413
+		{
414
+			IPVer: iptables.IPv4,
415
+			Table: iptables.Nat,
416
+			Chain: ingressChain,
417
+			Args:  []string{"-p", protocol, "--dport", publishedPort, "-j", "DNAT", "--to-destination", destination},
418
+		},
419
+		{
420
+			IPVer: iptables.IPv4,
421
+			Table: iptables.Filter,
422
+			Chain: ingressChain,
423
+			Args:  []string{"-p", protocol, "--sport", publishedPort, "-m", "conntrack", "--ctstate", "ESTABLISHED,RELATED", "-j", "ACCEPT"},
424
+		},
425
+		{
426
+			IPVer: iptables.IPv4,
427
+			Table: iptables.Filter,
428
+			Chain: ingressChain,
429
+			Args:  []string{"-p", protocol, "--dport", publishedPort, "-j", "ACCEPT"},
430
+		},
395 431
 	}
432
+}
396 433
 
397
-	rollbackRules := make([][]string, 0, len(filteredPorts)*3)
398
-	var portErr error
434
+func programIngressPortsRules(gwIP net.IP, filteredPorts []*PortConfig) (portErr error) {
435
+
436
+	rollbackRules := make([]iptables.Rule, 0, len(filteredPorts)*3)
399 437
 	defer func() {
400
-		if portErr != nil && !isDelete {
401
-			filterPortConfigs(filteredPorts, !isDelete)
438
+		if portErr != nil {
402 439
 			for _, rule := range rollbackRules {
403
-				if err := iptable.RawCombinedOutput(rule...); err != nil {
440
+				if err := rule.Delete(); err != nil {
404 441
 					log.G(context.TODO()).Warnf("roll back rule failed, %v: %v", rule, err)
405 442
 				}
406 443
 			}
... ...
@@ -408,62 +445,33 @@ func programIngressPorts(gwIP net.IP, filteredPorts []*PortConfig, iptable *ipta
408 408
 	}()
409 409
 
410 410
 	for _, iPort := range filteredPorts {
411
-		var (
412
-			protocol      = strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)])
413
-			publishedPort = strconv.FormatUint(uint64(iPort.PublishedPort), 10)
414
-			destination   = net.JoinHostPort(gwIP.String(), publishedPort)
415
-		)
416
-		if iptable.ExistChain(ingressChain, iptables.Nat) {
417
-			rule := []string{"-t", "nat", addDelOpt, ingressChain, "-p", protocol, "--dport", publishedPort, "-j", "DNAT", "--to-destination", destination}
418 411
 
419
-			if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
412
+		for _, rule := range generateIngressRules(iPort, gwIP) {
413
+			if portErr = rule.Insert(); portErr != nil {
420 414
 				err := fmt.Errorf("set up rule failed, %v: %v", rule, portErr)
421
-				if !isDelete {
422
-					return err
423
-				}
424
-				log.G(context.TODO()).Info(err)
425
-			}
426
-			rollbackRule := []string{"-t", "nat", rollbackAddDelOpt, ingressChain, "-p", protocol, "--dport", publishedPort, "-j", "DNAT", "--to-destination", destination}
427
-			rollbackRules = append(rollbackRules, rollbackRule)
428
-		}
429
-
430
-		// Filter table rules to allow a published service to be accessible in the local node from..
431
-		// 1) service tasks attached to other networks
432
-		// 2) unmanaged containers on bridge networks
433
-		rule := []string{addDelOpt, ingressChain, "-p", protocol, "--sport", publishedPort, "-m", "conntrack", "--ctstate", "ESTABLISHED,RELATED", "-j", "ACCEPT"}
434
-		if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
435
-			err := fmt.Errorf("set up rule failed, %v: %v", rule, portErr)
436
-			if !isDelete {
437
-				return err
438
-			}
439
-			log.G(context.TODO()).Warn(err)
440
-		}
441
-		rollbackRule := []string{rollbackAddDelOpt, ingressChain, "-p", protocol, "--sport", publishedPort, "-m", "conntrack", "--ctstate", "ESTABLISHED,RELATED", "-j", "ACCEPT"}
442
-		rollbackRules = append(rollbackRules, rollbackRule)
443
-
444
-		rule = []string{addDelOpt, ingressChain, "-p", protocol, "--dport", publishedPort, "-j", "ACCEPT"}
445
-		if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
446
-			err := fmt.Errorf("set up rule failed, %v: %v", rule, portErr)
447
-			if !isDelete {
448 415
 				return err
449 416
 			}
450
-			log.G(context.TODO()).Warn(err)
417
+			rollbackRules = append(rollbackRules, rule)
451 418
 		}
452
-		rollbackRule = []string{rollbackAddDelOpt, ingressChain, "-p", protocol, "--dport", publishedPort, "-j", "ACCEPT"}
453
-		rollbackRules = append(rollbackRules, rollbackRule)
454
-
455 419
 	}
456 420
 
457 421
 	return nil
458 422
 }
459 423
 
460
-func plumbIngressPortsProxy(ingressPorts []*PortConfig, isDelete bool) {
461
-	for _, iPort := range ingressPorts {
462
-		publishedPort := strconv.FormatUint(uint64(iPort.PublishedPort), 10)
463
-		if err := plumbProxy(iPort, isDelete); err != nil {
464
-			log.G(context.TODO()).Warnf("failed to create proxy for port %s: %v", publishedPort, err)
424
+func deleteIngressPortsRules(gwIP net.IP, filteredPorts []*PortConfig) error {
425
+
426
+	var portErr error
427
+
428
+	for _, iPort := range filteredPorts {
429
+		for _, rule := range generateIngressRules(iPort, gwIP) {
430
+			if portErr = rule.Delete(); portErr != nil {
431
+				err := fmt.Errorf("delete rule failed, %v: %v", rule, portErr)
432
+				log.G(context.TODO()).Warn(err)
433
+			}
465 434
 		}
466 435
 	}
436
+
437
+	return nil
467 438
 }
468 439
 
469 440
 func findOIFName(ip net.IP) (string, error) {
... ...
@@ -488,45 +496,51 @@ func findOIFName(ip net.IP) (string, error) {
488 488
 	return link.Attrs().Name, nil
489 489
 }
490 490
 
491
-func plumbProxy(iPort *PortConfig, isDelete bool) error {
492
-	var (
493
-		err error
494
-		l   io.Closer
495
-	)
491
+func closeIngressPortsProxy(ingressPorts []*PortConfig) {
492
+	for _, iPort := range ingressPorts {
493
+		portSpec := fmt.Sprintf("%d/%s", iPort.PublishedPort, strings.ToLower(iPort.Protocol.String()))
494
+		listener, ok := ingressProxyTbl[portSpec]
495
+		if !ok {
496
+			continue
497
+		}
496 498
 
497
-	portSpec := fmt.Sprintf("%d/%s", iPort.PublishedPort, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]))
498
-	listener := ingressProxyTbl[portSpec]
499
-	if isDelete {
500 499
 		if listener != nil {
501 500
 			listener.Close()
502 501
 		}
503 502
 		delete(ingressProxyTbl, portSpec)
504
-
505
-		return nil
506 503
 	}
504
+}
507 505
 
508
-	if listener != nil {
509
-		return nil
510
-	}
506
+func plumbIngressPortsProxy(ingressPorts []*PortConfig) {
507
+	var (
508
+		err error
509
+		l   io.Closer
510
+	)
511 511
 
512
-	switch iPort.Protocol {
513
-	case ProtocolTCP:
514
-		l, err = net.ListenTCP("tcp", &net.TCPAddr{Port: int(iPort.PublishedPort)})
515
-	case ProtocolUDP:
516
-		l, err = net.ListenUDP("udp", &net.UDPAddr{Port: int(iPort.PublishedPort)})
517
-	case ProtocolSCTP:
518
-		l, err = sctp.ListenSCTP("sctp", &sctp.SCTPAddr{Port: int(iPort.PublishedPort)})
519
-	default:
520
-		err = fmt.Errorf("unknown protocol %v", iPort.Protocol)
521
-	}
512
+	for _, iPort := range ingressPorts {
513
+		portSpec := fmt.Sprintf("%d/%s", iPort.PublishedPort, strings.ToLower(iPort.Protocol.String()))
514
+		listener, ok := ingressProxyTbl[portSpec]
515
+		if ok && listener != nil {
516
+			continue // already listening on this port
517
+		}
522 518
 
523
-	if err != nil {
524
-		return err
525
-	}
519
+		switch iPort.Protocol {
520
+		case ProtocolTCP:
521
+			l, err = net.ListenTCP("tcp", &net.TCPAddr{Port: int(iPort.PublishedPort)})
522
+		case ProtocolUDP:
523
+			l, err = net.ListenUDP("udp", &net.UDPAddr{Port: int(iPort.PublishedPort)})
524
+		case ProtocolSCTP:
525
+			l, err = sctp.ListenSCTP("sctp", &sctp.SCTPAddr{Port: int(iPort.PublishedPort)})
526
+		default:
527
+			err = fmt.Errorf("unknown protocol %v", iPort.Protocol)
528
+		}
526 529
 
527
-	ingressProxyTbl[portSpec] = l
530
+		if err != nil {
531
+			log.G(context.TODO()).Warnf("failed to create proxy for port %s: %v", iPort, err)
532
+		}
528 533
 
529
-	return nil
534
+		ingressProxyTbl[portSpec] = l
535
+	}
530 536
 }
531 537
 
532 538
 // configureFWMark configures the sandbox firewall to mark vip destined packets