diff --git a/Cargo.lock b/Cargo.lock index a9119f98..3aede43f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4269,6 +4269,7 @@ dependencies = [ "mofa-kernel", "mofa-runtime", "mofa-sdk", + "nix 0.29.0", "predicates", "ratatui", "serde", diff --git a/crates/mofa-cli/Cargo.toml b/crates/mofa-cli/Cargo.toml index 6770b25c..2261d2e7 100644 --- a/crates/mofa-cli/Cargo.toml +++ b/crates/mofa-cli/Cargo.toml @@ -50,6 +50,9 @@ indicatif = "0.17" comfy-table = "7" dirs-next = "2" +# Process management +nix = { version = "0.29", features = ["process", "signal"] } + # Database (optional, for db init command) sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "mysql", "sqlite"], optional = true } diff --git a/crates/mofa-cli/src/commands/agent/list.rs b/crates/mofa-cli/src/commands/agent/list.rs index 8f216c56..feb5d188 100644 --- a/crates/mofa-cli/src/commands/agent/list.rs +++ b/crates/mofa-cli/src/commands/agent/list.rs @@ -54,7 +54,7 @@ pub async fn run(ctx: &CliContext, running_only: bool, _show_all: bool) -> anyho println!(" No agents registered."); println!(); println!( - " Use {} to start an agent.", + " Use {} to register an agent.", "mofa agent start ".cyan() ); return Ok(()); @@ -81,9 +81,22 @@ pub async fn run(ctx: &CliContext, running_only: bool, _show_all: bool) -> anyho println!("{}", table); } + println!(); + println!(" Total: {} agent(s)", filtered.len()); + Ok(()) } +/// Format timestamp as human-readable string +fn format_timestamp(millis: u64) -> String { + use chrono::{DateTime, Local}; + use std::time::UNIX_EPOCH; + + let duration = std::time::Duration::from_millis(millis); + let datetime = DateTime::::from(UNIX_EPOCH + duration); + datetime.format("%Y-%m-%d %H:%M:%S").to_string() +} + #[derive(Debug, Clone, Serialize)] struct AgentInfo { id: String, diff --git a/crates/mofa-cli/src/commands/agent/start.rs b/crates/mofa-cli/src/commands/agent/start.rs index 6a300c94..1b1ac94d 100644 --- a/crates/mofa-cli/src/commands/agent/start.rs +++ b/crates/mofa-cli/src/commands/agent/start.rs @@ -3,6 +3,8 @@ use crate::config::loader::ConfigLoader; use crate::context::{AgentConfigEntry, CliContext}; use colored::Colorize; +use std::path::{Path, PathBuf}; +use tracing::info; /// Execute the `mofa agent start` command pub async fn run( @@ -18,20 +20,30 @@ pub async fn run( println!(" Mode: {}", "daemon".yellow()); } - // Check if agent is already registered - if ctx.agent_registry.contains(agent_id).await { - anyhow::bail!("Agent '{}' is already registered", agent_id); + // Check if agent already exists + if ctx.persistent_agents.exists(agent_id).await { + let existing = ctx.persistent_agents.get(agent_id).await; + if let Some(agent) = existing { + if agent.last_state == crate::state::AgentProcessState::Running { + anyhow::bail!( + "Agent '{}' is already running (PID: {})", + agent_id, + agent.process_id.unwrap_or(0) + ); + } + println!( + " {} Agent exists but is not running. Restarting...", + "!".yellow() + ); + } } - // Load agent configuration - let agent_config = if let Some(path) = config_path { + // Load or discover agent configuration + let (config_file, agent_name) = if let Some(path) = config_path { println!(" Config: {}", path.display().to_string().cyan()); - let loader = ConfigLoader::new(); - let cli_config = loader.load(path)?; - println!(" Agent: {}", cli_config.agent.name.white()); + ctx.process_manager.validate_config(path)?; - // Convert CLI AgentConfig to kernel AgentConfig - mofa_kernel::agent::config::AgentConfig::new(agent_id, &cli_config.agent.name) + (path.to_path_buf(), agent_id.to_string()) } else { // Try to auto-discover configuration let loader = ConfigLoader::new(); @@ -41,17 +53,20 @@ pub async fn run( " Config: {} (auto-discovered)", found_path.display().to_string().cyan() ); - let cli_config = loader.load(&found_path)?; - println!(" Agent: {}", cli_config.agent.name.white()); - mofa_kernel::agent::config::AgentConfig::new(agent_id, &cli_config.agent.name) + ctx.process_manager.validate_config(&found_path)?; + + (found_path, agent_id.to_string()) } None => { println!(" {} No config file found, using defaults", "!".yellow()); - mofa_kernel::agent::config::AgentConfig::new(agent_id, agent_id) + (PathBuf::new(), agent_id.to_string()) } } }; + // Create agent config + let agent_config = mofa_kernel::agent::config::AgentConfig::new(agent_id, &agent_name); + // Check if a matching factory type is available let mut factory_types = ctx.agent_registry.list_factory_types().await; if factory_types.is_empty() { @@ -72,6 +87,11 @@ pub async fn run( ); } + // Check if agent already exists + if ctx.agent_registry.contains(agent_id).await { + anyhow::bail!("Agent '{}' is already registered", agent_id); + } + // Try to create via factory ctx.agent_registry .create_and_register(&selected_factory, agent_config.clone()) @@ -105,7 +125,7 @@ pub async fn run( } } - println!("{} Agent '{}' started", "✓".green(), agent_id); + println!("{} Agent '{}' started successfully", "✓".green(), agent_id); Ok(()) } diff --git a/crates/mofa-cli/src/commands/agent/stop.rs b/crates/mofa-cli/src/commands/agent/stop.rs index b657fc46..d78f1cfc 100644 --- a/crates/mofa-cli/src/commands/agent/stop.rs +++ b/crates/mofa-cli/src/commands/agent/stop.rs @@ -2,6 +2,7 @@ use crate::context::CliContext; use colored::Colorize; +use tracing::info; /// Execute the `mofa agent stop` command pub async fn run( @@ -11,37 +12,42 @@ pub async fn run( ) -> anyhow::Result<()> { println!("{} Stopping agent: {}", "→".green(), agent_id.cyan()); + // Check if agent exists in registry or store + let in_registry = ctx.agent_registry.contains(agent_id).await; + let previous_entry = ctx .agent_store .get(agent_id) .map_err(|e| anyhow::anyhow!("Failed to load persisted agent '{}': {}", agent_id, e))?; - // When commands run in separate CLI invocations, runtime registry state can be absent. - // In that case, treat stop as a persisted-state transition if the agent exists on disk. - if !ctx.agent_registry.contains(agent_id).await { - if let Some(mut entry) = previous_entry.clone() { - if !force_persisted_stop { - anyhow::bail!( - "Agent '{}' is not active in runtime registry. Use --force-persisted-stop to mark persisted state as Stopped.", - agent_id - ); - } + let in_store = previous_entry.is_some(); + + if !in_registry && !in_store { + anyhow::bail!("Agent '{}' not found", agent_id); + } + // When commands run in separate CLI invocations, runtime registry state can be absent. + // In that case, if force_persisted_stop is true, update the persisted state. + if !in_registry && in_store && force_persisted_stop { + if let Some(mut entry) = previous_entry { entry.state = "Stopped".to_string(); ctx.agent_store .save(agent_id, &entry) .map_err(|e| anyhow::anyhow!("Failed to update agent '{}': {}", agent_id, e))?; println!( - "{} Agent '{}' was not running; updated persisted state to Stopped", - "!".yellow(), + "{} Agent '{}' persisted state updated to Stopped", + "✓".green(), agent_id ); return Ok(()); } + } + // If not in registry and no force flag, error out + if !in_registry { anyhow::bail!( - "Agent '{}' not found in registry or persisted store", + "Agent '{}' is not active in runtime registry. Use --force-persisted-stop to update persisted state.", agent_id ); } @@ -91,13 +97,13 @@ pub async fn run( agent_id ); } else { - println!( - "{} Agent '{}' was not in the registry", - "!".yellow(), - agent_id - ); + println!(" {} Agent is not running", "!".yellow()); } + println!("{} Agent '{}' stopped", "✓".green(), agent_id); + + info!("Agent '{}' stopped", agent_id); + Ok(()) } diff --git a/crates/mofa-cli/src/context.rs b/crates/mofa-cli/src/context.rs index e0fd8b8c..870f7698 100644 --- a/crates/mofa-cli/src/context.rs +++ b/crates/mofa-cli/src/context.rs @@ -1,6 +1,8 @@ //! CLI context providing access to backend services +use crate::state::PersistentAgentRegistry; use crate::store::PersistedStore; +use crate::utils::AgentProcessManager; use crate::utils::paths; use async_trait::async_trait; use mofa_foundation::agent::base::BaseAgent; @@ -62,6 +64,10 @@ pub struct CliContext { pub plugin_store: PersistedStore, /// Persistent tool source specifications pub tool_store: PersistedStore, + /// Persistent agent state storage + pub persistent_agents: Arc, + /// Agent process manager for spawning/managing processes + pub process_manager: AgentProcessManager, /// In-memory plugin registry pub plugin_registry: Arc, /// In-memory tool registry @@ -94,12 +100,21 @@ impl CliContext { let mut tool_registry = ToolRegistry::new(); replay_persisted_tools(&mut tool_registry, &tool_store)?; + let agents_dir = data_dir.join("agents"); + let persistent_agents = Arc::new(PersistentAgentRegistry::new(agents_dir).await.map_err( + |e| anyhow::anyhow!("Failed to initialize persistent agent registry: {}", e), + )?); + + let process_manager = AgentProcessManager::new(config_dir.clone()); + Ok(Self { session_manager, agent_registry, agent_store, plugin_store, tool_store, + persistent_agents, + process_manager, plugin_registry, tool_registry, data_dir, @@ -132,12 +147,21 @@ impl CliContext { let mut tool_registry = ToolRegistry::new(); replay_persisted_tools(&mut tool_registry, &tool_store)?; + let agents_dir = data_dir.join("agents"); + let persistent_agents = Arc::new(PersistentAgentRegistry::new(agents_dir).await.map_err( + |e| anyhow::anyhow!("Failed to initialize persistent agent registry: {}", e), + )?); + + let process_manager = AgentProcessManager::new(config_dir.clone()); + Ok(Self { session_manager, agent_registry, agent_store, plugin_store, tool_store, + persistent_agents, + process_manager, plugin_registry, tool_registry, data_dir, diff --git a/crates/mofa-cli/src/main.rs b/crates/mofa-cli/src/main.rs index a1327f3f..286dc902 100644 --- a/crates/mofa-cli/src/main.rs +++ b/crates/mofa-cli/src/main.rs @@ -6,6 +6,7 @@ mod config; mod context; mod output; mod render; +mod state; mod store; mod tui; mod utils; diff --git a/crates/mofa-cli/src/state/agent_state.rs b/crates/mofa-cli/src/state/agent_state.rs new file mode 100644 index 00000000..fa99146d --- /dev/null +++ b/crates/mofa-cli/src/state/agent_state.rs @@ -0,0 +1,387 @@ +//! Agent state persistence layer +//! +//! Manages persistent storage and lifecycle of agents on the local system. + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::process::Child; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +/// Agent runtime state (in-memory process tracking) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum AgentProcessState { + /// Agent is not running + Stopped, + /// Agent is starting up + Starting, + /// Agent is running + Running, + /// Agent is stopping + Stopping, + /// Agent has encountered an error + Error, +} + +impl std::fmt::Display for AgentProcessState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Stopped => write!(f, "Stopped"), + Self::Starting => write!(f, "Starting"), + Self::Running => write!(f, "Running"), + Self::Stopping => write!(f, "Stopping"), + Self::Error => write!(f, "Error"), + } + } +} + +/// Persistent agent metadata (stored on disk) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentMetadata { + /// Unique agent identifier + pub id: String, + /// Human-readable name + pub name: String, + /// Agent description + pub description: Option, + /// Path to agent configuration file + pub config_path: Option, + /// Last known state + pub last_state: AgentProcessState, + /// Timestamp when agent was registered (ms since epoch) + pub registered_at: u64, + /// Timestamp when agent was last started (ms since epoch) + pub last_started: Option, + /// Timestamp when agent was last stopped (ms since epoch) + pub last_stopped: Option, + /// Process ID if running + pub process_id: Option, + /// Number of times agent has been started + pub start_count: u32, + /// Custom metadata + pub tags: Vec, +} + +impl AgentMetadata { + /// Create new agent metadata + pub fn new(id: String, name: String) -> Self { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + Self { + id, + name, + description: None, + config_path: None, + last_state: AgentProcessState::Stopped, + registered_at: now, + last_started: None, + last_stopped: None, + process_id: None, + start_count: 0, + tags: Vec::new(), + } + } + + /// Set configuration path + pub fn with_config(mut self, path: PathBuf) -> Self { + self.config_path = Some(path); + self + } + + /// Add tag + pub fn with_tag(mut self, tag: String) -> Self { + self.tags.push(tag); + self + } + + /// Mark as started + pub fn mark_started(&mut self, pid: u32) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_started = Some(now); + self.process_id = Some(pid); + self.last_state = AgentProcessState::Running; + self.start_count += 1; + } + + /// Mark as stopped + pub fn mark_stopped(&mut self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_stopped = Some(now); + self.process_id = None; + self.last_state = AgentProcessState::Stopped; + } + + /// Mark as error + pub fn mark_error(&mut self) { + self.last_state = AgentProcessState::Error; + } +} + +/// Persistent agent registry - stores agents to disk +pub struct PersistentAgentRegistry { + /// Directory where agent metadata is stored + agents_dir: PathBuf, + /// In-memory cache of agent metadata + metadata_cache: Arc>>, + /// Map of running processes (agent_id -> Child process) + running_processes: Arc>>, +} + +/// Represents a running agent process +pub struct RunningAgent { + /// Agent metadata + pub metadata: AgentMetadata, + /// Process handle (may be None if process handle is lost) + pub process: Option, +} + +impl PersistentAgentRegistry { + /// Create or load agent registry from disk + pub async fn new(agents_dir: PathBuf) -> Result { + // Ensure directory exists + tokio::fs::create_dir_all(&agents_dir).await?; + + let mut metadata_cache = HashMap::new(); + + // Load existing agents from disk + let mut entries = tokio::fs::read_dir(&agents_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + match tokio::fs::read_to_string(&path).await { + Ok(content) => { + match serde_json::from_str::(&content) { + Ok(mut metadata) => { + // Reset running state to stopped (process was not preserved across restarts) + metadata.last_state = AgentProcessState::Stopped; + metadata.process_id = None; + metadata_cache.insert(metadata.id.clone(), metadata); + } + Err(e) => { + warn!( + "Failed to parse agent metadata from {}: {}", + path.display(), + e + ); + } + } + } + Err(e) => { + warn!( + "Failed to read agent metadata from {}: {}", + path.display(), + e + ); + } + } + } + } + + Ok(Self { + agents_dir, + metadata_cache: Arc::new(RwLock::new(metadata_cache)), + running_processes: Arc::new(RwLock::new(HashMap::new())), + }) + } + + /// Register a new agent + pub async fn register(&self, metadata: AgentMetadata) -> Result<()> { + let agent_id = metadata.id.clone(); + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Write to disk + let json = serde_json::to_string_pretty(&metadata)?; + tokio::fs::write(&file_path, json).await?; + + // Update cache + { + let mut cache = self.metadata_cache.write().await; + cache.insert(agent_id.clone(), metadata); + } + + info!("Registered agent: {} in {}", agent_id, file_path.display()); + Ok(()) + } + + /// Get agent metadata + pub async fn get(&self, agent_id: &str) -> Option { + let cache = self.metadata_cache.read().await; + cache.get(agent_id).cloned() + } + + /// List all agents + pub async fn list(&self) -> Vec { + let cache = self.metadata_cache.read().await; + let mut agents: Vec<_> = cache.values().cloned().collect(); + agents.sort_by(|a, b| a.id.cmp(&b.id)); + agents + } + + /// List running agents + pub async fn list_running(&self) -> Vec { + self.list() + .await + .into_iter() + .filter(|a| a.last_state == AgentProcessState::Running) + .collect() + } + + /// Update agent metadata + pub async fn update(&self, metadata: AgentMetadata) -> Result<()> { + let agent_id = metadata.id.clone(); + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Write to disk + let json = serde_json::to_string_pretty(&metadata)?; + tokio::fs::write(&file_path, json).await?; + + // Update cache + { + let mut cache = self.metadata_cache.write().await; + cache.insert(agent_id.clone(), metadata); + } + + debug!("Updated agent metadata: {}", agent_id); + Ok(()) + } + + /// Remove agent + pub async fn remove(&self, agent_id: &str) -> Result { + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Try to remove file + let removed = match tokio::fs::remove_file(&file_path).await { + Ok(_) => true, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => false, + Err(e) => return Err(e.into()), + }; + + // Remove from cache + { + let mut cache = self.metadata_cache.write().await; + cache.remove(agent_id); + } + + if removed { + info!("Removed agent: {} from {}", agent_id, file_path.display()); + } + + Ok(removed) + } + + /// Check if agent exists + pub async fn exists(&self, agent_id: &str) -> bool { + let cache = self.metadata_cache.read().await; + cache.contains_key(agent_id) + } + + /// Get count of registered agents + pub async fn count(&self) -> usize { + let cache = self.metadata_cache.read().await; + cache.len() + } + + /// Track a running process + pub async fn track_process(&self, agent_id: String, process: Child, metadata: AgentMetadata) { + let mut processes = self.running_processes.write().await; + processes.insert( + agent_id, + RunningAgent { + metadata, + process: Some(process), + }, + ); + } + + /// Get running process + pub async fn get_process(&self, agent_id: &str) -> Option { + let processes = self.running_processes.read().await; + processes.get(agent_id).and_then(|a| a.metadata.process_id) + } + + /// Remove tracking of a process + pub async fn untrack_process(&self, agent_id: &str) { + let mut processes = self.running_processes.write().await; + processes.remove(agent_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_agent_metadata_lifecycle() { + let mut metadata = AgentMetadata::new("agent-1".into(), "My Agent".into()); + assert_eq!(metadata.last_state, AgentProcessState::Stopped); + assert_eq!(metadata.start_count, 0); + + metadata.mark_started(12345); + assert_eq!(metadata.last_state, AgentProcessState::Running); + assert_eq!(metadata.process_id, Some(12345)); + assert_eq!(metadata.start_count, 1); + + metadata.mark_stopped(); + assert_eq!(metadata.last_state, AgentProcessState::Stopped); + assert_eq!(metadata.process_id, None); + } + + #[tokio::test] + async fn test_registry_persistence() -> Result<()> { + let temp_dir = TempDir::new()?; + let registry = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + + // Register agent + let metadata = AgentMetadata::new("agent-1".into(), "Test Agent".into()); + registry.register(metadata.clone()).await?; + + // Verify it exists + assert!(registry.exists("agent-1").await); + let retrieved = registry.get("agent-1").await; + assert_eq!(retrieved.map(|m| m.name), Some("Test Agent".into())); + + // Reload registry (simulating restart) + let registry2 = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + assert!(registry2.exists("agent-1").await); + assert_eq!(registry2.count().await, 1); + + Ok(()) + } + + #[tokio::test] + async fn test_list_running_agents() -> Result<()> { + let temp_dir = TempDir::new()?; + let registry = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + + // Register multiple agents + let mut agent1 = AgentMetadata::new("agent-1".into(), "Agent 1".into()); + agent1.mark_started(111); + registry.register(agent1).await?; + + let mut agent2 = AgentMetadata::new("agent-2".into(), "Agent 2".into()); + agent2.mark_started(222); + registry.register(agent2).await?; + + let agent3 = AgentMetadata::new("agent-3".into(), "Agent 3".into()); + registry.register(agent3).await?; + + // Only 2 are running + let running = registry.list_running().await; + assert_eq!(running.len(), 2); + + Ok(()) + } +} diff --git a/crates/mofa-cli/src/state/mod.rs b/crates/mofa-cli/src/state/mod.rs new file mode 100644 index 00000000..644832fb --- /dev/null +++ b/crates/mofa-cli/src/state/mod.rs @@ -0,0 +1,5 @@ +//! CLI state management + +pub mod agent_state; + +pub use agent_state::{AgentMetadata, AgentProcessState, PersistentAgentRegistry}; diff --git a/crates/mofa-cli/src/utils/mod.rs b/crates/mofa-cli/src/utils/mod.rs index e9fa4f2c..4e468ea0 100644 --- a/crates/mofa-cli/src/utils/mod.rs +++ b/crates/mofa-cli/src/utils/mod.rs @@ -2,5 +2,7 @@ pub mod env; pub mod paths; +pub mod process_manager; pub use paths::*; +pub use process_manager::AgentProcessManager; diff --git a/crates/mofa-cli/src/utils/process_manager.rs b/crates/mofa-cli/src/utils/process_manager.rs new file mode 100644 index 00000000..5fceeaa7 --- /dev/null +++ b/crates/mofa-cli/src/utils/process_manager.rs @@ -0,0 +1,255 @@ +//! Agent process manager - handles spawning and managing agent runtime processes + +use anyhow::{Result, bail}; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Stdio}; +use tracing::{debug, info, warn}; + +/// Manages agent runtime processes +pub struct AgentProcessManager { + /// Directory containing agent configurations + config_dir: PathBuf, +} + +impl AgentProcessManager { + /// Create new process manager + pub fn new(config_dir: PathBuf) -> Self { + Self { config_dir } + } + + /// Start an agent process + /// + /// # Arguments + /// * `agent_id` - Unique agent identifier + /// * `config_path` - Path to agent configuration file + /// * `daemon` - If true, run in background; if false, run in foreground + /// + /// # Returns + /// Process ID of the started agent, or error if startup failed + pub fn start_agent( + &self, + agent_id: &str, + config_path: Option<&Path>, + daemon: bool, + ) -> Result { + debug!("Starting agent: {} (daemon: {})", agent_id, daemon); + + // Determine config file to use + let config = if let Some(path) = config_path { + path.to_path_buf() + } else { + // Try to find config in default locations + let default_path = self.config_dir.join(format!("{}.yaml", agent_id)); + if !default_path.exists() { + bail!( + "No configuration found for agent '{}' at {}", + agent_id, + default_path.display() + ); + } + default_path + }; + + // Verify config file exists + if !config.exists() { + bail!("Agent configuration not found at: {}", config.display()); + } + + info!( + "Starting agent '{}' with config: {}", + agent_id, + config.display() + ); + + // Build the command to run the agent + let mut cmd = Command::new("cargo"); + cmd.arg("run") + .arg("-p") + .arg("mofa-cli") + .arg("--") + .arg("run") + .arg(config.to_string_lossy().as_ref()); + + // Configure output + if daemon { + cmd.stdout(Stdio::null()) + .stderr(Stdio::null()) + .stdin(Stdio::null()); + } else { + cmd.stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .stdin(Stdio::inherit()); + } + + // Spawn the process + let mut child = cmd + .spawn() + .map_err(|e| anyhow::anyhow!("Failed to start agent '{}' process: {}", agent_id, e))?; + + // Get the child process ID + let pid = child.id(); + info!("Agent '{}' started with PID: {}", agent_id, pid); + + // For daemon mode, we can let the process run independently + // For foreground mode, we wait for completion + if daemon { + // Detach from parent process in daemon mode + // On Unix, this is automatic; on Windows, we just let it go + std::mem::drop(child); + } + + Ok(pid) + } + + /// Stop an agent process by PID + /// + /// # Arguments + /// * `pid` - Process ID to terminate + /// * `force` - If true, use SIGKILL; if false, try SIGTERM first + pub async fn stop_agent_by_pid(&self, pid: u32, force: bool) -> Result<()> { + debug!("Stopping agent with PID: {} (force: {})", pid, force); + + #[cfg(unix)] + { + use nix::sys::signal::{Signal, kill}; + use nix::unistd::Pid; + + let nix_pid = Pid::from_raw(pid as i32); + let signal = if force { + Signal::SIGKILL + } else { + Signal::SIGTERM + }; + + match kill(nix_pid, Some(signal)) { + Ok(_) => { + info!("Sent {:?} to process {}", signal, pid); + Ok(()) + } + Err(e) => { + warn!("Failed to send {:?} to process {}: {}", signal, pid, e); + Err(anyhow::anyhow!( + "Failed to terminate process {}: {}", + pid, + e + )) + } + } + } + + #[cfg(windows)] + { + use std::process::Command; + + // On Windows, use taskkill command + let status = Command::new("taskkill") + .arg(if force { "/F" } else { "" }) + .arg("/PID") + .arg(pid.to_string()) + .status()?; + + if status.success() { + info!("Successfully terminated process {}", pid); + Ok(()) + } else { + bail!("Failed to terminate process {}", pid) + } + } + + #[cfg(not(any(unix, windows)))] + { + bail!("Agent process termination not supported on this platform") + } + } + + /// Check if a process is running + pub fn is_running(&self, pid: u32) -> bool { + #[cfg(unix)] + { + use nix::sys::signal::kill; + use nix::unistd::Pid; + + let nix_pid = Pid::from_raw(pid as i32); + // Signal 0 is used to check if process exists without sending a signal + kill(nix_pid, None).is_ok() + } + + #[cfg(windows)] + { + use std::process::Command; + + let output = Command::new("tasklist") + .arg("/FI") + .arg(format!("PID eq {}", pid)) + .output(); + + output + .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string())) + .unwrap_or(false) + } + + #[cfg(not(any(unix, windows)))] + { + false + } + } + + /// Validate agent configuration + pub fn validate_config(&self, config_path: &Path) -> Result<()> { + if !config_path.exists() { + bail!("Configuration file not found: {}", config_path.display()); + } + + // Try to load and parse as YAML + let content = std::fs::read_to_string(config_path)?; + serde_yaml::from_str::(&content) + .map_err(|e| anyhow::anyhow!("Invalid YAML in config: {}", e))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_process_manager_creation() { + let temp_dir = TempDir::new().unwrap(); + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + + assert!(manager.config_dir.exists()); + } + + #[test] + fn test_validate_config_missing_file() { + let temp_dir = TempDir::new().unwrap(); + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + + let result = manager.validate_config(std::path::Path::new("/nonexistent/config.yaml")); + assert!(result.is_err()); + } + + #[test] + fn test_validate_config_valid_yaml() { + let temp_dir = TempDir::new().unwrap(); + let config_path = temp_dir.path().join("config.yaml"); + std::fs::write(&config_path, "agent:\n name: Test\n id: test-1\n").unwrap(); + + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + let result = manager.validate_config(&config_path); + assert!(result.is_ok()); + } + + #[test] + fn test_validate_config_invalid_yaml() { + let temp_dir = TempDir::new().unwrap(); + let config_path = temp_dir.path().join("config.yaml"); + std::fs::write(&config_path, "agent:\n name: [unclosed").unwrap(); + + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + let result = manager.validate_config(&config_path); + assert!(result.is_err()); + } +} diff --git a/crates/mofa-foundation/src/rag/chunker.rs b/crates/mofa-foundation/src/rag/chunker.rs index 3faa0e46..1d97c8ba 100644 --- a/crates/mofa-foundation/src/rag/chunker.rs +++ b/crates/mofa-foundation/src/rag/chunker.rs @@ -65,7 +65,11 @@ impl TextChunker { let mut chunks = Vec::new(); let chars: Vec = text.chars().collect(); - let step = self.config.chunk_size.saturating_sub(self.config.chunk_overlap).max(1); + let step = self + .config + .chunk_size + .saturating_sub(self.config.chunk_overlap) + .max(1); let mut start = 0; while start < chars.len() { diff --git a/crates/mofa-foundation/src/rag/vector_store.rs b/crates/mofa-foundation/src/rag/vector_store.rs index cf6613e6..61fa26df 100644 --- a/crates/mofa-foundation/src/rag/vector_store.rs +++ b/crates/mofa-foundation/src/rag/vector_store.rs @@ -89,7 +89,11 @@ impl VectorStore for InMemoryVectorStore { }) .collect(); - scored.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal)); + scored.sort_by(|a, b| { + b.score + .partial_cmp(&a.score) + .unwrap_or(std::cmp::Ordering::Equal) + }); scored.truncate(top_k); Ok(scored) @@ -188,10 +192,7 @@ mod tests { .await .unwrap(); - let results = store - .search(&[1.0, 0.0, 0.0], 2, None) - .await - .unwrap(); + let results = store.search(&[1.0, 0.0, 0.0], 2, None).await.unwrap(); assert_eq!(results.len(), 2); assert_eq!(results[0].id, "a"); @@ -238,14 +239,8 @@ mod tests { async fn test_clear() { let mut store = InMemoryVectorStore::cosine(); - store - .upsert(make_chunk("1", "a", vec![1.0])) - .await - .unwrap(); - store - .upsert(make_chunk("2", "b", vec![2.0])) - .await - .unwrap(); + store.upsert(make_chunk("1", "a", vec![1.0])).await.unwrap(); + store.upsert(make_chunk("2", "b", vec![2.0])).await.unwrap(); assert_eq!(store.count().await.unwrap(), 2); store.clear().await.unwrap(); diff --git a/crates/mofa-kernel/src/rag/types.rs b/crates/mofa-kernel/src/rag/types.rs index da291bec..1725102f 100644 --- a/crates/mofa-kernel/src/rag/types.rs +++ b/crates/mofa-kernel/src/rag/types.rs @@ -24,11 +24,7 @@ pub struct DocumentChunk { impl DocumentChunk { /// Create a new document chunk - pub fn new( - id: impl Into, - text: impl Into, - embedding: Vec, - ) -> Self { + pub fn new(id: impl Into, text: impl Into, embedding: Vec) -> Self { Self { id: id.into(), text: text.into(),