diff --git a/Cargo.lock b/Cargo.lock index 3649a6a..2fb7653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2929,7 +2929,7 @@ dependencies = [ [[package]] name = "nstealth" version = "0.1.0" -source = "git+https://github.com/gen0sec/nstealth?rev=1a11641f451fb58d0d04c9164c44b9e778d3e012#1a11641f451fb58d0d04c9164c44b9e778d3e012" +source = "git+https://github.com/gen0sec/nstealth?rev=3c87751b9d9537b055a119f155a730360a7d0078#3c87751b9d9537b055a119f155a730360a7d0078" dependencies = [ "serde", "sha2", diff --git a/Cargo.toml b/Cargo.toml index 3f741b4..1ac2fb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ pingora-memory-cache = { git = "https://github.com/gen0sec/pingora", rev = "c921 # JA4+ fingerprinting library # nstealth = { path = "../nstealth" } -nstealth = { git = "https://github.com/gen0sec/nstealth", rev = "1a11641f451fb58d0d04c9164c44b9e778d3e012" } +nstealth = { git = "https://github.com/gen0sec/nstealth", rev = "3c87751b9d9537b055a119f155a730360a7d0078" } mimalloc = { version = "0.1.48", default-features = false } dashmap = "7.0.0-rc2" diff --git a/src/logger/pingora_access_appender.rs b/src/logger/pingora_access_appender.rs new file mode 100644 index 0000000..a9ab2d5 --- /dev/null +++ b/src/logger/pingora_access_appender.rs @@ -0,0 +1,59 @@ +use log::Record; +use log4rs::append::Append; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +/// Minimal file appender used for Pingora access logs. +/// +/// Pingora emits access logs through the `pingora_proxy` logger target. We keep this +/// separate from the main access log appender to avoid coupling to log4rs encoders. 
+pub struct PingoraAccessAppender {
+    path: PathBuf,
+    file: Mutex<File>,
+}
+
+impl std::fmt::Debug for PingoraAccessAppender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Only the path is meaningful here; a placeholder string stands in for the file handle.
+        f.debug_struct("PingoraAccessAppender")
+            .field("path", &self.path)
+            .field("file", &"")
+            .finish()
+    }
+}
+
+impl PingoraAccessAppender {
+    /// Opens `path` in append mode, creating the file when it does not exist yet.
+    ///
+    /// # Errors
+    /// Returns the error raised while opening the file.
+    // NOTE(review): the generic arguments of this return type were lost in the pasted diff;
+    // `Box<dyn std::error::Error>` is a reconstruction - confirm against the repository.
+    pub fn new(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
+        let file = OpenOptions::new().create(true).append(true).open(path)?;
+
+        Ok(Self {
+            path: path.to_path_buf(),
+            file: Mutex::new(file),
+        })
+    }
+}
+
+impl Append for PingoraAccessAppender {
+    /// Writes the record's message to the file as one newline-terminated line.
+    ///
+    /// # Errors
+    /// Returns the I/O error if the write fails.
+    fn append(&self, record: &Record) -> anyhow::Result<()> {
+        // Pingora access logs are already formatted as a single line message (typically JSON).
+        let mut msg = record.args().to_string();
+        if !msg.ends_with('\n') {
+            msg.push('\n');
+        }
+
+        // A poisoned lock only means another thread panicked while holding it; the file handle
+        // itself is still usable, and panicking inside the logger would take the caller down.
+        let mut file = self.file.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
+        file.write_all(msg.as_bytes())?;
+        Ok(())
+    }
+
+    /// Flushes the file; a flush error is dropped because this method has no way to report it.
+    fn flush(&self) {
+        let mut file = self.file.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
+        let _ = file.flush();
+    }
+}
+
diff --git a/src/main.rs b/src/main.rs
index 7b47f58..79d9335 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,6 +3,7 @@ use std::io;
 use std::mem::MaybeUninit;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::collections::HashMap;
 
 use anyhow::{Context, Result};
 use clap::Parser;
@@ -59,11 +60,15 @@ use crate::utils::fingerprint::tcp_fingerprint::TcpFingerprintCollector;
 use crate::utils::fingerprint::tcp_fingerprint::TcpFingerprintConfig;
 
 use crate::logger::access_log::LogSenderConfig;
+use crate::platform::agent_status::{
+    AgentStatusIdentity, add_platform_metadata, derive_agent_id, read_workspace_id_from_env,
+};
 use crate::platform::authcheck::validate_api_key;
 use crate::security::waf::actions::captcha::{
     CaptchaConfig, CaptchaProvider, init_captcha_client, start_cache_cleanup_task,
 };
 use crate::utils::http_client::init_global_client;
+use crate::worker::agent_status::AgentStatusWorker;
 use crate::worker::log::set_log_sender_config;
 
 fn main() -> 
Result<()> {
@@ -844,6 +849,112 @@ async fn async_main(args: Args, config: Config) -> Result<()> {
         if let Err(e) = worker_manager.register_worker(worker_config, log_sender_worker) {
             log::error!("Failed to register log sender worker: {}", e);
         }
+
+        // Register agent status worker (register + heartbeat) while the unified event queue exists.
+        // Minimal-conflict workspace_id strategy: read from env only; if missing, derive from agent_name only.
+        if std::env::var("AGENT_ID").is_ok() {
+            log::warn!("AGENT_ID is ignored; agent_id is derived from agent_name + workspace_id.");
+        }
+
+        // Reads an environment variable and trims it; unset, non-UTF-8 and blank values give
+        // `None`. Trimming matches how the workspace id is read, so stray whitespace never
+        // ends up in the hashed agent id.
+        let env_non_blank = |key: &str| {
+            std::env::var(key)
+                .ok()
+                .map(|value| value.trim().to_string())
+                .filter(|value| !value.is_empty())
+        };
+        // Splits a comma-separated value into trimmed, non-empty items.
+        let split_csv = |value: String| -> Vec<String> {
+            value
+                .split(',')
+                .map(str::trim)
+                .filter(|item| !item.is_empty())
+                .map(str::to_string)
+                .collect()
+        };
+
+        let hostname = env_non_blank("HOSTNAME")
+            .unwrap_or_else(|| gethostname::gethostname().to_string_lossy().into_owned());
+
+        let agent_name = env_non_blank("AGENT_NAME").unwrap_or_else(|| hostname.clone());
+
+        let workspace_id = read_workspace_id_from_env();
+        if workspace_id.is_none() {
+            log::warn!(
+                "WORKSPACE_ID not set; agent_id derived only from agent_name '{}'. Set WORKSPACE_ID (or ARXIGNIS_WORKSPACE_ID) to avoid collisions across organizations.",
+                agent_name
+            );
+        }
+
+        let agent_id = derive_agent_id(&agent_name, workspace_id.as_deref());
+
+        let tags = std::env::var("AGENT_TAGS")
+            .map(&split_csv)
+            .unwrap_or_default();
+
+        // AGENT_IPS is consulted only when AGENT_IP_ADDRESSES cannot be read at all.
+        let ip_addresses = std::env::var("AGENT_IP_ADDRESSES")
+            .or_else(|_| std::env::var("AGENT_IPS"))
+            .map(&split_csv)
+            .unwrap_or_default();
+
+        // The BPF-backed capabilities are advertised only when at least one skeleton is loaded.
+        let has_skels = !state.skels.is_empty();
+        let mut capabilities = vec!["log_sender".to_string()];
+        if config.logging.bpf_stats.enabled && has_skels {
+            capabilities.push("bpf_stats".to_string());
+            if config.logging.bpf_stats.enable_dropped_ip_events {
+                capabilities.push("bpf_stats_dropped_ip_events".to_string());
+            }
+        }
+        if config.logging.tcp_fingerprint.enabled && has_skels {
+            capabilities.push("tcp_fingerprint".to_string());
+            if config.logging.tcp_fingerprint.enable_fingerprint_events {
+                capabilities.push("tcp_fingerprint_events".to_string());
+            }
+        }
+        if has_skels {
+            capabilities.push("xdp".to_string());
+        }
+
+        let mut metadata = HashMap::new();
+        metadata.insert("os".to_string(), std::env::consts::OS.to_string());
+        metadata.insert("arch".to_string(), std::env::consts::ARCH.to_string());
+        metadata.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
+        metadata.insert("mode".to_string(), config.mode.clone());
+        metadata.insert("platform_base_url".to_string(), config.platform.base_url.clone());
+        add_platform_metadata(&mut metadata);
+
+        let started_at = chrono::Utc::now();
+        let identity = AgentStatusIdentity {
+            agent_id,
+            agent_name,
+            hostname,
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            mode: config.mode.clone(),
+            tags,
+            capabilities,
+            interfaces: iface_names.clone(),
+            ip_addresses,
+            metadata,
+            started_at,
+        };
+
+        // The worker feeds this value to `tokio::time::interval`, which panics on a zero
+        // period, so `0` falls back to the default just like an unset or unparsable value.
+        let heartbeat_secs = std::env::var("AGENT_HEARTBEAT_SECS")
+            .ok()
+            .and_then(|value| value.trim().parse::<u64>().ok())
+            .filter(|secs| *secs > 0)
+            .unwrap_or(30);
+
+        let worker_config = worker::WorkerConfig {
+            name: "agent_status".to_string(),
+            interval_secs: heartbeat_secs,
+            enabled: true,
+        };
+
+        let worker = AgentStatusWorker::new(identity, heartbeat_secs);
+        if let Err(e) = worker_manager.register_worker(worker_config, worker) {
+            log::error!("Failed to register agent status worker: {}", e);
+        }
     }
 
     // Determine if we have API key for full functionality
diff --git a/src/platform/agent_status.rs b/src/platform/agent_status.rs
new file mode 100644
index 0000000..e97e628
--- /dev/null
+++ b/src/platform/agent_status.rs
@@ -0,0 +1,149 @@
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use sha2::{Digest, Sha256};
+use std::collections::HashMap;
+use std::fs;
+
+/// Agent status identity (static-ish fields) used to produce register/heartbeat events.
+#[derive(Debug, Clone)]
+pub struct AgentStatusIdentity {
+    pub agent_id: String,
+    pub agent_name: String,
+    pub hostname: String,
+    pub version: String,
+    pub mode: String,
+    pub tags: Vec<String>,
+    pub capabilities: Vec<String>,
+    pub interfaces: Vec<String>,
+    pub ip_addresses: Vec<String>,
+    pub metadata: HashMap<String, String>,
+    pub started_at: DateTime<Utc>,
+}
+
+impl AgentStatusIdentity {
+    /// Builds the event payload for `status` as observed at `now`.
+    ///
+    /// `now` fills both `timestamp` and `last_seen`. `uptime_secs` is the number of whole
+    /// seconds between `started_at` and `now`, clamped to zero so that a backwards step of
+    /// the wall clock never reports a negative uptime.
+    pub fn to_event(&self, status: &str, now: DateTime<Utc>) -> AgentStatusEvent {
+        let uptime_secs = (now - self.started_at).num_seconds().max(0);
+        AgentStatusEvent {
+            schema_version: "1.0".to_string(),
+            // `DateTime<Utc>` is `Copy`, so the timestamps are passed by value.
+            timestamp: now,
+            agent_id: self.agent_id.clone(),
+            agent_name: self.agent_name.clone(),
+            hostname: self.hostname.clone(),
+            version: self.version.clone(),
+            mode: self.mode.clone(),
+            status: status.to_string(),
+            pid: std::process::id(),
+            started_at: self.started_at,
+            last_seen: now,
+            uptime_secs,
+            tags: self.tags.clone(),
+            capabilities: self.capabilities.clone(),
+            interfaces: self.interfaces.clone(),
+            ip_addresses: self.ip_addresses.clone(),
+            metadata: self.metadata.clone(),
+        }
+    }
+}
+
+/// Agent status event payload. `event_type` is provided by the `UnifiedEvent` wrapper.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AgentStatusEvent {
+    pub schema_version: String,
+    pub timestamp: DateTime<Utc>,
+    pub agent_id: String,
+    pub agent_name: String,
+    pub hostname: String,
+    pub version: String,
+    pub mode: String,
+    pub status: String,
+    pub pid: u32,
+    pub started_at: DateTime<Utc>,
+    pub last_seen: DateTime<Utc>,
+    pub uptime_secs: i64,
+    pub tags: Vec<String>,
+    pub capabilities: Vec<String>,
+    pub interfaces: Vec<String>,
+    pub ip_addresses: Vec<String>,
+    pub metadata: HashMap<String, String>,
+}
+
+/// Returns the first non-blank, trimmed workspace id found in `WORKSPACE_ID`,
+/// `ARXIGNIS_WORKSPACE_ID` or `AX_ARXIGNIS_WORKSPACE_ID`, checked in that order.
+pub fn read_workspace_id_from_env() -> Option<String> {
+    // Minimal-conflict option: do NOT plumb workspace_id through config; read env only.
+    for key in ["WORKSPACE_ID", "ARXIGNIS_WORKSPACE_ID", "AX_ARXIGNIS_WORKSPACE_ID"] {
+        let Ok(value) = std::env::var(key) else {
+            continue;
+        };
+        let trimmed = value.trim();
+        if !trimmed.is_empty() {
+            return Some(trimmed.to_string());
+        }
+    }
+    None
+}
+
+/// Derives a deterministic agent id: the lowercase hex SHA-256 of `agent_name`, or of
+/// `agent_name`, `":"` and the workspace id when one is given.
+pub fn derive_agent_id(agent_name: &str, workspace_id: Option<&str>) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(agent_name.as_bytes());
+    // Without a workspace id the result is still deterministic (but may collide across
+    // orgs) - main warns about that at runtime.
+    if let Some(wid) = workspace_id {
+        hasher.update(b":");
+        hasher.update(wid.as_bytes());
+    }
+    format!("{:x}", hasher.finalize())
+}
+
+/// Adds `kernel_version` and the `linux_*` keys to `metadata` for every value that could be
+/// read; keys whose source is unavailable are simply left out.
+pub fn add_platform_metadata(metadata: &mut HashMap<String, String>) {
+    if let Some(k) = read_kernel_version() {
+        metadata.insert("kernel_version".to_string(), k);
+    }
+    if let Some(osr) = read_os_release() {
+        // (os-release field, metadata key)
+        for (field, key) in [
+            ("ID", "linux_type"),
+            ("VERSION_ID", "linux_version"),
+            ("PRETTY_NAME", "linux_pretty_name"),
+        ] {
+            if let Some(value) = osr.get(field) {
+                metadata.insert(key.to_string(), value.clone());
+            }
+        }
+    }
+}
+
+/// Reads the trimmed content of `/proc/sys/kernel/osrelease`; `None` when it is unreadable
+/// or blank.
+fn read_kernel_version() -> Option<String> {
+    fs::read_to_string("/proc/sys/kernel/osrelease")
+        .ok()
+        .map(|s| s.trim().to_string())
+        .filter(|s| !s.is_empty())
+}
+
+/// Parses the `KEY=value` lines of the os-release file into a map.
+///
+/// Blank lines, `#` comments, lines without `=` and entries with an empty key or value are
+/// skipped. Returns `None` when neither file can be read or no entry survives.
+fn read_os_release() -> Option<HashMap<String, String>> {
+    // Try both common locations; distroless images often still contain /etc/os-release.
+    let content = fs::read_to_string("/etc/os-release")
+        .or_else(|_| fs::read_to_string("/usr/lib/os-release"))
+        .ok()?;
+
+    let mut map = HashMap::new();
+    for line in content.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        let Some((k, v)) = line.split_once('=') else {
+            continue;
+        };
+        let key = k.trim();
+        let value = v.trim();
+        // Strip one pair of matching surrounding quotes if present. Going through
+        // `strip_prefix`/`strip_suffix` instead of slicing keeps a value that consists of a
+        // lone quote character from panicking on an inverted range.
+        let value = value
+            .strip_prefix('"')
+            .and_then(|rest| rest.strip_suffix('"'))
+            .or_else(|| {
+                value
+                    .strip_prefix('\'')
+                    .and_then(|rest| rest.strip_suffix('\''))
+            })
+            .unwrap_or(value);
+        if !key.is_empty() && !value.is_empty() {
+            map.insert(key.to_string(), value.to_string());
+        }
+    }
+
+    if map.is_empty() { None } else { Some(map) }
+}
diff --git a/src/platform/mod.rs b/src/platform/mod.rs
index e201258..37b71e8 100644
--- a/src/platform/mod.rs
+++ b/src/platform/mod.rs
@@ -1 +1,2 @@
 pub mod authcheck;
