Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/datapath/linux/netdevice/netdevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,24 @@ func GetIfaceFirstIPv4Address(ifaceName string) (netip.Addr, error) {
}

func TestForIfaceWithIPv4Address(ip netip.Addr) error {
_, err := getIfaceWithIPv4Address(ip)
return err
}

func GetIfaceWithIPv4Address(ip netip.Addr) (string, error) {
return getIfaceWithIPv4Address(ip)
}

func getIfaceWithIPv4Address(ip netip.Addr) (string, error) {
links, err := netlink.LinkList()
if err != nil {
return err
return "", err
}

for _, l := range links {
addrs, err := netlink.AddrList(l, netlink.FAMILY_V4)
if err != nil {
return err
return "", err
}

for _, addr := range addrs {
Expand All @@ -53,10 +62,10 @@ func TestForIfaceWithIPv4Address(ip netip.Addr) error {
continue
}
if a == ip {
return nil
return l.Attrs().Name, nil
}
}
}

return fmt.Errorf("no interface with %s IPv4 assigned to", ip)
return "", fmt.Errorf("no interface with %s IPv4 assigned to", ip)
}
38 changes: 38 additions & 0 deletions pkg/egressgateway/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/cilium/cilium/pkg/datapath/linux/config/defines"
"github.com/cilium/cilium/pkg/datapath/linux/sysctl"
"github.com/cilium/cilium/pkg/datapath/tables"
"github.com/cilium/cilium/pkg/datapath/tunnel"
"github.com/cilium/cilium/pkg/identity"
identityCache "github.com/cilium/cilium/pkg/identity/cache"
Expand Down Expand Up @@ -137,6 +139,8 @@ type Manager struct {
// reconciliationEventsCount keeps track of how many reconciliation
// events have occoured
reconciliationEventsCount atomic.Uint64

sysctl sysctl.Sysctl
}

