From b1c0ee7e6bf4c41be021373fbac99b38ae22257b Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Wed, 14 Jan 2026 16:46:14 -0800 Subject: [PATCH 1/3] Set external payload stats in visibility --- common/searchattribute/sadefs/constants.go | 10 +++ .../history/visibility_queue_task_executor.go | 16 +++++ tests/advanced_visibility_test.go | 68 +++++++++++++++++++ 3 files changed, 94 insertions(+) diff --git a/common/searchattribute/sadefs/constants.go b/common/searchattribute/sadefs/constants.go index dffe1d77f6..2f0866c33a 100644 --- a/common/searchattribute/sadefs/constants.go +++ b/common/searchattribute/sadefs/constants.go @@ -111,6 +111,14 @@ const ( // successfully, the search attribute is removed. Format of a single problem: // "category= cause=". TemporalReportedProblems = "TemporalReportedProblems" + + // TemporalExternalPayloadCount is the count of external payloads referenced in the + // entire history tree of the execution. + TemporalExternalPayloadCount = "TemporalExternalPayloadCount" + + // TemporalExternalPayloadSizeBytes is the total size in bytes of all external payloads + // referenced in the entire history tree of the execution. + TemporalExternalPayloadSizeBytes = "TemporalExternalPayloadSizeBytes" ) var ( @@ -177,6 +185,8 @@ var ( TemporalWorkflowVersioningBehavior: enumspb.INDEXED_VALUE_TYPE_KEYWORD, TemporalWorkerDeployment: enumspb.INDEXED_VALUE_TYPE_KEYWORD, TemporalUsedWorkerDeploymentVersions: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + TemporalExternalPayloadCount: enumspb.INDEXED_VALUE_TYPE_INT, + TemporalExternalPayloadSizeBytes: enumspb.INDEXED_VALUE_TYPE_INT, } // reserved are internal field names that can't be used as search attribute names. diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index 3bcc952180..0dfd956308 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -558,6 +558,22 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( executionInfo := mutableState.GetExecutionInfo() stateTransitionCount := executionInfo.GetStateTransitionCount() historySizeBytes := executionInfo.GetExecutionStats().GetHistorySize() + + externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount() + externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize() + if base.SearchAttributes == nil { + base.SearchAttributes = &commonpb.SearchAttributes{ + IndexedFields: make(map[string]*commonpb.Payload), + } + } + if base.SearchAttributes.IndexedFields == nil { + base.SearchAttributes.IndexedFields = make(map[string]*commonpb.Payload) + } + externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount) + externalPayloadSizeBytesPayload, _ := payload.Encode(externalPayloadSizeBytes) + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadCount] = externalPayloadCountPayload + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadSizeBytes] = externalPayloadSizeBytesPayload + return &manager.RecordWorkflowExecutionClosedRequest{ VisibilityRequestBase: base, CloseTime: wfCloseTime, diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index 246a2250b6..25766c70c3 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/protorequire" + "go.temporal.io/server/common/testing/testvars" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/worker/scanner/build_ids" "go.temporal.io/server/tests/testcore" @@ -87,6 +88,8 @@ func (s *AdvancedVisibilitySuite) SetupSuite() { dynamicconfig.RemovableBuildIdDurationSinceDefault.Key(): time.Microsecond, // Enable the unified query converter dynamicconfig.VisibilityEnableUnifiedQueryConverter.Key(): s.enableUnifiedQueryConverter, + // Enable external payload tracking for TestListWorkflow_ExternalPayloadSearchAttributes + dynamicconfig.ExternalPayloadsEnabled.Key(): true, } s.FunctionalTestBase.SetupSuiteWithCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides)) @@ -2477,6 +2480,71 @@ func (s *AdvancedVisibilitySuite) TestBuildIdScavenger_DeletesUnusedBuildId() { s.Require().Equal(0, len(res.BuildIdReachability[0].TaskQueueReachability)) } +func (s *AdvancedVisibilitySuite) TestListWorkflow_ExternalPayloadSearchAttributes() { + id := "es-functional-external-payload-test" + wt := "es-functional-external-payload-test-type" + tl := "es-functional-external-payload-test-taskqueue" + + // Create workflow input with external payload + externalPayloadSize := int64(1024) + workflowInput := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{{ + Data: []byte("workflow input"), + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: externalPayloadSize}, + }, + }}, + } + + // Add testSearchAttributeKey to satisfy testHelperForReadOnce validation + attrValBytes, _ := payload.Encode(testSearchAttributeVal) + searchAttr := &commonpb.SearchAttributes{ + IndexedFields: map[string]*commonpb.Payload{ + testSearchAttributeKey: attrValBytes, + }, + } + + request := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.NewString(), + Namespace: s.Namespace().String(), + WorkflowId: id, + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: workflowInput, + SearchAttributes: searchAttr, + WorkflowRunTimeout: durationpb.New(100 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + Identity: "test-identity", + } + + we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) + s.NoError(err) + + // Complete the workflow using the new taskpoller API + tv := testvars.New(s.T()).WithTaskQueue(tl) + _, err = s.TaskPoller().PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }, + }, + }}, + }, nil + }, + ) + s.NoError(err) + + query := fmt.Sprintf(`WorkflowId = "%s" AND %s = 1`, id, sadefs.TemporalExternalPayloadCount) + s.testHelperForReadOnce(we.GetRunId(), query) + + query = fmt.Sprintf(`WorkflowId = "%s" AND %s = %d`, id, sadefs.TemporalExternalPayloadSizeBytes, externalPayloadSize) + s.testHelperForReadOnce(we.GetRunId(), query) +} + func (s *AdvancedVisibilitySuite) TestScheduleListingWithSearchAttributes() { ctx := testcore.NewContext() From fb972c92d889178c1ee8cd3a474528c15305d193 Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Thu, 22 Jan 2026 17:15:12 -0800 Subject: [PATCH 2/3] Gate adding external payload stats to visibility --- .../history/visibility_queue_task_executor.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index 0dfd956308..bfcbe936b8 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -559,20 +559,22 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( stateTransitionCount := executionInfo.GetStateTransitionCount() historySizeBytes := executionInfo.GetExecutionStats().GetHistorySize() - externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount() - externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize() if base.SearchAttributes == nil { base.SearchAttributes = &commonpb.SearchAttributes{ IndexedFields: make(map[string]*commonpb.Payload), } - } - if base.SearchAttributes.IndexedFields == nil { + } else if base.SearchAttributes.IndexedFields == nil { base.SearchAttributes.IndexedFields = make(map[string]*commonpb.Payload) } - externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount) - externalPayloadSizeBytesPayload, _ := payload.Encode(externalPayloadSizeBytes) - base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadCount] = externalPayloadCountPayload - base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadSizeBytes] = externalPayloadSizeBytesPayload + + if t.shardContext.GetConfig().ExternalPayloadsEnabled(mutableState.GetNamespaceEntry().Name().String()) { + externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount() + externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize() + externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount) + externalPayloadSizeBytesPayload, _ := payload.Encode(externalPayloadSizeBytes) + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadCount] = externalPayloadCountPayload + base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadSizeBytes] = externalPayloadSizeBytesPayload + } return &manager.RecordWorkflowExecutionClosedRequest{ VisibilityRequestBase: base, From fc389348c7cbb60fda22e80545c7ab7c999e0d93 Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Fri, 23 Jan 2026 11:31:15 -0800 Subject: [PATCH 3/3] Cache ExternalPayloadsEnabled config and simiplify test --- service/history/visibility_queue_factory.go | 1 + .../history/visibility_queue_task_executor.go | 10 +++++--- .../visibility_queue_task_executor_test.go | 1 + tests/advanced_visibility_test.go | 25 ++++++------------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/service/history/visibility_queue_factory.go b/service/history/visibility_queue_factory.go index 69fbe0c56f..badcbd138e 100644 --- a/service/history/visibility_queue_factory.go +++ b/service/history/visibility_queue_factory.go @@ -103,6 +103,7 @@ func (f *visibilityQueueFactory) CreateQueue( f.Config.VisibilityProcessorEnsureCloseBeforeDelete, f.Config.VisibilityProcessorEnableCloseWorkflowCleanup, f.Config.VisibilityProcessorRelocateAttributesMinBlobSize, + f.Config.ExternalPayloadsEnabled, ) if f.ExecutorWrapper != nil { executor = f.ExecutorWrapper.Wrap(executor) diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index bfcbe936b8..9ac064b28d 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -38,6 +38,7 @@ type ( ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter + externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter } ) @@ -52,6 +53,7 @@ func newVisibilityQueueTaskExecutor( ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn, enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter, relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter, + externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter, ) queues.Executor { return &visibilityQueueTaskExecutor{ shardContext: shardContext, @@ -63,6 +65,7 @@ func newVisibilityQueueTaskExecutor( ensureCloseBeforeDelete: ensureCloseBeforeDelete, enableCloseWorkflowCleanup: enableCloseWorkflowCleanup, relocateAttributesMinBlobSize: relocateAttributesMinBlobSize, + externalPayloadsEnabled: externalPayloadsEnabled, } } @@ -270,7 +273,7 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution( mutableState.GetExecutionInfo().Memo, mutableState.GetExecutionInfo().SearchAttributes, ) - closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState) + closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry) if err != nil { return err } @@ -482,7 +485,7 @@ func (t *visibilityQueueTaskExecutor) processChasmTask( ) } - closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState) + closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry) if err != nil { return err } @@ -545,6 +548,7 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( ctx context.Context, base *manager.VisibilityRequestBase, mutableState historyi.MutableState, + namespaceEntry *namespace.Namespace, ) (*manager.RecordWorkflowExecutionClosedRequest, error) { wfCloseTime, err := mutableState.GetWorkflowCloseTime(ctx) if err != nil { @@ -567,7 +571,7 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest( base.SearchAttributes.IndexedFields = make(map[string]*commonpb.Payload) } - if t.shardContext.GetConfig().ExternalPayloadsEnabled(mutableState.GetNamespaceEntry().Name().String()) { + if t.externalPayloadsEnabled(namespaceEntry.Name().String()) { externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount() externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize() externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount) diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index 15130d1e3b..03a9ba8a50 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -184,6 +184,7 @@ func (s *visibilityQueueTaskExecutorSuite) SetupTest() { config.VisibilityProcessorEnsureCloseBeforeDelete, func(_ string) bool { return s.enableCloseWorkflowCleanup }, config.VisibilityProcessorRelocateAttributesMinBlobSize, + config.ExternalPayloadsEnabled, ) } diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index 25766c70c3..48c752c6d5 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -238,7 +238,12 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_SearchAttribute() { we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) s.NoError(err) query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, testSearchAttributeKey, testSearchAttributeVal) - s.testHelperForReadOnce(we.GetRunId(), query) + openExecution := s.testHelperForReadOnce(we.GetRunId(), query) + searchValBytes, ok := openExecution.GetSearchAttributes().GetIndexedFields()[testSearchAttributeKey] + s.True(ok) + var searchVal string + _ = payload.Decode(searchValBytes, &searchVal) + s.Equal(testSearchAttributeVal, searchVal) searchAttributes := s.createSearchAttributes() // test upsert @@ -849,7 +854,7 @@ func (s *AdvancedVisibilitySuite) testListWorkflowHelper( s.Nil(nextPageToken) } -func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) { +func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) *workflowpb.WorkflowExecutionInfo { var openExecution *workflowpb.WorkflowExecutionInfo listRequest := &workflowservice.ListWorkflowExecutionsRequest{ Namespace: s.Namespace().String(), @@ -870,12 +875,7 @@ func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, qu s.NotNil(openExecution) s.Equal(expectedRunID, openExecution.GetExecution().GetRunId()) s.True(!openExecution.GetExecutionTime().AsTime().Before(openExecution.GetStartTime().AsTime())) - if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 { - searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[testSearchAttributeKey] - var searchVal string - _ = payload.Decode(searchValBytes, &searchVal) - s.Equal(testSearchAttributeVal, searchVal) - } + return openExecution } func (s *AdvancedVisibilitySuite) TestCountWorkflow() { @@ -2496,14 +2496,6 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_ExternalPayloadSearchAttribut }}, } - // Add testSearchAttributeKey to satisfy testHelperForReadOnce validation - attrValBytes, _ := payload.Encode(testSearchAttributeVal) - searchAttr := &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ - testSearchAttributeKey: attrValBytes, - }, - } - request := &workflowservice.StartWorkflowExecutionRequest{ RequestId: uuid.NewString(), Namespace: s.Namespace().String(), @@ -2511,7 +2503,6 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_ExternalPayloadSearchAttribut WorkflowType: &commonpb.WorkflowType{Name: wt}, TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Input: workflowInput, - SearchAttributes: searchAttr, WorkflowRunTimeout: durationpb.New(100 * time.Second), WorkflowTaskTimeout: durationpb.New(10 * time.Second), Identity: "test-identity",