From f9c9da1544a4953b1b6ac29d8252b8f74fa32513 Mon Sep 17 00:00:00 2001 From: Andrew Li <12980971+Li357@users.noreply.github.com> Date: Tue, 2 Sep 2025 18:48:44 -0400 Subject: [PATCH 1/2] Use informers for pod labels and UIDs instead of querying apiserver every scrape --- internal/pkg/transformation/kubernetes.go | 135 ++++++++++++---------- internal/pkg/transformation/types.go | 4 + 2 files changed, 75 insertions(+), 64 deletions(-) diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index d91db3f9..46263a12 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -32,9 +32,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" @@ -110,6 +113,44 @@ func NewPodMapper(c *appconfig.Config) *PodMapper { podMapper.Client = clientset + if c.KubernetesEnablePodLabels || c.KubernetesEnablePodUID { + factory := informers.NewSharedInformerFactory(clientset, 30*time.Second) + podInformer := factory.Core().V1().Pods() + podMapper.informer = podInformer + + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if pod, ok := obj.(*corev1.Pod); ok { + podMapper.cachePod(pod) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if pod, ok := newObj.(*corev1.Pod); ok { + podMapper.cachePod(pod) + } + }, + DeleteFunc: func(obj interface{}) { + if pod, ok := obj.(*corev1.Pod); ok { + podMapper.deleteCachedPod(pod.Namespace, pod.Name) + return + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if pod, ok := tombstone.Obj.(*corev1.Pod); ok { + podMapper.deleteCachedPod(pod.Namespace, pod.Name) + } + } + }, + }) + + stopCh := make(chan struct{}) + factory.Start(stopCh) + if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) { + slog.Warn("Pod informer cache failed to sync; pod metadata may be unavailable initially") + } else { + slog.Info("Pod informer cache synced") + } + } + if c.KubernetesEnableDRA { resourceSliceManager, err := NewDRAResourceSliceManager() if err != nil { @@ -265,7 +306,7 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic metrics[counter][j].Attributes[oldNamespaceAttribute] = podInfo.Namespace metrics[counter][j].Attributes[oldContainerAttribute] = podInfo.Container } - + metrics[counter][j].Attributes[uidAttribute] = podInfo.UID maps.Copy(metrics[counter][j].Labels, podInfo.Labels) } @@ -723,83 +764,49 @@ func (p *PodMapper) toDeviceToPod( // createPodInfo creates a PodInfo struct with metadata if enabled func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, metadataCache map[string]PodMetadata) PodInfo { - labels := map[string]string{} - uid := "" - cacheKey := pod.GetNamespace() + "/" + pod.GetName() - - // Check if we have cached metadata - cachedMetadata, hasCache := metadataCache[cacheKey] - - // Determine if we need labels - needLabels := p.Config.KubernetesEnablePodLabels && (cachedMetadata.Labels == nil) - - // Determine if we need UID - needUID := p.Config.KubernetesEnablePodUID && cachedMetadata.UID == "" - - // Only make API call if we need something that's not cached - if needLabels || needUID { - if podMetadata, err := p.getPodMetadata(pod.GetNamespace(), pod.GetName()); err != nil { - slog.Warn("Couldn't get pod metadata", - "pod", pod.GetName(), - "namespace", pod.GetNamespace(), - "error", err) - // Cache empty result to avoid repeated failures, but preserve existing cache data - if !hasCache { - metadataCache[cacheKey] = PodMetadata{} - } - } else { - // Update cache with new data, preserving existing data if we didn't fetch it - if needLabels { - cachedMetadata.Labels = podMetadata.Labels - } - if needUID { - cachedMetadata.UID = podMetadata.UID - } - metadataCache[cacheKey] = cachedMetadata - } - } - - // Extract the data we need based on config flags - if p.Config.KubernetesEnablePodLabels { - labels = cachedMetadata.Labels - } - if p.Config.KubernetesEnablePodUID { - uid = cachedMetadata.UID + var md PodMetadata + if p.Config.KubernetesEnablePodLabels || p.Config.KubernetesEnablePodUID { + md = p.getPodMetadata(pod.GetNamespace(), pod.GetName()) } return PodInfo{ Name: pod.GetName(), Namespace: pod.GetNamespace(), Container: container.GetName(), - UID: uid, - Labels: labels, + UID: md.UID, + Labels: md.Labels, } } -// getPodMetadata fetches metadata (labels and UID) from a Kubernetes pod via the API server. -// It sanitizes label names to ensure they are valid for Prometheus metrics. -func (p *PodMapper) getPodMetadata(namespace, podName string) (*PodMetadata, error) { - if p.Client == nil { - return nil, fmt.Errorf("kubernetes client is not initialized") - } - - ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) - defer cancel() - - pod, err := p.Client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return nil, err +func (p *PodMapper) cachePod(pod *corev1.Pod) { + if pod == nil { + return } - - // Sanitize label names sanitizedLabels := make(map[string]string, len(pod.Labels)) for k, v := range pod.Labels { sanitizedKey := utils.SanitizeLabelName(k) sanitizedLabels[sanitizedKey] = v } - - return &PodMetadata{ + md := PodMetadata{ UID: string(pod.UID), Labels: sanitizedLabels, - }, nil + } + key := pod.Namespace + "/" + pod.Name + p.podCacheMu.Lock() + defer p.podCacheMu.Unlock() + p.podCache[key] = md +} + +func (p *PodMapper) deleteCachedPod(namespace, name string) { + key := namespace + "/" + name + p.podCacheMu.Lock() + defer p.podCacheMu.Unlock() + delete(p.podCache, key) +} + +func (p *PodMapper) getPodMetadata(namespace, name string) PodMetadata { + key := namespace + "/" + name + p.podCacheMu.RLock() + defer p.podCacheMu.RUnlock() + return p.podCache[key] } diff --git a/internal/pkg/transformation/types.go b/internal/pkg/transformation/types.go index b7bb7994..ee1def68 100644 --- a/internal/pkg/transformation/types.go +++ b/internal/pkg/transformation/types.go @@ -21,6 +21,7 @@ import ( "sync" "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -40,6 +41,9 @@ type PodMapper struct { Config *appconfig.Config Client kubernetes.Interface ResourceSliceManager *DRAResourceSliceManager + informer coreinformers.PodInformer + podCacheMu sync.RWMutex + podCache map[string]PodMetadata } type PodInfo struct { From 135d7429214a36e42941af90a3335781334d427f Mon Sep 17 00:00:00 2001 From: Andrew Li <12980971+Li357@users.noreply.github.com> Date: Thu, 4 Sep 2025 12:13:55 -0400 Subject: [PATCH 2/2] Use injected clientset correctly in tests and fix nil map error --- internal/pkg/appconfig/types.go | 2 ++ internal/pkg/transformation/kubernetes.go | 27 ++++++++++--------- .../pkg/transformation/kubernetes_test.go | 9 +++---- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/internal/pkg/appconfig/types.go b/internal/pkg/appconfig/types.go index 5ef69791..397a6190 100644 --- a/internal/pkg/appconfig/types.go +++ b/internal/pkg/appconfig/types.go @@ -18,6 +18,7 @@ package appconfig import ( "github.com/NVIDIA/go-dcgm/pkg/dcgm" + "k8s.io/client-go/kubernetes" ) type KubernetesGPUIDType string @@ -69,4 +70,5 @@ type Config struct { KubernetesVirtualGPUs bool DumpConfig DumpConfig // Configuration for file-based dumps KubernetesEnableDRA bool + KubernetesClient kubernetes.Interface } diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index 46263a12..ff42a496 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -93,30 +93,33 @@ func NewPodMapper(c *appconfig.Config) *PodMapper { podMapper := &PodMapper{ Config: c, + Client: c.KubernetesClient, } if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA { return podMapper } - clusterConfig, err := rest.InClusterConfig() - if err != nil { - slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err) - return podMapper - } + if podMapper.Client == nil { + clusterConfig, err := rest.InClusterConfig() + if err != nil { + slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err) + return podMapper + } - clientset, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - slog.Warn("Failed to get clientset, pod labels will not be available", "error", err) - return podMapper + clientset, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + slog.Warn("Failed to get clientset, pod labels will not be available", "error", err) + return podMapper + } + podMapper.Client = clientset } - podMapper.Client = clientset - if c.KubernetesEnablePodLabels || c.KubernetesEnablePodUID { - factory := informers.NewSharedInformerFactory(clientset, 30*time.Second) + factory := informers.NewSharedInformerFactory(podMapper.Client, 30*time.Second) podInformer := factory.Core().V1().Pods() podMapper.informer = podInformer + podMapper.podCache = make(map[string]PodMetadata) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/internal/pkg/transformation/kubernetes_test.go b/internal/pkg/transformation/kubernetes_test.go index 400d8358..12dd13d2 100644 --- a/internal/pkg/transformation/kubernetes_test.go +++ b/internal/pkg/transformation/kubernetes_test.go @@ -502,9 +502,8 @@ func TestProcessPodMapper_WithLabels(t *testing.T) { KubernetesEnablePodLabels: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{} @@ -727,9 +726,8 @@ func TestProcessPodMapper_WithUID(t *testing.T) { KubernetesEnablePodUID: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{} @@ -837,9 +835,8 @@ func TestProcessPodMapper_WithLabelsAndUID(t *testing.T) { KubernetesEnablePodUID: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{}