type Params struct {
Expand All @@ -149,6 +153,7 @@ type Params struct {
Policies resource.Resource[*Policy]
Nodes resource.Resource[*cilium_api_v2.CiliumNode]
Endpoints resource.Resource[*k8sTypes.CiliumEndpoint]
Sysctl sysctl.Sysctl

Lifecycle cell.Lifecycle
}
Expand Down Expand Up @@ -213,6 +218,7 @@ func newEgressGatewayManager(p Params) (*Manager, error) {
policies: p.Policies,
ciliumNodes: p.Nodes,
endpoints: p.Endpoints,
sysctl: p.Sysctl,
}

t, err := trigger.NewTrigger(trigger.Parameters{
Expand Down Expand Up @@ -593,6 +599,33 @@ func (manager *Manager) regenerateGatewayConfigs() {
}
}

func (manager *Manager) relaxRPFilter() error {
var sysSettings []tables.Sysctl
ifSet := make(map[string]struct{})

for _, pc := range manager.policyConfigs {
if !pc.gatewayConfig.localNodeConfiguredAsGateway {
continue
}

ifaceName := pc.gatewayConfig.ifaceName
if _, ok := ifSet[ifaceName]; !ok {
ifSet[ifaceName] = struct{}{}
sysSettings = append(sysSettings, tables.Sysctl{
Name: fmt.Sprintf("net.ipv4.conf.%s.rp_filter", ifaceName),
Val: "2",
IgnoreErr: false,
})
}
}

if len(sysSettings) == 0 {
return nil
}

return manager.sysctl.ApplySettings(sysSettings)
}

func (manager *Manager) addMissingEgressRules() {
egressPolicies := map[egressmap.EgressPolicyKey4]egressmap.EgressPolicyVal4{}
manager.policyMap.IterateWithCallback(
Expand Down Expand Up @@ -693,6 +726,11 @@ func (manager *Manager) reconcileLocked() {

manager.regenerateGatewayConfigs()

if err := manager.relaxRPFilter(); err != nil {
manager.reconciliationTrigger.TriggerWithReason("retry after error")
return
}

// The order of the next 2 function calls matters, as by first adding missing policies and
// only then removing obsolete ones we make sure there will be no connectivity disruption
manager.addMissingEgressRules()
Expand Down
63 changes: 59 additions & 4 deletions pkg/egressgateway/manager_privileged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"github.com/cilium/ebpf/rlimit"
"github.com/cilium/hive/hivetest"
"github.com/google/uuid"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"github.com/vishvananda/netlink"
"k8s.io/apimachinery/pkg/types"

"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/datapath/linux/sysctl"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/identity"
cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
Expand Down Expand Up @@ -89,11 +91,17 @@ type parsedEgressRule struct {
gatewayIP netip.Addr
}

type rpFilterSetting struct {
iFaceName string
rpFilterSetting string
}

type EgressGatewayTestSuite struct {
manager *Manager
policies fakeResource[*Policy]
nodes fakeResource[*cilium_api_v2.CiliumNode]
endpoints fakeResource[*k8sTypes.CiliumEndpoint]
sysctl sysctl.Sysctl
}

func setupEgressGatewayTestSuite(t *testing.T) *EgressGatewayTestSuite {
Expand All @@ -109,6 +117,7 @@ func setupEgressGatewayTestSuite(t *testing.T) *EgressGatewayTestSuite {
k.policies = make(fakeResource[*Policy])
k.nodes = make(fakeResource[*cilium_api_v2.CiliumNode])
k.endpoints = make(fakeResource[*k8sTypes.CiliumEndpoint])
k.sysctl = sysctl.NewDirectSysctl(afero.NewOsFs(), "/proc")

lc := hivetest.Lifecycle(t)
policyMap := egressmap.CreatePrivatePolicyMap(lc, egressmap.DefaultPolicyConfig)
Expand All @@ -122,6 +131,7 @@ func setupEgressGatewayTestSuite(t *testing.T) *EgressGatewayTestSuite {
Policies: k.policies,
Nodes: k.nodes,
Endpoints: k.endpoints,
Sysctl: k.sysctl,
})
require.NoError(t, err)
require.NotNil(t, k.manager)
Expand Down Expand Up @@ -203,8 +213,8 @@ func TestEgressGatewayCEGPParser(t *testing.T) {

func TestEgressGatewayManager(t *testing.T) {
k := setupEgressGatewayTestSuite(t)
createTestInterface(t, testInterface1, egressCIDR1)
createTestInterface(t, testInterface2, egressCIDR2)
createTestInterface(t, k.sysctl, testInterface1, egressCIDR1)
createTestInterface(t, k.sysctl, testInterface2, egressCIDR2)

policyMap := k.manager.policyMap
egressGatewayManager := k.manager
Expand Down Expand Up @@ -242,6 +252,10 @@ func TestEgressGatewayManager(t *testing.T) {
addPolicy(t, k.policies, &policy1)
reconciliationEventsCount = waitForReconciliationRun(t, egressGatewayManager, reconciliationEventsCount)

assertRPFilter(t, k.sysctl, []rpFilterSetting{
{iFaceName: testInterface1, rpFilterSetting: "2"},
{iFaceName: testInterface2, rpFilterSetting: "1"},
})
assertEgressRules(t, policyMap, []egressRule{})

// Add a new endpoint & ID which matches policy-1
Expand Down Expand Up @@ -403,7 +417,7 @@ func TestEgressGatewayManager(t *testing.T) {
func TestEndpointDataStore(t *testing.T) {
k := setupEgressGatewayTestSuite(t)

createTestInterface(t, testInterface1, egressCIDR1)
createTestInterface(t, k.sysctl, testInterface1, egressCIDR1)

policyMap := k.manager.policyMap
egressGatewayManager := k.manager
Expand Down Expand Up @@ -481,7 +495,7 @@ func TestCell(t *testing.T) {
}
}

func createTestInterface(tb testing.TB, iface string, addr string) {
func createTestInterface(tb testing.TB, sysctl sysctl.Sysctl, iface string, addr string) {
tb.Helper()

la := netlink.NewLinkAttrs()
Expand Down Expand Up @@ -510,6 +524,28 @@ func createTestInterface(tb testing.TB, iface string, addr string) {
if err := netlink.AddrAdd(link, a); err != nil {
tb.Fatal(err)
}

ensureRPFilterIsEnabled(tb, sysctl, iface)
}

func ensureRPFilterIsEnabled(tb testing.TB, sysctl sysctl.Sysctl, iface string) {
rpFilterSetting := fmt.Sprintf("net.ipv4.conf.%s.rp_filter", iface)

for i := 0; i < 10; i++ {
if err := sysctl.Enable(rpFilterSetting); err != nil {
tb.Fatal(err)
}

time.Sleep(100 * time.Millisecond)

if val, err := sysctl.Read(rpFilterSetting); err == nil {
if val == "1" {
return
}
}
}

tb.Fatal("failed to enable rp_filter")
}

func waitForReconciliationRun(tb testing.TB, egressGatewayManager *Manager, currentRun uint64) uint64 {
Expand Down Expand Up @@ -631,3 +667,22 @@ func tryAssertEgressRules(policyMap egressmap.PolicyMap, rules []egressRule) err

return nil
}

func assertRPFilter(t *testing.T, sysctl sysctl.Sysctl, rpFilterSettings []rpFilterSetting) {
t.Helper()

err := tryAssertRPFilterSettings(sysctl, rpFilterSettings)
require.NoError(t, err)
}

func tryAssertRPFilterSettings(sysctl sysctl.Sysctl, rpFilterSettings []rpFilterSetting) error {
for _, setting := range rpFilterSettings {
if val, err := sysctl.Read(fmt.Sprintf("net.ipv4.conf.%s.rp_filter", setting.iFaceName)); err != nil {
return fmt.Errorf("failed to read rp_filter")
} else if val != setting.rpFilterSetting {
return fmt.Errorf("mismatched rp_filter iface: %s rp_filter: %s", setting.iFaceName, val)
}
}

return nil
}
17 changes: 15 additions & 2 deletions pkg/egressgateway/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ type policyGatewayConfig struct {
// that IP assigned to) or the interface (and in this case we need to find the
// first IPv4 assigned to that).
type gatewayConfig struct {
// ifaceName is the name of the interface used to SNAT traffic
ifaceName string
// egressIP is the IP used to SNAT traffic
egressIP netip.Addr
// gatewayIP is the node internal IP of the gateway
gatewayIP netip.Addr
// localNodeConfiguredAsGateway tells if the local node is configured to
// act as an egress gateway node for this config.
// This information is used to decide if it is necessary to relax the rp_filter
// on the interface used to SNAT traffic
localNodeConfiguredAsGateway bool
}

// PolicyConfig is the internal representation of CiliumEgressGatewayPolicy.
Expand Down Expand Up @@ -131,10 +138,13 @@ func (config *PolicyConfig) regenerateGatewayConfig(manager *Manager) {
func (gwc *gatewayConfig) deriveFromPolicyGatewayConfig(gc *policyGatewayConfig) error {
var err error

gwc.localNodeConfiguredAsGateway = false

Comment on lines +141 to +142
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting localNodeConfiguredAsGateway to false at the start of deriveFromPolicyGatewayConfig and then unconditionally setting it to true at line 175 means it will always be true when the function succeeds. This initialization appears unnecessary since the value is always overwritten before return. Consider removing the initial false assignment or documenting why this pattern is needed.

Suggested change
gwc.localNodeConfiguredAsGateway = false

Copilot uses AI. Check for mistakes.
switch {
case gc.iface != "":
// If the gateway config specifies an interface, use the first IPv4 assigned to that
// interface as egress IP
gwc.ifaceName = gc.iface
gwc.egressIP, err = netdevice.GetIfaceFirstIPv4Address(gc.iface)
if err != nil {
return fmt.Errorf("failed to retrieve IPv4 address for egress interface: %w", err)
Expand All @@ -143,7 +153,7 @@ func (gwc *gatewayConfig) deriveFromPolicyGatewayConfig(gc *policyGatewayConfig)
// If the gateway config specifies an egress IP, use the interface with that IP as egress
// interface
gwc.egressIP = gc.egressIP
err = netdevice.TestForIfaceWithIPv4Address(gc.egressIP)
gwc.ifaceName, err = netdevice.GetIfaceWithIPv4Address(gc.egressIP)
if err != nil {
return fmt.Errorf("failed to retrieve interface with egress IP: %w", err)
}
Expand All @@ -155,12 +165,15 @@ func (gwc *gatewayConfig) deriveFromPolicyGatewayConfig(gc *policyGatewayConfig)
return fmt.Errorf("failed to find interface with default route: %w", err)
}

gwc.egressIP, err = netdevice.GetIfaceFirstIPv4Address(iface.Attrs().Name)
gwc.ifaceName = iface.Attrs().Name
gwc.egressIP, err = netdevice.GetIfaceFirstIPv4Address(gwc.ifaceName)
if err != nil {
return fmt.Errorf("failed to retrieve IPv4 address for egress interface: %w", err)
}
}

gwc.localNodeConfiguredAsGateway = true

return nil
}

Expand Down