From 98bbba72a965e06ce192d9cb45ad83dffdedf54c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Charles=20Kr=C3=BCger?=
Date: Sat, 20 Dec 2025 15:28:23 +0100
Subject: [PATCH 1/3] implement cortex support for kubernetes native gang scheduling

---
 Tiltfile                                      |   2 +
 api/delegation/pods/messages.go               |   2 +
 .../cortex-pods/templates/pipelines.yaml      |   2 +
 helm/bundles/cortex-pods/templates/steps.yaml |  13 ++
 .../scheduling/decisions/pods/gang_filter.go  | 116 ++++++++++++++++++
 .../decisions/pods/pipeline_controller.go     |  30 ++---
 .../decisions/pods/supported_steps.go         |   1 +
 samples/pods/gang-scheduling.yaml             |  26 ++++
 8 files changed, 177 insertions(+), 15 deletions(-)
 create mode 100644 internal/scheduling/decisions/pods/gang_filter.go
 create mode 100644 samples/pods/gang-scheduling.yaml

diff --git a/Tiltfile b/Tiltfile
index 54751a43..6b7ddb6f 100644
--- a/Tiltfile
+++ b/Tiltfile
@@ -186,6 +186,8 @@ if 'pods' in ACTIVE_DEPLOYMENTS:
     # Deploy example resources
     k8s_yaml('samples/pods/node.yaml')
     k8s_yaml('samples/pods/pod.yaml')
+    k8s_yaml('samples/pods/gang-scheduling.yaml')
+    k8s_resource('gang-pod', labels=['Cortex-Pods'])
     k8s_resource('test-pod', labels=['Cortex-Pods'])

 ########### Dev Dependencies
diff --git a/api/delegation/pods/messages.go b/api/delegation/pods/messages.go
index 3439d584..67c5fa72 100644
--- a/api/delegation/pods/messages.go
+++ b/api/delegation/pods/messages.go
@@ -12,6 +12,8 @@ import (
 type PodPipelineRequest struct {
     // The available nodes.
     Nodes []corev1.Node `json:"nodes"`
+    // The pod to schedule.
+    Pod *corev1.Pod `json:"pod"`
 }

 func (r PodPipelineRequest) GetSubjects() []string {
diff --git a/helm/bundles/cortex-pods/templates/pipelines.yaml b/helm/bundles/cortex-pods/templates/pipelines.yaml
index a30f0b79..81ca7e51 100644
--- a/helm/bundles/cortex-pods/templates/pipelines.yaml
+++ b/helm/bundles/cortex-pods/templates/pipelines.yaml
@@ -12,3 +12,5 @@ spec:
   steps:
   - ref: { name: pods-noop }
     mandatory: false
+  - ref: { name: pods-gang }
+    mandatory: true
diff --git a/helm/bundles/cortex-pods/templates/steps.yaml b/helm/bundles/cortex-pods/templates/steps.yaml
index 8aeec470..f3fcef90 100644
--- a/helm/bundles/cortex-pods/templates/steps.yaml
+++ b/helm/bundles/cortex-pods/templates/steps.yaml
@@ -11,3 +11,16 @@ spec:
     This is only a passthrough step which lets all pod candidates through.
     It is used as a placeholder step in the pods scheduler pipeline.
   knowledges: []
+---
+apiVersion: cortex.cloud/v1alpha1
+kind: Step
+metadata:
+  name: pods-gang
+spec:
+  operator: cortex
+  type: filter
+  impl: gang
+  description: |
+    This filter ensures that pods belonging to a PodGroup are only scheduled
+    if the PodGroup resource exists.
+  knowledges: []
\ No newline at end of file
diff --git a/internal/scheduling/decisions/pods/gang_filter.go b/internal/scheduling/decisions/pods/gang_filter.go
new file mode 100644
index 00000000..d0343b33
--- /dev/null
+++ b/internal/scheduling/decisions/pods/gang_filter.go
@@ -0,0 +1,116 @@
+package pods
+
+import (
+    "context"
+    "errors"
+    "log/slog"
+
+    "github.com/cobaltcore-dev/cortex/api/delegation/pods"
+    "github.com/cobaltcore-dev/cortex/api/v1alpha1"
+    "github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// GangFilter ensures that pods belonging to a PodGroup are only scheduled
+// if the PodGroup resource exists.
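+//
+// Two mechanisms are supported:
+//  1. The pod references a Workload (group scheduling.k8s.io, version
+//     v1alpha1) via spec.workloadRef. The referenced Workload must exist
+//     in the pod's namespace, otherwise all nodes are filtered out.
+//  2. The pod carries the "pod-group.scheduling.k8s.io/name" label, in
+//     which case the named PodGroup must exist in the pod's namespace.
+// Pods that use neither mechanism pass through unfiltered.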
+type GangFilter struct {
+    client client.Client
+}
+
+func (f *GangFilter) Init(ctx context.Context, c client.Client, step v1alpha1.Step) error {
+    f.client = c
+    return nil
+}
+
+func (f *GangFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
+    activations := make(map[string]float64, len(request.Nodes))
+    stats := make(map[string]lib.StepStatistics)
+
+    pod := request.Pod
+    if pod == nil {
+        traceLog.Warn("gang-filter: pod is nil in request")
+        return nil, errors.New("pod is nil in request")
+    }
+
+    // Check for Workload API
+    // Fetch the full pod object to inspect new fields if they are not in the struct
+    workloadName := ""
+    // Note: We cannot access pod.Spec.WorkloadRef directly if the struct is old.
+    // Use unstructured to attempt to find it.
+    uPod := &unstructured.Unstructured{}
+    uPod.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
+    if err := f.client.Get(context.Background(), client.ObjectKey{Name: pod.Name, Namespace: pod.Namespace}, uPod); err == nil {
+        val, found, err := unstructured.NestedString(uPod.Object, "spec", "workloadRef", "name")
+        if err != nil {
+            traceLog.Error("gang-filter: error accessing workloadRef in pod spec", "error", err)
+        }
+        if found {
+            workloadName = val
+        }
+    }
+
+    if workloadName != "" {
+        traceLog.Info("gang-filter: checking for workload", "workloadName", workloadName)
+        workload := &unstructured.Unstructured{}
+        workload.SetGroupVersionKind(schema.GroupVersionKind{
+            Group:   "scheduling.k8s.io",
+            Version: "v1alpha1",
+            Kind:    "Workload",
+        })
+        if err := f.client.Get(context.Background(), client.ObjectKey{Name: workloadName, Namespace: pod.Namespace}, workload); err != nil {
+            traceLog.Error("gang-filter: failed to fetch workload", "error", err)
+            // Deny all nodes if the gang resource is missing or cannot be fetched.
+            return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+        }
+        traceLog.Info("gang-filter: workload found, allowing scheduling")
+        for _, node := range request.Nodes {
+            activations[node.Name] = 1.0
+        }
+        return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+    }
+
+    // Fallback: Check if the pod belongs to a gang via label.
+    // We use the label "pod-group.scheduling.k8s.io/name" which is standard for gang scheduling.
+    gangName, ok := pod.Labels["pod-group.scheduling.k8s.io/name"]
+    if !ok {
+        // Not a gang pod, allow it.
+        for _, node := range request.Nodes {
+            activations[node.Name] = 1.0
+        }
+        return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+    }
+
+    traceLog.Info("gang-filter: checking for pod group", "gangName", gangName)
+
+    // Fetch the PodGroup.
+    // We use Unstructured because the PodGroup CRD might not be compiled into this binary.
+    // We assume the group is scheduling.k8s.io
+    podGroup := &unstructured.Unstructured{}
+    podGroup.SetGroupVersionKind(schema.GroupVersionKind{
+        Group:   "scheduling.k8s.io",
+        Version: "v1alpha1",
+        Kind:    "PodGroup",
+    })
+
+    err := f.client.Get(context.Background(), client.ObjectKey{
+        Name:      gangName,
+        Namespace: pod.Namespace,
+    }, podGroup)
+
+    if err != nil {
+        traceLog.Error("gang-filter: failed to fetch pod group", "error", err)
+        // Deny all nodes if the gang resource is missing or cannot be fetched.
+        return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+    }
+
+    // If we found the PodGroup, we currently allow scheduling.
+    // In a full implementation, we would check 'minMember' and other status fields here.
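+    // A sketch of such a check, with hypothetical field names (the PodGroup
+    // schema is not compiled into this binary, so the fields would be read
+    // through unstructured accessors):
+    //
+    //  minMember, _, _ := unstructured.NestedInt64(podGroup.Object, "spec", "minMember")
+    //  scheduled, _, _ := unstructured.NestedInt64(podGroup.Object, "status", "scheduled")
+    //  if scheduled+1 < minMember {
+    //      // Keep denying all nodes until enough gang members can run.
+    //      return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+    //  }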
+    traceLog.Info("gang-filter: pod group found, allowing scheduling")
+    for _, node := range request.Nodes {
+        activations[node.Name] = 1.0
+    }
+
+    return &lib.StepResult{Activations: activations, Statistics: stats}, nil
+}
diff --git a/internal/scheduling/decisions/pods/pipeline_controller.go b/internal/scheduling/decisions/pods/pipeline_controller.go
index cb941fa9..ab20d548 100644
--- a/internal/scheduling/decisions/pods/pipeline_controller.go
+++ b/internal/scheduling/decisions/pods/pipeline_controller.go
@@ -125,6 +125,20 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
         return errors.New("pipeline not found or not ready")
     }

+    // Fetch the pod to schedule.
+    pod := &corev1.Pod{}
+    if err := c.Get(ctx, client.ObjectKey{
+        Name:      decision.Spec.PodRef.Name,
+        Namespace: decision.Spec.PodRef.Namespace,
+    }, pod); err != nil {
+        log.Error(err, "failed to fetch pod for decision")
+        return err
+    }
+    if pod.Spec.NodeName != "" {
+        log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
+        return nil
+    }
+
     // Find all available nodes.
     nodes := &corev1.NodeList{}
     if err := c.List(ctx, nodes); err != nil {
@@ -135,7 +149,7 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
     }

     // Execute the scheduling pipeline.
-    request := pods.PodPipelineRequest{Nodes: nodes.Items}
+    request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: pod}
     result, err := pipeline.Run(request)
     if err != nil {
         log.V(1).Error(err, "failed to run scheduler pipeline")
@@ -145,20 +159,6 @@ func (c *DecisionPipelineController) process(ctx context.Context, decision *v1al
     decision.Status.Took = metav1.Duration{Duration: time.Since(startedAt)}
     log.Info("decision processed successfully", "duration", time.Since(startedAt))

-    // Check if the pod is already assigned to a node.
-    pod := &corev1.Pod{}
-    if err := c.Get(ctx, client.ObjectKey{
-        Name:      decision.Spec.PodRef.Name,
-        Namespace: decision.Spec.PodRef.Namespace,
-    }, pod); err != nil {
-        log.Error(err, "failed to fetch pod for decision")
-        return err
-    }
-    if pod.Spec.NodeName != "" {
-        log.Info("pod is already assigned to a node", "node", pod.Spec.NodeName)
-        return nil
-    }
-
     // Assign the first node returned by the pipeline using a Binding.
     binding := &corev1.Binding{
         ObjectMeta: metav1.ObjectMeta{
diff --git a/internal/scheduling/decisions/pods/supported_steps.go b/internal/scheduling/decisions/pods/supported_steps.go
index 530d54f8..49a69f14 100644
--- a/internal/scheduling/decisions/pods/supported_steps.go
+++ b/internal/scheduling/decisions/pods/supported_steps.go
@@ -14,4 +14,5 @@ type PodStep = lib.Step[pods.PodPipelineRequest]
 // The steps actually used by the scheduler are defined through the configuration file.
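+// Each key matches the spec.impl value of a Step resource, so the Helm
+// bundle's "impl: gang" (helm/bundles/cortex-pods/templates/steps.yaml)
+// resolves to the GangFilter in gang_filter.go.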
 var supportedSteps = map[string]func() PodStep{
     "noop": func() PodStep { return &NoopFilter{} },
+    "gang": func() PodStep { return &GangFilter{} },
 }
diff --git a/samples/pods/gang-scheduling.yaml b/samples/pods/gang-scheduling.yaml
new file mode 100644
index 00000000..b3723558
--- /dev/null
+++ b/samples/pods/gang-scheduling.yaml
@@ -0,0 +1,26 @@
+apiVersion: scheduling.k8s.io/v1alpha1
+kind: Workload
+metadata:
+  name: test-workload
+  namespace: cortex-system
+spec:
+  podGroups:
+  - name: test-group
+    policy: gang
+    gang:
+      minCount: 2
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: gang-pod
+  namespace: cortex-system
+spec:
+  schedulerName: cortex
+  workloadRef:
+    name: test-workload
+    kind: Workload
+    apiGroup: scheduling.k8s.io
+  containers:
+  - name: nginx
+    image: nginx:latest
\ No newline at end of file

From 4061197754e45995f811da98ff7d3b11c38a0b07 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Charles=20Kr=C3=BCger?=
Date: Sun, 21 Dec 2025 09:38:34 +0100
Subject: [PATCH 2/3] add tests for gang scheduling

---
 .../decisions/pods/gang_filter_test.go        | 207 ++++++++++++++++++
 1 file changed, 207 insertions(+)
 create mode 100644 internal/scheduling/decisions/pods/gang_filter_test.go

diff --git a/internal/scheduling/decisions/pods/gang_filter_test.go b/internal/scheduling/decisions/pods/gang_filter_test.go
new file mode 100644
index 00000000..53b2ed31
--- /dev/null
+++ b/internal/scheduling/decisions/pods/gang_filter_test.go
@@ -0,0 +1,207 @@
+// Copyright 2025 SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package pods
+
+import (
+    "context"
+    "log/slog"
+    "testing"
+
+    "github.com/cobaltcore-dev/cortex/api/delegation/pods"
+    "github.com/cobaltcore-dev/cortex/api/v1alpha1"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
+)
+
+func TestGangFilter_Run(t *testing.T) {
+    scheme := runtime.NewScheme()
+    _ = corev1.AddToScheme(scheme)
+
+    // Helper to create unstructured Pod with workloadRef
+    createPodWithWorkload := func(name, ns, workloadName string) *unstructured.Unstructured {
+        u := &unstructured.Unstructured{}
+        u.SetUnstructuredContent(map[string]interface{}{
+            "spec": map[string]interface{}{
+                "workloadRef": map[string]interface{}{
+                    "name": workloadName,
+                },
+            },
+        })
+        u.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
+        u.SetName(name)
+        u.SetNamespace(ns)
+        return u
+    }
+
+    // Helper to create unstructured Workload
+    createWorkload := func(name, ns string) *unstructured.Unstructured {
+        u := &unstructured.Unstructured{}
+        u.SetGroupVersionKind(schema.GroupVersionKind{Group: "scheduling.k8s.io", Version: "v1alpha1", Kind: "Workload"})
+        u.SetName(name)
+        u.SetNamespace(ns)
+        return u
+    }
+
+    // Helper to create unstructured PodGroup
+    createPodGroup := func(name, ns string) *unstructured.Unstructured {
+        u := &unstructured.Unstructured{}
+        u.SetGroupVersionKind(schema.GroupVersionKind{Group: "scheduling.k8s.io", Version: "v1alpha1", Kind: "PodGroup"})
+        u.SetName(name)
+        u.SetNamespace(ns)
+        return u
+    }
+
+    tests := []struct {
+        name          string
+        pod           *corev1.Pod
+        existingObjs  []client.Object
+        expectedNodes []string
+        expectError   bool
+    }{
+        {
+            name:          "pod is nil",
+            pod:           nil,
+            expectedNodes: nil,
+            expectError:   true,
+        },
+        {
+            name: "regular pod (no gang)",
+            pod: &corev1.Pod{
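+                // Plain pod: no gang label and no workloadRef, so the filter
+                // should pass every node through.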
+                ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"},
+            },
+            existingObjs: []client.Object{
+                &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
+            },
+            expectedNodes: []string{"node1", "node2"},
+            expectError:   false,
+        },
+        {
+            name: "pod with workloadRef (workload exists)",
+            pod: &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{Name: "pod-workload", Namespace: "default"},
+            },
+            existingObjs: []client.Object{
+                createPodWithWorkload("pod-workload", "default", "my-workload"),
+                createWorkload("my-workload", "default"),
+            },
+            expectedNodes: []string{"node1", "node2"},
+            expectError:   false,
+        },
+        {
+            name: "pod with workloadRef (workload missing)",
+            pod: &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{Name: "pod-workload-missing", Namespace: "default"},
+            },
+            existingObjs: []client.Object{
+                createPodWithWorkload("pod-workload-missing", "default", "missing-workload"),
+            },
+            expectedNodes: []string{}, // All filtered out
+            expectError:   false,
+        },
+        {
+            name: "pod with gang label (group exists)",
+            pod: &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "pod-gang",
+                    Namespace: "default",
+                    Labels: map[string]string{
+                        "pod-group.scheduling.k8s.io/name": "my-gang",
+                    },
+                },
+            },
+            existingObjs: []client.Object{
+                &corev1.Pod{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pod-gang",
+                        Namespace: "default",
+                        Labels: map[string]string{
+                            "pod-group.scheduling.k8s.io/name": "my-gang",
+                        },
+                    },
+                },
+                createPodGroup("my-gang", "default"),
+            },
+            expectedNodes: []string{"node1", "node2"},
+            expectError:   false,
+        },
+        {
+            name: "pod with gang label (group missing)",
+            pod: &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "pod-gang-missing",
+                    Namespace: "default",
+                    Labels: map[string]string{
+                        "pod-group.scheduling.k8s.io/name": "missing-gang",
+                    },
+                },
+            },
+            existingObjs: []client.Object{
+                &corev1.Pod{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pod-gang-missing",
+                        Namespace: "default",
+                        Labels: map[string]string{
+                            "pod-group.scheduling.k8s.io/name": "missing-gang",
+                        },
+                    },
+                },
+            },
+            expectedNodes: []string{}, // All filtered out
+            expectError:   false,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            fakeClient := fake.NewClientBuilder().
+                WithScheme(scheme).
+                WithObjects(tt.existingObjs...).
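+                // Assemble a fresh fake client per case, pre-seeded with the
+                // objects the filter is expected to look up.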
+ Build() + + filter := &GangFilter{} + // Initialize the filter with the fake client + err := filter.Init(context.Background(), fakeClient, v1alpha1.Step{}) + if err != nil { + t.Fatalf("failed to init filter: %v", err) + } + + nodes := []corev1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + } + + req := pods.PodPipelineRequest{ + Nodes: nodes, + Pod: tt.pod, + } + + result, err := filter.Run(slog.Default(), req) + + if tt.expectError { + if err == nil { + t.Errorf("expected error, got none") + } + return + } else if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + // Check activations + if len(result.Activations) != len(tt.expectedNodes) { + t.Errorf("expected %d activations, got %d", len(tt.expectedNodes), len(result.Activations)) + } + for _, node := range tt.expectedNodes { + if score, ok := result.Activations[node]; !ok || score != 1.0 { + t.Errorf("node %s expected 1.0, got %v", node, score) + } + } + }) + } +} From dadbe6e25882836d844350bd89d41596ebb0e95f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles=20Kr=C3=BCger?= Date: Sun, 21 Dec 2025 10:54:19 +0100 Subject: [PATCH 3/3] fix linting --- .../scheduling/decisions/pods/gang_filter_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/scheduling/decisions/pods/gang_filter_test.go b/internal/scheduling/decisions/pods/gang_filter_test.go index 53b2ed31..38e27ae8 100644 --- a/internal/scheduling/decisions/pods/gang_filter_test.go +++ b/internal/scheduling/decisions/pods/gang_filter_test.go @@ -21,14 +21,17 @@ import ( func TestGangFilter_Run(t *testing.T) { scheme := runtime.NewScheme() - _ = corev1.AddToScheme(scheme) + err := corev1.AddToScheme(scheme) + if err != nil { + t.Fatalf("failed to add corev1 to scheme: %v", err) + } // Helper to create unstructured Pod with workloadRef createPodWithWorkload := func(name, ns, workloadName string) *unstructured.Unstructured { u := &unstructured.Unstructured{} - u.SetUnstructuredContent(map[string]interface{}{ - "spec": map[string]interface{}{ - "workloadRef": map[string]interface{}{ + u.SetUnstructuredContent(map[string]any{ + "spec": map[string]any{ + "workloadRef": map[string]any{ "name": workloadName, }, },