diff --git a/api/v1/config/crd/eno.azure.io_inputmirrors.yaml b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml new file mode 100644 index 00000000..7ec44326 --- /dev/null +++ b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml @@ -0,0 +1,168 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.0 + name: inputmirrors.eno.azure.io +spec: + group: eno.azure.io + names: + kind: InputMirror + listKind: InputMirrorList + plural: inputmirrors + singular: inputmirror + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.sourceResource.name + name: Source + type: string + - jsonPath: .status.conditions[?(@.type=="Synced")].status + name: Synced + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1 + schema: + openAPIV3Schema: + description: |- + InputMirror stores a copy of a resource from an overlay cluster. + It is created and managed by the OverlaySyncController based on Symphony.spec.overlayResourceRefs. + Compositions can bind to InputMirrors just like any other resource. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + type: string + metadata: + type: object + spec: + properties: + key: + description: Key matches the Symphony's overlayResourceRef key + type: string + symphonyRef: + description: SymphonyRef points to the owning Symphony + properties: + name: + description: Name of the referent. + type: string + required: + - name + type: object + sourceResource: + description: SourceResource describes what resource to sync from + the overlay + properties: + group: + description: API Group of the resource (empty string for core + API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + required: + - key + - sourceResource + - symphonyRef + type: object + status: + properties: + conditions: + description: Conditions describe the sync state + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: lastTransitionTime is the last time the condition + transitioned from one status to another. + format: date-time + type: string + message: + description: message is a human readable message indicating + details about the transition. + maxLength: 32768 + type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. + format: int64 + minimum: 0 + type: integer + reason: + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + data: + description: |- + Data contains the actual resource data from the overlay cluster. + This is the full resource serialized as JSON. + type: object + x-kubernetes-preserve-unknown-fields: true + lastSyncTime: + description: LastSyncTime records when the resource was last successfully + synced + format: date-time + type: string + syncGeneration: + description: SyncGeneration tracks the source resource's resourceVersion + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/api/v1/config/crd/eno.azure.io_symphonies.yaml b/api/v1/config/crd/eno.azure.io_symphonies.yaml index 7242466c..785aeef3 100644 --- a/api/v1/config/crd/eno.azure.io_symphonies.yaml +++ b/api/v1/config/crd/eno.azure.io_symphonies.yaml @@ -72,6 +72,87 @@ spec: - resource type: object type: array + overlayCredentials: + description: |- + OverlayCredentials specifies how to access the overlay cluster. + When set, the OverlaySyncController will use these credentials to sync + resources specified in OverlayResourceRefs. + properties: + key: + default: kubeconfig + description: Key within the secret containing the kubeconfig data + type: string + secretRef: + description: SecretRef references a Secret containing the kubeconfig + for the overlay cluster + properties: + name: + description: name is unique within a namespace to reference + a secret resource. + type: string + namespace: + description: namespace defines the space within which the + secret name must be unique. + type: string + type: object + required: + - secretRef + type: object + overlayResourceRefs: + description: |- + OverlayResourceRefs specifies resources to sync from the overlay cluster. + Each ref results in an InputMirror being created that can be bound as an input. + items: + description: OverlayResourceRef defines a resource to sync from + an overlay cluster + properties: + key: + description: |- + Key that will be used to reference this input in Composition bindings. + This key maps to an auto-created InputMirror resource. + type: string + optional: + default: false + description: Optional indicates that synthesis can proceed if + this resource doesn't exist in the overlay. + type: boolean + resource: + description: Resource specifies what to fetch from the overlay + cluster + properties: + group: + description: API Group of the resource (empty string for + core API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + syncInterval: + default: 5m + description: SyncInterval determines how often to re-sync the + resource. + type: string + required: + - key + - resource + type: object + maxItems: 20 + type: array synthesisEnv: description: |- SynthesisEnv diff --git a/api/v1/inputmirror.go b/api/v1/inputmirror.go new file mode 100644 index 00000000..fcdb52eb --- /dev/null +++ b/api/v1/inputmirror.go @@ -0,0 +1,111 @@ +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// +kubebuilder:object:root=true +type InputMirrorList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []InputMirror `json:"items"` +} + +// InputMirror stores a copy of a resource from a remote cluster. +// It is created and managed by the RemoteSyncController based on Symphony.spec.remoteResourceRefs. +// Compositions can bind to InputMirrors just like any other resource. +// +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Source",type=string,JSONPath=`.spec.sourceResource.name` +// +kubebuilder:printcolumn:name="Synced",type=string,JSONPath=`.status.conditions[?(@.type=="Synced")].status` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` +type InputMirror struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec InputMirrorSpec `json:"spec,omitempty"` + Status InputMirrorStatus `json:"status,omitempty"` +} + +type InputMirrorSpec struct { + // Key matches the Symphony's remoteResourceRef key + Key string `json:"key"` + + // SymphonyRef points to the owning Symphony + SymphonyRef corev1.LocalObjectReference `json:"symphonyRef"` + + // SourceResource describes what resource to sync from the remote cluster + SourceResource RemoteResourceSelector `json:"sourceResource"` +} + +type InputMirrorStatus struct { + // Data contains the actual resource data from the remote cluster. + // This is the full resource serialized as JSON. + // +kubebuilder:pruning:PreserveUnknownFields + Data *runtime.RawExtension `json:"data,omitempty"` + + // LastSyncTime records when the resource was last successfully synced + LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"` + + // SyncGeneration tracks the source resource's resourceVersion + SyncGeneration string `json:"syncGeneration,omitempty"` + + // Conditions describe the sync state + // +listType=map + // +listMapKey=type + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// RemoteResourceSelector describes a resource to sync from a remote cluster +type RemoteResourceSelector struct { + // API Group of the resource (empty string for core API group) + // +optional + Group string `json:"group,omitempty"` + + // API Version of the resource + Version string `json:"version"` + + // Kind of the resource (e.g., ConfigMap, Secret) + Kind string `json:"kind"` + + // Name of the resource + Name string `json:"name"` + + // Namespace of the resource (empty for cluster-scoped resources) + // +optional + Namespace string `json:"namespace,omitempty"` +} + +// RemoteCredentials specifies how to access a remote cluster +type RemoteCredentials struct { + // SecretRef references a Secret containing the kubeconfig for the remote cluster + SecretRef corev1.SecretReference `json:"secretRef"` + + // Key within the secret containing the kubeconfig data + // +kubebuilder:default="kubeconfig" + // +optional + Key string `json:"key,omitempty"` +} + +// RemoteResourceRef defines a resource to sync from a remote cluster +type RemoteResourceRef struct { + // Key that will be used to reference this input in Composition bindings. + // This key maps to an auto-created InputMirror resource. + Key string `json:"key"` + + // Resource specifies what to fetch from the remote cluster + Resource RemoteResourceSelector `json:"resource"` + + // SyncInterval determines how often to re-sync the resource. + // +kubebuilder:default="5m" + // +optional + SyncInterval *metav1.Duration `json:"syncInterval,omitempty"` + + // Optional indicates that synthesis can proceed if this resource doesn't exist in the remote cluster. + // +kubebuilder:default=false + // +optional + Optional bool `json:"optional,omitempty"` +} diff --git a/api/v1/symphony.go b/api/v1/symphony.go index 83cc27ef..11d6cfb4 100644 --- a/api/v1/symphony.go +++ b/api/v1/symphony.go @@ -39,6 +39,18 @@ type SymphonySpec struct { // Copied opaquely into the compositions managed by this symphony. // +kubebuilder:validation:MaxItems:=50 SynthesisEnv []EnvVar `json:"synthesisEnv,omitempty"` // deprecated synthesis env should always be variation scoped. + + // RemoteCredentials specifies how to access the remote cluster. + // When set, the RemoteSyncController will use these credentials to sync + // resources specified in RemoteResourceRefs. + // +optional + RemoteCredentials *RemoteCredentials `json:"remoteCredentials,omitempty"` + + // RemoteResourceRefs specifies resources to sync from the remote cluster. + // Each ref results in an InputMirror being created that can be bound as an input. + // +optional + // +kubebuilder:validation:MaxItems:=20 + RemoteResourceRefs []RemoteResourceRef `json:"remoteResourceRefs,omitempty"` } type SymphonyStatus struct { diff --git a/api/v1/types.go b/api/v1/types.go index e5fdb085..20e69935 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -19,4 +19,5 @@ func init() { SchemeBuilder.Register(&CompositionList{}, &Composition{}) SchemeBuilder.Register(&SymphonyList{}, &Symphony{}) SchemeBuilder.Register(&ResourceSliceList{}, &ResourceSlice{}) + SchemeBuilder.Register(&InputMirrorList{}, &InputMirror{}) } diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 5a8d0e02..4ac033d7 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -582,6 +582,18 @@ func (in *SymphonySpec) DeepCopyInto(out *SymphonySpec) { *out = make([]EnvVar, len(*in)) copy(*out, *in) } + if in.RemoteCredentials != nil { + in, out := &in.RemoteCredentials, &out.RemoteCredentials + *out = new(RemoteCredentials) + **out = **in + } + if in.RemoteResourceRefs != nil { + in, out := &in.RemoteResourceRefs, &out.RemoteResourceRefs + *out = make([]RemoteResourceRef, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SymphonySpec. @@ -845,3 +857,162 @@ func (in *Variation) DeepCopy() *Variation { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirror) DeepCopyInto(out *InputMirror) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirror. +func (in *InputMirror) DeepCopy() *InputMirror { + if in == nil { + return nil + } + out := new(InputMirror) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirror) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorList) DeepCopyInto(out *InputMirrorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]InputMirror, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorList. +func (in *InputMirrorList) DeepCopy() *InputMirrorList { + if in == nil { + return nil + } + out := new(InputMirrorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirrorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorSpec) DeepCopyInto(out *InputMirrorSpec) { + *out = *in + out.SymphonyRef = in.SymphonyRef + out.SourceResource = in.SourceResource +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorSpec. +func (in *InputMirrorSpec) DeepCopy() *InputMirrorSpec { + if in == nil { + return nil + } + out := new(InputMirrorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorStatus) DeepCopyInto(out *InputMirrorStatus) { + *out = *in + if in.Data != nil { + in, out := &in.Data, &out.Data + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.LastSyncTime != nil { + in, out := &in.LastSyncTime, &out.LastSyncTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorStatus. +func (in *InputMirrorStatus) DeepCopy() *InputMirrorStatus { + if in == nil { + return nil + } + out := new(InputMirrorStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceSelector) DeepCopyInto(out *RemoteResourceSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceSelector. +func (in *RemoteResourceSelector) DeepCopy() *RemoteResourceSelector { + if in == nil { + return nil + } + out := new(RemoteResourceSelector) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteCredentials) DeepCopyInto(out *RemoteCredentials) { + *out = *in + out.SecretRef = in.SecretRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteCredentials. +func (in *RemoteCredentials) DeepCopy() *RemoteCredentials { + if in == nil { + return nil + } + out := new(RemoteCredentials) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceRef) DeepCopyInto(out *RemoteResourceRef) { + *out = *in + out.Resource = in.Resource + if in.SyncInterval != nil { + in, out := &in.SyncInterval, &out.SyncInterval + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceRef. +func (in *RemoteResourceRef) DeepCopy() *RemoteResourceRef { + if in == nil { + return nil + } + out := new(RemoteResourceRef) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/eno-controller/main.go b/cmd/eno-controller/main.go index f0135384..d8c66729 100644 --- a/cmd/eno-controller/main.go +++ b/cmd/eno-controller/main.go @@ -19,6 +19,7 @@ import ( v1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/controllers/composition" + "github.com/Azure/eno/internal/controllers/remotesync" "github.com/Azure/eno/internal/controllers/resourceslice" "github.com/Azure/eno/internal/controllers/scheduling" "github.com/Azure/eno/internal/controllers/symphony" @@ -170,6 +171,11 @@ func runController() error { return fmt.Errorf("constructing symphony controller: %w", err) } + err = remotesync.NewController(mgr) + if err != nil { + return fmt.Errorf("constructing remote sync controller: %w", err) + } + return mgr.Start(ctx) } diff --git a/internal/controllers/remotesync/controller.go b/internal/controllers/remotesync/controller.go new file mode 100644 index 00000000..48b90a55 --- /dev/null +++ b/internal/controllers/remotesync/controller.go @@ -0,0 +1,536 @@ +// Package overlaysync implements the OverlaySyncController which syncs resources +// from overlay clusters to the underlay as InputMirror resources. +// +// SECURITY CONSIDERATIONS: +// - Overlay credentials are stored in Secrets and never logged +// - Secret access is restricted to the Symphony's namespace by default +// - REST client has timeouts to prevent resource exhaustion +// - Cached clients are invalidated on credential rotation +// - Only specified resource types can be synced (no arbitrary access) +package overlaysync + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "sync" + "time" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" + "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/clientcmd" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + // ConditionTypeSynced indicates whether the InputMirror has been successfully synced + ConditionTypeSynced = "Synced" + + // DefaultSyncInterval is the default interval for re-syncing overlay resources + DefaultSyncInterval = 5 * time.Minute + + // FinalizerName is the finalizer added to InputMirrors + FinalizerName = "eno.azure.io/overlay-sync" + + // Client timeout settings for security + overlayClientTimeout = 30 * time.Second + overlayClientQPS = 5 + overlayClientBurst = 10 + + // maxSyncConcurrency limits parallel overlay resource fetches per Symphony. + // This prevents overwhelming the overlay cluster's API server while still + // providing significant speedup over sequential syncing. + // With 100 refs at ~50ms each: sequential = 5s, parallel (10) = 500ms + maxSyncConcurrency = 10 +) + +// AllowedSyncKinds defines which resource kinds can be synced from overlay. +// This is a security control to prevent syncing sensitive resources. +var AllowedSyncKinds = map[schema.GroupKind]bool{ + {Group: "", Kind: "ConfigMap"}: true, + // Add other allowed kinds here as needed + // Explicitly NOT allowing: Secret, ServiceAccount, etc. +} + +// overlayClient holds a cached client for an overlay cluster +type overlayClient struct { + client client.Client + createdAt time.Time + credentialHash string // Hash of credentials to detect rotation +} + +// Controller reconciles Symphonies with overlay resource refs, syncing resources +// from overlay clusters to InputMirror resources on the underlay. +type Controller struct { + client client.Client + scheme *runtime.Scheme + + // overlayClients caches overlay cluster clients keyed by symphony namespace/name + overlayClients sync.Map + + // clientCacheTTL determines how long overlay clients are cached + clientCacheTTL time.Duration + + // allowedKinds can be overridden for testing + allowedKinds map[schema.GroupKind]bool +} + +// NewController creates a new OverlaySyncController and registers it with the manager. +func NewController(mgr ctrl.Manager) error { + c := &Controller{ + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + clientCacheTTL: 10 * time.Minute, + allowedKinds: AllowedSyncKinds, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Symphony{}). + Owns(&apiv1.InputMirror{}). + WithLogConstructor(manager.NewLogConstructor(mgr, "overlaySyncController")). + Complete(c) +} + +func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + symphony := &apiv1.Symphony{} + if err := c.client.Get(ctx, req.NamespacedName, symphony); err != nil { + if errors.IsNotFound(err) { + // Symphony deleted, overlay clients will be cleaned up by GC + c.overlayClients.Delete(req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + logger = logger.WithValues( + "symphonyName", symphony.Name, + "symphonyNamespace", symphony.Namespace, + ) + ctx = logr.NewContext(ctx, logger) + + // Skip if no overlay resource refs defined + if len(symphony.Spec.OverlayResourceRefs) == 0 { + return ctrl.Result{}, nil + } + + // Skip if no overlay credentials provided + if symphony.Spec.OverlayCredentials == nil { + logger.V(1).Info("symphony has overlay resource refs but no credentials, skipping") + return ctrl.Result{}, nil + } + + // Handle deletion + if symphony.DeletionTimestamp != nil { + // InputMirrors will be garbage collected via owner references + c.overlayClients.Delete(req.String()) + return ctrl.Result{}, nil + } + + // Get or create overlay client + overlayClient, err := c.getOrCreateOverlayClient(ctx, symphony) + if err != nil { + logger.Error(err, "failed to create overlay client") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + // Sync all overlay resource refs in parallel with bounded concurrency + minRequeue := c.syncOverlayResourcesParallel(ctx, symphony, overlayClient) + + // Clean up InputMirrors for refs that no longer exist + if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { + logger.Error(err, "failed to cleanup orphaned mirrors") + } + + if minRequeue > 0 { + return ctrl.Result{RequeueAfter: minRequeue}, nil + } + return ctrl.Result{RequeueAfter: DefaultSyncInterval}, nil +} + +// hashCredentials creates a SHA256 hash of credential data for change detection. +// This allows detecting credential rotation without storing the credentials. +func hashCredentials(data []byte) string { + h := sha256.Sum256(data) + return hex.EncodeToString(h[:]) +} + +// getOrCreateOverlayClient gets a cached overlay client or creates a new one. +// Security: Credentials are never logged, client has timeouts, cache invalidates on rotation. +func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *apiv1.Symphony) (client.Client, error) { + logger := logr.FromContextOrDiscard(ctx) + key := fmt.Sprintf("%s/%s", symphony.Namespace, symphony.Name) + + // Get the kubeconfig secret first to check for credential rotation + creds := symphony.Spec.OverlayCredentials + secret := &corev1.Secret{} + secretKey := types.NamespacedName{ + Name: creds.SecretRef.Name, + Namespace: creds.SecretRef.Namespace, + } + + // SECURITY: Only allow accessing secrets in the Symphony's namespace + // This prevents cross-namespace credential access + if secretKey.Namespace == "" { + secretKey.Namespace = symphony.Namespace + } + if secretKey.Namespace != symphony.Namespace { + return nil, fmt.Errorf("security: credential secret must be in symphony namespace %q, got %q", + symphony.Namespace, secretKey.Namespace) + } + + if err := c.client.Get(ctx, secretKey, secret); err != nil { + return nil, fmt.Errorf("getting overlay credentials secret: %w", err) + } + + // Get kubeconfig data - NEVER log this + kubeconfigKey := creds.Key + if kubeconfigKey == "" { + kubeconfigKey = "kubeconfig" + } + kubeconfigData, ok := secret.Data[kubeconfigKey] + if !ok { + return nil, fmt.Errorf("kubeconfig key %q not found in secret", kubeconfigKey) + } + + // Hash credentials to detect rotation without storing them + credHash := hashCredentials(kubeconfigData) + + // Check cache - invalidate if credentials changed or TTL expired + if cached, ok := c.overlayClients.Load(key); ok { + oc := cached.(*overlayClient) + if time.Since(oc.createdAt) < c.clientCacheTTL && oc.credentialHash == credHash { + return oc.client, nil + } + // Cache expired or credentials rotated + logger.V(1).Info("invalidating cached overlay client", + "reason", map[bool]string{true: "credential_rotation", false: "ttl_expired"}[oc.credentialHash != credHash]) + } + + // Create REST config from kubeconfig + restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigData) + if err != nil { + return nil, fmt.Errorf("parsing kubeconfig: %w", err) + } + + // SECURITY: Apply rate limiting and timeouts to prevent resource exhaustion + restConfig.Timeout = overlayClientTimeout + restConfig.QPS = overlayClientQPS + restConfig.Burst = overlayClientBurst + + // Set a meaningful user agent for audit logs on the overlay + restConfig.UserAgent = "eno-overlay-sync-controller" + + // Create client + oc, err := client.New(restConfig, client.Options{}) + if err != nil { + return nil, fmt.Errorf("creating overlay client: %w", err) + } + + // Cache the client with credential hash for rotation detection + c.overlayClients.Store(key, &overlayClient{ + client: oc, + createdAt: time.Now(), + credentialHash: credHash, + }) + + // SECURITY: Don't log secret name in production, only log that client was created + logger.V(1).Info("created overlay client") + return oc, nil +} + +// syncResult holds the result of syncing a single overlay resource +type syncResult struct { + key string + requeue time.Duration + err error +} + +// syncOverlayResourcesParallel syncs all overlay resource refs in parallel with bounded concurrency. +// This reduces reconcile latency from O(n * latency) to O(n/concurrency * latency). +// For example, with 100 refs at ~50ms each: sequential = 5s, parallel (10) = ~500ms. +func (c *Controller) syncOverlayResourcesParallel( + ctx context.Context, + symphony *apiv1.Symphony, + overlayClient client.Client, +) time.Duration { + logger := logr.FromContextOrDiscard(ctx) + refs := symphony.Spec.OverlayResourceRefs + + if len(refs) == 0 { + return DefaultSyncInterval + } + + // Use a semaphore to limit concurrent overlay API calls + sem := make(chan struct{}, maxSyncConcurrency) + results := make(chan syncResult, len(refs)) + + // Use errgroup for structured concurrency, but we don't fail fast on errors + // since we want to sync as many refs as possible + g, ctx := errgroup.WithContext(ctx) + + for _, ref := range refs { + ref := ref // capture loop variable + g.Go(func() error { + // Acquire semaphore + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-ctx.Done(): + results <- syncResult{key: ref.Key, err: ctx.Err()} + return nil + } + + requeue, err := c.syncOverlayResource(ctx, symphony, overlayClient, ref) + results <- syncResult{key: ref.Key, requeue: requeue, err: err} + return nil // Don't propagate errors - we handle them individually + }) + } + + // Wait for all goroutines to complete + _ = g.Wait() + close(results) + + // Process results and determine minimum requeue time + var minRequeue time.Duration + var successCount, failCount int + + for result := range results { + if result.err != nil { + logger.Error(result.err, "failed to sync overlay resource", "key", result.key) + failCount++ + continue + } + successCount++ + if result.requeue > 0 && (minRequeue == 0 || result.requeue < minRequeue) { + minRequeue = result.requeue + } + } + + logger.V(1).Info("completed parallel overlay sync", + "total", len(refs), + "success", successCount, + "failed", failCount, + "minRequeue", minRequeue, + ) + + if minRequeue == 0 { + return DefaultSyncInterval + } + return minRequeue +} + +// syncOverlayResource syncs a single overlay resource to an InputMirror +func (c *Controller) syncOverlayResource( + ctx context.Context, + symphony *apiv1.Symphony, + overlayClient client.Client, + ref apiv1.OverlayResourceRef, +) (time.Duration, error) { + logger := logr.FromContextOrDiscard(ctx).WithValues("key", ref.Key, "resourceName", ref.Resource.Name) + + // SECURITY: Validate the resource kind is allowed to be synced + gk := schema.GroupKind{Group: ref.Resource.Group, Kind: ref.Resource.Kind} + if !c.allowedKinds[gk] { + return 0, fmt.Errorf("security: resource kind %q is not allowed to be synced from overlay", gk.String()) + } + + // Fetch from overlay + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Kind: ref.Resource.Kind, + }) + + objKey := types.NamespacedName{ + Name: ref.Resource.Name, + Namespace: ref.Resource.Namespace, + } + + err := overlayClient.Get(ctx, objKey, obj) + if err != nil { + if errors.IsNotFound(err) && ref.Optional { + logger.V(1).Info("optional overlay resource not found, skipping") + // Update InputMirror to reflect missing state + return c.updateMirrorMissing(ctx, symphony, ref) + } + return 0, fmt.Errorf("getting overlay resource: %w", err) + } + + // Create/Update InputMirror + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: mirrorName, + Namespace: symphony.Namespace, + }, + } + + result, err := controllerutil.CreateOrUpdate(ctx, c.client, mirror, func() error { + // Set owner reference + if err := controllerutil.SetControllerReference(symphony, mirror, c.scheme); err != nil { + return err + } + + // Update spec + mirror.Spec.Key = ref.Key + mirror.Spec.SymphonyRef = corev1.LocalObjectReference{Name: symphony.Name} + mirror.Spec.SourceResource = ref.Resource + + return nil + }) + + if err != nil { + return 0, fmt.Errorf("creating/updating InputMirror: %w", err) + } + + // Update status separately - CreateOrUpdate only updates spec, not status subresource + rawData, err := json.Marshal(obj.Object) + if err != nil { + return 0, fmt.Errorf("marshaling resource data: %w", err) + } + mirror.Status.Data = &runtime.RawExtension{Raw: rawData} + mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} + mirror.Status.SyncGeneration = obj.GetResourceVersion() + + // Update conditions + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") + + if err := c.client.Status().Update(ctx, mirror); err != nil { + return 0, fmt.Errorf("updating InputMirror status: %w", err) + } + + logger.V(1).Info("synced overlay resource", "result", result, "mirrorName", mirrorName) + + // Determine requeue interval + syncInterval := DefaultSyncInterval + if ref.SyncInterval != nil { + syncInterval = ref.SyncInterval.Duration + } + return syncInterval, nil +} + +// updateMirrorMissing updates the InputMirror to reflect that the source resource is missing +func (c *Controller) updateMirrorMissing( + ctx context.Context, + symphony *apiv1.Symphony, + ref apiv1.OverlayResourceRef, +) (time.Duration, error) { + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{} + err := c.client.Get(ctx, types.NamespacedName{Name: mirrorName, Namespace: symphony.Namespace}, mirror) + if errors.IsNotFound(err) { + // No mirror exists, nothing to update + return DefaultSyncInterval, nil + } + if err != nil { + return 0, err + } + + // Update condition to reflect missing state + setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in overlay") + mirror.Status.Data = nil + + if err := c.client.Status().Update(ctx, mirror); err != nil { + return 0, err + } + + syncInterval := DefaultSyncInterval + if ref.SyncInterval != nil { + syncInterval = ref.SyncInterval.Duration + } + return syncInterval, nil +} + +// cleanupOrphanedMirrors removes InputMirrors for refs that no longer exist in the Symphony +func (c *Controller) cleanupOrphanedMirrors(ctx context.Context, symphony *apiv1.Symphony) error { + logger := logr.FromContextOrDiscard(ctx) + + // List all InputMirrors owned by this Symphony + mirrors := &apiv1.InputMirrorList{} + if err := c.client.List(ctx, mirrors, + client.InNamespace(symphony.Namespace), + client.MatchingFields{"spec.symphonyRef.name": symphony.Name}, + ); err != nil { + // If the index isn't set up, fall back to filtering manually + if err := c.client.List(ctx, mirrors, client.InNamespace(symphony.Namespace)); err != nil { + return err + } + } + + // Build set of expected mirror names + expected := make(map[string]struct{}) + for _, ref := range symphony.Spec.OverlayResourceRefs { + expected[inputMirrorName(symphony.Name, ref.Key)] = struct{}{} + } + + // Delete orphaned mirrors + for _, mirror := range mirrors.Items { + // Check if owned by this symphony + if mirror.Spec.SymphonyRef.Name != symphony.Name { + continue + } + if _, ok := expected[mirror.Name]; !ok { + logger.V(1).Info("deleting orphaned InputMirror", "mirrorName", mirror.Name) + if err := c.client.Delete(ctx, &mirror); err != nil && !errors.IsNotFound(err) { + return err + } + } + } + + return nil +} + +// inputMirrorName generates the name for an InputMirror +func inputMirrorName(symphonyName, key string) string { + return fmt.Sprintf("%s-%s", symphonyName, key) +} + +// setSyncedCondition updates the Synced condition on an InputMirror +func setSyncedCondition(mirror *apiv1.InputMirror, synced bool, reason, message string) { + status := metav1.ConditionFalse + if synced { + status = metav1.ConditionTrue + } + + now := metav1.Now() + condition := metav1.Condition{ + Type: ConditionTypeSynced, + Status: status, + ObservedGeneration: mirror.Generation, + LastTransitionTime: now, + Reason: reason, + Message: message, + } + + // Find and update existing condition or append + for i, c := range mirror.Status.Conditions { + if c.Type == ConditionTypeSynced { + if c.Status != condition.Status { + mirror.Status.Conditions[i] = condition + } else { + // Only update reason/message, keep transition time + mirror.Status.Conditions[i].Reason = reason + mirror.Status.Conditions[i].Message = message + mirror.Status.Conditions[i].ObservedGeneration = mirror.Generation + } + return + } + } + mirror.Status.Conditions = append(mirror.Status.Conditions, condition) +} diff --git a/internal/controllers/remotesync/controller_test.go b/internal/controllers/remotesync/controller_test.go new file mode 100644 index 00000000..ba4b63e3 --- /dev/null +++ b/internal/controllers/remotesync/controller_test.go @@ -0,0 +1,172 @@ +package overlaysync + +import ( + "context" + "testing" + "time" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func TestInputMirrorName(t *testing.T) { + tests := []struct { + symphonyName string + key string + expected string + }{ + {"symphony-123", "metricsSettings", "symphony-123-metricsSettings"}, + {"my-symphony", "config", "my-symphony-config"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + result := inputMirrorName(tt.symphonyName, tt.key) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestSetSyncedCondition(t *testing.T) { + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mirror", + Namespace: "test-ns", + Generation: 1, + }, + } + + // Test setting synced=true + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, ConditionTypeSynced, mirror.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncSuccess", mirror.Status.Conditions[0].Reason) + assert.Equal(t, "Successfully synced", mirror.Status.Conditions[0].Message) + + // Test updating to synced=false + setSyncedCondition(mirror, false, "SyncFailed", "Failed to sync") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, metav1.ConditionFalse, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncFailed", mirror.Status.Conditions[0].Reason) +} + +func TestReconcile_NoOverlayRefs(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + // No OverlayResourceRefs + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_NoCredentials(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + OverlayResourceRefs: []apiv1.OverlayResourceRef{ + { + Key: "test", + Resource: apiv1.OverlayResourceSelector{ + Kind: "ConfigMap", + Version: "v1", + Name: "test-cm", + }, + }, + }, + // No OverlayCredentials + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + // Should return empty result since no credentials + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_SymphonyNotFound(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + client := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "non-existent", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +}