diff --git a/packages/prime/src/prime_cli/commands/env.py b/packages/prime/src/prime_cli/commands/env.py index 9ed18e84..9612b6d3 100644 --- a/packages/prime/src/prime_cli/commands/env.py +++ b/packages/prime/src/prime_cli/commands/env.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import json import os @@ -28,6 +29,11 @@ from ..utils.env_metadata import find_environment_metadata from ..utils.eval_push import push_eval_results_to_hub from ..utils.formatters import format_file_size +from ..utils.hosted_eval import ( + HostedEvalConfig, + print_hosted_result, + run_hosted_evaluation, +) app = typer.Typer(help="Manage verifiers environments", no_args_is_help=True) console = Console() @@ -2076,9 +2082,17 @@ def run_eval( env_path: Optional[str], endpoints_path: Optional[str] = None, headers: Optional[List[str]] = None, + hosted: bool = False, + poll_interval: float = 10.0, + no_stream_logs: bool = False, + timeout_minutes: Optional[int] = None, + allow_sandbox_access: bool = False, + allow_instances_access: bool = False, + custom_secrets: Optional[str] = None, + eval_name: Optional[str] = None, ) -> None: """ - Run verifiers' vf-eval with Prime Inference + Run verifiers' vf-eval with Prime Inference (local) or as a hosted evaluation on the platform. """ is_slug = ( "/" in environment and not environment.startswith("./") and not environment.startswith("/") @@ -2087,10 +2101,11 @@ def run_eval( upstream_owner = None upstream_name = None env_name_for_vf_eval = environment + environment_id = None + requested_version = "latest" if is_slug: env_slug = environment - requested_version = "latest" if "@" in environment: env_slug, requested_version = environment.rsplit("@", 1) @@ -2098,20 +2113,165 @@ def run_eval( if len(parts) == 2 and parts[0] and parts[1]: upstream_owner, upstream_name = parts env_name_for_vf_eval = upstream_name + else: + console.print(f"[red]Invalid environment slug format: {environment}[/red]") + raise typer.Exit(1) + + if hosted: + if not is_slug or not upstream_owner or not upstream_name: + metadata = find_environment_metadata( + env_name=environment, + env_path=Path(env_path) if env_path else None, + ) + console.print( - f"[dim]Using upstream environment {upstream_owner}/{upstream_name}[/dim]\n" + "[red]Error: Hosted evaluations require environment slug (owner/name).[/red]" ) - if not _is_environment_installed(upstream_name, requested_version): - console.print(f"[cyan]Installing {environment}...[/cyan]") - if not _install_single_environment(environment): - raise typer.Exit(1) + if metadata and metadata.get("owner") and metadata.get("name"): + suggested_slug = f"{metadata['owner']}/{metadata['name']}" + console.print("[yellow]Tip:[/yellow] Found local environment metadata.") + console.print(f"[dim]Try:[/dim] prime eval {suggested_slug} --hosted") + else: + console.print( + f"[dim]Example: prime eval primeintellect/{environment} --hosted[/dim]" + ) + + raise typer.Exit(1) + + client = APIClient(require_auth=False) + try: + env_details = fetch_environment_details( + client, upstream_owner, upstream_name, requested_version + ) + environment_id = env_details.get("id") + except APIError as e: + console.print(f"[red]Error: Environment '{environment}' not found on the hub.[/red]") + console.print(f"[dim]{e}[/dim]") + console.print() + + metadata = find_environment_metadata( + env_name=upstream_name, env_path=Path(env_path) if env_path else None + ) + + if metadata and metadata.get("owner") == upstream_owner: + console.print( + "[yellow]Found local environment that hasn't been pushed 
yet.[/yellow]" + ) console.print() - is_resolved = True - else: - console.print(f"[red]Invalid environment slug format: {environment}[/red]") + should_push = typer.confirm( + "Would you like to push this environment to the hub now?", default=True + ) + + if should_push: + console.print() + console.print("[cyan]Pushing environment to hub...[/cyan]") + + env_dir = env_path if env_path else Path.cwd() + result = subprocess.run( + ["prime", "env", "push"], cwd=env_dir, capture_output=False, text=True + ) + + if result.returncode != 0: + console.print("[red]Failed to push environment.[/red]") + raise typer.Exit(1) + + console.print() + console.print("[green]✓ Environment pushed successfully![/green]") + console.print("[cyan]Continuing with hosted evaluation...[/cyan]") + console.print() + + try: + env_details = fetch_environment_details( + client, upstream_owner, upstream_name, requested_version + ) + environment_id = env_details.get("id") + except APIError as e2: + console.print( + f"[red]Error: Still couldn't find environment after push: {e2}[/red]" + ) + raise typer.Exit(1) + else: + console.print() + console.print("[yellow]Cancelled. To push manually, run:[/yellow]") + console.print(" prime env push") + raise typer.Exit(1) + else: + console.print("[dim]To publish your environment, run:[/dim]") + console.print(" prime env push") + raise typer.Exit(1) + + if not environment_id: + console.print(f"[red]Error: Could not get environment ID for '{environment}'[/red]") raise typer.Exit(1) + + console.print(f"[dim]Using environment {upstream_owner}/{upstream_name}[/dim]\n") + + # Parse env_args JSON if provided + parsed_env_args = None + if env_args: + try: + parsed_env_args = json.loads(env_args) + except json.JSONDecodeError as e: + console.print(f"[red]Error parsing --env-args: {e}[/red]") + raise typer.Exit(1) + + # Parse custom_secrets JSON if provided + parsed_custom_secrets = None + if custom_secrets: + try: + parsed_custom_secrets = json.loads(custom_secrets) + except json.JSONDecodeError as e: + console.print(f"[red]Error parsing --custom-secrets: {e}[/red]") + raise typer.Exit(1) + + # Create hosted eval config + hosted_config = HostedEvalConfig( + environment_id=environment_id, + inference_model=model, + num_examples=num_examples if num_examples is not None else 5, + rollouts_per_example=rollouts_per_example if rollouts_per_example is not None else 3, + env_args=parsed_env_args, + name=eval_name, + timeout_minutes=timeout_minutes, + allow_sandbox_access=allow_sandbox_access, + allow_instances_access=allow_instances_access, + custom_secrets=parsed_custom_secrets, + ) + + try: + result = asyncio.run( + run_hosted_evaluation( + config=hosted_config, + poll_interval=poll_interval, + stream_logs=not no_stream_logs, + ) + ) + print_hosted_result(result) + + if result.status != "COMPLETED": + raise typer.Exit(1) + except APIError as e: + console.print(f"[red]Hosted evaluation failed: {e}[/red]") + raise typer.Exit(1) + + return + + if is_slug: + console.print(f"[dim]Using upstream environment {upstream_owner}/{upstream_name}[/dim]\n") + + requested_version = "latest" + if "@" in environment: + _, requested_version = environment.rsplit("@", 1) + + if not _is_environment_installed(env_name_for_vf_eval, requested_version): + console.print(f"[cyan]Installing {environment}...[/cyan]") + if not _install_single_environment(environment): + raise typer.Exit(1) + console.print() + + is_resolved = True else: check_path = Path(env_path) if env_path else Path.cwd() is_resolved = 
display_upstream_environment_info( @@ -2392,6 +2552,11 @@ def eval_env( "(used to locate .prime/.env-metadata.json for upstream resolution)" ), ), + hosted: bool = typer.Option( + False, + "--hosted", + help="Run evaluation on the Prime Intellect platform instead of locally", + ), ) -> None: """Use 'prime eval' instead.""" @@ -2423,4 +2588,5 @@ def eval_env( env_path=env_path, endpoints_path=None, headers=None, + hosted=hosted, ) diff --git a/packages/prime/src/prime_cli/commands/evals.py b/packages/prime/src/prime_cli/commands/evals.py index 47202720..5512b5ac 100644 --- a/packages/prime/src/prime_cli/commands/evals.py +++ b/packages/prime/src/prime_cli/commands/evals.py @@ -641,13 +641,54 @@ def run_eval_cmd( "--header", help="Extra HTTP header for inference API ('Name: Value'). Repeatable.", ), + hosted: bool = typer.Option( + False, + "--hosted", + help="Run evaluation on the platform instead of locally", + ), + poll_interval: float = typer.Option( + 10.0, + "--poll-interval", + help="Polling interval in seconds for hosted evaluation status", + ), + no_stream_logs: bool = typer.Option( + False, + "--no-stream-logs", + help="Disable log streaming for hosted evaluations", + ), + timeout_minutes: Optional[int] = typer.Option( + None, + "--timeout-minutes", + help="Timeout in minutes for hosted evaluation (default: 120, max: 1440)", + ), + allow_sandbox_access: bool = typer.Option( + False, + "--allow-sandbox-access", + help="Allow sandbox read/write access for hosted evaluations", + ), + allow_instances_access: bool = typer.Option( + False, + "--allow-instances-access", + help="Allow pod/instance creation and management for hosted evaluations", + ), + custom_secrets: Optional[str] = typer.Option( + None, + "--custom-secrets", + help='Custom secrets for hosted eval as JSON (e.g., \'{"API_KEY": "xxx"}\')', + ), + eval_name: Optional[str] = typer.Option( + None, + "--eval-name", + help="Custom name for the hosted evaluation", + ), ) -> None: """ - Run verifiers' vf-eval with Prime Inference. + Run verifiers' vf-eval with Prime Inference (local) or on the platform (--hosted). 
Examples: prime eval run primeintellect/wordle -m openai/gpt-4.1-mini -n 5 prime eval run wordle -m openai/gpt-4.1-mini -n 2 -r 3 -t 1024 -T 0.7 + prime eval run primeintellect/gsm8k --hosted -m openai/gpt-4.1-mini -n 10 """ run_eval( environment=environment, @@ -677,4 +718,12 @@ def run_eval_cmd( env_path=env_path, endpoints_path=endpoints_path, headers=header, + hosted=hosted, + poll_interval=poll_interval, + no_stream_logs=no_stream_logs, + timeout_minutes=timeout_minutes, + allow_sandbox_access=allow_sandbox_access, + allow_instances_access=allow_instances_access, + custom_secrets=custom_secrets, + eval_name=eval_name, ) diff --git a/packages/prime/src/prime_cli/utils/hosted_eval.py b/packages/prime/src/prime_cli/utils/hosted_eval.py new file mode 100644 index 00000000..a7938312 --- /dev/null +++ b/packages/prime/src/prime_cli/utils/hosted_eval.py @@ -0,0 +1,293 @@ +import asyncio +import re +from dataclasses import dataclass +from typing import Any, Optional + +from rich.console import Console +from rich.live import Live +from rich.panel import Panel +from rich.text import Text + +from prime_cli.core import APIError, AsyncAPIClient + +console = Console() + +ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") +PROGRESS_BAR = re.compile(r".*\|[█▏▎▍▌▋▊▉ ]{10,}\|.*") + + +def strip_ansi(text: str) -> str: + return ANSI_ESCAPE.sub("", text) + + +def filter_progress_bars(text: str) -> str: + lines = text.splitlines() + filtered = [] + for line in lines: + if PROGRESS_BAR.search(line) or re.search(r"\d+%\|", line): + if "100%" in line: + match = re.search(r"([^|]*100%\|[█▏▎▍▌▋▊▉ ]+\|[^\n]*?)(?=\d+%\||$)", line) + if match: + filtered.append(match.group(1).strip()) + else: + filtered.append(line) + continue + if line.strip(): + filtered.append(line) + return "\n".join(filtered) + + +STATUS_MESSAGES = { + "Waiting for container to start...", + "No logs available", + "Unable to retrieve logs", + "Failed to fetch logs from sandbox", +} + + +def is_status_message(text: str) -> bool: + stripped = text.strip() + return any(stripped.startswith(msg) for msg in STATUS_MESSAGES) + + +def clean_logs(text: str) -> str: + cleaned = filter_progress_bars(strip_ansi(text)) + if is_status_message(cleaned): + return "" + return cleaned + + +@dataclass +class HostedEvalConfig: + environment_id: str + inference_model: str + num_examples: int + rollouts_per_example: int + env_args: Optional[dict[str, str]] = None + name: Optional[str] = None + timeout_minutes: Optional[int] = None + allow_sandbox_access: bool = False + allow_instances_access: bool = False + custom_secrets: Optional[dict[str, str]] = None + + +@dataclass +class HostedEvalResult: + evaluation_id: str + status: str + viewer_url: Optional[str] + total_samples: int + avg_score: Optional[float] + min_score: Optional[float] + max_score: Optional[float] + error_message: Optional[str] = None + logs: Optional[str] = None + + +async def create_hosted_evaluation( + client: AsyncAPIClient, + config: HostedEvalConfig, +) -> dict[str, Any]: + eval_config: dict[str, Any] = { + "num_examples": config.num_examples, + "rollouts_per_example": config.rollouts_per_example, + "allow_sandbox_access": config.allow_sandbox_access, + "allow_instances_access": config.allow_instances_access, + } + + if config.env_args: + eval_config["env_args"] = config.env_args + + if config.timeout_minutes: + eval_config["timeout_minutes"] = config.timeout_minutes + + if config.custom_secrets: + eval_config["custom_secrets"] = config.custom_secrets + + payload: dict[str, 
Any] = {
+        "environment_ids": [config.environment_id],
+        "inference_model": config.inference_model,
+        "eval_config": eval_config,
+    }
+
+    if config.name:
+        payload["name"] = config.name
+
+    return await client.post("/hosted-evaluations", json=payload)
+
+
+async def get_evaluation(client: AsyncAPIClient, evaluation_id: str) -> dict[str, Any]:
+    return await client.get(f"/evaluations/{evaluation_id}")
+
+
+async def get_evaluation_logs(client: AsyncAPIClient, evaluation_id: str) -> str:
+    try:
+        response = await client.get(f"/hosted-evaluations/{evaluation_id}/logs")
+        return response.get("logs") or ""
+    except APIError:
+        return ""
+
+
+async def run_hosted_evaluation(
+    config: HostedEvalConfig,
+    poll_interval: float = 10.0,
+    stream_logs: bool = True,
+) -> HostedEvalResult:
+    async with AsyncAPIClient() as client:
+        console.print(
+            f"[cyan]Creating hosted evaluation for environment {config.environment_id}[/cyan]"
+        )
+        console.print(f"[dim]Model: {config.inference_model}[/dim]")
+        console.print(
+            f"[dim]Configuration: num_examples={config.num_examples}, "
+            f"rollouts_per_example={config.rollouts_per_example}[/dim]"
+        )
+        console.print()
+
+        result = await create_hosted_evaluation(client, config)
+        evaluation_id = result.get("evaluation_id")
+
+        if not evaluation_id:
+            raise APIError(f"Failed to get evaluation ID from response. Response: {result}")
+
+        console.print(f"[green]✓ Created hosted evaluation:[/green] {evaluation_id}")
+        console.print()
+
+        last_logs = ""
+        terminal_statuses = {"COMPLETED", "FAILED", "TIMEOUT", "CANCELLED"}
+        consecutive_errors = 0
+
+        with Live(
+            Panel(
+                Text.assemble(
+                    ("⠋", "cyan"),
+                    " Waiting for evaluation to start...",
+                ),
+                title="[bold]Hosted Evaluation[/bold]",
+                border_style="blue",
+            ),
+            refresh_per_second=4,
+            console=console,
+        ) as live:
+            first_poll = True
+            while True:
+                if not first_poll:
+                    await asyncio.sleep(poll_interval)
+                first_poll = False
+
+                try:
+                    eval_data = await get_evaluation(client, evaluation_id)
+                    status = eval_data.get("status", "UNKNOWN")
+                    consecutive_errors = 0
+
+                    status_color = {
+                        "PENDING": "yellow",
+                        "RUNNING": "cyan",
+                        "COMPLETED": "green",
+                        "FAILED": "red",
+                        "TIMEOUT": "red",
+                        "CANCELLED": "yellow",
+                    }.get(status, "white")
+
+                    total_samples = eval_data.get("total_samples", 0)
+                    status_text = Text.assemble(
+                        "Status: ",
+                        (status, status_color),
+                        f" | Samples: {total_samples}",
+                    )
+                    live.update(
+                        Panel(
+                            status_text,
+                            title="[bold]Hosted Evaluation[/bold]",
+                            border_style="blue",
+                        )
+                    )
+
+                    if stream_logs and status in ("RUNNING", "COMPLETED", "FAILED"):
+                        raw_logs = await get_evaluation_logs(client, evaluation_id)
+                        logs = clean_logs(raw_logs) if raw_logs else ""
+
+                        if logs and logs != last_logs:
+                            old_lines = last_logs.splitlines() if last_logs else []
+                            new_lines = logs.splitlines()
+
+                            # Calculate new lines to print (avoid duplicates)
+                            if not last_logs:
+                                lines_to_print = new_lines
+                            else:
+                                overlap = 0
+                                max_overlap = min(len(old_lines), len(new_lines))
+                                for i in range(1, max_overlap + 1):
+                                    if old_lines[-i:] == new_lines[:i]:
+                                        overlap = i
+                                lines_to_print = new_lines[overlap:]
+
+                            # Print new lines via live.console.print so they render above the Live panel
+                            if lines_to_print:
+                                for line in lines_to_print:
+                                    live.console.print(line)
+
+                            last_logs = logs
+
+                    if status in terminal_statuses:
+                        live.stop()
+                        break
+
+                except APIError as e:
+                    consecutive_errors += 1
+                    if "429" in str(e):
+                        if consecutive_errors >= 3:
+                            live.console.print("[yellow]Rate limited. 
Waiting 30s...[/yellow]") + await asyncio.sleep(30) + else: + await asyncio.sleep(10) + continue + raise + + eval_data = await get_evaluation(client, evaluation_id) + final_logs = await get_evaluation_logs(client, evaluation_id) + + return HostedEvalResult( + evaluation_id=evaluation_id, + status=eval_data.get("status", "UNKNOWN"), + viewer_url=eval_data.get("viewer_url"), + total_samples=eval_data.get("total_samples", 0), + avg_score=eval_data.get("avg_score"), + min_score=eval_data.get("min_score"), + max_score=eval_data.get("max_score"), + error_message=eval_data.get("error_message"), + logs=final_logs, + ) + + +def print_hosted_result(result: HostedEvalResult) -> None: + console.print() + console.rule("[bold]Hosted Evaluation Results[/bold]") + console.print() + console.print(f"[cyan]Evaluation ID:[/cyan] {result.evaluation_id}") + + status_color = { + "COMPLETED": "green", + "FAILED": "red", + "TIMEOUT": "red", + "CANCELLED": "yellow", + }.get(result.status, "white") + console.print(f"[cyan]Status:[/cyan] [{status_color}]{result.status}[/{status_color}]") + console.print(f"[cyan]Total samples:[/cyan] {result.total_samples}") + + if result.avg_score is not None: + console.print(f"[cyan]Average score:[/cyan] {result.avg_score:.4f}") + if result.min_score is not None: + console.print(f"[cyan]Min score:[/cyan] {result.min_score:.4f}") + if result.max_score is not None: + console.print(f"[cyan]Max score:[/cyan] {result.max_score:.4f}") + + console.print() + + if result.viewer_url: + console.print(f"[bold green]View results:[/bold green] {result.viewer_url}") + + if result.error_message: + console.print(f"\n[red]Error:[/red] {result.error_message}") + + console.print() diff --git a/packages/prime/tests/test_hosted_eval.py b/packages/prime/tests/test_hosted_eval.py new file mode 100644 index 00000000..1ce1ec87 --- /dev/null +++ b/packages/prime/tests/test_hosted_eval.py @@ -0,0 +1,208 @@ +from prime_cli.utils.hosted_eval import clean_logs, filter_progress_bars, strip_ansi + + +class TestLogCleaning: + """Test log cleaning utilities""" + + def test_strip_ansi_basic(self): + """Test stripping basic ANSI escape codes""" + text = "\x1b[31mRed text\x1b[0m" + assert strip_ansi(text) == "Red text" + + def test_strip_ansi_multiple_codes(self): + """Test stripping multiple ANSI codes""" + text = "\x1b[1m\x1b[32mBold green\x1b[0m\x1b[0m text" + assert strip_ansi(text) == "Bold green text" + + def test_strip_ansi_no_codes(self): + """Test text without ANSI codes remains unchanged""" + text = "Plain text" + assert strip_ansi(text) == "Plain text" + + def test_strip_ansi_empty(self): + """Test empty string""" + assert strip_ansi("") == "" + + def test_filter_progress_bars_100_percent(self): + """Test that 100% progress bars are kept""" + text = "Progress: 100%|██████████| 10/10 [00:01<00:00]" + result = filter_progress_bars(text) + assert "100%" in result + + def test_filter_progress_bars_partial(self): + """Test that partial progress bars are filtered out""" + text = "Progress: 50%|█████ | 5/10 [00:01<00:01]" + result = filter_progress_bars(text) + assert result == "" + + def test_filter_progress_bars_mixed(self): + """Test mixed content with progress bars""" + text = """Starting evaluation +Progress: 50%|█████ | 5/10 [00:01<00:01] +Progress: 100%|██████████| 10/10 [00:02<00:00] +Evaluation complete""" + result = filter_progress_bars(text) + assert "Starting evaluation" in result + assert "Evaluation complete" in result + assert "100%" in result + assert "50%" not in result + + def 
test_filter_progress_bars_preserves_regular_lines(self): + """Test that regular log lines are preserved""" + text = """Model loaded successfully +Processing batch 1 +Result: accuracy=0.95""" + result = filter_progress_bars(text) + lines = result.splitlines() + assert len(lines) == 3 + assert "Model loaded successfully" in result + assert "Processing batch 1" in result + assert "Result: accuracy=0.95" in result + + def test_clean_logs_combined(self): + """Test combined cleaning of ANSI codes and progress bars""" + text = """\x1b[32mStarting evaluation\x1b[0m +Progress: 50%|█████ | 5/10 [00:01<00:01] +\x1b[1mProgress: 100%|██████████| 10/10 [00:02<00:00]\x1b[0m +\x1b[32m✓ Evaluation complete\x1b[0m""" + result = clean_logs(text) + assert "Starting evaluation" in result + assert "✓ Evaluation complete" in result + assert "100%" in result + assert "50%" not in result + assert "\x1b" not in result + + def test_clean_logs_empty(self): + """Test clean_logs with empty string""" + assert clean_logs("") == "" + + def test_clean_logs_multiline_with_empty_lines(self): + """Test that empty lines are filtered out""" + text = """Line 1 + +Line 3 + +Line 5""" + result = clean_logs(text) + lines = result.splitlines() + assert len(lines) == 3 + assert lines[0] == "Line 1" + assert lines[1] == "Line 3" + assert lines[2] == "Line 5" + + +class TestLogStreaming: + """Test log streaming logic""" + + def test_line_comparison_first_logs(self): + """Test printing all lines when no previous logs exist""" + last_logs = "" + new_logs = """Line 1 +Line 2 +Line 3""" + + new_lines = new_logs.splitlines() + + if not last_logs: + # Should print all lines + assert len(new_lines) == 3 + assert new_lines == ["Line 1", "Line 2", "Line 3"] + + def test_line_comparison_new_lines(self): + """Test printing only new lines when logs grow""" + last_logs = """Line 1 +Line 2 +Line 3""" + new_logs = """Line 1 +Line 2 +Line 3 +Line 4 +Line 5""" + + old_lines = last_logs.splitlines() + new_lines = new_logs.splitlines() + + # Find overlap + overlap = 0 + max_overlap = min(len(old_lines), len(new_lines)) + for i in range(1, max_overlap + 1): + if old_lines[-i:] == new_lines[:i]: + overlap = i + + # Should only print new lines + new_content = new_lines[overlap:] + assert new_content == ["Line 4", "Line 5"] + + def test_line_comparison_no_new_lines(self): + """Test no output when logs haven't changed""" + last_logs = """Line 1 +Line 2 +Line 3""" + new_logs = """Line 1 +Line 2 +Line 3""" + + # Logs are identical, no new lines to print + assert last_logs == new_logs + + def test_line_comparison_with_overlap(self): + """Test finding overlap between old and new logs""" + last_logs = """Line 1 +Line 2 +Line 3""" + new_logs = """Line 2 +Line 3 +Line 4 +Line 5""" + + old_lines = last_logs.splitlines() + new_lines = new_logs.splitlines() + + # Find overlap + overlap = 0 + max_overlap = min(len(old_lines), len(new_lines)) + for i in range(1, max_overlap + 1): + if old_lines[-i:] == new_lines[:i]: + overlap = i + + # Last 2 lines of old match first 2 lines of new + # So we should print lines after the overlap + new_content = new_lines[overlap:] + assert overlap == 2 + assert new_content == ["Line 4", "Line 5"] + + +class TestProgressBarPatterns: + """Test various progress bar patterns from different tools""" + + def test_tqdm_progress_bar(self): + """Test tqdm-style progress bar detection""" + text = "100%|██████████| 100/100 [00:10<00:00, 10.00it/s]" + result = filter_progress_bars(text) + assert "100%" in result + + def test_rich_progress_bar(self): 
+ """Test rich-style progress indicators""" + text = "Processing ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100%" + # Should be filtered if it contains percentage and bar characters + # but our current implementation focuses on tqdm-style bars + # This test documents current behavior + result = filter_progress_bars(text) + # Rich bars with unicode chars should pass through if not matching tqdm pattern + assert len(result) > 0 + + def test_multiple_progress_updates(self): + """Test multiple progress updates where only 100% is kept""" + text = """Task started +25%|██▌ | 25/100 [00:02<00:06, 10.00it/s] +50%|█████ | 50/100 [00:05<00:05, 10.00it/s] +75%|███████▌ | 75/100 [00:07<00:02, 10.00it/s] +100%|██████████| 100/100 [00:10<00:00, 10.00it/s] +Task completed""" + result = filter_progress_bars(text) + assert "Task started" in result + assert "Task completed" in result + assert "100%" in result + assert "25%" not in result + assert "50%" not in result + assert "75%" not in result
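# --- Illustrative sketch (not part of the patch above) ----------------------
# The new hosted_eval module also drops placeholder payloads such as
# "No logs available" via STATUS_MESSAGES / is_status_message / clean_logs,
# but the test file in this diff does not exercise that path. A possible
# additional test is sketched below; the class and test names are
# hypothetical and only the imported functions come from the patch.

from prime_cli.utils.hosted_eval import clean_logs, is_status_message


class TestStatusMessageFiltering:
    """Sketch: clean_logs should drop status-only log payloads."""

    def test_is_status_message_matches_known_placeholders(self):
        assert is_status_message("Waiting for container to start...")
        assert is_status_message("No logs available")
        assert not is_status_message("Evaluation complete")

    def test_clean_logs_drops_status_only_payload(self):
        # A payload consisting solely of a status placeholder cleans to "".
        assert clean_logs("Waiting for container to start...") == ""

    def test_clean_logs_keeps_real_content(self):
        assert clean_logs("Model loaded successfully") == "Model loaded successfully"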