From 76370aa2eda90743a7e7bafc17427c63141cbab0 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 30 Oct 2025 02:56:43 +0000 Subject: [PATCH 1/2] Add trajectory condensation utility This utility applies OpenHands SDK context condensation to SFT trajectories. Features: - Imports openhands-sdk as a library - Initializes context condenser (LLMSummarizingCondenser or MockCondenser) - Applies condensation at appropriate timing based on max_size threshold - Splits trajectories when condensation occurs (prefix changes) - Outputs condensed trajectories in same SFT format Input: N trajectories Output: N*M trajectories (M = average condensations + 1) Co-authored-by: openhands --- scripts/README_condense_trajectories.md | 127 +++++++++ scripts/condense_trajectories.py | 349 ++++++++++++++++++++++++ scripts/mock_condenser.py | 37 +++ 3 files changed, 513 insertions(+) create mode 100644 scripts/README_condense_trajectories.md create mode 100644 scripts/condense_trajectories.py create mode 100644 scripts/mock_condenser.py diff --git a/scripts/README_condense_trajectories.md b/scripts/README_condense_trajectories.md new file mode 100644 index 0000000..a2495c5 --- /dev/null +++ b/scripts/README_condense_trajectories.md @@ -0,0 +1,127 @@ +# Trajectory Condensation Utility + +This utility applies OpenHands SDK context condensation to SFT trajectories, splitting them when condensation occurs. + +## Overview + +The `condense_trajectories.py` script uses the OpenHands agent SDK's context condenser to apply condensation to trajectories stored in the SFT format. When condensation occurs (due to the conversation history exceeding a threshold), the trajectory is split into multiple segments since the prefix (system prompt) changes after condensation. 
+ +## Usage + +```bash +python scripts/condense_trajectories.py \ + <input_file> \ + <output_file> \ + [--max-size MAX_SIZE] \ + [--keep-first KEEP_FIRST] \ + [--use-mock-condenser] \ + [--llm-model MODEL] \ + [--llm-base-url URL] +``` + +### Arguments + +- `input_file`: Path to input sample_sft_openhands.json file +- `output_file`: Path to output condensed trajectories file +- `--max-size`: Maximum number of events before condensation (default: 10) +- `--keep-first`: Number of initial events to always keep (default: 2) +- `--use-mock-condenser`: Use mock condenser instead of LLM-based condenser (for testing) +- `--llm-model`: LLM model to use (default: from LLM_MODEL env var) +- `--llm-base-url`: LLM base URL (default: from LLM_BASE_URL env var) + +### Environment Variables + +When using the LLM-based condenser (without `--use-mock-condenser`): +- `LLM_API_KEY`: Required. API key for the LLM service. +- `LLM_MODEL`: Optional. Model identifier (default: anthropic/claude-3-5-sonnet-20241022) +- `LLM_BASE_URL`: Optional. Base URL for the LLM API + +## Examples + +### Using Mock Condenser (for testing) + +```bash +python scripts/condense_trajectories.py \ + datasets/swe-smith/sample_sft/sample_sft_openhands.json \ + output_condensed.json \ + --max-size 12 \ + --keep-first 2 \ + --use-mock-condenser +``` + +### Using LLM Condenser + +```bash +export LLM_API_KEY="your-api-key" +export LLM_MODEL="anthropic/claude-3-5-sonnet-20241022" + +python scripts/condense_trajectories.py \ + datasets/swe-smith/sample_sft/sample_sft_openhands.json \ + output_condensed.json \ + --max-size 120 \ + --keep-first 4 +``` + +## Input/Output Format + +### Input +- Format: `sample_sft_openhands.json` (list of trajectory objects) +- Each trajectory has: `id`, `system`, `conversations` +- N total trajectories + +### Output +- Same format as input +- N*M trajectories, where M is the average number of condensations + 1 +- Each segment gets a unique ID: `{original_id}_seg{index}` + +## How It Works + +1. 
**Load trajectories**: Reads the input JSON file containing SFT trajectories +2. **Convert to events**: Transforms each conversation turn into a MessageEvent +3. **Apply condenser**: Iteratively builds a View and applies condensation when threshold is exceeded +4. **Split on condensation**: When condensation occurs, creates a new trajectory segment +5. **Track condensation events**: Maintains a list of all events including Condensation events +6. **Output segments**: Writes all trajectory segments to the output file + +## Condensation Details + +When a trajectory exceeds the `max-size` threshold: +1. The condenser identifies events to forget (those not in the first `keep_first` or last few events) +2. Creates a summary of the forgotten events (LLM-generated or mock) +3. A Condensation event is added to the history +4. The trajectory is split at this point +5. Subsequent segments start with the condensed view + +## Testing + +The script includes comprehensive logging to track: +- Trajectory processing progress +- Condensation triggers and details +- Segment creation +- Final statistics + +Use `jq` to inspect the output: + +```bash +# Count trajectories +jq 'length' output_condensed.json + +# View trajectory IDs and conversation counts +jq '[.[] | {id, conversations: (.conversations | length)}]' output_condensed.json + +# Inspect a specific trajectory +jq '.[0]' output_condensed.json +``` + +## Dependencies + +- `openhands-sdk`: For context condenser and event handling +- `pydantic`: For data validation +- Standard library: `json`, `logging`, `argparse`, `os`, `sys` + +Install the OpenHands SDK: + +```bash +cd /path/to/software-agent-sdk +pip install -e ./openhands-sdk +``` diff --git a/scripts/condense_trajectories.py b/scripts/condense_trajectories.py new file mode 100644 index 0000000..6ff516a --- /dev/null +++ b/scripts/condense_trajectories.py @@ -0,0 +1,349 @@ +""" +Utility script to apply context condensation to OpenHands SFT trajectories. + +This script: +1. 
Loads sample_sft_openhands.json trajectories +2. Initializes an OpenHands context condenser +3. Applies condensation at appropriate timing to trajectories +4. Splits trajectories when condensation occurs (since prefix changes) +5. Outputs condensed trajectories in the same format + +Input: sample_sft_openhands.json trajectories (N total) +Output: sample_sft_openhands_condensed.json trajectories (N*M where M is average condensations + 1) +""" + +import argparse +import json +import logging +import os +import sys +from pathlib import Path +from typing import Any + +from pydantic import SecretStr + +# Add scripts directory to path for mock_condenser +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from openhands.sdk import LLM +from openhands.sdk.context.condenser import LLMSummarizingCondenser +from openhands.sdk.context.condenser.base import CondenserBase +from openhands.sdk.context.view import View +from openhands.sdk.event import Condensation +from openhands.sdk.event.llm_convertible import MessageEvent +from openhands.sdk.llm import Message, TextContent +from mock_condenser import MockCondenser + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def parse_sft_conversation_to_events( + conversations: list[dict[str, str]], trajectory_id: str +) -> list[MessageEvent]: + """Convert SFT conversation format to MessageEvent objects. 
+ + Args: + conversations: List of conversation turns with 'from' and 'value' keys + trajectory_id: Base ID for the trajectory + + Returns: + List of MessageEvent objects + """ + events = [] + for idx, conv in enumerate(conversations): + role = "user" if conv["from"] == "human" else "assistant" + content = conv["value"] + + message = Message(role=role, content=[TextContent(text=content)]) + + # Determine source based on role + source = "user" if role == "user" else "agent" + + event = MessageEvent( + source=source, + llm_message=message, + ) + events.append(event) + + return events + + +def events_to_sft_conversations(events: list[MessageEvent]) -> list[dict[str, str]]: + """Convert MessageEvent objects back to SFT conversation format. + + Args: + events: List of MessageEvent objects + + Returns: + List of conversation dictionaries with 'from' and 'value' keys + """ + conversations = [] + for event in events: + message = event.to_llm_message() + role_mapping = {"user": "human", "assistant": "gpt"} + from_role = role_mapping.get(message.role, message.role) + + # Extract text content + text_parts = [] + for content in message.content: + if isinstance(content, TextContent): + text_parts.append(content.text) + + value = "".join(text_parts) + conversations.append({"from": from_role, "value": value}) + + return conversations + + +def condense_trajectory( + trajectory: dict[str, Any], + condenser: CondenserBase, + trajectory_idx: int, +) -> list[dict[str, Any]]: + """Apply condensation to a single trajectory and split when condensation occurs. 
+ + Args: + trajectory: Original trajectory dict with id, system, and conversations + condenser: Initialized condenser instance + trajectory_idx: Index of the trajectory for logging + + Returns: + List of trajectory dicts (split by condensation points) + """ + trajectory_id = trajectory["id"] + system_prompt = trajectory["system"] + conversations = trajectory["conversations"] + + logger.info( + f"Processing trajectory {trajectory_idx} (id={trajectory_id}) " + f"with {len(conversations)} conversation turns" + ) + + # Convert conversations to events + events = parse_sft_conversation_to_events(conversations, trajectory_id) + + # Track all events as we process them + all_events = [] + split_trajectories = [] + current_segment_start = 0 + condensation_count = 0 + + # Process events iteratively, checking for condensation after each addition + for event_idx, event in enumerate(events): + all_events.append(event) + + # Create a view from current events + view = View.from_events(all_events) + + # Try to condense + result = condenser.condense(view) + + if isinstance(result, Condensation): + condensation_count += 1 + logger.info( + f" Condensation {condensation_count} triggered after event {event_idx + 1}/{len(events)}" + ) + logger.info( + f" Forgetting {len(result.forgotten_event_ids)} events" + ) + if result.summary: + logger.info( + f" Summary (first 100 chars): {result.summary[:100]}..." 
+ ) + + # Add the condensation event to all_events + all_events.append(result) + + # Create a new trajectory segment up to this point (before condensation) + segment_events = all_events[current_segment_start:event_idx + 1] + segment_conversations = events_to_sft_conversations( + [e for e in segment_events if isinstance(e, MessageEvent)] + ) + + if segment_conversations: + segment_id = f"{trajectory_id}_seg{len(split_trajectories)}" + split_trajectories.append( + { + "id": segment_id, + "system": system_prompt, + "conversations": segment_conversations, + } + ) + logger.info( + f" Created segment {segment_id} with {len(segment_conversations)} conversations" + ) + + # Get the new view after condensation + view_after = View.from_events(all_events) + + # Start a new segment from the condensed view + # The system prompt should now include the summary + current_segment_start = len(all_events) + + # Add the final segment (or the entire trajectory if no condensation occurred) + if condensation_count == 0: + # No condensation occurred, return the original trajectory + logger.info( + f" No condensation occurred for trajectory {trajectory_id}" + ) + split_trajectories.append(trajectory) + else: + # Add remaining events as final segment + segment_events = all_events[current_segment_start:] + segment_message_events = [ + e for e in segment_events if isinstance(e, MessageEvent) + ] + + if segment_message_events: + segment_conversations = events_to_sft_conversations(segment_message_events) + segment_id = f"{trajectory_id}_seg{len(split_trajectories)}" + + # Get the final view to include any summary + final_view = View.from_events(all_events) + + split_trajectories.append( + { + "id": segment_id, + "system": system_prompt, + "conversations": segment_conversations, + } + ) + logger.info( + f" Created final segment {segment_id} with {len(segment_conversations)} conversations" + ) + + logger.info( + f" Completed trajectory {trajectory_id}: " + f"{condensation_count} condensations, " + 
f"{len(split_trajectories)} segments" + ) + + return split_trajectories + + +def main(): + parser = argparse.ArgumentParser( + description="Apply context condensation to OpenHands SFT trajectories" + ) + parser.add_argument( + "input_file", + type=str, + help="Path to input sample_sft_openhands.json file", + ) + parser.add_argument( + "output_file", + type=str, + help="Path to output condensed trajectories file", + ) + parser.add_argument( + "--max-size", + type=int, + default=10, + help="Maximum number of events before condensation (default: 10)", + ) + parser.add_argument( + "--keep-first", + type=int, + default=2, + help="Number of initial events to always keep (default: 2)", + ) + parser.add_argument( + "--llm-model", + type=str, + default=None, + help="LLM model to use (default: from LLM_MODEL env var)", + ) + parser.add_argument( + "--llm-base-url", + type=str, + default=None, + help="LLM base URL (default: from LLM_BASE_URL env var)", + ) + parser.add_argument( + "--use-mock-condenser", + action="store_true", + help="Use mock condenser instead of LLM-based condenser (for testing)", + ) + + args = parser.parse_args() + + # Initialize condenser based on type + if args.use_mock_condenser: + logger.info( + f"Initializing mock condenser with max_size={args.max_size}, " + f"keep_first={args.keep_first}" + ) + condenser = MockCondenser(max_size=args.max_size, keep_first=args.keep_first) + else: + # Check for required environment variables + api_key = os.getenv("LLM_API_KEY") + if not api_key: + logger.error("LLM_API_KEY environment variable is not set") + return 1 + + model = args.llm_model or os.getenv("LLM_MODEL", "anthropic/claude-3-5-sonnet-20241022") + base_url = args.llm_base_url or os.getenv("LLM_BASE_URL") + + logger.info(f"Initializing LLM with model: {model}") + if base_url: + logger.info(f"Using base URL: {base_url}") + + # Initialize LLM + llm = LLM( + usage_id="condenser", + model=model, + base_url=base_url, + api_key=SecretStr(api_key), + ) + + # 
Initialize condenser + logger.info( + f"Initializing LLM condenser with max_size={args.max_size}, " + f"keep_first={args.keep_first}" + ) + condenser = LLMSummarizingCondenser( + llm=llm, max_size=args.max_size, keep_first=args.keep_first + ) + + # Load input trajectories + logger.info(f"Loading trajectories from {args.input_file}") + with open(args.input_file, "r") as f: + trajectories = json.load(f) + + logger.info(f"Loaded {len(trajectories)} trajectories") + + # Process each trajectory + all_output_trajectories = [] + for idx, trajectory in enumerate(trajectories): + try: + split_trajectories = condense_trajectory(trajectory, condenser, idx) + all_output_trajectories.extend(split_trajectories) + except Exception as e: + logger.error( + f"Error processing trajectory {idx} (id={trajectory.get('id', 'unknown')}): {e}", + exc_info=True, + ) + # Include original trajectory on error + all_output_trajectories.append(trajectory) + + # Write output + logger.info( + f"Writing {len(all_output_trajectories)} trajectories to {args.output_file}" + ) + with open(args.output_file, "w") as f: + json.dump(all_output_trajectories, f, indent=2) + + logger.info( + f"Condensation complete: {len(trajectories)} input trajectories -> " + f"{len(all_output_trajectories)} output trajectories" + ) + logger.info(f"Average split factor: {len(all_output_trajectories) / len(trajectories):.2f}") + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/scripts/mock_condenser.py b/scripts/mock_condenser.py new file mode 100644 index 0000000..e778436 --- /dev/null +++ b/scripts/mock_condenser.py @@ -0,0 +1,37 @@ +"""Mock condenser for testing condensation logic without requiring LLM calls.""" + +from openhands.sdk.context.condenser.base import RollingCondenser +from openhands.sdk.context.view import View +from openhands.sdk.event.condenser import Condensation +from pydantic import Field + + +class MockCondenser(RollingCondenser): + """A mock condenser that triggers condensation 
based on a simple threshold.""" + + max_size: int = Field(default=10, gt=0) + keep_first: int = Field(default=2, ge=0) + + def handles_condensation_requests(self) -> bool: + return False + + def should_condense(self, view: View) -> bool: + return len(view) > self.max_size + + def get_condensation(self, view: View) -> Condensation: + """Create a condensation event with a mock summary.""" + head = view[: self.keep_first] + target_size = self.max_size // 2 + events_from_tail = target_size - len(head) - 1 + + # Identify events to be forgotten + forgotten_events = view[self.keep_first : -events_from_tail] + + # Create a simple summary + summary = f"[Summary: Condensed {len(forgotten_events)} events from the conversation history]" + + return Condensation( + forgotten_event_ids=[event.id for event in forgotten_events], + summary=summary, + summary_offset=self.keep_first, + ) From b17faec0b1d6c52913ac1ce54c363169606069e0 Mon Sep 17 00:00:00 2001 From: openhands Date: Mon, 8 Dec 2025 02:17:12 +0000 Subject: [PATCH 2/2] Fix pre-commit issues: remove unused variables and format code Co-authored-by: openhands --- scripts/condense_trajectories.py | 113 +++++++++++++------------------ scripts/mock_condenser.py | 18 ++--- 2 files changed, 58 insertions(+), 73 deletions(-) diff --git a/scripts/condense_trajectories.py b/scripts/condense_trajectories.py index 6ff516a..14a3bfb 100644 --- a/scripts/condense_trajectories.py +++ b/scripts/condense_trajectories.py @@ -17,7 +17,6 @@ import logging import os import sys -from pathlib import Path from typing import Any from pydantic import SecretStr @@ -25,6 +24,7 @@ # Add scripts directory to path for mock_condenser sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from mock_condenser import MockCondenser from openhands.sdk import LLM from openhands.sdk.context.condenser import LLMSummarizingCondenser from openhands.sdk.context.condenser.base import CondenserBase @@ -32,7 +32,6 @@ from openhands.sdk.event import Condensation 
from openhands.sdk.event.llm_convertible import MessageEvent from openhands.sdk.llm import Message, TextContent -from mock_condenser import MockCondenser logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -44,11 +43,11 @@ def parse_sft_conversation_to_events( conversations: list[dict[str, str]], trajectory_id: str ) -> list[MessageEvent]: """Convert SFT conversation format to MessageEvent objects. - + Args: conversations: List of conversation turns with 'from' and 'value' keys trajectory_id: Base ID for the trajectory - + Returns: List of MessageEvent objects """ @@ -56,27 +55,27 @@ def parse_sft_conversation_to_events( for idx, conv in enumerate(conversations): role = "user" if conv["from"] == "human" else "assistant" content = conv["value"] - + message = Message(role=role, content=[TextContent(text=content)]) - + # Determine source based on role source = "user" if role == "user" else "agent" - + event = MessageEvent( source=source, llm_message=message, ) events.append(event) - + return events def events_to_sft_conversations(events: list[MessageEvent]) -> list[dict[str, str]]: """Convert MessageEvent objects back to SFT conversation format. 
- + Args: events: List of MessageEvent objects - + Returns: List of conversation dictionaries with 'from' and 'value' keys """ @@ -85,16 +84,16 @@ def events_to_sft_conversations(events: list[MessageEvent]) -> list[dict[str, st message = event.to_llm_message() role_mapping = {"user": "human", "assistant": "gpt"} from_role = role_mapping.get(message.role, message.role) - + # Extract text content text_parts = [] for content in message.content: if isinstance(content, TextContent): text_parts.append(content.text) - + value = "".join(text_parts) conversations.append({"from": from_role, "value": value}) - + return conversations @@ -104,65 +103,61 @@ def condense_trajectory( trajectory_idx: int, ) -> list[dict[str, Any]]: """Apply condensation to a single trajectory and split when condensation occurs. - + Args: trajectory: Original trajectory dict with id, system, and conversations condenser: Initialized condenser instance trajectory_idx: Index of the trajectory for logging - + Returns: List of trajectory dicts (split by condensation points) """ trajectory_id = trajectory["id"] system_prompt = trajectory["system"] conversations = trajectory["conversations"] - + logger.info( f"Processing trajectory {trajectory_idx} (id={trajectory_id}) " f"with {len(conversations)} conversation turns" ) - + # Convert conversations to events events = parse_sft_conversation_to_events(conversations, trajectory_id) - + # Track all events as we process them all_events = [] split_trajectories = [] current_segment_start = 0 condensation_count = 0 - + # Process events iteratively, checking for condensation after each addition for event_idx, event in enumerate(events): all_events.append(event) - + # Create a view from current events view = View.from_events(all_events) - + # Try to condense result = condenser.condense(view) - + if isinstance(result, Condensation): condensation_count += 1 logger.info( f" Condensation {condensation_count} triggered after event {event_idx + 1}/{len(events)}" ) - 
logger.info( - f" Forgetting {len(result.forgotten_event_ids)} events" - ) + logger.info(f" Forgetting {len(result.forgotten_event_ids)} events") if result.summary: - logger.info( - f" Summary (first 100 chars): {result.summary[:100]}..." - ) - + logger.info(f" Summary (first 100 chars): {result.summary[:100]}...") + # Add the condensation event to all_events all_events.append(result) - + # Create a new trajectory segment up to this point (before condensation) - segment_events = all_events[current_segment_start:event_idx + 1] + segment_events = all_events[current_segment_start : event_idx + 1] segment_conversations = events_to_sft_conversations( [e for e in segment_events if isinstance(e, MessageEvent)] ) - + if segment_conversations: segment_id = f"{trajectory_id}_seg{len(split_trajectories)}" split_trajectories.append( @@ -175,35 +170,25 @@ def condense_trajectory( logger.info( f" Created segment {segment_id} with {len(segment_conversations)} conversations" ) - - # Get the new view after condensation - view_after = View.from_events(all_events) - + # Start a new segment from the condensed view # The system prompt should now include the summary current_segment_start = len(all_events) - + # Add the final segment (or the entire trajectory if no condensation occurred) if condensation_count == 0: # No condensation occurred, return the original trajectory - logger.info( - f" No condensation occurred for trajectory {trajectory_id}" - ) + logger.info(f" No condensation occurred for trajectory {trajectory_id}") split_trajectories.append(trajectory) else: # Add remaining events as final segment segment_events = all_events[current_segment_start:] - segment_message_events = [ - e for e in segment_events if isinstance(e, MessageEvent) - ] - + segment_message_events = [e for e in segment_events if isinstance(e, MessageEvent)] + if segment_message_events: segment_conversations = events_to_sft_conversations(segment_message_events) segment_id = 
f"{trajectory_id}_seg{len(split_trajectories)}" - - # Get the final view to include any summary - final_view = View.from_events(all_events) - + split_trajectories.append( { "id": segment_id, @@ -214,13 +199,13 @@ def condense_trajectory( logger.info( f" Created final segment {segment_id} with {len(segment_conversations)} conversations" ) - + logger.info( f" Completed trajectory {trajectory_id}: " f"{condensation_count} condensations, " f"{len(split_trajectories)} segments" ) - + return split_trajectories @@ -267,9 +252,9 @@ def main(): action="store_true", help="Use mock condenser instead of LLM-based condenser (for testing)", ) - + args = parser.parse_args() - + # Initialize condenser based on type if args.use_mock_condenser: logger.info( @@ -283,14 +268,14 @@ def main(): if not api_key: logger.error("LLM_API_KEY environment variable is not set") return 1 - + model = args.llm_model or os.getenv("LLM_MODEL", "anthropic/claude-3-5-sonnet-20241022") base_url = args.llm_base_url or os.getenv("LLM_BASE_URL") - + logger.info(f"Initializing LLM with model: {model}") if base_url: logger.info(f"Using base URL: {base_url}") - + # Initialize LLM llm = LLM( usage_id="condenser", @@ -298,7 +283,7 @@ def main(): base_url=base_url, api_key=SecretStr(api_key), ) - + # Initialize condenser logger.info( f"Initializing LLM condenser with max_size={args.max_size}, " @@ -307,14 +292,14 @@ def main(): condenser = LLMSummarizingCondenser( llm=llm, max_size=args.max_size, keep_first=args.keep_first ) - + # Load input trajectories logger.info(f"Loading trajectories from {args.input_file}") with open(args.input_file, "r") as f: trajectories = json.load(f) - + logger.info(f"Loaded {len(trajectories)} trajectories") - + # Process each trajectory all_output_trajectories = [] for idx, trajectory in enumerate(trajectories): @@ -328,20 +313,18 @@ def main(): ) # Include original trajectory on error all_output_trajectories.append(trajectory) - + # Write output - logger.info( - f"Writing 
{len(all_output_trajectories)} trajectories to {args.output_file}" - ) + logger.info(f"Writing {len(all_output_trajectories)} trajectories to {args.output_file}") with open(args.output_file, "w") as f: json.dump(all_output_trajectories, f, indent=2) - + logger.info( f"Condensation complete: {len(trajectories)} input trajectories -> " f"{len(all_output_trajectories)} output trajectories" ) logger.info(f"Average split factor: {len(all_output_trajectories) / len(trajectories):.2f}") - + return 0 diff --git a/scripts/mock_condenser.py b/scripts/mock_condenser.py index e778436..e3db0ae 100644 --- a/scripts/mock_condenser.py +++ b/scripts/mock_condenser.py @@ -8,28 +8,30 @@ class MockCondenser(RollingCondenser): """A mock condenser that triggers condensation based on a simple threshold.""" - + max_size: int = Field(default=10, gt=0) keep_first: int = Field(default=2, ge=0) - + def handles_condensation_requests(self) -> bool: return False - + def should_condense(self, view: View) -> bool: return len(view) > self.max_size - + def get_condensation(self, view: View) -> Condensation: """Create a condensation event with a mock summary.""" head = view[: self.keep_first] target_size = self.max_size // 2 events_from_tail = target_size - len(head) - 1 - + # Identify events to be forgotten forgotten_events = view[self.keep_first : -events_from_tail] - + # Create a simple summary - summary = f"[Summary: Condensed {len(forgotten_events)} events from the conversation history]" - + summary = ( + f"[Summary: Condensed {len(forgotten_events)} events from the conversation history]" + ) + return Condensation( forgotten_event_ids=[event.id for event in forgotten_events], summary=summary,