Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
* [multiline_logging](multiline_logging) - Format multiline exception logs as single-line JSON within Temporal boundaries.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
48 changes: 48 additions & 0 deletions multiline_logging/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Multiline Exception Logging Interceptor

This sample demonstrates how to handle multiline exception logs in Temporal workflows and activities by formatting them as single-line JSON. This solves the issue where multiline tracebacks span multiple log entries in log aggregation systems like Datadog.

## Problem

When exceptions with multiline tracebacks are raised in Temporal activities or workflows, they can create multiple separate log entries in log aggregation systems, making them difficult to parse and analyze.

## Solution

The `MultilineLoggingInterceptor` captures exceptions within Temporal boundaries and:
1. Formats the exception details as a single-line JSON object
2. Logs the formatted exception
3. Re-raises the original exception to maintain normal error handling

The JSON format includes:
- `message`: The exception message
- `type`: The exception class name
- `traceback`: The full traceback with newlines replaced by " | "

## Usage

```python
from multiline_logging.interceptor import MultilineLoggingInterceptor

worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
interceptors=[MultilineLoggingInterceptor()],
)
```

## Running the Sample

1. Start Temporal server: `temporal server start-dev`
2. Run the worker: `python multiline_logging/worker.py`
3. In another terminal, run the starter: `python multiline_logging/starter.py`

You'll see the multiline exceptions formatted as single-line JSON in the worker logs.

## Key Benefits

- **Surgical**: Only affects logging within Temporal SDK boundaries
- **Non-intrusive**: Doesn't interfere with existing log formatters
- **Preserves behavior**: Original exceptions are still raised normally
- **Structured**: JSON format is easy to parse and analyze
Empty file added multiline_logging/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions multiline_logging/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from temporalio import activity


@activity.defn
async def failing_activity(should_fail: bool) -> str:
if should_fail:
try:
raise ValueError("Inner exception with\nmultiple lines\nof text")
except ValueError as inner:
raise RuntimeError(
"Outer exception that wraps\nanother exception with\nmultiline content"
) from inner
return "Success!"


@activity.defn
async def complex_failing_activity() -> str:
"""Activity that creates a complex multiline traceback"""

def nested_function():
def deeply_nested():
raise Exception("Deep exception with\nvery long\nmultiline\nerror message")

deeply_nested()

nested_function()
return "This won't be reached"
66 changes: 66 additions & 0 deletions multiline_logging/interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
import logging
import traceback
from typing import Any, Optional, Type

from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)

logger = logging.getLogger(__name__)


class _MultilineLoggingActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
try:
return await super().execute_activity(input)
except Exception as e:
exception_data = {
"message": str(e),
"type": type(e).__name__,
"traceback": traceback.format_exc().replace("\n", " | "),
}

logger.error(f"Activity exception: {json.dumps(exception_data)}")

raise e


class _MultilineLoggingWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
try:
return await super().execute_workflow(input)
except Exception as e:
exception_data = {
"message": str(e),
"type": type(e).__name__,
"traceback": traceback.format_exc().replace("\n", " | "),
}

if not workflow.unsafe.is_replaying():
with workflow.unsafe.sandbox_unrestricted():
logger.error(f"Workflow exception: {json.dumps(exception_data)}")

raise e


class MultilineLoggingInterceptor(Interceptor):
"""Temporal Interceptor that formats multiline exception logs as single-line JSON"""

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
return _MultilineLoggingActivityInboundInterceptor(
super().intercept_activity(next)
)

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return _MultilineLoggingWorkflowInterceptor
32 changes: 32 additions & 0 deletions multiline_logging/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio

from temporalio.client import Client

from multiline_logging.workflows import MultilineLoggingWorkflow


async def main():
client = await Client.connect("localhost:7233")

test_cases = [
"activity_exception",
"complex_activity_exception",
"workflow_exception",
]

for test_case in test_cases:
print(f"\n--- Testing {test_case} ---")
try:
result = await client.execute_workflow(
MultilineLoggingWorkflow.run,
test_case,
id=f"multiline-logging-{test_case}",
task_queue="multiline-logging-task-queue",
)
print(f"Result: {result}")
except Exception as e:
print(f"Expected exception caught: {e}")


if __name__ == "__main__":
asyncio.run(main())
23 changes: 23 additions & 0 deletions multiline_logging/test_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""
Test that all imports work correctly
"""

try:
from .interceptor import MultilineLoggingInterceptor

print("✅ Interceptor imports successfully")

from .activities import complex_failing_activity, failing_activity

print("✅ Activities import successfully")

from .workflows import MultilineLoggingWorkflow

print("✅ Workflows import successfully")

print("✅ All imports work correctly!")

except ImportError as e:
print(f"❌ Import error: {e}")
exit(1)
42 changes: 42 additions & 0 deletions multiline_logging/test_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python3
"""
Simple test to validate the multiline logging interceptor logic
"""
import json
import traceback


def test_multiline_formatting():
"""Test that multiline exceptions are properly formatted as single-line JSON"""
try:
try:
raise ValueError("Inner exception with\nmultiple lines\nof text")
except ValueError as inner:
raise RuntimeError(
"Outer exception that wraps\nanother exception with\nmultiline content"
) from inner
except Exception as e:
exception_data = {
"message": str(e),
"type": type(e).__name__,
"traceback": traceback.format_exc().replace("\n", " | "),
}

json_output = json.dumps(exception_data)

print("Original exception message:")
print(repr(str(e)))
print("\nFormatted JSON output:")
print(json_output)
print("\nValidation:")
print(f"- Contains newlines in JSON: {'\\n' in json_output}")
print(f"- JSON is valid: {json.loads(json_output) is not None}")
print(f"- Single line: {json_output.count('\\n') == 0}")

return json_output


if __name__ == "__main__":
print("Testing multiline exception formatting...")
result = test_multiline_formatting()
print("\n✅ Test completed successfully!")
30 changes: 30 additions & 0 deletions multiline_logging/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from multiline_logging.activities import complex_failing_activity, failing_activity
from multiline_logging.interceptor import MultilineLoggingInterceptor
from multiline_logging.workflows import MultilineLoggingWorkflow

logging.basicConfig(level=logging.INFO)


async def main():
client = await Client.connect("localhost:7233")

worker = Worker(
client,
task_queue="multiline-logging-task-queue",
workflows=[MultilineLoggingWorkflow],
activities=[failing_activity, complex_failing_activity],
interceptors=[MultilineLoggingInterceptor()],
)

print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
24 changes: 24 additions & 0 deletions multiline_logging/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from .activities import complex_failing_activity, failing_activity


@workflow.defn
class MultilineLoggingWorkflow:
@workflow.run
async def run(self, test_type: str) -> str:
if test_type == "activity_exception":
return await workflow.execute_activity(
failing_activity, True, schedule_to_close_timeout=timedelta(seconds=5)
)
elif test_type == "complex_activity_exception":
return await workflow.execute_activity(
complex_failing_activity, schedule_to_close_timeout=timedelta(seconds=5)
)
elif test_type == "workflow_exception":
raise RuntimeError("Workflow exception with\nmultiple lines\nof error text")
else:
return "No exception test"
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ packages = [
"hello",
"langchain",
"message_passing",
"multiline_logging",
"nexus",
"open_telemetry",
"patching",
Expand Down Expand Up @@ -143,4 +144,4 @@ ignore_errors = true

[[tool.mypy.overrides]]
module = "opentelemetry.*"
ignore_errors = true
ignore_errors = true
Loading