refact: introduce runtime plugin architecture and context separation #296
base: main
Conversation
🦋 Changeset detected

Latest commit: bc659fd

The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages.
```python
async def stream_published_events(self) -> AsyncGenerator[Event, None]:
    async with self._queues.stream_lock:
        if self._queues.complete.done() and self._queues.publish_queue.empty():
            return
        while True:
            item = await self._queues.publish_queue.get()
            yield item
            if isinstance(item, StopEvent):
                break
```
🟡 WorkflowHandler.stream_events() may be callable multiple times if adapter stream ends without yielding StopEvent
`WorkflowHandler.stream_events()` enforces single consumption via `_all_events_consumed`, but it only flips that flag when it sees a `StopEvent` (workflows/handler.py:150-163).

With the new asyncio runtime adapter, `ExternalAsyncioAdapter.stream_published_events()` can terminate without yielding a `StopEvent`: it returns immediately when the run is complete and the publish queue is empty (workflows/plugins/basic.py:140-148, specifically the early return at workflows/plugins/basic.py:141-143). This happens when another consumer has already drained the publish queue (including the `StopEvent`) and a later caller starts streaming.

Actual behavior:
- The async generator ends without yielding `StopEvent`, so `_all_events_consumed` remains `False`.
- A second call to `handler.stream_events()` will not raise `WorkflowRuntimeError` and will start another (empty) stream, violating the documented “only be streamed once per handler instance” contract.

Expected behavior:
- Either the adapter should always yield a terminal `StopEvent` to late subscribers, or the handler should treat a clean end-of-stream as consumed and set `_all_events_consumed = True`.

Impact: breaks the API contract and can confuse servers/clients that rely on the one-time streaming invariant, especially in multi-consumer scenarios (e.g., server draining + user code streaming).

Recommendation: fix at one layer:
- Adapter-level: for completed runs, yield the final `StopEvent` (e.g., cache it and yield it to late subscribers) instead of returning an empty stream; or
- Handler-level: if the underlying stream terminates without a `StopEvent`, set `_all_events_consumed = True` before returning, so subsequent calls raise as documented.
🟡 ResourceManager caching treats falsey cached resources as missing and recomputes them
`ResourceManager.get()` uses `elif resource.cache and not self.resources.get(resource.name, None):` to decide whether a cached resource exists. If a resource factory legitimately returns a falsey value (e.g., `0`, `False`, `""`, an empty list), the manager will repeatedly re-create it instead of returning the cached value.
Code (packages/llama-index-workflows/src/workflows/resource.py:234-241):

```python
elif resource.cache and not self.resources.get(resource.name, None):
    val = await resource.call()
    await self.set(resource.name, val)
else:
    val = self.resources.get(resource.name)
```
Actual: falsey resources are effectively non-cacheable. Expected: cache presence should be based on key existence, not truthiness.
(Refers to lines 232-241)
Recommendation: use `if resource.name not in self.resources` (or `self.resources.get(...) is None` if `None` is the only sentinel) to test cache presence.
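A minimal sketch of the key-existence fix, assuming a heavily simplified `ResourceManager` (the real class in workflows/resource.py has more branches); it shows a falsey cached value being served without re-running the factory:

```python
# Simplified sketch, not the actual ResourceManager: only the caching
# branch under discussion is modeled here.
class ResourceManager:
    def __init__(self) -> None:
        self.resources: dict = {}

    async def set(self, name, val) -> None:
        self.resources[name] = val

    async def get(self, resource):
        if resource.cache and resource.name not in self.resources:
            # Key-existence check: a cached falsey value (0, "", False)
            # is no longer mistaken for "missing".
            val = await resource.call()
            await self.set(resource.name, val)
        else:
            val = self.resources.get(resource.name)
        return val
```

With `not self.resources.get(...)`, a factory returning `0` would run on every `get()`; with the membership test it runs once.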
Good feedback, but seems kind of unrelated to changes here. Addressed elsewhere
packages/llama-index-workflows/src/workflows/runtime/types/step_function.py
```python
@property
def is_running(self) -> bool:
    """Whether the workflow is currently running."""
    return self._require_v2_runtime_compatibility().is_running
```
🔴 Context.is_running can raise on non-v2 runtimes because ExternalContext.is_running requires V2RuntimeCompatibilityShim
Context.is_running delegates to ExternalContext.is_running when the context is in the external face. But ExternalContext.is_running calls _require_v2_runtime_compatibility() and raises WorkflowRuntimeError if the runtime does not implement the deprecated V2RuntimeCompatibilityShim.
Code:
Code:
- `ExternalContext.is_running` → `self._require_v2_runtime_compatibility().is_running` (packages/llama-index-workflows/src/workflows/context/external_context.py:56-59)
- `Context.is_running` uses that for the external face (packages/llama-index-workflows/src/workflows/context/context.py:159-165)
Impact: any call to `ctx.is_running` (including internal library usage such as `Workflow.run()` checking whether it should build a start event) will crash for runtimes that implement the new adapter interfaces but not the v2 shim. Actual: `WorkflowRuntimeError` thrown from a basic property access. Expected: `is_running` should be supported for all runtimes, or `Workflow.run()` should not rely on it.

Recommendation: make `ExternalContext.is_running` work without the v2 shim (e.g., infer it from a `get_result_or_none` equivalent, or maintain running state in the adapter), or avoid calling `ctx.is_running` in code paths that must support non-v2 runtimes.
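One possible shape for the fallback, sketched under stated assumptions: the `v2_shim`, `result`, and `started` attributes below are hypothetical illustration names, not the library's real API, and the real shim lookup goes through `_require_v2_runtime_compatibility()`.

```python
# Hedged sketch only: attribute names v2_shim, result, and started are
# invented for illustration; the point is the graceful fallback when
# the deprecated V2RuntimeCompatibilityShim is absent.
class ExternalContext:
    def __init__(self, runtime) -> None:
        self._runtime = runtime

    @property
    def is_running(self) -> bool:
        shim = getattr(self._runtime, "v2_shim", None)  # assumed lookup
        if shim is not None:
            return shim.is_running
        # Fallback for non-v2 runtimes: infer from whether the run has
        # started and no result has been produced yet.
        return (
            getattr(self._runtime, "result", None) is None
            and getattr(self._runtime, "started", False)
        )
```

The design point is that a plain property read never raises; runtimes without the shim get a best-effort answer instead of a `WorkflowRuntimeError`.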
Made this a bit more compatible
Force-pushed from d0b6b74 to bed12a8.
- Add `Runtime` abstract base class with `BasicRuntime` as the default implementation
- Split context into external, internal, and pre-context modules
- Replace `workflow_registry` with `workflow_tracker` for workflow lifecycle management
- Remove `broker.py` in favor of runtime-managed execution
- Add `plugins` package with `get_current_runtime()` for context-aware runtime access
- Update handler and workflow to use the new runtime interfaces

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Force-pushed from bed12a8 to 1ac259b.