diff --git a/README.md b/README.md index 5c1941d1..8bd9986a 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [hello_async_activity_completion](hello/hello_async_activity_completion.py) - Complete an activity outside of the function that was called. * [hello_cancellation](hello/hello_cancellation.py) - Manually react to cancellation inside workflows and activities. + * [hello_change_log_level](hello/hello_change_log_level.py) - Change the level of workflow task failure from WARN to ERROR. * [hello_child_workflow](hello/hello_child_workflow.py) - Execute a child workflow from a workflow. * [hello_continue_as_new](hello/hello_continue_as_new.py) - Use continue as new to restart a workflow. * [hello_cron](hello/hello_cron.py) - Execute a workflow once a minute. diff --git a/hello/README.md b/hello/README.md index 45d24768..fba44461 100644 --- a/hello/README.md +++ b/hello/README.md @@ -28,6 +28,7 @@ Replace `hello_activity.py` in the command with any other example filename to ru * [hello_async_activity_completion](hello_async_activity_completion.py) - Complete an activity outside of the function that was called. * [hello_cancellation](hello_cancellation.py) - Manually react to cancellation inside workflows and activities. +* [hello_change_log_level](hello_change_log_level.py) - Change the level of workflow task failure from WARN to ERROR. * [hello_child_workflow](hello_child_workflow.py) - Execute a child workflow from a workflow. * [hello_continue_as_new](hello_continue_as_new.py) - Use continue as new to restart a workflow. * [hello_cron](hello_cron.py) - Execute a workflow once a minute. diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py new file mode 100644 index 00000000..89bb2e1d --- /dev/null +++ b/hello/hello_change_log_level.py @@ -0,0 +1,67 @@ +""" +Changes the log level of workflow task failures from WARN to ERROR. + +Note that the __temporal_error_identifier attribute was added in +version 1.13.0 of the Python SDK. +""" + +import asyncio +import logging +import sys + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + +# --- Begin logging set‑up ---------------------------------------------------------- +logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s %(message)s", +) + + +class CustomLogFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + # Note that the __temporal_error_identifier attribute was added in + # version 1.13.0 of the Python SDK. + if ( + hasattr(record, "__temporal_error_identifier") + and getattr(record, "__temporal_error_identifier") == "WorkflowTaskFailure" + ): + record.levelno = logging.ERROR + record.levelname = logging.getLevelName(logging.ERROR) + return True + + +for h in logging.getLogger().handlers: + h.addFilter(CustomLogFilter()) +# --- End logging set‑up ---------------------------------------------------------- + + +LOG_MESSAGE = "This error is an experiment to check the log level" + + +@workflow.defn +class GreetingWorkflow: + @workflow.run + async def run(self): + raise RuntimeError(LOG_MESSAGE) + + +async def main(): + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="hello-change-log-level-task-queue", + workflows=[GreetingWorkflow], + ): + await client.execute_workflow( + GreetingWorkflow.run, + id="hello-change-log-level-workflow-id", + task_queue="hello-change-log-level-task-queue", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/hello/hello_change_log_level_test.py b/tests/hello/hello_change_log_level_test.py new file mode 100644 index 00000000..e3cd2334 --- /dev/null +++ b/tests/hello/hello_change_log_level_test.py @@ -0,0 +1,43 @@ +import asyncio +import io +import logging +import uuid + +from temporalio.client import Client +from temporalio.worker import Worker + +from hello.hello_change_log_level import LOG_MESSAGE, GreetingWorkflow + + +async def test_workflow_with_log_capture(client: Client): + + log_stream = io.StringIO() + handler = logging.StreamHandler(log_stream) + handler.setLevel(logging.ERROR) + + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + task_queue = f"tq-{uuid.uuid4()}" + + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + ): + handle = await client.start_workflow( + GreetingWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + await asyncio.sleep( + 0.2 + ) # small wait to ensure the workflow has started, failed, and logged + await handle.terminate() + + logger.removeHandler(handler) + handler.flush() + + logs = log_stream.getvalue() + assert LOG_MESSAGE in logs