Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ if 'pods' in ACTIVE_DEPLOYMENTS:
# Deploy example resources
k8s_yaml('samples/pods/node.yaml')
k8s_yaml('samples/pods/pod.yaml')
k8s_yaml('samples/pods/gang-scheduling.yaml')
k8s_resource('gang-pod', labels=['Cortex-Pods'])
k8s_resource('test-pod', labels=['Cortex-Pods'])

########### Dev Dependencies
Expand Down
2 changes: 2 additions & 0 deletions api/delegation/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
// PodPipelineRequest carries the input of one pod scheduling pipeline run.
type PodPipelineRequest struct {
// The available nodes.
Nodes []corev1.Node `json:"nodes"`
// The pod to schedule. May be nil if the caller omits it (pointer field),
// so consumers must nil-check before dereferencing.
Pod *corev1.Pod `json:"pod"`
}

func (r PodPipelineRequest) GetSubjects() []string {
Expand Down
2 changes: 2 additions & 0 deletions helm/bundles/cortex-pods/templates/pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ spec:
steps:
- ref: { name: pods-noop }
mandatory: false
- ref: { name: pods-gang }
mandatory: true
13 changes: 13 additions & 0 deletions helm/bundles/cortex-pods/templates/steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@ spec:
This is only a passthrough step which lets all pod candidates through.
It is used as a placeholder step in the pods scheduler pipeline.
knowledges: []
---
# Gang-scheduling filter step, referenced as "pods-gang" by the pods
# scheduler pipeline (see pipelines.yaml in this bundle).
apiVersion: cortex.cloud/v1alpha1
kind: Step
metadata:
name: pods-gang
spec:
operator: cortex
type: filter
impl: gang
description: |
This filter ensures that pods belonging to a PodGroup are only scheduled
if the PodGroup resource exists.
knowledges: []
116 changes: 116 additions & 0 deletions internal/scheduling/decisions/pods/gang_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package pods

import (
"context"
"errors"
"log/slog"

"github.com/cobaltcore-dev/cortex/api/delegation/pods"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// GangFilter ensures that pods belonging to a PodGroup are only scheduled
// if the PodGroup resource exists. It is a pipeline step: Init wires in the
// Kubernetes client, Run filters the candidate nodes for a single pod.
type GangFilter struct {
// client reads pods, Workloads, and PodGroups at decision time.
client client.Client
}

// Init stores the controller-runtime client used to look up pods, Workloads,
// and PodGroups. The step configuration is currently unused. The client
// parameter is named c so it does not shadow the imported controller-runtime
// "client" package.
func (f *GangFilter) Init(ctx context.Context, c client.Client, step v1alpha1.Step) error {
	f.client = c
	return nil
}

// gangLabelKey is the standard label linking a pod to its gang (PodGroup).
const gangLabelKey = "pod-group.scheduling.k8s.io/name"

// Run filters the candidate nodes for the pod in the request based on
// gang-scheduling constraints:
//
//  1. If the pod references a Workload via spec.workloadRef.name, all nodes
//     are allowed only if that Workload resource exists; otherwise all nodes
//     are denied.
//  2. Otherwise, if the pod carries the gangLabelKey label, all nodes are
//     allowed only if the referenced PodGroup exists; otherwise all are denied.
//  3. Pods with no gang association pass through with all nodes allowed.
//
// It returns an error only when the request contains no pod.
func (f *GangFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.StepResult, error) {
	pod := request.Pod
	if pod == nil {
		return nil, errors.New("pod is nil in request")
	}

	// The step interface does not pass a context through, so use a
	// background context for the API lookups below.
	ctx := context.Background()

	// allowAll activates every candidate node with full weight.
	allowAll := func() *lib.StepResult {
		activations := make(map[string]float64, len(request.Nodes))
		for _, node := range request.Nodes {
			activations[node.Name] = 1.0
		}
		return &lib.StepResult{Activations: activations, Statistics: make(map[string]lib.StepStatistics)}
	}
	// denyAll returns an empty activation map, filtering out every node.
	denyAll := func() *lib.StepResult {
		return &lib.StepResult{
			Activations: make(map[string]float64, len(request.Nodes)),
			Statistics:  make(map[string]lib.StepStatistics),
		}
	}

	// Check for the Workload API first. The pod struct compiled into this
	// binary may predate spec.workloadRef, so the field is looked up
	// dynamically via unstructured.
	if workloadName := f.workloadRefName(ctx, traceLog, pod.Name, pod.Namespace); workloadName != "" {
		traceLog.Info("gang-filter: checking for workload", "workloadName", workloadName)
		workload := &unstructured.Unstructured{}
		workload.SetGroupVersionKind(schema.GroupVersionKind{
			Group:   "scheduling.k8s.io",
			Version: "v1alpha1",
			Kind:    "Workload",
		})
		if err := f.client.Get(ctx, client.ObjectKey{Name: workloadName, Namespace: pod.Namespace}, workload); err != nil {
			// Deny all nodes if the gang resource is missing or cannot be fetched.
			traceLog.Error("gang-filter: failed to fetch workload", "error", err)
			return denyAll(), nil
		}
		traceLog.Info("gang-filter: workload found, allowing scheduling")
		return allowAll(), nil
	}

	// Fallback: check if the pod belongs to a gang via the standard
	// gang-scheduling label.
	gangName, ok := pod.Labels[gangLabelKey]
	if !ok {
		// Not a gang pod, allow it.
		return allowAll(), nil
	}

	traceLog.Info("gang-filter: checking for pod group", "gangName", gangName)

	// Fetch the PodGroup as unstructured because the PodGroup CRD might not
	// be compiled into this binary. We assume the group is scheduling.k8s.io.
	podGroup := &unstructured.Unstructured{}
	podGroup.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "scheduling.k8s.io",
		Version: "v1alpha1",
		Kind:    "PodGroup",
	})
	if err := f.client.Get(ctx, client.ObjectKey{Name: gangName, Namespace: pod.Namespace}, podGroup); err != nil {
		// Deny all nodes if the gang resource is missing or cannot be fetched.
		// NOTE(review): this also denies on transient API errors, not only
		// NotFound — consider distinguishing with apierrors.IsNotFound.
		traceLog.Error("gang-filter: failed to fetch pod group", "error", err)
		return denyAll(), nil
	}

	// The PodGroup exists, so allow scheduling. A full implementation would
	// also check 'minMember' and other status fields here.
	traceLog.Info("gang-filter: pod group found, allowing scheduling")
	return allowAll(), nil
}

