diff --git a/api/v1/kube/kube_api.pb.go b/api/v1/kube/kube_api.pb.go index 644e5787..fda8cc5d 100644 --- a/api/v1/kube/kube_api.pb.go +++ b/api/v1/kube/kube_api.pb.go @@ -1,17 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 -// protoc v6.32.0 +// protoc-gen-go v1.36.11 +// protoc v6.33.0 // source: api/v1/kube/kube_api.proto package v1 import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( @@ -1370,6 +1371,266 @@ func (x *RuntimeStats) GetContainerFs() *FsStats { return nil } +type GetPodVolumesRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetPodVolumesRequest) Reset() { + *x = GetPodVolumesRequest{} + mi := &file_api_v1_kube_kube_api_proto_msgTypes[21] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetPodVolumesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPodVolumesRequest) ProtoMessage() {} + +func (x *GetPodVolumesRequest) ProtoReflect() protoreflect.Message { + mi := &file_api_v1_kube_kube_api_proto_msgTypes[21] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPodVolumesRequest.ProtoReflect.Descriptor instead. +func (*GetPodVolumesRequest) Descriptor() ([]byte, []int) { + return file_api_v1_kube_kube_api_proto_rawDescGZIP(), []int{21} +} + +func (x *GetPodVolumesRequest) GetNodeName() string { + if x != nil { + return x.NodeName + } + return "" +} + +type GetPodVolumesResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Volumes []*PodVolumeInfo `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetPodVolumesResponse) Reset() { + *x = GetPodVolumesResponse{} + mi := &file_api_v1_kube_kube_api_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetPodVolumesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPodVolumesResponse) ProtoMessage() {} + +func (x *GetPodVolumesResponse) ProtoReflect() protoreflect.Message { + mi := &file_api_v1_kube_kube_api_proto_msgTypes[22] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPodVolumesResponse.ProtoReflect.Descriptor instead. +func (*GetPodVolumesResponse) Descriptor() ([]byte, []int) { + return file_api_v1_kube_kube_api_proto_rawDescGZIP(), []int{22} +} + +func (x *GetPodVolumesResponse) GetVolumes() []*PodVolumeInfo { + if x != nil { + return x.Volumes + } + return nil +} + +type PodVolumeInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + PodName string `protobuf:"bytes,2,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` + PodUid string `protobuf:"bytes,3,opt,name=pod_uid,json=podUid,proto3" json:"pod_uid,omitempty"` + ControllerKind string `protobuf:"bytes,4,opt,name=controller_kind,json=controllerKind,proto3" json:"controller_kind,omitempty"` + ControllerName string `protobuf:"bytes,5,opt,name=controller_name,json=controllerName,proto3" json:"controller_name,omitempty"` + ContainerName string `protobuf:"bytes,6,opt,name=container_name,json=containerName,proto3" json:"container_name,omitempty"` + VolumeName string `protobuf:"bytes,7,opt,name=volume_name,json=volumeName,proto3" json:"volume_name,omitempty"` + MountPath string `protobuf:"bytes,8,opt,name=mount_path,json=mountPath,proto3" json:"mount_path,omitempty"` + PvcName string `protobuf:"bytes,9,opt,name=pvc_name,json=pvcName,proto3" json:"pvc_name,omitempty"` + PvcUid string `protobuf:"bytes,10,opt,name=pvc_uid,json=pvcUid,proto3" json:"pvc_uid,omitempty"` + RequestedSizeBytes int64 `protobuf:"varint,11,opt,name=requested_size_bytes,json=requestedSizeBytes,proto3" json:"requested_size_bytes,omitempty"` + PvName string `protobuf:"bytes,12,opt,name=pv_name,json=pvName,proto3" json:"pv_name,omitempty"` + StorageClass string `protobuf:"bytes,13,opt,name=storage_class,json=storageClass,proto3" json:"storage_class,omitempty"` + CsiDriver string `protobuf:"bytes,14,opt,name=csi_driver,json=csiDriver,proto3" json:"csi_driver,omitempty"` + CsiVolumeHandle string `protobuf:"bytes,15,opt,name=csi_volume_handle,json=csiVolumeHandle,proto3" json:"csi_volume_handle,omitempty"` + VolumeMode string `protobuf:"bytes,16,opt,name=volume_mode,json=volumeMode,proto3" json:"volume_mode,omitempty"` // "Filesystem" or "Block" + DevicePath string `protobuf:"bytes,17,opt,name=device_path,json=devicePath,proto3" json:"device_path,omitempty"` // For block volumes: container's volumeDevices[].devicePath + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PodVolumeInfo) Reset() { + *x = PodVolumeInfo{} + mi := &file_api_v1_kube_kube_api_proto_msgTypes[23] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PodVolumeInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PodVolumeInfo) ProtoMessage() {} + +func (x *PodVolumeInfo) ProtoReflect() protoreflect.Message { + mi := &file_api_v1_kube_kube_api_proto_msgTypes[23] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PodVolumeInfo.ProtoReflect.Descriptor instead. +func (*PodVolumeInfo) Descriptor() ([]byte, []int) { + return file_api_v1_kube_kube_api_proto_rawDescGZIP(), []int{23} +} + +func (x *PodVolumeInfo) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + +func (x *PodVolumeInfo) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *PodVolumeInfo) GetPodUid() string { + if x != nil { + return x.PodUid + } + return "" +} + +func (x *PodVolumeInfo) GetControllerKind() string { + if x != nil { + return x.ControllerKind + } + return "" +} + +func (x *PodVolumeInfo) GetControllerName() string { + if x != nil { + return x.ControllerName + } + return "" +} + +func (x *PodVolumeInfo) GetContainerName() string { + if x != nil { + return x.ContainerName + } + return "" +} + +func (x *PodVolumeInfo) GetVolumeName() string { + if x != nil { + return x.VolumeName + } + return "" +} + +func (x *PodVolumeInfo) GetMountPath() string { + if x != nil { + return x.MountPath + } + return "" +} + +func (x *PodVolumeInfo) GetPvcName() string { + if x != nil { + return x.PvcName + } + return "" +} + +func (x *PodVolumeInfo) GetPvcUid() string { + if x != nil { + return x.PvcUid + } + return "" +} + +func (x *PodVolumeInfo) GetRequestedSizeBytes() int64 { + if x != nil { + return x.RequestedSizeBytes + } + return 0 +} + +func (x *PodVolumeInfo) GetPvName() string { + if x != nil { + return x.PvName + } + return "" +} + +func (x *PodVolumeInfo) GetStorageClass() string { + if x != nil { + return x.StorageClass + } + return "" +} + +func (x *PodVolumeInfo) GetCsiDriver() string { + if x != nil { + return x.CsiDriver + } + return "" +} + +func (x *PodVolumeInfo) GetCsiVolumeHandle() string { + if x != nil { + return x.CsiVolumeHandle + } + return "" +} + +func (x *PodVolumeInfo) GetVolumeMode() string { + if x != nil { + return x.VolumeMode + } + return "" +} + +func (x *PodVolumeInfo) GetDevicePath() string { + if x != nil { + return x.DevicePath + } + return "" +} + var File_api_v1_kube_kube_api_proto protoreflect.FileDescriptor const file_api_v1_kube_kube_api_proto_rawDesc = "" + @@ -1471,7 +1732,35 @@ const file_api_v1_kube_kube_api_proto_rawDesc = "" + "\fRuntimeStats\x12!\n" + "\ftime_seconds\x18\x01 \x01(\x03R\vtimeSeconds\x12+\n" + "\bimage_fs\x18\x02 \x01(\v2\x10.kube.v1.FsStatsR\aimageFs\x123\n" + - "\fcontainer_fs\x18\x03 \x01(\v2\x10.kube.v1.FsStatsR\vcontainerFs*\xed\x01\n" + + "\fcontainer_fs\x18\x03 \x01(\v2\x10.kube.v1.FsStatsR\vcontainerFs\"3\n" + + "\x14GetPodVolumesRequest\x12\x1b\n" + + "\tnode_name\x18\x01 \x01(\tR\bnodeName\"I\n" + + "\x15GetPodVolumesResponse\x120\n" + + "\avolumes\x18\x01 \x03(\v2\x16.kube.v1.PodVolumeInfoR\avolumes\"\xcb\x04\n" + + "\rPodVolumeInfo\x12\x1c\n" + + "\tnamespace\x18\x01 \x01(\tR\tnamespace\x12\x19\n" + + "\bpod_name\x18\x02 \x01(\tR\apodName\x12\x17\n" + + "\apod_uid\x18\x03 \x01(\tR\x06podUid\x12'\n" + + "\x0fcontroller_kind\x18\x04 \x01(\tR\x0econtrollerKind\x12'\n" + + "\x0fcontroller_name\x18\x05 \x01(\tR\x0econtrollerName\x12%\n" + + "\x0econtainer_name\x18\x06 \x01(\tR\rcontainerName\x12\x1f\n" + + "\vvolume_name\x18\a \x01(\tR\n" + + "volumeName\x12\x1d\n" + + "\n" + + "mount_path\x18\b \x01(\tR\tmountPath\x12\x19\n" + + "\bpvc_name\x18\t \x01(\tR\apvcName\x12\x17\n" + + "\apvc_uid\x18\n" + + " \x01(\tR\x06pvcUid\x120\n" + + "\x14requested_size_bytes\x18\v \x01(\x03R\x12requestedSizeBytes\x12\x17\n" + + "\apv_name\x18\f \x01(\tR\x06pvName\x12#\n" + + "\rstorage_class\x18\r \x01(\tR\fstorageClass\x12\x1d\n" + + "\n" + + "csi_driver\x18\x0e \x01(\tR\tcsiDriver\x12*\n" + + "\x11csi_volume_handle\x18\x0f \x01(\tR\x0fcsiVolumeHandle\x12\x1f\n" + + "\vvolume_mode\x18\x10 \x01(\tR\n" + + "volumeMode\x12\x1f\n" + + "\vdevice_path\x18\x11 \x01(\tR\n" + + "devicePath*\xed\x01\n" + "\fWorkloadKind\x12\x19\n" + "\x15WORKLOAD_KIND_UNKNOWN\x10\x00\x12\x1c\n" + "\x18WORKLOAD_KIND_DEPLOYMENT\x10\x01\x12\x1d\n" + @@ -1480,7 +1769,7 @@ const file_api_v1_kube_kube_api_proto_rawDesc = "" + "\x18WORKLOAD_KIND_DAEMON_SET\x10\x04\x12\x15\n" + "\x11WORKLOAD_KIND_JOB\x10\x05\x12\x19\n" + "\x15WORKLOAD_KIND_CRONJOB\x10\x06\x12\x15\n" + - "\x11WORKLOAD_KIND_POD\x10\a2\xc2\x03\n" + + "\x11WORKLOAD_KIND_POD\x10\a2\x92\x04\n" + "\aKubeAPI\x12Q\n" + "\x0eGetClusterInfo\x12\x1e.kube.v1.GetClusterInfoRequest\x1a\x1f.kube.v1.GetClusterInfoResponse\x12B\n" + "\tGetIPInfo\x12\x19.kube.v1.GetIPInfoRequest\x1a\x1a.kube.v1.GetIPInfoResponse\x12E\n" + @@ -1488,7 +1777,8 @@ const file_api_v1_kube_kube_api_proto_rawDesc = "" + "GetIPsInfo\x12\x1a.kube.v1.GetIPsInfoRequest\x1a\x1b.kube.v1.GetIPsInfoResponse\x129\n" + "\x06GetPod\x12\x16.kube.v1.GetPodRequest\x1a\x17.kube.v1.GetPodResponse\x12<\n" + "\aGetNode\x12\x17.kube.v1.GetNodeRequest\x1a\x18.kube.v1.GetNodeResponse\x12`\n" + - "\x13GetNodeStatsSummary\x12#.kube.v1.GetNodeStatsSummaryRequest\x1a$.kube.v1.GetNodeStatsSummaryResponseB&Z$github.com/castai/kvisor/api/kube/v1b\x06proto3" + "\x13GetNodeStatsSummary\x12#.kube.v1.GetNodeStatsSummaryRequest\x1a$.kube.v1.GetNodeStatsSummaryResponse\x12N\n" + + "\rGetPodVolumes\x12\x1d.kube.v1.GetPodVolumesRequest\x1a\x1e.kube.v1.GetPodVolumesResponseB&Z$github.com/castai/kvisor/api/kube/v1b\x06proto3" var ( file_api_v1_kube_kube_api_proto_rawDescOnce sync.Once @@ -1503,7 +1793,7 @@ func file_api_v1_kube_kube_api_proto_rawDescGZIP() []byte { } var file_api_v1_kube_kube_api_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_api_v1_kube_kube_api_proto_msgTypes = make([]protoimpl.MessageInfo, 22) +var file_api_v1_kube_kube_api_proto_msgTypes = make([]protoimpl.MessageInfo, 25) var file_api_v1_kube_kube_api_proto_goTypes = []any{ (WorkloadKind)(0), // 0: kube.v1.WorkloadKind (*GetClusterInfoRequest)(nil), // 1: kube.v1.GetClusterInfoRequest @@ -1527,7 +1817,10 @@ var file_api_v1_kube_kube_api_proto_goTypes = []any{ (*NetworkStats)(nil), // 19: kube.v1.NetworkStats (*FsStats)(nil), // 20: kube.v1.FsStats (*RuntimeStats)(nil), // 21: kube.v1.RuntimeStats - nil, // 22: kube.v1.Node.LabelsEntry + (*GetPodVolumesRequest)(nil), // 22: kube.v1.GetPodVolumesRequest + (*GetPodVolumesResponse)(nil), // 23: kube.v1.GetPodVolumesResponse + (*PodVolumeInfo)(nil), // 24: kube.v1.PodVolumeInfo + nil, // 25: kube.v1.Node.LabelsEntry } var file_api_v1_kube_kube_api_proto_depIdxs = []int32{ 7, // 0: kube.v1.GetIPInfoResponse.info:type_name -> kube.v1.IPInfo @@ -1535,7 +1828,7 @@ var file_api_v1_kube_kube_api_proto_depIdxs = []int32{ 10, // 2: kube.v1.GetPodResponse.pod:type_name -> kube.v1.Pod 0, // 3: kube.v1.Pod.workload_kind:type_name -> kube.v1.WorkloadKind 13, // 4: kube.v1.GetNodeResponse.node:type_name -> kube.v1.Node - 22, // 5: kube.v1.Node.labels:type_name -> kube.v1.Node.LabelsEntry + 25, // 5: kube.v1.Node.labels:type_name -> kube.v1.Node.LabelsEntry 16, // 6: kube.v1.GetNodeStatsSummaryResponse.node:type_name -> kube.v1.NodeStats 17, // 7: kube.v1.NodeStats.cpu:type_name -> kube.v1.CPUStats 18, // 8: kube.v1.NodeStats.memory:type_name -> kube.v1.MemoryStats @@ -1544,23 +1837,26 @@ var file_api_v1_kube_kube_api_proto_depIdxs = []int32{ 21, // 11: kube.v1.NodeStats.runtime:type_name -> kube.v1.RuntimeStats 20, // 12: kube.v1.RuntimeStats.image_fs:type_name -> kube.v1.FsStats 20, // 13: kube.v1.RuntimeStats.container_fs:type_name -> kube.v1.FsStats - 1, // 14: kube.v1.KubeAPI.GetClusterInfo:input_type -> kube.v1.GetClusterInfoRequest - 3, // 15: kube.v1.KubeAPI.GetIPInfo:input_type -> kube.v1.GetIPInfoRequest - 5, // 16: kube.v1.KubeAPI.GetIPsInfo:input_type -> kube.v1.GetIPsInfoRequest - 8, // 17: kube.v1.KubeAPI.GetPod:input_type -> kube.v1.GetPodRequest - 11, // 18: kube.v1.KubeAPI.GetNode:input_type -> kube.v1.GetNodeRequest - 14, // 19: kube.v1.KubeAPI.GetNodeStatsSummary:input_type -> kube.v1.GetNodeStatsSummaryRequest - 2, // 20: kube.v1.KubeAPI.GetClusterInfo:output_type -> kube.v1.GetClusterInfoResponse - 4, // 21: kube.v1.KubeAPI.GetIPInfo:output_type -> kube.v1.GetIPInfoResponse - 6, // 22: kube.v1.KubeAPI.GetIPsInfo:output_type -> kube.v1.GetIPsInfoResponse - 9, // 23: kube.v1.KubeAPI.GetPod:output_type -> kube.v1.GetPodResponse - 12, // 24: kube.v1.KubeAPI.GetNode:output_type -> kube.v1.GetNodeResponse - 15, // 25: kube.v1.KubeAPI.GetNodeStatsSummary:output_type -> kube.v1.GetNodeStatsSummaryResponse - 20, // [20:26] is the sub-list for method output_type - 14, // [14:20] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 24, // 14: kube.v1.GetPodVolumesResponse.volumes:type_name -> kube.v1.PodVolumeInfo + 1, // 15: kube.v1.KubeAPI.GetClusterInfo:input_type -> kube.v1.GetClusterInfoRequest + 3, // 16: kube.v1.KubeAPI.GetIPInfo:input_type -> kube.v1.GetIPInfoRequest + 5, // 17: kube.v1.KubeAPI.GetIPsInfo:input_type -> kube.v1.GetIPsInfoRequest + 8, // 18: kube.v1.KubeAPI.GetPod:input_type -> kube.v1.GetPodRequest + 11, // 19: kube.v1.KubeAPI.GetNode:input_type -> kube.v1.GetNodeRequest + 14, // 20: kube.v1.KubeAPI.GetNodeStatsSummary:input_type -> kube.v1.GetNodeStatsSummaryRequest + 22, // 21: kube.v1.KubeAPI.GetPodVolumes:input_type -> kube.v1.GetPodVolumesRequest + 2, // 22: kube.v1.KubeAPI.GetClusterInfo:output_type -> kube.v1.GetClusterInfoResponse + 4, // 23: kube.v1.KubeAPI.GetIPInfo:output_type -> kube.v1.GetIPInfoResponse + 6, // 24: kube.v1.KubeAPI.GetIPsInfo:output_type -> kube.v1.GetIPsInfoResponse + 9, // 25: kube.v1.KubeAPI.GetPod:output_type -> kube.v1.GetPodResponse + 12, // 26: kube.v1.KubeAPI.GetNode:output_type -> kube.v1.GetNodeResponse + 15, // 27: kube.v1.KubeAPI.GetNodeStatsSummary:output_type -> kube.v1.GetNodeStatsSummaryResponse + 23, // 28: kube.v1.KubeAPI.GetPodVolumes:output_type -> kube.v1.GetPodVolumesResponse + 22, // [22:29] is the sub-list for method output_type + 15, // [15:22] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_api_v1_kube_kube_api_proto_init() } @@ -1574,7 +1870,7 @@ func file_api_v1_kube_kube_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_api_v1_kube_kube_api_proto_rawDesc), len(file_api_v1_kube_kube_api_proto_rawDesc)), NumEnums: 1, - NumMessages: 22, + NumMessages: 25, NumExtensions: 0, NumServices: 1, }, diff --git a/api/v1/kube/kube_api.proto b/api/v1/kube/kube_api.proto index ea27ed77..5950f6b1 100644 --- a/api/v1/kube/kube_api.proto +++ b/api/v1/kube/kube_api.proto @@ -12,6 +12,7 @@ service KubeAPI { rpc GetPod(GetPodRequest) returns (GetPodResponse); rpc GetNode(GetNodeRequest) returns (GetNodeResponse); rpc GetNodeStatsSummary(GetNodeStatsSummaryRequest) returns (GetNodeStatsSummaryResponse); + rpc GetPodVolumes(GetPodVolumesRequest) returns (GetPodVolumesResponse); } message GetClusterInfoRequest {} @@ -152,3 +153,31 @@ message RuntimeStats { FsStats image_fs = 2; FsStats container_fs = 3; } + +message GetPodVolumesRequest { + string node_name = 1; +} + +message GetPodVolumesResponse { + repeated PodVolumeInfo volumes = 1; +} + +message PodVolumeInfo { + string namespace = 1; + string pod_name = 2; + string pod_uid = 3; + string controller_kind = 4; + string controller_name = 5; + string container_name = 6; + string volume_name = 7; + string mount_path = 8; + string pvc_name = 9; + string pvc_uid = 10; + int64 requested_size_bytes = 11; + string pv_name = 12; + string storage_class = 13; + string csi_driver = 14; + string csi_volume_handle = 15; + string volume_mode = 16; // "Filesystem" or "Block" + string device_path = 17; // For block volumes: container's volumeDevices[].devicePath +} diff --git a/api/v1/kube/kube_api_grpc.pb.go b/api/v1/kube/kube_api_grpc.pb.go index c80009c8..b4a99541 100644 --- a/api/v1/kube/kube_api_grpc.pb.go +++ b/api/v1/kube/kube_api_grpc.pb.go @@ -1,13 +1,14 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 -// - protoc v6.32.0 +// - protoc-gen-go-grpc v1.6.0 +// - protoc v6.33.0 // source: api/v1/kube/kube_api.proto package v1 import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -25,6 +26,7 @@ const ( KubeAPI_GetPod_FullMethodName = "/kube.v1.KubeAPI/GetPod" KubeAPI_GetNode_FullMethodName = "/kube.v1.KubeAPI/GetNode" KubeAPI_GetNodeStatsSummary_FullMethodName = "/kube.v1.KubeAPI/GetNodeStatsSummary" + KubeAPI_GetPodVolumes_FullMethodName = "/kube.v1.KubeAPI/GetPodVolumes" ) // KubeAPIClient is the client API for KubeAPI service. @@ -38,6 +40,7 @@ type KubeAPIClient interface { GetPod(ctx context.Context, in *GetPodRequest, opts ...grpc.CallOption) (*GetPodResponse, error) GetNode(ctx context.Context, in *GetNodeRequest, opts ...grpc.CallOption) (*GetNodeResponse, error) GetNodeStatsSummary(ctx context.Context, in *GetNodeStatsSummaryRequest, opts ...grpc.CallOption) (*GetNodeStatsSummaryResponse, error) + GetPodVolumes(ctx context.Context, in *GetPodVolumesRequest, opts ...grpc.CallOption) (*GetPodVolumesResponse, error) } type kubeAPIClient struct { @@ -108,6 +111,16 @@ func (c *kubeAPIClient) GetNodeStatsSummary(ctx context.Context, in *GetNodeStat return out, nil } +func (c *kubeAPIClient) GetPodVolumes(ctx context.Context, in *GetPodVolumesRequest, opts ...grpc.CallOption) (*GetPodVolumesResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetPodVolumesResponse) + err := c.cc.Invoke(ctx, KubeAPI_GetPodVolumes_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // KubeAPIServer is the server API for KubeAPI service. // All implementations should embed UnimplementedKubeAPIServer // for forward compatibility. @@ -119,6 +132,7 @@ type KubeAPIServer interface { GetPod(context.Context, *GetPodRequest) (*GetPodResponse, error) GetNode(context.Context, *GetNodeRequest) (*GetNodeResponse, error) GetNodeStatsSummary(context.Context, *GetNodeStatsSummaryRequest) (*GetNodeStatsSummaryResponse, error) + GetPodVolumes(context.Context, *GetPodVolumesRequest) (*GetPodVolumesResponse, error) } // UnimplementedKubeAPIServer should be embedded to have @@ -129,22 +143,25 @@ type KubeAPIServer interface { type UnimplementedKubeAPIServer struct{} func (UnimplementedKubeAPIServer) GetClusterInfo(context.Context, *GetClusterInfoRequest) (*GetClusterInfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetClusterInfo not implemented") + return nil, status.Error(codes.Unimplemented, "method GetClusterInfo not implemented") } func (UnimplementedKubeAPIServer) GetIPInfo(context.Context, *GetIPInfoRequest) (*GetIPInfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetIPInfo not implemented") + return nil, status.Error(codes.Unimplemented, "method GetIPInfo not implemented") } func (UnimplementedKubeAPIServer) GetIPsInfo(context.Context, *GetIPsInfoRequest) (*GetIPsInfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetIPsInfo not implemented") + return nil, status.Error(codes.Unimplemented, "method GetIPsInfo not implemented") } func (UnimplementedKubeAPIServer) GetPod(context.Context, *GetPodRequest) (*GetPodResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetPod not implemented") + return nil, status.Error(codes.Unimplemented, "method GetPod not implemented") } func (UnimplementedKubeAPIServer) GetNode(context.Context, *GetNodeRequest) (*GetNodeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetNode not implemented") + return nil, status.Error(codes.Unimplemented, "method GetNode not implemented") } func (UnimplementedKubeAPIServer) GetNodeStatsSummary(context.Context, *GetNodeStatsSummaryRequest) (*GetNodeStatsSummaryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetNodeStatsSummary not implemented") + return nil, status.Error(codes.Unimplemented, "method GetNodeStatsSummary not implemented") +} +func (UnimplementedKubeAPIServer) GetPodVolumes(context.Context, *GetPodVolumesRequest) (*GetPodVolumesResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetPodVolumes not implemented") } func (UnimplementedKubeAPIServer) testEmbeddedByValue() {} @@ -156,7 +173,7 @@ type UnsafeKubeAPIServer interface { } func RegisterKubeAPIServer(s grpc.ServiceRegistrar, srv KubeAPIServer) { - // If the following call pancis, it indicates UnimplementedKubeAPIServer was + // If the following call panics, it indicates UnimplementedKubeAPIServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. @@ -274,6 +291,24 @@ func _KubeAPI_GetNodeStatsSummary_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _KubeAPI_GetPodVolumes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetPodVolumesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(KubeAPIServer).GetPodVolumes(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: KubeAPI_GetPodVolumes_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(KubeAPIServer).GetPodVolumes(ctx, req.(*GetPodVolumesRequest)) + } + return interceptor(ctx, in, info, handler) +} + // KubeAPI_ServiceDesc is the grpc.ServiceDesc for KubeAPI service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -305,6 +340,10 @@ var KubeAPI_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetNodeStatsSummary", Handler: _KubeAPI_GetNodeStatsSummary_Handler, }, + { + MethodName: "GetPodVolumes", + Handler: _KubeAPI_GetPodVolumes_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "api/v1/kube/kube_api.proto", diff --git a/api/v1/runtime/common.pb.go b/api/v1/runtime/common.pb.go index c22fd516..39f38365 100644 --- a/api/v1/runtime/common.pb.go +++ b/api/v1/runtime/common.pb.go @@ -1,17 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 -// protoc v6.32.0 +// protoc-gen-go v1.36.11 +// protoc v6.33.0 // source: api/v1/runtime/common.proto package v1 import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/api/v1/runtime/runtime_agent_api.pb.go b/api/v1/runtime/runtime_agent_api.pb.go index 774318f0..6fcc4dfa 100644 --- a/api/v1/runtime/runtime_agent_api.pb.go +++ b/api/v1/runtime/runtime_agent_api.pb.go @@ -1,18 +1,19 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 -// protoc v6.32.0 +// protoc-gen-go v1.36.11 +// protoc v6.33.0 // source: api/v1/runtime/runtime_agent_api.proto package v1 import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" ) const ( diff --git a/api/v1/runtime/runtime_agent_api_grpc.pb.go b/api/v1/runtime/runtime_agent_api_grpc.pb.go index be06d716..7b3f2ea3 100644 --- a/api/v1/runtime/runtime_agent_api_grpc.pb.go +++ b/api/v1/runtime/runtime_agent_api_grpc.pb.go @@ -1,13 +1,14 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 -// - protoc v6.32.0 +// - protoc-gen-go-grpc v1.6.0 +// - protoc v6.33.0 // source: api/v1/runtime/runtime_agent_api.proto package v1 import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -160,28 +161,28 @@ type RuntimeSecurityAgentAPIServer interface { type UnimplementedRuntimeSecurityAgentAPIServer struct{} func (UnimplementedRuntimeSecurityAgentAPIServer) GetConfiguration(context.Context, *GetConfigurationRequest) (*GetConfigurationResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetConfiguration not implemented") + return nil, status.Error(codes.Unimplemented, "method GetConfiguration not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) LogsWriteStream(grpc.ClientStreamingServer[LogEvent, WriteStreamResponse]) error { - return status.Errorf(codes.Unimplemented, "method LogsWriteStream not implemented") + return status.Error(codes.Unimplemented, "method LogsWriteStream not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) WriteDataBatch(context.Context, *WriteDataBatchRequest) (*WriteDataBatchResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method WriteDataBatch not implemented") + return nil, status.Error(codes.Unimplemented, "method WriteDataBatch not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) GetSyncState(context.Context, *GetSyncStateRequest) (*GetSyncStateResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetSyncState not implemented") + return nil, status.Error(codes.Unimplemented, "method GetSyncState not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) UpdateSyncState(context.Context, *UpdateSyncStateRequest) (*UpdateSyncStateResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method UpdateSyncState not implemented") + return nil, status.Error(codes.Unimplemented, "method UpdateSyncState not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) ImageMetadataIngest(context.Context, *ImageMetadata) (*ImageMetadataIngestResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ImageMetadataIngest not implemented") + return nil, status.Error(codes.Unimplemented, "method ImageMetadataIngest not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) KubeBenchReportIngest(context.Context, *KubeBenchReport) (*KubeBenchReportIngestResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method KubeBenchReportIngest not implemented") + return nil, status.Error(codes.Unimplemented, "method KubeBenchReportIngest not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) KubeLinterReportIngest(context.Context, *KubeLinterReport) (*KubeLinterReportIngestResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method KubeLinterReportIngest not implemented") + return nil, status.Error(codes.Unimplemented, "method KubeLinterReportIngest not implemented") } func (UnimplementedRuntimeSecurityAgentAPIServer) testEmbeddedByValue() {} @@ -193,7 +194,7 @@ type UnsafeRuntimeSecurityAgentAPIServer interface { } func RegisterRuntimeSecurityAgentAPIServer(s grpc.ServiceRegistrar, srv RuntimeSecurityAgentAPIServer) { - // If the following call pancis, it indicates UnimplementedRuntimeSecurityAgentAPIServer was + // If the following call panics, it indicates UnimplementedRuntimeSecurityAgentAPIServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/charts/kvisor/templates/controller.yaml b/charts/kvisor/templates/controller.yaml index d0c48717..fbeca56c 100644 --- a/charts/kvisor/templates/controller.yaml +++ b/charts/kvisor/templates/controller.yaml @@ -257,6 +257,8 @@ rules: - namespaces - services - endpoints + - persistentvolumeclaims + - persistentvolumes verbs: - get - list diff --git a/cmd/agent/daemon/app/app.go b/cmd/agent/daemon/app/app.go index 6b8e24df..ac76a384 100644 --- a/cmd/agent/daemon/app/app.go +++ b/cmd/agent/daemon/app/app.go @@ -255,6 +255,7 @@ func (a *App) Run(ctx context.Context) error { var blockDeviceMetricsWriter pipeline.BlockDeviceMetricsWriter var filesystemMetricsWriter pipeline.FilesystemMetricsWriter var nodeStatsSummaryWriter pipeline.NodeStatsSummaryWriter + var podVolumeMetricsWriter pipeline.K8sPodVolumeMetricsWriter var storageInfoProvider pipeline.StorageInfoProvider if cfg.Stats.StorageEnabled { metricsClient, err := createMetricsClient(cfg) @@ -263,12 +264,12 @@ func (a *App) Run(ctx context.Context) error { } go func() { - if err = metricsClient.Start(ctx); err != nil { - log.Warnf("metric client failed with:%v", err) + if err := metricsClient.Start(ctx); err != nil { + log.Warnf("metrics client failed: %v", err) } }() - blockDeviceMetricsWriter, filesystemMetricsWriter, nodeStatsSummaryWriter, err = setupStorageMetrics(metricsClient) + blockDeviceMetricsWriter, filesystemMetricsWriter, nodeStatsSummaryWriter, podVolumeMetricsWriter, err = setupStorageMetrics(metricsClient) if err != nil { return fmt.Errorf("failed to setup storage metrics: %w", err) } @@ -301,6 +302,7 @@ func (a *App) Run(ctx context.Context) error { filesystemMetricsWriter, storageInfoProvider, nodeStatsSummaryWriter, + podVolumeMetricsWriter, ) for _, namespace := range cfg.MutedNamespaces { @@ -569,23 +571,28 @@ func waitWithTimeout(errg *errgroup.Group, timeout time.Duration) error { } } -func setupStorageMetrics(metricsClient custommetrics.MetricClient) (pipeline.BlockDeviceMetricsWriter, pipeline.FilesystemMetricsWriter, pipeline.NodeStatsSummaryWriter, error) { +func setupStorageMetrics(metricsClient custommetrics.MetricClient) (pipeline.BlockDeviceMetricsWriter, pipeline.FilesystemMetricsWriter, pipeline.NodeStatsSummaryWriter, pipeline.K8sPodVolumeMetricsWriter, error) { blockDeviceMetrics, err := pipeline.NewBlockDeviceMetricsWriter(metricsClient) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to create block device metrics writer: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create block device metrics writer: %w", err) } filesystemMetrics, err := pipeline.NewFilesystemMetricsWriter(metricsClient) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to create filesystem metrics writer: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create filesystem metrics writer: %w", err) } nodeStatsSummaryWriter, err := pipeline.NewNodeStatsSummaryWriter(metricsClient) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to create node storage stats summary writer: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create node storage stats summary writer: %w", err) } - return blockDeviceMetrics, filesystemMetrics, nodeStatsSummaryWriter, nil + podVolumeMetricsWriter, err := pipeline.NewK8sPodVolumeMetricsWriter(metricsClient) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to create pod volume metrics writer: %w", err) + } + + return blockDeviceMetrics, filesystemMetrics, nodeStatsSummaryWriter, podVolumeMetricsWriter, nil } // resolveMetricsAddr transforms kvisor.* addresses to telemetry.* addresses diff --git a/cmd/agent/daemon/pipeline/controller.go b/cmd/agent/daemon/pipeline/controller.go index 152b762b..25d22b1d 100644 --- a/cmd/agent/daemon/pipeline/controller.go +++ b/cmd/agent/daemon/pipeline/controller.go @@ -96,6 +96,10 @@ type NodeStatsSummaryWriter interface { Write(metrics ...NodeStatsSummaryMetric) error } +type K8sPodVolumeMetricsWriter interface { + Write(metrics ...K8sPodVolumeMetric) error +} + func NewBlockDeviceMetricsWriter(metricsClient custommetrics.MetricClient) (BlockDeviceMetricsWriter, error) { return custommetrics.NewMetric[BlockDeviceMetric]( metricsClient, @@ -120,6 +124,14 @@ func NewNodeStatsSummaryWriter(metricsClient custommetrics.MetricClient) (NodeSt ) } +func NewK8sPodVolumeMetricsWriter(metricsClient custommetrics.MetricClient) (K8sPodVolumeMetricsWriter, error) { + return custommetrics.NewMetric[K8sPodVolumeMetric]( + metricsClient, + custommetrics.WithCollectionName[K8sPodVolumeMetric]("k8s_pod_volume_metrics"), + custommetrics.WithSkipTimestamp[K8sPodVolumeMetric](), + ) +} + func NewController( log *logging.Logger, cfg Config, @@ -136,6 +148,7 @@ func NewController( filesystemMetricsWriter FilesystemMetricsWriter, storageInfoProvider StorageInfoProvider, nodeStatsSummaryWriter NodeStatsSummaryWriter, + podVolumeMetricsWriter K8sPodVolumeMetricsWriter, ) *Controller { podCache, err := freelru.NewSynced[string, *kubepb.Pod](256, func(k string) uint32 { return uint32(xxhash.Sum64String(k)) // nolint:gosec @@ -174,6 +187,7 @@ func NewController( filesystemMetricsWriter: filesystemMetricsWriter, storageInfoProvider: storageInfoProvider, nodeStatsSummaryWriter: nodeStatsSummaryWriter, + podVolumeMetricsWriter: podVolumeMetricsWriter, } } @@ -208,6 +222,7 @@ type Controller struct { filesystemMetricsWriter FilesystemMetricsWriter storageInfoProvider StorageInfoProvider nodeStatsSummaryWriter NodeStatsSummaryWriter + podVolumeMetricsWriter K8sPodVolumeMetricsWriter } func (c *Controller) Run(ctx context.Context) error { diff --git a/cmd/agent/daemon/pipeline/controller_test.go b/cmd/agent/daemon/pipeline/controller_test.go index 5c5f11df..0b95eb44 100644 --- a/cmd/agent/daemon/pipeline/controller_test.go +++ b/cmd/agent/daemon/pipeline/controller_test.go @@ -808,7 +808,7 @@ func TestController(t *testing.T) { blockWriter := ctrl.blockDeviceMetricsWriter.(*mockBlockDeviceMetricsWriter) fsWriter := ctrl.filesystemMetricsWriter.(*mockFilesystemMetricsWriter) - ctrl.collectStorageMetrics() + ctrl.collectStorageMetrics(t.Context()) r.Len(blockWriter.metrics, 1) r.Len(fsWriter.metrics, 1) @@ -929,6 +929,7 @@ func newTestController(opts ...any) *Controller { filesystemMetrics, &mockStorageInfoProvider{}, nodeStatsSummaryWriter, + nil, // podVolumeMetricsWriter ) return ctrl } @@ -1237,6 +1238,12 @@ func (m *mockKubeClient) GetNodeStatsSummary(ctx context.Context, req *kubepb.Ge }, nil } +func (m *mockKubeClient) GetPodVolumes(ctx context.Context, req *kubepb.GetPodVolumesRequest, opts ...grpc.CallOption) (*kubepb.GetPodVolumesResponse, error) { + return &kubepb.GetPodVolumesResponse{ + Volumes: []*kubepb.PodVolumeInfo{}, + }, nil +} + type mockProcessTreeController struct { } @@ -1316,7 +1323,7 @@ func (m *mockNodeStatsSummaryWriter) Write(metrics ...NodeStatsSummaryMetric) er type mockStorageInfoProvider struct{} -func (m *mockStorageInfoProvider) BuildFilesystemMetrics(timestamp time.Time) ([]FilesystemMetric, error) { +func (m *mockStorageInfoProvider) BuildFilesystemMetrics(ctx context.Context, timestamp time.Time) ([]FilesystemMetric, error) { return []FilesystemMetric{ { NodeName: "test-node", @@ -1360,3 +1367,7 @@ func (m *mockStorageInfoProvider) CollectNodeStatsSummary(ctx context.Context) ( Timestamp: time.Now(), }, nil } + +func (m *mockStorageInfoProvider) CollectPodVolumeMetrics(ctx context.Context) ([]K8sPodVolumeMetric, error) { + return []K8sPodVolumeMetric{}, nil +} diff --git a/cmd/agent/daemon/pipeline/storage_info_provider.go b/cmd/agent/daemon/pipeline/storage_info_provider.go index 8c3d1b93..e76068eb 100644 --- a/cmd/agent/daemon/pipeline/storage_info_provider.go +++ b/cmd/agent/daemon/pipeline/storage_info_provider.go @@ -76,6 +76,9 @@ type FilesystemMetric struct { TotalInodes *int64 `avro:"total_inodes"` UsedInodes *int64 `avro:"used_inodes"` Timestamp time.Time `avro:"ts"` + + // PV name for joining with K8sPodVolumeMetric (nil for node-level filesystems) + PVName *string `avro:"pv_name"` } // NodeStatsSummaryMetric represents node-level filesystem statistics from kubelet @@ -89,15 +92,39 @@ type NodeStatsSummaryMetric struct { Timestamp time.Time `avro:"ts"` } +// K8sPodVolumeMetric represents pod volume information from Kubernetes +type K8sPodVolumeMetric struct { + NodeName string `avro:"node_name"` + NodeTemplate *string `avro:"node_template"` + Namespace string `avro:"namespace"` + PodName string `avro:"pod_name"` + PodUID string `avro:"pod_uid"` + ControllerKind string `avro:"controller_kind"` + ControllerName string `avro:"controller_name"` + ContainerName string `avro:"container_name"` + VolumeName string `avro:"volume_name"` + MountPath string `avro:"mount_path"` + PVCName *string `avro:"pvc_name"` + RequestedSizeBytes *int64 `avro:"requested_size_bytes"` + PVName *string `avro:"pv_name"` + StorageClass *string `avro:"storage_class"` + CSIDriver *string `avro:"csi_driver"` + CSIVolumeHandle *string `avro:"csi_volume_handle"` // For EBS: vol-xxx, can be joined with block_device.ebs_volume_id + VolumeMode string `avro:"volume_mode"` // "Filesystem" or "Block" + DevicePath *string `avro:"device_path"` // For block volumes: container's volumeDevices[].devicePath + Timestamp time.Time `avro:"ts"` +} + type storageMetricsState struct { blockDevices map[string]*BlockDeviceMetric filesystems map[string]*FilesystemMetric } type StorageInfoProvider interface { - BuildFilesystemMetrics(timestamp time.Time) ([]FilesystemMetric, error) + BuildFilesystemMetrics(ctx context.Context, timestamp time.Time) ([]FilesystemMetric, error) BuildBlockDeviceMetrics(timestamp time.Time) ([]BlockDeviceMetric, error) CollectNodeStatsSummary(ctx context.Context) (*NodeStatsSummaryMetric, error) + CollectPodVolumeMetrics(ctx context.Context) ([]K8sPodVolumeMetric, error) } type SysfsStorageInfoProvider struct { @@ -286,27 +313,158 @@ func (s *SysfsStorageInfoProvider) CollectNodeStatsSummary(ctx context.Context) return metric, nil } -func (s *SysfsStorageInfoProvider) BuildFilesystemMetrics(timestamp time.Time) ([]FilesystemMetric, error) { +// CollectPodVolumeMetrics retrieves pod volume information from the controller +func (s *SysfsStorageInfoProvider) CollectPodVolumeMetrics(ctx context.Context) ([]K8sPodVolumeMetric, error) { + if s.kubeClient == nil { + return nil, fmt.Errorf("kube client is not initialized") + } + + s.log.Infof("CollectPodVolumeMetrics: requesting pod volumes for node %s", s.nodeName) + resp, err := s.kubeClient.GetPodVolumes(ctx, &kubepb.GetPodVolumesRequest{ + NodeName: s.nodeName, + }, grpc.UseCompressor(gzip.Name)) + if err != nil { + return nil, fmt.Errorf("failed to get pod volumes for %s: %w", s.nodeName, err) + } + s.log.Infof("CollectPodVolumeMetrics: received %d volumes from controller", len(resp.Volumes)) + + nodeTemplate, err := s.getNodeTemplate() + if err != nil { + s.log.Warnf("failed to get node template: %v", err) + nodeTemplate = nil + } + + timestamp := time.Now() + metrics := make([]K8sPodVolumeMetric, 0, len(resp.Volumes)) + + for _, v := range resp.Volumes { + metric := K8sPodVolumeMetric{ + NodeName: s.nodeName, + NodeTemplate: nodeTemplate, + Namespace: v.Namespace, + PodName: v.PodName, + PodUID: v.PodUid, + ControllerKind: v.ControllerKind, + ControllerName: v.ControllerName, + ContainerName: v.ContainerName, + VolumeName: v.VolumeName, + MountPath: v.MountPath, + VolumeMode: v.VolumeMode, + Timestamp: timestamp, + } + + if v.PvcName != "" { + metric.PVCName = &v.PvcName + } + if v.RequestedSizeBytes > 0 { + metric.RequestedSizeBytes = &v.RequestedSizeBytes + } + if v.PvName != "" { + metric.PVName = &v.PvName + } + if v.StorageClass != "" { + metric.StorageClass = &v.StorageClass + } + if v.CsiDriver != "" { + metric.CSIDriver = &v.CsiDriver + } + if v.CsiVolumeHandle != "" { + metric.CSIVolumeHandle = &v.CsiVolumeHandle + } + if v.DevicePath != "" { + metric.DevicePath = &v.DevicePath + } + + metrics = append(metrics, metric) + } + + return metrics, nil +} + +func (s *SysfsStorageInfoProvider) BuildFilesystemMetrics(ctx context.Context, timestamp time.Time) ([]FilesystemMetric, error) { // Read mount information from /proc/1/mountinfo mounts, err := readMountInfo("/proc/1/mountinfo") if err != nil { return nil, fmt.Errorf("failed to read mountinfo: %w", err) } - filesystemMetrics := make([]FilesystemMetric, 0, len(mounts)) + // Build pod volume lookup map for enrichment + podVolumeMap := s.buildPodVolumeLookupMap(ctx) + + // Deduplicate by major:minor device ID + // When multiple mounts point to the same device (bind mounts), prefer paths + // matching /var/lib/kubelet/pods because they can be enriched with pod metadata + seenDevices := make(map[string]FilesystemMetric) for _, mount := range mounts { - metric, err := s.buildFilesystemMetric(mount, timestamp) + metric, err := s.buildFilesystemMetric(mount, timestamp, podVolumeMap) if err != nil { s.log.Warnf("skipping filesystem metric for %s: %v", mount.MountPoint, err) continue } + + deviceKey := mount.MajorMinor + if existing, seen := seenDevices[deviceKey]; seen { + // Prefer the mount that has PV metadata (was enriched with K8s info) + if metric.PVName != nil && existing.PVName == nil { + seenDevices[deviceKey] = metric + } + // Otherwise keep the first one we saw + } else { + seenDevices[deviceKey] = metric + } + } + + filesystemMetrics := make([]FilesystemMetric, 0, len(seenDevices)) + for _, metric := range seenDevices { filesystemMetrics = append(filesystemMetrics, metric) } return filesystemMetrics, nil } -func (s *SysfsStorageInfoProvider) buildFilesystemMetric(mount mountInfo, timestamp time.Time) (FilesystemMetric, error) { +// podVolumeKey generates a lookup key from pod UID and volume name +func podVolumeKey(podUID, volumeName string) string { + return podUID + "/" + volumeName +} + +// buildPodVolumeLookupMap fetches pod volumes from controller and builds a lookup map +// The map is keyed by both: +// - podUID/volumeName (for emptyDir, configMap, etc.) +// - podUID/pvName (for CSI volumes where the mount path contains the PV name) +func (s *SysfsStorageInfoProvider) buildPodVolumeLookupMap(ctx context.Context) map[string]*kubepb.PodVolumeInfo { + if s.kubeClient == nil { + return nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + resp, err := s.kubeClient.GetPodVolumes(ctx, &kubepb.GetPodVolumesRequest{ + NodeName: s.nodeName, + }, grpc.UseCompressor(gzip.Name)) + if err != nil { + s.log.Warnf("failed to get pod volumes for enrichment: %v", err) + return nil + } + + volumeMap := make(map[string]*kubepb.PodVolumeInfo, len(resp.Volumes)*2) + for _, v := range resp.Volumes { + // Primary key: podUID/volumeName + key := podVolumeKey(v.PodUid, v.VolumeName) + volumeMap[key] = v + + // Secondary key: podUID/pvName (for CSI volumes) + // CSI mount paths use the PV name as the directory name, not the volume name + if v.PvName != "" { + pvKey := podVolumeKey(v.PodUid, v.PvName) + volumeMap[pvKey] = v + } + } + + return volumeMap +} + +func (s *SysfsStorageInfoProvider) buildFilesystemMetric(mount mountInfo, timestamp time.Time, podVolumeMap map[string]*kubepb.PodVolumeInfo) (FilesystemMetric, error) { // Construct the path from host's root to access the filesystem fileSystemPath := filepath.Join(s.hostRootPath, mount.MountPoint) @@ -324,7 +482,7 @@ func (s *SysfsStorageInfoProvider) buildFilesystemMetric(mount mountInfo, timest // Check whether the filesystem is holding kubelet and/or castai-storage directories labels := buildFilesystemLabels(devID, s.wellKnownPathDeviceID) - return FilesystemMetric{ + metric := FilesystemMetric{ Devices: s.getBackingDevices(mount.Device), NodeName: s.nodeName, NodeTemplate: nodeTemplate, @@ -337,7 +495,17 @@ func (s *SysfsStorageInfoProvider) buildFilesystemMetric(mount mountInfo, timest UsedInodes: &usedInodes, Labels: labels, Timestamp: timestamp, - }, nil + } + + // Check if this is a pod volume mount and enrich with PV name for joining + if volInfo := ParseVolumeMountPath(mount.MountPoint); volInfo != nil && podVolumeMap != nil { + key := podVolumeKey(volInfo.PodUID, volInfo.VolumeName) + if pv, ok := podVolumeMap[key]; ok && pv.PvName != "" { + metric.PVName = &pv.PvName + } + } + + return metric, nil } // getBackingDevices resolves a device to its backing device. @@ -799,7 +967,7 @@ func getFilesystemStats(mountPoint string) (sizeBytes, usedBytes int64, totalIno var mountPointStat unix.Stat_t err = unix.Stat(mountPoint, &mountPointStat) if err == nil { - devID = mountPointStat.Dev + devID = uint64(mountPointStat.Dev) } // statfs.Bsize is uint32 on Darwin, int64 on Linux - convert safely to uint64 @@ -837,7 +1005,7 @@ func getDeviceIDForPath(path string) (uint64, error) { return 0, err } - return stat.Dev, nil + return uint64(stat.Dev), nil } func buildFilesystemLabels(fsMountPointDeviceID uint64, wellKnownPathsDeviceID map[string]uint64) map[string]string { diff --git a/cmd/agent/daemon/pipeline/storage_pipeline.go b/cmd/agent/daemon/pipeline/storage_pipeline.go index 1ea9b972..4b5d616d 100644 --- a/cmd/agent/daemon/pipeline/storage_pipeline.go +++ b/cmd/agent/daemon/pipeline/storage_pipeline.go @@ -19,14 +19,14 @@ func (c *Controller) runStoragePipeline(ctx context.Context) error { return ctx.Err() case <-ticker.C: start := time.Now() - c.collectStorageMetrics() + c.collectStorageMetrics(ctx) c.collectNodeStatsSummary(ctx) c.log.Debugf("storage stats exported, duration=%v", time.Since(start)) } } } -func (c *Controller) collectStorageMetrics() { +func (c *Controller) collectStorageMetrics(ctx context.Context) { start := time.Now() c.log.Debug("starting storage stats collection") @@ -35,10 +35,14 @@ func (c *Controller) collectStorageMetrics() { c.log.Errorf("failed to collect block device metrics: %v", err) } - if err := c.processFilesystemMetrics(timestamp); err != nil { + if err := c.processFilesystemMetrics(ctx, timestamp); err != nil { c.log.Errorf("failed to collect filesystem metrics: %v", err) } + if err := c.processPodVolumeMetrics(ctx); err != nil { + c.log.Errorf("failed to collect pod volume metrics: %v", err) + } + c.log.Debugf("storage stats collection completed in %v", time.Since(start)) } @@ -61,12 +65,12 @@ func (c *Controller) processBlockDeviceMetrics(timestamp time.Time) error { return nil } -func (c *Controller) processFilesystemMetrics(timestamp time.Time) error { +func (c *Controller) processFilesystemMetrics(ctx context.Context, timestamp time.Time) error { if c.filesystemMetricsWriter == nil { return fmt.Errorf("filesystem metrics writer not initialized") } - fsMetrics, err := c.storageInfoProvider.BuildFilesystemMetrics(timestamp) + fsMetrics, err := c.storageInfoProvider.BuildFilesystemMetrics(ctx, timestamp) if err != nil { return fmt.Errorf("failed to collect filesystem metrics: %w", err) } @@ -80,6 +84,33 @@ func (c *Controller) processFilesystemMetrics(timestamp time.Time) error { return nil } +func (c *Controller) processPodVolumeMetrics(ctx context.Context) error { + if c.podVolumeMetricsWriter == nil { + return nil // Pod volume metrics writer not configured, skip + } + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + metrics, err := c.storageInfoProvider.CollectPodVolumeMetrics(ctx) + if err != nil { + return fmt.Errorf("failed to collect pod volume metrics: %w", err) + } + + if len(metrics) == 0 { + c.log.Info("no pod volume metrics collected from controller (empty response)") + return nil + } + + c.log.Infof("collected %d pod volume metrics", len(metrics)) + + if err := c.podVolumeMetricsWriter.Write(metrics...); err != nil { + return fmt.Errorf("failed to write pod volume metrics: %w", err) + } + + return nil +} + func (c *Controller) collectNodeStatsSummary(ctx context.Context) { if c.nodeStatsSummaryWriter == nil || c.storageInfoProvider == nil { return diff --git a/cmd/agent/daemon/pipeline/volume_mapper.go b/cmd/agent/daemon/pipeline/volume_mapper.go new file mode 100644 index 00000000..0285a106 --- /dev/null +++ b/cmd/agent/daemon/pipeline/volume_mapper.go @@ -0,0 +1,30 @@ +package pipeline + +import "regexp" + +// podVolumeMountRegex matches kubelet pod volume mount paths. +// Format: /var/lib/kubelet/pods//volumes// +var podVolumeMountRegex = regexp.MustCompile( + `/var/lib/kubelet/pods/([a-f0-9-]+)/volumes/([^/]+)/([^/]+)`, +) + +// VolumePathInfo contains extracted information from a volume mount path. +type VolumePathInfo struct { + PodUID string + VolumePlugin string + VolumeName string +} + +// ParseVolumeMountPath extracts pod and volume info from a kubelet mount path. +// Returns nil if the path is not a pod volume mount. +func ParseVolumeMountPath(mountPath string) *VolumePathInfo { + matches := podVolumeMountRegex.FindStringSubmatch(mountPath) + if len(matches) != 4 { + return nil + } + return &VolumePathInfo{ + PodUID: matches[1], + VolumePlugin: matches[2], + VolumeName: matches[3], + } +} diff --git a/cmd/agent/daemon/pipeline/volume_mapper_test.go b/cmd/agent/daemon/pipeline/volume_mapper_test.go new file mode 100644 index 00000000..e06ad868 --- /dev/null +++ b/cmd/agent/daemon/pipeline/volume_mapper_test.go @@ -0,0 +1,84 @@ +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseVolumeMountPath(t *testing.T) { + tests := []struct { + name string + path string + expected *VolumePathInfo + }{ + { + name: "CSI volume mount", + path: "/var/lib/kubelet/pods/3e61c214-bc3e-d9ff-81e2-1474dd6cba17/volumes/kubernetes.io~csi/pvc-abc123", + expected: &VolumePathInfo{ + PodUID: "3e61c214-bc3e-d9ff-81e2-1474dd6cba17", + VolumePlugin: "kubernetes.io~csi", + VolumeName: "pvc-abc123", + }, + }, + { + name: "EmptyDir volume mount", + path: "/var/lib/kubelet/pods/abc-def-123/volumes/kubernetes.io~empty-dir/cache", + expected: &VolumePathInfo{ + PodUID: "abc-def-123", + VolumePlugin: "kubernetes.io~empty-dir", + VolumeName: "cache", + }, + }, + { + name: "HostPath volume mount", + path: "/var/lib/kubelet/pods/12345678-1234-1234-1234-123456789012/volumes/kubernetes.io~host-path/host-data", + expected: &VolumePathInfo{ + PodUID: "12345678-1234-1234-1234-123456789012", + VolumePlugin: "kubernetes.io~host-path", + VolumeName: "host-data", + }, + }, + { + name: "ConfigMap volume mount", + path: "/var/lib/kubelet/pods/aabbccdd-1122-3344-5566-778899aabbcc/volumes/kubernetes.io~configmap/config", + expected: &VolumePathInfo{ + PodUID: "aabbccdd-1122-3344-5566-778899aabbcc", + VolumePlugin: "kubernetes.io~configmap", + VolumeName: "config", + }, + }, + { + name: "Root filesystem - not a pod volume", + path: "/", + expected: nil, + }, + { + name: "Kubelet directory - not a volume mount", + path: "/var/lib/kubelet", + expected: nil, + }, + { + name: "Containerd directory - not a pod volume", + path: "/var/lib/containerd", + expected: nil, + }, + { + name: "Regular mount point - not a pod volume", + path: "/mnt/data", + expected: nil, + }, + { + name: "Pod directory without volumes - not a volume mount", + path: "/var/lib/kubelet/pods/abc-123", + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ParseVolumeMountPath(tt.path) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/cmd/controller/kube/client.go b/cmd/controller/kube/client.go index 272fa074..dadd19e8 100644 --- a/cmd/controller/kube/client.go +++ b/cmd/controller/kube/client.go @@ -111,6 +111,8 @@ func (c *Client) RegisterHandlers(factory informers.SharedInformerFactory) { factory.Core().V1().Services().Informer(), factory.Core().V1().Endpoints().Informer(), factory.Core().V1().Namespaces().Informer(), + factory.Core().V1().PersistentVolumeClaims().Informer(), + factory.Core().V1().PersistentVolumes().Informer(), factory.Apps().V1().Deployments().Informer(), factory.Apps().V1().StatefulSets().Informer(), factory.Apps().V1().DaemonSets().Informer(), @@ -194,6 +196,10 @@ func (c *Client) eventHandler() cache.ResourceEventHandler { c.index.addFromEndpoints(t) case *corev1.Node: c.index.addFromNode(t) + case *corev1.PersistentVolumeClaim: + c.index.addFromPVC(t) + case *corev1.PersistentVolume: + c.index.addFromPV(t) case *batchv1.Job: c.index.jobs[t.UID] = t.ObjectMeta case *appsv1.ReplicaSet: @@ -219,6 +225,10 @@ func (c *Client) eventHandler() cache.ResourceEventHandler { c.index.addFromEndpoints(t) case *corev1.Node: c.index.addFromNode(t) + case *corev1.PersistentVolumeClaim: + c.index.addFromPVC(t) + case *corev1.PersistentVolume: + c.index.addFromPV(t) case *batchv1.Job: c.index.jobs[t.UID] = t.ObjectMeta case *appsv1.ReplicaSet: @@ -244,6 +254,10 @@ func (c *Client) eventHandler() cache.ResourceEventHandler { c.index.deleteFromEndpoints(t) case *corev1.Node: c.index.deleteByNode(t) + case *corev1.PersistentVolumeClaim: + c.index.deleteFromPVC(t) + case *corev1.PersistentVolume: + c.index.deleteFromPV(t) case *batchv1.Job: delete(c.index.jobs, t.UID) case *appsv1.ReplicaSet: @@ -311,6 +325,27 @@ func (c *Client) GetNodeInfo(name string) (*corev1.Node, bool) { return node, true } +func (c *Client) GetPVCByName(namespace, name string) (*corev1.PersistentVolumeClaim, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.index.GetPVCByName(namespace, name) +} + +func (c *Client) GetPVByName(name string) (*corev1.PersistentVolume, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.index.GetPVByName(name) +} + +func (c *Client) GetPodsOnNode(nodeName string) []*PodInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.index.GetPodsOnNode(nodeName) +} + func (c *Client) GetOwnerUID(obj Object) string { c.mu.RLock() defer c.mu.RUnlock() diff --git a/cmd/controller/kube/index.go b/cmd/controller/kube/index.go index 710a2fc6..5eb73882 100644 --- a/cmd/controller/kube/index.go +++ b/cmd/controller/kube/index.go @@ -15,24 +15,28 @@ import ( func NewIndex() *Index { nodesCIDRIndex, _ := cidrindex.NewIndex[string](1000, 30*time.Second) return &Index{ - ipsDetails: make(ipsDetails), - replicaSets: make(map[types.UID]metav1.ObjectMeta), - jobs: make(map[types.UID]metav1.ObjectMeta), - deployments: make(map[types.UID]*appsv1.Deployment), - pods: make(map[types.UID]*PodInfo), - nodesByName: make(map[string]*corev1.Node), + ipsDetails: make(ipsDetails), + replicaSets: make(map[types.UID]metav1.ObjectMeta), + jobs: make(map[types.UID]metav1.ObjectMeta), + deployments: make(map[types.UID]*appsv1.Deployment), + pods: make(map[types.UID]*PodInfo), + nodesByName: make(map[string]*corev1.Node), nodesCIDRIndex: nodesCIDRIndex, + pvcs: make(map[string]*corev1.PersistentVolumeClaim), + pvs: make(map[string]*corev1.PersistentVolume), } } type Index struct { - ipsDetails ipsDetails - replicaSets map[types.UID]metav1.ObjectMeta - jobs map[types.UID]metav1.ObjectMeta - deployments map[types.UID]*appsv1.Deployment - pods map[types.UID]*PodInfo - nodesByName map[string]*corev1.Node + ipsDetails ipsDetails + replicaSets map[types.UID]metav1.ObjectMeta + jobs map[types.UID]metav1.ObjectMeta + deployments map[types.UID]*appsv1.Deployment + pods map[types.UID]*PodInfo + nodesByName map[string]*corev1.Node nodesCIDRIndex *cidrindex.Index[string] + pvcs map[string]*corev1.PersistentVolumeClaim // key: namespace/name + pvs map[string]*corev1.PersistentVolume // key: PV name } func (i *Index) addFromPod(pod *corev1.Pod) { @@ -336,3 +340,43 @@ type PodInfo struct { Zone string Region string } + +func pvcKey(namespace, name string) string { + return namespace + "/" + name +} + +func (i *Index) addFromPVC(pvc *corev1.PersistentVolumeClaim) { + i.pvcs[pvcKey(pvc.Namespace, pvc.Name)] = pvc +} + +func (i *Index) deleteFromPVC(pvc *corev1.PersistentVolumeClaim) { + delete(i.pvcs, pvcKey(pvc.Namespace, pvc.Name)) +} + +func (i *Index) GetPVCByName(namespace, name string) (*corev1.PersistentVolumeClaim, bool) { + pvc, found := i.pvcs[pvcKey(namespace, name)] + return pvc, found +} + +func (i *Index) addFromPV(pv *corev1.PersistentVolume) { + i.pvs[pv.Name] = pv +} + +func (i *Index) deleteFromPV(pv *corev1.PersistentVolume) { + delete(i.pvs, pv.Name) +} + +func (i *Index) GetPVByName(name string) (*corev1.PersistentVolume, bool) { + pv, found := i.pvs[name] + return pv, found +} + +func (i *Index) GetPodsOnNode(nodeName string) []*PodInfo { + var pods []*PodInfo + for _, podInfo := range i.pods { + if podInfo.Pod.Spec.NodeName == nodeName { + pods = append(pods, podInfo) + } + } + return pods +} diff --git a/cmd/controller/kube/server.go b/cmd/controller/kube/server.go index 22f5bb0a..eba50a6c 100644 --- a/cmd/controller/kube/server.go +++ b/cmd/controller/kube/server.go @@ -3,9 +3,11 @@ package kube import ( "context" "net/netip" + "strings" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" _ "google.golang.org/grpc/encoding/gzip" @@ -176,6 +178,172 @@ func (s *Server) GetNodeStatsSummary(ctx context.Context, req *kubepb.GetNodeSta return resp, nil } +func (s *Server) GetPodVolumes(ctx context.Context, req *kubepb.GetPodVolumesRequest) (*kubepb.GetPodVolumesResponse, error) { + if req.NodeName == "" { + return nil, status.Errorf(codes.InvalidArgument, "node_name is required") + } + + pods := s.client.GetPodsOnNode(req.NodeName) + s.client.log.Infof("GetPodVolumes: found %d pods on node %s", len(pods), req.NodeName) + var volumes []*kubepb.PodVolumeInfo + + for _, podInfo := range pods { + pod := podInfo.Pod + if pod == nil { + continue + } + + // Skip kube-system namespace - system pods don't have user-relevant PVCs + if pod.Namespace == "kube-system" { + continue + } + + // Build a map of volume name -> volume for quick lookup + volumeMap := make(map[string]corev1.Volume) + for _, vol := range pod.Spec.Volumes { + volumeMap[vol.Name] = vol + } + + // Iterate through containers and their volume mounts (filesystem volumes) + // Only include PVC-backed volumes - skip ephemeral volumes like configMaps, secrets, serviceAccount tokens + for _, container := range pod.Spec.Containers { + for _, mount := range container.VolumeMounts { + vol, exists := volumeMap[mount.Name] + if !exists { + continue + } + + // Only include PVC-backed volumes + if vol.PersistentVolumeClaim == nil { + continue + } + + volInfo := &kubepb.PodVolumeInfo{ + Namespace: pod.Namespace, + PodName: pod.Name, + PodUid: string(pod.UID), + ControllerKind: podInfo.Owner.Kind, + ControllerName: podInfo.Owner.Name, + ContainerName: container.Name, + VolumeName: vol.Name, + MountPath: mount.MountPath, + VolumeMode: "Filesystem", + } + + s.enrichPVCDetails(volInfo, vol, pod.Namespace) + volumes = append(volumes, volInfo) + } + + // Handle block volumes (VolumeDevices) + for _, device := range container.VolumeDevices { + vol, exists := volumeMap[device.Name] + if !exists { + continue + } + + // Only include PVC-backed volumes + if vol.PersistentVolumeClaim == nil { + continue + } + + volInfo := &kubepb.PodVolumeInfo{ + Namespace: pod.Namespace, + PodName: pod.Name, + PodUid: string(pod.UID), + ControllerKind: podInfo.Owner.Kind, + ControllerName: podInfo.Owner.Name, + ContainerName: container.Name, + VolumeName: vol.Name, + DevicePath: device.DevicePath, + VolumeMode: "Block", + } + + s.enrichPVCDetails(volInfo, vol, pod.Namespace) + volumes = append(volumes, volInfo) + } + } + } + + s.client.log.Infof("GetPodVolumes: returning %d volumes for node %s", len(volumes), req.NodeName) + return &kubepb.GetPodVolumesResponse{ + Volumes: volumes, + }, nil +} + +func (s *Server) enrichPVCDetails(volInfo *kubepb.PodVolumeInfo, vol corev1.Volume, namespace string) { + pvcName := vol.PersistentVolumeClaim.ClaimName + volInfo.PvcName = pvcName + + pvc, found := s.client.GetPVCByName(namespace, pvcName) + if !found { + return + } + + volInfo.PvcUid = string(pvc.UID) + + // Get requested storage size + if req, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; ok { + volInfo.RequestedSizeBytes = req.Value() + } + + // Get storage class + if pvc.Spec.StorageClassName != nil { + volInfo.StorageClass = *pvc.Spec.StorageClassName + } + + // Get PV details if bound + if pvc.Spec.VolumeName != "" { + volInfo.PvName = pvc.Spec.VolumeName + + if pv, found := s.client.GetPVByName(pvc.Spec.VolumeName); found { + // Get volume source details - supports both CSI and in-tree provisioners + if pv.Spec.CSI != nil { + // CSI volume (modern) + volInfo.CsiDriver = pv.Spec.CSI.Driver + volInfo.CsiVolumeHandle = extractVolumeID(pv.Spec.CSI.VolumeHandle) + } else if pv.Spec.AWSElasticBlockStore != nil { + // In-tree AWS EBS provisioner (gp2 storage class) + volInfo.CsiDriver = "kubernetes.io/aws-ebs" + volInfo.CsiVolumeHandle = pv.Spec.AWSElasticBlockStore.VolumeID + } else if pv.Spec.GCEPersistentDisk != nil { + // In-tree GCE PD provisioner + volInfo.CsiDriver = "kubernetes.io/gce-pd" + volInfo.CsiVolumeHandle = pv.Spec.GCEPersistentDisk.PDName + } else if pv.Spec.AzureDisk != nil { + // In-tree Azure Disk provisioner + volInfo.CsiDriver = "kubernetes.io/azure-disk" + volInfo.CsiVolumeHandle = pv.Spec.AzureDisk.DiskName + } + } else { + s.client.log.Warnf("PV %s not found in index for PVC %s/%s", pvc.Spec.VolumeName, namespace, pvcName) + } + } +} + +// extractVolumeID extracts the volume/disk name from a CSI volume handle. +// Different CSI drivers use different path formats: +// - GCP: projects//zones//disks/ +// - AWS: vol-xxx → vol-xxx (no change) +// - Azure: /subscriptions/.../providers/Microsoft.Compute/disks/ +func extractVolumeID(csiVolumeHandle string) string { + // GCP format: projects//zones//disks/ + if strings.Contains(csiVolumeHandle, "/disks/") { + parts := strings.Split(csiVolumeHandle, "/disks/") + if len(parts) == 2 { + return parts[1] + } + } + // Azure format: /subscriptions/.../providers/Microsoft.Compute/disks/ + if strings.Contains(csiVolumeHandle, "Microsoft.Compute/disks/") { + parts := strings.Split(csiVolumeHandle, "Microsoft.Compute/disks/") + if len(parts) == 2 { + return parts[1] + } + } + // AWS and others: return as-is + return csiVolumeHandle +} + func toProtoWorkloadKind(kind string) kubepb.WorkloadKind { switch kind { case "Deployment": diff --git a/cmd/controller/kube/server_test.go b/cmd/controller/kube/server_test.go index d00df137..3afc6b86 100644 --- a/cmd/controller/kube/server_test.go +++ b/cmd/controller/kube/server_test.go @@ -238,3 +238,44 @@ func TestServer(t *testing.T) { r.Equal([]string{"10.8.0.0/14", "fd00::/48"}, resp.ServiceCidr) }) } + +func TestExtractVolumeID(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "GCP CSI volume handle", + input: "projects/engineering-test-353509/zones/us-central1-a/disks/pvc-2f7e7ae2-bbe2-410b-a109-571ce3298b98", + expected: "pvc-2f7e7ae2-bbe2-410b-a109-571ce3298b98", + }, + { + name: "AWS EBS CSI volume handle", + input: "vol-08d551180685f8611", + expected: "vol-08d551180685f8611", + }, + { + name: "Azure CSI volume handle", + input: "/subscriptions/abc123/resourceGroups/rg/providers/Microsoft.Compute/disks/pvc-xxx-yyy-zzz", + expected: "pvc-xxx-yyy-zzz", + }, + { + name: "Simple volume ID", + input: "simple-volume-id", + expected: "simple-volume-id", + }, + { + name: "Empty string", + input: "", + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := extractVolumeID(tt.input) + require.Equal(t, tt.expected, result) + }) + } +} diff --git a/e2e/e2e.go b/e2e/e2e.go index fac00df5..e792d937 100644 --- a/e2e/e2e.go +++ b/e2e/e2e.go @@ -427,6 +427,29 @@ func (t *testCASTAIServer) decodeNodeStatsSummaryMetrics(schema, data []byte) ([ return results, nil } +func (t *testCASTAIServer) decodeK8sPodVolumeMetrics(schema, data []byte) ([]pipeline.K8sPodVolumeMetric, error) { + avroSchema, err := avro.Parse(string(schema)) + if err != nil { + return nil, fmt.Errorf("failed to parse schema: %w", err) + } + + decoder := avro.NewDecoderForSchema(avroSchema, bytes.NewReader(data)) + var results []pipeline.K8sPodVolumeMetric + + for { + var metric pipeline.K8sPodVolumeMetric + if err := decoder.Decode(&metric); err != nil { + if errors.Is(err, io.EOF) { + break // End of data + } + return nil, fmt.Errorf("failed to decode metric: %w", err) + } + results = append(results, metric) + } + + return results, nil +} + func (t *testCASTAIServer) KubeBenchReportIngest(ctx context.Context, report *castaipb.KubeBenchReport) (*castaipb.KubeBenchReportIngestResponse, error) { t.kubeBenchReports = append(t.kubeBenchReports, report) return &castaipb.KubeBenchReportIngestResponse{}, nil @@ -539,7 +562,7 @@ func (t *testCASTAIServer) assertLogs(ctx context.Context) error { } func (t *testCASTAIServer) assertStorageMetrics(ctx context.Context) error { - timeout := time.After(15 * time.Second) + timeout := time.After(30 * time.Second) expectedCollections := []string{ "storage_block_device_metrics_v2", @@ -665,6 +688,34 @@ func (t *testCASTAIServer) assertStorageMetrics(ctx context.Context) error { } } + // k8s_pod_volume_metrics is optional - only sent if there are PVCs in the cluster + if batches, exists := metrics["k8s_pod_volume_metrics"]; exists { + for _, batch := range batches { + podVolumeMetrics, err := t.decodeK8sPodVolumeMetrics(batch.Schema, batch.Metrics) + if err != nil { + return fmt.Errorf("failed to decode k8s pod volume metrics: %w", err) + } + + for _, metric := range podVolumeMetrics { + if metric.NodeName == "" { + return errors.New("k8s pod volume metric missing node name") + } + if metric.Namespace == "" { + return errors.New("k8s pod volume metric missing namespace") + } + if metric.PodName == "" { + return errors.New("k8s pod volume metric missing pod name") + } + if metric.PodUID == "" { + return errors.New("k8s pod volume metric missing pod uid") + } + if metric.VolumeName == "" { + return errors.New("k8s pod volume metric missing volume name") + } + } + } + } + foundCollections := 0 for _, expectedCollection := range expectedCollections { if _, exists := metrics[expectedCollection]; exists { diff --git a/e2e/pvc-test.yaml b/e2e/pvc-test.yaml new file mode 100644 index 00000000..05713074 --- /dev/null +++ b/e2e/pvc-test.yaml @@ -0,0 +1,30 @@ +# PVC test for E2E - generates k8s_pod_volume_metrics +# Uses Kind's default "standard" storage class (local-path-provisioner) +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: e2e-test-pvc +spec: + accessModes: + - ReadWriteOnce + storageClassName: standard + resources: + requests: + storage: 100Mi +--- +apiVersion: v1 +kind: Pod +metadata: + name: e2e-test-pvc-pod +spec: + containers: + - name: test + image: busybox + command: ["sleep", "3600"] + volumeMounts: + - name: data + mountPath: /data + volumes: + - name: data + persistentVolumeClaim: + claimName: e2e-test-pvc diff --git a/e2e/run.sh b/e2e/run.sh index ac43b722..a05bb57f 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -59,6 +59,8 @@ kubectl apply -f ./e2e/socks5-generator.yaml kubectl apply -f ./e2e/nc-server-client.yaml kubectl apply -f ./e2e/conn-generator.yaml kubectl apply -f ./e2e/iperf.yaml +# Deploy PVC test to generate k8s_pod_volume_metrics +kubectl apply -f ./e2e/pvc-test.yaml echo "Waiting for job to finish"