Flow service context propagation and transection usage#1297
Flow service context propagation and transection usage#1297JeethJJ wants to merge 2 commits intoasgardeo:mainfrom
Conversation
ccb2b1c to
c353fa1
Compare
There was a problem hiding this comment.
Pull request overview
This pull request adds context propagation and transaction support to the flow service as part of implementing the transaction architecture described in issue #282. The changes ensure that database operations can participate in transactions and that context is properly propagated through all layers.
Changes:
- Added
context.Contextparameter to all flow service interface methods and implementations - Updated store layer to use
QueryContextandExecuteContextinstead ofQueryandExecutefor proper context propagation - Modified handler layer to extract context from HTTP requests and pass it through the call chain
- Updated all tests and mocks to include context parameter with
mock.Anythingmatchers - Integrated context propagation in dependent services (flowexec, application, export, mcp tools)
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/internal/flow/mgt/service.go | Added context parameter to all service methods |
| backend/internal/flow/mgt/store.go | Added context parameter to store interface and implementations, updated to use context-aware DB methods |
| backend/internal/flow/mgt/handler.go | Extracts context from HTTP request and propagates to service layer |
| backend/internal/flow/mgt/graph_builder.go | Added context parameter to graph building methods |
| backend/internal/flow/mgt/*_test.go | Updated all tests with context.Background() and mock.Anything for context parameters |
| backend/internal/flow/mgt/*_mock_test.go | Regenerated mocks with context parameter support |
| backend/internal/flow/mgt/file_based_store.go | Added context parameter to file-based store operations |
| backend/internal/flow/mgt/cache_backed_store.go | Added context parameter to cache-backed store wrapper |
| backend/internal/flow/mgt/declarative_resource.go | Added context to declarative resource exporter |
| backend/internal/flow/flowexec/service.go | Updated flow execution service to propagate context |
| backend/internal/application/service.go | Updated application service to pass context to flow service calls |
| backend/internal/system/export/service_test.go | Updated export service tests with context parameter |
| backend/internal/mcp/tools/flow/tool.go | Updated MCP flow tools to propagate context |
| defaultAuthFlowHandle := config.GetThunderRuntime().Config.Flow.DefaultAuthFlowHandle | ||
| defaultAuthFlow, svcErr := as.flowMgtService.GetFlowByHandle( | ||
| defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) | ||
| context.TODO(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) |
There was a problem hiding this comment.
Using context.TODO() is not recommended. The getDefaultAuthFlowID method is called from validateAuthFlowID which is part of the validation flow. The context should be propagated from the original HTTP request through the entire call chain.
| context.TODO(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) | |
| context.Background(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) |
There was a problem hiding this comment.
Its intentional as context propagation is being addressed service by service. In this PR we are focusing on the flow service.
| @@ -74,7 +75,7 @@ func (e *FlowGraphExporter) GetAllResourceIDs() ([]string, *serviceerror.Service | |||
|
|
|||
| // GetResourceByID retrieves a flow graph by its ID. | |||
| func (e *FlowGraphExporter) GetResourceByID(id string) (interface{}, string, *serviceerror.ServiceError) { | |||
| flow, err := e.service.GetFlow(id) | |||
| flow, err := e.service.GetFlow(context.Background(), id) | |||
There was a problem hiding this comment.
Using context.Background() for GetAllResourceIDs and GetResourceByID methods in the declarative resource exporter. Since these methods are typically called during export operations that originate from HTTP requests or system operations, consider propagating the context from the caller rather than creating a new background context.
| // Initialize the engine context | ||
| // This uses verbose true to ensure step layouts are returned during execution | ||
| ctx, err := s.initContext(initContext.ApplicationID, flowType, true, logger) | ||
| flowCtx, err := s.initContext(context.Background(), initContext.ApplicationID, flowType, true, logger) |
There was a problem hiding this comment.
Using context.Background() in the InitiateFlow method. While this might be acceptable if InitiateFlow is designed to start a long-running operation, if this is called from an HTTP handler or has a parent context, that context should be passed as a parameter instead of creating a new background context.
| return errors.New("invalid flow data type") | ||
| } | ||
| _, err := f.CreateFlow(flow.ID, &FlowDefinition{ | ||
| _, err := f.CreateFlow(context.Background(), flow.ID, &FlowDefinition{ |
There was a problem hiding this comment.
Using context.Background() in the Create method of file-based store. While file-based stores might be used in contexts where there's no request context available, if this is called from areas that do have context (like during declarative config loading), the context should be passed through.
| ) | ||
|
|
||
| // flowStoreInterface defines the interface for flow store operations. | ||
| // flowStoreInterface defines the interface for flow store operations. |
There was a problem hiding this comment.
Duplicate comment: "flowStoreInterface defines the interface for flow store operations." appears twice on consecutive lines. Remove one of the duplicate lines.
| // flowStoreInterface defines the interface for flow store operations. |
| func (as *applicationService) validateAuthFlowID(app *model.ApplicationDTO) *serviceerror.ServiceError { | ||
| if app.AuthFlowID != "" { | ||
| isValidFlow := as.flowMgtService.IsValidFlow(app.AuthFlowID) | ||
| isValidFlow := as.flowMgtService.IsValidFlow(context.TODO(), app.AuthFlowID) |
There was a problem hiding this comment.
Using context.TODO() is not recommended for production code. Since this function (validateAuthFlowID) is called from within request handlers that have access to request context, the context should be passed as a parameter through the call chain instead of using context.TODO(). This ensures proper request tracing, cancellation, and timeout propagation.
| @@ -674,7 +675,7 @@ func (as *applicationService) validateRegistrationFlowID(app *model.ApplicationD | |||
| } | |||
|
|
|||
| registrationFlow, svcErr := as.flowMgtService.GetFlowByHandle( | |||
| authFlow.Handle, flowcommon.FlowTypeRegistration) | |||
| context.TODO(), authFlow.Handle, flowcommon.FlowTypeRegistration) | |||
There was a problem hiding this comment.
Using context.TODO() is not recommended. Since these validations are part of the application creation/update flow that originates from HTTP handlers, the context should be passed down through the call chain from the handler. Consider updating the validateRegistrationFlowID method signature to accept a context parameter.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1297 +/- ##
==========================================
+ Coverage 89.96% 89.97% +0.01%
==========================================
Files 629 629
Lines 41189 41198 +9
Branches 2390 2390
==========================================
+ Hits 37056 37069 +13
- Misses 2234 2240 +6
+ Partials 1899 1889 -10
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
c353fa1 to
4894327
Compare
4894327 to
d1ddbdb
Compare
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing touches🧪 Generate unit tests (beta)
Comment |
| } | ||
|
|
||
| // buildGraph converts a CompleteFlowDefinition to a core.GraphInterface for execution. | ||
| // buildGraph s a CompleteFlowDefinition to a core.GraphInterface for execution. |
There was a problem hiding this comment.
The comment text appears to be incomplete. It reads "buildGraph s a CompleteFlowDefinition" but seems to be missing a verb. It should likely read "buildGraph converts a CompleteFlowDefinition to a core.GraphInterface for execution."
| // buildGraph s a CompleteFlowDefinition to a core.GraphInterface for execution. | |
| // buildGraph converts a CompleteFlowDefinition to a core.GraphInterface for execution. |
There was a problem hiding this comment.
Is this intentional?
| defaultAuthFlowHandle := config.GetThunderRuntime().Config.Flow.DefaultAuthFlowHandle | ||
| defaultAuthFlow, svcErr := as.flowMgtService.GetFlowByHandle( | ||
| defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) | ||
| context.TODO(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) |
There was a problem hiding this comment.
The usage of context.TODO() is not recommended here. Since this is being called from application service methods which don't currently have a context parameter, consider using context.Background() instead, or better yet, propagate context through the application service methods. context.TODO() is typically used as a placeholder during development when context propagation is not yet implemented, but this PR is specifically about adding context propagation.
| context.TODO(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) | |
| context.Background(), defaultAuthFlowHandle, flowcommon.FlowTypeAuthentication) |
rajithacharith
left a comment
There was a problem hiding this comment.
Can you test this improvement with declarative_resources mode on and try updating or creating a flow?
| @@ -61,7 +62,7 @@ func (e *FlowGraphExporter) GetParameterizerType() string { | |||
|
|
|||
| // GetAllResourceIDs retrieves all flow graph IDs. | |||
| func (e *FlowGraphExporter) GetAllResourceIDs() ([]string, *serviceerror.ServiceError) { | |||
There was a problem hiding this comment.
These functions are used when exporting the resources with /export endpoint.
That context can be used. Keep it tracked. That will be another refactoring at the last.
| } | ||
|
|
||
| // buildGraph converts a CompleteFlowDefinition to a core.GraphInterface for execution. | ||
| // buildGraph s a CompleteFlowDefinition to a core.GraphInterface for execution. |
There was a problem hiding this comment.
Is this intentional?
| ) | ||
|
|
||
| // flowStoreInterface defines the interface for flow store operations. | ||
| // flowStoreInterface defines the interface for flow store operations. |
There was a problem hiding this comment.
Don't we have any scenarios we need transactions in flow mgt service?
f70227c to
0a3681f
Compare
0a3681f to
33df78c
Compare
| s.logger.Debug("Flow created successfully", log.String(logKeyFlowID, flowID)) | ||
|
|
||
| s.tryInferRegistrationFlow(ctx, flowID, flowDef) | ||
| return nil |
There was a problem hiding this comment.
tryInferRegistrationFlow is invoked inside the surrounding transactioner.Transact closure, but that helper intentionally swallows store errors. Because the inferred flow creation reuses the same transaction context (nested transactions are detected via context), a failure partway through store.CreateFlow can leave partial writes (e.g., FLOW row inserted but FLOW_VERSION insert fails) and still commit when the outer closure returns nil. Consider moving the inference call outside the transaction (after commit), or make inference run in its own independent transaction context, or propagate inference failures so the outer transaction rolls back.
Purpose
As we have introduced the transection architecture, we are adapting our services to use transections. In this PR we have updated the flow service to use transactions.
Sub Issue : #1296
Parent issue : #282