diff --git a/common/searchattribute/sadefs/constants.go b/common/searchattribute/sadefs/constants.go index dffe1d77f6..2f0866c33a 100644 --- a/common/searchattribute/sadefs/constants.go +++ b/common/searchattribute/sadefs/constants.go @@ -111,6 +111,14 @@ const ( // successfully, the search attribute is removed. Format of a single problem: // "category= cause=". TemporalReportedProblems = "TemporalReportedProblems" + + // TemporalExternalPayloadCount is the count of external payloads referenced in the + // entire history tree of the execution. + TemporalExternalPayloadCount = "TemporalExternalPayloadCount" + + // TemporalExternalPayloadSizeBytes is the total size in bytes of all external payloads + // referenced in the entire history tree of the execution. + TemporalExternalPayloadSizeBytes = "TemporalExternalPayloadSizeBytes" ) var ( @@ -177,6 +185,8 @@ var ( TemporalWorkflowVersioningBehavior: enumspb.INDEXED_VALUE_TYPE_KEYWORD, TemporalWorkerDeployment: enumspb.INDEXED_VALUE_TYPE_KEYWORD, TemporalUsedWorkerDeploymentVersions: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + TemporalExternalPayloadCount: enumspb.INDEXED_VALUE_TYPE_INT, + TemporalExternalPayloadSizeBytes: enumspb.INDEXED_VALUE_TYPE_INT, } // reserved are internal field names that can't be used as search attribute names. diff --git a/service/history/visibility_queue_factory.go b/service/history/visibility_queue_factory.go index 69fbe0c56f..badcbd138e 100644 --- a/service/history/visibility_queue_factory.go +++ b/service/history/visibility_queue_factory.go @@ -103,6 +103,7 @@ func (f *visibilityQueueFactory) CreateQueue( f.Config.VisibilityProcessorEnsureCloseBeforeDelete, f.Config.VisibilityProcessorEnableCloseWorkflowCleanup, f.Config.VisibilityProcessorRelocateAttributesMinBlobSize, + f.Config.ExternalPayloadsEnabled, ) if f.ExecutorWrapper != nil { executor = f.ExecutorWrapper.Wrap(executor) diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index 3bcc952180..9ac064b28d 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -38,6 +38,7 @@ type ( ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter + externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter } ) @@ -52,6 +53,7 @@ func newVisibilityQueueTaskExecutor( ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn, enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter, relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter, + externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter, ) queues.Executor { return &visibilityQueueTaskExecutor{ shardContext: shardContext, @@ -63,6 +65,7 @@ func newVisibilityQueueTaskExecutor( ensureCloseBeforeDelete: ensureCloseBeforeDelete, enableCloseWorkflowCleanup: enableCloseWorkflowCleanup, relocateAttributesMinBlobSize: relocateAttributesMinBlobSize, + externalPayloadsEnabled: externalPayloadsEnabled, } } @@ -270,7 +273,7 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution( mutableState.GetExecutionInfo().Memo, mutableState.GetExecutionInfo().SearchAttributes, ) - closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState) + closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry) if err != nil { return err } @@ -482,7 +485,7 @@ func (t *visibilityQueueTaskExecutor) processChasmTask( ) } - closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState) + closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry) if err != nil { return err } @@ -545,6 +548,7 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( ctx context.Context, base *manager.VisibilityRequestBase, mutableState historyi.MutableState, + namespaceEntry *namespace.Namespace, ) (*manager.RecordWorkflowExecutionClosedRequest, error) { wfCloseTime, err := mutableState.GetWorkflowCloseTime(ctx) if err != nil { @@ -558,6 +562,24 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( executionInfo := mutableState.GetExecutionInfo() stateTransitionCount := executionInfo.GetStateTransitionCount() historySizeBytes := executionInfo.GetExecutionStats().GetHistorySize() + + if base.SearchAttributes == nil { + base.SearchAttributes = &commonpb.SearchAttributes{ + IndexedFields: make(map[string]*commonpb.Payload), + } + } else if base.SearchAttributes.IndexedFields == nil { + base.SearchAttributes.IndexedFields = make(map[string]*commonpb.Payload) + } + + if t.externalPayloadsEnabled(namespaceEntry.Name().String()) { + externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount() + externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize() + externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount) + externalPayloadSizeBytesPayload, _ := payload.Encode(externalPayloadSizeBytes) + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadCount] = externalPayloadCountPayload + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadSizeBytes] = externalPayloadSizeBytesPayload + } + return &manager.RecordWorkflowExecutionClosedRequest{ VisibilityRequestBase: base, CloseTime: wfCloseTime, diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index 15130d1e3b..03a9ba8a50 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -184,6 +184,7 @@ func (s *visibilityQueueTaskExecutorSuite) SetupTest() { config.VisibilityProcessorEnsureCloseBeforeDelete, func(_ string) bool { return s.enableCloseWorkflowCleanup }, config.VisibilityProcessorRelocateAttributesMinBlobSize, + config.ExternalPayloadsEnabled, ) } diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index 246a2250b6..48c752c6d5 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/protorequire" + "go.temporal.io/server/common/testing/testvars" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/worker/scanner/build_ids" "go.temporal.io/server/tests/testcore" @@ -87,6 +88,8 @@ func (s *AdvancedVisibilitySuite) SetupSuite() { dynamicconfig.RemovableBuildIdDurationSinceDefault.Key(): time.Microsecond, // Enable the unified query converter dynamicconfig.VisibilityEnableUnifiedQueryConverter.Key(): s.enableUnifiedQueryConverter, + // Enable external payload tracking for TestListWorkflow_ExternalPayloadSearchAttributes + dynamicconfig.ExternalPayloadsEnabled.Key(): true, } s.FunctionalTestBase.SetupSuiteWithCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides)) @@ -235,7 +238,12 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_SearchAttribute() { we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) s.NoError(err) query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, testSearchAttributeKey, testSearchAttributeVal) - s.testHelperForReadOnce(we.GetRunId(), query) + openExecution := s.testHelperForReadOnce(we.GetRunId(), query) + searchValBytes, ok := openExecution.GetSearchAttributes().GetIndexedFields()[testSearchAttributeKey] + s.True(ok) + var searchVal string + _ = payload.Decode(searchValBytes, &searchVal) + s.Equal(testSearchAttributeVal, searchVal) searchAttributes := s.createSearchAttributes() // test upsert @@ -846,7 +854,7 @@ func (s *AdvancedVisibilitySuite) testListWorkflowHelper( s.Nil(nextPageToken) } -func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) { +func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) *workflowpb.WorkflowExecutionInfo { var openExecution *workflowpb.WorkflowExecutionInfo listRequest := &workflowservice.ListWorkflowExecutionsRequest{ Namespace: s.Namespace().String(), @@ -867,12 +875,7 @@ func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, qu s.NotNil(openExecution) s.Equal(expectedRunID, openExecution.GetExecution().GetRunId()) s.True(!openExecution.GetExecutionTime().AsTime().Before(openExecution.GetStartTime().AsTime())) - if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 { - searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[testSearchAttributeKey] - var searchVal string - _ = payload.Decode(searchValBytes, &searchVal) - s.Equal(testSearchAttributeVal, searchVal) - } + return openExecution } func (s *AdvancedVisibilitySuite) TestCountWorkflow() { @@ -2477,6 +2480,62 @@ func (s *AdvancedVisibilitySuite) TestBuildIdScavenger_DeletesUnusedBuildId() { s.Require().Equal(0, len(res.BuildIdReachability[0].TaskQueueReachability)) } +func (s *AdvancedVisibilitySuite) TestListWorkflow_ExternalPayloadSearchAttributes() { + id := "es-functional-external-payload-test" + wt := "es-functional-external-payload-test-type" + tl := "es-functional-external-payload-test-taskqueue" + + // Create workflow input with external payload + externalPayloadSize := int64(1024) + workflowInput := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{{ + Data: []byte("workflow input"), + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: externalPayloadSize}, + }, + }}, + } + + request := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.NewString(), + Namespace: s.Namespace().String(), + WorkflowId: id, + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: workflowInput, + WorkflowRunTimeout: durationpb.New(100 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + Identity: "test-identity", + } + + we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) + s.NoError(err) + + // Complete the workflow using the new taskpoller API + tv := testvars.New(s.T()).WithTaskQueue(tl) + _, err = s.TaskPoller().PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }, + }, + }}, + }, nil + }, + ) + s.NoError(err) + + query := fmt.Sprintf(`WorkflowId = "%s" AND %s = 1`, id, sadefs.TemporalExternalPayloadCount) + s.testHelperForReadOnce(we.GetRunId(), query) + + query = fmt.Sprintf(`WorkflowId = "%s" AND %s = %d`, id, sadefs.TemporalExternalPayloadSizeBytes, externalPayloadSize) + s.testHelperForReadOnce(we.GetRunId(), query) +} + func (s *AdvancedVisibilitySuite) TestScheduleListingWithSearchAttributes() { ctx := testcore.NewContext()