From 598c1af2f2d8c3b092d024f717c5baa4008d1bdd Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 23 May 2025 12:44:34 -0500 Subject: [PATCH 01/10] added sample showing how to change log levels --- README.md | 1 + hello/README.md | 1 + hello/hello_change_log_level.py | 58 +++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 hello/hello_change_log_level.py diff --git a/README.md b/README.md index 5c1941d1..8bd9986a 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [hello_async_activity_completion](hello/hello_async_activity_completion.py) - Complete an activity outside of the function that was called. * [hello_cancellation](hello/hello_cancellation.py) - Manually react to cancellation inside workflows and activities. + * [hello_change_log_level](hello/hello_change_log_level.py) - Change the level of workflow task failure from WARN to ERROR. * [hello_child_workflow](hello/hello_child_workflow.py) - Execute a child workflow from a workflow. * [hello_continue_as_new](hello/hello_continue_as_new.py) - Use continue as new to restart a workflow. * [hello_cron](hello/hello_cron.py) - Execute a workflow once a minute. diff --git a/hello/README.md b/hello/README.md index 45d24768..fba44461 100644 --- a/hello/README.md +++ b/hello/README.md @@ -28,6 +28,7 @@ Replace `hello_activity.py` in the command with any other example filename to ru * [hello_async_activity_completion](hello_async_activity_completion.py) - Complete an activity outside of the function that was called. * [hello_cancellation](hello_cancellation.py) - Manually react to cancellation inside workflows and activities. +* [hello_change_log_level](hello_change_log_level.py) - Change the level of workflow task failure from WARN to ERROR. * [hello_child_workflow](hello_child_workflow.py) - Execute a child workflow from a workflow. * [hello_continue_as_new](hello_continue_as_new.py) - Use continue as new to restart a workflow. * [hello_cron](hello_cron.py) - Execute a workflow once a minute. diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py new file mode 100644 index 00000000..281ba63a --- /dev/null +++ b/hello/hello_change_log_level.py @@ -0,0 +1,58 @@ +""" +Changes the log level of workflow task failures from WARN to ERROR. +""" + +import asyncio, logging, sys +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + +# --- Begin logging set‑up ---------------------------------------------------------- +logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s %(message)s", +) + + +class CustomLogFilter(logging.Filter): + def filter(self, record): + msg = record.getMessage() + if ( + msg.startswith("Failed activation on workflow") + and " with ID " in msg + and " and run ID " in msg + and record.levelno < logging.ERROR + ): + record.levelno = logging.ERROR + record.levelname = logging.getLevelName(logging.ERROR) + return True + + +logging.getLogger("temporalio.worker._workflow_instance").addFilter(CustomLogFilter()) +# --- End logging set‑up ---------------------------------------------------------- + + +@workflow.defn +class GreetingWorkflow: + @workflow.run + async def run(self): + raise RuntimeError("This is a test error") + + +async def main(): + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="hello-task-queue", + workflows=[GreetingWorkflow], + ): + await client.execute_workflow( + GreetingWorkflow.run, + id="hello-workflow-id", + task_queue="hello-task-queue", + ) + + +if __name__ == "__main__": + asyncio.run(main()) From 75af4c06209a8be1df28be5a3349147b252a1c93 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 23 May 2025 12:49:40 -0500 Subject: [PATCH 02/10] fixed isort --- hello/hello_change_log_level.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 281ba63a..a8fc3657 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -2,7 +2,10 @@ Changes the log level of workflow task failures from WARN to ERROR. """ -import asyncio, logging, sys +import asyncio +import logging +import sys + from temporalio import workflow from temporalio.client import Client from temporalio.worker import Worker From 48f104dfd74ad04bd08ac5f45576848128b24734 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 27 May 2025 10:08:10 -0500 Subject: [PATCH 03/10] Responded to one comment --- hello/hello_change_log_level.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index a8fc3657..75dbd484 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -23,8 +23,6 @@ def filter(self, record): msg = record.getMessage() if ( msg.startswith("Failed activation on workflow") - and " with ID " in msg - and " and run ID " in msg and record.levelno < logging.ERROR ): record.levelno = logging.ERROR From 9b93793109045ccf27ce8ff9c96c23776c485d85 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 27 May 2025 10:13:48 -0500 Subject: [PATCH 04/10] Responded to second comment --- hello/hello_change_log_level.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 75dbd484..10455400 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -19,10 +19,9 @@ class CustomLogFilter(logging.Filter): - def filter(self, record): - msg = record.getMessage() + def filter(self, record: logging.LogRecord) -> bool: if ( - msg.startswith("Failed activation on workflow") + record.msg.startswith("Failed activation on workflow") and record.levelno < logging.ERROR ): record.levelno = logging.ERROR From 84679663db50cfce909d754594aa01ac1a2d65d5 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 16 Jun 2025 12:17:01 -0500 Subject: [PATCH 05/10] first-try --- hello/hello_change_log_level.py | 2 +- tests/hello/hello_change_log_level_test.py | 30 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 tests/hello/hello_change_log_level_test.py diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 10455400..7c54cfba 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -37,7 +37,7 @@ def filter(self, record: logging.LogRecord) -> bool: class GreetingWorkflow: @workflow.run async def run(self): - raise RuntimeError("This is a test error") + raise RuntimeError("This error is an experiment to check the log level") async def main(): diff --git a/tests/hello/hello_change_log_level_test.py b/tests/hello/hello_change_log_level_test.py new file mode 100644 index 00000000..98fe0e2a --- /dev/null +++ b/tests/hello/hello_change_log_level_test.py @@ -0,0 +1,30 @@ +import uuid +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from hello.hello_change_log_level import GreetingWorkflow + + +async def test_workflow_with_changed_log_level(client: Client, caplog): + + task_queue = f"tq-{uuid.uuid4()}" + + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + ): + with caplog.at_level(logging.ERROR): + handle = await client.start_workflow( + GreetingWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + await asyncio.sleep(.1) + handle.terminate() + + assert any("log level" in m for m in caplog.messages) + assert True From 37f797883bd58e008fcd40077571c54d434de37c Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 16 Jun 2025 12:38:39 -0500 Subject: [PATCH 06/10] added logging test --- hello/hello_change_log_level.py | 5 ++- tests/hello/hello_change_log_level_test.py | 41 ++++++++++++++-------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 7c54cfba..12a524e3 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -33,11 +33,14 @@ def filter(self, record: logging.LogRecord) -> bool: # --- End logging set‑up ---------------------------------------------------------- +LOG_MESSAGE = "This error is an experiment to check the log level" + + @workflow.defn class GreetingWorkflow: @workflow.run async def run(self): - raise RuntimeError("This error is an experiment to check the log level") + raise RuntimeError(LOG_MESSAGE) async def main(): diff --git a/tests/hello/hello_change_log_level_test.py b/tests/hello/hello_change_log_level_test.py index 98fe0e2a..6674d25b 100644 --- a/tests/hello/hello_change_log_level_test.py +++ b/tests/hello/hello_change_log_level_test.py @@ -1,14 +1,23 @@ -import uuid import asyncio +import io import logging +import uuid from temporalio.client import Client from temporalio.worker import Worker -from hello.hello_change_log_level import GreetingWorkflow +from hello.hello_change_log_level import LOG_MESSAGE, GreetingWorkflow + +async def test_workflow_with_log_capture(client: Client): -async def test_workflow_with_changed_log_level(client: Client, caplog): + log_stream = io.StringIO() + handler = logging.StreamHandler(log_stream) + handler.setLevel(logging.DEBUG) + + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) task_queue = f"tq-{uuid.uuid4()}" @@ -17,14 +26,18 @@ async def test_workflow_with_changed_log_level(client: Client, caplog): task_queue=task_queue, workflows=[GreetingWorkflow], ): - with caplog.at_level(logging.ERROR): - handle = await client.start_workflow( - GreetingWorkflow.run, - id=f"wf-{uuid.uuid4()}", - task_queue=task_queue, - ) - await asyncio.sleep(.1) - handle.terminate() - - assert any("log level" in m for m in caplog.messages) - assert True + handle = await client.start_workflow( + GreetingWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + await asyncio.sleep( + 0.2 + ) # arbitrary wait to ensure the workflow has started and logged + await handle.terminate() + + logger.removeHandler(handler) + handler.flush() + + logs = log_stream.getvalue() + assert LOG_MESSAGE in logs From 65c60c2b8b02b84dce56d892451a4af139145f37 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Mon, 16 Jun 2025 12:46:07 -0500 Subject: [PATCH 07/10] test is passing --- tests/hello/hello_change_log_level_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/hello/hello_change_log_level_test.py b/tests/hello/hello_change_log_level_test.py index 6674d25b..70739f7b 100644 --- a/tests/hello/hello_change_log_level_test.py +++ b/tests/hello/hello_change_log_level_test.py @@ -13,7 +13,7 @@ async def test_workflow_with_log_capture(client: Client): log_stream = io.StringIO() handler = logging.StreamHandler(log_stream) - handler.setLevel(logging.DEBUG) + handler.setLevel(logging.ERROR) logger = logging.getLogger() logger.addHandler(handler) From bfc22ca2a36b1c3e5b65a76ffb5ae886d8b1eea8 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 17 Jun 2025 09:07:33 -0500 Subject: [PATCH 08/10] responded to PR comments --- hello/hello_change_log_level.py | 7 +++++-- tests/hello/hello_change_log_level_test.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 12a524e3..5b3af771 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -11,6 +11,8 @@ from temporalio.worker import Worker # --- Begin logging set‑up ---------------------------------------------------------- +_WORKFLOW_TASK_FAILURE_LOG_PREFIX = "Failed activation on workflow" + logging.basicConfig( stream=sys.stdout, level=logging.INFO, @@ -21,7 +23,7 @@ class CustomLogFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: if ( - record.msg.startswith("Failed activation on workflow") + record.msg.startswith(_WORKFLOW_TASK_FAILURE_LOG_PREFIX) and record.levelno < logging.ERROR ): record.levelno = logging.ERROR @@ -29,7 +31,8 @@ def filter(self, record: logging.LogRecord) -> bool: return True -logging.getLogger("temporalio.worker._workflow_instance").addFilter(CustomLogFilter()) +for h in logging.getLogger().handlers: + h.addFilter(CustomLogFilter()) # --- End logging set‑up ---------------------------------------------------------- diff --git a/tests/hello/hello_change_log_level_test.py b/tests/hello/hello_change_log_level_test.py index 70739f7b..e3cd2334 100644 --- a/tests/hello/hello_change_log_level_test.py +++ b/tests/hello/hello_change_log_level_test.py @@ -33,7 +33,7 @@ async def test_workflow_with_log_capture(client: Client): ) await asyncio.sleep( 0.2 - ) # arbitrary wait to ensure the workflow has started and logged + ) # small wait to ensure the workflow has started, failed, and logged await handle.terminate() logger.removeHandler(handler) From 6c20201c2a6b178eda0bc98784f19784417c7699 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Fri, 20 Jun 2025 15:47:51 -0500 Subject: [PATCH 09/10] responded to comment --- hello/hello_change_log_level.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 5b3af771..586c8f4a 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -50,13 +50,13 @@ async def main(): client = await Client.connect("localhost:7233") async with Worker( client, - task_queue="hello-task-queue", + task_queue="hello-change-log-level-task-queue", workflows=[GreetingWorkflow], ): await client.execute_workflow( GreetingWorkflow.run, - id="hello-workflow-id", - task_queue="hello-task-queue", + id="hello-change-log-level-workflow-id", + task_queue="hello-change-log-level-task-queue", ) From b1bc492cacf073f51b2bb58dc59d0bb61afc046d Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Wed, 25 Jun 2025 09:52:01 -0500 Subject: [PATCH 10/10] responded to PR comments --- hello/hello_change_log_level.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/hello/hello_change_log_level.py b/hello/hello_change_log_level.py index 586c8f4a..89bb2e1d 100644 --- a/hello/hello_change_log_level.py +++ b/hello/hello_change_log_level.py @@ -1,5 +1,8 @@ """ Changes the log level of workflow task failures from WARN to ERROR. + +Note that the __temporal_error_identifier attribute was added in +version 1.13.0 of the Python SDK. """ import asyncio @@ -11,8 +14,6 @@ from temporalio.worker import Worker # --- Begin logging set‑up ---------------------------------------------------------- -_WORKFLOW_TASK_FAILURE_LOG_PREFIX = "Failed activation on workflow" - logging.basicConfig( stream=sys.stdout, level=logging.INFO, @@ -22,9 +23,11 @@ class CustomLogFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: + # Note that the __temporal_error_identifier attribute was added in + # version 1.13.0 of the Python SDK. if ( - record.msg.startswith(_WORKFLOW_TASK_FAILURE_LOG_PREFIX) - and record.levelno < logging.ERROR + hasattr(record, "__temporal_error_identifier") + and getattr(record, "__temporal_error_identifier") == "WorkflowTaskFailure" ): record.levelno = logging.ERROR record.levelname = logging.getLevelName(logging.ERROR)