From e5559f0f203f14f4f20fbd598fc99edb90ab5d51 Mon Sep 17 00:00:00 2001 From: Dorotea Monaco <134923734+doroteaMonaco@users.noreply.github.com> Date: Sun, 22 Feb 2026 14:27:32 +0100 Subject: [PATCH 1/5] feat: Implement persistent agent state management and process handling - Added `PersistentAgentRegistry` for managing agent metadata on disk. - Introduced `AgentProcessManager` for spawning and managing agent processes. - Updated agent commands (`start`, `stop`, `list`) to utilize persistent storage. - Enhanced agent metadata structure to include process tracking and state management. - Implemented configuration validation for agent startup. - Refactored CLI context to include persistent agent registry and process manager. - Added timestamp formatting for agent start/stop events. --- Cargo.lock | 102 ++++- crates/mofa-cli/Cargo.toml | 3 + crates/mofa-cli/src/commands/agent/list.rs | 62 ++- crates/mofa-cli/src/commands/agent/restart.rs | 6 +- crates/mofa-cli/src/commands/agent/start.rs | 124 +++--- crates/mofa-cli/src/commands/agent/status.rs | 5 +- crates/mofa-cli/src/commands/agent/stop.rs | 88 ++-- .../mofa-cli/src/commands/plugin/uninstall.rs | 6 +- .../mofa-cli/src/commands/session/delete.rs | 11 +- crates/mofa-cli/src/commands/session/list.rs | 6 +- crates/mofa-cli/src/commands/tool/info.rs | 5 +- crates/mofa-cli/src/context.rs | 15 + crates/mofa-cli/src/main.rs | 1 + crates/mofa-cli/src/state/agent_state.rs | 387 ++++++++++++++++++ crates/mofa-cli/src/state/mod.rs | 5 + crates/mofa-cli/src/utils/mod.rs | 2 + crates/mofa-cli/src/utils/process_manager.rs | 255 ++++++++++++ .../mofa-cli/tests/agent_integration_tests.rs | 5 +- 18 files changed, 954 insertions(+), 134 deletions(-) create mode 100644 crates/mofa-cli/src/state/agent_state.rs create mode 100644 crates/mofa-cli/src/state/mod.rs create mode 100644 crates/mofa-cli/src/utils/process_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 153abf6c..3aede43f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,9 +1892,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2163a0e204a148662b6b6816d4b5d5668a5f2f8df498ccbd5cd0e864e78fecba" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", "serde_core", @@ -2191,7 +2191,7 @@ dependencies = [ "serde_yaml", "shared-memory-server", "shellexpand 3.1.1", - "sysinfo", + "sysinfo 0.36.1", "tokio", "tokio-stream", "tracing", @@ -3723,9 +3723,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.87" +version = "0.3.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93f0862381daaec758576dcc22eb7bbf4d7efd67328553f3b45a412a51a3fb21" +checksum = "c7e709f3e3d22866f9c25b3aff01af289b18422cc8b4262fb19103ee80fe513d" dependencies = [ "once_cell", "wasm-bindgen", @@ -4269,6 +4269,7 @@ dependencies = [ "mofa-kernel", "mofa-runtime", "mofa-sdk", + "nix 0.29.0", "predicates", "ratatui", "serde", @@ -4394,6 +4395,7 @@ dependencies = [ "rust-embed", "serde", "serde_json", + "sysinfo 0.32.1", "thiserror 1.0.69", "tokio", "tower 0.4.13", @@ -7556,6 +7558,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "sysinfo" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c33cd241af0f2e9e3b5c32163b873b29956890b5342e6745b917ce9d490f4af" +dependencies = [ + "core-foundation-sys", + "libc", + "memchr", + "ntapi", + "rayon", + "windows 0.57.0", +] + [[package]] name = "sysinfo" version = "0.36.1" @@ -8824,9 +8840,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.110" +version = "0.2.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1de241cdc66a9d91bd84f097039eb140cdc6eec47e0cdbaf9d932a1dd6c35866" +checksum = "ec1adf1535672f5b7824f817792b1afd731d7e843d2d04ec8f27e8cb51edd8ac" dependencies = [ "cfg-if", "once_cell", @@ -8837,9 +8853,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.60" +version = "0.4.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42e96ea38f49b191e08a1bab66c7ffdba24b06f9995b39a9dd60222e5b6f1da" +checksum = "fe88540d1c934c4ec8e6db0afa536876c5441289d7f9f9123d4f065ac1250a6b" dependencies = [ "cfg-if", "futures-util", @@ -8851,9 +8867,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.110" +version = "0.2.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e12fdf6649048f2e3de6d7d5ff3ced779cdedee0e0baffd7dff5cdfa3abc8a52" +checksum = "19e638317c08b21663aed4d2b9a2091450548954695ff4efa75bff5fa546b3b1" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -8861,9 +8877,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.110" +version = "0.2.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e63d1795c565ac3462334c1e396fd46dbf481c40f51f5072c310717bc4fb309" +checksum = "2c64760850114d03d5f65457e96fc988f11f01d38fbaa51b254e4ab5809102af" dependencies = [ "bumpalo", "proc-macro2", @@ -8874,9 +8890,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.110" +version = "0.2.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f9cdac23a5ce71f6bf9f8824898a501e511892791ea2a0c6b8568c68b9cb53" +checksum = "60eecd4fe26177cfa3339eb00b4a36445889ba3ad37080c2429879718e20ca41" dependencies = [ "unicode-ident", ] @@ -9290,9 +9306,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.87" +version = "0.3.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2c7c5718134e770ee62af3b6b4a84518ec10101aad610c024b64d6ff29bb1ff" +checksum = "9d6bb20ed2d9572df8584f6dc81d68a41a625cadc6f15999d649a70ce7e3597a" dependencies = [ "js-sys", "wasm-bindgen", @@ -9534,6 +9550,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" +dependencies = [ + "windows-core 0.57.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows" version = "0.61.3" @@ -9587,14 +9613,26 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement 0.57.0", + "windows-interface 0.57.0", + "windows-result 0.1.2", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link 0.1.3", "windows-result 0.3.4", "windows-strings 0.4.2", @@ -9606,8 +9644,8 @@ version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link 0.2.1", "windows-result 0.4.1", "windows-strings 0.5.1", @@ -9635,6 +9673,17 @@ dependencies = [ "windows-threading 0.2.1", ] +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -9646,6 +9695,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-interface" version = "0.59.3" diff --git a/crates/mofa-cli/Cargo.toml b/crates/mofa-cli/Cargo.toml index 6770b25c..1b38cb0a 100644 --- a/crates/mofa-cli/Cargo.toml +++ b/crates/mofa-cli/Cargo.toml @@ -50,6 +50,9 @@ indicatif = "0.17" comfy-table = "7" dirs-next = "2" +# Process management +nix = { version = "0.29", features = ["process"] } + # Database (optional, for db init command) sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "mysql", "sqlite"], optional = true } diff --git a/crates/mofa-cli/src/commands/agent/list.rs b/crates/mofa-cli/src/commands/agent/list.rs index 4d536480..3a33d6a8 100644 --- a/crates/mofa-cli/src/commands/agent/list.rs +++ b/crates/mofa-cli/src/commands/agent/list.rs @@ -10,61 +10,87 @@ pub async fn run(ctx: &CliContext, running_only: bool, _show_all: bool) -> anyho println!("{} Listing agents", "→".green()); println!(); - let agents_metadata = ctx.agent_registry.list().await; + // Load all agents from persistent storage + let all_agents = ctx.persistent_agents.list().await; - if agents_metadata.is_empty() { + if all_agents.is_empty() { println!(" No agents registered."); println!(); println!( - " Use {} to start an agent.", + " Use {} to register an agent.", "mofa agent start ".cyan() ); return Ok(()); } - let agents: Vec = agents_metadata + // Filter agents based on flags + let agents: Vec = all_agents .iter() + .filter(|a| { + if running_only { + a.last_state == crate::state::AgentProcessState::Running + } else { + true + } + }) .map(|m| { - let status = format!("{:?}", m.state); + let status = m.last_state.to_string(); + let process_id = m.process_id.map(|pid| pid.to_string()); + AgentInfo { id: m.id.clone(), name: m.name.clone(), status, + process_id, + starts: m.start_count, + last_started: m.last_started.map(format_timestamp), description: m.description.clone(), } }) .collect(); - // Filter based on flags - let filtered: Vec<_> = if running_only { - agents - .into_iter() - .filter(|a| a.status == "Running" || a.status == "Ready") - .collect() - } else { - agents - }; - - if filtered.is_empty() { - println!(" No agents found matching criteria."); + if agents.is_empty() { + if running_only { + println!(" No running agents found."); + } else { + println!(" No agents found."); + } return Ok(()); } // Display as table - let json = serde_json::to_value(&filtered)?; + let json = serde_json::to_value(&agents)?; if let Some(arr) = json.as_array() { let table = Table::from_json_array(arr); println!("{}", table); } + println!(); + println!(" Total: {} agent(s)", agents.len()); + Ok(()) } +/// Format timestamp as human-readable string +fn format_timestamp(millis: u64) -> String { + use chrono::{DateTime, Local}; + use std::time::UNIX_EPOCH; + + let duration = std::time::Duration::from_millis(millis); + let datetime = DateTime::::from(UNIX_EPOCH + duration); + datetime.format("%Y-%m-%d %H:%M:%S").to_string() +} + #[derive(Debug, Clone, Serialize)] struct AgentInfo { id: String, name: String, status: String, #[serde(skip_serializing_if = "Option::is_none")] + process_id: Option, + starts: u32, + #[serde(skip_serializing_if = "Option::is_none")] + last_started: Option, + #[serde(skip_serializing_if = "Option::is_none")] description: Option, } diff --git a/crates/mofa-cli/src/commands/agent/restart.rs b/crates/mofa-cli/src/commands/agent/restart.rs index fde0d0a8..decb98ff 100644 --- a/crates/mofa-cli/src/commands/agent/restart.rs +++ b/crates/mofa-cli/src/commands/agent/restart.rs @@ -17,11 +17,7 @@ pub async fn run( if let Some(agent) = ctx.agent_registry.get(agent_id).await { let mut agent_guard = agent.write().await; if let Err(e) = agent_guard.shutdown().await { - println!( - " {} Graceful shutdown failed: {}", - "!".yellow(), - e - ); + println!(" {} Graceful shutdown failed: {}", "!".yellow(), e); } } diff --git a/crates/mofa-cli/src/commands/agent/start.rs b/crates/mofa-cli/src/commands/agent/start.rs index b627d65d..8961f00c 100644 --- a/crates/mofa-cli/src/commands/agent/start.rs +++ b/crates/mofa-cli/src/commands/agent/start.rs @@ -2,13 +2,16 @@ use crate::config::loader::ConfigLoader; use crate::context::CliContext; +use crate::state::AgentMetadata; use colored::Colorize; +use std::path::Path; +use tracing::info; /// Execute the `mofa agent start` command pub async fn run( ctx: &CliContext, agent_id: &str, - config_path: Option<&std::path::Path>, + config_path: Option<&Path>, daemon: bool, ) -> anyhow::Result<()> { println!("{} Starting agent: {}", "→".green(), agent_id.cyan()); @@ -17,20 +20,33 @@ pub async fn run( println!(" Mode: {}", "daemon".yellow()); } - // Check if agent is already registered - if ctx.agent_registry.contains(agent_id).await { - anyhow::bail!("Agent '{}' is already registered", agent_id); + // Check if agent already exists + if ctx.persistent_agents.exists(agent_id).await { + let existing = ctx.persistent_agents.get(agent_id).await; + if let Some(agent) = existing { + if agent.last_state == crate::state::AgentProcessState::Running { + anyhow::bail!( + "Agent '{}' is already running (PID: {})", + agent_id, + agent.process_id.unwrap_or(0) + ); + } + println!( + " {} Agent exists but is not running. Restarting...", + "!".yellow() + ); + } } - // Load agent configuration - let agent_config = if let Some(path) = config_path { + // Load or discover agent configuration + let (config_file, agent_name) = if let Some(path) = config_path { println!(" Config: {}", path.display().to_string().cyan()); - let loader = ConfigLoader::new(); - let cli_config = loader.load(path)?; - println!(" Agent: {}", cli_config.agent.name.white()); + ctx.process_manager.validate_config(path)?; + + // Try to load name from config + let name = load_agent_name_from_config(path).unwrap_or_else(|| agent_id.to_string()); - // Convert CLI AgentConfig to kernel AgentConfig - mofa_kernel::agent::config::AgentConfig::new(agent_id, &cli_config.agent.name) + (path.to_path_buf(), name) } else { // Try to auto-discover configuration let loader = ConfigLoader::new(); @@ -40,50 +56,66 @@ pub async fn run( " Config: {} (auto-discovered)", found_path.display().to_string().cyan() ); - let cli_config = loader.load(&found_path)?; - println!(" Agent: {}", cli_config.agent.name.white()); - mofa_kernel::agent::config::AgentConfig::new(agent_id, &cli_config.agent.name) + ctx.process_manager.validate_config(&found_path)?; + + let name = load_agent_name_from_config(&found_path) + .unwrap_or_else(|| agent_id.to_string()); + + (found_path, name) } None => { - println!( - " {} No config file found, using defaults", - "!".yellow() + anyhow::bail!( + "No configuration found for agent '{}'. Specify with --config or create a mofa.yaml file.", + agent_id ); - mofa_kernel::agent::config::AgentConfig::new(agent_id, agent_id) } } }; - // Check if a matching factory type is available - let factory_types = ctx.agent_registry.list_factory_types().await; - if factory_types.is_empty() { - println!( - " {} No agent factories registered. Agent registered with config only.", - "!".yellow() - ); - println!(" Agent config stored for: {}", agent_config.name.cyan()); - } else { - // Try to create via factory - let type_id = factory_types.first().unwrap(); - match ctx - .agent_registry - .create_and_register(type_id, agent_config.clone()) - .await - { - Ok(_) => { - println!("{} Agent '{}' created and registered", "✓".green(), agent_id); - } - Err(e) => { - println!( - " {} Failed to create agent via factory: {}", - "!".yellow(), - e - ); - } + println!(" Agent: {}", agent_name.white()); + + // Start the agent process + println!(" {} Spawning process...", "→".green()); + let pid = match ctx + .process_manager + .start_agent(agent_id, Some(&config_file), daemon) + { + Ok(p) => { + println!(" PID: {}", p.to_string().cyan()); + p } - } + Err(e) => { + println!(" {} Failed to start process: {}", "✗".red(), e); + anyhow::bail!("Failed to start agent process: {}", e); + } + }; + + // Create and persist agent metadata + let mut metadata = AgentMetadata::new(agent_id.to_string(), agent_name); + metadata = metadata.with_config(config_file); + metadata.mark_started(pid); - println!("{} Agent '{}' started", "✓".green(), agent_id); + ctx.persistent_agents.register(metadata).await?; + + println!("{} Agent '{}' started successfully", "✓".green(), agent_id); + println!(" PID: {}", pid.to_string().cyan()); + + info!("Agent '{}' started with PID: {}", agent_id, pid); Ok(()) } + +/// Load agent name from configuration file +fn load_agent_name_from_config(path: &Path) -> Option { + match std::fs::read_to_string(path) { + Ok(content) => match serde_yaml::from_str::(&content) { + Ok(doc) => doc + .get("agent") + .and_then(|agent| agent.get("name")) + .and_then(|name| name.as_str()) + .map(|s| s.to_string()), + Err(_) => None, + }, + Err(_) => None, + } +} diff --git a/crates/mofa-cli/src/commands/agent/status.rs b/crates/mofa-cli/src/commands/agent/status.rs index 041f6d78..9291f1a5 100644 --- a/crates/mofa-cli/src/commands/agent/status.rs +++ b/crates/mofa-cli/src/commands/agent/status.rs @@ -14,7 +14,10 @@ pub async fn run(ctx: &CliContext, agent_id: Option<&str>) -> anyhow::Result<()> Some(metadata) => { println!(" ID: {}", metadata.id.cyan()); println!(" Name: {}", metadata.name.white()); - println!(" State: {}", format!("{:?}", metadata.state).green()); + println!( + " State: {}", + format!("{:?}", metadata.state).green() + ); if let Some(desc) = &metadata.description { println!(" Description: {}", desc.white()); } diff --git a/crates/mofa-cli/src/commands/agent/stop.rs b/crates/mofa-cli/src/commands/agent/stop.rs index 96263869..972c8d3c 100644 --- a/crates/mofa-cli/src/commands/agent/stop.rs +++ b/crates/mofa-cli/src/commands/agent/stop.rs @@ -2,44 +2,80 @@ use crate::context::CliContext; use colored::Colorize; +use tracing::info; /// Execute the `mofa agent stop` command pub async fn run(ctx: &CliContext, agent_id: &str) -> anyhow::Result<()> { println!("{} Stopping agent: {}", "→".green(), agent_id.cyan()); // Check if agent exists - if !ctx.agent_registry.contains(agent_id).await { - anyhow::bail!("Agent '{}' not found in registry", agent_id); + if !ctx.persistent_agents.exists(agent_id).await { + anyhow::bail!("Agent '{}' not found", agent_id); } - // Attempt graceful shutdown via the agent instance - if let Some(agent) = ctx.agent_registry.get(agent_id).await { - let mut agent_guard = agent.write().await; - if let Err(e) = agent_guard.shutdown().await { - println!( - " {} Graceful shutdown failed: {}", - "!".yellow(), - e - ); - } - } - - // Unregister from the registry - let removed = ctx - .agent_registry - .unregister(agent_id) + // Get agent metadata + let mut agent_metadata = ctx + .persistent_agents + .get(agent_id) .await - .map_err(|e| anyhow::anyhow!("Failed to unregister agent: {}", e))?; + .ok_or_else(|| anyhow::anyhow!("Failed to retrieve agent metadata"))?; + + println!(" Name: {}", agent_metadata.name.cyan()); + println!( + " Status: {}", + agent_metadata.last_state.to_string().yellow() + ); + + // If running, stop the process + if agent_metadata.last_state == crate::state::AgentProcessState::Running { + if let Some(pid) = agent_metadata.process_id { + println!(" PID: {}", pid.to_string().cyan()); + println!(" {} Sending termination signal...", "→".green()); + + // Try graceful shutdown first, then force if needed + match ctx.process_manager.stop_agent_by_pid(pid, false).await { + Ok(_) => { + println!(" {} Process terminated gracefully", "✓".green()); - if removed { - println!("{} Agent '{}' stopped and unregistered", "✓".green(), agent_id); + // Give process time to clean up + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Check if still running, force if necessary + if ctx.process_manager.is_running(pid) { + println!( + " {} Process still running, forcing termination...", + "!".yellow() + ); + if let Err(e) = ctx.process_manager.stop_agent_by_pid(pid, true).await { + eprintln!(" {} Failed to force termination: {}", "✗".red(), e); + } else { + println!(" {} Process force-terminated", "✓".green()); + } + } + } + Err(e) => { + println!(" {} Failed to terminate process: {}", "✗".red(), e); + println!( + " {} The agent may still be running. Try 'tasklist' or 'ps' to verify.", + "!".yellow() + ); + } + } + } } else { - println!( - "{} Agent '{}' was not in the registry", - "!".yellow(), - agent_id - ); + println!(" {} Agent is not running", "!".yellow()); } + // Update agent state to stopped + agent_metadata.mark_stopped(); + ctx.persistent_agents.update(agent_metadata).await?; + + // Untrack the process + ctx.persistent_agents.untrack_process(agent_id).await; + + println!("{} Agent '{}' stopped", "✓".green(), agent_id); + + info!("Agent '{}' stopped", agent_id); + Ok(()) } diff --git a/crates/mofa-cli/src/commands/plugin/uninstall.rs b/crates/mofa-cli/src/commands/plugin/uninstall.rs index 6bba45c0..6458a33c 100644 --- a/crates/mofa-cli/src/commands/plugin/uninstall.rs +++ b/crates/mofa-cli/src/commands/plugin/uninstall.rs @@ -34,11 +34,7 @@ pub async fn run(ctx: &CliContext, name: &str, force: bool) -> anyhow::Result<() if removed { println!("{} Plugin '{}' uninstalled", "✓".green(), name); } else { - println!( - "{} Plugin '{}' was not in the registry", - "!".yellow(), - name - ); + println!("{} Plugin '{}' was not in the registry", "!".yellow(), name); } Ok(()) diff --git a/crates/mofa-cli/src/commands/session/delete.rs b/crates/mofa-cli/src/commands/session/delete.rs index 363e9d69..783d4d86 100644 --- a/crates/mofa-cli/src/commands/session/delete.rs +++ b/crates/mofa-cli/src/commands/session/delete.rs @@ -8,7 +8,10 @@ use dialoguer::Confirm; pub async fn run(ctx: &CliContext, session_id: &str, force: bool) -> anyhow::Result<()> { if !force { let confirmed = Confirm::new() - .with_prompt(format!("Delete session '{}'? This cannot be undone", session_id)) + .with_prompt(format!( + "Delete session '{}'? This cannot be undone", + session_id + )) .default(false) .interact()?; @@ -29,11 +32,7 @@ pub async fn run(ctx: &CliContext, session_id: &str, force: bool) -> anyhow::Res if deleted { println!("{} Session '{}' deleted", "✓".green(), session_id); } else { - println!( - "{} Session '{}' not found", - "!".yellow(), - session_id - ); + println!("{} Session '{}' not found", "!".yellow(), session_id); } Ok(()) diff --git a/crates/mofa-cli/src/commands/session/list.rs b/crates/mofa-cli/src/commands/session/list.rs index 0d7cd9d3..723f1a81 100644 --- a/crates/mofa-cli/src/commands/session/list.rs +++ b/crates/mofa-cli/src/commands/session/list.rs @@ -6,7 +6,11 @@ use colored::Colorize; use serde::Serialize; /// Execute the `mofa session list` command -pub async fn run(ctx: &CliContext, agent_id: Option<&str>, limit: Option) -> anyhow::Result<()> { +pub async fn run( + ctx: &CliContext, + agent_id: Option<&str>, + limit: Option, +) -> anyhow::Result<()> { println!("{} Listing sessions", "→".green()); if let Some(agent) = agent_id { diff --git a/crates/mofa-cli/src/commands/tool/info.rs b/crates/mofa-cli/src/commands/tool/info.rs index 682694fe..7f8cb0ce 100644 --- a/crates/mofa-cli/src/commands/tool/info.rs +++ b/crates/mofa-cli/src/commands/tool/info.rs @@ -77,10 +77,7 @@ pub async fn run(ctx: &CliContext, name: &str) -> anyhow::Result<()> { None => { println!(" Tool '{}' not found in registry", name); println!(); - println!( - " Use {} to see available tools.", - "mofa tool list".cyan() - ); + println!(" Use {} to see available tools.", "mofa tool list".cyan()); } } diff --git a/crates/mofa-cli/src/context.rs b/crates/mofa-cli/src/context.rs index c4cbcc85..e16433a7 100644 --- a/crates/mofa-cli/src/context.rs +++ b/crates/mofa-cli/src/context.rs @@ -1,5 +1,7 @@ //! CLI context providing access to backend services +use crate::state::PersistentAgentRegistry; +use crate::utils::AgentProcessManager; use crate::utils::paths; use mofa_foundation::agent::session::SessionManager; use mofa_foundation::agent::tools::registry::ToolRegistry; @@ -14,6 +16,10 @@ pub struct CliContext { pub session_manager: SessionManager, /// In-memory agent registry pub agent_registry: AgentRegistry, + /// Persistent agent state storage + pub persistent_agents: Arc, + /// Agent process manager for spawning/managing processes + pub process_manager: AgentProcessManager, /// In-memory plugin registry pub plugin_registry: Arc, /// In-memory tool registry @@ -35,9 +41,18 @@ impl CliContext { .await .map_err(|e| anyhow::anyhow!("Failed to initialize session manager: {}", e))?; + let agents_dir = data_dir.join("agents"); + let persistent_agents = Arc::new(PersistentAgentRegistry::new(agents_dir).await.map_err( + |e| anyhow::anyhow!("Failed to initialize persistent agent registry: {}", e), + )?); + + let process_manager = AgentProcessManager::new(config_dir.clone()); + Ok(Self { session_manager, agent_registry: AgentRegistry::new(), + persistent_agents, + process_manager, plugin_registry: Arc::new(SimplePluginRegistry::new()), tool_registry: ToolRegistry::new(), data_dir, diff --git a/crates/mofa-cli/src/main.rs b/crates/mofa-cli/src/main.rs index a7660953..63df0cb5 100644 --- a/crates/mofa-cli/src/main.rs +++ b/crates/mofa-cli/src/main.rs @@ -6,6 +6,7 @@ mod config; mod context; mod output; mod render; +mod state; mod tui; mod utils; mod widgets; diff --git a/crates/mofa-cli/src/state/agent_state.rs b/crates/mofa-cli/src/state/agent_state.rs new file mode 100644 index 00000000..fa99146d --- /dev/null +++ b/crates/mofa-cli/src/state/agent_state.rs @@ -0,0 +1,387 @@ +//! Agent state persistence layer +//! +//! Manages persistent storage and lifecycle of agents on the local system. + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::process::Child; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +/// Agent runtime state (in-memory process tracking) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum AgentProcessState { + /// Agent is not running + Stopped, + /// Agent is starting up + Starting, + /// Agent is running + Running, + /// Agent is stopping + Stopping, + /// Agent has encountered an error + Error, +} + +impl std::fmt::Display for AgentProcessState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Stopped => write!(f, "Stopped"), + Self::Starting => write!(f, "Starting"), + Self::Running => write!(f, "Running"), + Self::Stopping => write!(f, "Stopping"), + Self::Error => write!(f, "Error"), + } + } +} + +/// Persistent agent metadata (stored on disk) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentMetadata { + /// Unique agent identifier + pub id: String, + /// Human-readable name + pub name: String, + /// Agent description + pub description: Option, + /// Path to agent configuration file + pub config_path: Option, + /// Last known state + pub last_state: AgentProcessState, + /// Timestamp when agent was registered (ms since epoch) + pub registered_at: u64, + /// Timestamp when agent was last started (ms since epoch) + pub last_started: Option, + /// Timestamp when agent was last stopped (ms since epoch) + pub last_stopped: Option, + /// Process ID if running + pub process_id: Option, + /// Number of times agent has been started + pub start_count: u32, + /// Custom metadata + pub tags: Vec, +} + +impl AgentMetadata { + /// Create new agent metadata + pub fn new(id: String, name: String) -> Self { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + Self { + id, + name, + description: None, + config_path: None, + last_state: AgentProcessState::Stopped, + registered_at: now, + last_started: None, + last_stopped: None, + process_id: None, + start_count: 0, + tags: Vec::new(), + } + } + + /// Set configuration path + pub fn with_config(mut self, path: PathBuf) -> Self { + self.config_path = Some(path); + self + } + + /// Add tag + pub fn with_tag(mut self, tag: String) -> Self { + self.tags.push(tag); + self + } + + /// Mark as started + pub fn mark_started(&mut self, pid: u32) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_started = Some(now); + self.process_id = Some(pid); + self.last_state = AgentProcessState::Running; + self.start_count += 1; + } + + /// Mark as stopped + pub fn mark_stopped(&mut self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_stopped = Some(now); + self.process_id = None; + self.last_state = AgentProcessState::Stopped; + } + + /// Mark as error + pub fn mark_error(&mut self) { + self.last_state = AgentProcessState::Error; + } +} + +/// Persistent agent registry - stores agents to disk +pub struct PersistentAgentRegistry { + /// Directory where agent metadata is stored + agents_dir: PathBuf, + /// In-memory cache of agent metadata + metadata_cache: Arc>>, + /// Map of running processes (agent_id -> Child process) + running_processes: Arc>>, +} + +/// Represents a running agent process +pub struct RunningAgent { + /// Agent metadata + pub metadata: AgentMetadata, + /// Process handle (may be None if process handle is lost) + pub process: Option, +} + +impl PersistentAgentRegistry { + /// Create or load agent registry from disk + pub async fn new(agents_dir: PathBuf) -> Result { + // Ensure directory exists + tokio::fs::create_dir_all(&agents_dir).await?; + + let mut metadata_cache = HashMap::new(); + + // Load existing agents from disk + let mut entries = tokio::fs::read_dir(&agents_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + match tokio::fs::read_to_string(&path).await { + Ok(content) => { + match serde_json::from_str::(&content) { + Ok(mut metadata) => { + // Reset running state to stopped (process was not preserved across restarts) + metadata.last_state = AgentProcessState::Stopped; + metadata.process_id = None; + metadata_cache.insert(metadata.id.clone(), metadata); + } + Err(e) => { + warn!( + "Failed to parse agent metadata from {}: {}", + path.display(), + e + ); + } + } + } + Err(e) => { + warn!( + "Failed to read agent metadata from {}: {}", + path.display(), + e + ); + } + } + } + } + + Ok(Self { + agents_dir, + metadata_cache: Arc::new(RwLock::new(metadata_cache)), + running_processes: Arc::new(RwLock::new(HashMap::new())), + }) + } + + /// Register a new agent + pub async fn register(&self, metadata: AgentMetadata) -> Result<()> { + let agent_id = metadata.id.clone(); + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Write to disk + let json = serde_json::to_string_pretty(&metadata)?; + tokio::fs::write(&file_path, json).await?; + + // Update cache + { + let mut cache = self.metadata_cache.write().await; + cache.insert(agent_id.clone(), metadata); + } + + info!("Registered agent: {} in {}", agent_id, file_path.display()); + Ok(()) + } + + /// Get agent metadata + pub async fn get(&self, agent_id: &str) -> Option { + let cache = self.metadata_cache.read().await; + cache.get(agent_id).cloned() + } + + /// List all agents + pub async fn list(&self) -> Vec { + let cache = self.metadata_cache.read().await; + let mut agents: Vec<_> = cache.values().cloned().collect(); + agents.sort_by(|a, b| a.id.cmp(&b.id)); + agents + } + + /// List running agents + pub async fn list_running(&self) -> Vec { + self.list() + .await + .into_iter() + .filter(|a| a.last_state == AgentProcessState::Running) + .collect() + } + + /// Update agent metadata + pub async fn update(&self, metadata: AgentMetadata) -> Result<()> { + let agent_id = metadata.id.clone(); + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Write to disk + let json = serde_json::to_string_pretty(&metadata)?; + tokio::fs::write(&file_path, json).await?; + + // Update cache + { + let mut cache = self.metadata_cache.write().await; + cache.insert(agent_id.clone(), metadata); + } + + debug!("Updated agent metadata: {}", agent_id); + Ok(()) + } + + /// Remove agent + pub async fn remove(&self, agent_id: &str) -> Result { + let file_path = self.agents_dir.join(format!("{}.json", agent_id)); + + // Try to remove file + let removed = match tokio::fs::remove_file(&file_path).await { + Ok(_) => true, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => false, + Err(e) => return Err(e.into()), + }; + + // Remove from cache + { + let mut cache = self.metadata_cache.write().await; + cache.remove(agent_id); + } + + if removed { + info!("Removed agent: {} from {}", agent_id, file_path.display()); + } + + Ok(removed) + } + + /// Check if agent exists + pub async fn exists(&self, agent_id: &str) -> bool { + let cache = self.metadata_cache.read().await; + cache.contains_key(agent_id) + } + + /// Get count of registered agents + pub async fn count(&self) -> usize { + let cache = self.metadata_cache.read().await; + cache.len() + } + + /// Track a running process + pub async fn track_process(&self, agent_id: String, process: Child, metadata: AgentMetadata) { + let mut processes = self.running_processes.write().await; + processes.insert( + agent_id, + RunningAgent { + metadata, + process: Some(process), + }, + ); + } + + /// Get running process + pub async fn get_process(&self, agent_id: &str) -> Option { + let processes = self.running_processes.read().await; + processes.get(agent_id).and_then(|a| a.metadata.process_id) + } + + /// Remove tracking of a process + pub async fn untrack_process(&self, agent_id: &str) { + let mut processes = self.running_processes.write().await; + processes.remove(agent_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_agent_metadata_lifecycle() { + let mut metadata = AgentMetadata::new("agent-1".into(), "My Agent".into()); + assert_eq!(metadata.last_state, AgentProcessState::Stopped); + assert_eq!(metadata.start_count, 0); + + metadata.mark_started(12345); + assert_eq!(metadata.last_state, AgentProcessState::Running); + assert_eq!(metadata.process_id, Some(12345)); + assert_eq!(metadata.start_count, 1); + + metadata.mark_stopped(); + assert_eq!(metadata.last_state, AgentProcessState::Stopped); + assert_eq!(metadata.process_id, None); + } + + #[tokio::test] + async fn test_registry_persistence() -> Result<()> { + let temp_dir = TempDir::new()?; + let registry = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + + // Register agent + let metadata = AgentMetadata::new("agent-1".into(), "Test Agent".into()); + registry.register(metadata.clone()).await?; + + // Verify it exists + assert!(registry.exists("agent-1").await); + let retrieved = registry.get("agent-1").await; + assert_eq!(retrieved.map(|m| m.name), Some("Test Agent".into())); + + // Reload registry (simulating restart) + let registry2 = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + assert!(registry2.exists("agent-1").await); + assert_eq!(registry2.count().await, 1); + + Ok(()) + } + + #[tokio::test] + async fn test_list_running_agents() -> Result<()> { + let temp_dir = TempDir::new()?; + let registry = PersistentAgentRegistry::new(temp_dir.path().to_path_buf()).await?; + + // Register multiple agents + let mut agent1 = AgentMetadata::new("agent-1".into(), "Agent 1".into()); + agent1.mark_started(111); + registry.register(agent1).await?; + + let mut agent2 = AgentMetadata::new("agent-2".into(), "Agent 2".into()); + agent2.mark_started(222); + registry.register(agent2).await?; + + let agent3 = AgentMetadata::new("agent-3".into(), "Agent 3".into()); + registry.register(agent3).await?; + + // Only 2 are running + let running = registry.list_running().await; + assert_eq!(running.len(), 2); + + Ok(()) + } +} diff --git a/crates/mofa-cli/src/state/mod.rs b/crates/mofa-cli/src/state/mod.rs new file mode 100644 index 00000000..644832fb --- /dev/null +++ b/crates/mofa-cli/src/state/mod.rs @@ -0,0 +1,5 @@ +//! CLI state management + +pub mod agent_state; + +pub use agent_state::{AgentMetadata, AgentProcessState, PersistentAgentRegistry}; diff --git a/crates/mofa-cli/src/utils/mod.rs b/crates/mofa-cli/src/utils/mod.rs index e9fa4f2c..4e468ea0 100644 --- a/crates/mofa-cli/src/utils/mod.rs +++ b/crates/mofa-cli/src/utils/mod.rs @@ -2,5 +2,7 @@ pub mod env; pub mod paths; +pub mod process_manager; pub use paths::*; +pub use process_manager::AgentProcessManager; diff --git a/crates/mofa-cli/src/utils/process_manager.rs b/crates/mofa-cli/src/utils/process_manager.rs new file mode 100644 index 00000000..af691c04 --- /dev/null +++ b/crates/mofa-cli/src/utils/process_manager.rs @@ -0,0 +1,255 @@ +//! Agent process manager - handles spawning and managing agent runtime processes + +use anyhow::{Result, bail}; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Stdio}; +use tracing::{debug, info, warn}; + +/// Manages agent runtime processes +pub struct AgentProcessManager { + /// Directory containing agent configurations + config_dir: PathBuf, +} + +impl AgentProcessManager { + /// Create new process manager + pub fn new(config_dir: PathBuf) -> Self { + Self { config_dir } + } + + /// Start an agent process + /// + /// # Arguments + /// * `agent_id` - Unique agent identifier + /// * `config_path` - Path to agent configuration file + /// * `daemon` - If true, run in background; if false, run in foreground + /// + /// # Returns + /// Process ID of the started agent, or error if startup failed + pub fn start_agent( + &self, + agent_id: &str, + config_path: Option<&Path>, + daemon: bool, + ) -> Result { + debug!("Starting agent: {} (daemon: {})", agent_id, daemon); + + // Determine config file to use + let config = if let Some(path) = config_path { + path.to_path_buf() + } else { + // Try to find config in default locations + let default_path = self.config_dir.join(format!("{}.yaml", agent_id)); + if !default_path.exists() { + bail!( + "No configuration found for agent '{}' at {}", + agent_id, + default_path.display() + ); + } + default_path + }; + + // Verify config file exists + if !config.exists() { + bail!("Agent configuration not found at: {}", config.display()); + } + + info!( + "Starting agent '{}' with config: {}", + agent_id, + config.display() + ); + + // Build the command to run the agent + let mut cmd = Command::new("cargo"); + cmd.arg("run") + .arg("-p") + .arg("mofa-cli") + .arg("--") + .arg("run") + .arg(config.to_string_lossy().as_ref()); + + // Configure output + if daemon { + cmd.stdout(Stdio::null()) + .stderr(Stdio::null()) + .stdin(Stdio::null()); + } else { + cmd.stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .stdin(Stdio::inherit()); + } + + // Spawn the process + let mut child = cmd + .spawn() + .map_err(|e| anyhow::anyhow!("Failed to start agent '{}' process: {}", agent_id, e))?; + + // Get the child process ID + let pid = child.id(); + info!("Agent '{}' started with PID: {}", agent_id, pid); + + // For daemon mode, we can let the process run independently + // For foreground mode, we wait for completion + if daemon { + // Detach from parent process in daemon mode + // On Unix, this is automatic; on Windows, we just let it go + std::mem::drop(child); + } + + Ok(pid) + } + + /// Stop an agent process by PID + /// + /// # Arguments + /// * `pid` - Process ID to terminate + /// * `force` - If true, use SIGKILL; if false, try SIGTERM first + pub async fn stop_agent_by_pid(&self, pid: u32, force: bool) -> Result<()> { + debug!("Stopping agent with PID: {} (force: {})", pid, force); + + #[cfg(unix)] + { + use nix::process::{Signal, kill}; + use nix::unistd::Pid; + + let nix_pid = Pid::from_raw(pid as i32); + let signal = if force { + Signal::SIGKILL + } else { + Signal::SIGTERM + }; + + match kill(nix_pid, Some(signal)) { + Ok(_) => { + info!("Sent {:?} to process {}", signal, pid); + Ok(()) + } + Err(e) => { + warn!("Failed to send {:?} to process {}: {}", signal, pid, e); + Err(anyhow::anyhow!( + "Failed to terminate process {}: {}", + pid, + e + )) + } + } + } + + #[cfg(windows)] + { + use std::process::Command; + + // On Windows, use taskkill command + let status = Command::new("taskkill") + .arg(if force { "/F" } else { "" }) + .arg("/PID") + .arg(pid.to_string()) + .status()?; + + if status.success() { + info!("Successfully terminated process {}", pid); + Ok(()) + } else { + bail!("Failed to terminate process {}", pid) + } + } + + #[cfg(not(any(unix, windows)))] + { + bail!("Agent process termination not supported on this platform") + } + } + + /// Check if a process is running + pub fn is_running(&self, pid: u32) -> bool { + #[cfg(unix)] + { + use nix::process::{Signal, kill}; + use nix::unistd::Pid; + + let nix_pid = Pid::from_raw(pid as i32); + // Signal 0 is used to check if process exists without sending a signal + kill(nix_pid, None).is_ok() + } + + #[cfg(windows)] + { + use std::process::Command; + + let output = Command::new("tasklist") + .arg("/FI") + .arg(format!("PID eq {}", pid)) + .output(); + + output + .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string())) + .unwrap_or(false) + } + + #[cfg(not(any(unix, windows)))] + { + false + } + } + + /// Validate agent configuration + pub fn validate_config(&self, config_path: &Path) -> Result<()> { + if !config_path.exists() { + bail!("Configuration file not found: {}", config_path.display()); + } + + // Try to load and parse as YAML + let content = std::fs::read_to_string(config_path)?; + serde_yaml::from_str::(&content) + .map_err(|e| anyhow::anyhow!("Invalid YAML in config: {}", e))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_process_manager_creation() { + let temp_dir = TempDir::new().unwrap(); + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + + assert!(manager.config_dir.exists()); + } + + #[test] + fn test_validate_config_missing_file() { + let temp_dir = TempDir::new().unwrap(); + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + + let result = manager.validate_config(std::path::Path::new("/nonexistent/config.yaml")); + assert!(result.is_err()); + } + + #[test] + fn test_validate_config_valid_yaml() { + let temp_dir = TempDir::new().unwrap(); + let config_path = temp_dir.path().join("config.yaml"); + std::fs::write(&config_path, "agent:\n name: Test\n id: test-1\n").unwrap(); + + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + let result = manager.validate_config(&config_path); + assert!(result.is_ok()); + } + + #[test] + fn test_validate_config_invalid_yaml() { + let temp_dir = TempDir::new().unwrap(); + let config_path = temp_dir.path().join("config.yaml"); + std::fs::write(&config_path, "agent:\n name: [unclosed").unwrap(); + + let manager = AgentProcessManager::new(temp_dir.path().to_path_buf()); + let result = manager.validate_config(&config_path); + assert!(result.is_err()); + } +} diff --git a/crates/mofa-cli/tests/agent_integration_tests.rs b/crates/mofa-cli/tests/agent_integration_tests.rs index 3cb989b0..8425de16 100644 --- a/crates/mofa-cli/tests/agent_integration_tests.rs +++ b/crates/mofa-cli/tests/agent_integration_tests.rs @@ -134,7 +134,10 @@ async fn test_registry_factory_create() { .unwrap(); let created = registry - .create("test-factory", AgentConfig::new("factory-agent", "Factory Agent")) + .create( + "test-factory", + AgentConfig::new("factory-agent", "Factory Agent"), + ) .await .unwrap(); let guard = created.read().await; From b055e1f3645297e2953817a50b42d4ba00cb2f02 Mon Sep 17 00:00:00 2001 From: Dorotea Monaco <134923734+doroteaMonaco@users.noreply.github.com> Date: Sun, 22 Feb 2026 15:24:08 +0100 Subject: [PATCH 2/5] fix: update nix dependency to include signal feature for process management --- crates/mofa-cli/Cargo.toml | 2 +- crates/mofa-cli/src/utils/process_manager.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/mofa-cli/Cargo.toml b/crates/mofa-cli/Cargo.toml index 1b38cb0a..2261d2e7 100644 --- a/crates/mofa-cli/Cargo.toml +++ b/crates/mofa-cli/Cargo.toml @@ -51,7 +51,7 @@ comfy-table = "7" dirs-next = "2" # Process management -nix = { version = "0.29", features = ["process"] } +nix = { version = "0.29", features = ["process", "signal"] } # Database (optional, for db init command) sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "mysql", "sqlite"], optional = true } diff --git a/crates/mofa-cli/src/utils/process_manager.rs b/crates/mofa-cli/src/utils/process_manager.rs index af691c04..16584fdb 100644 --- a/crates/mofa-cli/src/utils/process_manager.rs +++ b/crates/mofa-cli/src/utils/process_manager.rs @@ -111,7 +111,7 @@ impl AgentProcessManager { #[cfg(unix)] { - use nix::process::{Signal, kill}; + use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; let nix_pid = Pid::from_raw(pid as i32); @@ -166,7 +166,7 @@ impl AgentProcessManager { pub fn is_running(&self, pid: u32) -> bool { #[cfg(unix)] { - use nix::process::{Signal, kill}; + use nix::sys::signal::kill; use nix::unistd::Pid; let nix_pid = Pid::from_raw(pid as i32); From e2c10cc5d83e6723939e98ad061fee9609676019 Mon Sep 17 00:00:00 2001 From: Dorotea Monaco <134923734+doroteaMonaco@users.noreply.github.com> Date: Sun, 22 Feb 2026 15:35:09 +0100 Subject: [PATCH 3/5] chore: update process manager documentation for clarity --- crates/mofa-cli/src/commands/agent/stop.rs | 13 +++++-------- crates/mofa-cli/src/utils/process_manager.rs | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/mofa-cli/src/commands/agent/stop.rs b/crates/mofa-cli/src/commands/agent/stop.rs index f8369c42..8b9b9977 100644 --- a/crates/mofa-cli/src/commands/agent/stop.rs +++ b/crates/mofa-cli/src/commands/agent/stop.rs @@ -10,11 +10,13 @@ pub async fn run(ctx: &CliContext, agent_id: &str) -> anyhow::Result<()> { // Check if agent exists in registry or store let in_registry = ctx.agent_registry.contains(agent_id).await; - let in_store = ctx + + let previous_entry = ctx .agent_store .get(agent_id) - .map_err(|e| anyhow::anyhow!("Failed to check agent store: {}", e))? - .is_some(); + .map_err(|e| anyhow::anyhow!("Failed to load persisted agent '{}': {}", agent_id, e))?; + + let in_store = previous_entry.is_some(); if !in_registry && !in_store { anyhow::bail!("Agent '{}' not found", agent_id); @@ -28,11 +30,6 @@ pub async fn run(ctx: &CliContext, agent_id: &str) -> anyhow::Result<()> { } } - let previous_entry = ctx - .agent_store - .get(agent_id) - .map_err(|e| anyhow::anyhow!("Failed to load persisted agent '{}': {}", agent_id, e))?; - let persisted_updated = if let Some(mut entry) = previous_entry.clone() { entry.state = "Stopped".to_string(); ctx.agent_store diff --git a/crates/mofa-cli/src/utils/process_manager.rs b/crates/mofa-cli/src/utils/process_manager.rs index 16584fdb..5fceeaa7 100644 --- a/crates/mofa-cli/src/utils/process_manager.rs +++ b/crates/mofa-cli/src/utils/process_manager.rs @@ -111,7 +111,7 @@ impl AgentProcessManager { #[cfg(unix)] { - use nix::sys::signal::{kill, Signal}; + use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; let nix_pid = Pid::from_raw(pid as i32); From 4939fe8dec964ada4f9411e6e3e1d6123ccdd222 Mon Sep 17 00:00:00 2001 From: Dorotea Monaco <134923734+doroteaMonaco@users.noreply.github.com> Date: Sun, 22 Feb 2026 15:49:03 +0100 Subject: [PATCH 4/5] fix: enhance agent stop command to update persisted state with force flag --- crates/mofa-cli/src/commands/agent/stop.rs | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/crates/mofa-cli/src/commands/agent/stop.rs b/crates/mofa-cli/src/commands/agent/stop.rs index ed26f870..d78f1cfc 100644 --- a/crates/mofa-cli/src/commands/agent/stop.rs +++ b/crates/mofa-cli/src/commands/agent/stop.rs @@ -26,6 +26,32 @@ pub async fn run( anyhow::bail!("Agent '{}' not found", agent_id); } + // When commands run in separate CLI invocations, runtime registry state can be absent. + // In that case, if force_persisted_stop is true, update the persisted state. + if !in_registry && in_store && force_persisted_stop { + if let Some(mut entry) = previous_entry { + entry.state = "Stopped".to_string(); + ctx.agent_store + .save(agent_id, &entry) + .map_err(|e| anyhow::anyhow!("Failed to update agent '{}': {}", agent_id, e))?; + + println!( + "{} Agent '{}' persisted state updated to Stopped", + "✓".green(), + agent_id + ); + return Ok(()); + } + } + + // If not in registry and no force flag, error out + if !in_registry { + anyhow::bail!( + "Agent '{}' is not active in runtime registry. Use --force-persisted-stop to update persisted state.", + agent_id + ); + } + // Attempt graceful shutdown via the agent instance if let Some(agent) = ctx.agent_registry.get(agent_id).await { let mut agent_guard = agent.write().await; From 54f986c5cf97f95b8c46cbb3a45beef404e52d88 Mon Sep 17 00:00:00 2001 From: Dorotea Monaco <134923734+doroteaMonaco@users.noreply.github.com> Date: Mon, 23 Feb 2026 20:39:10 +0100 Subject: [PATCH 5/5] refactor: remove duplicate format_timestamp function --- crates/mofa-cli/src/commands/agent/list.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/mofa-cli/src/commands/agent/list.rs b/crates/mofa-cli/src/commands/agent/list.rs index 61d3017f..3d5cb502 100644 --- a/crates/mofa-cli/src/commands/agent/list.rs +++ b/crates/mofa-cli/src/commands/agent/list.rs @@ -103,16 +103,6 @@ fn format_timestamp(millis: u64) -> String { datetime.format("%Y-%m-%d %H:%M:%S").to_string() } -/// Format timestamp as human-readable string -fn format_timestamp(millis: u64) -> String { - use chrono::{DateTime, Local}; - use std::time::UNIX_EPOCH; - - let duration = std::time::Duration::from_millis(millis); - let datetime = DateTime::::from(UNIX_EPOCH + duration); - datetime.format("%Y-%m-%d %H:%M:%S").to_string() -} - /// Formats a duration into a human-readable string (e.g., "2h 15m", "45s"). fn format_duration(duration: chrono::Duration) -> String { let seconds = duration.num_seconds();