+pub mod agent_status;
diff --git a/src/worker/agent_status.rs b/src/worker/agent_status.rs
new file mode 100644
index 0000000..ada8ddd
--- /dev/null
+++ b/src/worker/agent_status.rs
@@ -0,0 +1,62 @@
+use std::time::Duration;
+
+use tokio::sync::watch;
+use tokio::time::{MissedTickBehavior, interval};
+
+use crate::platform::agent_status::AgentStatusIdentity;
+use crate::worker::log::{UnifiedEvent, send_event};
+
+/// Agent status worker that sends register + heartbeat events.
+///
+/// This piggybacks on the unified event queue (same as logs), so it should only be
+/// registered when the log sender queue is initialized (i.e. log_sending is enabled).
+pub struct AgentStatusWorker {
+    identity: AgentStatusIdentity,
+    interval_secs: u64,
+}
+
+impl AgentStatusWorker {
+    /// Creates a worker that reports `identity` every `interval_secs` seconds.
+    pub fn new(identity: AgentStatusIdentity, interval_secs: u64) -> Self {
+        Self {
+            identity,
+            interval_secs,
+        }
+    }
+}
+
+impl super::Worker for AgentStatusWorker {
+    fn name(&self) -> &str {
+        "agent_status"
+    }
+
+    /// Spawns the task: one event right away, then one per interval until shutdown.
+    fn run(&self, mut shutdown: watch::Receiver<bool>) -> tokio::task::JoinHandle<()> {
+        let identity = self.identity.clone();
+        // `tokio::time::interval` panics on a zero period, so clamp to at least one second.
+        let period = Duration::from_secs(self.interval_secs.max(1));
+        let worker_name = self.name().to_string();
+
+        tokio::spawn(async move {
+            let mut tick = interval(period);
+            tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
+            // The first tick of an interval completes immediately; consume it here so the
+            // register event below is not followed straight away by a duplicate heartbeat.
+            tick.tick().await;
+
+            // Initial register event
+            let now = chrono::Utc::now();
+            send_event(UnifiedEvent::AgentStatus(identity.to_event("running", now)));
+
+            loop {
+                tokio::select! {
+                    changed = shutdown.changed() => {
+                        // `Err` means the sender was dropped: no signal can arrive any more and
+                        // `changed()` would resolve immediately on every iteration, so stop.
+                        if changed.is_err() || *shutdown.borrow() {
+                            log::info!("[{}] Shutdown signal received, stopping agent status worker", worker_name);
+                            break;
+                        }
+                    }
+                    _ = tick.tick() => {
+                        let now = chrono::Utc::now();
+                        send_event(UnifiedEvent::AgentStatus(identity.to_event("running", now)));
+                    }
+                }
+            }
+        })
+    }
+}
+
diff --git a/src/worker/log.rs b/src/worker/log.rs
index a014173..5f9df21 100644
--- a/src/worker/log.rs
+++ b/src/worker/log.rs
@@ -345,6 +345,8 @@ pub enum UnifiedEvent {
     DroppedIp(crate::logger::bpf_stats::DroppedIpEvent),
     #[serde(rename = "tcp_fingerprint")]
     TcpFingerprint(crate::utils::fingerprint::tcp_fingerprint::TcpFingerprintEvent),
+    #[serde(rename = "agent_status")]
+    AgentStatus(crate::platform::agent_status::AgentStatusEvent),
 }
 
 impl UnifiedEvent {
@@ -354,6 +356,7 @@ impl UnifiedEvent {
             UnifiedEvent::HttpAccessLog(_) => "http_access_log",
             UnifiedEvent::DroppedIp(_) => "dropped_ip",
             UnifiedEvent::TcpFingerprint(_) => "tcp_fingerprint",
+            UnifiedEvent::AgentStatus(_) => "agent_status",
         }
     }
 
@@ -363,6 +366,7 @@ impl UnifiedEvent {
             UnifiedEvent::HttpAccessLog(event) => event.timestamp,
             UnifiedEvent::DroppedIp(event) => event.timestamp,
             UnifiedEvent::TcpFingerprint(event) => event.timestamp,
+            UnifiedEvent::AgentStatus(event) => event.timestamp,
         }
     }
 
@@ -486,6 +490,7 @@ fn estimate_event_size(event: &UnifiedEvent) -> usize {
         }
         UnifiedEvent::DroppedIp(_) => base_size + 200, // Dropped IP events are relatively small
         UnifiedEvent::TcpFingerprint(_) => base_size + 100, // TCP fingerprint events are small
+        UnifiedEvent::AgentStatus(_) => base_size + 800, // Agent status events are small/medium JSON payloads
     }
 }
 
diff --git a/src/worker/mod.rs b/src/worker/mod.rs
index e0e6c48..2f2ea61 100644
--- a/src/worker/mod.rs
+++ b/src/worker/mod.rs
@@ -1,3 +1,4 @@
+pub mod agent_status;
 pub mod certificate;
 pub mod config;
 pub mod geoip_mmdb;