From c02d2f1829d3cb3b4a851d28d0c5268e77eb35fe Mon Sep 17 00:00:00 2001 From: Jonasz Lasut-Balcerzak Date: Tue, 2 Sep 2025 12:11:54 +0200 Subject: [PATCH 1/8] Initial operations support --- fn.go | 47 ++++++++++ fn_test.go | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++- go.mod | 8 +- go.sum | 28 +++--- 4 files changed, 322 insertions(+), 22 deletions(-) diff --git a/fn.go b/fn.go index 41f68d2..b423e28 100644 --- a/fn.go +++ b/fn.go @@ -24,6 +24,7 @@ import ( fnv1 "github.com/crossplane/function-sdk-go/proto/v1" "github.com/crossplane/function-sdk-go/request" "github.com/crossplane/function-sdk-go/resource" + "github.com/crossplane/function-sdk-go/resource/composite" "github.com/crossplane/function-sdk-go/response" ) @@ -138,6 +139,13 @@ func (f *Function) getXRAndStatus(req *fnv1.RunFunctionRequest) (map[string]inte // getObservedAndDesired gets both observed and desired XR resources func (f *Function) getObservedAndDesired(req *fnv1.RunFunctionRequest) (*resource.Composite, *resource.Composite, error) { + if req.GetObserved().GetComposite() != nil { + return getObservedAndDesiredInComposition(req) + } + return getObservedAndDesiredInOperation(req) +} + +func getObservedAndDesiredInComposition(req *fnv1.RunFunctionRequest) (*resource.Composite, *resource.Composite, error) { oxr, err := request.GetObservedCompositeResource(req) if err != nil { return nil, nil, errors.Wrap(err, "cannot get observed composite resource") @@ -151,6 +159,45 @@ func (f *Function) getObservedAndDesired(req *fnv1.RunFunctionRequest) (*resourc return oxr, dxr, nil } +func getObservedAndDesiredInOperation(req *fnv1.RunFunctionRequest) (*resource.Composite, *resource.Composite, error) { + rr, err := request.GetRequiredResources(req) + if err != nil { + return nil, nil, errors.Wrap(err, "operation: cannot get required resources") + } + + rs, found := rr["ops.crossplane.io/watched-resource"] + if !found { + return nil, nil, fmt.Errorf("operation: no resource to process with name %s", "ops.crossplane.io/watched-resource") + } + + if len(rs) != 1 { + return nil, nil, fmt.Errorf("operation: incorrect number of resources sent to the function. expected 1, got %d", len(rs)) + } + + r := rs[0] + if r.Resource == nil { + return nil, nil, errors.New("operation: Resource property in operation resource can not be nil") + } + + if len(r.Resource.Object) == 0 { + return nil, nil, errors.New("operation: Resource.Object property in operation resource can not be empty") + } + + oxr := &resource.Composite{ + Resource: composite.New(), + ConnectionDetails: make(resource.ConnectionDetails), + } + dxr := &resource.Composite{ + Resource: composite.New(), + ConnectionDetails: make(resource.ConnectionDetails), + } + + oxr.Resource.Object = r.Resource.Object + dxr.Resource.Object = r.Resource.Object + + return oxr, dxr, nil +} + // initializeAndCopyData initializes metadata and copies spec func (f *Function) initializeAndCopyData(oxr, dxr *resource.Composite) { // Initialize dxr from oxr if needed diff --git a/fn_test.go b/fn_test.go index 406957e..daabb59 100644 --- a/fn_test.go +++ b/fn_test.go @@ -1660,6 +1660,11 @@ func TestRunFunction(t *testing.T) { "queryType": "UserValidation", "users": ["user@example.com"] }`), + Observed: &fnv1.State{ + Composite: &fnv1.Resource{ + Resource: resource.MustStructJSON(xr), + }, + }, }, }, want: want{ @@ -1675,8 +1680,14 @@ func TestRunFunction(t *testing.T) { Desired: &fnv1.State{ Composite: &fnv1.Resource{ Resource: resource.MustStructJSON(`{ - "apiVersion": "", - "kind": "" + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr" + }, + "spec": { + "count": 2 + } }`), }, }, @@ -1694,6 +1705,11 @@ func TestRunFunction(t *testing.T) { "queryType": "UserValidation", "users": ["user@example.com"] }`), + Observed: &fnv1.State{ + Composite: &fnv1.Resource{ + Resource: resource.MustStructJSON(xr), + }, + }, Credentials: map[string]*fnv1.Credentials{ "azure-creds": { Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, @@ -1714,8 +1730,14 @@ func TestRunFunction(t *testing.T) { Desired: &fnv1.State{ Composite: &fnv1.Resource{ Resource: resource.MustStructJSON(`{ - "apiVersion": "", - "kind": "" + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr" + }, + "spec": { + "count": 2 + } }`), }, }, @@ -2413,6 +2435,237 @@ func TestRunFunction(t *testing.T) { }, }, }, + "OperationWithoutWatchedResource": { + reason: "The Function should return fatal if it runs as operation without a watched resource", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "context.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{}, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_FATAL, + Message: `operation: no resource to process with name ops.crossplane.io/watched-resource`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + }, + }, + }, + "OperationWithLessThanOneWatchedResource": { + reason: "The Function should return fatal if it runs as operation with less than one watched resource", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "context.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: nil, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_FATAL, + Message: `operation: incorrect number of resources sent to the function. expected 1, got 0`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + }, + }, + }, + "OperationWithMoreThanOneWatchedResource": { + reason: "The Function should return fatal if it runs as operation with more than one watched resource", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "context.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + { + Resource: resource.MustStructJSON(xr), + }, + { + Resource: resource.MustStructJSON(xr), + }, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_FATAL, + Message: `operation: incorrect number of resources sent to the function. expected 1, got 2`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + }, + }, + }, + "OperationWithNilObjectInWatchedResource": { + reason: "The Function should return fatal if it runs as operation watched resource with zero length Resource.Object", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "context.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + {}, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_FATAL, + Message: `operation: Resource.Object property in operation resource can not be empty`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + }, + }, + }, + "OperationWithWatchedResource": { + reason: "The Function should return fatal if it runs as operation watched resource with nil Resource", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "status.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + { + Resource: resource.MustStructJSON(xr), + }, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Conditions: []*fnv1.Condition{ + { + Type: "FunctionSuccess", + Status: fnv1.Status_STATUS_CONDITION_TRUE, + Reason: "Success", + Target: fnv1.Target_TARGET_COMPOSITE_AND_CLAIM.Enum(), + }, + }, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_NORMAL, + Message: `QueryType: "UserValidation"`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + Desired: &fnv1.State{ + Composite: &fnv1.Resource{ + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr" + }, + "spec": { + "count": 2 + }, + "status": { + "validatedUsers": [ + { + "id": "test-user-id", + "displayName": "Test User", + "userPrincipalName": "user@example.com", + "mail": "user@example.com" + } + ] + } + }`), + }, + }, + }, + }, + }, } for name, tc := range cases { diff --git a/go.mod b/go.mod index d1acfb1..9cfb96b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.11.0 github.com/alecthomas/kong v1.12.1 github.com/crossplane/crossplane-runtime v1.20.0 - github.com/crossplane/function-sdk-go v0.4.0 + github.com/crossplane/function-sdk-go v0.5.0-rc.0.0.20250805171053-2910b68d255d github.com/google/go-cmp v0.7.0 github.com/microsoft/kiota-authentication-azure-go v1.3.1 github.com/microsoftgraph/msgraph-sdk-go v1.84.0 @@ -77,15 +77,15 @@ require ( golang.org/x/crypto v0.41.0 // indirect golang.org/x/mod v0.27.0 // indirect golang.org/x/net v0.43.0 // indirect - golang.org/x/oauth2 v0.27.0 // indirect + golang.org/x/oauth2 v0.28.0 // indirect golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/term v0.34.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.36.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect - google.golang.org/grpc v1.72.1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/grpc v1.73.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 7926d27..3d5ac4b 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/crossplane/crossplane-runtime v1.20.0 h1:I54uipRIecqZyms+vz1J/l62yjVQ7HV5w+Nh3RMrUtc= github.com/crossplane/crossplane-runtime v1.20.0/go.mod h1:lfV1VJenDc9PNVLxDC80YjPoTm+JdSZ13xlS2h37Dvg= -github.com/crossplane/function-sdk-go v0.4.0 h1:1jd+UIaZlVNQCUO4hLAgUqWBRnUKw2ObF9ZuMw5CpKk= -github.com/crossplane/function-sdk-go v0.4.0/go.mod h1:jLnzUG8pt8tn/U6/uvtNStAhDjhIq4wCR31yECT54NM= +github.com/crossplane/function-sdk-go v0.5.0-rc.0.0.20250805171053-2910b68d255d h1:bzt8qEg9I2GrLc216IuuTn4x+GECxc+DoGlDZ4PMuJY= +github.com/crossplane/function-sdk-go v0.5.0-rc.0.0.20250805171053-2910b68d255d/go.mod h1:fEwSBgMH6+kicaBeOWz6PZRwhjLg4tu9QEDeP/9O2yE= github.com/crossplane/upjet v1.4.1-0.20240911184956-3afbb7796d46 h1:2IH1YPTBrNmBj0Z1OCjEBTrQCuRaLutZbWLaswFeCFQ= github.com/crossplane/upjet v1.4.1-0.20240911184956-3afbb7796d46/go.mod h1:wkdZf/Cvhr6PI30VdHIOjg4dX39Z5uijqnLWFk5PbGM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -274,10 +274,10 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0u go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE= go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= -go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= -go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= -go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= -go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= @@ -309,8 +309,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= -golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= -golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= +golang.org/x/oauth2 v0.28.0 h1:CrgCKl8PPAVtLnU3c+EDw6x11699EWlsDeWNWKdIOkc= +golang.org/x/oauth2 v0.28.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -351,12 +351,12 @@ gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuB google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= -google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950= -google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= -google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 h1:hE3bRWtU6uceqlh4fhrSnUyjKHMKB9KrTLLG+bc0ddM= +google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463/go.mod h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 184b7a20ee3cfa5493f8cd9c7094941a7bcf9a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonasz=20=C5=81asut-Balcerzak?= Date: Thu, 4 Sep 2025 17:12:00 +0200 Subject: [PATCH 2/8] Test Operations, add capabilities --- README.md | 91 +++++++++++++++++++ example/operations/cronoperation.yaml | 39 ++++++++ example/operations/operation.yaml | 34 +++++++ example/operations/watchoperation.yaml | 35 +++++++ fn.go | 2 + package/crossplane.yaml | 7 +- .../msgraph.fn.crossplane.io_inputs.yaml | 2 +- 7 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 example/operations/cronoperation.yaml create mode 100644 example/operations/operation.yaml create mode 100644 example/operations/watchoperation.yaml diff --git a/README.md b/README.md index a8b8112..c01e5e9 100644 --- a/README.md +++ b/README.md @@ -365,6 +365,97 @@ identity: type: AzureWorkloadIdentityCredentials ``` +## Operations support +function-msgraph support every kind of [operations](https://docs.crossplane.io/latest/operations/operation/), however CronOperations and WatchOperations are the most useful in context of graph queries. +Check [examples](./example/operations/) + +### CronOperation +CronOperation may be used to forcefully update XR's status in a predefined interval. +That functionality may be especially useful for XRs that are business critical and should have the data refreshed without worrying about throttling. +Supports only singular resource reference. + +```yaml +apiVersion: ops.crossplane.io/v1alpha1 +kind: CronOperation +metadata: + name: update-user-validation-for-critical-xr +spec: + schedule: "*/5 * * * *" # Every 5 minutes + concurrencyPolicy: Forbid + successfulHistoryLimit: 5 + failedHistoryLimit: 3 + operationTemplate: + spec: + mode: Pipeline + pipeline: + - step: user-validation + functionRef: + name: function-msgraph + input: + apiVersion: msgraph.fn.crossplane.io/v1alpha1 + kind: Input + queryType: UserValidation + # Replace these with actual users in your directory + users: + - "admin@example.onmicrosoft.com" + - "user@example.onmicrosoft.com" + - "yury@upbound.io" + target: "status.validatedUsers" + skipQueryWhenTargetHasData: false # Always query even if data is in status + credentials: + - name: azure-creds + source: Secret + secretRef: + namespace: upbound-system + name: azure-account-creds + requirements: + requiredResources: + - requirementName: ops.crossplane.io/watched-resource + apiVersion: example.crossplane.io/v1 + kind: XR + name: business-critical-xr +``` +### WatchOperation +WatchOperation may be used to forcefully update XR's status based on match condition. +For example it may be useful to refresh status in business critical XR's that are labeled with label `always-update: "true"`. +```yaml +apiVersion: ops.crossplane.io/v1alpha1 +kind: WatchOperation +metadata: + name: update-user-validation-for-critical-xrs +spec: + watch: + apiVersion: example.crossplane.io/v1 + kind: XR + matchLabels: + always-update: "true" + concurrencyPolicy: Allow + operationTemplate: + spec: + mode: Pipeline + pipeline: + - step: user-validation + functionRef: + name: function-msgraph + input: + apiVersion: msgraph.fn.crossplane.io/v1alpha1 + kind: Input + queryType: UserValidation + # Replace these with actual users in your directory + users: + - "admin@example.onmicrosoft.com" + - "user@example.onmicrosoft.com" + - "yury@upbound.io" + target: "status.validatedUsers" + skipQueryWhenTargetHasData: false # Always query even if data is in status + credentials: + - name: azure-creds + source: Secret + secretRef: + namespace: upbound-system + name: azure-account-creds +``` + ## References - [Microsoft Graph API Overview](https://learn.microsoft.com/en-us/graph/api/overview?view=graph-rest-1.0) diff --git a/example/operations/cronoperation.yaml b/example/operations/cronoperation.yaml new file mode 100644 index 0000000..ee2e97b --- /dev/null +++ b/example/operations/cronoperation.yaml @@ -0,0 +1,39 @@ +apiVersion: ops.crossplane.io/v1alpha1 +kind: CronOperation +metadata: + name: update-user-validation-for-critical-xr +spec: + schedule: "*/1 * * * *" # Every minute + concurrencyPolicy: Forbid + successfulHistoryLimit: 5 + failedHistoryLimit: 3 + operationTemplate: + spec: + mode: Pipeline + pipeline: + - step: user-validation + functionRef: + name: function-msgraph + input: + apiVersion: msgraph.fn.crossplane.io/v1alpha1 + kind: Input + queryType: UserValidation + # Replace these with actual users in your directory + users: + - "admin@example.onmicrosoft.com" + - "user@example.onmicrosoft.com" + - "yury@upbound.io" + target: "status.validatedUsers" + skipQueryWhenTargetHasData: false # Always query even if data is in status + credentials: + - name: azure-creds + source: Secret + secretRef: + namespace: upbound-system + name: azure-account-creds + requirements: + requiredResources: + - requirementName: ops.crossplane.io/watched-resource + apiVersion: example.crossplane.io/v1 + kind: XR + name: business-critical-xr diff --git a/example/operations/operation.yaml b/example/operations/operation.yaml new file mode 100644 index 0000000..ee5b767 --- /dev/null +++ b/example/operations/operation.yaml @@ -0,0 +1,34 @@ +apiVersion: ops.crossplane.io/v1alpha1 +kind: Operation +metadata: + name: update-user-validation-for-critical-xr-once +spec: + spec: + mode: Pipeline + pipeline: + - step: user-validation + functionRef: + name: function-msgraph + input: + apiVersion: msgraph.fn.crossplane.io/v1alpha1 + kind: Input + queryType: UserValidation + # Replace these with actual users in your directory + users: + - "admin@example.onmicrosoft.com" + - "user@example.onmicrosoft.com" + - "yury@upbound.io" + target: "status.validatedUsers" + skipQueryWhenTargetHasData: false # Always query even if data is in status + credentials: + - name: azure-creds + source: Secret + secretRef: + namespace: upbound-system + name: azure-account-creds + requirements: + requiredResources: + - requirementName: ops.crossplane.io/watched-resource + apiVersion: example.crossplane.io/v1 + kind: XR + name: example-xr diff --git a/example/operations/watchoperation.yaml b/example/operations/watchoperation.yaml new file mode 100644 index 0000000..191c588 --- /dev/null +++ b/example/operations/watchoperation.yaml @@ -0,0 +1,35 @@ +apiVersion: ops.crossplane.io/v1alpha1 +kind: WatchOperation +metadata: + name: update-user-validation-for-critical-xrs +spec: + watch: + apiVersion: example.crossplane.io/v1 + kind: XR + matchLabels: + always-update: "true" + concurrencyPolicy: Allow + operationTemplate: + spec: + mode: Pipeline + pipeline: + - step: user-validation + functionRef: + name: function-msgraph + input: + apiVersion: msgraph.fn.crossplane.io/v1alpha1 + kind: Input + queryType: UserValidation + # Replace these with actual users in your directory + users: + - "admin@example.onmicrosoft.com" + - "user@example.onmicrosoft.com" + - "yury@upbound.io" + target: "status.validatedUsers" + skipQueryWhenTargetHasData: false # Always query even if data is in status + credentials: + - name: azure-creds + source: Secret + secretRef: + namespace: upbound-system + name: azure-account-creds diff --git a/fn.go b/fn.go index b423e28..9184d56 100644 --- a/fn.go +++ b/fn.go @@ -140,8 +140,10 @@ func (f *Function) getXRAndStatus(req *fnv1.RunFunctionRequest) (map[string]inte // getObservedAndDesired gets both observed and desired XR resources func (f *Function) getObservedAndDesired(req *fnv1.RunFunctionRequest) (*resource.Composite, *resource.Composite, error) { if req.GetObserved().GetComposite() != nil { + f.log.Debug("triggered by composite resource") return getObservedAndDesiredInComposition(req) } + f.log.Debug("triggered by operation") return getObservedAndDesiredInOperation(req) } diff --git a/package/crossplane.yaml b/package/crossplane.yaml index 2e2dc21..189343e 100644 --- a/package/crossplane.yaml +++ b/package/crossplane.yaml @@ -1,5 +1,5 @@ --- -apiVersion: meta.pkg.crossplane.io/v1beta1 +apiVersion: meta.pkg.crossplane.io/v1 kind: Function metadata: name: function-msgraph @@ -13,4 +13,7 @@ metadata: to validate Azure AD users, get group memberships, group object IDs, and service principal details. The secret for Azure credentials is compatible with the [Official Azure Provider](https://marketplace.upbound.io/providers/upbound/provider-family-azure/latest). -spec: {} +spec: + capabilities: + - composition + - operation diff --git a/package/input/msgraph.fn.crossplane.io_inputs.yaml b/package/input/msgraph.fn.crossplane.io_inputs.yaml index cfeb7f8..894f1c7 100644 --- a/package/input/msgraph.fn.crossplane.io_inputs.yaml +++ b/package/input/msgraph.fn.crossplane.io_inputs.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.19.0 name: inputs.msgraph.fn.crossplane.io spec: group: msgraph.fn.crossplane.io From c81fe0ffd75e979f66f39d3799237afb9288d281 Mon Sep 17 00:00:00 2001 From: Jonasz Lasut-Balcerzak Date: Mon, 8 Sep 2025 09:37:53 +0200 Subject: [PATCH 3/8] prepare failing tests with required operation fixes, removed connection details --- fn.go | 11 +++++---- fn_test.go | 68 ++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 58 insertions(+), 21 deletions(-) diff --git a/fn.go b/fn.go index 9184d56..70b8f69 100644 --- a/fn.go +++ b/fn.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "regexp" + "slices" "strings" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -185,13 +186,15 @@ func getObservedAndDesiredInOperation(req *fnv1.RunFunctionRequest) (*resource.C return nil, nil, errors.New("operation: Resource.Object property in operation resource can not be empty") } + if !slices.Contains(r.Resource.GetFinalizers(), "composite.apiextensions.crossplane.io") { + return nil, nil, errors.New("operation: function-msgraph support only operations on composite resources") + } + oxr := &resource.Composite{ - Resource: composite.New(), - ConnectionDetails: make(resource.ConnectionDetails), + Resource: composite.New(), } dxr := &resource.Composite{ - Resource: composite.New(), - ConnectionDetails: make(resource.ConnectionDetails), + Resource: composite.New(), } oxr.Resource.Object = r.Resource.Object diff --git a/fn_test.go b/fn_test.go index daabb59..3669e79 100644 --- a/fn_test.go +++ b/fn_test.go @@ -1622,7 +1622,7 @@ func TestResolveServicePrincipalsRef(t *testing.T) { func TestRunFunction(t *testing.T) { var ( - xr = `{"apiVersion":"example.org/v1","kind":"XR","metadata":{"name":"cool-xr"},"spec":{"count":2}}` + xr = `{"apiVersion":"example.org/v1","kind":"XR","metadata":{"name":"cool-xr","finalizers":["composite.apiextensions.crossplane.io"]},"spec":{"count":2}}` creds = &fnv1.CredentialData{ Data: map[string][]byte{ "credentials": []byte(`{ @@ -2592,8 +2592,50 @@ func TestRunFunction(t *testing.T) { }, }, }, - "OperationWithWatchedResource": { - reason: "The Function should return fatal if it runs as operation watched resource with nil Resource", + "OperationWithWatchedResourceWhichIsNotXR": { + reason: "The Function should only allow operations on XRs based on finalizers", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "status.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + { + Resource: resource.MustStructJSON(`{"apiVersion":"example.org/v1","kind":"XR","metadata":{"name":"cool-xr"},"spec":{"count":2}}`), + }, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_FATAL, + Message: "operation: function-msgraph support only operations on composite resources", + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + }, + }, + }, + "OperationWithWatchedResourceWithoutDrift": { + reason: "The Function should set annotations on XR that notify user about lack of drift", args: args{ ctx: context.Background(), req: &fnv1.RunFunctionRequest{ @@ -2645,20 +2687,12 @@ func TestRunFunction(t *testing.T) { "apiVersion": "example.org/v1", "kind": "XR", "metadata": { - "name": "cool-xr" - }, - "spec": { - "count": 2 - }, - "status": { - "validatedUsers": [ - { - "id": "test-user-id", - "displayName": "Test User", - "userPrincipalName": "user@example.com", - "mail": "user@example.com" - } - ] + "name": "cool-xr", + "finalizers": ["composite.apiextensions.crossplane.io"], + "annotations": { + "function-msgraph/operation-last-queried": "...", + "function-msgraph/operation-drift-detected": "false" + } } }`), }, From b673f3ce0f27bec1868999e5057cf4d796c83b10 Mon Sep 17 00:00:00 2001 From: Jonasz Lasut-Balcerzak Date: Mon, 8 Sep 2025 12:25:30 +0200 Subject: [PATCH 4/8] Support for operations with annotation-based notification mechanism --- fn.go | 282 +++++++++++++++++++++++++++++++++++++++-------------- fn_test.go | 128 +++++++++++++++++++++++- main.go | 1 + 3 files changed, 332 insertions(+), 79 deletions(-) diff --git a/fn.go b/fn.go index 70b8f69..473ae13 100644 --- a/fn.go +++ b/fn.go @@ -7,7 +7,9 @@ import ( "reflect" "regexp" "slices" + "strconv" "strings" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" azauth "github.com/microsoft/kiota-authentication-azure-go" @@ -45,11 +47,23 @@ const ( WorkloadIdentityCredentialPath = "federatedTokenFile" ) +const ( + // LastExecutionAnnotation notifies the user when was the last time that Operation has run the query + LastExecutionAnnotation = "function-msgraph/last-execution" + // LastExecutionQueryDriftDetectedAnnotation notifies the user that the drift was detected after Operation has run the query + LastExecutionQueryDriftDetectedAnnotation = "function-msgraph/last-execution-query-drift-detected" +) + // GraphQueryInterface defines the methods required for querying Microsoft Graph API. type GraphQueryInterface interface { graphQuery(ctx context.Context, azureCreds map[string]string, in *v1beta1.Input) (interface{}, error) } +// TimerInterface defines the methods required to generate the current timestamp +type TimerInterface interface { + now() string +} + // Function returns whatever response you ask it to. type Function struct { fnv1.UnimplementedFunctionRunnerServiceServer @@ -57,6 +71,8 @@ type Function struct { graphQuery GraphQueryInterface log logging.Logger + + timer TimerInterface } // RunFunction runs the Function. @@ -65,8 +81,11 @@ func (f *Function) RunFunction(ctx context.Context, req *fnv1.RunFunctionRequest rsp := response.To(req, response.DefaultTTL) + // Check if pipeline runs as Composition or Operation + inOperation := (req.GetObserved().GetComposite() == nil) + // Initialize response with desired XR and preserve context - if err := f.initializeResponse(req, rsp); err != nil { + if err := f.initializeResponse(req, rsp, inOperation); err != nil { return rsp, nil //nolint:nilerr // errors are handled in rsp } @@ -77,12 +96,12 @@ func (f *Function) RunFunction(ctx context.Context, req *fnv1.RunFunctionRequest } // Validate and prepare input - if !f.validateAndPrepareInput(ctx, req, in, rsp) { + if !f.validateAndPrepareInput(ctx, req, in, rsp, inOperation) { return rsp, nil // Early return if validation failed or query should be skipped } // Execute the query and process results - if !f.executeAndProcessQuery(ctx, req, in, azureCreds, rsp) { + if !f.executeAndProcessQuery(ctx, req, in, azureCreds, rsp, inOperation) { return rsp, nil // Error already handled in response } @@ -121,16 +140,16 @@ func (f *Function) parseInputAndCredentials(req *fnv1.RunFunctionRequest, rsp *f return in, azureCreds, nil } -// getXRAndStatus retrieves status and desired XR, handling initialization if needed -func (f *Function) getXRAndStatus(req *fnv1.RunFunctionRequest) (map[string]interface{}, *resource.Composite, error) { +// getDXRAndStatus retrieves status and desired XR, handling initialization if needed +func (f *Function) getDXRAndStatus(req *fnv1.RunFunctionRequest, inOperation bool) (map[string]interface{}, *resource.Composite, error) { // Get composite resources - oxr, dxr, err := f.getObservedAndDesired(req) + oxr, dxr, err := f.getObservedAndDesired(req, inOperation) if err != nil { return nil, nil, err } // Initialize and copy data - f.initializeAndCopyData(oxr, dxr) + f.initializeAndCopyData(oxr, dxr, inOperation) // Get status xrStatus := f.getStatusFromResources(oxr, dxr) @@ -138,9 +157,26 @@ func (f *Function) getXRAndStatus(req *fnv1.RunFunctionRequest) (map[string]inte return xrStatus, dxr, nil } +// getDXRAndStatus retrieves status and desired XR, handling initialization if needed +func (f *Function) getOXRAndStatus(req *fnv1.RunFunctionRequest, inOperation bool) (map[string]interface{}, *resource.Composite, error) { + // Get composite resources + oxr, dxr, err := f.getObservedAndDesired(req, inOperation) + if err != nil { + return nil, nil, err + } + + // Initialize and copy data + f.initializeAndCopyData(oxr, dxr, inOperation) + + // Get status + xrStatus := f.getStatusFromResources(oxr, dxr) + + return xrStatus, oxr, nil +} + // getObservedAndDesired gets both observed and desired XR resources -func (f *Function) getObservedAndDesired(req *fnv1.RunFunctionRequest) (*resource.Composite, *resource.Composite, error) { - if req.GetObserved().GetComposite() != nil { +func (f *Function) getObservedAndDesired(req *fnv1.RunFunctionRequest, inOperation bool) (*resource.Composite, *resource.Composite, error) { + if !inOperation { f.log.Debug("triggered by composite resource") return getObservedAndDesiredInComposition(req) } @@ -198,13 +234,19 @@ func getObservedAndDesiredInOperation(req *fnv1.RunFunctionRequest) (*resource.C } oxr.Resource.Object = r.Resource.Object - dxr.Resource.Object = r.Resource.Object + + // Preserve only apiVersion, kind and mMetadata from OXR + dxr.Resource.SetAPIVersion(oxr.Resource.GetAPIVersion()) + dxr.Resource.SetKind(oxr.Resource.GetKind()) + dxr.Resource.Object = map[string]interface{}{ + "metadata": r.Resource.Object["metadata"], + } return oxr, dxr, nil } // initializeAndCopyData initializes metadata and copies spec -func (f *Function) initializeAndCopyData(oxr, dxr *resource.Composite) { +func (f *Function) initializeAndCopyData(oxr, dxr *resource.Composite, inOperation bool) { // Initialize dxr from oxr if needed if dxr.Resource.GetKind() == "" { dxr.Resource.SetAPIVersion(oxr.Resource.GetAPIVersion()) @@ -212,11 +254,13 @@ func (f *Function) initializeAndCopyData(oxr, dxr *resource.Composite) { dxr.Resource.SetName(oxr.Resource.GetName()) } - // Copy spec from observed to desired XR to preserve it - xrSpec := make(map[string]interface{}) - if err := oxr.Resource.GetValueInto("spec", &xrSpec); err == nil && len(xrSpec) > 0 { - if err := dxr.Resource.SetValue("spec", xrSpec); err != nil { - f.log.Debug("Cannot set spec in desired XR", "error", err) + if !inOperation { + // Copy spec from observed to desired XR to preserve it in Composition pipeline + xrSpec := make(map[string]interface{}) + if err := oxr.Resource.GetValueInto("spec", &xrSpec); err == nil && len(xrSpec) > 0 { + if err := dxr.Resource.SetValue("spec", xrSpec); err != nil { + f.log.Debug("Cannot set spec in desired XR", "error", err) + } } } } @@ -244,8 +288,8 @@ func (f *Function) getStatusFromResources(oxr, dxr *resource.Composite) map[stri } // checkStatusTargetHasData checks if the status target has data. -func (f *Function) checkStatusTargetHasData(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { - xrStatus, _, err := f.getXRAndStatus(req) +func (f *Function) checkStatusTargetHasData(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { + xrStatus, _, err := f.getOXRAndStatus(req, inOperation) if err != nil { response.Fatal(rsp, err) return true @@ -285,7 +329,15 @@ func (f *Function) executeQuery(ctx context.Context, azureCreds map[string]strin } // processResults processes the query results. -func (f *Function) processResults(req *fnv1.RunFunctionRequest, in *v1beta1.Input, results interface{}, rsp *fnv1.RunFunctionResponse) error { +func (f *Function) processResults(req *fnv1.RunFunctionRequest, in *v1beta1.Input, results interface{}, rsp *fnv1.RunFunctionResponse, inOperation bool) error { + if inOperation { + hasDrifted := f.hasQueryResultDriftedFromTarget(req, in.Target, results) + err := f.putQueryResultToAnnotations(req, rsp, hasDrifted) + if err != nil { + response.Fatal(rsp, err) + } + return err + } switch { case strings.HasPrefix(in.Target, "status."): err := f.putQueryResultToStatus(req, rsp, in, results) @@ -867,9 +919,62 @@ func SetNestedKey(root map[string]interface{}, key string, value interface{}) er return nil } -// putQueryResultToStatus processes the query results to status +// Timer is a concrete implementation of the TimerInterface +// that generates current timestamp +type Timer struct{} + +func (Timer) now() string { + return time.Now().Format(time.RFC3339) +} + +// putQueryResultToAnnotations process the query results to annotations (only in Operation mode) +func (f *Function) putQueryResultToAnnotations(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse, driftDetected bool) error { + _, dxr, err := f.getDXRAndStatus(req, true) + if err != nil { + return err + } + + annotations := dxr.Resource.GetAnnotations() + if annotations == nil { + // If annotations are nil initialize map which can hold both operation annotations + annotations = make(map[string]string, 2) + } + // Update the timestamp annotation + annotations[LastExecutionAnnotation] = f.timer.now() + // Set information about the drift + annotations[LastExecutionQueryDriftDetectedAnnotation] = strconv.FormatBool(driftDetected) + + if err := dxr.Resource.SetValue("metadata.annotations", annotations); err != nil { + return errors.Wrap(err, "cannot update composite resource annotations") + } + + // Save the updated desired composite resource + if err := response.SetDesiredCompositeResource(rsp, dxr); err != nil { + return errors.Wrapf(err, "cannot set desired composite resource in %T", rsp) + } + return nil +} + +// hasQueryResultDriftedFromTarget +func (f *Function) hasQueryResultDriftedFromTarget(req *fnv1.RunFunctionRequest, target string, results interface{}) bool { + _, oxr, err := f.getOXRAndStatus(req, true) + if err != nil { + f.log.Info("cannot get observed XR to check drift between results and target") + return true + } + + observedValue, err := oxr.Resource.GetValue(target) + if err != nil { + f.log.Info("could not get value from observed XR to check drift between results and target") + return true + } + + return !reflect.DeepEqual(observedValue, results) +} + +// putQueryResultToStatus processes the query results to status (only in Composition mode) func (f *Function) putQueryResultToStatus(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse, in *v1beta1.Input, results interface{}) error { - xrStatus, dxr, err := f.getXRAndStatus(req) + xrStatus, dxr, err := f.getDXRAndStatus(req, false) if err != nil { return err } @@ -971,15 +1076,15 @@ func targetHasData(data map[string]interface{}, key string) (bool, error) { } // propagateDesiredXR ensures the desired XR is properly propagated without changing existing data -func (f *Function) propagateDesiredXR(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse) error { - xrStatus, dxr, err := f.getXRAndStatus(req) +func (f *Function) propagateDesiredXR(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse, inOperation bool) error { + xrStatus, dxr, err := f.getDXRAndStatus(req, inOperation) if err != nil { response.Fatal(rsp, err) return err } // Write any existing status back to dxr - if len(xrStatus) > 0 { + if len(xrStatus) > 0 && !inOperation { if err := dxr.Resource.SetValue("status", xrStatus); err != nil { f.log.Info("Error setting status in Desired XR", "error", err) return err @@ -1008,9 +1113,9 @@ func (f *Function) preserveContext(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFu } // initializeResponse initializes the response with desired XR and preserves context -func (f *Function) initializeResponse(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse) error { +func (f *Function) initializeResponse(req *fnv1.RunFunctionRequest, rsp *fnv1.RunFunctionResponse, inOperation bool) error { // Ensure oxr to dxr gets propagated and we keep status around - if err := f.propagateDesiredXR(req, rsp); err != nil { + if err := f.propagateDesiredXR(req, rsp, inOperation); err != nil { return err } // Ensure the context is preserved @@ -1019,7 +1124,7 @@ func (f *Function) initializeResponse(req *fnv1.RunFunctionRequest, rsp *fnv1.Ru } // validateAndPrepareInput validates the input and prepares it for execution -func (f *Function) validateAndPrepareInput(_ context.Context, req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) validateAndPrepareInput(_ context.Context, req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { // Check if target is valid if !f.isValidTarget(in.Target) { response.Fatal(rsp, errors.Errorf("Unrecognized target field: %s", in.Target)) @@ -1027,7 +1132,7 @@ func (f *Function) validateAndPrepareInput(_ context.Context, req *fnv1.RunFunct } // Check if we should skip the query - if f.shouldSkipQuery(req, in, rsp) { + if f.shouldSkipQuery(req, in, rsp, inOperation) { // Set success condition response.ConditionTrue(rsp, "FunctionSuccess", "Success"). TargetCompositeAndClaim() @@ -1035,7 +1140,7 @@ func (f *Function) validateAndPrepareInput(_ context.Context, req *fnv1.RunFunct } // Process references based on query type - if !f.processReferences(req, in, rsp) { + if !f.processReferences(req, in, rsp, inOperation) { return false } @@ -1043,28 +1148,28 @@ func (f *Function) validateAndPrepareInput(_ context.Context, req *fnv1.RunFunct } // processReferences handles resolving references like groupRef, groupsRef, usersRef, and servicePrincipalsRef -func (f *Function) processReferences(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) processReferences(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { // Process references based on query type switch in.QueryType { case "GroupMembership": - return f.processGroupRef(req, in, rsp) + return f.processGroupRef(req, in, rsp, inOperation) case "GroupObjectIDs": - return f.processGroupsRef(req, in, rsp) + return f.processGroupsRef(req, in, rsp, inOperation) case "UserValidation": - return f.processUsersRef(req, in, rsp) + return f.processUsersRef(req, in, rsp, inOperation) case "ServicePrincipalDetails": - return f.processServicePrincipalsRef(req, in, rsp) + return f.processServicePrincipalsRef(req, in, rsp, inOperation) } return true } // processGroupRef handles resolving the groupRef reference for GroupMembership query type -func (f *Function) processGroupRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) processGroupRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { if in.GroupRef == nil || *in.GroupRef == "" { return true } - groupName, err := f.resolveGroupRef(req, in.GroupRef) + groupName, err := f.resolveGroupRef(req, in.GroupRef, inOperation) if err != nil { response.Fatal(rsp, err) return false @@ -1075,12 +1180,12 @@ func (f *Function) processGroupRef(req *fnv1.RunFunctionRequest, in *v1beta1.Inp } // processGroupsRef handles resolving the groupsRef reference for GroupObjectIDs query type -func (f *Function) processGroupsRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) processGroupsRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { if in.GroupsRef == nil || *in.GroupsRef == "" { return true } - groupNames, err := f.resolveGroupsRef(req, in.GroupsRef) + groupNames, err := f.resolveGroupsRef(req, in.GroupsRef, inOperation) if err != nil { response.Fatal(rsp, err) return false @@ -1091,12 +1196,12 @@ func (f *Function) processGroupsRef(req *fnv1.RunFunctionRequest, in *v1beta1.In } // processUsersRef handles resolving the usersRef reference for UserValidation query type -func (f *Function) processUsersRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) processUsersRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { if in.UsersRef == nil || *in.UsersRef == "" { return true } - userNames, err := f.resolveUsersRef(req, in.UsersRef) + userNames, err := f.resolveUsersRef(req, in.UsersRef, inOperation) if err != nil { response.Fatal(rsp, err) return false @@ -1107,12 +1212,12 @@ func (f *Function) processUsersRef(req *fnv1.RunFunctionRequest, in *v1beta1.Inp } // processServicePrincipalsRef handles resolving the servicePrincipalsRef reference for ServicePrincipalDetails query type -func (f *Function) processServicePrincipalsRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) processServicePrincipalsRef(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { if in.ServicePrincipalsRef == nil || *in.ServicePrincipalsRef == "" { return true } - spNames, err := f.resolveServicePrincipalsRef(req, in.ServicePrincipalsRef) + spNames, err := f.resolveServicePrincipalsRef(req, in.ServicePrincipalsRef, inOperation) if err != nil { response.Fatal(rsp, err) return false @@ -1123,7 +1228,7 @@ func (f *Function) processServicePrincipalsRef(req *fnv1.RunFunctionRequest, in } // executeAndProcessQuery executes the query and processes the results -func (f *Function) executeAndProcessQuery(ctx context.Context, req *fnv1.RunFunctionRequest, in *v1beta1.Input, azureCreds map[string]string, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) executeAndProcessQuery(ctx context.Context, req *fnv1.RunFunctionRequest, in *v1beta1.Input, azureCreds map[string]string, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { // Execute the query results, err := f.executeQuery(ctx, azureCreds, in, rsp) if err != nil { @@ -1131,7 +1236,7 @@ func (f *Function) executeAndProcessQuery(ctx context.Context, req *fnv1.RunFunc } // Process the results - if err := f.processResults(req, in, results, rsp); err != nil { + if err := f.processResults(req, in, results, rsp, inOperation); err != nil { return false } @@ -1144,20 +1249,30 @@ func (f *Function) isValidTarget(target string) bool { } // shouldSkipQuery checks if the query should be skipped. -func (f *Function) shouldSkipQuery(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { +func (f *Function) shouldSkipQuery(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse, inOperation bool) bool { // Determine if we should skip the query when target has data var shouldSkipQueryWhenTargetHasData = false // Default to false to ensure continuous reconciliation if in.SkipQueryWhenTargetHasData != nil { shouldSkipQueryWhenTargetHasData = *in.SkipQueryWhenTargetHasData } + // We should not skip if function is running as part of Operation + if inOperation { + return false + } + + // We should not skip if Operation annotation is set to "true" + if f.queryDriftDetected(req, inOperation) { + return false + } + if !shouldSkipQueryWhenTargetHasData { return false } switch { case strings.HasPrefix(in.Target, "status."): - return f.checkStatusTargetHasData(req, in, rsp) + return f.checkStatusTargetHasData(req, in, rsp, inOperation) case strings.HasPrefix(in.Target, "context."): return f.checkContextTargetHasData(req, in, rsp) } @@ -1165,6 +1280,26 @@ func (f *Function) shouldSkipQuery(req *fnv1.RunFunctionRequest, in *v1beta1.Inp return false } +func (f *Function) queryDriftDetected(req *fnv1.RunFunctionRequest, inOperation bool) bool { + _, oxr, err := f.getOXRAndStatus(req, inOperation) + if err != nil { + f.log.Info("cannot get observed XR to check drift from annotations") + return false + } + + annotations := oxr.Resource.GetAnnotations() + driftStr, found := annotations[LastExecutionQueryDriftDetectedAnnotation] + if !found { + return false + } + + drift, err := strconv.ParseBool(driftStr) + if err != nil { + f.log.Info("annotation notyfing about detected query drift has been manually modified and is of incorrect type", "annotation", LastExecutionQueryDriftDetectedAnnotation, "value", driftStr) + } + return drift +} + // checkContextTargetHasData checks if the context target has data. func (f *Function) checkContextTargetHasData(req *fnv1.RunFunctionRequest, in *v1beta1.Input, rsp *fnv1.RunFunctionResponse) bool { contextMap := req.GetContext().AsMap() @@ -1182,7 +1317,7 @@ func (f *Function) checkContextTargetHasData(req *fnv1.RunFunctionRequest, in *v } // resolveGroupRef resolves the group name from a reference in spec, status or context. -func (f *Function) resolveGroupRef(req *fnv1.RunFunctionRequest, groupRef *string) (string, error) { +func (f *Function) resolveGroupRef(req *fnv1.RunFunctionRequest, groupRef *string, inOperation bool) (string, error) { if groupRef == nil || *groupRef == "" { return "", errors.New("empty groupRef provided") } @@ -1192,19 +1327,19 @@ func (f *Function) resolveGroupRef(req *fnv1.RunFunctionRequest, groupRef *strin // Use a proper switch statement instead of if-else chain switch { case strings.HasPrefix(refKey, "status."): - return f.resolveFromStatus(req, refKey) + return f.resolveFromStatus(req, refKey, inOperation) case strings.HasPrefix(refKey, "context."): return f.resolveFromContext(req, refKey) case strings.HasPrefix(refKey, "spec."): - return f.resolveFromSpec(req, refKey) + return f.resolveFromSpec(req, refKey, inOperation) default: return "", errors.Errorf("unsupported groupRef format: %s", refKey) } } // resolveFromStatus resolves a reference from XR status -func (f *Function) resolveFromStatus(req *fnv1.RunFunctionRequest, refKey string) (string, error) { - xrStatus, _, err := f.getXRAndStatus(req) +func (f *Function) resolveFromStatus(req *fnv1.RunFunctionRequest, refKey string, inOperation bool) (string, error) { + xrStatus, _, err := f.getOXRAndStatus(req, inOperation) if err != nil { return "", errors.Wrap(err, "cannot get XR status") } @@ -1229,16 +1364,16 @@ func (f *Function) resolveFromContext(req *fnv1.RunFunctionRequest, refKey strin } // resolveFromSpec resolves a reference from XR spec -func (f *Function) resolveFromSpec(req *fnv1.RunFunctionRequest, refKey string) (string, error) { - // Use getXRAndStatus to ensure spec is copied to desired XR - _, dxr, err := f.getXRAndStatus(req) +func (f *Function) resolveFromSpec(req *fnv1.RunFunctionRequest, refKey string, inOperation bool) (string, error) { + // Use getOXRAndStatus to ensure spec is taken from observed XR which always has full object + _, oxr, err := f.getOXRAndStatus(req, inOperation) if err != nil { - return "", errors.Wrap(err, "cannot get XR status and desired XR") + return "", errors.Wrap(err, "cannot get XR status and observed XR") } - // Get spec from the desired XR (which now has the spec copied from observed) + // Get spec from the observed XR xrSpec := make(map[string]interface{}) - err = dxr.Resource.GetValueInto("spec", &xrSpec) + err = oxr.Resource.GetValueInto("spec", &xrSpec) if err != nil { return "", errors.Wrap(err, "cannot get XR spec") } @@ -1252,7 +1387,7 @@ func (f *Function) resolveFromSpec(req *fnv1.RunFunctionRequest, refKey string) } // resolveStringArrayRef resolves a list of string values from a reference in spec, status or context -func (f *Function) resolveStringArrayRef(req *fnv1.RunFunctionRequest, ref *string, refType string) ([]*string, error) { +func (f *Function) resolveStringArrayRef(req *fnv1.RunFunctionRequest, ref *string, refType string, inOperation bool) ([]*string, error) { if ref == nil || *ref == "" { return nil, errors.Errorf("empty %s provided", refType) } @@ -1267,11 +1402,11 @@ func (f *Function) resolveStringArrayRef(req *fnv1.RunFunctionRequest, ref *stri // Use proper switch statement instead of if-else chain switch { case strings.HasPrefix(refKey, "status."): - result, err = f.resolveStringArrayFromStatus(req, refKey) + result, err = f.resolveStringArrayFromStatus(req, refKey, inOperation) case strings.HasPrefix(refKey, "context."): result, err = f.resolveStringArrayFromContext(req, refKey) case strings.HasPrefix(refKey, "spec."): - result, err = f.resolveStringArrayFromSpec(req, refKey) + result, err = f.resolveStringArrayFromSpec(req, refKey, inOperation) default: return nil, errors.Errorf("unsupported %s format: %s", refType, refKey) } @@ -1287,8 +1422,8 @@ func (f *Function) resolveStringArrayRef(req *fnv1.RunFunctionRequest, ref *stri } // resolveStringArrayFromStatus resolves a list of string values from XR status -func (f *Function) resolveStringArrayFromStatus(req *fnv1.RunFunctionRequest, refKey string) ([]*string, error) { - xrStatus, _, err := f.getXRAndStatus(req) +func (f *Function) resolveStringArrayFromStatus(req *fnv1.RunFunctionRequest, refKey string, inOperation bool) ([]*string, error) { + xrStatus, _, err := f.getOXRAndStatus(req, inOperation) if err != nil { return nil, errors.Wrap(err, "cannot get XR status") } @@ -1305,16 +1440,15 @@ func (f *Function) resolveStringArrayFromContext(req *fnv1.RunFunctionRequest, r } // resolveStringArrayFromSpec resolves a list of string values from XR spec -func (f *Function) resolveStringArrayFromSpec(req *fnv1.RunFunctionRequest, refKey string) ([]*string, error) { - // Use getXRAndStatus to ensure spec is copied to desired XR - _, dxr, err := f.getXRAndStatus(req) +func (f *Function) resolveStringArrayFromSpec(req *fnv1.RunFunctionRequest, refKey string, inOperation bool) ([]*string, error) { + _, oxr, err := f.getOXRAndStatus(req, inOperation) if err != nil { - return nil, errors.Wrap(err, "cannot get XR status and desired XR") + return nil, errors.Wrap(err, "cannot get XR status and observed XR") } - // Get spec from the desired XR (which now has the spec copied from observed) + // Get spec from the observed XR (as desired XR may be part of the operation and have no spec) xrSpec := make(map[string]interface{}) - err = dxr.Resource.GetValueInto("spec", &xrSpec) + err = oxr.Resource.GetValueInto("spec", &xrSpec) if err != nil { return nil, errors.Wrap(err, "cannot get XR spec") } @@ -1324,18 +1458,18 @@ func (f *Function) resolveStringArrayFromSpec(req *fnv1.RunFunctionRequest, refK } // resolveGroupsRef resolves a list of group names from a reference in status or context -func (f *Function) resolveGroupsRef(req *fnv1.RunFunctionRequest, groupsRef *string) ([]*string, error) { - return f.resolveStringArrayRef(req, groupsRef, "groupsRef") +func (f *Function) resolveGroupsRef(req *fnv1.RunFunctionRequest, groupsRef *string, inOperation bool) ([]*string, error) { + return f.resolveStringArrayRef(req, groupsRef, "groupsRef", inOperation) } // resolveUsersRef resolves a list of user names from a reference in status or context -func (f *Function) resolveUsersRef(req *fnv1.RunFunctionRequest, usersRef *string) ([]*string, error) { - return f.resolveStringArrayRef(req, usersRef, "usersRef") +func (f *Function) resolveUsersRef(req *fnv1.RunFunctionRequest, usersRef *string, inOperation bool) ([]*string, error) { + return f.resolveStringArrayRef(req, usersRef, "usersRef", inOperation) } // resolveServicePrincipalsRef resolves a list of service principal names from a reference in status or context -func (f *Function) resolveServicePrincipalsRef(req *fnv1.RunFunctionRequest, servicePrincipalsRef *string) ([]*string, error) { - return f.resolveStringArrayRef(req, servicePrincipalsRef, "servicePrincipalsRef") +func (f *Function) resolveServicePrincipalsRef(req *fnv1.RunFunctionRequest, servicePrincipalsRef *string, inOperation bool) ([]*string, error) { + return f.resolveStringArrayRef(req, servicePrincipalsRef, "servicePrincipalsRef", inOperation) } // extractStringArrayFromMap extracts a string array from a map using nested key diff --git a/fn_test.go b/fn_test.go index 3669e79..49fb508 100644 --- a/fn_test.go +++ b/fn_test.go @@ -27,6 +27,12 @@ func (m *MockGraphQuery) graphQuery(ctx context.Context, azureCreds map[string]s return m.GraphQueryFunc(ctx, azureCreds, in) } +type MockTimer struct{} + +func (MockTimer) now() string { + return "2025-01-01T00:00:00+01:00" +} + // TestResolveGroupsRef tests the functionality of resolving groupsRef from context, status, or spec func TestResolveGroupsRef(t *testing.T) { var ( @@ -2634,8 +2640,8 @@ func TestRunFunction(t *testing.T) { }, }, }, - "OperationWithWatchedResourceWithoutDrift": { - reason: "The Function should set annotations on XR that notify user about lack of drift", + "OperationWithWatchedResourceQueryNoDrift": { + reason: "The Function should set annotations on XR that notify user about lack of drift in query results", args: args{ ctx: context.Background(), req: &fnv1.RunFunctionRequest{ @@ -2656,7 +2662,117 @@ func TestRunFunction(t *testing.T) { "ops.crossplane.io/watched-resource": { Items: []*fnv1.Resource{ { - Resource: resource.MustStructJSON(xr), + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "finalizers": [ + "composite.apiextensions.crossplane.io" + ] + }, + "spec": { + "count": 2 + }, + "status": { + "validatedUsers": [ + { + "id": "test-user-id", + "displayName": "Test User", + "userPrincipalName": "user@example.com", + "mail": "user@example.com" + } + ] + } + }`), + }, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Conditions: []*fnv1.Condition{ + { + Type: "FunctionSuccess", + Status: fnv1.Status_STATUS_CONDITION_TRUE, + Reason: "Success", + Target: fnv1.Target_TARGET_COMPOSITE_AND_CLAIM.Enum(), + }, + }, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_NORMAL, + Message: `QueryType: "UserValidation"`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + Desired: &fnv1.State{ + Composite: &fnv1.Resource{ + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "finalizers": ["composite.apiextensions.crossplane.io"], + "annotations": { + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", + "function-msgraph/last-execution-query-drift-detected": "false" + } + } + }`), + }, + }, + }, + }, + }, + "OperationWithWatchedResourceQueryDrift": { + reason: "The Function should set annotations on XR that notify user about drift in query results", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "status.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + { + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "finalizers": [ + "composite.apiextensions.crossplane.io" + ] + }, + "spec": { + "count": 2 + }, + "status": { + "validatedUsers": [ + { + "id": "incorrect-id", + "displayName": "Another Display Name", + "userPrincipalName": "user@example.com", + "mail": "user@example.com" + } + ] + } + }`), }, }, }, @@ -2690,8 +2806,8 @@ func TestRunFunction(t *testing.T) { "name": "cool-xr", "finalizers": ["composite.apiextensions.crossplane.io"], "annotations": { - "function-msgraph/operation-last-queried": "...", - "function-msgraph/operation-drift-detected": "false" + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", + "function-msgraph/last-execution-query-drift-detected": "true" } } }`), @@ -2775,6 +2891,7 @@ func TestRunFunction(t *testing.T) { f := &Function{ graphQuery: mockQuery, + timer: &MockTimer{}, log: logging.NewNopLogger(), } rsp, err := f.RunFunction(tc.args.ctx, tc.args.req) @@ -2991,6 +3108,7 @@ func TestIdentityType(t *testing.T) { f := &Function{ graphQuery: mockQuery, + timer: &MockTimer{}, log: logging.NewNopLogger(), } rsp, err := f.RunFunction(tc.args.ctx, tc.args.req) diff --git a/main.go b/main.go index 079397c..db5fc70 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,7 @@ func (c *CLI) Run() error { return function.Serve(&Function{ log: log, graphQuery: &GraphQuery{}, + timer: &Timer{}, }, function.Listen(c.Network, c.Address), function.MTLSCertificates(c.TLSCertsDir), From 61e086adad21d952ae5f0c53f7c65d61f71b4577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonasz=20=C5=81asut-Balcerzak?= Date: Mon, 8 Sep 2025 13:15:55 +0200 Subject: [PATCH 5/8] Fix rsp.Desired, run manual tests and confirm the correct annotations are set --- fn.go | 26 +++++++++++++++++++++----- fn_test.go | 50 ++++++++++++++++++++++++++------------------------ 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/fn.go b/fn.go index 473ae13..915b113 100644 --- a/fn.go +++ b/fn.go @@ -27,6 +27,7 @@ import ( fnv1 "github.com/crossplane/function-sdk-go/proto/v1" "github.com/crossplane/function-sdk-go/request" "github.com/crossplane/function-sdk-go/resource" + "github.com/crossplane/function-sdk-go/resource/composed" "github.com/crossplane/function-sdk-go/resource/composite" "github.com/crossplane/function-sdk-go/response" ) @@ -105,6 +106,8 @@ func (f *Function) RunFunction(ctx context.Context, req *fnv1.RunFunctionRequest return rsp, nil // Error already handled in response } + fmt.Println(rsp.Desired.Resources) + // Set success condition response.ConditionTrue(rsp, "FunctionSuccess", "Success"). TargetCompositeAndClaim() @@ -235,11 +238,15 @@ func getObservedAndDesiredInOperation(req *fnv1.RunFunctionRequest) (*resource.C oxr.Resource.Object = r.Resource.Object - // Preserve only apiVersion, kind and mMetadata from OXR + // Preserve only apiVersion, kind and metadata.name, metadata.annotations from OXR dxr.Resource.SetAPIVersion(oxr.Resource.GetAPIVersion()) dxr.Resource.SetKind(oxr.Resource.GetKind()) - dxr.Resource.Object = map[string]interface{}{ - "metadata": r.Resource.Object["metadata"], + dxr.Resource.SetName(oxr.Resource.GetName()) + if oxrNs := oxr.Resource.GetNamespace(); oxrNs != "" { + dxr.Resource.SetNamespace(oxrNs) + } + if oxrAnnotations := oxr.Resource.GetAnnotations(); oxrAnnotations != nil { + dxr.Resource.SetAnnotations(oxrAnnotations) } return oxr, dxr, nil @@ -335,8 +342,9 @@ func (f *Function) processResults(req *fnv1.RunFunctionRequest, in *v1beta1.Inpu err := f.putQueryResultToAnnotations(req, rsp, hasDrifted) if err != nil { response.Fatal(rsp, err) + return err } - return err + return nil } switch { case strings.HasPrefix(in.Target, "status."): @@ -949,9 +957,17 @@ func (f *Function) putQueryResultToAnnotations(req *fnv1.RunFunctionRequest, rsp } // Save the updated desired composite resource - if err := response.SetDesiredCompositeResource(rsp, dxr); err != nil { + dcds := map[resource.Name]*resource.DesiredComposed{ + "xr": { + Resource: (*composed.Unstructured)(dxr.Resource), + }, + } + + if err := response.SetDesiredComposedResources(rsp, dcds); err != nil { return errors.Wrapf(err, "cannot set desired composite resource in %T", rsp) } + // In Operation only set rsp.Desired.Resources and not rsp.Desired.Composite + rsp.Desired.Composite = nil return nil } diff --git a/fn_test.go b/fn_test.go index 49fb508..016d642 100644 --- a/fn_test.go +++ b/fn_test.go @@ -2710,19 +2710,20 @@ func TestRunFunction(t *testing.T) { }, }, Desired: &fnv1.State{ - Composite: &fnv1.Resource{ - Resource: resource.MustStructJSON(`{ - "apiVersion": "example.org/v1", - "kind": "XR", - "metadata": { - "name": "cool-xr", - "finalizers": ["composite.apiextensions.crossplane.io"], - "annotations": { - "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", - "function-msgraph/last-execution-query-drift-detected": "false" + Resources: map[string]*fnv1.Resource{ + "xr": { + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "annotations": { + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", + "function-msgraph/last-execution-query-drift-detected": "false" + } } - } - }`), + }`), + }, }, }, }, @@ -2798,19 +2799,20 @@ func TestRunFunction(t *testing.T) { }, }, Desired: &fnv1.State{ - Composite: &fnv1.Resource{ - Resource: resource.MustStructJSON(`{ - "apiVersion": "example.org/v1", - "kind": "XR", - "metadata": { - "name": "cool-xr", - "finalizers": ["composite.apiextensions.crossplane.io"], - "annotations": { - "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", - "function-msgraph/last-execution-query-drift-detected": "true" + Resources: map[string]*fnv1.Resource{ + "xr": { + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "annotations": { + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", + "function-msgraph/last-execution-query-drift-detected": "true" + } } - } - }`), + }`), + }, }, }, }, From d20482f442faaa2dd9cce73e7d1c209f8882a7ed Mon Sep 17 00:00:00 2001 From: Jonasz Lasut-Balcerzak Date: Mon, 8 Sep 2025 13:24:57 +0200 Subject: [PATCH 6/8] Fix linter --- fn.go | 2 -- fn_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/fn.go b/fn.go index 915b113..c34cfe6 100644 --- a/fn.go +++ b/fn.go @@ -106,8 +106,6 @@ func (f *Function) RunFunction(ctx context.Context, req *fnv1.RunFunctionRequest return rsp, nil // Error already handled in response } - fmt.Println(rsp.Desired.Resources) - // Set success condition response.ConditionTrue(rsp, "FunctionSuccess", "Success"). TargetCompositeAndClaim() diff --git a/fn_test.go b/fn_test.go index 016d642..0aec207 100644 --- a/fn_test.go +++ b/fn_test.go @@ -2729,6 +2729,99 @@ func TestRunFunction(t *testing.T) { }, }, }, + "OperationWithWatchedResourceQueryNoDriftWithExistingAnnotations": { + reason: "The Function should set annotations on XR that notify user about lack of drift in query results and in the same time not override existing annotations", + args: args{ + ctx: context.Background(), + req: &fnv1.RunFunctionRequest{ + Meta: &fnv1.RequestMeta{Tag: "hello"}, + Input: resource.MustStructJSON(`{ + "apiVersion": "msgraph.fn.crossplane.io/v1alpha1", + "kind": "Input", + "queryType": "UserValidation", + "users": ["user@example.com"], + "target": "status.validatedUsers" + }`), + Credentials: map[string]*fnv1.Credentials{ + "azure-creds": { + Source: &fnv1.Credentials_CredentialData{CredentialData: creds}, + }, + }, + RequiredResources: map[string]*fnv1.Resources{ + "ops.crossplane.io/watched-resource": { + Items: []*fnv1.Resource{ + { + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "finalizers": [ + "composite.apiextensions.crossplane.io" + ], + "annotations": { + "my-cool-annotation": "love-msgraph" + } + }, + "spec": { + "count": 2 + }, + "status": { + "validatedUsers": [ + { + "id": "test-user-id", + "displayName": "Test User", + "userPrincipalName": "user@example.com", + "mail": "user@example.com" + } + ] + } + }`), + }, + }, + }, + }, + }, + }, + want: want{ + rsp: &fnv1.RunFunctionResponse{ + Meta: &fnv1.ResponseMeta{Tag: "hello", Ttl: durationpb.New(response.DefaultTTL)}, + Conditions: []*fnv1.Condition{ + { + Type: "FunctionSuccess", + Status: fnv1.Status_STATUS_CONDITION_TRUE, + Reason: "Success", + Target: fnv1.Target_TARGET_COMPOSITE_AND_CLAIM.Enum(), + }, + }, + Results: []*fnv1.Result{ + { + Severity: fnv1.Severity_SEVERITY_NORMAL, + Message: `QueryType: "UserValidation"`, + Target: fnv1.Target_TARGET_COMPOSITE.Enum(), + }, + }, + Desired: &fnv1.State{ + Resources: map[string]*fnv1.Resource{ + "xr": { + Resource: resource.MustStructJSON(`{ + "apiVersion": "example.org/v1", + "kind": "XR", + "metadata": { + "name": "cool-xr", + "annotations": { + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00", + "function-msgraph/last-execution-query-drift-detected": "false", + "my-cool-annotation": "love-msgraph" + } + } + }`), + }, + }, + }, + }, + }, + }, "OperationWithWatchedResourceQueryDrift": { reason: "The Function should set annotations on XR that notify user about drift in query results", args: args{ From adfce38d58c8b06820fbbc192ce286308b35f2de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonasz=20=C5=81asut-Balcerzak?= Date: Mon, 8 Sep 2025 13:49:17 +0200 Subject: [PATCH 7/8] update readme --- README.md | 21 +++++++++++++++++---- example/operations/cronoperation.yaml | 1 - example/operations/operation.yaml | 1 - example/operations/watchoperation.yaml | 1 - 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index c01e5e9..07528bf 100644 --- a/README.md +++ b/README.md @@ -366,8 +366,23 @@ identity: ``` ## Operations support -function-msgraph support every kind of [operations](https://docs.crossplane.io/latest/operations/operation/), however CronOperations and WatchOperations are the most useful in context of graph queries. -Check [examples](./example/operations/) +function-msgraph support every kind of [operations](https://docs.crossplane.io/latest/operations/operation/) but it only allows targeting Composite Resources +Function omits the input.skipQueryWhenTargetHasData parameter when running in operation mode to enforce compability with Cron/Watch modes. +CronOperations and WatchOperations are the most useful in context of graph queries, please check [examples](./example/operations/). + +### Operations results +function-msgraph operations result in two annotations set on the XR: +```yaml +apiVersion: "example.org/v1" +kind: XR +metadata: + name: "cool-xr" + annotations: + "function-msgraph/last-execution": "2025-01-01T00:00:00+01:00" + "function-msgraph/last-execution-query-drift-detected": "false" +``` +function-msgraph/last-execution sets RFC3339 timestamp informing about last succesful Operation run. +function-msgraph/last-execution-query-drift-detected sets a boolean if there's a drift between input.target field's value and query result, which is used by function-msgraph in Composition context for self-healing. skipQueryWhenTargetHasData input parameter is ommited when drift detected annotation is set which leads to XR update and after that next Operation run sets the annotation back to "false". ### CronOperation CronOperation may be used to forcefully update XR's status in a predefined interval. @@ -401,7 +416,6 @@ spec: - "user@example.onmicrosoft.com" - "yury@upbound.io" target: "status.validatedUsers" - skipQueryWhenTargetHasData: false # Always query even if data is in status credentials: - name: azure-creds source: Secret @@ -447,7 +461,6 @@ spec: - "user@example.onmicrosoft.com" - "yury@upbound.io" target: "status.validatedUsers" - skipQueryWhenTargetHasData: false # Always query even if data is in status credentials: - name: azure-creds source: Secret diff --git a/example/operations/cronoperation.yaml b/example/operations/cronoperation.yaml index ee2e97b..f26d714 100644 --- a/example/operations/cronoperation.yaml +++ b/example/operations/cronoperation.yaml @@ -24,7 +24,6 @@ spec: - "user@example.onmicrosoft.com" - "yury@upbound.io" target: "status.validatedUsers" - skipQueryWhenTargetHasData: false # Always query even if data is in status credentials: - name: azure-creds source: Secret diff --git a/example/operations/operation.yaml b/example/operations/operation.yaml index ee5b767..a51cad9 100644 --- a/example/operations/operation.yaml +++ b/example/operations/operation.yaml @@ -19,7 +19,6 @@ spec: - "user@example.onmicrosoft.com" - "yury@upbound.io" target: "status.validatedUsers" - skipQueryWhenTargetHasData: false # Always query even if data is in status credentials: - name: azure-creds source: Secret diff --git a/example/operations/watchoperation.yaml b/example/operations/watchoperation.yaml index 191c588..fa711b5 100644 --- a/example/operations/watchoperation.yaml +++ b/example/operations/watchoperation.yaml @@ -26,7 +26,6 @@ spec: - "user@example.onmicrosoft.com" - "yury@upbound.io" target: "status.validatedUsers" - skipQueryWhenTargetHasData: false # Always query even if data is in status credentials: - name: azure-creds source: Secret From ddef4bd56eaa1eccece48b98d1d61a898fc2fe0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonasz=20=C5=81asut-Balcerzak?= Date: Tue, 9 Sep 2025 19:06:30 +0200 Subject: [PATCH 8/8] Update README.md Co-authored-by: Yury Tsarev --- README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/README.md b/README.md index 07528bf..a58bb19 100644 --- a/README.md +++ b/README.md @@ -369,7 +369,22 @@ identity: function-msgraph support every kind of [operations](https://docs.crossplane.io/latest/operations/operation/) but it only allows targeting Composite Resources Function omits the input.skipQueryWhenTargetHasData parameter when running in operation mode to enforce compability with Cron/Watch modes. CronOperations and WatchOperations are the most useful in context of graph queries, please check [examples](./example/operations/). +### Operations and Compositions Working Together +**Important**: Operations and Compositions work in conjunction to provide a self-healing mechanism: + +1. **Operations Role (Drift Detection)**: + - Query Microsoft Graph API on schedule/watch events + - Compare results with current XR status + - Set drift detection annotations (but don't update status directly) + +2. **Compositions Role (Drift Correction)**: + - Run when XR is reconciled (triggered by annotation changes) + - Check drift detection annotation + - If drift detected, ignore `skipQueryWhenTargetHasData` flag and update status + - Reset drift annotation to "false" after successful update + +This creates a **two-phase self-healing system** where Operations monitor for changes and Compositions perform the actual data updates. ### Operations results function-msgraph operations result in two annotations set on the XR: ```yaml