diff --git a/.gitignore b/.gitignore index 5c5ffffd..41afe5f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .venv .idea __pycache__ +.vscode +.DS_Store diff --git a/README.md b/README.md index 832e0d1f..5c1941d1 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor. * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types. * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. +* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency. * [dsl](dsl) - DSL workflow that executes steps defined in a YAML file. * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [gevent_async](gevent_async) - Combine gevent and Temporal. diff --git a/custom_metric/README.md b/custom_metric/README.md new file mode 100644 index 00000000..de1d51d5 --- /dev/null +++ b/custom_metric/README.md @@ -0,0 +1,36 @@ +# Custom Metric + +This sample deminstrates two things: (1) how to make a custom metric, and (2) how to use an interceptor. +The custom metric in this sample is an activity schedule-to-start-latency with a workflow type tag. + +Please see the top-level [README](../README.md) for prerequisites such as Python, uv, starting the local temporal development server, etc. + +1. Run the worker with `uv run custom_metric/worker.py` +2. Request execution of the workflow with `uv run custom_metric/starter.py` +3. Go to `http://127.0.0.1:9090/metrics` in your browser + +You'll get something like the following: + +```txt +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="500"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000"} 1 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="5000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="10000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000000"} 2 +custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="+Inf"} 2 +custom_activity_schedule_to_start_latency_sum{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 1010 +custom_activity_schedule_to_start_latency_count{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 2 +... +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 1 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 2 +temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 2 +temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1010 +temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 2 +``` diff --git a/custom_metric/__init__.py b/custom_metric/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/custom_metric/activity.py b/custom_metric/activity.py new file mode 100644 index 00000000..7f2ee116 --- /dev/null +++ b/custom_metric/activity.py @@ -0,0 +1,9 @@ +import time + +from temporalio import activity + + +@activity.defn +def print_and_sleep(): + print("In the activity.") + time.sleep(1) diff --git a/custom_metric/starter.py b/custom_metric/starter.py new file mode 100644 index 00000000..ded3a626 --- /dev/null +++ b/custom_metric/starter.py @@ -0,0 +1,23 @@ +import asyncio +import uuid + +from temporalio.client import Client + +from custom_metric.workflow import StartTwoActivitiesWorkflow + + +async def main(): + + client = await Client.connect( + "localhost:7233", + ) + + await client.start_workflow( + StartTwoActivitiesWorkflow.run, + id="execute-activity-workflow-" + str(uuid.uuid4()), + task_queue="custom-metric-task-queue", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_metric/worker.py b/custom_metric/worker.py new file mode 100644 index 00000000..9ffad207 --- /dev/null +++ b/custom_metric/worker.py @@ -0,0 +1,71 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +from temporalio import activity +from temporalio.client import Client +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.worker import ( + ActivityInboundInterceptor, + ExecuteActivityInput, + Interceptor, + Worker, +) + +from custom_metric.activity import print_and_sleep +from custom_metric.workflow import StartTwoActivitiesWorkflow + + +class SimpleWorkerInterceptor(Interceptor): + def intercept_activity( + self, next: ActivityInboundInterceptor + ) -> ActivityInboundInterceptor: + return CustomScheduleToStartInterceptor(next) + + +class CustomScheduleToStartInterceptor(ActivityInboundInterceptor): + async def execute_activity(self, input: ExecuteActivityInput): + + schedule_to_start = ( + activity.info().started_time + - activity.info().current_attempt_scheduled_time + ) + # Could do the original schedule time instead of current attempt + # schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time + + meter = activity.metric_meter() + histogram = meter.create_histogram_timedelta( + "custom_activity_schedule_to_start_latency", + description="Time between activity scheduling and start", + unit="duration", + ) + histogram.record( + schedule_to_start, {"workflow_type": activity.info().workflow_type} + ) + return await self.next.execute_activity(input) + + +async def main(): + runtime = Runtime( + telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090")) + ) + client = await Client.connect( + "localhost:7233", + runtime=runtime, + ) + worker = Worker( + client, + task_queue="custom-metric-task-queue", + interceptors=[SimpleWorkerInterceptor()], + workflows=[StartTwoActivitiesWorkflow], + activities=[print_and_sleep], + # only one activity executor with two concurrently scheduled activities + # to force a nontrivial schedule to start times + activity_executor=ThreadPoolExecutor(1), + max_concurrent_activities=1, + ) + + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_metric/workflow.py b/custom_metric/workflow.py new file mode 100644 index 00000000..cf37823b --- /dev/null +++ b/custom_metric/workflow.py @@ -0,0 +1,25 @@ +import asyncio +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from custom_metric.activity import print_and_sleep + + +@workflow.defn +class StartTwoActivitiesWorkflow: + @workflow.run + async def run(self): + # Request two concurrent activities with only one task slot so + # we can see nontrivial schedule to start times. + activity1 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + activity2 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + await asyncio.gather(activity1, activity2) + return None diff --git a/pyproject.toml b/pyproject.toml index 5fd96a81..e44b34b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ packages = [ "context_propagation", "custom_converter", "custom_decorator", + "custom_metric", "dsl", "encryption", "gevent_async", diff --git a/tests/custom_metric/__init__.py b/tests/custom_metric/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/custom_metric/workflow_test.py b/tests/custom_metric/workflow_test.py new file mode 100644 index 00000000..4e107b79 --- /dev/null +++ b/tests/custom_metric/workflow_test.py @@ -0,0 +1,32 @@ +import uuid + +from temporalio import activity +from temporalio.client import Client +from temporalio.worker import Worker + +from custom_metric.worker import StartTwoActivitiesWorkflow + +_TASK_QUEUE = "custom-metric-task-queue" + +activity_counter = 0 + + +async def test_custom_metric_workflow(client: Client): + @activity.defn(name="print_and_sleep") + async def print_message_mock(): + global activity_counter + activity_counter += 1 + + async with Worker( + client, + task_queue=_TASK_QUEUE, + workflows=[StartTwoActivitiesWorkflow], + activities=[print_message_mock], + ): + result = await client.execute_workflow( + StartTwoActivitiesWorkflow.run, + id=str(uuid.uuid4()), + task_queue=_TASK_QUEUE, + ) + assert result is None + assert activity_counter == 2