// workloadRefName fetches the pod as unstructured and returns
// spec.workloadRef.name, or "" when the pod cannot be fetched or the field
// is absent or malformed. Fetch failures are deliberately best-effort: the
// caller then falls back to label-based gang detection.
func (f *GangFilter) workloadRefName(ctx context.Context, traceLog *slog.Logger, name, namespace string) string {
	uPod := &unstructured.Unstructured{}
	uPod.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
	if err := f.client.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, uPod); err != nil {
		return ""
	}
	val, found, err := unstructured.NestedString(uPod.Object, "spec", "workloadRef", "name")
	if err != nil {
		traceLog.Error("gang-filter: error accessing workloadRef in pod spec", "error", err)
		return ""
	}
	if !found {
		return ""
	}
	return val
}
207 changes: 207 additions & 0 deletions internal/scheduling/decisions/pods/gang_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2025 SAP SE
// SPDX-License-Identifier: Apache-2.0

package pods

import (
"context"
"log/slog"
"testing"

"github.com/cobaltcore-dev/cortex/api/delegation/pods"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestGangFilter_Run(t *testing.T) {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

Check failure on line 24 in internal/scheduling/decisions/pods/gang_filter_test.go

View workflow job for this annotation

GitHub Actions / Checks

Error return value is not checked (errcheck)

// Helper to create unstructured Pod with workloadRef
createPodWithWorkload := func(name, ns, workloadName string) *unstructured.Unstructured {
u := &unstructured.Unstructured{}
u.SetUnstructuredContent(map[string]interface{}{
"spec": map[string]interface{}{
"workloadRef": map[string]interface{}{
"name": workloadName,
},
},
})
u.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
u.SetName(name)
u.SetNamespace(ns)
return u
}

// Helper to create unstructured Workload
createWorkload := func(name, ns string) *unstructured.Unstructured {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(schema.GroupVersionKind{Group: "scheduling.k8s.io", Version: "v1alpha1", Kind: "Workload"})
u.SetName(name)
u.SetNamespace(ns)
return u
}

// Helper to create unstructured PodGroup
createPodGroup := func(name, ns string) *unstructured.Unstructured {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(schema.GroupVersionKind{Group: "scheduling.k8s.io", Version: "v1alpha1", Kind: "PodGroup"})
u.SetName(name)
u.SetNamespace(ns)
return u
}

tests := []struct {
name string
pod *corev1.Pod
existingObjs []client.Object
expectedNodes []string
expectError bool
}{
{
name: "pod is nil",
pod: nil,
expectedNodes: nil,
expectError: true,
},
{
name: "regular pod (no gang)",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"},
},
existingObjs: []client.Object{
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
},
expectedNodes: []string{"node1", "node2"},
expectError: false,
},
{
name: "pod with workloadRef (workload exists)",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod-workload", Namespace: "default"},
},
existingObjs: []client.Object{
createPodWithWorkload("pod-workload", "default", "my-workload"),
createWorkload("my-workload", "default"),
},
expectedNodes: []string{"node1", "node2"},
expectError: false,
},
{
name: "pod with workloadRef (workload missing)",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod-workload-missing", Namespace: "default"},
},
existingObjs: []client.Object{
createPodWithWorkload("pod-workload-missing", "default", "missing-workload"),
},
expectedNodes: []string{}, // All filtered out
expectError: false,
},
{
name: "pod with gang label (group exists)",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-gang",
Namespace: "default",
Labels: map[string]string{
"pod-group.scheduling.k8s.io/name": "my-gang",
},
},
},
existingObjs: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-gang",
Namespace: "default",
Labels: map[string]string{
"pod-group.scheduling.k8s.io/name": "my-gang",
},
},
},
createPodGroup("my-gang", "default"),
},
expectedNodes: []string{"node1", "node2"},
expectError: false,
},
{
name: "pod with gang label (group missing)",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-gang-missing",
Namespace: "default",
Labels: map[string]string{
"pod-group.scheduling.k8s.io/name": "missing-gang",
},
},
},
existingObjs: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-gang-missing",
Namespace: "default",
Labels: map[string]string{
"pod-group.scheduling.k8s.io/name": "missing-gang",
},
},
},
},
expectedNodes: []string{}, // All filtered out
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(tt.existingObjs...).
Build()

filter := &GangFilter{}
// Initialize the filter with the fake client
err := filter.Init(context.Background(), fakeClient, v1alpha1.Step{})
if err != nil {
t.Fatalf("failed to init filter: %v", err)
}

nodes := []corev1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
}

req := pods.PodPipelineRequest{
Nodes: nodes,
Pod: tt.pod,
}

result, err := filter.Run(slog.Default(), req)

if tt.expectError {
if err == nil {
t.Errorf("expected error, got none")
}
return
} else if err != nil {
t.Errorf("unexpected error: %v", err)
return
}

// Check activations
if len(result.Activations) != len(tt.expectedNodes) {
t.Errorf("expected %d activations, got %d", len(tt.expectedNodes), len(result.Activations))
}
for _, node := range tt.expectedNodes {
if score, ok := result.Activations[node]; !ok || score != 1.0 {
t.Errorf("node %s expected 1.0, got %v", node, score)
}
}
})
}
}
Loading
Loading