diff --git a/src/agent_browser_utils.py b/src/agent_browser_utils.py
index 0b9c53b..33395b2 100644
--- a/src/agent_browser_utils.py
+++ b/src/agent_browser_utils.py
@@ -28,7 +28,7 @@ import argparse
 from langchain_google_genai import ChatGoogleGenerativeAI
 from browser_use import Agent, Browser, BrowserContextConfig, BrowserConfig
-from browser_use.browser.browser import BrowserContext
+from browser_use.browser.browser import BrowserConfig as ActualBrowserContext # Changed import
 from pydantic import SecretStr
 from dotenv import load_dotenv
@@ -45,7 +45,7 @@ async def setup_browser(headless: bool = False):
         highlight_elements=True,
         save_recording_path="./recordings",
     )
-    return browser, BrowserContext(browser=browser, config=context_config)
+    return browser, ActualBrowserContext(browser=browser, config=context_config) # Changed usage
 async def agent_loop(llm, browser_context, query, initial_url=None):
diff --git a/src/async_task_manager.py b/src/async_task_manager.py
new file mode 100644
index 0000000..3dfeb4c
--- /dev/null
+++ b/src/async_task_manager.py
@@ -0,0 +1,294 @@
+import asyncio
+import functools
+import threading
+import time
+import uuid
+import logging
+from typing import Optional, Callable, Dict, Any, List, Coroutine
+
+from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
+
+logger = logging.getLogger(__name__)
+
+class AsyncTaskManager:
+    def __init__(self, main_app_handler: Optional[Any] = None):
+        self.main_app_handler = main_app_handler
+        self.loop = asyncio.new_event_loop()
+        self._thread = threading.Thread(target=self._run_loop, daemon=True, name="AsyncTaskManagerThread")
+        self.active_background_tasks: Dict[str, Dict[str, Any]] = {}
+        # self.progress_bars: Dict[str, Progress] = {} # Store Progress objects per task
+        self._thread.start()
+        logger.info("AsyncTaskManager initialized and event loop thread started.")
+
+    def _run_loop(self):
+        logger.info("Async task manager event loop started.")
+        asyncio.set_event_loop(self.loop)
+        try:
+            self.loop.run_forever()
+        finally:
+            self.loop.close()
+            logger.info("Async task manager event loop closed.")
+
+    def shutdown(self):
+        logger.info("AsyncTaskManager shutdown requested.")
+        if self.loop.is_running():
+            logger.info("Stopping asyncio event loop.")
+            self.loop.call_soon_threadsafe(self.loop.stop)
+
+        # Wait for the thread to finish, with a timeout.
+        # Ensure this is called from the main thread, not the loop's thread.
+        if threading.current_thread() != self._thread:
+            self._thread.join(timeout=5.0)
+            if self._thread.is_alive():
+                logger.warning("AsyncTaskManager thread did not shut down gracefully.")
+        logger.info("AsyncTaskManager shutdown complete.")
+
+    def _on_task_done(self, task_id: str, task_name: str, progress_bar: Progress, rich_task_id: Any, future: asyncio.Future):
+        """Callback executed when a background task finishes."""
+        try:
+            result = future.result() # Raise exception if task failed
+            logger.info(f"Task '{task_name}' (ID: {task_id}) completed successfully. Result: {result}")
+            # For display, we might want to truncate long results if printed directly here.
+            # For now, assuming main_app_handler will handle detailed result display.
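+            # Prefer the host application's status hook when available; otherwise fall back to a
+            # plain console print so the completion is still visible in the CLI.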
+ if self.main_app_handler and hasattr(self.main_app_handler, 'update_task_status_display'): + self.main_app_handler.update_task_status_display(task_id, f"✅ {task_name} completed.") + else: + print(f"\n✅ Task '{task_name}' (ID: {task_id}) completed successfully.") + + + task_info = self.active_background_tasks.get(task_id, {}) + meta = task_info.get("meta", {}) + + if meta.get("type") == "script_execution": + if self.main_app_handler and hasattr(self.main_app_handler, 'handle_script_completion'): + self.main_app_handler.handle_script_completion(task_id, task_name, str(result)) + else: + logger.warning(f"No main_app_handler or handle_script_completion method to process script output for task {task_id}.") + elif meta.get("type") == "pdf_processing": # Example for PDF processing + if self.main_app_handler and hasattr(self.main_app_handler, 'handle_pdf_completion'): + # Result here might be the GenAI file object or a status message + self.main_app_handler.handle_pdf_completion(task_id, task_name, result) + else: + logger.warning(f"No main_app_handler or handle_pdf_completion method for task {task_id}.") + + + except asyncio.CancelledError: + logger.warning(f"Task '{task_name}' (ID: {task_id}) was cancelled.") + if self.main_app_handler and hasattr(self.main_app_handler, 'update_task_status_display'): + self.main_app_handler.update_task_status_display(task_id, f"🚫 {task_name} cancelled.") + else: + print(f"\n🚫 Task '{task_name}' (ID: {task_id}) was cancelled.") + except Exception as e: + logger.error(f"Task '{task_name}' (ID: {task_id}) failed.", exc_info=True) + if self.main_app_handler and hasattr(self.main_app_handler, 'update_task_status_display'): + self.main_app_handler.update_task_status_display(task_id, f"❌ {task_name} error: {type(e).__name__}") + else: + print(f"\n❌ Task '{task_name}' (ID: {task_id}) failed: {type(e).__name__}: {e}") + finally: + self.active_background_tasks.pop(task_id, None) + # Stop the specific Rich progress task, not the whole Progress object if it's shared. + # If each task has its own Progress object, then stop it. + # For now, assuming progress_bar is the Rich Progress object itself, and rich_task_id is the TaskID from progress.add_task + if progress_bar and rich_task_id is not None: + # This logic might need refinement based on how Progress is used. + # If a single Progress object is used for all tasks, we update, not stop. + # If each task has its own Progress object, then stop it. + # The original code created a new Progress object per task. + progress_bar.update(rich_task_id, completed=progress_bar.tasks[0].total if progress_bar.tasks else 100) # Mark as complete + progress_bar.stop() # Stop this progress instance. + + # Refresh prompt if main_app_handler supports it + if self.main_app_handler and hasattr(self.main_app_handler, 'refresh_prompt_display'): + self.main_app_handler.refresh_prompt_display() + + + def submit_task(self, coro_creator: Callable[..., Coroutine[Any, Any, Any]], + task_name: str, progress_total: float = 100.0, + task_meta: Optional[Dict[str, Any]] = None) -> str: + task_id = str(uuid.uuid4()) + + # Each task gets its own Progress display instance for now + # This might be noisy if many tasks run; consider a shared Progress object if main_app_handler can manage it. 
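+        # Illustrative call pattern (sketch only; `process_pdf` and its kwargs are hypothetical,
+        # and the coroutine must also accept task_id, progress_bar and rich_task_id as described below):
+        #   creator = functools.partial(process_pdf, pdf_path="paper.pdf")
+        #   manager.submit_task(creator, task_name="PDF processing", task_meta={"type": "pdf_processing"})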
+ progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + # transient=True # Keep progress visible until explicitly cleared or handled by UI + ) + rich_task_id = progress.add_task(description=f"Initializing {task_name}...", total=progress_total, start=False) + + # The coroutine created by coro_creator must accept these arguments: + # task_id, progress_bar (the Rich Progress object), rich_task_id (the ID for progress.update) + # plus any specific arguments it needs, passed via functools.partial when calling submit_task. + task_coro = coro_creator(task_id=task_id, progress_bar=progress, rich_task_id=rich_task_id) + + # Start the Rich Progress display for this task + # This needs to be handled carefully if run from a non-main thread or if prompt_toolkit is active. + # For now, let's assume direct printing of progress might interfere with prompt_toolkit. + # A better approach is for main_app_handler to manage Rich display. + # For simplicity here, we'll let Rich print, but this is a known issue for CLI apps. + # TODO: Integrate Rich Progress display with prompt_toolkit UI if possible, or use a simpler non-Rich progress. + # For now, we pass `progress` and `rich_task_id` to the coroutine, which can update it. + # The actual `progress.start()` or `Live` context should be managed by the UI layer ideally. + # However, since the original code started progress here, we'll keep a simplified version. + # This part is tricky with prompt_toolkit. For now, we'll just store it. + # The task itself will call progress.update() and progress.start() if needed by its logic. + + fut = asyncio.run_coroutine_threadsafe(task_coro, self.loop) + + self.active_background_tasks[task_id] = { + "future": fut, + "name": task_name, + "progress_bar": progress, + "rich_task_id": rich_task_id, + "meta": task_meta if task_meta else {} + } + + # Add done callback (needs to be partial as it takes more than just the future) + callback = functools.partial(self._on_task_done, task_id, task_name, progress, rich_task_id) + fut.add_done_callback(callback) + + # Print to console (or use main_app_handler to display this) + if self.main_app_handler and hasattr(self.main_app_handler, 'display_message'): + self.main_app_handler.display_message(f"⏳ Task '{task_name}' (ID: {task_id}) started in background.") + else: + print(f"⏳ Task '{task_name}' (ID: {task_id}) started in background – you can keep chatting.") + + return task_id + + def cancel_task(self, task_id_str: str): + task_info = self.active_background_tasks.get(task_id_str) + if not task_info: + message = f"\n❌ Task ID '{task_id_str}' not found or already completed." + if self.main_app_handler and hasattr(self.main_app_handler, 'display_message'): + self.main_app_handler.display_message(message) + else: + print(message) + return + + future = task_info.get("future") + task_name = task_info.get("name", "Unnamed Task") + + if future and not future.done(): + # The cancellation itself is thread-safe. + cancelled = self.loop.call_soon_threadsafe(future.cancel) + # Note: future.cancel() might return False if already done/cancelling. + # The callback _on_task_done will handle logging and cleanup. + message = f"\n➡️ Cancellation request sent for task '{task_name}' (ID: {task_id_str})." + elif future and future.done(): + message = f"\nℹ️ Task '{task_name}' (ID: {task_id_str}) has already completed." 
+ else: + message = f"\n⚠️ Could not cancel task '{task_name}' (ID: {task_id_str}). Future object missing or invalid state." + + if self.main_app_handler and hasattr(self.main_app_handler, 'display_message'): + self.main_app_handler.display_message(message) + else: + print(message) + + def list_tasks(self): + if not self.active_background_tasks: + message = "\nℹ️ No active background tasks." + if self.main_app_handler and hasattr(self.main_app_handler, 'display_message'): + self.main_app_handler.display_message(message) + else: + print(message) + return + + output_lines = ["\n📋 Active Background Tasks:"] + for task_id, info in self.active_background_tasks.items(): + future = info.get("future") + name = info.get("name", "Unnamed Task") + status = "Running" + if future: + if future.cancelled(): status = "Cancelling" + elif future.done(): status = "Completed (Pending Removal)" + output_lines.append(f" - ID: {task_id}, Name: {name}, Status: {status}") + + full_message = "\n".join(output_lines) + if self.main_app_handler and hasattr(self.main_app_handler, 'display_message'): + self.main_app_handler.display_message(full_message) + else: + print(full_message) + + def get_loop(self): + """Returns the asyncio event loop used by the manager.""" + return self.loop + + def get_active_tasks_info(self) -> Dict[str, Dict[str, Any]]: + """Returns a copy of the active background tasks information.""" + return dict(self.active_background_tasks) + +# Example usage (for testing within the file if needed) +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + + class MockMainAppHandler: + def handle_script_completion(self, task_id, task_name, script_output): + logger.info(f"[MockMainAppHandler] Script completed: {task_id} - {task_name}. Output:\n{script_output}") + + def update_task_status_display(self, task_id, message): + logger.info(f"[MockMainAppHandler] Task status update: {task_id} - {message}") + + def display_message(self, message): + print(message) + + def refresh_prompt_display(self): + logger.info("[MockMainAppHandler] Refreshing prompt display (e.g., for prompt_toolkit).") + + + async def sample_long_task(task_id: str, progress_bar: Progress, rich_task_id: Any, duration: int, message: str): + logger.info(f"Task {task_id} ({message}) started. Will run for {duration}s. 
RichTaskID: {rich_task_id}") + if progress_bar: # Start the progress bar if it's passed and valid + progress_bar.start() + progress_bar.update(rich_task_id, description=f"Running {message}...", start=True) + + for i in range(duration): + await asyncio.sleep(1) + if progress_bar: + progress_bar.update(rich_task_id, advance=100/duration, description=f"{message}: {i+1}/{duration}s") + logger.debug(f"Task {task_id} ({message}) progress: {i+1}/{duration}") + if i == duration // 2 and message == "Failing Task": # Simulate failure + raise ValueError("Simulated failure in long task") + logger.info(f"Task {task_id} ({message}) finished.") + return f"Result from {task_id}: {message} completed after {duration}s" + + mock_handler = MockMainAppHandler() + manager = AsyncTaskManager(main_app_handler=mock_handler) + + try: + logger.info("Submitting tasks...") + # Use functools.partial to prepare coro_creator with specific args for the task + task1_coro_creator = functools.partial(sample_long_task, duration=5, message="Task 1 (Success)") + task_id1 = manager.submit_task(task1_coro_creator, "SuccessTask1", task_meta={"type": "generic"}) + + task2_coro_creator = functools.partial(sample_long_task, duration=3, message="Task 2 (Script)") + task_id2 = manager.submit_task(task2_coro_creator, "ScriptTask2", task_meta={"type": "script_execution"}) + + task3_coro_creator = functools.partial(sample_long_task, duration=6, message="Failing Task") + task_id3 = manager.submit_task(task3_coro_creator, "FailingTask3", task_meta={"type": "generic"}) + + logger.info(f"Tasks submitted: {task_id1}, {task_id2}, {task_id3}") + manager.list_tasks() + + time.sleep(2) + logger.info(f"Attempting to cancel Task 1 ({task_id1})") + manager.cancel_task(task_id1) + + manager.list_tasks() + + # Keep main thread alive to see task completions + logger.info("Main thread sleeping for 10 seconds to allow tasks to run...") + time.sleep(10) + + manager.list_tasks() + logger.info("Main thread finished sleeping.") + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received.") + finally: + logger.info("Shutting down AsyncTaskManager...") + manager.shutdown() + logger.info("AsyncTaskManager shutdown complete.") + logger.info("Exiting test script.") diff --git a/src/import_test.py b/src/import_test.py new file mode 100644 index 0000000..48b88da --- /dev/null +++ b/src/import_test.py @@ -0,0 +1,13 @@ +import logging +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) # Configure basic logging for the test + +try: + from google import genai + from google.genai import types + print("Successfully imported google.genai and google.genai.types") + logger.info("Successfully imported google.genai and google.genai.types") +except ImportError as e: + print(f"Failed to import google.genai: {e}") + logger.error(f"Failed to import google.genai: {e}", exc_info=True) + raise diff --git a/src/llm_agent_core.py b/src/llm_agent_core.py new file mode 100644 index 0000000..30552d8 --- /dev/null +++ b/src/llm_agent_core.py @@ -0,0 +1,480 @@ +import os +import logging +from pathlib import Path +from typing import Optional, List, Dict, Any, Callable, TYPE_CHECKING +import functools # Added for _make_verbose_tool +import inspect # Added for checking async tools +import asyncio # Added for async tool execution + +import google.generativeai as genai + +if TYPE_CHECKING: + from .async_task_manager import AsyncTaskManager # For type hinting +# types will be accessed via genai.types + +# Assuming tools.py might be needed later, add a 
placeholder import +# from . import tools + +logger = logging.getLogger(__name__) + +# Fallback if not in config, matches CodeAgent's previous default +DEFAULT_THINKING_BUDGET_FALLBACK = 256 + +class LLMAgentCore: + def __init__(self, config: dict, api_key: Optional[str], model_name: str, task_manager: 'Optional[AsyncTaskManager]' = None): + self.config = config + self.api_key = api_key + self.model_name = model_name + self.task_manager = task_manager # Store the task manager + + self.client: Optional[genai.client.Client] = None + # Async client part is derived from the main client + self.async_client: Optional[genai.client.Client.AsyncClientPart] = None + self.chat: Optional[genai.ChatSession] = None # Changed from genai.generative_models.ChatSession + + self.tool_functions: List[Callable] = [] + # Use model_name from args, and thinking_budget from config or fallback + self.thinking_budget: int = config.get('default_thinking_budget', DEFAULT_THINKING_BUDGET_FALLBACK) + # Initialize generation_config here, replacing thinking_config + self.generation_config: Optional[genai.types.GenerationConfig] = genai.types.GenerationConfig() + + self.conversation_history: List[genai.types.Content] = [] + self.current_token_count: int = 0 + + if self.api_key: + self._configure_client() + if self.client: # Only initialize chat if client configuration was successful + self._initialize_chat() + else: + logger.warning("LLMAgentCore: GEMINI_API_KEY not found. LLM client not initialized.") + + def _configure_client(self): + """Configures the Google Generative AI client.""" + logger.info("LLMAgentCore: Configuring GenAI client...") + try: + self.client = genai.Client(api_key=self.api_key) + self.async_client = self.client.aio # Initialize async_client + logger.info("LLMAgentCore: GenAI client configured successfully.") + except Exception as e: + logger.error(f"LLMAgentCore: Error configuring GenAI client: {e}", exc_info=True) + self.client = None # Ensure client is None on failure + self.async_client = None + + def _initialize_chat(self): + """Initializes the chat session.""" + if not self.client: + logger.error("LLMAgentCore: Cannot initialize chat without a configured client.") + return + logger.info(f"LLMAgentCore: Initializing chat session with model {self.model_name}...") + try: + # Use the current self.conversation_history when creating a new chat session + self.chat = self.client.chats.create(model=self.model_name, history=self.conversation_history) + logger.info(f"LLMAgentCore: Chat session initialized/re-initialized with {len(self.conversation_history)} history entries.") + except Exception as e: + logger.error(f"LLMAgentCore: Error initializing/re-initializing chat session: {e}", exc_info=True) + self.chat = None # Ensure chat is None on failure + + def _make_verbose_tool(self, func: Callable) -> Callable: + """Wrap tool function to log verbose info when called.""" + @functools.wraps(func) + def wrapper(*args, **kwargs): + logger.info(f"LLMAgentCore: 🔧 Tool called: {func.__name__}, args: {args}, kwargs: {kwargs}") + result = func(*args, **kwargs) + # Consider truncating long results for logging if necessary + result_display = str(result) + if len(result_display) > 200: # Example length limit + result_display = result_display[:200] + "..." 
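+            # Note: the truncation above only shortens the logged preview; the full, untruncated
+            # result is still returned to the caller below.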
+ logger.info(f"LLMAgentCore: ▶️ Tool result ({func.__name__}): {result_display}") + return result + return wrapper + + def register_tools(self, tools_list: List[Callable], verbose: bool = False): + """Registers tool functions, optionally wrapping them for verbose logging.""" + if verbose: + self.tool_functions = [self._make_verbose_tool(f) for f in tools_list] + else: + self.tool_functions = tools_list + logger.info(f"LLMAgentCore: Registered {len(self.tool_functions)} tools. Verbose: {verbose}") + # self.generation_config can be updated here if needed, but basic init is in __init__ + # For now, removing direct update related to thinking_budget as its equivalent in GenerationConfig is unclear. + + def send_message(self, user_message_text: str, + active_files: Optional[List[genai.types.File]] = None, + pending_context: Optional[str] = None) -> str: + """ + Sends a message to the LLM, manages history, and returns the response text. + """ + if not self.chat: + logger.error("LLMAgentCore: Chat not initialized. Cannot send message.") + return "Error: Chat session is not active. Please check API key and initialization." + + message_to_send_text = user_message_text + if pending_context: + logger.info("LLMAgentCore: Prepending pending context to the message.") + message_to_send_text = f"{pending_context}\n\n{message_to_send_text}" + + # --- Construct message content (Text + Files) --- + message_content_parts: List[Any] = [genai.types.Part(text=message_to_send_text)] # Changed from genai_types + if active_files: + # Ensure active_files are of the correct type (genai.types.File or compatible Part) + # For now, assume they are genai.types.File objects as per typical usage. + # If they are just paths, they need to be uploaded first (outside this method's scope for now) + message_content_parts.extend(active_files) + logger.info(f"LLMAgentCore: Including {len(active_files)} active files in the prompt parts.") + + # The user's turn/message to be added to history and sent + new_user_content = genai.types.Content(parts=message_content_parts, role="user") # Changed from genai_types + + # Append new user message to our managed history *before* sending + self.conversation_history.append(new_user_content) + logger.info(f"LLMAgentCore: Sending message to LLM. 
History length: {len(self.conversation_history)}") + + try: + # Initial message to the LLM + response = self.chat.send_message( + content=new_user_content, + tools=self.tool_functions, + generation_config=self.generation_config + ) + + # Check for function call + candidate = response.candidates[0] + if candidate.content and candidate.content.parts and candidate.content.parts[0].function_call: + # Model wants to call a tool + logger.info("LLMAgentCore: Function call requested by LLM.") + # Append the model's response (which includes the function call) to history + self.conversation_history.append(candidate.content) + + function_call_part = candidate.content.parts[0] + tool_func_name = function_call_part.function_call.name + args_dict = {key: value for key, value in function_call_part.function_call.args.items()} + + tool_func = next((f for f in self.tool_functions if f.__name__ == tool_func_name), None) + + tool_output = None + tool_had_error = False + if tool_func: + logger.info(f"LLMAgentCore: Executing tool: {tool_func_name} with args: {args_dict}") + try: + if inspect.iscoroutinefunction(tool_func): + if self.task_manager and self.task_manager.loop.is_running(): + future = asyncio.run_coroutine_threadsafe(tool_func(**args_dict), self.task_manager.loop) + tool_output = future.result() # Blocking call to get async result + logger.info(f"LLMAgentCore: Async tool {tool_func_name} executed. Output (first 100 chars): {str(tool_output)[:100]}") + else: + tool_output = "Error: AsyncTaskManager not available or loop not running for async tool." + tool_had_error = True + logger.error(f"LLMAgentCore: AsyncTaskManager not available/running for async tool: {tool_func_name}") + else: # Synchronous tool + tool_output = tool_func(**args_dict) + logger.info(f"LLMAgentCore: Sync tool {tool_func_name} executed. Output (first 100 chars): {str(tool_output)[:100]}") + except Exception as e: + tool_output = f"Error executing tool {tool_func_name}: {str(e)}" + tool_had_error = True + logger.error(f"LLMAgentCore: Error executing tool {tool_func_name}: {e}", exc_info=True) + else: + tool_output = f"Error: Tool '{tool_func_name}' not found." + tool_had_error = True + logger.error(f"LLMAgentCore: Tool '{tool_func_name}' requested by LLM not found.") + + # Construct the function response part + function_response_content = genai.types.Content( + parts=[genai.types.Part( + function_response=genai.types.FunctionResponse( + name=tool_func_name, + response={'output': tool_output, 'error': tool_had_error} + ) + )], + role="tool" # Using "tool" role for function/tool responses + ) + self.conversation_history.append(function_response_content) # Add tool response to history + + # Send the tool's response back to the model + logger.info(f"LLMAgentCore: Sending tool response for {tool_func_name} back to LLM.") + response = self.chat.send_message( + content=function_response_content, + # No tools needed when sending back a tool response usually + ) + # This 'response' is now the model's textual reply AFTER considering the tool output. + + # Extract final text response + agent_response_text = "" + if response.candidates and response.candidates[0].content and response.candidates[0].content.parts: + # Ensure we only join text parts for the final response + text_parts = [p.text for p in response.candidates[0].content.parts if hasattr(p, "text") and p.text] + agent_response_text = " ".join(text_parts).strip() + + if not agent_response_text: # If after tool call, response is empty (e.g. 
only another tool call) + if any(hasattr(p, "function_call") for p in response.candidates[0].content.parts): + agent_response_text = "[Tool call requested by LLM after previous tool execution]" + logger.info("LLMAgentCore: LLM requested another tool call.") + else: + agent_response_text = "[No textual response from LLM after processing tool output or message]" + + + # Append final model's response to our managed history + # This should be the text response or a message indicating another tool call + final_model_content = genai.types.Content(role="model", parts=[genai.types.Part(text=agent_response_text)]) + self.conversation_history.append(final_model_content) + + # Calculate token count based on the updated (complete turn) self.conversation_history + if self.client: + try: + # Ensure history for token counting contains only compatible parts (text/file data) + # For now, assume self.conversation_history is correctly formatted. + # If file parts are complex (e.g. genai_types.File directly), count_tokens might need specific handling. + # The current structure (Content objects with Part(text=...) or Part(file_data=...)) should be fine. + token_info = self.client.models.count_tokens( + model=self.model_name, + contents=self.conversation_history # Send the whole history + ) + self.current_token_count = token_info.total_tokens + logger.info(f"LLMAgentCore: Current token count: {self.current_token_count}") + except Exception as count_err: + logger.error(f"LLMAgentCore: Error calculating token count: {count_err}", exc_info=True) + # self.current_token_count might be stale or could be set to an error indicator + + return agent_response_text + + except Exception as e: + logger.error(f"LLMAgentCore: Error sending message to LLM: {e}", exc_info=True) + # Construct an error message to return and add to history + error_response_text = f"Error: Could not get response from LLM. Details: {str(e)}" + # Append this error as a "model" response in history + hist_error_content = genai.types.Content(role="model", parts=[genai.types.Part(text=error_response_text)]) # Changed from genai_types + self.conversation_history.append(hist_error_content) + # Optionally, try to update token count here as well + return error_response_text + + def reset_chat(self): + """Resets the conversation history, token count, and re-initializes the chat session.""" + logger.info("LLMAgentCore: Resetting chat...") + self.conversation_history = [] + self.current_token_count = 0 + if self.client: + self._initialize_chat() # Re-initialize the chat session with the LLM + logger.info("LLMAgentCore: Chat reset and re-initialized successfully.") + else: + logger.warning("LLMAgentCore: Client not available. Chat history cleared, but session not re-initialized.") + + def clear_history_turns(self, turns_to_keep: int = 5): + """ + Clears older history, keeping a specified number of recent turns. + A "turn" consists of one user message and one model response. 
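+        Example (illustrative): with turns_to_keep=2, at most the four most recent
+        Content entries (two user/model pairs) are retained.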
+ """ + logger.info(f"LLMAgentCore: Clearing history, attempting to keep last {turns_to_keep} turns.") + if turns_to_keep <= 0: + self.conversation_history = [] + else: + # Each turn is 2 items (user, model) + items_to_keep = turns_to_keep * 2 + if len(self.conversation_history) > items_to_keep: + self.conversation_history = self.conversation_history[-items_to_keep:] + + # Recalculate token count + if self.client and self.conversation_history: + try: + token_info = self.client.models.count_tokens( + model=self.model_name, + contents=self.conversation_history + ) + self.current_token_count = token_info.total_tokens + logger.info(f"LLMAgentCore: History cleared. New token count: {self.current_token_count}") + except Exception as count_err: + logger.error(f"LLMAgentCore: Error recalculating token count after clearing history: {count_err}", exc_info=True) + elif not self.conversation_history: + self.current_token_count = 0 + logger.info("LLMAgentCore: History cleared completely. Token count reset to 0.") + + def clear_history_by_tokens(self, tokens_to_clear_target: int) -> tuple[int, int]: + """ + Clears older history by a target number of tokens. + Returns a tuple of (messages_removed_count, actual_tokens_cleared_count). + """ + if not self.conversation_history: + logger.info("LLMAgentCore: History is already empty. Nothing to clear by tokens.") + return 0, 0 + if not self.client: + logger.error("LLMAgentCore: Client not available. Cannot accurately clear tokens by count.") + # Potentially could clear by message count as a fallback, but returning 0,0 for now. + return 0, 0 + + logger.info(f"LLMAgentCore: Attempting to clear approx. {tokens_to_clear_target} tokens from history.") + + new_history = list(self.conversation_history) + tokens_counted_for_removal = 0 + messages_removed_count = 0 + + temp_history_for_counting = [] + + while tokens_counted_for_removal < tokens_to_clear_target and new_history: + first_message = new_history[0] + message_tokens = 0 + try: + # Count tokens for the message being considered for removal + temp_history_for_counting.append(first_message) + count_response = self.client.models.count_tokens( + model=self.model_name, + contents=temp_history_for_counting # Count only this message in context of prior kept ones + ) + # We need token count of *just this message*. + # A simple way is to count history with and without it, if API allows counting single Content. + # Or, if count_tokens on a single Content object works: + single_message_count_response = self.client.models.count_tokens(model=self.model_name, contents=[first_message]) + message_tokens = single_message_count_response.total_tokens + temp_history_for_counting.pop() # remove after counting if only counting one by one + + except Exception as e_count: + logger.error(f"LLMAgentCore: Could not count tokens for a message during clear_history_by_tokens: {e_count}. Using estimate (75).", exc_info=True) + message_tokens = 75 # Fallback estimate + + tokens_counted_for_removal += message_tokens + new_history.pop(0) + messages_removed_count += 1 + + logger.info(f"LLMAgentCore: After token counting pass, {messages_removed_count} messages ({tokens_counted_for_removal} tokens) selected for removal.") + + # Ensure history starts with a 'user' message if not empty + additional_messages_removed_for_role = 0 + while new_history and new_history[0].role != "user": + logger.warning("LLMAgentCore: History after token clear pass starts with a model turn. 
Removing additional leading model messages.") + new_history.pop(0) + messages_removed_count += 1 + additional_messages_removed_for_role += 1 + if additional_messages_removed_for_role > 0: + logger.info(f"LLMAgentCore: Removed {additional_messages_removed_for_role} additional model messages to ensure user turn start.") + + self.conversation_history = new_history + + # Recalculate current_token_count + if not self.conversation_history: + self.current_token_count = 0 + else: + try: + self.current_token_count = self.client.models.count_tokens( + model=self.model_name, + contents=self.conversation_history + ).total_tokens + except Exception as e_recount: + logger.error(f"LLMAgentCore: Error recounting tokens after clear_history_by_tokens: {e_recount}. Token count may be inaccurate.", exc_info=True) + self.current_token_count = -1 # Indicate error + + # Re-initialize chat with the modified history + if self.client: + self._initialize_chat() # This will now use the updated self.conversation_history + + logger.info(f"LLMAgentCore: Cleared {messages_removed_count} message(s). Actual tokens cleared approx {tokens_counted_for_removal}. New total tokens: {self.current_token_count if self.current_token_count != -1 else 'Error counting'}") + return messages_removed_count, tokens_counted_for_removal + + def set_thinking_budget(self, budget: int): + """Sets the thinking budget. Note: Direct impact on GenerationConfig is TBD.""" + if 0 <= budget <= 24000: # Assuming same validation as before + self.thinking_budget = budget + # self.generation_config currently has no direct 'thinking_budget'. + # If there were other parameters in GenerationConfig to set based on this, they would go here. + # For example, if tool execution budget was part of it. + logger.info(f"LLMAgentCore: Thinking budget value set to: {self.thinking_budget}. Effect on GenerationConfig is pending SDK feature mapping.") + # Example: If GenerationConfig had a relevant field: + # self.generation_config.some_tool_execution_parameter = budget + else: + logger.warning(f"LLMAgentCore: Invalid thinking budget value: {budget}. 
Must be between 0 and 24000.") + + +# Example usage (for testing within the file if needed) +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) # Use DEBUG for more verbose output during testing + + # --- Mock Tools for Testing --- + def mock_search_tool(query: str) -> str: + return f"Search results for: {query}" + + def mock_read_file_tool(path: str) -> str: + return f"Contents of file: {path}" + + # --- Test Setup --- + sample_config_main = { + 'model_name': os.getenv("GEMINI_MODEL_NAME", 'gemini-1.5-flash-latest'), + 'default_thinking_budget': 150, # Lower for testing + 'verbose': True, # Test verbose tool logging + } + + # API key is essential + sample_api_key_main = os.getenv("GEMINI_API_KEY") + + if not sample_api_key_main: + logger.error("Please set GEMINI_API_KEY environment variable for this example to run.") + else: + logger.info(f"Using Model: {sample_config_main['model_name']}") + logger.info("--- Initializing LLMAgentCore ---") + core_agent_main = LLMAgentCore( + config=sample_config_main, + api_key=sample_api_key_main, + model_name=sample_config_main['model_name'] + ) + + if core_agent_main.client and core_agent_main.chat: + logger.info("--- LLMAgentCore initialized successfully ---") + + # Register mock tools + core_agent_main.register_tools([mock_search_tool, mock_read_file_tool], verbose=True) + + # Test send_message + logger.info("--- Test 1: Sending a simple message ---") + response1 = core_agent_main.send_message("Hello, LLM agent core!") + logger.info(f"Response 1: {response1}") + logger.info(f"History after msg 1: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + + # Test send_message with pending_context + logger.info("\n--- Test 2: Sending message with pending_context ---") + response2 = core_agent_main.send_message("Tell me about this.", pending_context="Some important background info here.") + logger.info(f"Response 2: {response2}") + logger.info(f"History after msg 2: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + + # Test clear_history_tokens + logger.info("\n--- Test 3: Clearing history (keeping 1 turn) ---") + core_agent_main.clear_history_turns(turns_to_keep=1) + logger.info(f"History after clear: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + if len(core_agent_main.conversation_history) == 2: # 1 user, 1 model + logger.info("History clear (1 turn) seems successful.") + else: + logger.error(f"History clear (1 turn) failed. 
Expected 2 entries, got {len(core_agent_main.conversation_history)}") + + + # Test reset_chat + logger.info("\n--- Test 4: Resetting chat ---") + core_agent_main.reset_chat() + logger.info(f"History after reset: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + if not core_agent_main.conversation_history and core_agent_main.current_token_count == 0: + logger.info("Chat reset successful.") + else: + logger.error("Chat reset failed.") + + # Test sending message after reset + logger.info("\n--- Test 5: Sending message after reset ---") + response3 = core_agent_main.send_message("Are you fresh now?") + logger.info(f"Response 3: {response3}") + logger.info(f"History after msg 3: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + + # Test message that might trigger a tool (if model is capable and tools are understood) + # This is harder to deterministically test without a sophisticated mock model. + logger.info("\n--- Test 6: Sending a message that might use a tool ---") + response4 = core_agent_main.send_message("Search for information about Large Language Models.") + logger.info(f"Response 4: {response4}") + logger.info(f"History after msg 4: {len(core_agent_main.conversation_history)} entries, Token count: {core_agent_main.current_token_count}") + # Check if response indicates a tool call (model dependent) + if "[Tool call requested by LLM]" in response4: + logger.info("Model indicated a tool call as expected (potentially).") + elif "Search results for:" in response4 : # If model directly uses tool-like phrasing + logger.info("Model provided a direct response that looks like tool output.") + else: + logger.info("Model provided a general response.") + + + else: + logger.error("--- LLMAgentCore initialization FAILED (client or chat not available) ---") + if not core_agent_main.api_key: + logger.error("Reason: API key is missing.") + if not core_agent_main.client: + logger.error("Reason: GenAI Client failed to initialize.") + if not core_agent_main.chat: + logger.error("Reason: Chat session failed to initialize.") diff --git a/src/main.py b/src/main.py index 647cb8b..69b0967 100644 --- a/src/main.py +++ b/src/main.py @@ -1,52 +1,51 @@ -from google import genai -from google.genai import types import os import sys from pathlib import Path from . import database from . 
import tools -from .autocomplete import PdfCompleter # <-- Import PdfCompleter +from .autocomplete import PdfCompleter import traceback import argparse -import functools +import functools # Keep for CodeAgent._make_verbose_tool if not fully removed, or for other potential uses import logging -import asyncio, threading, uuid # Added uuid -from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn, TextColumn, BarColumn # Added TextColumn, BarColumn -from prompt_toolkit.document import Document # For completer -from prompt_toolkit.completion import Completer, Completion, NestedCompleter, WordCompleter, PathCompleter, FuzzyWordCompleter, CompleteEvent # For completer, Added CompleteEvent +import asyncio, threading, uuid +from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn, TextColumn, BarColumn +from prompt_toolkit.document import Document +from prompt_toolkit.completion import Completer, Completion, NestedCompleter, WordCompleter, PathCompleter, FuzzyWordCompleter, CompleteEvent from prompt_toolkit import PromptSession from prompt_toolkit.history import InMemoryHistory from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.filters import Condition # Added -from prompt_toolkit.keys import Keys # Added -from prompt_toolkit.application.current import get_app # Added +from prompt_toolkit.filters import Condition +from prompt_toolkit.keys import Keys +from prompt_toolkit.application.current import get_app from prompt_toolkit.formatted_text import HTML -from prompt_toolkit.shortcuts import CompleteStyle # Added for complete_style argument +from prompt_toolkit.shortcuts import CompleteStyle from prompt_toolkit.auto_suggest import AutoSuggestFromHistory -import re # For finding words in command input -from typing import Iterable, Dict, Any, Optional, Set, Union # For completer and CustomNestedCompleter -import bisect # Added for efficient searching +import re +from typing import Iterable, Dict, Any, Optional, Set, Union, List, Callable # Ensure List, Callable are here +import bisect import yaml import sqlite3 -from typing import Optional, List, Dict, Callable -from dotenv import load_dotenv # Import dotenv -from datetime import datetime, timezone # Added for processed_timestamp -import sys # For sys.executable -import subprocess # For running scripts -# from google.generativeai import types as genai_types # This was incorrect for the new SDK as the `google.generativeai` SDK is deprecated and we should use `google.genai` SDK instead +from dotenv import load_dotenv +from datetime import datetime, timezone +import subprocess + +# --- Import LLMAgentCore --- +from .llm_agent_core import LLMAgentCore +import google.generativeai as genai # Use the working import style +from .async_task_manager import AsyncTaskManager # Import AsyncTaskManager # --- Import slash command handlers --- -from . import slashcommands # Import the new module +from . 
import slashcommands # Setup basic logging -# Configure logging with a default of WARNING (less verbose) -logging.basicConfig(level=logging.WARNING) # Default to less verbose logging -logger = logging.getLogger(__name__) # Define module-level logger +logging.basicConfig(level=logging.WARNING) +logger = logging.getLogger(__name__) # --- Completer Logging Wrapper --- -class LoggingCompleterWrapper(Completer): +class LoggingCompleterWrapper(Completer): # Keep this class as it's UI related def __init__(self, wrapped_completer: Completer, name: str = "Wrapped"): self.wrapped_completer = wrapped_completer self.name = name @@ -58,7 +57,7 @@ def get_completions(self, document: Document, complete_event: CompleteEvent) -> yield from raw_completions # --- End Completer Logging Wrapper --- -class CustomNestedCompleter(NestedCompleter): +class CustomNestedCompleter(NestedCompleter): # Keep this UI related class def __init__(self, options: Dict[str, Optional[Completer]], ignore_case: bool = True): super().__init__(options, ignore_case=ignore_case) @@ -208,21 +207,43 @@ def load_config(config_path: Path): else: logger.info(f"No .env file found at {dotenv_home_path}, checking system environment variables.") - # 2. Check for API key in environment variables first + # 1. Load YAML file specified by config_path + try: + with open(config_path) as f_local: + loaded_yaml_config = yaml.safe_load(f_local) or {} + except Exception as e: + logger.error(f"Failed to load or parse YAML configuration at {config_path}: {e}", exc_info=True) + # Return a minimal config or raise error, depending on desired handling + # For now, return a dictionary that might miss keys, leading to later warnings/defaults. + loaded_yaml_config = {} + + # 2. Load environment variables from ~/.env (if it exists) + dotenv_home_path = Path.home() / '.env' + if dotenv_home_path.is_file(): + load_dotenv(dotenv_path=dotenv_home_path) + logger.info(f"Loaded environment variables from {dotenv_home_path}") + else: + logger.info(f"No .env file found at {dotenv_home_path}, checking system environment variables.") + + # 3. Check for API key in environment variables first env_api_key = os.getenv('GEMINI_API_KEY') # 4. Prioritize environment variable for API key if env_api_key: - config['gemini_api_key'] = env_api_key + loaded_yaml_config['gemini_api_key'] = env_api_key logger.info("Using GEMINI_API_KEY from environment.") - elif 'gemini_api_key' in yaml_data and yaml_data['gemini_api_key']: + elif 'gemini_api_key' in loaded_yaml_config and loaded_yaml_config['gemini_api_key']: logger.info("Using gemini_api_key from config.yaml (environment variable not set).") - # It's already in config from the update() step else: logger.warning("GEMINI_API_KEY not found in environment or config.yaml.") + # Update MODEL_NAME based on the loaded config (or use its existing global default) + # This function will return a config dictionary, the caller should update global MODEL_NAME if needed, + # or better, MODEL_NAME should be sourced from this returned config by the caller. + # For now, just log based on what this function sees. + current_model_name = loaded_yaml_config.get('model_name', MODEL_NAME) # Use global MODEL_NAME as fallback for logging + logger.info(f"Model specified in config/default: {current_model_name}") - logger.info(f"Using model: {MODEL_NAME}") # 5. 
Resolve paths (relative to project root, which is parent of src/) project_root = Path(__file__).parent.parent @@ -230,26 +251,23 @@ def load_config(config_path: Path): 'SAVED_CONVERSATIONS_DIRECTORY', 'PAPER_DB_PATH', 'PAPER_BLOBS_DIR']: - if key in config and isinstance(config[key], str): - resolved_path = project_root / config[key] + if key in loaded_yaml_config and isinstance(loaded_yaml_config[key], str): + resolved_path = project_root / loaded_yaml_config[key] if key == 'PAPER_DB_PATH': - # Ensure parent dir exists for DB path resolved_path.parent.mkdir(parents=True, exist_ok=True) else: - # Ensure the directory itself exists for others resolved_path.mkdir(parents=True, exist_ok=True) - config[key] = resolved_path - logger.info(f"Resolved path for {key}: {config[key]}") - elif key in config and isinstance(config[key], Path): - # Path might already be resolved if loaded from previous runs/complex config - logger.info(f"Path for {key} already resolved: {config[key]}") - # Ensure directories exist even if path was pre-resolved + loaded_yaml_config[key] = resolved_path + logger.info(f"Resolved path for {key}: {loaded_yaml_config[key]}") + elif key in loaded_yaml_config and isinstance(loaded_yaml_config[key], Path): + logger.info(f"Path for {key} already resolved: {loaded_yaml_config[key]}") if key == 'PAPER_DB_PATH': - config[key].parent.mkdir(parents=True, exist_ok=True) + loaded_yaml_config[key].parent.mkdir(parents=True, exist_ok=True) else: - config[key].mkdir(parents=True, exist_ok=True) + loaded_yaml_config[key].mkdir(parents=True, exist_ok=True) + # If key is missing or not a string/Path, it will be handled by downstream code using .get() with defaults - return config + return loaded_yaml_config def print_welcome_message(config): """Prints the initial welcome and help message.""" @@ -261,118 +279,110 @@ class CodeAgent: def __init__(self, config: dict, conn: Optional[sqlite3.Connection]): """Initializes the CodeAgent.""" self.config = config - self.api_key = config.get('gemini_api_key') # Correct key name - self.model_name = MODEL_NAME - self.pdf_processing_method = config.get('pdf_processing_method', 'Gemini') # Default method - self.client = None - self.chat = None - self.db_path_str = str(config.get('PAPER_DB_PATH')) if config.get('PAPER_DB_PATH') else None # Store DB path string - self.prefill_prompt_content: Optional[str] = None # For pre-filling the next prompt + self.api_key = config.get('gemini_api_key') + # LLMAgentCore now manages its own model_name, client, chat, thinking_budget, etc. + + # Initialize AsyncTaskManager first, as it's needed by LLMAgentCore + self.task_manager = AsyncTaskManager(main_app_handler=self) - # Background asyncio loop (daemon so app exits cleanly) - self.loop = asyncio.new_event_loop() - threading.Thread(target=self.loop.run_forever, daemon=True).start() - # Async GenAI client that lives on that loop - if self.api_key: # Ensure API key exists before creating async client - self.async_client = genai.Client(api_key=self.api_key).aio - else: - self.async_client = None - logger.warning("GEMINI_API_KEY not found. 
Async GenAI client not initialized.") + self.llm_core = LLMAgentCore( + config=config, + api_key=self.api_key, + model_name=MODEL_NAME, + task_manager=self.task_manager # Pass the initialized task manager + ) + + self.pdf_processing_method = config.get('pdf_processing_method', 'Gemini') + self.db_path_str = str(config.get('PAPER_DB_PATH')) if config.get('PAPER_DB_PATH') else None + self.prefill_prompt_content: Optional[str] = None + + # Async client for PDF processing is now accessed via self.llm_core.async_client + + # active_background_tasks is now managed by self.task_manager + self.pending_script_output: Optional[str] = None # Script output callback will set this - # Collection to keep track of active background tasks (Future objects and metadata) - self.active_background_tasks = {} - self.pending_script_output: Optional[str] = None # For output from /run_script + # active_files are GenAI File objects, managed by CodeAgent (UI layer) + self.active_files: List[genai.types.File] = [] # Changed genai_types.File to genai.types.File + + # UI-related stats, keep them here + self.prompt_time_counts = [0] + self.messages_per_interval = [0] + self._messages_this_interval = 0 - self.conversation_history = [] # Manual history for token counting ONLY - self.current_token_count = 0 # Store token count for the next prompt - self.active_files = [] # List to store active File objects - self.prompt_time_counts = [0] # Stores total tokens just before prompt - self.messages_per_interval = [0] # Stores # messages added in the last interval - self._messages_this_interval = 0 # Temporary counter - self.thinking_budget = config.get('default_thinking_budget', DEFAULT_THINKING_BUDGET) - self.thinking_config = None # Will be set in start_interaction - self.pdfs_dir_rel_path = config.get('PDFS_TO_CHAT_WITH_DIRECTORY') # Relative path from config + # PDF and Blob directories remain relevant for CodeAgent's handling of files + self.pdfs_dir_rel_path = config.get('PDFS_TO_CHAT_WITH_DIRECTORY') self.pdfs_dir_abs_path = Path(self.pdfs_dir_rel_path).resolve() if self.pdfs_dir_rel_path else None self.blob_dir_rel_path = config.get('PAPER_BLOBS_DIR') self.blob_dir = Path(self.blob_dir_rel_path).resolve() if self.blob_dir_rel_path else None - # Store the database connection passed from main - self.conn = conn - # Use imported tool functions - self.tool_functions = [ - tools.read_file, - tools.list_files, - tools.edit_file, - tools.execute_bash_command, - tools.run_in_sandbox, - tools.find_arxiv_papers, - tools.download_arxiv_paper, - tools.get_current_date_and_time, - tools.google_search, - tools.open_url, - tools.upload_pdf_for_gemini, - tools.run_sql_query + self.conn = conn + + # Define the list of tool functions that CodeAgent makes available + original_tool_functions = [ + tools.read_file, tools.list_files, tools.edit_file, + tools.execute_bash_command, tools.run_in_sandbox, + tools.find_arxiv_papers, tools.download_arxiv_paper, + tools.get_current_date_and_time, tools.google_search, # Re-enabled + tools.open_url, # Re-enabled + tools.upload_pdf_for_gemini, tools.run_sql_query ] - if self.config.get('verbose', config.get('verbose', False)): - self.tool_functions = [self._make_verbose_tool(f) for f in self.tool_functions] + + # Register tools with LLMAgentCore + # LLMAgentCore's register_tools now handles verbose wrapping internally. 
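+        # When config['verbose'] is true, each tool is wrapped by LLMAgentCore._make_verbose_tool,
+        # which logs the tool name, arguments, and a truncated result on every call.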
+ if self.llm_core: + self.llm_core.register_tools( + original_tool_functions, + verbose=self.config.get('verbose', False) + ) + if self.pdfs_dir_abs_path: self.pdfs_dir_abs_path.mkdir(parents=True, exist_ok=True) logger.info(f"PDF directory set to: {self.pdfs_dir_abs_path}") else: - logger.warning("PDF directory not configured in config.yaml. /pdf command will be disabled.") + logger.warning("PDF directory not configured. /pdf command will be disabled.") if self.blob_dir: self.blob_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Blob directory set to: {self.blob_dir}") else: - logger.warning("Blob directory not configured in config.yaml. Saving extracted text will be disabled.") + logger.warning("Blob directory not configured. Saving extracted text will be disabled.") if not self.conn: - logger.warning("Database connection not established. Database operations will be disabled.") + logger.warning("DB connection not established. DB operations will be disabled.") - if not self.api_key: - logger.warning("GEMINI_API_KEY not found in config or environment. Gemini features disabled.") - else: - self._configure_client() - if self.client: - self._initialize_chat() + # LLMAgentCore handles its own client/chat initialization and warnings. + # No need for direct calls to self._configure_client() or self._initialize_chat() here. - self.pending_pdf_context: Optional[str] = None # For prepending PDF context + self.pending_pdf_context: Optional[str] = None self.prompts_dir = Path('src/prompts').resolve() - - # Ensure directories exist (moved from load_config for clarity) - if self.pdfs_dir_abs_path: - self.pdfs_dir_abs_path.mkdir(parents=True, exist_ok=True) - if self.blob_dir: - self.blob_dir.mkdir(parents=True, exist_ok=True) - if self.prompts_dir: - self.prompts_dir.mkdir(parents=True, exist_ok=True) - - def _configure_client(self): - """Configures the Google Generative AI client.""" - print("\n\u2692\ufe0f Configuring genai client...") - try: - # Configure the client with our API key - self.client = genai.Client(api_key=self.api_key) - print("\u2705 Client configured successfully.") - except Exception as e: - print(f"\u274c Error configuring genai client: {e}") - traceback.print_exc() - sys.exit(1) - - def _initialize_chat(self): - """Initializes the chat session.""" - print("\n\u2692\ufe0f Initializing chat session...") - try: - # Create a chat session using the client - self.chat = self.client.chats.create(model=self.model_name, history=[]) - print("\u2705 Chat session initialized.") - except Exception as e: - print(f"\u274c Error initializing chat session: {e}") - traceback.print_exc() - sys.exit(1) - - # --- Prompt Helper Methods --- - def _list_available_prompts(self) -> list[str]: + self.prompts_dir.mkdir(parents=True, exist_ok=True) # Ensure prompts dir exists + + # _configure_client, _initialize_chat, _make_verbose_tool are removed from CodeAgent + # _on_task_done, _launch_background_task are also removed (moved to AsyncTaskManager) + + def handle_script_completion(self, task_id: str, task_name: str, script_output: str): + """Callback for AsyncTaskManager to set script output.""" + logger.info(f"CodeAgent: Script task '{task_name}' (ID: {task_id}) completed. 
Output received.") + self.pending_script_output = script_output + output_summary = (script_output[:100] + '...') if len(script_output) > 103 else script_output + # This print might be better handled by a UI update method if using a more complex UI + print(f"\n📄 Output from '{task_name}' is ready and will be included in the next context. Output preview:\n{output_summary}") + self.refresh_prompt_display() # Example: if prompt needs to be re-rendered + + def refresh_prompt_display(self): + """Placeholder for refreshing the prompt_toolkit display if needed after async updates.""" + # In a real prompt_toolkit app, this might involve app.invalidate() or similar + app = get_app() + if app: + app.invalidate() + logger.debug("CodeAgent: Prompt display invalidated for refresh.") + + def update_task_status_display(self, task_id: str, message: str): + """Placeholder for updating task status in a more integrated UI.""" + # This could update a Rich Panel, status bar, etc. For now, just prints. + print(message) # Rich progress is handled by AsyncTaskManager for now + + # --- Prompt Helper Methods (UI related, stay in CodeAgent) --- + def _list_available_prompts(self) -> List[str]: """Lists available prompt names from the prompts directory.""" if not self.prompts_dir.is_dir(): return [] @@ -408,7 +418,9 @@ def _load_prompt(self, prompt_name: str) -> Optional[str]: def _get_dynamic_prompt_message(self): """Returns the dynamic prompt message with status indicators.""" active_files_info = f" [{len(self.active_files)} files]" if self.active_files else "" - token_info = f"({self.current_token_count})" + # Get token count from LLMAgentCore + token_count = self.llm_core.current_token_count if self.llm_core else 0 + token_info = f"({token_count})" return HTML( f'🔵 You ' @@ -416,7 +428,7 @@ def _get_dynamic_prompt_message(self): f'{active_files_info}: ' ) - def _get_continuation_prompt(self, width, line_number, is_soft_wrap): + def _get_continuation_prompt(self, width, line_number, is_soft_wrap): # UI related """Returns the continuation prompt for multi-line input.""" if is_soft_wrap: return ' ' * 2 # Indent for soft wraps @@ -496,23 +508,22 @@ def _handle_slash_command_typing(event): # input_processors=None, ) - def print_initial_help(self): + def print_initial_help(self): # UI related """Prints the initial brief help message.""" print("\n\u2692\ufe0f Agent ready. Ask me anything or type '/help' for commands.") print(" Type '/exit' or '/q' to quit.") - # Key commands can be highlighted if desired, but /help is the main source. def start_interaction(self): """Starts the main interaction loop with enhanced multi-line editing.""" - if not self.client: - print("\n\u274c Client not configured. Exiting.") + if not self.llm_core or not self.llm_core.client or not self.llm_core.chat: + logger.error("LLMAgentCore not initialized properly. Exiting.") + print("\n\u274c LLM Core not configured (API key or model issue likely). 
Exiting.") return - self.print_initial_help() # Use the new brief help message + self.print_initial_help() - # Set initial thinking budget from default/config - self.thinking_config = types.ThinkingConfig(thinking_budget=self.thinking_budget) - print(f"\n🧠 Initial thinking budget set to: {self.thinking_budget} tokens.") + # thinking_budget is now managed by llm_core, display it + print(f"\n🧠 Initial thinking budget set to: {self.llm_core.thinking_budget} tokens.") # Print multi-line editing instructions print("\n📝 Multi-line editing enabled:") @@ -585,12 +596,13 @@ def start_interaction(self): while True: try: - # ─── 1 · house‑keeping before we prompt ────────────────────────── - self.prompt_time_counts.append(self.current_token_count) + # --- House-keeping before prompt --- + # Use token count from llm_core for UI stats if needed by prompt_time_counts + self.prompt_time_counts.append(self.llm_core.current_token_count if self.llm_core else 0) self.messages_per_interval.append(self._messages_this_interval) self._messages_this_interval = 0 - # Get multi-line input with the enhanced session + # --- Get multi-line input --- # The prompt message is now handled by _get_dynamic_prompt_message # Prefill logic needs to be integrated with PromptSession's `default` if needed, or handled before prompt. # For now, we simplify and remove direct prefill_text here as the plan's session.prompt() is simpler. @@ -654,131 +666,44 @@ def start_interaction(self): continue # Command handled, loop to next prompt # --- If not a known slash command, proceed as LLM message --- - self._messages_this_interval += 1 - message_to_send = user_input + self._messages_this_interval += 1 - pdf_context_was_included = False - script_output_was_included = False # New flag - - # Check for PDF context AFTER prompt (so prompt comes first) + # Combine pending PDF context and script output for the message + combined_pending_context = "" if self.pending_pdf_context: + combined_pending_context += self.pending_pdf_context + "\n\n" print("[Including context from previously processed PDF in this message.]\n") - message_to_send = f"{self.pending_pdf_context}\n\n{message_to_send}" - pdf_context_was_included = True - - # Prepend script output AFTER PDF but BEFORE loaded prompt if self.pending_script_output: + combined_pending_context += f"OUTPUT FROM EXECUTED SCRIPT:\n---\n{self.pending_script_output}\n---\n\n" print("[Including output from previously run script in this message.]\n") - # Task name might not be easily available here, use a generic header - message_to_send = f"OUTPUT FROM EXECUTED SCRIPT:\n---\n{self.pending_script_output}\n---\n\n{message_to_send}" - script_output_was_included = True - # --- Log message details before sending --- - pdf_context_len_before_send = len(self.pending_pdf_context) if self.pending_pdf_context and pdf_context_was_included else 0 - script_output_len_before_send = len(self.pending_script_output) if self.pending_script_output and script_output_was_included else 0 - final_message_len = len(message_to_send) - logger.info(f"Preparing to send message.") - logger.info(f" - Original user input length: {len(user_input)}") - logger.info(f" - Included pending PDF context length: {pdf_context_len_before_send}") - logger.info(f" - Included pending script output length: {script_output_len_before_send}") - logger.info(f" - Final message_to_send length: {final_message_len}") - # Log snippets for verification - if final_message_len > 200: - logger.info(f" - Final message start: {message_to_send[:100]}...") - 
logger.info(f" - Final message end: ...{message_to_send[-100:]}") - - # --- Prepare message content (Text + Files) --- - message_content = [message_to_send] + # Log message details before sending + logger.info(f"Preparing to send message to LLMAgentCore.") + logger.info(f" - Original user input: '{user_input}'") + if combined_pending_context: + logger.info(f" - Combined pending context length: {len(combined_pending_context)}") if self.active_files: - message_content.extend(self.active_files) - if self.config.get('verbose', False): - print(f"\n📎 Attaching {len(self.active_files)} files to the prompt:") - for f in self.active_files: - print(f" - {f.display_name} ({f.name})") - - # --- Update manual history (for token counting ONLY - Use Text Only) --- - new_user_content =types.Content(parts=[types.Part(text=message_to_send)], role="user") - self.conversation_history.append(new_user_content) - - # --- Send Message --- - print("\n⏳ Sending message and processing...") - # Prepare tool configuration **inside the loop** to use the latest budget - tool_config = types.GenerateContentConfig( - tools=self.tool_functions, - thinking_config=self.thinking_config - ) + logger.info(f" - Active files: {[f.name for f in self.active_files]}") - # Send message using the chat object's send_message method - # Pass the potentially combined list of text and files - response = self.chat.send_message( - message=message_content, # Pass the list here - config=tool_config + print("\n⏳ Sending message to LLM Core and processing...") + agent_response_text = self.llm_core.send_message( + user_message_text=user_input, + active_files=self.active_files, # Pass the list of genai.File objects + pending_context=combined_pending_context.strip() if combined_pending_context else None ) - agent_response_text = "" - if response.candidates and response.candidates[0].content: - agent_parts = response.candidates[0].content.parts - agent_response_text = " ".join(p.text for p in agent_parts - if hasattr(p, "text")) - - if agent_response_text: - hist_agent_content = types.Content(role="model", - parts=[types.Part(text=agent_response_text)]) - self.conversation_history.append(hist_agent_content) - print(f"\n🟢 \x1b[92mAgent:\x1b[0m {agent_response_text or '[No response text]'}") - # --- Detailed History Logging Before Token Count --- - logger.debug(f"Inspecting conversation_history (length: {len(self.conversation_history)}) before count_tokens:") - history_seems_ok = True - for i, content in enumerate(self.conversation_history): - logger.debug(f" [{i}] Role: {getattr(content, 'role', 'N/A')}") - if hasattr(content, 'parts'): - for j, part in enumerate(content.parts): - part_type = type(part) - part_info = f"Part {j}: Type={part_type.__name__}" - if hasattr(part, 'text'): - part_info += f", Text='{part.text[:50]}...'" - elif hasattr(part, 'file_data'): - part_info += f", FileData URI='{getattr(part.file_data, 'file_uri', 'N/A')}'" - history_seems_ok = False # Found a file part! - logger.error(f" 🚨 ERROR: Found unexpected file_data part in history for token counting: {part_info}") - elif hasattr(part, 'function_call'): - part_info += f", FunctionCall Name='{getattr(part.function_call, 'name', 'N/A')}'" - history_seems_ok = False # Found a function call part! 
- logger.error(f" 🚨 ERROR: Found unexpected function_call part in history for token counting: {part_info}") - else: - # Log other unexpected part types - history_seems_ok = False - logger.error(f" 🚨 ERROR: Found unexpected part type in history for token counting: {part_info}") - logger.debug(f" {part_info}") - else: - logger.warning(f" [{i}] Content object has no 'parts' attribute.") - if history_seems_ok: - logger.debug("History inspection passed: Only text parts found.") - else: - logger.error("History inspection FAILED: Non-text parts found. Token counting will likely fail.") - # --- End Detailed History Logging --- + # Display token count from LLMAgentCore + print(f"\n[Token Count: {self.llm_core.current_token_count}]") - # Calculate and display token count using client.models - try: - token_info = self.client.models.count_tokens( - model=self.model_name, - contents=self.conversation_history - ) - self.current_token_count = token_info.total_tokens - print(f"\n[Token Count: {self.current_token_count}]") - except Exception as count_err: - logger.error(f"Error calculating token count: {count_err}", exc_info=True) - print("🚨 Error: Failed to calculate token count.") - - # --- NOW clear contexts that were actually sent --- - if pdf_context_was_included: + # Clear contexts now that they've been sent + if self.pending_pdf_context: self.pending_pdf_context = None - logger.info("Cleared pending_pdf_context after sending to LLM.") - if script_output_was_included: + logger.info("Cleared pending_pdf_context in CodeAgent after sending to LLM Core.") + if self.pending_script_output: self.pending_script_output = None - logger.info("Cleared pending_script_output after sending to LLM.") + logger.info("Cleared pending_script_output in CodeAgent after sending to LLM Core.") except KeyboardInterrupt: print("\n👋 Goodbye!") @@ -786,19 +711,10 @@ def start_interaction(self): except Exception as e: print(f"\n🔴 An error occurred during interaction: {e}") traceback.print_exc() - # Potentially add a small delay or a prompt to continue/exit here - - def _make_verbose_tool(self, func): - """Wrap tool function to print verbose info when called.""" - @functools.wraps(func) - def wrapper(*args, **kwargs): - print(f"\n🔧 Tool called: {func.__name__}, args: {args}, kwargs: {kwargs}") - result = func(*args, **kwargs) - print(f"\n▶️ Tool result ({func.__name__}): {result}") - return result - return wrapper - - def _handle_pdf_command(self, args: list): + + # _make_verbose_tool is removed, LLMAgentCore handles verbose tool logging. + + def _handle_pdf_command(self, args: list): # UI related, but calls core async client """Handles the /pdf command to asynchronously process a PDF file. Args: @@ -814,9 +730,9 @@ def _handle_pdf_command(self, args: list): print("\n⚠️ Database connection not available. Cannot save PDF metadata.") return - # Async client check (used by _process_pdf_async_v2) - if not self.async_client: - print("\n⚠️ Async Gemini client not initialized. Cannot process PDF asynchronously.") + # Async client check (used by _process_pdf_async_v2, now from llm_core) + if not self.llm_core or not self.llm_core.async_client: + print("\n⚠️ Async Gemini client (via LLM Core) not initialized. 
Cannot process PDF asynchronously.") return if not args: @@ -894,7 +810,7 @@ def _handle_pdf_command(self, args: list): # Optionally, if you want to also add the GenAI file to active_files if available genai_uri = cached_paper_info.get("genai_file_uri") - if genai_uri and self.client: + if genai_uri and self.llm_core and self.llm_core.client: # Check llm_core.client try: # We need to "reconstruct" a File object or decide if it's needed # For now, we focus on context. If direct file interaction is needed, @@ -957,15 +873,26 @@ def _handle_pdf_command(self, args: list): # and we add pdf_path, arxiv_id_arg, and paper_id via partial. specific_pdf_task_coro_creator = functools.partial(self._process_pdf_async_v2, pdf_path=pdf_path, - arxiv_id=arxiv_id_arg, - paper_id=paper_id) # _process_pdf_async_v2 will access self.db_path_str + arxiv_id=arxiv_id_arg, # This was arxiv_id_arg before, ensure consistency + paper_id=paper_id) - self._launch_background_task(specific_pdf_task_coro_creator, task_name=f"PDF-{pdf_path.name}") - # The _launch_background_task will print a message like: - # "⏳ 'PDF-mypaper.pdf' (ID: ) started in background – you can keep chatting." - # The actual result/feedback will come from the _on_task_done callback via prints. + self.task_manager.submit_task( + specific_pdf_task_coro_creator, + task_name=f"PDF-{pdf_path.name}", + task_meta={"type": "pdf_processing", "original_filename": pdf_path.name} + ) - def _finalize_pdf_ingest(self, pdf_file_resource: types.File, arxiv_id: Optional[str], original_pdf_path: Path, paper_id: Optional[int], db_path_str: Optional[str]): + def handle_pdf_completion(self, task_id: str, task_name: str, result: Any): + """Callback for AsyncTaskManager upon PDF processing completion.""" + # Result might be a status message or the genai.File object if processing was successful + # The _process_pdf_async_v2 already sets self.pending_pdf_context via _finalize_pdf_ingest + logger.info(f"CodeAgent: PDF Processing task '{task_name}' (ID: {task_id}) completed by manager. Result: {result}") + # Further actions can be taken here based on the result if needed. + # For example, if result indicates success, print a specific message to user. + # The _on_task_done in AsyncTaskManager already prints a generic completion message. + # self.pending_pdf_context is set within _finalize_pdf_ingest, called by the async task. + + def _finalize_pdf_ingest(self, pdf_file_resource: genai.types.File, arxiv_id: Optional[str], original_pdf_path: Path, paper_id: Optional[int], db_path_str: Optional[str]): """Synchronous method for final PDF ingestion steps after GenAI processing is ACTIVE. This method is called via asyncio.to_thread() from an async task. It handles text extraction (potentially blocking), blob saving (blocking), @@ -994,16 +921,17 @@ def _finalize_pdf_ingest(self, pdf_file_resource: types.File, arxiv_id: Optional logger.error(f"Finalize Thread Paper ID {paper_id}: Could not establish new DB connection. Aborting.") return # Cannot update status without DB connection - # Ensure synchronous GenAI client is available (it's on self, created in main thread, typically fine for read-only attributes or creating new requests) - if not self.client: - logger.error(f"Finalize Thread Paper ID {paper_id}: Synchronous GenAI client (self.client) not available. 
Aborting text extraction.") + # Ensure synchronous GenAI client is available via llm_core + if not self.llm_core or not self.llm_core.client: + logger.error(f"Finalize Thread Paper ID {paper_id}: Synchronous GenAI client (self.llm_core.client) not available. Aborting text extraction.") database.update_paper_field(local_conn, paper_id, 'status', 'error_extraction_final_no_client') return extracted_text: Optional[str] = None try: logger.info(f"Finalize Thread Paper ID {paper_id}: Extracting text from '{original_pdf_path.name}'.") - extracted_text = tools.extract_text_from_pdf_gemini(original_pdf_path, self.client, self.model_name) + # Use client and model_name from llm_core + extracted_text = tools.extract_text_from_pdf_gemini(original_pdf_path, self.llm_core.client, self.llm_core.model_name) if not extracted_text: logger.warning(f"Finalize Thread Paper ID {paper_id}: Text extraction returned no content for '{original_pdf_path.name}'.") database.update_paper_field(local_conn, paper_id, 'status', 'error_extraction_final_empty') @@ -1072,25 +1000,26 @@ async def _process_pdf_async_v2(self, task_id: str, pdf_path: Path, arxiv_id: st Processes a PDF file asynchronously using client.aio.files: uploads to GenAI, monitors processing, and finalizes ingestion. Designed for cooperative cancellation. """ - if not self.async_client: - error_message = f"Task {task_id}: Async client not available. Cannot process {pdf_path.name}." - logging.error(error_message) - progress_bar.update(rich_task_id, description=f"❌ {pdf_path.name} failed: Async client missing", completed=100, total=100) + # progress_bar and rich_task_id are now passed by AsyncTaskManager.submit_task + if not self.llm_core or not self.llm_core.async_client: + error_message = f"Task {task_id}: Async client (via LLM Core) not available. Cannot process {pdf_path.name}." + logger.error(error_message) + if progress_bar: progress_bar.update(rich_task_id, description=f"❌ {pdf_path.name} failed: Async client missing", completed=True) raise RuntimeError(error_message) - client = self.async_client + async_genai_client = self.llm_core.async_client pdf_file_display_name = pdf_path.name progress_bar.update(rich_task_id, description=f"Starting {pdf_file_display_name}…") - genai_file_resource: Optional[types.File] = None + genai_file_resource: Optional[genai.types.File] = None # Changed types.File to genai.types.File try: progress_bar.update(rich_task_id, description=f"Uploading {pdf_file_display_name}…") # TODO: Add timeout for upload if necessary, e.g., asyncio.timeout(60, ...) 
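# Sketch of one way to handle the TODO above, assuming a 60-second ceiling is acceptable
# for PDF uploads: bound the upload await with asyncio.wait_for (illustration only, not
# part of this patch).
#
#     try:
#         genai_file_resource = await asyncio.wait_for(
#             async_genai_client.files.upload(file=pdf_path, config=upload_config),
#             timeout=60.0,
#         )
#     except asyncio.TimeoutError:
#         if progress_bar:
#             progress_bar.update(rich_task_id, description=f"❌ {pdf_file_display_name} upload timed out")
#         raise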
- upload_config = types.UploadFileConfig( - display_name=pdf_path.name # Use the original PDF filename as the display name + upload_config = genai.types.UploadFileConfig( # Changed genai_types to genai.types + display_name=pdf_path.name ) - genai_file_resource = await client.files.upload( + genai_file_resource = await async_genai_client.files.upload( # Use the alias file=pdf_path, config=upload_config ) @@ -1098,13 +1027,11 @@ async def _process_pdf_async_v2(self, task_id: str, pdf_path: Path, arxiv_id: st progress_bar.update(rich_task_id, description=f"Processing {genai_file_resource.display_name} with GenAI…") while genai_file_resource.state.name == "PROCESSING": - await asyncio.sleep(5) # Non-blocking poll interval - # Refresh file state using its unique resource name (genai_file_resource.name) - # TODO: Add timeout for get if necessary - genai_file_resource = await client.files.get(name=genai_file_resource.name) + await asyncio.sleep(5) + genai_file_resource = await async_genai_client.files.get(name=genai_file_resource.name) # Use the alias logger.debug(f"Task {task_id}: Polled {genai_file_resource.name}, state: {genai_file_resource.state.name}") - if genai_file_resource.state.name != "ACTIVE": # Check for "ACTIVE" as the desired terminal success state + if genai_file_resource.state.name != "ACTIVE": error_message = f"Task {task_id}: PDF {genai_file_resource.display_name} processing failed or unexpected state: {genai_file_resource.state.name}" logging.error(error_message) if genai_file_resource.state.name == "FAILED" and hasattr(genai_file_resource, 'error') and genai_file_resource.error: @@ -1154,132 +1081,31 @@ async def _process_pdf_async_v2(self, task_id: str, pdf_path: Path, arxiv_id: st if genai_file_resource and genai_file_resource.name: try: logger.info(f"Task {task_id} ({display_name}): Attempting to delete GenAI file {genai_file_resource.name} due to error.") - await client.files.delete(name=genai_file_resource.name) + await async_genai_client.files.delete(name=genai_file_resource.name) # Use the alias logger.info(f"Task {task_id} ({display_name}): Successfully deleted GenAI file {genai_file_resource.name} after error.") except Exception as del_e: logger.error(f"Task {task_id} ({display_name}): Failed to delete GenAI file {genai_file_resource.name} after error: {del_e}") raise finally: - # Ensure progress bar always stops if not already explicitly marked completed/failed by an update. - # This check might be overly cautious if all paths correctly update and stop the bar. - # progress_bar.stop() # This is handled by _on_task_done + # The AsyncTaskManager._on_task_done will handle stopping the progress bar. pass - def _on_task_done(self, task_id: str, task_name: str, future: asyncio.Future): - """Callback executed when a background task finishes.""" - try: - result = future.result() # Raise exception if task failed - print(f"\n✅ Task '{task_name}' (ID: {task_id}) completed successfully. Result: {result}") - logging.info(f"Task '{task_name}' (ID: {task_id}) completed successfully. 
Result: {result}") - except asyncio.CancelledError: - print(f"\n🚫 Task '{task_name}' (ID: {task_id}) was cancelled.") - logging.warning(f"Task '{task_name}' (ID: {task_id}) was cancelled.") - except Exception as e: - print(f"\n❌ Task '{task_name}' (ID: {task_id}) failed: {type(e).__name__}: {e}") - logging.error(f"Task '{task_name}' (ID: {task_id}) failed.", exc_info=True) - finally: - # Remove task from active list - task_info = self.active_background_tasks.pop(task_id, None) - # Stop progress bar if it exists and task_info is not None - if task_info and "progress_bar" in task_info: - task_info["progress_bar"].stop() - # Potentially refresh prompt or UI - # Check task_meta for script execution output - if task_info and future.exception() is None and not future.cancelled(): - meta = task_info.get("meta", {}) - if meta.get("type") == "script_execution": - script_output = future.result() # This is the string output from _execute_script_async - self.pending_script_output = script_output - original_command = meta.get("original_command", "Unknown script") - # Truncate for print message if too long - output_summary = (script_output[:100] + '...') if len(script_output) > 103 else script_output - print(f"\n📄 Output from '{original_command}' is ready and will be included in the next context. Output preview:\n{output_summary}") - logger.info(f"Task '{task_name}' (ID: {task_id}) was a script execution. Output stored in pending_script_output.") - - def _launch_background_task(self, coro_func, task_name: str, progress_total: float = 100.0, task_meta: Optional[dict] = None): - """ - Launches a coroutine as a background task with progress display. - `coro_func` should be a functools.partial or lambda that creates the coroutine, - and the coroutine it creates should accept (task_id, progress_bar, rich_task_id) as arguments. - `task_meta` is an optional dictionary to store extra info about the task. - """ - task_id = str(uuid.uuid4()) - - progress = Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TimeElapsedColumn(), - # transient=True # Consider if progress should disappear after completion - ) - rich_task_id = progress.add_task(description=f"Initializing {task_name}...", total=progress_total) - - # The coroutine (created by coro_func) must accept these specific arguments. - task_coro = coro_func(task_id=task_id, progress_bar=progress, rich_task_id=rich_task_id) - - fut = asyncio.run_coroutine_threadsafe(task_coro, self.loop) - self.active_background_tasks[task_id] = { - "future": fut, - "name": task_name, - "progress_bar": progress, # Store to stop it in _on_task_done - "rich_task_id": rich_task_id, - "meta": task_meta if task_meta else {} # Store metadata - } - - fut.add_done_callback( - lambda f: self._on_task_done(task_id, task_name, f) - ) - - print(f"⏳ '{task_name}' (ID: {task_id}) started in background – you can keep chatting.") - # The Progress object itself will be updated by the task coroutine. - # How it's displayed (e.g., via rich.Live or direct printing) will be - # handled by the calling environment or a dedicated UI management part. - return task_id # Return task_id along with progress for better management if needed in future + # _on_task_done and _launch_background_task are now part of AsyncTaskManager. 
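# With those helpers gone, CodeAgent only builds a coroutine factory and hands it to the
# manager, which supplies task_id, progress_bar and rich_task_id when it schedules the
# work. A minimal sketch of that flow, mirroring the /run_script path below (the script
# path and args here are hypothetical, shown only for illustration):
#
#     coro_creator = functools.partial(
#         self._execute_script_async,
#         script_type="python",
#         script_path_str="scripts/example.py",
#         script_args=["--dry-run"],
#     )
#     self.task_manager.submit_task(
#         coro_creator,
#         task_name="Script-example.py",
#         task_meta={"type": "script_execution",
#                    "original_command": "python scripts/example.py --dry-run"},
#     )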
def _handle_cancel_command(self, task_id_str: str): - """Attempts to cancel an active background task.""" - task_info = self.active_background_tasks.get(task_id_str) - if not task_info: - print(f"\n❌ Task ID '{task_id_str}' not found or already completed.") + """Delegates task cancellation to AsyncTaskManager.""" + if not task_id_str: # Basic validation, though slashcommands.py should ensure arg + print("\n⚠️ Usage: /cancel ") return - - future = task_info.get("future") - task_name = task_info.get("name", "Unnamed Task") - - if future and not future.done(): - cancelled = future.cancel() - if cancelled: - print(f"\n➡️ Cancellation request sent for task '{task_name}' (ID: {task_id_str}).") - # The _on_task_done callback will eventually report it as cancelled. - else: - print(f"\n❌ Failed to send cancellation request for task '{task_name}' (ID: {task_id_str}). It might be already completing or uncancelable.") - elif future and future.done(): - print(f"\nℹ️ Task '{task_name}' (ID: {task_id_str}) has already completed.") - else: - print(f"\n⚠️ Could not cancel task '{task_name}' (ID: {task_id_str}). Future object missing or invalid state.") + self.task_manager.cancel_task(task_id_str) def _handle_list_tasks_command(self): - """Lists active background tasks.""" - if not self.active_background_tasks: - print("\nℹ️ No active background tasks.") - return - - print("\n📋 Active Background Tasks:") - for task_id, info in self.active_background_tasks.items(): - future = info.get("future") - name = info.get("name", "Unnamed Task") - status = "Running" - if future: - if future.cancelled(): - status = "Cancelling" - elif future.done(): # Should ideally be removed by _on_task_done, but check just in case - status = "Completed (Pending Removal)" - print(f" - ID: {task_id}, Name: {name}, Status: {status}") + """Delegates listing tasks to AsyncTaskManager.""" + self.task_manager.list_tasks() def _handle_run_script_command(self, script_type: str, script_path: str, script_args: list[str]): - """Handles the /run_script command to execute a script asynchronously.""" + """Submits a script execution task to AsyncTaskManager.""" logger.info(f"Received /run_script command: type={script_type}, path={script_path}, args={script_args}") # Basic validation for script_path to prevent execution of arbitrary system commands if script_type is 'shell' @@ -1294,16 +1120,19 @@ def _handle_run_script_command(self, script_type: str, script_path: str, script_ original_full_command = f"{script_type} {script_path} {' '.join(script_args)}".strip() task_meta = {"type": "script_execution", "original_command": original_full_command} - script_coro_creator = functools.partial(self._execute_script_async, + script_coro_creator = functools.partial(self._execute_script_async, # _execute_script_async is now part of CodeAgent script_type=script_type, script_path_str=script_path, script_args=script_args) - self._launch_background_task(script_coro_creator, task_name=task_name, task_meta=task_meta) + self.task_manager.submit_task(script_coro_creator, task_name=task_name, task_meta=task_meta) - async def _execute_script_async(self, task_id: str, progress_bar: Progress, rich_task_id, script_type: str, script_path_str: str, script_args: list[str]): - """Asynchronously executes a python or shell script and captures its output.""" - progress_bar.update(rich_task_id, description=f"Preparing {script_type} script: {Path(script_path_str).name}") + async def _execute_script_async(self, task_id: str, progress_bar: Progress, rich_task_id: Any, script_type: 
str, script_path_str: str, script_args: list[str]): + """Asynchronously executes a python or shell script and captures its output. + This method is now a coroutine that will be run by AsyncTaskManager. + """ + # progress_bar and rich_task_id are passed by the AsyncTaskManager + if progress_bar: progress_bar.update(rich_task_id, description=f"Preparing {script_type} script: {Path(script_path_str).name}", start=True) # Define a scripts directory (e.g., project_root / 'scripts') # For now, let's assume scripts are relative to the workspace_root (agent's CWD) @@ -1412,21 +1241,29 @@ def main(): print("\n⚠️ Warning: 'PAPER_DB_PATH' not specified in config.yaml. Proceeding without database features.") # --- End Database Setup --- + agent = None # Initialize agent to None for the finally block try: # Pass the established connection (or None) to the agent agent = CodeAgent(config=config, conn=conn) agent.start_interaction() except KeyboardInterrupt: - print("\nExiting...") + print("\nExiting...") # User interruption except Exception as e: logger.error(f"An unexpected error occurred in the main loop: {e}", exc_info=True) print(f"\n❌ An unexpected error occurred: {e}") finally: # Ensure database connection is closed on exit if conn: - logger.info("Closing database connection...") - database.close_db_connection(conn) - logger.info("Database connection closed.") + logger.info("Closing database connection...") + database.close_db_connection(conn) + logger.info("Database connection closed.") + + # Shutdown async task manager if agent was initialized + if agent and hasattr(agent, 'task_manager') and agent.task_manager: + logger.info("Shutting down AsyncTaskManager from main...") + agent.task_manager.shutdown() + logger.info("AsyncTaskManager shutdown complete from main.") + print("\nGoodbye!") if __name__ == "__main__": diff --git a/src/slashcommands.py b/src/slashcommands.py index 5736517..9a84059 100644 --- a/src/slashcommands.py +++ b/src/slashcommands.py @@ -3,7 +3,7 @@ import json import datetime import logging -from google.genai import types # Assuming types will be needed +import google.generativeai as genai # Changed import from prompt_toolkit import PromptSession # Add this import from prompt_toolkit.document import Document # Add this import @@ -34,10 +34,10 @@ def handle_help_command(agent: 'CodeAgent'): print(" /pdf [id] - Process a PDF file") print(" /prompt - Load a system prompt") print(" /reset - Clear chat history") - print(" /clear - Remove tokens from history") + print(" /clear - Remove tokens from history (experimental with LLMAgentCore)") print(" /save [filename] - Save conversation") print(" /load - Load conversation") - print(f" /thinking_budget - Set thinking budget (current: {agent.thinking_budget})") + print(f" /thinking_budget - Set thinking budget (current: {agent.llm_core.thinking_budget if agent.llm_core else 'N/A'})") print(" /tasks - List background tasks") print(" /cancel - Cancel background task") print(" /run_script - Run a script") @@ -104,29 +104,38 @@ def handle_save_command(agent: 'CodeAgent', args: List[str]): save_path = save_path_base / filename save_state = { - 'conversation_history': [], - 'current_token_count': agent.current_token_count, + # CLI-specific states from CodeAgent 'prompt_time_counts': agent.prompt_time_counts, 'messages_per_interval': agent.messages_per_interval, '_messages_this_interval': agent._messages_this_interval, - 'active_files': [getattr(f, 'name', str(f)) for f in agent.active_files], # Ensure files are serializable - 'thinking_budget': 
agent.thinking_budget, + 'active_files': [getattr(f, 'name', str(f)) for f in agent.active_files], 'pending_pdf_context': agent.pending_pdf_context, - 'model_name': agent.model_name, # Save the model name used for this conversation - 'saved_at': now_str # Add a timestamp for when it was saved + 'saved_at': now_str } - serializable_history = [] - for content in agent.conversation_history: # Access via agent instance - if hasattr(content, 'parts') and hasattr(content, 'role'): - parts_text = [part.text for part in content.parts if hasattr(part, 'text') and part.text is not None] - serializable_history.append({ - 'role': content.role, - 'parts': parts_text - }) - else: - logger.warning(f"Skipping non-standard content object during save: {content}") - save_state['conversation_history'] = serializable_history + if agent.llm_core: + # LLM-specific states from LLMAgentCore + serializable_history = [] + for content in agent.llm_core.conversation_history: + if hasattr(content, 'parts') and hasattr(content, 'role'): + parts_text = [part.text for part in content.parts if hasattr(part, 'text') and part.text is not None] + serializable_history.append({ + 'role': content.role, + 'parts': parts_text + }) + else: + logger.warning(f"Skipping non-standard content object during save: {content}") + save_state['conversation_history'] = serializable_history + save_state['current_token_count'] = agent.llm_core.current_token_count + save_state['model_name'] = agent.llm_core.model_name + save_state['thinking_budget'] = agent.llm_core.thinking_budget + else: + # Fallback if llm_core is not available (should not happen in normal operation) + save_state['conversation_history'] = [] + save_state['current_token_count'] = 0 + save_state['model_name'] = "unknown" + save_state['thinking_budget'] = agent.config.get('default_thinking_budget', 256) + try: with open(save_path, 'w') as f: json.dump(save_state, f, indent=2) @@ -167,9 +176,9 @@ def handle_load_command(agent: 'CodeAgent', args: List[str]): for item in load_state['conversation_history']: if isinstance(item, dict) and 'role' in item and 'parts' in item and isinstance(item['parts'], list): # Ensure parts are correctly formed for google.genai.types.Content - valid_parts = [types.Part(text=part_text) for part_text in item['parts'] if isinstance(part_text, str)] - if valid_parts: # Only create content if there are valid parts - content = types.Content(role=item['role'], parts=valid_parts) + valid_parts = [genai.types.Part(text=part_text) for part_text in item['parts'] if isinstance(part_text, str)] + if valid_parts: + content = genai.types.Content(role=item['role'], parts=valid_parts) reconstructed_history.append(content) else: logger.warning(f"Skipping history item with no valid text parts: {item}") @@ -178,33 +187,36 @@ def handle_load_command(agent: 'CodeAgent', args: List[str]): else: logger.warning(f"'conversation_history' key missing or not a list in {filename}") - agent.conversation_history = reconstructed_history - agent.current_token_count = load_state.get('current_token_count', 0) + # Load into LLMAgentCore + if agent.llm_core: + agent.llm_core.conversation_history = reconstructed_history + agent.llm_core.current_token_count = load_state.get('current_token_count', 0) + + loaded_model_name = load_state.get('model_name', agent.llm_core.model_name) + if agent.llm_core.model_name != loaded_model_name: + logger.info(f"Loaded conversation used model '{loaded_model_name}'. Agent's current model is '{agent.llm_core.model_name}'. 
Re-initializing chat with loaded model.") + agent.llm_core.model_name = loaded_model_name # Update model name in core + + agent.llm_core.thinking_budget = load_state.get('thinking_budget', agent.config.get('default_thinking_budget', 256)) + # Update generation_config in llm_core (currently GenerationConfig() is simple, but if it had params from thinking_budget) + agent.llm_core.generation_config = genai.types.GenerationConfig() # Re-init; add params if any depend on thinking_budget + + if agent.llm_core.client: + # Re-initialize chat session in LLMCore with loaded history and model + agent.llm_core._initialize_chat() # This now uses self.conversation_history + print(f"\n📂 Loaded conversation into LLM Core from: {filename} (using model {agent.llm_core.model_name})") + else: + print("\n❌ LLM Core client not configured. Cannot fully restore chat session.") + else: + print("\n❌ LLM Core not available. Cannot load conversation state.") + + # Load/reset CodeAgent (CLI) specific states agent.prompt_time_counts = load_state.get('prompt_time_counts', [0]) agent.messages_per_interval = load_state.get('messages_per_interval', [0]) agent._messages_this_interval = load_state.get('_messages_this_interval', 0) - agent.active_files = [] # Files are not restored from save, user needs to re-add - - # Restore thinking budget and pending states - agent.thinking_budget = load_state.get('thinking_budget', agent.config.get('default_thinking_budget', 256)) - agent.thinking_config = types.ThinkingConfig(thinking_budget=agent.thinking_budget) # Re-apply thinking config + agent.active_files = [] agent.pending_pdf_context = load_state.get('pending_pdf_context') - - # Restore model name if available, otherwise use current agent's default - loaded_model_name = load_state.get('model_name', agent.model_name) - if agent.model_name != loaded_model_name: - logger.info(f"Loaded conversation used model '{loaded_model_name}'. Current agent model is '{agent.model_name}'.") - # For simplicity, we'll use the agent's current model for the new chat session. - # If strict model adherence from save file is needed, agent.model_name would need to be updated - # and potentially the client/chat re-initialized if the model change is significant. - - if agent.client: - agent.chat = agent.client.chats.create(model=agent.model_name, history=agent.conversation_history) - print(f"\n📂 Loaded conversation from: {filename} (using model {agent.model_name})") - if agent.pending_pdf_context: print(" Loaded pending PDF context is active.") - else: - print("\n❌ Client not configured. Cannot fully restore chat session from load.") - # History and pending states are loaded, but chat object isn't live. + if agent.pending_pdf_context: print(" Loaded pending PDF context is active for next prompt.") except json.JSONDecodeError as e: print(f"\n❌ Failed to load conversation: Invalid JSON in {filename}. Error: {e}") @@ -219,12 +231,16 @@ def handle_thinking_budget_command(agent: 'CodeAgent', args: List[str]): if len(args) == 1: try: new_budget = int(args[0]) - if 0 <= new_budget <= 24000: - agent.thinking_budget = new_budget - agent.thinking_config = types.ThinkingConfig(thinking_budget=agent.thinking_budget) - print(f"\n🧠 Thinking budget updated to: {agent.thinking_budget} tokens.") + if agent.llm_core: + agent.llm_core.set_thinking_budget(new_budget) # Delegate to LLMAgentCore + # The set_thinking_budget method in LLMAgentCore will log the effect. + # We still update CodeAgent's thinking_budget for display in /help if needed. 
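+            # One plausible shape for LLMAgentCore.set_thinking_budget, assuming it keeps the
+            # 0-24000 range the old inline check enforced (a sketch, not the actual implementation):
+            #
+            #     def set_thinking_budget(self, new_budget: int) -> None:
+            #         if not 0 <= new_budget <= 24000:
+            #             logger.warning("Thinking budget %s out of range; keeping %s",
+            #                            new_budget, self.thinking_budget)
+            #             return
+            #         self.thinking_budget = new_budget
+            #         logger.info("Thinking budget set to %s tokens; GenerationConfig mapping pending.",
+            #                     new_budget)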
+ agent.thinking_budget = agent.llm_core.thinking_budget + print(f"\n🧠 Thinking budget value set to: {agent.llm_core.thinking_budget} tokens. Note: Effect on LLM generation behavior via GenerationConfig is pending specific SDK mapping.") else: - print("\n⚠️ Thinking budget must be between 0 and 24000.") + print("\n❌ LLM Core not available. Cannot set thinking budget.") + except ValueError: + print("\n⚠️ Invalid number format for thinking budget.") except ValueError: print("\n⚠️ Invalid number format for thinking budget.") else: @@ -232,113 +248,53 @@ def handle_thinking_budget_command(agent: 'CodeAgent', args: List[str]): def handle_reset_command(agent: 'CodeAgent'): - """Handles the /reset command to clear chat history.""" - print("\n🎯 Resetting context and starting a new chat session...") - if agent.client: - agent.chat = agent.client.chats.create(model=agent.model_name, history=[]) + """Handles the /reset command to clear chat history and CLI state.""" + print("\n🎯 Resetting LLM chat session and CLI state...") + if agent.llm_core: + agent.llm_core.reset_chat() + print(" LLM Core chat session reset.") else: - agent.chat = None - print("\n⚠️ Client not configured, cannot create new chat session. History cleared locally.") + print(" LLM Core not available.") - agent.conversation_history = [] - agent.current_token_count = 0 + # Clear CodeAgent specific states agent.active_files = [] - agent.prompt_time_counts = [0] + agent.prompt_time_counts = [0] # Reset UI stats agent.messages_per_interval = [0] agent._messages_this_interval = 0 agent.pending_pdf_context = None agent.pending_script_output = None - print("\n✅ Chat session and history cleared.") + print(" CodeAgent CLI state (active files, pending contexts, stats) cleared.") + print("\n✅ Reset complete.") def handle_clear_command(agent: 'CodeAgent', args: List[str]): - """Handles the /clear command to remove tokens from history.""" + """Handles the /clear command to remove tokens from LLMAgentCore's history.""" if not args: - print("\n⚠️ Usage: /clear ") + print("\n⚠️ Usage: /clear ") + return + + if not agent.llm_core: + print("\n❌ LLM Core not available. Cannot clear history.") return + try: tokens_to_clear_target = int(args[0]) if tokens_to_clear_target <= 0: print("\n⚠️ Number of tokens must be positive.") return - if not agent.conversation_history: - print("Chat history is already empty.") - return + messages_removed, tokens_cleared = agent.llm_core.clear_history_by_tokens(tokens_to_clear_target) - if not agent.client: - print("\n⚠️ Gemini client not available. Cannot accurately clear tokens by count.") - return - - logger.info(f"Attempting to clear approx. {tokens_to_clear_target} tokens from history.") - - new_history = list(agent.conversation_history) - tokens_counted_for_removal = 0 - messages_removed_count = 0 - history_modified_by_clear = False - - while tokens_counted_for_removal < tokens_to_clear_target and new_history: - first_message = new_history[0] - message_tokens = 0 - try: - if isinstance(first_message, types.Content): - message_tokens = agent.client.models.count_tokens(model=agent.model_name, contents=[first_message]).total_tokens - else: - logger.warning(f"Skipping non-Content item in history for token counting: {type(first_message)}") - message_tokens = 75 - except Exception as e_count: - logger.error(f"Could not count tokens for a message during /clear: {e_count}. 
Using estimate.", exc_info=True) - message_tokens = 75 - - tokens_counted_for_removal += message_tokens - new_history.pop(0) - messages_removed_count += 1 - history_modified_by_clear = True - - logger.info(f"After initial pass, {messages_removed_count} messages ({tokens_counted_for_removal} tokens) selected for removal.") - - additional_messages_removed_for_role = 0 - while new_history and new_history[0].role != "user": - logger.warning("History after initial clear pass starts with a model turn. Removing additional leading model messages.") - new_history.pop(0) - messages_removed_count += 1 - additional_messages_removed_for_role += 1 - history_modified_by_clear = True - if additional_messages_removed_for_role > 0: - logger.info(f"Removed {additional_messages_removed_for_role} additional model messages to ensure user turn start.") - - if not history_modified_by_clear: - print(f"No messages were cleared. Requested {tokens_to_clear_target} tokens might be less than the first message(s), history is empty, or history is too short.") - return - - agent.conversation_history = new_history - - if not agent.conversation_history: - agent.current_token_count = 0 - agent.prompt_time_counts = [0] - agent.messages_per_interval = [0] + if messages_removed > 0: + print(f"\n✅ Cleared {messages_removed} message(s) (approx. {tokens_cleared} tokens counted for removal).") + print(f" New total LLM history tokens: {agent.llm_core.current_token_count if agent.llm_core.current_token_count != -1 else 'Error counting'}") else: - try: - agent.current_token_count = agent.client.models.count_tokens( - model=agent.model_name, - contents=agent.conversation_history - ).total_tokens - except Exception as e_recount: - logger.error(f"Error recounting tokens after /clear: {e_recount}. Token count may be inaccurate.", exc_info=True) - agent.current_token_count = -1 # Indicate error - agent.prompt_time_counts = [0, agent.current_token_count if agent.current_token_count != -1 else 0] - agent.messages_per_interval = [0, len(agent.conversation_history)] - - agent._messages_this_interval = 0 + print("\nℹ️ No messages were cleared. History might be empty or target tokens too low.") - try: - agent.chat = agent.client.chats.create(model=agent.model_name, history=agent.conversation_history) - logger.info("Chat session re-initialized after /clear operation.") - print(f"\n✅ Cleared {messages_removed_count} message(s) (approx. {tokens_counted_for_removal} tokens counted). " - f"New total tokens: {agent.current_token_count if agent.current_token_count != -1 else 'Error counting'}") - except Exception as e_chat_reinit: - logger.error(f"Error re-initializing chat after /clear: {e_chat_reinit}", exc_info=True) - print(f"\n⚠️ Error re-initializing chat session: {e_chat_reinit}. 
History updated, but chat object may be stale.") + # Reset CLI-specific history tracking stats on CodeAgent + agent.prompt_time_counts = [0, agent.llm_core.current_token_count if agent.llm_core.current_token_count != -1 else 0] + agent.messages_per_interval = [0, len(agent.llm_core.conversation_history)] # Assuming direct access for length, or add a getter + agent._messages_this_interval = 0 except ValueError: print("\n⚠️ Invalid number format for tokens.") @@ -418,7 +374,7 @@ def _display_full_history(agent: 'CodeAgent'): print("-" * 80) for i, msg in enumerate(agent.conversation_history, 1): - if not isinstance(msg, types.Content): + if not isinstance(msg, genai.types.Content): # Changed types to genai.types logger.warning(f"Skipping non-Content item in history: {type(msg)}") continue @@ -470,7 +426,7 @@ def _display_token_limited_history(agent: 'CodeAgent', mode: str, num_tokens_tar messages_to_display = [] for msg in history_to_scan: - if not isinstance(msg, types.Content): + if not isinstance(msg, genai.types.Content): # Changed types to genai.types logger.warning(f"Skipping non-Content item in history: {type(msg)}") continue diff --git a/src/tools.py b/src/tools.py index 2cbacf1..b6bf511 100644 --- a/src/tools.py +++ b/src/tools.py @@ -10,7 +10,7 @@ from datetime import datetime from zoneinfo import ZoneInfo from src.find_arxiv_papers import build_query, fetch_entries # build_query now in find_arxiv_papers.py -from google import genai +import google.generativeai as genai # Changed import style from pypdf import PdfReader import yaml @@ -18,7 +18,8 @@ import asyncio from pydantic import SecretStr from dotenv import load_dotenv -from src.agent_browser_utils import setup_browser, agent_loop +from src.agent_browser_utils import setup_browser, agent_loop # Uncommented +from langchain_google_genai import ChatGoogleGenerativeAI # Added import import logging import sqlite3 @@ -77,9 +78,11 @@ def _check_docker_running() -> tuple[bool, docker.DockerClient | None, str]: # --- Gemini PDF Processing --- +from typing import Any # Add Any for temporary type hint + def extract_text_from_pdf_gemini( pdf_path: Path, - genai_client: genai.client.Client, + genai_client: Any, # Temporarily changed type hint from genai.Client to Any model_name: str, ) -> str | None: """ @@ -88,7 +91,7 @@ def extract_text_from_pdf_gemini( Args: pdf_path: Path to the local PDF file. - genai_client: An initialized google.genai.client.Client instance. + genai_client: An initialized google.generativeai.Client instance. model_name: The name of the Gemini model to use (e.g., 'gemini-1.5-flash-latest'). Returns: @@ -99,7 +102,7 @@ def extract_text_from_pdf_gemini( return None logger.info(f"Uploading PDF {pdf_path.name} to Gemini...") - uploaded_file: genai.types.File | None = None + uploaded_file: genai.types.File | None = None # genai.types is correct with new import try: # 1. Upload the file synchronously uploaded_file = genai_client.files.upload( @@ -588,7 +591,7 @@ def upload_pdf_for_gemini(pdf_path_str: str) -> genai.types.File | None: print(f"⚠️ Could not delete file during error cleanup: {delete_e}") return None -def google_search(query: str, num_results: int = 10) -> str: +async def google_search(query: str, num_results: int = 10) -> str: """Search Google for the given query using browser-use and return JSON-formatted results. Args: query: The search query. 
@@ -601,31 +604,31 @@ def google_search(query: str, num_results: int = 10) -> str: model=MODEL_NAME, api_key=SecretStr(os.getenv("GEMINI_API_KEY")) ) - browser, context = asyncio.run(setup_browser(headless=True)) - result = asyncio.run(agent_loop( + browser, context = await setup_browser(headless=True) + result = await agent_loop( llm, context, f"Search Google for '{query}' and extract the first {num_results} results as JSON list of {{'title','url'}}.", initial_url=f"https://www.google.com/search?q={query}" - )) + ) return result or "No results." except Exception as e: return f"Error during google_search: {e}" -def open_url(url: str) -> str: +async def open_url(url: str) -> str: # Changed to async def """Open a URL using browser-use and return the page's visible text content.""" try: llm = ChatGoogleGenerativeAI( model=MODEL_NAME, api_key=SecretStr(os.getenv("GEMINI_API_KEY")) ) - browser, context = asyncio.run(setup_browser(headless=True)) - result = asyncio.run(agent_loop( + browser, context = await setup_browser(headless=True) # Changed to await + result = await agent_loop( # Changed to await llm, context, f"Extract and return visible text content from the page at: {url}.", initial_url=url - )) + ) return result or "No content." except Exception as e: return f"Error during open_url: {e}"
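With google_search and open_url now defined as coroutines, synchronous callers have to schedule them on an event loop instead of invoking them directly. A minimal usage sketch, assuming src is importable as a package and GEMINI_API_KEY is set in the environment (the query and URL are placeholders for illustration):

    import asyncio

    from src.tools import google_search, open_url

    async def demo():
        # Both tools now run inside the caller's event loop rather than each
        # spinning up its own asyncio.run(), so they can be awaited together.
        results_json = await google_search("arxiv quantum error correction", num_results=5)
        page_text = await open_url("https://example.com")
        return results_json, page_text

    if __name__ == "__main__":
        print(asyncio.run(demo()))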