Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ temporalio/bridge/temporal_sdk_bridge*
/.zed
*.DS_Store
tags
/.claude
tmpclaude-*
19 changes: 13 additions & 6 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._is_replaying: bool = False
self._random = random.Random(det.randomness_seed)
self._read_only = False
self._in_query_or_validator = False

# Patches we have been notified of and memoized patch responses
self._patches_notified: set[str] = set()
Expand Down Expand Up @@ -618,7 +619,7 @@ async def run_update() -> None:
)

if job.run_validator and defn.validator is not None:
with self._as_read_only():
with self._as_read_only(in_query_or_validator=True):
self._inbound.handle_update_validator(handler_input)
# Re-process arguments to avoid any problems caused by user mutation of them during validation
args = self._process_handler_args(
Expand Down Expand Up @@ -710,7 +711,7 @@ def _apply_query_workflow(
# Wrap entire bunch of work in a task
async def run_query() -> None:
try:
with self._as_read_only():
with self._as_read_only(in_query_or_validator=True):
# Named query or dynamic
defn = self._queries.get(job.query_type) or self._queries.get(None)
if not defn:
Expand Down Expand Up @@ -1218,6 +1219,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool:
def workflow_is_replaying(self) -> bool:
return self._is_replaying

def workflow_is_replaying_history_events(self) -> bool:
return self._is_replaying and not self._in_query_or_validator

def workflow_memo(self) -> Mapping[str, Any]:
if self._untyped_converted_memo is None:
self._untyped_converted_memo = {
Expand Down Expand Up @@ -2008,13 +2012,16 @@ def _add_command(self) -> temporalio.bridge.proto.workflow_commands.WorkflowComm
return self._current_completion.successful.commands.add()

@contextmanager
def _as_read_only(self) -> Iterator[None]:
prev_val = self._read_only
def _as_read_only(self, *, in_query_or_validator: bool) -> Iterator[None]:
prev_read_only = self._read_only
prev_in_query_or_validator = self._in_query_or_validator
self._read_only = True
self._in_query_or_validator = in_query_or_validator
try:
yield None
finally:
self._read_only = prev_val
self._read_only = prev_read_only
self._in_query_or_validator = prev_in_query_or_validator

def _assert_not_read_only(
self, action_attempted: str, *, allow_during_delete: bool = False
Expand Down Expand Up @@ -2191,7 +2198,7 @@ def _instantiate_workflow_object(self) -> Any:
if self._defn.name is None and self._defn.dynamic_config_fn is not None:
dynamic_config = None
try:
with self._as_read_only():
with self._as_read_only(in_query_or_validator=False):
dynamic_config = self._defn.dynamic_config_fn(workflow_instance)
except Exception as err:
logger.exception(
Expand Down
18 changes: 17 additions & 1 deletion temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool: ...
@abstractmethod
def workflow_is_replaying(self) -> bool: ...

@abstractmethod
def workflow_is_replaying_history_events(self) -> bool: ...

@abstractmethod
def workflow_memo(self) -> Mapping[str, Any]: ...

Expand Down Expand Up @@ -1444,11 +1447,24 @@ def _set_in_sandbox(v: bool) -> None:
def is_replaying() -> bool:
"""Whether the workflow is currently replaying.

This includes queries and update validators that occur during replay.

Returns:
True if the workflow is currently replaying
"""
return _Runtime.current().workflow_is_replaying()

@staticmethod
def is_replaying_history_events() -> bool:
"""Whether the workflow is replaying history events.

This excludes queries and update validators, which are live operations.

Returns:
True if replaying history events, False otherwise.
"""
return _Runtime.current().workflow_is_replaying_history_events()

@staticmethod
def is_sandbox_unrestricted() -> bool:
"""Whether the current block of code is not restricted via sandbox.
Expand Down Expand Up @@ -1602,7 +1618,7 @@ def process(

def isEnabledFor(self, level: int) -> bool:
"""Override to ignore replay logs."""
if not self.log_during_replay and unsafe.is_replaying():
if not self.log_during_replay and unsafe.is_replaying_history_events():
return False
return super().isEnabledFor(level)

Expand Down
2 changes: 2 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1986,6 +1986,7 @@ def my_update(self, value: str) -> None:

@workflow.query
def last_signal(self) -> str:
workflow.logger.info("Query called")
return self._last_signal


Expand Down Expand Up @@ -2021,6 +2022,7 @@ async def test_workflow_logging(client: Client):
assert capturer.find_log("Signal: signal 2")
assert capturer.find_log("Update: update 1")
assert capturer.find_log("Update: update 2")
assert capturer.find_log("Query called")
assert not capturer.find_log("Signal: signal 3")
# Also make sure it has some workflow info and correct funcName
record = capturer.find_log("Signal: signal 1")
Expand Down