Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/pkg/appconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package appconfig

import (
"github.com/NVIDIA/go-dcgm/pkg/dcgm"
"k8s.io/client-go/kubernetes"
)

type KubernetesGPUIDType string
Expand Down Expand Up @@ -69,5 +70,6 @@ type Config struct {
KubernetesVirtualGPUs bool
DumpConfig DumpConfig // Configuration for file-based dumps
KubernetesEnableDRA bool
KubernetesClient kubernetes.Interface
DisableStartupValidate bool
}
156 changes: 83 additions & 73 deletions internal/pkg/transformation/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

Expand Down Expand Up @@ -90,25 +93,66 @@ func NewPodMapper(c *appconfig.Config) *PodMapper {

podMapper := &PodMapper{
Config: c,
Client: c.KubernetesClient,
}

if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA {
return podMapper
}

clusterConfig, err := rest.InClusterConfig()
if err != nil {
slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err)
return podMapper
}
if podMapper.Client == nil {
clusterConfig, err := rest.InClusterConfig()
if err != nil {
slog.Warn("Failed to get in-cluster config, pod labels will not be available", "error", err)
return podMapper
}

clientset, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
slog.Warn("Failed to get clientset, pod labels will not be available", "error", err)
return podMapper
clientset, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
slog.Warn("Failed to get clientset, pod labels will not be available", "error", err)
return podMapper
}
podMapper.Client = clientset
}

podMapper.Client = clientset
if c.KubernetesEnablePodLabels || c.KubernetesEnablePodUID {
factory := informers.NewSharedInformerFactory(podMapper.Client, 30*time.Second)
podInformer := factory.Core().V1().Pods()
podMapper.informer = podInformer
podMapper.podCache = make(map[string]PodMetadata)

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
podMapper.cachePod(pod)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if pod, ok := newObj.(*corev1.Pod); ok {
podMapper.cachePod(pod)
}
},
DeleteFunc: func(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
podMapper.deleteCachedPod(pod.Namespace, pod.Name)
return
}
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if pod, ok := tombstone.Obj.(*corev1.Pod); ok {
podMapper.deleteCachedPod(pod.Namespace, pod.Name)
}
}
},
})

stopCh := make(chan struct{})
factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
slog.Warn("Pod informer cache failed to sync; pod metadata may be unavailable initially")
} else {
slog.Info("Pod informer cache synced")
}
}

