diff --git a/.gitignore b/.gitignore index c3447e5d1..c35cd4447 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ temporalio/bridge/temporal_sdk_bridge* /.zed *.DS_Store tags +/.claude +tmpclaude-* diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 10fd594fd..80689c1eb 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -273,6 +273,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: self._is_replaying: bool = False self._random = random.Random(det.randomness_seed) self._read_only = False + self._in_query_or_validator = False # Patches we have been notified of and memoized patch responses self._patches_notified: set[str] = set() @@ -618,7 +619,7 @@ async def run_update() -> None: ) if job.run_validator and defn.validator is not None: - with self._as_read_only(): + with self._as_read_only(in_query_or_validator=True): self._inbound.handle_update_validator(handler_input) # Re-process arguments to avoid any problems caused by user mutation of them during validation args = self._process_handler_args( @@ -710,7 +711,7 @@ def _apply_query_workflow( # Wrap entire bunch of work in a task async def run_query() -> None: try: - with self._as_read_only(): + with self._as_read_only(in_query_or_validator=True): # Named query or dynamic defn = self._queries.get(job.query_type) or self._queries.get(None) if not defn: @@ -1218,6 +1219,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool: def workflow_is_replaying(self) -> bool: return self._is_replaying + def workflow_is_replaying_history_events(self) -> bool: + return self._is_replaying and not self._in_query_or_validator + def workflow_memo(self) -> Mapping[str, Any]: if self._untyped_converted_memo is None: self._untyped_converted_memo = { @@ -2008,13 +2012,16 @@ def _add_command(self) -> temporalio.bridge.proto.workflow_commands.WorkflowComm return self._current_completion.successful.commands.add() @contextmanager - def _as_read_only(self) -> Iterator[None]: - prev_val = self._read_only + def _as_read_only(self, *, in_query_or_validator: bool) -> Iterator[None]: + prev_read_only = self._read_only + prev_in_query_or_validator = self._in_query_or_validator self._read_only = True + self._in_query_or_validator = in_query_or_validator try: yield None finally: - self._read_only = prev_val + self._read_only = prev_read_only + self._in_query_or_validator = prev_in_query_or_validator def _assert_not_read_only( self, action_attempted: str, *, allow_during_delete: bool = False @@ -2191,7 +2198,7 @@ def _instantiate_workflow_object(self) -> Any: if self._defn.name is None and self._defn.dynamic_config_fn is not None: dynamic_config = None try: - with self._as_read_only(): + with self._as_read_only(in_query_or_validator=False): dynamic_config = self._defn.dynamic_config_fn(workflow_instance) except Exception as err: logger.exception( diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 90daecbe2..4df2fe781 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -740,6 +740,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool: ... @abstractmethod def workflow_is_replaying(self) -> bool: ... + @abstractmethod + def workflow_is_replaying_history_events(self) -> bool: ... + @abstractmethod def workflow_memo(self) -> Mapping[str, Any]: ... @@ -1444,11 +1447,24 @@ def _set_in_sandbox(v: bool) -> None: def is_replaying() -> bool: """Whether the workflow is currently replaying. + This includes queries and update validators that occur during replay. + Returns: True if the workflow is currently replaying """ return _Runtime.current().workflow_is_replaying() + @staticmethod + def is_replaying_history_events() -> bool: + """Whether the workflow is replaying history events. + + This excludes queries and update validators, which are live operations. + + Returns: + True if replaying history events, False otherwise. + """ + return _Runtime.current().workflow_is_replaying_history_events() + @staticmethod def is_sandbox_unrestricted() -> bool: """Whether the current block of code is not restricted via sandbox. @@ -1602,7 +1618,7 @@ def process( def isEnabledFor(self, level: int) -> bool: """Override to ignore replay logs.""" - if not self.log_during_replay and unsafe.is_replaying(): + if not self.log_during_replay and unsafe.is_replaying_history_events(): return False return super().isEnabledFor(level) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 8c7feae82..719b567e7 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1986,6 +1986,7 @@ def my_update(self, value: str) -> None: @workflow.query def last_signal(self) -> str: + workflow.logger.info("Query called") return self._last_signal @@ -2021,6 +2022,7 @@ async def test_workflow_logging(client: Client): assert capturer.find_log("Signal: signal 2") assert capturer.find_log("Update: update 1") assert capturer.find_log("Update: update 2") + assert capturer.find_log("Query called") assert not capturer.find_log("Signal: signal 3") # Also make sure it has some workflow info and correct funcName record = capturer.find_log("Signal: signal 1")