diff --git a/README.md b/README.md index 43d2cba6..35947369 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. * [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals. * [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist. +* [multiline_logging](multiline_logging) - Format multiline exception logs as single-line JSON within Temporal boundaries. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. diff --git a/multiline_logging/README.md b/multiline_logging/README.md new file mode 100644 index 00000000..9e55a70a --- /dev/null +++ b/multiline_logging/README.md @@ -0,0 +1,48 @@ +# Multiline Exception Logging Interceptor + +This sample demonstrates how to handle multiline exception logs in Temporal workflows and activities by formatting them as single-line JSON. This solves the issue where multiline tracebacks span multiple log entries in log aggregation systems like Datadog. + +## Problem + +When exceptions with multiline tracebacks are raised in Temporal activities or workflows, they can create multiple separate log entries in log aggregation systems, making them difficult to parse and analyze. + +## Solution + +The `MultilineLoggingInterceptor` captures exceptions within Temporal boundaries and: +1. Formats the exception details as a single-line JSON object +2. Logs the formatted exception +3. Re-raises the original exception to maintain normal error handling + +The JSON format includes: +- `message`: The exception message +- `type`: The exception class name +- `traceback`: The full traceback with newlines replaced by " | " + +## Usage + +```python +from multiline_logging.interceptor import MultilineLoggingInterceptor + +worker = Worker( + client, + task_queue="my-task-queue", + workflows=[MyWorkflow], + activities=[my_activity], + interceptors=[MultilineLoggingInterceptor()], +) +``` + +## Running the Sample + +1. Start Temporal server: `temporal server start-dev` +2. Run the worker: `python multiline_logging/worker.py` +3. In another terminal, run the starter: `python multiline_logging/starter.py` + +You'll see the multiline exceptions formatted as single-line JSON in the worker logs. + +## Key Benefits + +- **Surgical**: Only affects logging within Temporal SDK boundaries +- **Non-intrusive**: Doesn't interfere with existing log formatters +- **Preserves behavior**: Original exceptions are still raised normally +- **Structured**: JSON format is easy to parse and analyze diff --git a/multiline_logging/__init__.py b/multiline_logging/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/multiline_logging/activities.py b/multiline_logging/activities.py new file mode 100644 index 00000000..3db2f941 --- /dev/null +++ b/multiline_logging/activities.py @@ -0,0 +1,27 @@ +from temporalio import activity + + +@activity.defn +async def failing_activity(should_fail: bool) -> str: + if should_fail: + try: + raise ValueError("Inner exception with\nmultiple lines\nof text") + except ValueError as inner: + raise RuntimeError( + "Outer exception that wraps\nanother exception with\nmultiline content" + ) from inner + return "Success!" + + +@activity.defn +async def complex_failing_activity() -> str: + """Activity that creates a complex multiline traceback""" + + def nested_function(): + def deeply_nested(): + raise Exception("Deep exception with\nvery long\nmultiline\nerror message") + + deeply_nested() + + nested_function() + return "This won't be reached" diff --git a/multiline_logging/interceptor.py b/multiline_logging/interceptor.py new file mode 100644 index 00000000..3e981ead --- /dev/null +++ b/multiline_logging/interceptor.py @@ -0,0 +1,66 @@ +import json +import logging +import traceback +from typing import Any, Optional, Type + +from temporalio import activity, workflow +from temporalio.worker import ( + ActivityInboundInterceptor, + ExecuteActivityInput, + ExecuteWorkflowInput, + Interceptor, + WorkflowInboundInterceptor, + WorkflowInterceptorClassInput, +) + +logger = logging.getLogger(__name__) + + +class _MultilineLoggingActivityInboundInterceptor(ActivityInboundInterceptor): + async def execute_activity(self, input: ExecuteActivityInput) -> Any: + try: + return await super().execute_activity(input) + except Exception as e: + exception_data = { + "message": str(e), + "type": type(e).__name__, + "traceback": traceback.format_exc().replace("\n", " | "), + } + + logger.error(f"Activity exception: {json.dumps(exception_data)}") + + raise e + + +class _MultilineLoggingWorkflowInterceptor(WorkflowInboundInterceptor): + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + try: + return await super().execute_workflow(input) + except Exception as e: + exception_data = { + "message": str(e), + "type": type(e).__name__, + "traceback": traceback.format_exc().replace("\n", " | "), + } + + if not workflow.unsafe.is_replaying(): + with workflow.unsafe.sandbox_unrestricted(): + logger.error(f"Workflow exception: {json.dumps(exception_data)}") + + raise e + + +class MultilineLoggingInterceptor(Interceptor): + """Temporal Interceptor that formats multiline exception logs as single-line JSON""" + + def intercept_activity( + self, next: ActivityInboundInterceptor + ) -> ActivityInboundInterceptor: + return _MultilineLoggingActivityInboundInterceptor( + super().intercept_activity(next) + ) + + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[Type[WorkflowInboundInterceptor]]: + return _MultilineLoggingWorkflowInterceptor diff --git a/multiline_logging/starter.py b/multiline_logging/starter.py new file mode 100644 index 00000000..f4e4f4e8 --- /dev/null +++ b/multiline_logging/starter.py @@ -0,0 +1,32 @@ +import asyncio + +from temporalio.client import Client + +from multiline_logging.workflows import MultilineLoggingWorkflow + + +async def main(): + client = await Client.connect("localhost:7233") + + test_cases = [ + "activity_exception", + "complex_activity_exception", + "workflow_exception", + ] + + for test_case in test_cases: + print(f"\n--- Testing {test_case} ---") + try: + result = await client.execute_workflow( + MultilineLoggingWorkflow.run, + test_case, + id=f"multiline-logging-{test_case}", + task_queue="multiline-logging-task-queue", + ) + print(f"Result: {result}") + except Exception as e: + print(f"Expected exception caught: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/multiline_logging/test_imports.py b/multiline_logging/test_imports.py new file mode 100644 index 00000000..1cec2f31 --- /dev/null +++ b/multiline_logging/test_imports.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +""" +Test that all imports work correctly +""" + +try: + from .interceptor import MultilineLoggingInterceptor + + print("✅ Interceptor imports successfully") + + from .activities import complex_failing_activity, failing_activity + + print("✅ Activities import successfully") + + from .workflows import MultilineLoggingWorkflow + + print("✅ Workflows import successfully") + + print("✅ All imports work correctly!") + +except ImportError as e: + print(f"❌ Import error: {e}") + exit(1) diff --git a/multiline_logging/test_interceptor.py b/multiline_logging/test_interceptor.py new file mode 100644 index 00000000..7f107e2d --- /dev/null +++ b/multiline_logging/test_interceptor.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +Simple test to validate the multiline logging interceptor logic +""" +import json +import traceback + + +def test_multiline_formatting(): + """Test that multiline exceptions are properly formatted as single-line JSON""" + try: + try: + raise ValueError("Inner exception with\nmultiple lines\nof text") + except ValueError as inner: + raise RuntimeError( + "Outer exception that wraps\nanother exception with\nmultiline content" + ) from inner + except Exception as e: + exception_data = { + "message": str(e), + "type": type(e).__name__, + "traceback": traceback.format_exc().replace("\n", " | "), + } + + json_output = json.dumps(exception_data) + + print("Original exception message:") + print(repr(str(e))) + print("\nFormatted JSON output:") + print(json_output) + print("\nValidation:") + print(f"- Contains newlines in JSON: {'\\n' in json_output}") + print(f"- JSON is valid: {json.loads(json_output) is not None}") + print(f"- Single line: {json_output.count('\\n') == 0}") + + return json_output + + +if __name__ == "__main__": + print("Testing multiline exception formatting...") + result = test_multiline_formatting() + print("\n✅ Test completed successfully!") diff --git a/multiline_logging/worker.py b/multiline_logging/worker.py new file mode 100644 index 00000000..d698abd6 --- /dev/null +++ b/multiline_logging/worker.py @@ -0,0 +1,30 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from multiline_logging.activities import complex_failing_activity, failing_activity +from multiline_logging.interceptor import MultilineLoggingInterceptor +from multiline_logging.workflows import MultilineLoggingWorkflow + +logging.basicConfig(level=logging.INFO) + + +async def main(): + client = await Client.connect("localhost:7233") + + worker = Worker( + client, + task_queue="multiline-logging-task-queue", + workflows=[MultilineLoggingWorkflow], + activities=[failing_activity, complex_failing_activity], + interceptors=[MultilineLoggingInterceptor()], + ) + + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/multiline_logging/workflows.py b/multiline_logging/workflows.py new file mode 100644 index 00000000..86c7fb48 --- /dev/null +++ b/multiline_logging/workflows.py @@ -0,0 +1,24 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from .activities import complex_failing_activity, failing_activity + + +@workflow.defn +class MultilineLoggingWorkflow: + @workflow.run + async def run(self, test_type: str) -> str: + if test_type == "activity_exception": + return await workflow.execute_activity( + failing_activity, True, schedule_to_close_timeout=timedelta(seconds=5) + ) + elif test_type == "complex_activity_exception": + return await workflow.execute_activity( + complex_failing_activity, schedule_to_close_timeout=timedelta(seconds=5) + ) + elif test_type == "workflow_exception": + raise RuntimeError("Workflow exception with\nmultiple lines\nof error text") + else: + return "No exception test" diff --git a/pyproject.toml b/pyproject.toml index fa5a0300..7432cdb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,7 @@ packages = [ "hello", "langchain", "message_passing", + "multiline_logging", "nexus", "open_telemetry", "patching", @@ -143,4 +144,4 @@ ignore_errors = true [[tool.mypy.overrides]] module = "opentelemetry.*" -ignore_errors = true \ No newline at end of file +ignore_errors = true