10 changes: 7 additions & 3 deletions grafi/workflows/impl/async_output_queue.py
@@ -1,5 +1,4 @@
import asyncio
from typing import AsyncGenerator
from typing import List

from grafi.common.events.topic_events.topic_event import TopicEvent
@@ -84,8 +83,9 @@ async def _output_listener(self, topic: TopicBase) -> None:
for t in pending:
t.cancel()

def __aiter__(self) -> AsyncGenerator[TopicEvent, None]:
def __aiter__(self) -> "AsyncOutputQueue":
"""Make AsyncOutputQueue async iterable."""
self._last_activity_count = 0

Copilot AI Dec 29, 2025

The _last_activity_count attribute is initialized in __aiter__ but is not initialized in __init__. This could cause an AttributeError if __anext__ is called without first calling __aiter__, or if methods try to access this attribute before iteration starts. Consider initializing this attribute in __init__ to ensure it always exists.

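A minimal sketch of the fix this comment suggests, assuming the attribute can simply be set in the constructor (the `__init__` signature and the `queue`/`tracker` parameter names below are illustrative, not taken from the actual class):

def __init__(self, queue: asyncio.Queue, tracker) -> None:
    self.queue = queue
    self.tracker = tracker
    # Set here so __anext__ never raises AttributeError, even if
    # iteration starts without __aiter__ having been called first.
    self._last_activity_count = 0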
return self

async def __anext__(self) -> TopicEvent:
@@ -114,4 +114,8 @@ async def __anext__(self) -> TopicEvent:
await asyncio.sleep(0) # one event‑loop tick

if self.tracker.is_idle() and self.queue.empty():
raise StopAsyncIteration
current_activity = self.tracker.get_activity_count()
# Only terminate if no new activity since last check
if current_activity == self._last_activity_count:
raise StopAsyncIteration
self._last_activity_count = current_activity
Comment on lines +120 to +121

Copilot AI Dec 29, 2025

The activity count check logic has a flaw. When the tracker becomes idle with an empty queue, the code checks if the activity count has changed. However, if it has changed (line 121 updates _last_activity_count), the loop continues, but then immediately checks is_idle() and queue.empty() again without waiting for new events. This creates a busy loop that repeatedly checks the same conditions until activity count stops changing, which wastes CPU cycles. Consider adding a small delay or waiting for the idle event to be cleared and set again before rechecking.

Suggested change
raise StopAsyncIteration
self._last_activity_count = current_activity
raise StopAsyncIteration
# Activity changed while idle; update and briefly pause to avoid busy looping
self._last_activity_count = current_activity
await asyncio.sleep(0.01)

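As a variation on the suggested change, a sketch that waits for the tracker to signal renewed activity instead of polling on a fixed 10 ms delay. The `wait_for_activity()` helper is hypothetical; the tracker in this diff is only shown to expose `is_idle()` and `get_activity_count()`:

if self.tracker.is_idle() and self.queue.empty():
    current_activity = self.tracker.get_activity_count()
    if current_activity == self._last_activity_count:
        raise StopAsyncIteration
    # Activity changed while idle: remember the new count and block
    # until the tracker signals again rather than spinning on a timer.
    self._last_activity_count = current_activity
    await self.tracker.wait_for_activity()  # hypothetical awaitable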
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "grafi"
version = "0.0.33"
version = "0.0.34"
description = "Grafi - a flexible, event-driven framework that enables the creation of domain-specific AI agents through composable agentic workflows."
authors = [{name = "Craig Li", email = "craig@binome.dev"}]
license = {text = "Mozilla Public License Version 2.0"}
245 changes: 245 additions & 0 deletions tests/assistants/test_assistant.py
@@ -314,6 +314,251 @@ def test_generate_manifest_custom_directory(self, mock_assistant, tmp_path):
},
}

@pytest.mark.asyncio
async def test_eight_node_dag_workflow(self):
"""
Test a DAG workflow with 8 nodes: A->B, B->C, C->D, C->E, C->F, D->G, E->G, F->G, G->H.

Each node concatenates the previous input with its own label.
For example, node B receives "A" and outputs "AB".

The topology creates a fan-out at C (to D, E, F) and a fan-in at G (from D, E, F).
"""
from grafi.common.events.topic_events.publish_to_topic_event import (
PublishToTopicEvent,
)
from grafi.common.models.invoke_context import InvokeContext
from grafi.common.models.message import Message
from grafi.nodes.node import Node
from grafi.tools.functions.function_tool import FunctionTool
from grafi.topics.expressions.subscription_builder import SubscriptionBuilder
from grafi.topics.topic_impl.input_topic import InputTopic
from grafi.topics.topic_impl.output_topic import OutputTopic
from grafi.topics.topic_impl.topic import Topic
from grafi.workflows.impl.event_driven_workflow import EventDrivenWorkflow

# Define the concatenation function for each node
def make_concat_func(label: str):
def concat_func(messages):
# Collect all content from input messages
contents = []
for msg in messages:
if msg.content:
contents.append(msg.content)
# Sort to ensure deterministic ordering for fan-in scenarios
contents.sort()
combined = "".join(contents)
return f"{combined}{label}"

return concat_func

# Create topics
# Input/Output topics for the workflow
agent_input_topic = InputTopic(name="agent_input")
agent_output_topic = OutputTopic(name="agent_output")

# Intermediate topics for connecting nodes
topic_a_out = Topic(name="topic_a_out")
topic_b_out = Topic(name="topic_b_out")
topic_c_out = Topic(name="topic_c_out")
topic_d_out = Topic(name="topic_d_out")
topic_e_out = Topic(name="topic_e_out")
topic_f_out = Topic(name="topic_f_out")
topic_g_out = Topic(name="topic_g_out")

# Create nodes
# Node A: subscribes to agent_input, publishes to topic_a_out
node_a = (
Node.builder()
.name("NodeA")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(agent_input_topic).build())
.tool(
FunctionTool.builder()
.name("ConcatToolA")
.function(make_concat_func("A"))
.build()
)
.publish_to(topic_a_out)
.build()
)

# Node B: subscribes to topic_a_out, publishes to topic_b_out
node_b = (
Node.builder()
.name("NodeB")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_a_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolB")
.function(make_concat_func("B"))
.build()
)
.publish_to(topic_b_out)
.build()
)

# Node C: subscribes to topic_b_out, publishes to topic_c_out
node_c = (
Node.builder()
.name("NodeC")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_b_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolC")
.function(make_concat_func("C"))
.build()
)
.publish_to(topic_c_out)
.build()
)

# Node D: subscribes to topic_c_out, publishes to topic_d_out (fan-out from C)
node_d = (
Node.builder()
.name("NodeD")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolD")
.function(make_concat_func("D"))
.build()
)
.publish_to(topic_d_out)
.build()
)

# Node E: subscribes to topic_c_out, publishes to topic_e_out (fan-out from C)
node_e = (
Node.builder()
.name("NodeE")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolE")
.function(make_concat_func("E"))
.build()
)
.publish_to(topic_e_out)
.build()
)

# Node F: subscribes to topic_c_out, publishes to topic_f_out (fan-out from C)
node_f = (
Node.builder()
.name("NodeF")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolF")
.function(make_concat_func("F"))
.build()
)
.publish_to(topic_f_out)
.build()
)

# Node G: subscribes to topic_d_out AND topic_e_out AND topic_f_out (fan-in)
node_g = (
Node.builder()
.name("NodeG")
.type("ConcatNode")
.subscribe(
SubscriptionBuilder()
.subscribed_to(topic_d_out)
.and_()
.subscribed_to(topic_e_out)
.and_()
.subscribed_to(topic_f_out)
.build()
)
.tool(
FunctionTool.builder()
.name("ConcatToolG")
.function(make_concat_func("G"))
.build()
)
.publish_to(topic_g_out)
.build()
)

# Node H: subscribes to topic_g_out, publishes to agent_output
node_h = (
Node.builder()
.name("NodeH")
.type("ConcatNode")
.subscribe(SubscriptionBuilder().subscribed_to(topic_g_out).build())
.tool(
FunctionTool.builder()
.name("ConcatToolH")
.function(make_concat_func("H"))
.build()
)
.publish_to(agent_output_topic)
.build()
)

# Build the workflow
workflow = (
EventDrivenWorkflow.builder()
.name("EightNodeDAGWorkflow")
.node(node_a)
.node(node_b)
.node(node_c)
.node(node_d)
.node(node_e)
.node(node_f)
.node(node_g)
.node(node_h)
.build()
)

# Create assistant with the workflow
with patch.object(Assistant, "_construct_workflow"):
assistant = Assistant(
name="EightNodeDAGAssistant",
workflow=workflow,
)

# Create invoke context and input
invoke_context = InvokeContext(
conversation_id="test_dag_conversation",
invoke_id="test_dag_invoke",
assistant_request_id="test_dag_request",
)

# Start with empty input - each node adds its label
input_messages = [Message(content="", role="user")]
input_data = PublishToTopicEvent(
invoke_context=invoke_context, data=input_messages
)

# Invoke the workflow (using default parallel mode)
result_events = []
async for event in assistant.invoke(input_data):
result_events.append(event)

# Verify we get exactly 1 event from the agent_output topic
assert len(result_events) == 1, f"Expected 1 event, got {len(result_events)}"
assert result_events[0].name == "agent_output"

# The expected output path is:
# A: "" -> "A"
# B: "A" -> "AB"
# C: "AB" -> "ABC"
# D: "ABC" -> "ABCD"
# E: "ABC" -> "ABCE"
# F: "ABC" -> "ABCF"
# G: combines "ABCD", "ABCE", "ABCF" (sorted) -> "ABCDABCEABCFG"
# H: "ABCDABCEABCFG" -> "ABCDABCEABCFGH"
expected_output = "ABCDABCEABCFGH"
assert result_events[0].data[0].content == expected_output

def test_generate_manifest_file_write_error(self, mock_assistant):
"""Test manifest generation with file write error."""
with patch("builtins.open", side_effect=IOError("Permission denied")):