-
Notifications
You must be signed in to change notification settings - Fork 498
Preserve workflow state across WebSocket reconnections #1541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Preserve workflow state across WebSocket reconnections #1541
Conversation
Signed-off-by: Eric Evans <194135482+ericevans-nv@users.noreply.github.com>
WalkthroughPersist pending HITL interaction state across WebSocket reconnections by registering conversation-scoped handlers and carrying forward in-flight prompt futures and content so a reconnected client with the same conversation_id can resume the original workflow. Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant WebSocket as WebSocket Handler
participant Store as Conversation Store
participant Workflow as Workflow Engine
rect rgba(100, 150, 200, 0.5)
Note over Client,Workflow: Initial HITL Interaction
Client->>WebSocket: connect (conversation_id)
WebSocket->>Store: set_conversation_handler(conversation_id, handler)
WebSocket->>Workflow: process_workflow_request
Workflow->>WebSocket: human_interaction_callback (HITL prompt)
WebSocket->>Store: store UserInteraction (future + prompt)
WebSocket->>Client: send HITL prompt
end
rect rgba(200, 100, 100, 0.5)
Note over Client,WebSocket: Connection Loss & Reconnection
Client->>Client: refresh/reconnect
Client->>WebSocket: new connection (same conversation_id)
WebSocket->>Store: get_conversation_handler(conversation_id)
WebSocket->>WebSocket: _restore_execution_state (attach to original handler, re-send prompt if pending)
end
rect rgba(100, 200, 100, 0.5)
Note over Client,Workflow: Resume Workflow
Client->>WebSocket: submit response
WebSocket->>Store: resolve UserInteraction.future
Workflow->>Workflow: resume from await
Workflow->>Client: continue workflow
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py`:
- Around line 102-112: The code stores self into _conversation_handlers using
self._conversation_id without checking for None; update
_initialize_workflow_request to validate message.conversation_id (and thus
self._conversation_id) before using it as a dict key: if message.conversation_id
is not None (or truthy) then set _conversation_handlers[self._conversation_id] =
self, otherwise skip the mapping and optionally emit a warning or handle the
missing conversation id path so you don't register under a None key.
- Around line 168-171: Replace the runtime assert with explicit validation: in
the WebSocketUserInteractionResponseMessage branch (handling validated_message
and calling _process_websocket_user_interaction_response_message), check that
self._user_interaction is not None and raise a clear exception (or
return/handle) instead of using assert; before calling
self._user_interaction.future.set_result(user_content) guard with if not
self._user_interaction.future.done() to avoid InvalidStateError (otherwise
log/ignore the late message or set_exception as appropriate).
- Around line 114-134: The handler registry isn't updated after restoring state
so future lookups still return the old handler; after you copy the disconnected
handler's fields in _restore_execution_state (conversation_id,
_user_interaction, _message_parent_id, _workflow_schema_type,
_running_workflow_task) update _conversation_handlers[conversation_id] = self so
the registry points to the new handler; ensure you perform this replacement
immediately after the field copies (and before re-sending any pending HITL
prompt) so subsequent reconnections and HITL prompts operate against the new
handler instance.
- Around line 62-73: The module-level _conversation_handlers dict retains
WebSocketMessageHandler instances forever; update _initialize_workflow_request
to store the handler keyed by the conversation_id as before but add cleanup to
remove that entry when the workflow ends: in _run_workflow (or attach a done
callback to the asyncio.Task you spawn) ensure you delete
_conversation_handlers[conversation_id] in the task's finally block (or
callback) so the handler is removed on normal completion, cancellation, or
error; reference WebSocketMessageHandler and the conversation_id key when
implementing the removal to avoid leaking handlers.
🧹 Nitpick comments (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py (1)
66-72: Consider expanding the docstring with attribute descriptions.While functional, the docstring could be more informative for maintainability.
📝 Suggested improvement
class UserInteraction(BaseModel): - """User interaction state.""" + """ + User interaction state for pending HITL prompts. + + Attributes: + future: Awaitable future that resolves when user responds. + prompt_content: The prompt content sent to the user. + """ model_config = ConfigDict(arbitrary_types_allowed=True) future: asyncio.Future[TextContent] prompt_content: HumanPrompt
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
Outdated
Show resolved
Hide resolved
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Eric Evans <194135482+ericevans-nv@users.noreply.github.com>
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Eric Evans <194135482+ericevans-nv@users.noreply.github.com>
…ix/hitl-websocket-reconnect
…ix/hitl-websocket-reconnect
…ix/hitl-websocket-reconnect
Description
Closes #1513
Summary
WebSocketMessageHandlerinstances in a module-level dictionary keyed byconversation_idProblem
When a user refreshes the chat UI during a Human-In-The-Loop (HITL) interaction, the WebSocket connection resets and creates a new handler instance. The original workflow remains blocked waiting for a response, but there was no mechanism to route the user's response from the new connection to the original pending interaction. This forced users to abandon in-progress runs and restart from scratch.
Solution
Introduced a
_conversation_handlersdictionary that persists handler instances byconversation_id. When a new WebSocket connection is established with an existingconversation_id:_socketis updated to the new connection (so the running workflow can send messages through it)_user_interaction,_running_workflow_task, etc.)Test Plan
simple_calculator_hitl)By Submitting this PR I confirm:
Summary by CodeRabbit
New Features
Bug Fixes
Improvements