Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/searchattribute/sadefs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ const (
// successfully, the search attribute is removed. Format of a single problem:
// "category=<category> cause=<cause>".
TemporalReportedProblems = "TemporalReportedProblems"

// TemporalExternalPayloadCount is the count of external payloads referenced in the
// entire history tree of the execution.
TemporalExternalPayloadCount = "TemporalExternalPayloadCount"

// TemporalExternalPayloadSizeBytes is the total size in bytes of all external payloads
// referenced in the entire history tree of the execution.
TemporalExternalPayloadSizeBytes = "TemporalExternalPayloadSizeBytes"
)

var (
Expand Down Expand Up @@ -177,6 +185,8 @@ var (
TemporalWorkflowVersioningBehavior: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalWorkerDeployment: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalUsedWorkerDeploymentVersions: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
TemporalExternalPayloadCount: enumspb.INDEXED_VALUE_TYPE_INT,
TemporalExternalPayloadSizeBytes: enumspb.INDEXED_VALUE_TYPE_INT,
}

// reserved are internal field names that can't be used as search attribute names.
Expand Down
1 change: 1 addition & 0 deletions service/history/visibility_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (f *visibilityQueueFactory) CreateQueue(
f.Config.VisibilityProcessorEnsureCloseBeforeDelete,
f.Config.VisibilityProcessorEnableCloseWorkflowCleanup,
f.Config.VisibilityProcessorRelocateAttributesMinBlobSize,
f.Config.ExternalPayloadsEnabled,
)
if f.ExecutorWrapper != nil {
executor = f.ExecutorWrapper.Wrap(executor)
Expand Down
26 changes: 24 additions & 2 deletions service/history/visibility_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn
enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter
relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter
externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
}
)

Expand All @@ -52,6 +53,7 @@ func newVisibilityQueueTaskExecutor(
ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn,
enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter,
relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter,
externalPayloadsEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter,
) queues.Executor {
return &visibilityQueueTaskExecutor{
shardContext: shardContext,
Expand All @@ -63,6 +65,7 @@ func newVisibilityQueueTaskExecutor(
ensureCloseBeforeDelete: ensureCloseBeforeDelete,
enableCloseWorkflowCleanup: enableCloseWorkflowCleanup,
relocateAttributesMinBlobSize: relocateAttributesMinBlobSize,
externalPayloadsEnabled: externalPayloadsEnabled,
}
}

Expand Down Expand Up @@ -270,7 +273,7 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution(
mutableState.GetExecutionInfo().Memo,
mutableState.GetExecutionInfo().SearchAttributes,
)
closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState)
closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry)
if err != nil {
return err
}
Expand Down Expand Up @@ -482,7 +485,7 @@ func (t *visibilityQueueTaskExecutor) processChasmTask(
)
}

closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState)
closedRequest, err := t.getClosedVisibilityRequest(ctx, requestBase, mutableState, namespaceEntry)
if err != nil {
return err
}
Expand Down Expand Up @@ -545,6 +548,7 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest(
ctx context.Context,
base *manager.VisibilityRequestBase,
mutableState historyi.MutableState,
namespaceEntry *namespace.Namespace,
) (*manager.RecordWorkflowExecutionClosedRequest, error) {
wfCloseTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
Expand All @@ -558,6 +562,24 @@ func (t *visibilityQueueTaskExecutor) getClosedVisibilityRequest(
executionInfo := mutableState.GetExecutionInfo()
stateTransitionCount := executionInfo.GetStateTransitionCount()
historySizeBytes := executionInfo.GetExecutionStats().GetHistorySize()

if base.SearchAttributes == nil {
base.SearchAttributes = &commonpb.SearchAttributes{
IndexedFields: make(map[string]*commonpb.Payload),
}
} else if base.SearchAttributes.IndexedFields == nil {
base.SearchAttributes.IndexedFields = make(map[string]*commonpb.Payload)
}

if t.externalPayloadsEnabled(namespaceEntry.Name().String()) {
externalPayloadCount := executionInfo.GetExecutionStats().GetExternalPayloadCount()
externalPayloadSizeBytes := executionInfo.GetExecutionStats().GetExternalPayloadSize()
externalPayloadCountPayload, _ := payload.Encode(externalPayloadCount)
externalPayloadSizeBytesPayload, _ := payload.Encode(externalPayloadSizeBytes)
base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadCount] = externalPayloadCountPayload
base.SearchAttributes.IndexedFields[sadefs.TemporalExternalPayloadSizeBytes] = externalPayloadSizeBytesPayload
}

return &manager.RecordWorkflowExecutionClosedRequest{
VisibilityRequestBase: base,
CloseTime: wfCloseTime,
Expand Down
1 change: 1 addition & 0 deletions service/history/visibility_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *visibilityQueueTaskExecutorSuite) SetupTest() {
config.VisibilityProcessorEnsureCloseBeforeDelete,
func(_ string) bool { return s.enableCloseWorkflowCleanup },
config.VisibilityProcessorRelocateAttributesMinBlobSize,
config.ExternalPayloadsEnabled,
)
}

