Grapfi 56/improve async in parallel mode, #89
base: main
Pull request overview
This PR improves async handling in parallel mode by fixing a race condition where the output queue could prematurely terminate before downstream nodes finish processing. The fix introduces activity count stabilization tracking to ensure all events are properly processed.
- Adds activity count tracking to prevent premature termination when nodes are starting/stopping rapidly
- Implements `_last_activity_count` to detect when activity has stabilized before allowing iteration to complete
- Adds comprehensive tests covering race condition scenarios and edge cases
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `grafi/workflows/impl/async_output_queue.py` | Implements the race condition fix by tracking activity count changes and only terminating iteration when the count stabilizes; removes unused `AsyncGenerator` import |
| `tests/workflow/test_async_output_queue.py` | Adds three new test cases to verify the race condition fix: activity count stabilization, proper termination when idle, and prevention of premature exit |
| `tests/assistants/test_assistant.py` | Adds an 8-node DAG workflow integration test and duplicates the `AsyncOutputQueue` unit tests (which should be refactored) |
    -    def __aiter__(self) -> AsyncGenerator[TopicEvent, None]:
    +    def __aiter__(self) -> "AsyncOutputQueue":
             """Make AsyncOutputQueue async iterable."""
    +        self._last_activity_count = 0
Copilot AI, Dec 29, 2025
The `_last_activity_count` attribute is initialized in `__aiter__` but not in `__init__`. This could cause an `AttributeError` if `__anext__` is called without first calling `__aiter__`, or if other methods access the attribute before iteration starts. Consider initializing it in `__init__` so that it always exists.
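The failure mode described in this comment can be reproduced in isolation. The sketch below is a minimal illustration, not the project's code: `LazyQueue` and `EagerQueue` are hypothetical stand-ins that only mimic where `_last_activity_count` gets assigned, and the body of `__anext__` is an assumption made purely to trigger the attribute read.

```python
import asyncio


class LazyQueue:
    """Hypothetical stand-in: the counter is set only in __aiter__."""

    def __aiter__(self) -> "LazyQueue":
        self._last_activity_count = 0
        return self

    async def __anext__(self) -> int:
        # Reads the attribute; fails if __aiter__ was never called.
        if self._last_activity_count == 0:
            raise StopAsyncIteration
        return self._last_activity_count


class EagerQueue(LazyQueue):
    """Same behavior, but the counter also exists from construction."""

    def __init__(self) -> None:
        self._last_activity_count = 0


async def call_anext_directly(queue: LazyQueue) -> str:
    """Call __anext__ without __aiter__ and report which exception surfaces."""
    try:
        await queue.__anext__()
    except AttributeError:
        return "AttributeError"
    except StopAsyncIteration:
        return "StopAsyncIteration"
    return "item"


lazy_result = asyncio.run(call_anext_directly(LazyQueue()))
eager_result = asyncio.run(call_anext_directly(EagerQueue()))
```

With the assignment only in `__aiter__`, a direct `__anext__` call fails on the attribute lookup; with the assignment also in `__init__`, the same call reaches the normal termination path.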
            raise StopAsyncIteration
        self._last_activity_count = current_activity
Copilot AI, Dec 29, 2025
The activity count check has a flaw. When the tracker becomes idle with an empty queue, the code checks whether the activity count has changed. If it has (line 121 updates `_last_activity_count`), the loop continues and immediately rechecks `is_idle()` and `queue.empty()` without waiting for new events. The result is a busy loop that re-evaluates the same conditions until the activity count stops changing, which wastes CPU cycles. Consider adding a small delay, or waiting for the idle event to be cleared and set again, before rechecking.
    -        raise StopAsyncIteration
    -    self._last_activity_count = current_activity
    +        raise StopAsyncIteration
    +    # Activity changed while idle; update and briefly pause to avoid busy looping
    +    self._last_activity_count = current_activity
    +    await asyncio.sleep(0.01)
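To make the stabilization idea concrete, here is a rough, self-contained sketch of a consumer that stops only when the queue is empty, the tracker is idle, and the activity count is unchanged since the previous idle check, pausing briefly between checks as the suggestion above does. `FakeTracker`, `drain`, and the `activity_count` attribute are hypothetical names invented for this illustration; the real tracker and queue in `async_output_queue.py` may be structured differently.

```python
import asyncio


class FakeTracker:
    """Hypothetical tracker: counts every node start and stop."""

    def __init__(self) -> None:
        self.active = 0
        self.activity_count = 0

    def enter(self) -> None:
        self.active += 1
        self.activity_count += 1

    def leave(self) -> None:
        self.active -= 1
        self.activity_count += 1

    def is_idle(self) -> bool:
        return self.active == 0


async def drain(queue: asyncio.Queue, tracker: FakeTracker, pause: float = 0.01):
    """Yield events until the queue is empty and activity has stabilized."""
    last_count = -1  # sentinel so the first idle check never terminates
    while True:
        if not queue.empty():
            yield queue.get_nowait()
            continue
        if tracker.is_idle():
            current = tracker.activity_count
            if current == last_count:
                return  # idle, empty, and unchanged since the last check
            last_count = current
        # Brief pause instead of spinning on the same conditions.
        await asyncio.sleep(pause)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    tracker = FakeTracker()

    async def producer() -> None:
        for name in ("a", "b"):
            tracker.enter()
            await queue.put(name)
            tracker.leave()
            await asyncio.sleep(0)  # short idle gap between nodes

    task = asyncio.create_task(producer())
    received = [event async for event in drain(queue, tracker)]
    await task
    return received


result = asyncio.run(main())
```

In this scenario the consumer's first check sees an empty queue and an idle tracker before the producer has run, so a loop that terminated on that first check would return no events. Requiring two consecutive idle checks with the same count lets both events through, at the cost of one extra `pause` of latency before termination.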
improve async in parallel mode, fix edge cases