diff --git a/controllers/kafkacluster_controller.go b/controllers/kafkacluster_controller.go index 62341e1fa..41f05a4c2 100644 --- a/controllers/kafkacluster_controller.go +++ b/controllers/kafkacluster_controller.go @@ -388,7 +388,10 @@ func SetupKafkaClusterWithManager(mgr ctrl.Manager) *ctrl.Builder { UpdateFunc: func(e event.UpdateEvent) bool { switch newObj := e.ObjectNew.(type) { case *corev1.Pod, *corev1.ConfigMap, *corev1.PersistentVolumeClaim: - patchResult, err := patch.DefaultPatchMaker.Calculate(e.ObjectOld, e.ObjectNew) + opts := []patch.CalculateOption{ + k8sutil.IgnoreMutationWebhookFields(), + } + patchResult, err := patch.DefaultPatchMaker.Calculate(e.ObjectOld, e.ObjectNew, opts...) if err != nil { log.Error(err, "could not match objects", "kind", e.ObjectOld.GetObjectKind()) } else if patchResult.IsEmpty() { diff --git a/go.mod b/go.mod index e17b83695..92a6a1cf1 100644 --- a/go.mod +++ b/go.mod @@ -105,7 +105,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/json-iterator/go v1.1.12 // indirect + github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.18.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/pkg/k8sutil/patch_options.go b/pkg/k8sutil/patch_options.go new file mode 100644 index 000000000..05b39fa7f --- /dev/null +++ b/pkg/k8sutil/patch_options.go @@ -0,0 +1,222 @@ +// Copyright © 2019 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package k8sutil
+
+import (
+	"emperror.dev/errors"
+	json "github.com/json-iterator/go"
+	corev1 "k8s.io/api/core/v1"
+
+	"github.com/banzaicloud/k8s-objectmatcher/patch"
+)
+
+// IgnoreMutationWebhookFields creates a CalculateOption that ignores fields commonly
+// modified by mutation webhooks like Gatekeeper, OPA, and Pod Security Policies
+func IgnoreMutationWebhookFields() patch.CalculateOption {
+	return func(current, modified []byte) ([]byte, []byte, error) {
+		currentPod := &corev1.Pod{}
+		modifiedPod := &corev1.Pod{}
+		if json.Unmarshal(current, currentPod) != nil || json.Unmarshal(modified, modifiedPod) != nil {
+			return current, modified, nil
+		}
+		// json.Unmarshal ignores unknown fields, so ConfigMaps/PVCs decode without
+		// error into an empty Pod and re-marshaling would drop their real fields.
+		if len(currentPod.Spec.Containers) == 0 && len(modifiedPod.Spec.Containers) == 0 {
+			return current, modified, nil
+		}
+
+		// Check if ScaleOps is managing resources in EITHER pod
+		isScaleOpsManaged := isScaleOpsManagedPod(currentPod) || isScaleOpsManagedPod(modifiedPod)
+
+		// Remove fields that mutation webhooks commonly modify
+		currentPod = cleanMutationWebhookFields(currentPod, isScaleOpsManaged)
+		modifiedPod = cleanMutationWebhookFields(modifiedPod, isScaleOpsManaged)
+
+		currentBytes, err := json.Marshal(currentPod)
+		if err != nil {
+			return []byte{}, []byte{}, errors.Wrap(err, "could not marshal cleaned current pod")
+		}
+
+		modifiedBytes, err := json.Marshal(modifiedPod)
+		if err != nil {
+			return []byte{}, []byte{}, errors.Wrap(err, "could not marshal cleaned modified pod")
+		}
+
+		return currentBytes, modifiedBytes, nil
+	}
+}
+
+// isScaleOpsManagedPod checks if a pod is managed by ScaleOps
+func isScaleOpsManagedPod(pod *corev1.Pod) bool {
+	return pod.Annotations != nil && (pod.Annotations["scaleops.sh/managed-containers"] != "" ||
+		pod.Annotations["scaleops.sh/pod-owner-grouping"] != "")
+}
+
+func cleanMutationWebhookFields(pod *corev1.Pod, isScaleOpsManaged bool) *corev1.Pod {
+	// Create a copy to avoid modifying the original
+	cleaned := pod.DeepCopy()
+
+	// Remove mutation webhook annotations that should not trigger reconciliation
+	if cleaned.Annotations != nil {
+		// Gatekeeper annotations
+		delete(cleaned.Annotations, "gatekeeper.sh/mutation-id")
+		delete(cleaned.Annotations, "gatekeeper.sh/mutations")
+
+		// ScaleOps annotations
+		delete(cleaned.Annotations, "scaleops.sh/admission")
+		delete(cleaned.Annotations, "scaleops.sh/applied-policy")
+		delete(cleaned.Annotations, "scaleops.sh/last-applied-resources")
+		delete(cleaned.Annotations, "scaleops.sh/managed-containers")
+		delete(cleaned.Annotations, "scaleops.sh/managed-keep-limit-cpu")
+		delete(cleaned.Annotations, "scaleops.sh/managed-keep-limit-memory")
+		delete(cleaned.Annotations, "scaleops.sh/origin-resources")
+		delete(cleaned.Annotations, "scaleops.sh/pod-owner-grouping")
+		delete(cleaned.Annotations, "scaleops.sh/pod-owner-identifier")
+
+		// Remove the last-applied annotation that may contain ScaleOps fields
+		// Note: This is regenerated on updates by the k8s-objectmatcher library
+		delete(cleaned.Annotations, "banzaicloud.com/last-applied")
+
+		// If annotations map is empty, set to nil to normalize comparison
+		if len(cleaned.Annotations) == 0 {
+			cleaned.Annotations = nil
+		}
+	}
+
+	// Remove ScaleOps labels
+	if cleaned.Labels != nil {
+		delete(cleaned.Labels, "scaleops.sh/applied-recommendation")
+		delete(cleaned.Labels, "scaleops.sh/managed")
+		delete(cleaned.Labels, "scaleops.sh/managed-unevictable")
+		delete(cleaned.Labels, "scaleops.sh/pod-owner-grouping")
+		delete(cleaned.Labels, "scaleops.sh/pod-owner-identifier")
+
+		// If labels map is empty, set to nil to normalize comparison
+		if len(cleaned.Labels) == 0 {
+			cleaned.Labels = nil
+		}
+	}
+
+	// Remove ScaleOps-added affinity rules (preferred scheduling only)
+	if cleaned.Spec.Affinity != nil {
+		if cleaned.Spec.Affinity.NodeAffinity != nil {
+			// Remove preferred node affinity added by ScaleOps (node-packing)
+			if cleaned.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
+				var filtered []corev1.PreferredSchedulingTerm
+				for _, term := range cleaned.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
+					// Keep only terms that are NOT ScaleOps node-packing preferences
+					isScaleOpsTerm := false
+					for _, expr := range term.Preference.MatchExpressions {
+						if expr.Key == "scaleops.sh/node-packing" {
+							isScaleOpsTerm = true
+							break
+						}
+					}
+					if !isScaleOpsTerm {
+						filtered = append(filtered, term)
+					}
+				}
+				if len(filtered) == 0 {
+					cleaned.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = nil
+				} else {
+					cleaned.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = filtered
+				}
+			}
+			// Clean up empty NodeAffinity
+			if cleaned.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution == nil &&
+				cleaned.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+				cleaned.Spec.Affinity.NodeAffinity = nil
+			}
+		}
+
+		if cleaned.Spec.Affinity.PodAffinity != nil {
+			// Remove preferred pod affinity added by ScaleOps (managed-unevictable)
+			if cleaned.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
+				var filtered []corev1.WeightedPodAffinityTerm
+				for _, term := range cleaned.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
+					// Keep only terms that are NOT ScaleOps managed-unevictable preferences
+					isScaleOpsTerm := false
+					if term.PodAffinityTerm.LabelSelector != nil {
+						for _, expr := range term.PodAffinityTerm.LabelSelector.MatchExpressions {
+							if expr.Key == "scaleops.sh/managed-unevictable" {
+								isScaleOpsTerm = true
+								break
+							}
+						}
+					}
+					if !isScaleOpsTerm {
+						filtered = append(filtered, term)
+					}
+				}
+				if len(filtered) == 0 {
+					cleaned.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = nil
+				} else {
+					cleaned.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = filtered
+				}
+			}
+			// Clean up empty PodAffinity
+			if cleaned.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution == nil &&
+				cleaned.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+				cleaned.Spec.Affinity.PodAffinity = nil
+			}
+		}
+
+		// Clean up empty Affinity
+		if cleaned.Spec.Affinity.NodeAffinity == nil &&
+			cleaned.Spec.Affinity.PodAffinity == nil &&
+			cleaned.Spec.Affinity.PodAntiAffinity == nil {
+			cleaned.Spec.Affinity = nil
+		}
+	}
+
+	// Clean resources if ScaleOps is managing them
+	if isScaleOpsManaged {
+		for i := range cleaned.Spec.InitContainers {
+			cleaned.Spec.InitContainers[i].Resources = corev1.ResourceRequirements{}
+		}
+		for i := range cleaned.Spec.Containers {
+			cleaned.Spec.Containers[i].Resources = corev1.ResourceRequirements{}
+		}
+	}
+
+	// Clean security context fields commonly set by PSPs/Gatekeeper
+	for i := range cleaned.Spec.InitContainers {
+		cleanSecurityContext(&cleaned.Spec.InitContainers[i])
+	}
+	for i := range cleaned.Spec.Containers {
+		cleanSecurityContext(&cleaned.Spec.Containers[i])
+	}
+
+	return cleaned
+}
+
+func cleanSecurityContext(container *corev1.Container) {
+	if container.SecurityContext == nil {
+		return
+	}
+
+	// Note: We intentionally do NOT clean security context fields here by default
+	// because those are typically important security controls that should be reconciled.
+ // If you need to ignore specific security context fields, uncomment the relevant lines below: + + // AllowPrivilegeEscalation is often set by PSPs + // container.SecurityContext.AllowPrivilegeEscalation = nil + + // ReadOnlyRootFilesystem is often set by PSPs + // container.SecurityContext.ReadOnlyRootFilesystem = nil + + // Capabilities are often modified by PSPs + // container.SecurityContext.Capabilities = nil +} diff --git a/pkg/k8sutil/patch_options_test.go b/pkg/k8sutil/patch_options_test.go new file mode 100644 index 000000000..d9cc4a22e --- /dev/null +++ b/pkg/k8sutil/patch_options_test.go @@ -0,0 +1,640 @@ +// Copyright © 2019 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package k8sutil + +import ( + "testing" + + "github.com/banzaicloud/k8s-objectmatcher/patch" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestIgnoreMutationWebhookFields(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + modifiedPod *corev1.Pod + expectDiff bool + description string + }{ + { + name: "ignore gatekeeper mutation annotations", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "gatekeeper.sh/mutation-id": "abc123", + "gatekeeper.sh/mutations": "Assign//policy1:1", + "other-annotation": "keep-this", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + }, + }, + }, + }, + modifiedPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "other-annotation": "keep-this", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + }, + }, + }, + }, + expectDiff: false, + description: "Gatekeeper mutation annotations should be ignored", + }, + { + name: "detect actual spec changes", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:1.0", + }, + }, + }, + }, + modifiedPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:2.0", + }, + }, + }, + }, + expectDiff: true, + description: "Real spec changes should be detected", + }, + { + name: "only gatekeeper annotations differ", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + 
Annotations: map[string]string{ + "gatekeeper.sh/mutation-id": "xyz789", + "app": "kafka", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + }, + }, + }, + }, + modifiedPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "app": "kafka", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + }, + }, + }, + }, + expectDiff: false, + description: "Only gatekeeper annotations differ, should not trigger diff", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set last applied annotation on current pod + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(tt.currentPod); err != nil { + t.Fatalf("Failed to set last applied annotation: %v", err) + } + + opts := []patch.CalculateOption{ + IgnoreMutationWebhookFields(), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(tt.currentPod, tt.modifiedPod, opts...) 
+ if err != nil { + t.Fatalf("Failed to calculate patch: %v", err) + } + + hasDiff := !patchResult.IsEmpty() + if hasDiff != tt.expectDiff { + t.Errorf("%s: expected diff=%v, got diff=%v\nPatch: %s", + tt.description, tt.expectDiff, hasDiff, string(patchResult.Patch)) + } + }) + } +} + +func TestCombinedIgnoreOptions(t *testing.T) { + t.Run("combine mutation webhook and resource ignoring", func(t *testing.T) { + currentPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "gatekeeper.sh/mutation-id": "abc123", + "gatekeeper.sh/mutations": "Assign//policy1:1", + "scaleops.sh/pod-owner-grouping": "kafkacluster", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2000m"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + } + + modifiedPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "scaleops.sh/pod-owner-grouping": "kafkacluster", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1000m"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + } + + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(currentPod); err != nil { + t.Fatalf("Failed to set last applied annotation: %v", err) + } + + opts := []patch.CalculateOption{ + IgnoreMutationWebhookFields(), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, modifiedPod, opts...) 
+ if err != nil { + t.Fatalf("Failed to calculate patch: %v", err) + } + + if !patchResult.IsEmpty() { + t.Errorf("Expected no diff when both mutation webhook annotations and resources differ (ScaleOps managed), but got patch: %s", + string(patchResult.Patch)) + } + }) +} + +func TestCleanMutationWebhookFields(t *testing.T) { + t.Run("removes gatekeeper annotations", func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "gatekeeper.sh/mutation-id": "abc123", + "gatekeeper.sh/mutations": "Assign//policy1:1", + "keep-this": "value", + }, + }, + } + + cleaned := cleanMutationWebhookFields(pod, false) + + if _, exists := cleaned.Annotations["gatekeeper.sh/mutation-id"]; exists { + t.Error("gatekeeper.sh/mutation-id should be removed") + } + if _, exists := cleaned.Annotations["gatekeeper.sh/mutations"]; exists { + t.Error("gatekeeper.sh/mutations should be removed") + } + if cleaned.Annotations["keep-this"] != "value" { + t.Error("Other annotations should be preserved") + } + }) + + t.Run("does not modify original pod", func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + "gatekeeper.sh/mutation-id": "abc123", + }, + }, + } + + cleanMutationWebhookFields(pod, false) + + if _, exists := pod.Annotations["gatekeeper.sh/mutation-id"]; !exists { + t.Error("Original pod should not be modified") + } + }) +} + +// Helper functions for creating test pods +func createBasicPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{}, + Labels: map[string]string{}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Image: "kafka:latest", + }, + }, + }, + } +} + +func createScaleOpsAffinity() *corev1.Affinity { + return &corev1.Affinity{ + NodeAffinity: 
&corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 95, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"high"}, + }, + }, + }, + }, + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"medium"}, + }, + }, + }, + }, + }, + }, + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "scaleops.sh/managed-unevictable", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + } +} + +func createKafkaContainer(image, cpuRequest string) corev1.Container { + container := corev1.Container{ + Name: "kafka", + Image: image, + } + if cpuRequest != "" { + container.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(cpuRequest), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + } + } + return container +} + +func createComplexScaleOpsPodBefore() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipeline-kafka-101-4s7b2", + Namespace: "default", + Annotations: map[string]string{ + "scaleops.sh/admission": "true", + "scaleops.sh/applied-policy": "high-availability", + "scaleops.sh/managed-containers": "{}", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + "app": "kafka", + }, 
+ Labels: map[string]string{ + "scaleops.sh/managed": "true", + "scaleops.sh/managed-unevictable": "true", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + "app": "kafka", + "brokerId": "101", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 95, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"high"}, + }, + }, + }, + }, + }, + }, + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "scaleops.sh/managed-unevictable", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + Containers: []corev1.Container{ + createKafkaContainer("kafka:3.9.1", "697m"), + { + Name: "fluent-bit", + Image: "fluent-bit:latest", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + }, + } +} + +func createComplexScaleOpsPodAfter() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipeline-kafka-101-4s7b2", + Namespace: "default", + Annotations: map[string]string{ + "app": "kafka", + }, + Labels: map[string]string{ + "app": "kafka", + "brokerId": "101", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + createKafkaContainer("kafka:3.9.1", "1"), + { + 
Name: "fluent-bit", + Image: "fluent-bit:latest", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + }, + } +} + +func getScaleOpsTestCases() []struct { + name string + currentPod *corev1.Pod + modifiedPod *corev1.Pod + expectDiff bool + description string +} { + return []struct { + name string + currentPod *corev1.Pod + modifiedPod *corev1.Pod + expectDiff bool + description string + }{ + { + name: "ignore scaleops annotations and labels", + currentPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations = map[string]string{ + "scaleops.sh/admission": "true", + "scaleops.sh/applied-policy": "high-availability", + "scaleops.sh/last-applied-resources": "{}", + "scaleops.sh/managed-containers": "{}", + "scaleops.sh/managed-keep-limit-cpu": "true", + "scaleops.sh/managed-keep-limit-memory": "true", + "scaleops.sh/origin-resources": "{}", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + "scaleops.sh/pod-owner-identifier": "pipeline-kafka-123", + "app": "kafka", + } + pod.Labels = map[string]string{ + "scaleops.sh/applied-recommendation": "kafkabroker-pipeline-kafka-123", + "scaleops.sh/managed": "true", + "scaleops.sh/managed-unevictable": "true", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + "scaleops.sh/pod-owner-identifier": "pipeline-kafka-123", + "app": "kafka", + } + return pod + }(), + modifiedPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations["app"] = "kafka" + pod.Labels["app"] = "kafka" + return pod + }(), + expectDiff: false, + description: "ScaleOps annotations and labels should be ignored", + }, + { + name: "ignore scaleops-modified resources", + currentPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations = 
map[string]string{ + "scaleops.sh/managed-containers": "{}", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + } + pod.Spec.Containers[0] = createKafkaContainer("kafka:latest", "697m") + return pod + }(), + modifiedPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations = map[string]string{ + "scaleops.sh/managed-containers": "{}", + "scaleops.sh/pod-owner-grouping": "kafkabroker", + } + pod.Spec.Containers[0] = createKafkaContainer("kafka:latest", "1") + return pod + }(), + expectDiff: false, + description: "ScaleOps-modified resources should be ignored when annotations present", + }, + { + name: "ignore scaleops-added affinity rules", + currentPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Spec.Affinity = createScaleOpsAffinity() + return pod + }(), + modifiedPod: createBasicPod(), + expectDiff: false, + description: "ScaleOps-added affinity rules should be ignored", + }, + { + name: "detect image changes even with scaleops", + currentPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations["scaleops.sh/pod-owner-grouping"] = "kafkabroker" + pod.Labels["scaleops.sh/managed"] = "true" + pod.Spec.Containers[0].Image = "kafka:3.6.1" + return pod + }(), + modifiedPod: func() *corev1.Pod { + pod := createBasicPod() + pod.Annotations["scaleops.sh/pod-owner-grouping"] = "kafkabroker" + pod.Labels["scaleops.sh/managed"] = "true" + pod.Spec.Containers[0].Image = "kafka:3.9.1" + return pod + }(), + expectDiff: true, + description: "Image changes should be detected even with ScaleOps annotations", + }, + { + name: "complex scaleops scenario - all mutations ignored", + currentPod: createComplexScaleOpsPodBefore(), + modifiedPod: createComplexScaleOpsPodAfter(), + expectDiff: false, + description: "Complex ScaleOps scenario: all ScaleOps mutations should be ignored", + }, + } +} + +func TestIgnoreScaleOpsFields(t *testing.T) { + tests := getScaleOpsTestCases() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 
Set last applied annotation on current pod + if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(tt.currentPod); err != nil { + t.Fatalf("Failed to set last applied annotation: %v", err) + } + + opts := []patch.CalculateOption{ + IgnoreMutationWebhookFields(), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(tt.currentPod, tt.modifiedPod, opts...) + if err != nil { + t.Fatalf("Failed to calculate patch: %v", err) + } + + hasDiff := !patchResult.IsEmpty() + if hasDiff != tt.expectDiff { + t.Errorf("%s: expected diff=%v, got diff=%v\nPatch: %s\nCurrent: %s\nModified: %s", + tt.description, tt.expectDiff, hasDiff, + string(patchResult.Patch), + string(patchResult.Current), + string(patchResult.Modified)) + } + }) + } +} diff --git a/pkg/k8sutil/resource.go b/pkg/k8sutil/resource.go index b50d4228d..9117c7381 100644 --- a/pkg/k8sutil/resource.go +++ b/pkg/k8sutil/resource.go @@ -164,7 +164,10 @@ func Reconcile(log logr.Logger, client runtimeClient.Client, desired runtime.Obj // CheckIfObjectUpdated checks if the given object is updated using K8sObjectMatcher func CheckIfObjectUpdated(log logr.Logger, desiredType reflect.Type, current, desired runtime.Object) bool { - patchResult, err := patch.DefaultPatchMaker.Calculate(current, desired) + opts := []patch.CalculateOption{ + IgnoreMutationWebhookFields(), + } + patchResult, err := patch.DefaultPatchMaker.Calculate(current, desired, opts...) 
if err != nil { log.Error(err, "could not match objects", "kind", desiredType) return true diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 72c907bcd..aa43e7228 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -953,7 +953,10 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo desiredPod.Spec.Tolerations = uniqueTolerations } // Check if the resource actually updated or if labels match TaintedBrokersSelector - patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) + opts := []patch.CalculateOption{ + k8sutil.IgnoreMutationWebhookFields(), + } + patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...) switch { case err != nil: log.Error(err, "could not match objects", "kind", desiredType)