Expand Down
75 changes: 67 additions & 8 deletions tests/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/searchattribute/sadefs"
"go.temporal.io/server/common/testing/protorequire"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/worker/scanner/build_ids"
"go.temporal.io/server/tests/testcore"
Expand Down Expand Up @@ -87,6 +88,8 @@ func (s *AdvancedVisibilitySuite) SetupSuite() {
dynamicconfig.RemovableBuildIdDurationSinceDefault.Key(): time.Microsecond,
// Enable the unified query converter
dynamicconfig.VisibilityEnableUnifiedQueryConverter.Key(): s.enableUnifiedQueryConverter,
// Enable external payload tracking for TestListWorkflow_ExternalPayloadSearchAttributes
dynamicconfig.ExternalPayloadsEnabled.Key(): true,
}
s.FunctionalTestBase.SetupSuiteWithCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides))

Expand Down Expand Up @@ -235,7 +238,12 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_SearchAttribute() {
we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request)
s.NoError(err)
query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, testSearchAttributeKey, testSearchAttributeVal)
s.testHelperForReadOnce(we.GetRunId(), query)
openExecution := s.testHelperForReadOnce(we.GetRunId(), query)
searchValBytes, ok := openExecution.GetSearchAttributes().GetIndexedFields()[testSearchAttributeKey]
s.True(ok)
var searchVal string
_ = payload.Decode(searchValBytes, &searchVal)
s.Equal(testSearchAttributeVal, searchVal)

searchAttributes := s.createSearchAttributes()
// test upsert
Expand Down Expand Up @@ -846,7 +854,7 @@ func (s *AdvancedVisibilitySuite) testListWorkflowHelper(
s.Nil(nextPageToken)
}

func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) {
func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, query string) *workflowpb.WorkflowExecutionInfo {
var openExecution *workflowpb.WorkflowExecutionInfo
listRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.Namespace().String(),
Expand All @@ -867,12 +875,7 @@ func (s *AdvancedVisibilitySuite) testHelperForReadOnce(expectedRunID string, qu
s.NotNil(openExecution)
s.Equal(expectedRunID, openExecution.GetExecution().GetRunId())
s.True(!openExecution.GetExecutionTime().AsTime().Before(openExecution.GetStartTime().AsTime()))
if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 {
searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[testSearchAttributeKey]
var searchVal string
_ = payload.Decode(searchValBytes, &searchVal)
s.Equal(testSearchAttributeVal, searchVal)
}
return openExecution
}

func (s *AdvancedVisibilitySuite) TestCountWorkflow() {
Expand Down Expand Up @@ -2477,6 +2480,62 @@ func (s *AdvancedVisibilitySuite) TestBuildIdScavenger_DeletesUnusedBuildId() {
s.Require().Equal(0, len(res.BuildIdReachability[0].TaskQueueReachability))
}

func (s *AdvancedVisibilitySuite) TestListWorkflow_ExternalPayloadSearchAttributes() {
id := "es-functional-external-payload-test"
wt := "es-functional-external-payload-test-type"
tl := "es-functional-external-payload-test-taskqueue"

// Create workflow input with external payload
externalPayloadSize := int64(1024)
workflowInput := &commonpb.Payloads{
Payloads: []*commonpb.Payload{{
Data: []byte("workflow input"),
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
{SizeBytes: externalPayloadSize},
},
}},
}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.NewString(),
Namespace: s.Namespace().String(),
WorkflowId: id,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: workflowInput,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
Identity: "test-identity",
}

we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request)
s.NoError(err)

// Complete the workflow using the new taskpoller API
tv := testvars.New(s.T()).WithTaskQueue(tl)
_, err = s.TaskPoller().PollAndHandleWorkflowTask(tv,
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
return &workflowservice.RespondWorkflowTaskCompletedRequest{
Commands: []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
},
},
}},
}, nil
},
)
s.NoError(err)

query := fmt.Sprintf(`WorkflowId = "%s" AND %s = 1`, id, sadefs.TemporalExternalPayloadCount)
s.testHelperForReadOnce(we.GetRunId(), query)

query = fmt.Sprintf(`WorkflowId = "%s" AND %s = %d`, id, sadefs.TemporalExternalPayloadSizeBytes, externalPayloadSize)
s.testHelperForReadOnce(we.GetRunId(), query)
}

func (s *AdvancedVisibilitySuite) TestScheduleListingWithSearchAttributes() {
ctx := testcore.NewContext()

Expand Down
Loading