diff --git a/internal/pkg/appconfig/types.go b/internal/pkg/appconfig/types.go index 9e3d9709..b3283cc9 100644 --- a/internal/pkg/appconfig/types.go +++ b/internal/pkg/appconfig/types.go @@ -18,6 +18,7 @@ package appconfig import ( "github.com/NVIDIA/go-dcgm/pkg/dcgm" + "k8s.io/client-go/kubernetes" ) type KubernetesGPUIDType string @@ -69,5 +70,6 @@ type Config struct { KubernetesVirtualGPUs bool DumpConfig DumpConfig // Configuration for file-based dumps KubernetesEnableDRA bool + KubernetesClient kubernetes.Interface DisableStartupValidate bool } diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index afd7baf6..9bf1acb7 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -32,9 +32,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" @@ -90,25 +93,66 @@ func NewPodMapper(c *appconfig.Config) *PodMapper { podMapper := &PodMapper{ Config: c, + Client: c.KubernetesClient, } if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA { return podMapper } - clusterConfig, err := rest.InClusterConfig() - if err != nil { - slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err) - return podMapper - } + if podMapper.Client == nil { + clusterConfig, err := rest.InClusterConfig() + if err != nil { + slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err) + return podMapper + } - clientset, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - slog.Warn("Failed to get clientset, pod labels will not be available", "error", err) - return podMapper + clientset, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + slog.Warn("Failed to get clientset, pod labels will not be available", "error", err) + return podMapper + } + podMapper.Client = clientset } - podMapper.Client = clientset + if c.KubernetesEnablePodLabels || c.KubernetesEnablePodUID { + factory := informers.NewSharedInformerFactory(podMapper.Client, 30*time.Second) + podInformer := factory.Core().V1().Pods() + podMapper.informer = podInformer + podMapper.podCache = make(map[string]PodMetadata) + + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if pod, ok := obj.(*corev1.Pod); ok { + podMapper.cachePod(pod) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if pod, ok := newObj.(*corev1.Pod); ok { + podMapper.cachePod(pod) + } + }, + DeleteFunc: func(obj interface{}) { + if pod, ok := obj.(*corev1.Pod); ok { + podMapper.deleteCachedPod(pod.Namespace, pod.Name) + return + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if pod, ok := tombstone.Obj.(*corev1.Pod); ok { + podMapper.deleteCachedPod(pod.Namespace, pod.Name) + } + } + }, + }) + + stopCh := make(chan struct{}) + factory.Start(stopCh) + if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) { + slog.Warn("Pod informer cache failed to sync; pod metadata may be unavailable initially") + } else { + slog.Info("Pod informer cache synced") + } + } if c.KubernetesEnableDRA { resourceSliceManager, err := NewDRAResourceSliceManager() @@ -734,83 +778,49 @@ func (p *PodMapper) toDeviceToPod( // createPodInfo creates a PodInfo struct with metadata if enabled func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, metadataCache map[string]PodMetadata) PodInfo { - labels := map[string]string{} - uid := "" - cacheKey := pod.GetNamespace() + "/" + pod.GetName() - - // Check if we have cached metadata - cachedMetadata, hasCache := metadataCache[cacheKey] - - // Determine if we need labels - needLabels := p.Config.KubernetesEnablePodLabels && (cachedMetadata.Labels == nil) - - // Determine if we need UID - needUID := p.Config.KubernetesEnablePodUID && cachedMetadata.UID == "" - - // Only make API call if we need something that's not cached - if needLabels || needUID { - if podMetadata, err := p.getPodMetadata(pod.GetNamespace(), pod.GetName()); err != nil { - slog.Warn("Couldn't get pod metadata", - "pod", pod.GetName(), - "namespace", pod.GetNamespace(), - "error", err) - // Cache empty result to avoid repeated failures, but preserve existing cache data - if !hasCache { - metadataCache[cacheKey] = PodMetadata{} - } - } else { - // Update cache with new data, preserving existing data if we didn't fetch it - if needLabels { - cachedMetadata.Labels = podMetadata.Labels - } - if needUID { - cachedMetadata.UID = podMetadata.UID - } - metadataCache[cacheKey] = cachedMetadata - } - } - - // Extract the data we need based on config flags - if p.Config.KubernetesEnablePodLabels { - labels = cachedMetadata.Labels - } - if p.Config.KubernetesEnablePodUID { - uid = cachedMetadata.UID + var md PodMetadata + if p.Config.KubernetesEnablePodLabels || p.Config.KubernetesEnablePodUID { + md = p.getPodMetadata(pod.GetNamespace(), pod.GetName()) } return PodInfo{ Name: pod.GetName(), Namespace: pod.GetNamespace(), Container: container.GetName(), - UID: uid, - Labels: labels, + UID: md.UID, + Labels: md.Labels, } } -// getPodMetadata fetches metadata (labels and UID) from a Kubernetes pod via the API server. -// It sanitizes label names to ensure they are valid for Prometheus metrics. -func (p *PodMapper) getPodMetadata(namespace, podName string) (*PodMetadata, error) { - if p.Client == nil { - return nil, fmt.Errorf("kubernetes client is not initialized") - } - - ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) - defer cancel() - - pod, err := p.Client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return nil, err +func (p *PodMapper) cachePod(pod *corev1.Pod) { + if pod == nil { + return } - - // Sanitize label names sanitizedLabels := make(map[string]string, len(pod.Labels)) for k, v := range pod.Labels { sanitizedKey := utils.SanitizeLabelName(k) sanitizedLabels[sanitizedKey] = v } - - return &PodMetadata{ + md := PodMetadata{ UID: string(pod.UID), Labels: sanitizedLabels, - }, nil + } + key := pod.Namespace + "/" + pod.Name + p.podCacheMu.Lock() + defer p.podCacheMu.Unlock() + p.podCache[key] = md +} + +func (p *PodMapper) deleteCachedPod(namespace, name string) { + key := namespace + "/" + name + p.podCacheMu.Lock() + defer p.podCacheMu.Unlock() + delete(p.podCache, key) +} + +func (p *PodMapper) getPodMetadata(namespace, name string) PodMetadata { + key := namespace + "/" + name + p.podCacheMu.RLock() + defer p.podCacheMu.RUnlock() + return p.podCache[key] } diff --git a/internal/pkg/transformation/kubernetes_test.go b/internal/pkg/transformation/kubernetes_test.go index 3736bd32..e53e2870 100644 --- a/internal/pkg/transformation/kubernetes_test.go +++ b/internal/pkg/transformation/kubernetes_test.go @@ -536,9 +536,8 @@ func TestProcessPodMapper_WithLabels(t *testing.T) { KubernetesEnablePodLabels: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{} @@ -761,9 +760,8 @@ func TestProcessPodMapper_WithUID(t *testing.T) { KubernetesEnablePodUID: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{} @@ -871,9 +869,8 @@ func TestProcessPodMapper_WithLabelsAndUID(t *testing.T) { KubernetesEnablePodUID: true, KubernetesGPUIdType: appconfig.GPUUID, PodResourcesKubeletSocket: socketPath, + KubernetesClient: clientset, }) - // Inject the fake clientset - podMapper.Client = clientset // Setup metrics metrics := collector.MetricsByCounter{} diff --git a/internal/pkg/transformation/types.go b/internal/pkg/transformation/types.go index b7bb7994..ee1def68 100644 --- a/internal/pkg/transformation/types.go +++ b/internal/pkg/transformation/types.go @@ -21,6 +21,7 @@ import ( "sync" "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -40,6 +41,9 @@ type PodMapper struct { Config *appconfig.Config Client kubernetes.Interface ResourceSliceManager *DRAResourceSliceManager + informer coreinformers.PodInformer + podCacheMu sync.RWMutex + podCache map[string]PodMetadata } type PodInfo struct {