diff --git a/grafi/workflows/impl/async_output_queue.py b/grafi/workflows/impl/async_output_queue.py index 3607126..32145a6 100644 --- a/grafi/workflows/impl/async_output_queue.py +++ b/grafi/workflows/impl/async_output_queue.py @@ -1,5 +1,4 @@ import asyncio -from typing import AsyncGenerator from typing import List from grafi.common.events.topic_events.topic_event import TopicEvent @@ -84,8 +83,9 @@ async def _output_listener(self, topic: TopicBase) -> None: for t in pending: t.cancel() - def __aiter__(self) -> AsyncGenerator[TopicEvent, None]: + def __aiter__(self) -> "AsyncOutputQueue": """Make AsyncOutputQueue async iterable.""" + self._last_activity_count = 0 return self async def __anext__(self) -> TopicEvent: @@ -114,4 +114,8 @@ async def __anext__(self) -> TopicEvent: await asyncio.sleep(0) # one event‑loop tick if self.tracker.is_idle() and self.queue.empty(): - raise StopAsyncIteration + current_activity = self.tracker.get_activity_count() + # Only terminate if no new activity since last check + if current_activity == self._last_activity_count: + raise StopAsyncIteration + self._last_activity_count = current_activity diff --git a/pyproject.toml b/pyproject.toml index 606d2b8..92315dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "grafi" -version = "0.0.33" +version = "0.0.34" description = "Grafi - a flexible, event-driven framework that enables the creation of domain-specific AI agents through composable agentic workflows." authors = [{name = "Craig Li", email = "craig@binome.dev"}] license = {text = "Mozilla Public License Version 2.0"} diff --git a/tests/assistants/test_assistant.py b/tests/assistants/test_assistant.py index cdbfbb3..e181c9a 100644 --- a/tests/assistants/test_assistant.py +++ b/tests/assistants/test_assistant.py @@ -314,6 +314,251 @@ def test_generate_manifest_custom_directory(self, mock_assistant, tmp_path): }, } + @pytest.mark.asyncio + async def test_eight_node_dag_workflow(self): + """ + Test a DAG workflow with 8 nodes: A->B, B->C, C->D, C->E, C->F, D->G, E->G, F->G, G->H. + + Each node concatenates the previous input with its own label. + For example, node B receives "A" and outputs "AB". + + The topology creates a fan-out at C (to D, E, F) and a fan-in at G (from D, E, F). + """ + from grafi.common.events.topic_events.publish_to_topic_event import ( + PublishToTopicEvent, + ) + from grafi.common.models.invoke_context import InvokeContext + from grafi.common.models.message import Message + from grafi.nodes.node import Node + from grafi.tools.functions.function_tool import FunctionTool + from grafi.topics.expressions.subscription_builder import SubscriptionBuilder + from grafi.topics.topic_impl.input_topic import InputTopic + from grafi.topics.topic_impl.output_topic import OutputTopic + from grafi.topics.topic_impl.topic import Topic + from grafi.workflows.impl.event_driven_workflow import EventDrivenWorkflow + + # Define the concatenation function for each node + def make_concat_func(label: str): + def concat_func(messages): + # Collect all content from input messages + contents = [] + for msg in messages: + if msg.content: + contents.append(msg.content) + # Sort to ensure deterministic ordering for fan-in scenarios + contents.sort() + combined = "".join(contents) + return f"{combined}{label}" + + return concat_func + + # Create topics + # Input/Output topics for the workflow + agent_input_topic = InputTopic(name="agent_input") + agent_output_topic = OutputTopic(name="agent_output") + + # Intermediate topics for connecting nodes + topic_a_out = Topic(name="topic_a_out") + topic_b_out = Topic(name="topic_b_out") + topic_c_out = Topic(name="topic_c_out") + topic_d_out = Topic(name="topic_d_out") + topic_e_out = Topic(name="topic_e_out") + topic_f_out = Topic(name="topic_f_out") + topic_g_out = Topic(name="topic_g_out") + + # Create nodes + # Node A: subscribes to agent_input, publishes to topic_a_out + node_a = ( + Node.builder() + .name("NodeA") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(agent_input_topic).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolA") + .function(make_concat_func("A")) + .build() + ) + .publish_to(topic_a_out) + .build() + ) + + # Node B: subscribes to topic_a_out, publishes to topic_b_out + node_b = ( + Node.builder() + .name("NodeB") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_a_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolB") + .function(make_concat_func("B")) + .build() + ) + .publish_to(topic_b_out) + .build() + ) + + # Node C: subscribes to topic_b_out, publishes to topic_c_out + node_c = ( + Node.builder() + .name("NodeC") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_b_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolC") + .function(make_concat_func("C")) + .build() + ) + .publish_to(topic_c_out) + .build() + ) + + # Node D: subscribes to topic_c_out, publishes to topic_d_out (fan-out from C) + node_d = ( + Node.builder() + .name("NodeD") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolD") + .function(make_concat_func("D")) + .build() + ) + .publish_to(topic_d_out) + .build() + ) + + # Node E: subscribes to topic_c_out, publishes to topic_e_out (fan-out from C) + node_e = ( + Node.builder() + .name("NodeE") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolE") + .function(make_concat_func("E")) + .build() + ) + .publish_to(topic_e_out) + .build() + ) + + # Node F: subscribes to topic_c_out, publishes to topic_f_out (fan-out from C) + node_f = ( + Node.builder() + .name("NodeF") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_c_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolF") + .function(make_concat_func("F")) + .build() + ) + .publish_to(topic_f_out) + .build() + ) + + # Node G: subscribes to topic_d_out AND topic_e_out AND topic_f_out (fan-in) + node_g = ( + Node.builder() + .name("NodeG") + .type("ConcatNode") + .subscribe( + SubscriptionBuilder() + .subscribed_to(topic_d_out) + .and_() + .subscribed_to(topic_e_out) + .and_() + .subscribed_to(topic_f_out) + .build() + ) + .tool( + FunctionTool.builder() + .name("ConcatToolG") + .function(make_concat_func("G")) + .build() + ) + .publish_to(topic_g_out) + .build() + ) + + # Node H: subscribes to topic_g_out, publishes to agent_output + node_h = ( + Node.builder() + .name("NodeH") + .type("ConcatNode") + .subscribe(SubscriptionBuilder().subscribed_to(topic_g_out).build()) + .tool( + FunctionTool.builder() + .name("ConcatToolH") + .function(make_concat_func("H")) + .build() + ) + .publish_to(agent_output_topic) + .build() + ) + + # Build the workflow + workflow = ( + EventDrivenWorkflow.builder() + .name("EightNodeDAGWorkflow") + .node(node_a) + .node(node_b) + .node(node_c) + .node(node_d) + .node(node_e) + .node(node_f) + .node(node_g) + .node(node_h) + .build() + ) + + # Create assistant with the workflow + with patch.object(Assistant, "_construct_workflow"): + assistant = Assistant( + name="EightNodeDAGAssistant", + workflow=workflow, + ) + + # Create invoke context and input + invoke_context = InvokeContext( + conversation_id="test_dag_conversation", + invoke_id="test_dag_invoke", + assistant_request_id="test_dag_request", + ) + + # Start with empty input - each node adds its label + input_messages = [Message(content="", role="user")] + input_data = PublishToTopicEvent( + invoke_context=invoke_context, data=input_messages + ) + + # Invoke the workflow (using default parallel mode) + result_events = [] + async for event in assistant.invoke(input_data): + result_events.append(event) + + # Verify we get exactly 1 event from the agent_output topic + assert len(result_events) == 1, f"Expected 1 event, got {len(result_events)}" + assert result_events[0].name == "agent_output" + + # The expected output path is: + # A: "" -> "A" + # B: "A" -> "AB" + # C: "AB" -> "ABC" + # D: "ABC" -> "ABCD" + # E: "ABC" -> "ABCE" + # F: "ABC" -> "ABCF" + # G: combines "ABCD", "ABCE", "ABCF" (sorted) -> "ABCDABCEABCFG" + # H: "ABCDABCEABCFG" -> "ABCDABCEABCFGH" + expected_output = "ABCDABCEABCFGH" + assert result_events[0].data[0].content == expected_output + def test_generate_manifest_file_write_error(self, mock_assistant): """Test manifest generation with file write error.""" with patch("builtins.open", side_effect=IOError("Permission denied")): diff --git a/tests/workflow/test_async_output_queue.py b/tests/workflow/test_async_output_queue.py index afd4455..049178f 100644 --- a/tests/workflow/test_async_output_queue.py +++ b/tests/workflow/test_async_output_queue.py @@ -1,4 +1,5 @@ import asyncio +from unittest.mock import Mock import pytest @@ -268,3 +269,146 @@ async def test_concurrent_listeners(self, tracker): # Should have collected all events assert len(collected) == 3 assert all(isinstance(e, PublishToTopicEvent) for e in collected) + + @pytest.mark.asyncio + async def test_anext_waits_for_activity_count_stabilization(self): + """ + Test that __anext__ doesn't prematurely terminate when activity count changes. + + This tests the race condition fix where the output queue could terminate + before downstream nodes finish processing. + """ + tracker = AsyncNodeTracker() + + output_queue = AsyncOutputQueue( + output_topics=[], # Empty - we'll put events directly in queue + consumer_name="test_consumer", + tracker=tracker, + ) + + # Simulate: node enters, adds item to queue, leaves + # Then another node should enter before we terminate + + async def simulate_node_activity(): + """Simulate node activity that should prevent premature termination.""" + # First node processes + await tracker.enter("node_1") + await output_queue.queue.put(Mock(name="event_1")) + await tracker.leave("node_1") + + # Yield control - simulates realistic timing where next node + # starts within the same event loop cycle + await asyncio.sleep(0) + + # Second node picks up and processes + await tracker.enter("node_2") + await output_queue.queue.put(Mock(name="event_2")) + await tracker.leave("node_2") + + # Start the activity simulation + activity_task = asyncio.create_task(simulate_node_activity()) + + # Iterate over the queue + events = [] + async for event in output_queue: + events.append(event) + if len(events) >= 2: + break + + await activity_task + + # Should have received both events + assert len(events) == 2 + + @pytest.mark.asyncio + async def test_anext_terminates_when_truly_idle(self): + """ + Test that __anext__ correctly terminates when no more activity. + """ + tracker = AsyncNodeTracker() + + output_queue = AsyncOutputQueue( + output_topics=[], # Empty - we'll put events directly in queue + consumer_name="test_consumer", + tracker=tracker, + ) + + # Single node processes and finishes + async def simulate_single_node(): + await tracker.enter("node_1") + await output_queue.queue.put(Mock(name="event_1")) + await tracker.leave("node_1") + + activity_task = asyncio.create_task(simulate_single_node()) + + events = [] + async for event in output_queue: + events.append(event) + + await activity_task + + # Should terminate after receiving the single event + assert len(events) == 1 + + @pytest.mark.asyncio + async def test_activity_count_prevents_premature_exit(self): + """ + Test specifically that activity count tracking prevents race condition. + + Scenario: + 1. Node A finishes and tracker goes idle + 2. __anext__ sees idle but activity count changed + 3. Node B starts before __anext__ decides to terminate + 4. All events are properly yielded + """ + tracker = AsyncNodeTracker() + + output_queue = AsyncOutputQueue( + output_topics=[], # Empty - we'll put events directly in queue + consumer_name="test_consumer", + tracker=tracker, + ) + + events_received = [] + iteration_complete = asyncio.Event() + + async def consumer(): + async for event in output_queue: + events_received.append(event) + iteration_complete.set() + + async def producer(): + # Node A processes + await tracker.enter("node_a") + await output_queue.queue.put(Mock(name="event_a")) + await tracker.leave("node_a") + + # Critical timing window - yield to let consumer check idle state + await asyncio.sleep(0) + + # Node B starts before consumer terminates (if fix works) + await tracker.enter("node_b") + await output_queue.queue.put(Mock(name="event_b")) + await tracker.leave("node_b") + + consumer_task = asyncio.create_task(consumer()) + producer_task = asyncio.create_task(producer()) + + # Wait for producer to finish + await producer_task + + # Wait a bit for consumer to process + try: + await asyncio.wait_for(iteration_complete.wait(), timeout=1.0) + except asyncio.TimeoutError: + consumer_task.cancel() + try: + await consumer_task + except asyncio.CancelledError: + pass + + # With the fix, we should receive both events + assert len(events_received) == 2, ( + f"Expected 2 events but got {len(events_received)}. " + "Race condition may have caused premature termination." + ) diff --git a/tests_integration/agents/run_agents.py b/tests_integration/agents/run_agents.py index 16d3c21..883c0b5 100644 --- a/tests_integration/agents/run_agents.py +++ b/tests_integration/agents/run_agents.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/embedding_assistant/run_embedding_assistant.py b/tests_integration/embedding_assistant/run_embedding_assistant.py index 30325ab..10aae9f 100644 --- a/tests_integration/embedding_assistant/run_embedding_assistant.py +++ b/tests_integration/embedding_assistant/run_embedding_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/event_store_postgres/run_event_store_postgres.py b/tests_integration/event_store_postgres/run_event_store_postgres.py index fdb4ffd..c9f60ff 100644 --- a/tests_integration/event_store_postgres/run_event_store_postgres.py +++ b/tests_integration/event_store_postgres/run_event_store_postgres.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/function_assistant/run_function_assistant.py b/tests_integration/function_assistant/run_function_assistant.py index f059cbd..dbe48bb 100644 --- a/tests_integration/function_assistant/run_function_assistant.py +++ b/tests_integration/function_assistant/run_function_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/function_call_assistant/run_function_call_assistant.py b/tests_integration/function_call_assistant/run_function_call_assistant.py index 8765391..f53579f 100644 --- a/tests_integration/function_call_assistant/run_function_call_assistant.py +++ b/tests_integration/function_call_assistant/run_function_call_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/hith_assistant/run_hith_assistant.py b/tests_integration/hith_assistant/run_hith_assistant.py index 9db1407..5ac8497 100644 --- a/tests_integration/hith_assistant/run_hith_assistant.py +++ b/tests_integration/hith_assistant/run_hith_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/input_output_topics/run_input_output_topics.py b/tests_integration/input_output_topics/run_input_output_topics.py index 1e663a3..1873e53 100644 --- a/tests_integration/input_output_topics/run_input_output_topics.py +++ b/tests_integration/input_output_topics/run_input_output_topics.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/invoke_kwargs/run_invoke_kwargs.py b/tests_integration/invoke_kwargs/run_invoke_kwargs.py index 3fed0d4..bbf29f6 100644 --- a/tests_integration/invoke_kwargs/run_invoke_kwargs.py +++ b/tests_integration/invoke_kwargs/run_invoke_kwargs.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/mcp_assistant/run_mcp_assistant.py b/tests_integration/mcp_assistant/run_mcp_assistant.py index 8a06451..81b17c4 100644 --- a/tests_integration/mcp_assistant/run_mcp_assistant.py +++ b/tests_integration/mcp_assistant/run_mcp_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/multimodal_assistant/run_multimodal_assistant.py b/tests_integration/multimodal_assistant/run_multimodal_assistant.py index cfb0830..7d83370 100644 --- a/tests_integration/multimodal_assistant/run_multimodal_assistant.py +++ b/tests_integration/multimodal_assistant/run_multimodal_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/rag_assistant/run_rag_assistant.py b/tests_integration/rag_assistant/run_rag_assistant.py index 5b04b6c..d1f047b 100644 --- a/tests_integration/rag_assistant/run_rag_assistant.py +++ b/tests_integration/rag_assistant/run_rag_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/react_assistant/run_react_assistant.py b/tests_integration/react_assistant/run_react_assistant.py index 2af2eda..d31af28 100644 --- a/tests_integration/react_assistant/run_react_assistant.py +++ b/tests_integration/react_assistant/run_react_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/run_all.py b/tests_integration/run_all.py index 9dcdee5..6a821c5 100644 --- a/tests_integration/run_all.py +++ b/tests_integration/run_all.py @@ -2,13 +2,29 @@ """Run all integration tests by executing run_*.py scripts in each subfolder.""" import argparse +import importlib.util import io -import subprocess import sys from pathlib import Path +from textwrap import indent -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) + + + +def _load_runner_module(script: Path): + """Load a run_*.py file as a module so we can call run_scripts directly.""" + module_name = f"tests_integration.{script.parent.name}.{script.stem}_runner" + spec = importlib.util.spec_from_file_location(module_name, script) + if spec is None or spec.loader is None: + raise ImportError(f"Unable to load spec for {script}") + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module def run_all_scripts(pass_local: bool = True) -> int: @@ -23,12 +39,14 @@ def run_all_scripts(pass_local: bool = True) -> int: """ python_executable = sys.executable current_directory = Path(__file__).parent + repo_root = current_directory.parent # Find all run_*.py scripts in subdirectories run_scripts = sorted(current_directory.glob("*/run_*.py")) - passed_folders = [] - failed_folders = {} + passed_examples = [] + failed_examples = {} + skipped_examples = [] print(f"Found {len(run_scripts)} test runners:") for script in run_scripts: @@ -42,37 +60,84 @@ def run_all_scripts(pass_local: bool = True) -> int: print(f"Running tests in: {folder_name}") print(f"{'=' * 60}") - cmd = [python_executable, str(script)] - if not pass_local: - cmd.append("--no-pass-local") - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - check=True, - cwd=script.parent, - ) - print(result.stdout) - passed_folders.append(folder_name) - except subprocess.CalledProcessError as e: - print(f"Output:\n{e.stdout}") - print(f"Error:\n{e.stderr}") - failed_folders[folder_name] = e.stderr + runner_module = _load_runner_module(script) + runner_results = runner_module.run_scripts(pass_local=pass_local, collect=True) + except Exception as exc: # noqa: BLE001 + example_rel = script.relative_to(repo_root) + error_message = f"Runner failed before executing examples: {exc}" + print(f" ✗ {example_rel}") + print(f" Error: {error_message}") + failed_examples[example_rel] = { + "error": error_message, + "output": "", + "rerun_cmd": f"{python_executable} {example_rel}", + } + continue + + if not isinstance(runner_results, list): + example_rel = script.relative_to(repo_root) + error_message = "Runner did not return result details." + print(f" ✗ {example_rel}") + print(f" Error: {error_message}") + failed_examples[example_rel] = { + "error": error_message, + "output": "", + "rerun_cmd": f"{python_executable} {example_rel}", + } + continue + + for result in runner_results: + example_rel = (script.parent / result["name"]).relative_to(repo_root) + status = result.get("status", "unknown") + output = result.get("output", "").rstrip() + error = result.get("error", "").rstrip() + + if status == "passed": + print(f" ✓ {example_rel}") + if output: + print(indent(output, " ")) + passed_examples.append(example_rel) + elif status == "failed": + print(f" ✗ {example_rel}") + if output: + print(" Output:") + print(indent(output, " ")) + if error: + print(" Error:") + print(indent(error, " ")) + rerun_cmd = f"{python_executable} {example_rel}" + print(f" Rerun with: {rerun_cmd}") + failed_examples[example_rel] = { + "error": error, + "output": output, + "rerun_cmd": rerun_cmd, + } + else: + print(f" - {example_rel} (skipped)") + if error: + print(f" Reason: {error}") + skipped_examples.append(example_rel) # Summary print("\n" + "=" * 60) print("FINAL SUMMARY") print("=" * 60) - print(f"\nPassed folders: {len(passed_folders)}") - for folder in passed_folders: - print(f" ✓ {folder}") - - if failed_folders: - print(f"\nFailed folders: {len(failed_folders)}") - for folder in failed_folders: - print(f" ✗ {folder}") + print(f"\nPassed examples: {len(passed_examples)}") + for example in passed_examples: + print(f" ✓ {example}") + + if skipped_examples: + print(f"\nSkipped examples: {len(skipped_examples)}") + for example in skipped_examples: + print(f" - {example}") + + if failed_examples: + print(f"\nFailed examples: {len(failed_examples)}") + for example, data in failed_examples.items(): + print(f" ✗ {example}") + if data.get("rerun_cmd"): + print(f" Rerun with: {data['rerun_cmd']}") return 1 print("\nAll integration tests passed!") diff --git a/tests_integration/simple_llm_assistant/run_simple_llm_assistant.py b/tests_integration/simple_llm_assistant/run_simple_llm_assistant.py index f51edd2..14a27ea 100644 --- a/tests_integration/simple_llm_assistant/run_simple_llm_assistant.py +++ b/tests_integration/simple_llm_assistant/run_simple_llm_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/tests_integration/simple_stream_assistant/run_simple_stream_assistant.py b/tests_integration/simple_stream_assistant/run_simple_stream_assistant.py index 16c23df..3b71737 100644 --- a/tests_integration/simple_stream_assistant/run_simple_stream_assistant.py +++ b/tests_integration/simple_stream_assistant/run_simple_stream_assistant.py @@ -7,17 +7,22 @@ from pathlib import Path -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +try: + sys.stdout.reconfigure(encoding="utf-8") +except AttributeError: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", write_through=True) -def run_scripts(pass_local: bool = True) -> int: + +def run_scripts(pass_local: bool = True, collect: bool = False): """Run all example scripts in this directory. Args: pass_local: If True, skip tests with 'ollama' or 'local' in their name. + collect: If True, return per-script results without printing. Returns: - Exit code (0 for success, 1 for failure). + List of per-script results if collect is True, otherwise exit code (0 for success, 1 for failure). """ python_executable = sys.executable current_directory = Path(__file__).parent @@ -25,16 +30,26 @@ def run_scripts(pass_local: bool = True) -> int: # Find all example files example_files = sorted(current_directory.glob("*_example.py")) - passed_scripts = [] - failed_scripts = {} + results = [] for file in example_files: filename = file.name if pass_local and ("ollama" in filename or "_local" in filename): - print(f"Skipping {filename} (local test)") + message = f"Skipping {filename} (local test)" + if not collect: + print(message) + results.append( + { + "name": filename, + "status": "skipped", + "output": "", + "error": message, + } + ) continue - print(f"Running {filename}...") + if not collect: + print(f"Running {filename}...") try: result = subprocess.run( [python_executable, str(file)], @@ -43,23 +58,44 @@ def run_scripts(pass_local: bool = True) -> int: check=True, cwd=current_directory, ) - print(f"Output of {filename}:\n{result.stdout}") - passed_scripts.append(filename) + if not collect: + print(f"Output of {filename}:\n{result.stdout}") + results.append( + { + "name": filename, + "status": "passed", + "output": result.stdout, + "error": "", + } + ) except subprocess.CalledProcessError as e: - print(f"Error running {filename}:\n{e.stderr}") - failed_scripts[filename] = e.stderr + if not collect: + print(f"Error running {filename}:\n{e.stderr}") + results.append( + { + "name": filename, + "status": "failed", + "output": e.stdout, + "error": e.stderr, + } + ) + + if collect: + return results + + passed_scripts = [r for r in results if r["status"] == "passed"] + failed_scripts = [r for r in results if r["status"] == "failed"] - # Summary print("\n" + "=" * 50) print("Summary:") print(f"Passed: {len(passed_scripts)}") for script in passed_scripts: - print(f" ✓ {script}") + print(f" ✓ {script['name']}") if failed_scripts: print(f"\nFailed: {len(failed_scripts)}") for script in failed_scripts: - print(f" ✗ {script}") + print(f" ✗ {script['name']}") return 1 return 0 diff --git a/uv.lock b/uv.lock index 0cc3438..f8079d1 100644 --- a/uv.lock +++ b/uv.lock @@ -1283,7 +1283,7 @@ wheels = [ [[package]] name = "grafi" -version = "0.0.33" +version = "0.0.34" source = { editable = "." } dependencies = [ { name = "anyio" },