From da16351b7b4d334acc5a309e39c6d6ce9307e95b Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 5 Jul 2025 15:15:20 -0600 Subject: [PATCH] feat: discovery of rdma raw devices Problem: we only see rdma devices associated with a netlink. Solution: do a discovery loop that also lists all rdma devices. Signed-off-by: vsoch --- README.md | 8 ++++ pkg/inventory/db.go | 90 +++++++++++++++++++----------------- pkg/inventory/net.go | 107 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 50591f05..2be7908f 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,14 @@ Install the latest stable version of DraNet using the provided manifest: kubectl apply -f https://raw.githubusercontent.com/google/dranet/refs/heads/main/install.yaml ``` +### Development + +To build your own image for testing, pass your registry to `make image`. Here is an example with a custom registry `ghcr.io/converged-computing`: + +```sh +REGISTRY=ghcr.io/converged-computing make image +``` + ### How to Use It Once DraNet is running, you can inspect the network interfaces and their diff --git a/pkg/inventory/db.go b/pkg/inventory/db.go index 4c7c3cad..e7088070 100644 --- a/pkg/inventory/db.go +++ b/pkg/inventory/db.go @@ -19,7 +19,6 @@ package inventory import ( "context" "fmt" - "net" "strings" "sync" "time" @@ -116,12 +115,8 @@ func (db *DB) GetPodNamespace(pod string) string { func (db *DB) Run(ctx context.Context) error { defer close(db.notifications) - nlHandle, err := netlink.NewHandle() - if err != nil { - return fmt.Errorf("error creating netlink handle %v", err) - } // Resources are published periodically or if there is a netlink notification - // indicating a new interfaces was added or changed + // indicating a new interface was added or changed. 
nlChannel := make(chan netlink.LinkUpdate) doneCh := make(chan struct{}) defer close(doneCh) @@ -129,12 +124,8 @@ func (db *DB) Run(ctx context.Context) error { klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String()) } - // Obtain data that will not change after the startup + // Obtain data that will not change after the startup. db.instance = getInstanceProperties(ctx) - // TODO: it is not common but may happen in edge cases that the default gateway changes - // revisit once we have more evidence this can be a potential problem or break some use - // cases. - gwInterfaces := getDefaultGwInterfaces() for { err := db.rateLimiter.Wait(ctx) @@ -142,47 +133,60 @@ func (db *DB) Run(ctx context.Context) error { klog.Error(err, "unexpected rate limited error trying to get system interfaces") } - devices := []resourceapi.Device{} - ifaces, err := nlHandle.LinkList() + // Device lookup map is used to prevent duplicates. + devices := make(map[string]*resourceapi.Device) + + // Keep track of seen devices to not register an RDMA device twice. + seenRdmaDevices := sets.New[string]() + + // Kernel network interfaces are first priority. + netlinkDevices, err := db.discoverNetlinkDevices() if err != nil { - klog.Error(err, "unexpected error trying to get system interfaces") + return err } - for _, iface := range ifaces { - klog.V(7).InfoS("Checking network interface", "name", iface.Attrs().Name) - if gwInterfaces.Has(iface.Attrs().Name) { - klog.V(4).Infof("iface %s is an uplink interface", iface.Attrs().Name) - continue - } + for pciAddr, device := range netlinkDevices { - if ignoredInterfaceNames.Has(iface.Attrs().Name) { - klog.V(4).Infof("iface %s is in the list of ignored interfaces", iface.Attrs().Name) + // Do not add un-named network device interfaces. 
+ ifName, ok := device.Basic.Attributes["dra.net/ifName"] + if !ok { continue } - // skip loopback interfaces - if iface.Attrs().Flags&net.FlagLoopback != 0 { - continue + // If it has RDMA, mark as seen. + if rdmaName, err := rdmamap.GetRdmaDeviceForNetdevice(*ifName.StringValue); err == nil && rdmaName != "" { + klog.V(4).Infof("Found netdev '%s' with associated RDMA device '%s'. Merging.", *ifName.StringValue, rdmaName) + seenRdmaDevices.Insert(pciAddr) } + devices[*ifName.StringValue] = device + + } + // We only allow rdma devices that have PCI addresses. + for pciAddr, rdmaDevice := range db.discoverRawRdmaDevices() { - // publish this network interface - device, err := db.netdevToDRAdev(iface) - if err != nil { - klog.V(2).Infof("could not obtain attributes for iface %s : %v", iface.Attrs().Name, err) + // Have we already seen it? + _, ok := seenRdmaDevices[pciAddr] + if ok { continue } + devices[rdmaDevice.Name] = rdmaDevice + } - devices = append(devices, *device) - klog.V(4).Infof("Found following network interface %s", iface.Attrs().Name) + // Create the final list to publish. + finalDevices := make([]resourceapi.Device, 0, len(devices)) + for _, device := range devices { + finalDevices = append(finalDevices, *device) } - klog.V(4).Infof("Found %d devices", len(devices)) - if len(devices) > 0 { - db.notifications <- devices + klog.V(4).Infof("Found %d devices", len(finalDevices)) + if len(finalDevices) > 0 { + db.notifications <- finalDevices } + + // Wait for the next event or timeout. select { - // trigger a reconcile + // Trigger a reconcile. case <-nlChannel: - // drain the channel so we only sync once + // Drain the channel so we only sync once. for len(nlChannel) > 0 { <-nlChannel } @@ -207,13 +211,14 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) { } // Set the device name. It will be normalized only if necessary. 
device.Name = names.SetDeviceName(ifName) - // expose the real interface name as an attribute in case it is normalized. + + // Expose the real interface name as an attribute in case it is normalized. device.Basic.Attributes["dra.net/ifName"] = resourceapi.DeviceAttribute{StringValue: &ifName} linkType := link.Type() linkAttrs := link.Attrs() - // identify the namespace holding the link as the other end of a veth pair + // Identify the namespace holding the link as the other end of a veth pair. netnsid := link.Attrs().NetNsID if podName := db.GetPodName(netnsid); podName != "" { device.Basic.Attributes["dra.net/pod"] = resourceapi.DeviceAttribute{StringValue: &podName} @@ -251,7 +256,7 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) { device.Basic.Attributes["dra.net/alias"] = resourceapi.DeviceAttribute{StringValue: &linkAttrs.Alias} device.Basic.Attributes["dra.net/type"] = resourceapi.DeviceAttribute{StringValue: &linkType} - // Get eBPF properties from the interface using the legacy tc hooks + // Get eBPF properties from the interface using the legacy tc hooks. isEbpf := false filterNames, ok := getTcFilters(link) if ok { @@ -259,7 +264,7 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) { device.Basic.Attributes["dra.net/tcFilterNames"] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(filterNames, ","))} } - // Get eBPF properties from the interface using the tcx hooks + // Get eBPF properties from the interface using the tcx hooks. 
programNames, ok := getTcxFilters(link) if ok { isEbpf = true @@ -291,6 +296,7 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) { return &device, nil } +// addPCIAttributes adds dra.net device.Attributes func addPCIAttributes(device *resourceapi.BasicDevice, ifName string, path string) { device.Attributes["dra.net/virtual"] = resourceapi.DeviceAttribute{BoolValue: ptr.To(false)} @@ -303,7 +309,7 @@ func addPCIAttributes(device *resourceapi.BasicDevice, ifName string, path strin device.Attributes["resource.kubernetes.io/pcieRoot"] = resourceapi.DeviceAttribute{StringValue: &pcieRoot} } } else { - klog.Infof("could not get pci root : %v", err) + klog.Infof("could not get pci root for %s: %v", ifName, err) } entry, err := ids(ifName, path) @@ -318,7 +324,7 @@ func addPCIAttributes(device *resourceapi.BasicDevice, ifName string, path strin device.Attributes["dra.net/pciSubsystem"] = resourceapi.DeviceAttribute{StringValue: &entry.Subsystem} } } else { - klog.Infof("could not get pci vendor information : %v", err) + klog.Infof("could not get pci vendor information for %s: %v", ifName, err) } numa, err := numaNode(ifName, path) diff --git a/pkg/inventory/net.go b/pkg/inventory/net.go index f2f14d9a..03d25c01 100644 --- a/pkg/inventory/net.go +++ b/pkg/inventory/net.go @@ -17,12 +17,19 @@ limitations under the License. 
package inventory import ( + "fmt" + "net" + + "github.com/Mellanox/rdmamap" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" + "github.com/google/dranet/pkg/names" "github.com/vishvananda/netlink" + resourceapi "k8s.io/api/resource/v1beta1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) func getDefaultGwInterfaces() sets.Set[string] { @@ -118,3 +125,103 @@ func getTcxFilters(device netlink.Link) ([]string, bool) { } return programNames.UnsortedList(), isTcxEBPF } + +// discoverNetlinkDevices scans for kernel network interfaces +func (db *DB) discoverNetlinkDevices() (map[string]*resourceapi.Device, error) { + klog.V(4).Info("Starting netlink device discovery...") + devices := make(map[string]*resourceapi.Device) + + // TODO: it is not common but may happen in edge cases that the default gateway changes + // revisit once we have more evidence this can be a potential problem or break some use + // cases. + gwInterfaces := getDefaultGwInterfaces() + nlHandle, err := netlink.NewHandle() + if err != nil { + return devices, fmt.Errorf("error creating netlink handle %v", err) + } + + // Don't return early - we still want to log the summary at the end of the function. + // @vsoch This log-and-continue logic is carried over from the version before the refactor. + ifaces, err := nlHandle.LinkList() + if err != nil { + klog.Error(err, "unexpected error trying to get system interfaces") + } + + for _, iface := range ifaces { + attrs := iface.Attrs() + + klog.V(7).InfoS("Checking network interface", "name", attrs.Name) + if gwInterfaces.Has(attrs.Name) { + klog.V(4).Infof("iface %s is an uplink interface", attrs.Name) + continue + } + + if ignoredInterfaceNames.Has(attrs.Name) { + klog.V(4).Infof("iface %s is in the list of ignored interfaces", attrs.Name) + continue + } + + // Skip loopback interfaces. + if attrs.Flags&net.FlagLoopback != 0 { + continue + } + + // Publish this network interface. 
+ device, err := db.netdevToDRAdev(iface) + if err != nil { + klog.V(2).Infof("could not obtain attributes for iface %s : %v", attrs.Name, err) + continue + } + + // This could be error prone if a missing address leads to a second entry in rdma devices. + pciAddress, err := bdfAddress(attrs.Name, rdmamap.RdmaClassDir) + pciAddr := pciAddress.device + if err != nil { + klog.Warningf("could not get PCI address for netdev %s, using fallback key. error: %v", attrs.Name, err) + pciAddr = "netdev-" + attrs.Name + } + devices[pciAddr] = device + } + klog.V(4).Infof("Finished netlink discovery. Found %d devices.", len(devices)) + return devices, nil +} + +// discoverRawRdmaDevices scans for raw RDMA devices using rdmamap listing +func (db *DB) discoverRawRdmaDevices() map[string]*resourceapi.Device { + klog.V(4).Info("Starting raw RDMA device discovery...") + devices := make(map[string]*resourceapi.Device) + + // This was tested to work to list an Infiniband device without an associated netlink. + deviceNames := rdmamap.GetRdmaDeviceList() + + for _, rdmaName := range deviceNames { + pciAddr, err := bdfAddress(rdmaName, rdmamap.RdmaClassDir) + + // Assume that a missing PCI address would be missing for both netlink and rdma (not sure if this is true). + // I think there are cases when we wouldn't have one, but I want to be conservative and only + // allow RDMA interfaces with associated PCI addresses. This can change if needed. + if err != nil { + klog.Warningf("could not get PCI address for RDMA device %s, skipping: %v", rdmaName, err) + continue + } + sanitizedName := names.SetDeviceName(rdmaName) + + // Create a new resourceapi device for the RDMA raw device. 
+ device := &resourceapi.Device{ + Name: sanitizedName, + Basic: &resourceapi.BasicDevice{ + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "dra.net/rdma": {BoolValue: ptr.To(true)}, + "dra.net/ifName": {StringValue: &rdmaName}, + // https://github.com/vishvananda/netlink/blob/master/nl/nl_linux.go#L143 + // This could also be ib, but "infiniband" is more clear + "dra.net/type": {StringValue: ptr.To("infiniband")}, + }, + }, + } + addPCIAttributes(device.Basic, rdmaName, rdmamap.RdmaClassDir) + devices[pciAddr.device] = device + } + klog.V(4).Infof("Finished raw RDMA discovery. Found %d devices.", len(devices)) + return devices +}