Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.venv
.idea
__pycache__
.vscode
.DS_Store
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
Expand Down
36 changes: 36 additions & 0 deletions custom_metric/README.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test or two? (I know not all samples have tests, something we regret, but adding them helps us catch if they start breaking)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added!

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Custom Metric

This sample deminstrates two things: (1) how to make a custom metric, and (2) how to use an interceptor.
The custom metric in this sample is an activity schedule-to-start-latency with a workflow type tag.

Please see the top-level [README](../README.md) for prerequisites such as Python, uv, starting the local temporal development server, etc.

1. Run the worker with `uv run custom_metric/worker.py`
2. Request execution of the workflow with `uv run custom_metric/starter.py`
3. Go to `http://127.0.0.1:9090/metrics` in your browser

You'll get something like the following:

```txt
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="500"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="5000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="10000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="+Inf"} 2
custom_activity_schedule_to_start_latency_sum{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 1010
custom_activity_schedule_to_start_latency_count{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 2
...
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 2
temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1010
temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 2
```
Empty file added custom_metric/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions custom_metric/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import time

from temporalio import activity


@activity.defn
def print_and_sleep():
print("In the activity.")
time.sleep(1)
23 changes: 23 additions & 0 deletions custom_metric/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import asyncio
import uuid

from temporalio.client import Client

from custom_metric.workflow import StartTwoActivitiesWorkflow


async def main():

client = await Client.connect(
"localhost:7233",
)

await client.start_workflow(
StartTwoActivitiesWorkflow.run,
id="execute-activity-workflow-" + str(uuid.uuid4()),
task_queue="custom-metric-task-queue",
)


if __name__ == "__main__":
asyncio.run(main())
71 changes: 71 additions & 0 deletions custom_metric/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor

from temporalio import activity
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
Interceptor,
Worker,
)

from custom_metric.activity import print_and_sleep
from custom_metric.workflow import StartTwoActivitiesWorkflow


class SimpleWorkerInterceptor(Interceptor):
def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
return CustomScheduleToStartInterceptor(next)


class CustomScheduleToStartInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput):

schedule_to_start = (
activity.info().started_time
- activity.info().current_attempt_scheduled_time
)
# Could do the original schedule time instead of current attempt
# schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time

meter = activity.metric_meter()
histogram = meter.create_histogram_timedelta(
"custom_activity_schedule_to_start_latency",
description="Time between activity scheduling and start",
unit="duration",
)
histogram.record(
schedule_to_start, {"workflow_type": activity.info().workflow_type}
)
return await self.next.execute_activity(input)


async def main():
runtime = Runtime(
telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090"))
)
client = await Client.connect(
"localhost:7233",
runtime=runtime,
)
worker = Worker(
client,
task_queue="custom-metric-task-queue",
interceptors=[SimpleWorkerInterceptor()],
workflows=[StartTwoActivitiesWorkflow],
activities=[print_and_sleep],
# only one activity executor with two concurrently scheduled activities
# to force a nontrivial schedule to start times
activity_executor=ThreadPoolExecutor(1),
max_concurrent_activities=1,
)

await worker.run()


if __name__ == "__main__":
asyncio.run(main())
25 changes: 25 additions & 0 deletions custom_metric/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from custom_metric.activity import print_and_sleep


@workflow.defn
class StartTwoActivitiesWorkflow:
@workflow.run
async def run(self):
# Request two concurrent activities with only one task slot so
# we can see nontrivial schedule to start times.
activity1 = workflow.execute_activity(
print_and_sleep,
start_to_close_timeout=timedelta(seconds=5),
)
activity2 = workflow.execute_activity(
print_and_sleep,
start_to_close_timeout=timedelta(seconds=5),
)
await asyncio.gather(activity1, activity2)
return None
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ packages = [
"context_propagation",
"custom_converter",
"custom_decorator",
"custom_metric",
"dsl",
"encryption",
"gevent_async",
Expand Down
Empty file.
32 changes: 32 additions & 0 deletions tests/custom_metric/workflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import uuid

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

from custom_metric.worker import StartTwoActivitiesWorkflow

_TASK_QUEUE = "custom-metric-task-queue"

activity_counter = 0


async def test_custom_metric_workflow(client: Client):
@activity.defn(name="print_and_sleep")
async def print_message_mock():
global activity_counter
activity_counter += 1

async with Worker(
client,
task_queue=_TASK_QUEUE,
workflows=[StartTwoActivitiesWorkflow],
activities=[print_message_mock],
):
result = await client.execute_workflow(
StartTwoActivitiesWorkflow.run,
id=str(uuid.uuid4()),
task_queue=_TASK_QUEUE,
)
assert result is None
assert activity_counter == 2
Loading