if c.KubernetesEnableDRA {
resourceSliceManager, err := NewDRAResourceSliceManager()
Expand Down Expand Up @@ -734,83 +778,49 @@ func (p *PodMapper) toDeviceToPod(

// createPodInfo creates a PodInfo struct with metadata if enabled
func (p *PodMapper) createPodInfo(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, metadataCache map[string]PodMetadata) PodInfo {
labels := map[string]string{}
uid := ""
cacheKey := pod.GetNamespace() + "/" + pod.GetName()

// Check if we have cached metadata
cachedMetadata, hasCache := metadataCache[cacheKey]

// Determine if we need labels
needLabels := p.Config.KubernetesEnablePodLabels && (cachedMetadata.Labels == nil)

// Determine if we need UID
needUID := p.Config.KubernetesEnablePodUID && cachedMetadata.UID == ""

// Only make API call if we need something that's not cached
if needLabels || needUID {
if podMetadata, err := p.getPodMetadata(pod.GetNamespace(), pod.GetName()); err != nil {
slog.Warn("Couldn't get pod metadata",
"pod", pod.GetName(),
"namespace", pod.GetNamespace(),
"error", err)
// Cache empty result to avoid repeated failures, but preserve existing cache data
if !hasCache {
metadataCache[cacheKey] = PodMetadata{}
}
} else {
// Update cache with new data, preserving existing data if we didn't fetch it
if needLabels {
cachedMetadata.Labels = podMetadata.Labels
}
if needUID {
cachedMetadata.UID = podMetadata.UID
}
metadataCache[cacheKey] = cachedMetadata
}
}

// Extract the data we need based on config flags
if p.Config.KubernetesEnablePodLabels {
labels = cachedMetadata.Labels
}
if p.Config.KubernetesEnablePodUID {
uid = cachedMetadata.UID
var md PodMetadata
if p.Config.KubernetesEnablePodLabels || p.Config.KubernetesEnablePodUID {
md = p.getPodMetadata(pod.GetNamespace(), pod.GetName())
}

return PodInfo{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
Container: container.GetName(),
UID: uid,
Labels: labels,
UID: md.UID,
Labels: md.Labels,
}
}

// getPodMetadata fetches metadata (labels and UID) from a Kubernetes pod via the API server.
// It sanitizes label names to ensure they are valid for Prometheus metrics.
func (p *PodMapper) getPodMetadata(namespace, podName string) (*PodMetadata, error) {
if p.Client == nil {
return nil, fmt.Errorf("kubernetes client is not initialized")
}

ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()

pod, err := p.Client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, err
func (p *PodMapper) cachePod(pod *corev1.Pod) {
if pod == nil {
return
}

// Sanitize label names
sanitizedLabels := make(map[string]string, len(pod.Labels))
for k, v := range pod.Labels {
sanitizedKey := utils.SanitizeLabelName(k)
sanitizedLabels[sanitizedKey] = v
}

return &PodMetadata{
md := PodMetadata{
UID: string(pod.UID),
Labels: sanitizedLabels,
}, nil
}
key := pod.Namespace + "/" + pod.Name
p.podCacheMu.Lock()
defer p.podCacheMu.Unlock()
p.podCache[key] = md
}

// deleteCachedPod drops the cached metadata entry stored under the
// "<namespace>/<name>" key. The write lock is held for the duration of the
// removal; removing a key that is not present is a no-op.
func (p *PodMapper) deleteCachedPod(namespace, name string) {
	p.podCacheMu.Lock()
	defer p.podCacheMu.Unlock()

	delete(p.podCache, namespace+"/"+name)
}

// getPodMetadata looks up the cached metadata stored under the
// "<namespace>/<name>" key while holding the read lock. When no entry exists
// for that key, the zero-value PodMetadata is returned.
func (p *PodMapper) getPodMetadata(namespace, name string) PodMetadata {
	p.podCacheMu.RLock()
	cached := p.podCache[namespace+"/"+name]
	p.podCacheMu.RUnlock()

	return cached
}
9 changes: 3 additions & 6 deletions internal/pkg/transformation/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,8 @@ func TestProcessPodMapper_WithLabels(t *testing.T) {
KubernetesEnablePodLabels: true,
KubernetesGPUIdType: appconfig.GPUUID,
PodResourcesKubeletSocket: socketPath,
KubernetesClient: clientset,
})
// Inject the fake clientset
podMapper.Client = clientset

// Setup metrics
metrics := collector.MetricsByCounter{}
Expand Down Expand Up @@ -761,9 +760,8 @@ func TestProcessPodMapper_WithUID(t *testing.T) {
KubernetesEnablePodUID: true,
KubernetesGPUIdType: appconfig.GPUUID,
PodResourcesKubeletSocket: socketPath,
KubernetesClient: clientset,
})
// Inject the fake clientset
podMapper.Client = clientset

// Setup metrics
metrics := collector.MetricsByCounter{}
Expand Down Expand Up @@ -871,9 +869,8 @@ func TestProcessPodMapper_WithLabelsAndUID(t *testing.T) {
KubernetesEnablePodUID: true,
KubernetesGPUIdType: appconfig.GPUUID,
PodResourcesKubeletSocket: socketPath,
KubernetesClient: clientset,
})
// Inject the fake clientset
podMapper.Client = clientset

// Setup metrics
metrics := collector.MetricsByCounter{}
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/transformation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

Expand All @@ -40,6 +41,9 @@ type PodMapper struct {
Config *appconfig.Config
Client kubernetes.Interface
ResourceSliceManager *DRAResourceSliceManager
informer coreinformers.PodInformer
podCacheMu sync.RWMutex
podCache map[string]PodMetadata
}

type PodInfo struct {
Expand Down