From 755fe64a13860bac2f094596f0f5928c225a20e3 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 14 Apr 2025 10:24:07 -0500 Subject: [PATCH 1/8] added custom metric sample --- .gitignore | 2 + README.md | 1 + custom_metric/README.md | 33 +++++++++++++++ custom_metric/activity_worker.py | 71 ++++++++++++++++++++++++++++++++ custom_metric/client.py | 23 +++++++++++ custom_metric/workflow_worker.py | 39 ++++++++++++++++++ 6 files changed, 169 insertions(+) create mode 100644 custom_metric/README.md create mode 100644 custom_metric/activity_worker.py create mode 100644 custom_metric/client.py create mode 100644 custom_metric/workflow_worker.py diff --git a/.gitignore b/.gitignore index 5c5ffffd..41afe5f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .venv .idea __pycache__ +.vscode +.DS_Store diff --git a/README.md b/README.md index 832e0d1f..5c1941d1 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor. * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types. * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. +* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency. * [dsl](dsl) - DSL workflow that executes steps defined in a YAML file. * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [gevent_async](gevent_async) - Combine gevent and Temporal. diff --git a/custom_metric/README.md b/custom_metric/README.md new file mode 100644 index 00000000..2af67477 --- /dev/null +++ b/custom_metric/README.md @@ -0,0 +1,33 @@ +# Custom Metric + +1. Run the server with `temporal server start-dev` +2. Run the client with `uv run custom_metric/client.py` +3. Run the workflow worker with `uv run custom_metric/workflow_worker.py` +4. Run the activity worker with `uv run custom_metric/activity_worker.py` +5. Go to `http://127.0.0.1:9090/metrics` in your browser + +You'll get something like the following: + +```txt +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 0 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 0 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 0 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 0 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 1 +custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 6336 +custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1 +... +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 0 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 0 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 0 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 0 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 1 +temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 6336 +temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1 +``` diff --git a/custom_metric/activity_worker.py b/custom_metric/activity_worker.py new file mode 100644 index 00000000..2d1ffab0 --- /dev/null +++ b/custom_metric/activity_worker.py @@ -0,0 +1,71 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +from temporalio import activity +from temporalio.client import Client +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.worker import ( + ActivityInboundInterceptor, + ExecuteActivityInput, + Interceptor, + Worker, +) + + +class SimpleWorkerInterceptor(Interceptor): + + def intercept_activity( + self, next: ActivityInboundInterceptor + ) -> ActivityInboundInterceptor: + return CustomScheduleToStartInterceptor(next) + + +class CustomScheduleToStartInterceptor(ActivityInboundInterceptor): + + async def execute_activity(self, input: ExecuteActivityInput): + + schedule_to_start = ( + activity.info().started_time + - activity.info().current_attempt_scheduled_time + ) + # Could do the original schedule time instead of current attempt + # schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time + + meter = activity.metric_meter() + histogram = meter.create_histogram_timedelta( + "custom_activity_schedule_to_start_latency", + description="Time between activity scheduling and start", + unit="duration", + ) + histogram.record( + schedule_to_start, {"workflow_type": activity.info().workflow_type} + ) + return await self.next.execute_activity(input) + + +@activity.defn +def print_message(): + print("in the activity") + + +async def main(): + runtime = Runtime( + telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090")) + ) + client = await Client.connect( + "localhost:7233", + runtime=runtime, + ) + worker = Worker( + client, + task_queue="custom-metric-task-queue", + activities=[print_message], + activity_executor=ThreadPoolExecutor(5), + interceptors=[SimpleWorkerInterceptor()], + ) + + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_metric/client.py b/custom_metric/client.py new file mode 100644 index 00000000..8b62d1b9 --- /dev/null +++ b/custom_metric/client.py @@ -0,0 +1,23 @@ +import asyncio +import uuid + +from temporalio.client import Client + +from custom_metric.workflow_worker import ExecuteActivityWorkflow + + +async def main(): + + client = await Client.connect( + "localhost:7233", + ) + + await client.start_workflow( + ExecuteActivityWorkflow.run, + id="execute-activity-workflow-" + str(uuid.uuid4()), + task_queue="custom-metric-task-queue", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_metric/workflow_worker.py b/custom_metric/workflow_worker.py new file mode 100644 index 00000000..5fad0fe9 --- /dev/null +++ b/custom_metric/workflow_worker.py @@ -0,0 +1,39 @@ +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + +with workflow.unsafe.imports_passed_through(): + from custom_metric.activity_worker import print_message + + +@workflow.defn +class ExecuteActivityWorkflow: + + @workflow.run + async def run(self): + await workflow.execute_activity( + print_message, + start_to_close_timeout=timedelta(seconds=5), + ) + return None + + +async def main(): + + client = await Client.connect( + "localhost:7233", + ) + worker = Worker( + client, + task_queue="custom-metric-task-queue", + workflows=[ExecuteActivityWorkflow], + ) + + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) From e606ed5b8acf49f435895f53e86cbb39b12888cd Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 18 Apr 2025 13:33:21 -0500 Subject: [PATCH 2/8] Responding to code review --- custom_metric/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/custom_metric/README.md b/custom_metric/README.md index 2af67477..6e401bc5 100644 --- a/custom_metric/README.md +++ b/custom_metric/README.md @@ -1,5 +1,10 @@ # Custom Metric +This sample deminstrates two things: how to make a custom metric, and how to use an interceptor. +The custom metric in this sample shows how to make an activity schedule-to-start-latency with a workflow type tag. + +Please see the top-level README for prerequisites such as Python, uv, starting the local temporal development server, etc. + 1. Run the server with `temporal server start-dev` 2. Run the client with `uv run custom_metric/client.py` 3. Run the workflow worker with `uv run custom_metric/workflow_worker.py` From 70c1c73a8d89a12742a1debba103a04d99182724 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 18 Apr 2025 14:03:28 -0500 Subject: [PATCH 3/8] responding to code review --- custom_metric/README.md | 52 +++++++++---------- custom_metric/client.py | 23 -------- .../{activity_worker.py => worker.py} | 42 ++++++++++++--- custom_metric/workflow_worker.py | 39 -------------- 4 files changed, 59 insertions(+), 97 deletions(-) delete mode 100644 custom_metric/client.py rename custom_metric/{activity_worker.py => worker.py} (66%) delete mode 100644 custom_metric/workflow_worker.py diff --git a/custom_metric/README.md b/custom_metric/README.md index 6e401bc5..cc63c743 100644 --- a/custom_metric/README.md +++ b/custom_metric/README.md @@ -1,38 +1,36 @@ # Custom Metric -This sample deminstrates two things: how to make a custom metric, and how to use an interceptor. -The custom metric in this sample shows how to make an activity schedule-to-start-latency with a workflow type tag. +This sample deminstrates two things: (1) how to make a custom metric, and (2) how to use an interceptor. +The custom metric in this sample is an activity schedule-to-start-latency with a workflow type tag. Please see the top-level README for prerequisites such as Python, uv, starting the local temporal development server, etc. -1. Run the server with `temporal server start-dev` -2. Run the client with `uv run custom_metric/client.py` -3. Run the workflow worker with `uv run custom_metric/workflow_worker.py` -4. Run the activity worker with `uv run custom_metric/activity_worker.py` -5. Go to `http://127.0.0.1:9090/metrics` in your browser +1. Run the worker with `uv run custom_metric/worker.py` +2. Request execution of the workflow with `temporal workflow start --type ExecuteActivityWorkflow --task-queue custom-metric-task-queue` +3. Go to `http://127.0.0.1:9090/metrics` in your browser You'll get something like the following: ```txt -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 0 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 0 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 0 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 0 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 1 -custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 6336 -custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 2 +custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1010 +custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 2 ... -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 0 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 0 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 0 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 0 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 1 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 1 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 1 -temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 1 -temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 6336 -temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 2 +temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1010 +temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 2 ``` diff --git a/custom_metric/client.py b/custom_metric/client.py deleted file mode 100644 index 8b62d1b9..00000000 --- a/custom_metric/client.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio -import uuid - -from temporalio.client import Client - -from custom_metric.workflow_worker import ExecuteActivityWorkflow - - -async def main(): - - client = await Client.connect( - "localhost:7233", - ) - - await client.start_workflow( - ExecuteActivityWorkflow.run, - id="execute-activity-workflow-" + str(uuid.uuid4()), - task_queue="custom-metric-task-queue", - ) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/custom_metric/activity_worker.py b/custom_metric/worker.py similarity index 66% rename from custom_metric/activity_worker.py rename to custom_metric/worker.py index 2d1ffab0..a51ad487 100644 --- a/custom_metric/activity_worker.py +++ b/custom_metric/worker.py @@ -1,7 +1,9 @@ import asyncio +import time from concurrent.futures import ThreadPoolExecutor +from datetime import timedelta -from temporalio import activity +from temporalio import activity, workflow from temporalio.client import Client from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.worker import ( @@ -12,6 +14,31 @@ ) +@activity.defn +def print_message(): + print("In the activity.") + time.sleep(1) + + +@workflow.defn +class ExecuteActivityWorkflow: + + @workflow.run + async def run(self): + # Request two concurrent activities with only one task slot so + # we can see nontrivial schedule to start times. + activity1 = workflow.execute_activity( + print_message, + start_to_close_timeout=timedelta(seconds=5), + ) + activity2 = workflow.execute_activity( + print_message, + start_to_close_timeout=timedelta(seconds=5), + ) + await asyncio.gather(activity1, activity2) + return None + + class SimpleWorkerInterceptor(Interceptor): def intercept_activity( @@ -43,11 +70,6 @@ async def execute_activity(self, input: ExecuteActivityInput): return await self.next.execute_activity(input) -@activity.defn -def print_message(): - print("in the activity") - - async def main(): runtime = Runtime( telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090")) @@ -59,9 +81,13 @@ async def main(): worker = Worker( client, task_queue="custom-metric-task-queue", - activities=[print_message], - activity_executor=ThreadPoolExecutor(5), interceptors=[SimpleWorkerInterceptor()], + workflows=[ExecuteActivityWorkflow], + activities=[print_message], + # only one activity executor with two concurrently scheduled activities + # to force a nontrivial schedule to start times + activity_executor=ThreadPoolExecutor(1), + max_concurrent_activities=1, ) await worker.run() diff --git a/custom_metric/workflow_worker.py b/custom_metric/workflow_worker.py deleted file mode 100644 index 5fad0fe9..00000000 --- a/custom_metric/workflow_worker.py +++ /dev/null @@ -1,39 +0,0 @@ -import asyncio -from datetime import timedelta - -from temporalio import workflow -from temporalio.client import Client -from temporalio.worker import Worker - -with workflow.unsafe.imports_passed_through(): - from custom_metric.activity_worker import print_message - - -@workflow.defn -class ExecuteActivityWorkflow: - - @workflow.run - async def run(self): - await workflow.execute_activity( - print_message, - start_to_close_timeout=timedelta(seconds=5), - ) - return None - - -async def main(): - - client = await Client.connect( - "localhost:7233", - ) - worker = Worker( - client, - task_queue="custom-metric-task-queue", - workflows=[ExecuteActivityWorkflow], - ) - - await worker.run() - - -if __name__ == "__main__": - asyncio.run(main()) From 95c09a6edb451031bd952bc5e0b4b5da4c4801ae Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 18 Apr 2025 14:27:31 -0500 Subject: [PATCH 4/8] added tests --- tests/custom_metric/__init__.py | 0 tests/custom_metric/workflow_test.py | 35 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 tests/custom_metric/__init__.py create mode 100644 tests/custom_metric/workflow_test.py diff --git a/tests/custom_metric/__init__.py b/tests/custom_metric/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py new file mode 100644 index 00000000..39e6322e --- /dev/null +++ b/tests/custom_metric/workflow_test.py @@ -0,0 +1,35 @@ +import uuid + +from temporalio import activity +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from custom_metric.worker import ExecuteActivityWorkflow + +_TASK_QUEUE = "custom-metric-task-queue" + +activity_counter = 0 + + +async def test_sleep_for_days_workflow(): + + @activity.defn(name="print_message") + async def print_message_mock(): + global activity_counter + activity_counter += 1 + + async with await WorkflowEnvironment.start_time_skipping() as env: + + async with Worker( + env.client, + task_queue=_TASK_QUEUE, + workflows=[ExecuteActivityWorkflow], + activities=[print_message_mock], + ): + result = await env.client.execute_workflow( + ExecuteActivityWorkflow.run, + id=str(uuid.uuid4()), + task_queue=_TASK_QUEUE, + ) + assert result is None + assert activity_counter == 2 From 16218552c4f9828c174ce3218236d0a74ec837b5 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 21 Apr 2025 08:32:25 -0500 Subject: [PATCH 5/8] responding to code review --- custom_metric/README.md | 24 ++++++++++---------- custom_metric/activity.py | 9 ++++++++ custom_metric/starter.py | 23 +++++++++++++++++++ custom_metric/worker.py | 34 ++++------------------------ custom_metric/workflow.py | 26 +++++++++++++++++++++ tests/custom_metric/workflow_test.py | 16 ++++++------- 6 files changed, 82 insertions(+), 50 deletions(-) create mode 100644 custom_metric/activity.py create mode 100644 custom_metric/starter.py create mode 100644 custom_metric/workflow.py diff --git a/custom_metric/README.md b/custom_metric/README.md index cc63c743..de1d51d5 100644 --- a/custom_metric/README.md +++ b/custom_metric/README.md @@ -3,25 +3,25 @@ This sample deminstrates two things: (1) how to make a custom metric, and (2) how to use an interceptor. The custom metric in this sample is an activity schedule-to-start-latency with a workflow type tag. -Please see the top-level README for prerequisites such as Python, uv, starting the local temporal development server, etc. +Please see the top-level [README](../README.md) for prerequisites such as Python, uv, starting the local temporal development server, etc. 1. Run the worker with `uv run custom_metric/worker.py` -2. Request execution of the workflow with `temporal workflow start --type ExecuteActivityWorkflow --task-queue custom-metric-task-queue` +2. Request execution of the workflow with `uv run custom_metric/starter.py` 3. Go to `http://127.0.0.1:9090/metrics` in your browser You'll get something like the following: ```txt -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 1 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 2 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 2 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 2 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 2 -custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 2 -custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1010 -custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="500"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="5000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="10000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="+Inf"} 2 +custom_activity_schedule_to_start_latency_sum{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 1010 +custom_activity_schedule_to_start_latency_count{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 2 ... temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 1 temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 1 diff --git a/custom_metric/activity.py b/custom_metric/activity.py new file mode 100644 index 00000000..7f2ee116 --- /dev/null +++ b/custom_metric/activity.py @@ -0,0 +1,9 @@ +import time + +from temporalio import activity + + +@activity.defn +def print_and_sleep(): + print("In the activity.") + time.sleep(1) diff --git a/custom_metric/starter.py b/custom_metric/starter.py new file mode 100644 index 00000000..ded3a626 --- /dev/null +++ b/custom_metric/starter.py @@ -0,0 +1,23 @@ +import asyncio +import uuid + +from temporalio.client import Client + +from custom_metric.workflow import StartTwoActivitiesWorkflow + + +async def main(): + + client = await Client.connect( + "localhost:7233", + ) + + await client.start_workflow( + StartTwoActivitiesWorkflow.run, + id="execute-activity-workflow-" + str(uuid.uuid4()), + task_queue="custom-metric-task-queue", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_metric/worker.py b/custom_metric/worker.py index a51ad487..168a21b3 100644 --- a/custom_metric/worker.py +++ b/custom_metric/worker.py @@ -1,9 +1,7 @@ import asyncio -import time from concurrent.futures import ThreadPoolExecutor -from datetime import timedelta -from temporalio import activity, workflow +from temporalio import activity from temporalio.client import Client from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.worker import ( @@ -13,30 +11,8 @@ Worker, ) - -@activity.defn -def print_message(): - print("In the activity.") - time.sleep(1) - - -@workflow.defn -class ExecuteActivityWorkflow: - - @workflow.run - async def run(self): - # Request two concurrent activities with only one task slot so - # we can see nontrivial schedule to start times. - activity1 = workflow.execute_activity( - print_message, - start_to_close_timeout=timedelta(seconds=5), - ) - activity2 = workflow.execute_activity( - print_message, - start_to_close_timeout=timedelta(seconds=5), - ) - await asyncio.gather(activity1, activity2) - return None +from custom_metric.activity import print_and_sleep +from custom_metric.workflow import StartTwoActivitiesWorkflow class SimpleWorkerInterceptor(Interceptor): @@ -82,8 +58,8 @@ async def main(): client, task_queue="custom-metric-task-queue", interceptors=[SimpleWorkerInterceptor()], - workflows=[ExecuteActivityWorkflow], - activities=[print_message], + workflows=[StartTwoActivitiesWorkflow], + activities=[print_and_sleep], # only one activity executor with two concurrently scheduled activities # to force a nontrivial schedule to start times activity_executor=ThreadPoolExecutor(1), diff --git a/custom_metric/workflow.py b/custom_metric/workflow.py new file mode 100644 index 00000000..bb8bfeb7 --- /dev/null +++ b/custom_metric/workflow.py @@ -0,0 +1,26 @@ +import asyncio +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from custom_metric.activity import print_and_sleep + + +@workflow.defn +class StartTwoActivitiesWorkflow: + + @workflow.run + async def run(self): + # Request two concurrent activities with only one task slot so + # we can see nontrivial schedule to start times. + activity1 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + activity2 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + await asyncio.gather(activity1, activity2) + return None diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py index 39e6322e..c223f077 100644 --- a/tests/custom_metric/workflow_test.py +++ b/tests/custom_metric/workflow_test.py @@ -1,33 +1,31 @@ import uuid from temporalio import activity -from temporalio.testing import WorkflowEnvironment +from temporalio.client import Client from temporalio.worker import Worker -from custom_metric.worker import ExecuteActivityWorkflow +from custom_metric.worker import StartTwoActivitiesWorkflow _TASK_QUEUE = "custom-metric-task-queue" activity_counter = 0 -async def test_sleep_for_days_workflow(): +async def test_custom_metric_workflow(client: Client): @activity.defn(name="print_message") async def print_message_mock(): global activity_counter activity_counter += 1 - async with await WorkflowEnvironment.start_time_skipping() as env: - async with Worker( - env.client, + client, task_queue=_TASK_QUEUE, - workflows=[ExecuteActivityWorkflow], + workflows=[StartTwoActivitiesWorkflow], activities=[print_message_mock], ): - result = await env.client.execute_workflow( - ExecuteActivityWorkflow.run, + result = await client.execute_workflow( + StartTwoActivitiesWorkflow.run, id=str(uuid.uuid4()), task_queue=_TASK_QUEUE, ) From 2ab0cf12ec2afbf156aed049e0797fd6ea5fc1bf Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 21 Apr 2025 08:33:38 -0500 Subject: [PATCH 6/8] ran formatter --- custom_metric/worker.py | 2 -- custom_metric/workflow.py | 1 - tests/custom_metric/workflow_test.py | 1 - 3 files changed, 4 deletions(-) diff --git a/custom_metric/worker.py b/custom_metric/worker.py index 168a21b3..9ffad207 100644 --- a/custom_metric/worker.py +++ b/custom_metric/worker.py @@ -16,7 +16,6 @@ class SimpleWorkerInterceptor(Interceptor): - def intercept_activity( self, next: ActivityInboundInterceptor ) -> ActivityInboundInterceptor: @@ -24,7 +23,6 @@ def intercept_activity( class CustomScheduleToStartInterceptor(ActivityInboundInterceptor): - async def execute_activity(self, input: ExecuteActivityInput): schedule_to_start = ( diff --git a/custom_metric/workflow.py b/custom_metric/workflow.py index bb8bfeb7..cf37823b 100644 --- a/custom_metric/workflow.py +++ b/custom_metric/workflow.py @@ -9,7 +9,6 @@ @workflow.defn class StartTwoActivitiesWorkflow: - @workflow.run async def run(self): # Request two concurrent activities with only one task slot so diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py index c223f077..e3027a45 100644 --- a/tests/custom_metric/workflow_test.py +++ b/tests/custom_metric/workflow_test.py @@ -12,7 +12,6 @@ async def test_custom_metric_workflow(client: Client): - @activity.defn(name="print_message") async def print_message_mock(): global activity_counter From 3379774975b652a36977ddbf9c1a8b14a6d0c54c Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 21 Apr 2025 08:38:14 -0500 Subject: [PATCH 7/8] fixing lints --- custom_metric/__init__.py | 0 pyproject.toml | 1 + 2 files changed, 1 insertion(+) create mode 100644 custom_metric/__init__.py diff --git a/custom_metric/__init__.py b/custom_metric/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pyproject.toml b/pyproject.toml index 5fd96a81..e44b34b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ packages = [ "context_propagation", "custom_converter", "custom_decorator", + "custom_metric", "dsl", "encryption", "gevent_async", From 84fdfeef7f2960872164714bfcee21dd990fb003 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 21 Apr 2025 09:52:18 -0500 Subject: [PATCH 8/8] responding to code review --- tests/custom_metric/workflow_test.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py index e3027a45..4e107b79 100644 --- a/tests/custom_metric/workflow_test.py +++ b/tests/custom_metric/workflow_test.py @@ -12,21 +12,21 @@ async def test_custom_metric_workflow(client: Client): - @activity.defn(name="print_message") + @activity.defn(name="print_and_sleep") async def print_message_mock(): global activity_counter activity_counter += 1 - async with Worker( - client, + async with Worker( + client, + task_queue=_TASK_QUEUE, + workflows=[StartTwoActivitiesWorkflow], + activities=[print_message_mock], + ): + result = await client.execute_workflow( + StartTwoActivitiesWorkflow.run, + id=str(uuid.uuid4()), task_queue=_TASK_QUEUE, - workflows=[StartTwoActivitiesWorkflow], - activities=[print_message_mock], - ): - result = await client.execute_workflow( - StartTwoActivitiesWorkflow.run, - id=str(uuid.uuid4()), - task_queue=_TASK_QUEUE, - ) - assert result is None - assert activity_counter == 2 + ) + assert result is None + assert activity_counter == 2