diff --git a/.zed/settings.json b/.zed/settings.json new file mode 100644 index 00000000..a75d4611 --- /dev/null +++ b/.zed/settings.json @@ -0,0 +1,32 @@ +// Folder-specific settings +// +// For a full list of overridable settings, and general information on folder-specific settings, +// see the documentation: https://zed.dev/docs/configuring-zed#settings-files +{ + "file_scan_exclusions": [ + ".data", + ".git$", + ".hg", + ".rsync_cache", + "assets/webpack", + "build/static", + "client/build", + "CVS", + "node_modules", + "public/system/*", + "tmp/cache", + "**/.classpath", + "**/.DS_Store", + "**/.git", + "**/.hg", + "**/.settings", + "**/.svn", + "**/CVS", + "**/Thumbs.db", + ".svn", + "gems/scheduler/ext/*", + "gems/server/ext/*", + "benchmarks/results/*", + "docs/static/*" + ] +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 7326448c..229be525 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## [0.2.18] - 2025-XX-XX +### WIP +-- Fixing error in auto-reload on Linux when reuse_port is false +-- Fix breaking of auto-reload on config file errors +-- include directive is relative (equivalent to require_relative) +-- Fixing preload gem group logic +-- Fix errors in interrupt handling during some debug flows + ## [0.2.17] - 2025-05-31 - Enabled vectorized writes in IoSteam - Replaced all usage of heap-allocated BoxBody with HttpBody enums diff --git a/Cargo.lock b/Cargo.lock index a086a600..8e6a9caf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1644,7 +1644,7 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "itsi-scheduler" -version = "0.2.17" +version = "0.2.18" dependencies = [ "bytes", "derive_more", @@ -1662,7 +1662,7 @@ dependencies = [ [[package]] name = "itsi-server" -version = "0.2.17" +version = "0.2.18" dependencies = [ "argon2", "async-channel", diff --git a/Gemfile.lock b/Gemfile.lock index 3b727690..a3a5934e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,20 +1,20 @@ PATH remote: . specs: - itsi (0.2.17) - itsi-scheduler (~> 0.2.17) - itsi-server (~> 0.2.17) + itsi (0.2.18) + itsi-scheduler (~> 0.2.18) + itsi-server (~> 0.2.18) PATH remote: gems/scheduler specs: - itsi-scheduler (0.2.17) + itsi-scheduler (0.2.18) rb_sys (~> 0.9.91) PATH remote: gems/server specs: - itsi-server (0.2.17) + itsi-server (0.2.18) json (~> 2) prism (~> 1.4) rack (>= 1.6) diff --git a/crates/itsi_scheduler/Cargo.toml b/crates/itsi_scheduler/Cargo.toml index f6f7b438..76aae105 100644 --- a/crates/itsi_scheduler/Cargo.toml +++ b/crates/itsi_scheduler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsi-scheduler" -version = "0.2.17" +version = "0.2.18" edition = "2021" authors = ["Wouter Coppieters "] license = "MIT" diff --git a/crates/itsi_server/Cargo.toml b/crates/itsi_server/Cargo.toml index ed6fbb8b..ae4d9aa7 100644 --- a/crates/itsi_server/Cargo.toml +++ b/crates/itsi_server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsi-server" -version = "0.2.17" +version = "0.2.18" edition = "2021" authors = ["Wouter Coppieters "] license = "MIT" diff --git a/crates/itsi_server/src/ruby_types/itsi_server/file_watcher.rs b/crates/itsi_server/src/ruby_types/itsi_server/file_watcher.rs index 08219cc0..532ff1b5 100644 --- a/crates/itsi_server/src/ruby_types/itsi_server/file_watcher.rs +++ b/crates/itsi_server/src/ruby_types/itsi_server/file_watcher.rs @@ -1,69 +1,136 @@ use derive_more::Debug; use globset::{Glob, GlobSet, GlobSetBuilder}; use magnus::error::Result; -use nix::unistd::{close, fork, pipe, read}; +use nix::unistd::{close, dup, fork, pipe, read, write}; use notify::event::ModifyKind; -use notify::{Event, RecursiveMode, Watcher}; -use notify::{EventKind, RecommendedWatcher}; +use notify::{Event, EventKind, RecursiveMode, Watcher}; +use parking_lot::Mutex; +use std::collections::{HashMap, HashSet}; +use std::fs; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::path::Path; -use std::sync::mpsc::Sender; +use std::path::PathBuf; +use std::process::Command; + +use std::sync::{mpsc, Arc}; +use std::thread; use std::time::{Duration, Instant}; -use std::{collections::HashSet, fs}; -use std::{ - os::fd::{AsRawFd, IntoRawFd, OwnedFd}, - path::PathBuf, - process::Command, - sync::mpsc, - thread::{self}, -}; -use tracing::debug; - -/// Represents a set of patterns and commands. +use tracing::{error, info}; + #[derive(Debug, Clone)] struct PatternGroup { base_dir: PathBuf, glob_set: GlobSet, - pattern: String, commands: Vec>, + pattern: String, last_triggered: Option, } -/// Extracts the base directory from a wildcard pattern by taking the portion up to the first -/// component that contains a wildcard character. -fn extract_and_canonicalize_base_dir(pattern: &str) -> PathBuf { - if !(pattern.contains("*") || pattern.contains("?") || pattern.contains('[')) { - let base = PathBuf::from("."); - return fs::canonicalize(&base).unwrap_or(base); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum WatcherCommand { + Stop, + ConfigError, + Continue, +} + +#[derive(Debug)] +pub struct WatcherPipes { + pub read_fd: OwnedFd, + pub write_fd: OwnedFd, +} + +impl AsRawFd for WatcherPipes { + fn as_raw_fd(&self) -> RawFd { + self.read_fd.as_raw_fd() } +} + +impl Drop for WatcherPipes { + fn drop(&mut self) { + let _ = send_watcher_command(&self.write_fd, WatcherCommand::Stop); + let _ = close(self.read_fd.as_raw_fd()); + let _ = close(self.write_fd.as_raw_fd()); + } +} +fn extract_and_canonicalize_base_dir(pattern: &str) -> (PathBuf, String) { let path = Path::new(pattern); let mut base = PathBuf::new(); + let mut remaining_components = Vec::new(); + let mut found_glob = false; + for comp in path.components() { let comp_str = comp.as_os_str().to_string_lossy(); - if comp_str.contains('*') || comp_str.contains('?') || comp_str.contains('[') { - break; + if !found_glob + && (comp_str.contains('*') || comp_str.contains('?') || comp_str.contains('[')) + { + found_glob = true; + remaining_components.push(comp_str.to_string()); + } else if found_glob { + remaining_components.push(comp_str.to_string()); } else { base.push(comp); } } - // If no base was built, default to "." - let base = if base.as_os_str().is_empty() || !base.exists() { + + let base = if base.as_os_str().is_empty() { PathBuf::from(".") } else { base }; + let base = fs::canonicalize(&base).unwrap_or(base); + let remaining_pattern = remaining_components.join("/"); - fs::canonicalize(&base).unwrap_or(base) + (base, remaining_pattern) } -/// Minimum time between triggering the same pattern group (debounce time) -const DEBOUNCE_DURATION: Duration = Duration::from_millis(2000); +const DEBOUNCE_DURATION: Duration = Duration::from_millis(300); +const EVENT_DEDUP_DURATION: Duration = Duration::from_millis(50); +const AUTO_RECOVERY_TIMEOUT: Duration = Duration::from_secs(5); + +fn serialize_command(cmd: WatcherCommand) -> u8 { + match cmd { + WatcherCommand::Stop => 0, + WatcherCommand::ConfigError => 1, + WatcherCommand::Continue => 2, + } +} + +fn deserialize_command(byte: u8) -> Option { + match byte { + 0 => Some(WatcherCommand::Stop), + 1 => Some(WatcherCommand::ConfigError), + 2 => Some(WatcherCommand::Continue), + _ => None, + } +} + +pub fn send_watcher_command(fd: &OwnedFd, cmd: WatcherCommand) -> Result<()> { + let buf = [serialize_command(cmd)]; + match write(fd, &buf) { + Ok(_) => Ok(()), + Err(e) => Err(magnus::Error::new( + magnus::exception::standard_error(), + format!("Failed to send command to watcher: {}", e), + )), + } +} + +pub fn watch_groups( + pattern_groups: Vec<(String, Vec>)>, +) -> Result> { + // Create bidirectional pipes for communication + let (parent_read_fd, child_write_fd): (OwnedFd, OwnedFd) = pipe().map_err(|e| { + magnus::Error::new( + magnus::exception::standard_error(), + format!("Failed to create parent read pipe: {}", e), + ) + })?; -pub fn watch_groups(pattern_groups: Vec<(String, Vec>)>) -> Result> { - let (r_fd, w_fd): (OwnedFd, OwnedFd) = pipe().map_err(|e| { + let (child_read_fd, parent_write_fd): (OwnedFd, OwnedFd) = pipe().map_err(|e| { magnus::Error::new( magnus::exception::standard_error(), - format!("Failed to create watcher pipe: {}", e), + format!("Failed to create child read pipe: {}", e), ) })?; @@ -77,17 +144,41 @@ pub fn watch_groups(pattern_groups: Vec<(String, Vec>)>) -> Result { + info!("Parent closed command pipe, exiting watcher"); std::process::exit(0); } - Ok(_) => {} - Err(_) => { - std::process::exit(0); + Ok(_) => { + if let Some(cmd) = deserialize_command(buf[0]) { + info!("Received command from parent: {:?}", cmd); + *command_channel_clone.lock() = Some(cmd); + + if matches!(cmd, WatcherCommand::Stop) { + info!("Received stop command, exiting watcher"); + std::process::exit(0); + } + } + } + Err(e) => { + error!("Error reading from command pipe: {}", e); + std::process::exit(1); } } } @@ -95,11 +186,19 @@ pub fn watch_groups(pattern_groups: Vec<(String, Vec>)>) -> Result>)>) -> Result>(); + let startup_time = Instant::now(); let sender = tx.clone(); - fn event_fn(sender: Sender>) -> impl Fn(notify::Result) { - move |res| match res { - Ok(event) => { - sender.send(Ok(event)).unwrap(); - } - Err(e) => println!("watch error: {:?}", e), + + let event_fn = move |res: notify::Result| { + if let Ok(event) = res { + sender.send(Ok(event)).unwrap_or_else(|e| { + error!("Failed to send event: {}", e); + }); + } else if let Err(e) = res { + error!("Watch error: {:?}", e); } - } + }; + + let mut watched_paths = HashSet::new(); + let mut watcher = notify::recommended_watcher(event_fn).expect("Failed to create watcher"); - let mut watched_dirs = HashSet::new(); - let mut watcher: RecommendedWatcher = - notify::recommended_watcher(event_fn(sender)).expect("Failed to create watcher"); for group in &groups { - if watched_dirs.insert(group.base_dir.clone()) { - debug!("Watching {}/{}", group.base_dir.display(), group.pattern); + if watched_paths.insert(group.base_dir.clone()) { + let recursive = if group.pattern.is_empty() { + RecursiveMode::NonRecursive + } else { + RecursiveMode::Recursive + }; + watcher - .watch(&group.base_dir, RecursiveMode::Recursive) + .watch(&group.base_dir, recursive) .expect("Failed to add watch"); } } - debug!("Monitored groups {:?}", groups.len()); - // Main event loop. + // Wait briefly to avoid initial event storm + thread::sleep(Duration::from_millis(100)); + + // State management + let mut recent_events: HashMap<(PathBuf, EventKind), Instant> = HashMap::new(); + let restart_state = Arc::new(Mutex::new(None::)); + + // Main event loop for res in rx { match res { Ok(event) => { if !matches!(event.kind, EventKind::Modify(ModifyKind::Data(_))) { continue; } - debug!("Event fired {:?}", event); + let now = Instant::now(); + + // Skip startup events + if now.duration_since(startup_time) < Duration::from_millis(500) { + continue; + } + + // Deduplicate events + let mut should_process = true; + for path in &event.paths { + let event_key = (path.clone(), event.kind); + if let Some(&last_seen) = recent_events.get(&event_key) { + if now.duration_since(last_seen) < EVENT_DEDUP_DURATION { + should_process = false; + break; + } + } + recent_events.insert(event_key, now); + } + + if !should_process { + continue; + } + + // Clean up old entries + recent_events + .retain(|_, &mut time| now.duration_since(time) < Duration::from_secs(1)); + + // Check restart state + let should_skip = { + let state = restart_state.lock(); + if let Some(restart_time) = *state { + now.duration_since(restart_time) < Duration::from_millis(500) + } else { + false + } + }; + + if should_skip { + continue; + } + + // Process commands from parent + let command_to_process = { + let mut command_guard = command_channel.lock(); + let cmd = *command_guard; + *command_guard = None; + cmd + }; + + if let Some(cmd) = command_to_process { + match cmd { + WatcherCommand::ConfigError => { + info!("Received config error notification, resuming file watching"); + *restart_state.lock() = None; + for group in &mut groups { + group.last_triggered = None; + } + recent_events.clear(); + } + WatcherCommand::Continue => { + info!("Received continue notification, resuming file watching"); + *restart_state.lock() = None; + } + WatcherCommand::Stop => { /* Handled in command thread */ } + } + } + + // Process file events for group in &mut groups { + // Apply debounce + if let Some(last_triggered) = group.last_triggered { + if now.duration_since(last_triggered) < DEBOUNCE_DURATION { + continue; + } + } + for path in event.paths.iter() { - if let Ok(rel_path) = path.strip_prefix(&group.base_dir) { - if group.glob_set.is_match(rel_path) - || rel_path.to_str().is_some_and(|s| s == group.pattern) - { - debug!("Matched pattern: {:?}", group.pattern); - // Check if we should debounce this event - if let Some(last_triggered) = group.last_triggered { - if now.duration_since(last_triggered) < DEBOUNCE_DURATION { - // Skip this event as we've recently triggered for this pattern - continue; - } + let matches = if group.pattern.is_empty() { + path == &group.base_dir + } else if let Ok(rel_path) = path.strip_prefix(&group.base_dir) { + group.glob_set.is_match(rel_path) + } else { + false + }; + + if matches { + group.last_triggered = Some(now); + + // Execute commands + for command in &group.commands { + if command.is_empty() { + continue; } - // Update the last triggered time - group.last_triggered = Some(now); + // Check for shell command or restart/reload + let is_shell_command = command.len() == 1 + && (command[0].contains("&&") + || command[0].contains("||") + || command[0].contains("|") + || command[0].contains(";")); + + let is_restart = command + .windows(2) + .any(|w| w[0] == "itsi" && w[1] == "restart") + || (is_shell_command + && command[0].contains("itsi restart")); + + let is_reload = command + .windows(2) + .any(|w| w[0] == "itsi" && w[1] == "reload") + || (is_shell_command && command[0].contains("itsi reload")); + + // Handle restart/reload + if is_restart || is_reload { + let cmd_type = + if is_restart { "restart" } else { "reload" }; + let mut should_run = false; + + { + let mut state = restart_state.lock(); + if let Some(last_time) = *state { + if now.duration_since(last_time) + < Duration::from_secs(3) + { + info!( + "Ignoring {} command - too soon", + cmd_type + ); + } else { + *state = Some(now); + should_run = true; + } + } else { + *state = Some(now); + should_run = true; + } + } - // Execute the commands for this group. - for command in &group.commands { - if command.is_empty() { + if !should_run { continue; } + + // Notify parent (optional) + let _ = write(&child_write_fd_clone, &[3]); + } + + // Build and execute command + let mut cmd = if is_shell_command { + let mut shell_cmd = Command::new("sh"); + shell_cmd.arg("-c").arg(command.join(" ")); + shell_cmd + } else { let mut cmd = Command::new(&command[0]); if command.len() > 1 { cmd.args(&command[1..]); } - debug!( - "Executing command: {:?} due to change in {:?}", - command, path - ); - match cmd.spawn() { - Ok(mut child) => { - if let Err(e) = child.wait() { - eprintln!( - "Command {:?} failed: {:?}", - command, e - ); - } + cmd + }; + + match cmd.spawn() { + Ok(mut child) => { + if let Err(e) = child.wait() { + error!("Command {:?} failed: {:?}", command, e); } - Err(e) => { - eprintln!( - "Failed to execute command {:?}: {:?}", - command, e - ); + + if is_restart || is_reload { + info!("Itsi command submitted, waiting for parent response"); + + // Set auto-recovery timer + let restart_state_clone = + Arc::clone(&restart_state); + let now_clone = now; + thread::spawn(move || { + thread::sleep(AUTO_RECOVERY_TIMEOUT); + let mut state = restart_state_clone.lock(); + if let Some(restart_time) = *state { + if now_clone.duration_since(restart_time) + < Duration::from_secs(1) + { + info!("Auto-recovering from potential restart failure"); + *state = None; + } + } + }); } } + Err(e) => { + error!( + "Failed to execute command {:?}: {:?}", + command, e + ); + } } - break; } + break; } } } } - Err(e) => println!("Watch error: {:?}", e), + Err(e) => error!("Watch error: {:?}", e), } } - // Clean up the watches. - for group in &groups { - watcher - .unwatch(&group.base_dir) - .expect("Failed to remove watch"); - } + // Clean up drop(watcher); std::process::exit(0); } else { - let _ = close(r_fd.into_raw_fd()); - Ok(Some(w_fd)) + // Parent process - close the child ends of the pipes + let _ = close(child_read_fd.into_raw_fd()); + let _ = close(child_write_fd.into_raw_fd()); + + // Create a paired structure to return + let watcher_pipes = WatcherPipes { + read_fd: parent_read_fd, + write_fd: parent_write_fd, + }; + + Ok(Some(watcher_pipes)) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use super::*; + + #[test] + fn test_extract_patterns() { + // Save current dir to restore later + let original_dir = env::current_dir().unwrap(); + + // Create a temp dir and work from there for consistent results + let temp_dir = env::temp_dir().join("itsi_test_patterns"); + let _ = fs::create_dir_all(&temp_dir); + env::set_current_dir(&temp_dir).unwrap(); + + // Test glob patterns + let (base, pattern) = extract_and_canonicalize_base_dir("assets/*/**.tsx"); + assert!(base.ends_with("assets")); + assert_eq!(pattern, "*/**.tsx"); + + let (base, pattern) = extract_and_canonicalize_base_dir("./assets/*/**.tsx"); + assert!(base.ends_with("assets")); + assert_eq!(pattern, "*/**.tsx"); + + // Test non-glob patterns - exact files should have empty pattern + let (base, pattern) = extract_and_canonicalize_base_dir("foo/bar.txt"); + assert!(base.ends_with("bar.txt")); + assert_eq!(pattern, ""); + + // Test current directory patterns + let (base, pattern) = extract_and_canonicalize_base_dir("*.txt"); + assert_eq!(base, temp_dir.canonicalize().unwrap()); + assert_eq!(pattern, "*.txt"); + + // Test file in current directory + let (base, pattern) = extract_and_canonicalize_base_dir("test.txt"); + assert!(base.ends_with("test.txt")); + assert_eq!(pattern, ""); + + // Restore original directory and clean up + env::set_current_dir(original_dir).unwrap(); + let _ = fs::remove_dir_all(&temp_dir); + } + + #[test] + fn test_watcher_commands() { + assert_eq!(serialize_command(WatcherCommand::Stop), 0); + assert_eq!(serialize_command(WatcherCommand::ConfigError), 1); + assert_eq!(serialize_command(WatcherCommand::Continue), 2); + + assert_eq!(deserialize_command(0), Some(WatcherCommand::Stop)); + assert_eq!(deserialize_command(1), Some(WatcherCommand::ConfigError)); + assert_eq!(deserialize_command(2), Some(WatcherCommand::Continue)); + assert_eq!(deserialize_command(99), None); } } diff --git a/crates/itsi_server/src/ruby_types/itsi_server/itsi_server_config.rs b/crates/itsi_server/src/ruby_types/itsi_server/itsi_server_config.rs index bd9d0aae..9732c5de 100644 --- a/crates/itsi_server/src/ruby_types/itsi_server/itsi_server_config.rs +++ b/crates/itsi_server/src/ruby_types/itsi_server/itsi_server_config.rs @@ -1,4 +1,4 @@ -use super::file_watcher::{self}; +use super::file_watcher::{self, WatcherCommand}; use crate::{ ruby_types::ITSI_SERVER_CONFIG, server::{ @@ -9,7 +9,7 @@ use crate::{ use derive_more::Debug; use itsi_error::ItsiError; use itsi_rb_helpers::{call_with_gvl, print_rb_backtrace, HeapValue}; -use itsi_tracing::{set_format, set_level, set_target, set_target_filters}; +use itsi_tracing::{error, set_format, set_level, set_target, set_target_filters}; use magnus::{ block::Proc, error::Result, @@ -18,12 +18,12 @@ use magnus::{ }; use nix::{ fcntl::{fcntl, FcntlArg, FdFlag}, - unistd::{close, dup}, + unistd::dup, }; use parking_lot::{Mutex, RwLock}; use std::{ collections::HashMap, - os::fd::{AsRawFd, OwnedFd, RawFd}, + os::fd::RawFd, path::PathBuf, str::FromStr, sync::{ @@ -32,7 +32,7 @@ use std::{ }, time::Duration, }; -use tracing::{debug, error}; +use tracing::debug; static DEFAULT_BIND: &str = "http://localhost:3000"; static ID_BUILD_CONFIG: LazyId = LazyId::new("build_config"); static ID_RELOAD_EXEC: LazyId = LazyId::new("reload_exec"); @@ -44,7 +44,7 @@ pub struct ItsiServerConfig { pub itsi_config_proc: Arc>>, #[debug(skip)] pub server_params: Arc>>, - pub watcher_fd: Arc>, + pub watcher_fd: Arc>, } #[derive(Debug)] @@ -84,7 +84,7 @@ pub struct ServerParams { listener_info: Mutex>, pub itsi_server_token_preference: ItsiServerTokenPreference, pub preloaded: AtomicBool, - socket_opts: SocketOpts, + pub socket_opts: SocketOpts, preexisting_listeners: Option, } @@ -442,6 +442,10 @@ impl ItsiServerConfig { } } + pub fn use_reuse_port_load_balancing(&self) -> bool { + cfg!(target_os = "linux") && self.server_params.read().socket_opts.reuse_port + } + /// Reload pub fn reload(self: Arc, cluster_worker: bool) -> Result { let server_params = call_with_gvl(|ruby| { @@ -553,6 +557,9 @@ impl ItsiServerConfig { } pub fn dup_fds(self: &Arc) -> Result<()> { + // Ensure the watcher is already stopped before duplicating file descriptors + // to prevent race conditions between closing the watcher FD and duplicating socket FDs + let binding = self.server_params.read(); let mut listener_info_guard = binding.listener_info.lock(); let dupped_fd_map = listener_info_guard @@ -578,8 +585,10 @@ impl ItsiServerConfig { } pub fn stop_watcher(self: &Arc) -> Result<()> { - if let Some(r_fd) = self.watcher_fd.as_ref() { - close(r_fd.as_raw_fd()).ok(); + if let Some(pipes) = self.watcher_fd.as_ref() { + // Send explicit stop command to the watcher process + file_watcher::send_watcher_command(&pipes.write_fd, WatcherCommand::Stop)?; + // We don't close the pipes here - they'll be closed when the WatcherPipes is dropped } Ok(()) } @@ -594,8 +603,24 @@ impl ItsiServerConfig { pub async fn check_config(&self) -> bool { if let Some(errors) = self.get_config_errors().await { Self::print_config_errors(errors); + // Notify watcher that config check failed + if let Some(pipes) = self.watcher_fd.as_ref() { + if let Err(e) = + file_watcher::send_watcher_command(&pipes.write_fd, WatcherCommand::ConfigError) + { + error!("Failed to notify watcher of config error: {}", e); + } + } return false; } + // If we reach here, the config is valid + if let Some(pipes) = self.watcher_fd.as_ref() { + if let Err(e) = + file_watcher::send_watcher_command(&pipes.write_fd, WatcherCommand::Continue) + { + error!("Failed to notify watcher to continue: {}", e); + } + } true } @@ -609,7 +634,8 @@ impl ItsiServerConfig { ) })?; - self.stop_watcher()?; + // Make sure we're not calling stop_watcher here to avoid double-stopping + // The watcher should be stopped earlier in the restart sequence call_with_gvl(|ruby| -> Result<()> { ruby.get_inner_ref(&ITSI_SERVER_CONFIG) .funcall::<_, _, Value>(*ID_RELOAD_EXEC, (listener_json,))?; diff --git a/crates/itsi_server/src/server/binds/listener.rs b/crates/itsi_server/src/server/binds/listener.rs index 7af10c89..488a6c7e 100644 --- a/crates/itsi_server/src/server/binds/listener.rs +++ b/crates/itsi_server/src/server/binds/listener.rs @@ -304,16 +304,16 @@ impl Listener { connect_tcp_socket(ip, port, &socket_opts).unwrap() } - pub fn into_tokio_listener(self, no_rebind: bool) -> TokioListener { + pub fn into_tokio_listener(self, should_rebind: bool) -> TokioListener { match self { Listener::Tcp(mut listener) => { - if cfg!(target_os = "linux") && !no_rebind { + if should_rebind { listener = Listener::rebind_listener(listener); } TokioListener::Tcp(TokioTcpListener::from_std(listener).unwrap()) } Listener::TcpTls((mut listener, acceptor)) => { - if cfg!(target_os = "linux") && !no_rebind { + if should_rebind { listener = Listener::rebind_listener(listener); } TokioListener::TcpTls( diff --git a/crates/itsi_server/src/server/serve_strategy/cluster_mode.rs b/crates/itsi_server/src/server/serve_strategy/cluster_mode.rs index dfb8ad65..6b018a44 100644 --- a/crates/itsi_server/src/server/serve_strategy/cluster_mode.rs +++ b/crates/itsi_server/src/server/serve_strategy/cluster_mode.rs @@ -100,9 +100,11 @@ impl ClusterMode { LifecycleEvent::Restart => { if self.server_config.check_config().await { self.invoke_hook("before_restart"); + self.server_config.stop_watcher()?; self.server_config.dup_fds()?; self.shutdown().await.ok(); info!("Shutdown complete. Calling reload exec"); + self.server_config.reload_exec()?; } Ok(()) @@ -111,8 +113,11 @@ impl ClusterMode { if !self.server_config.check_config().await { return Ok(()); } + let should_reexec = self.server_config.clone().reload(true)?; + if should_reexec { + self.server_config.stop_watcher()?; self.server_config.dup_fds()?; self.shutdown().await.ok(); self.server_config.reload_exec()?; @@ -321,15 +326,6 @@ impl ClusterMode { .iter() .try_for_each(|worker| worker.boot(Arc::clone(&self)))?; - if cfg!(target_os = "linux") { - self.server_config - .server_params - .write() - .listeners - .lock() - .drain(..); - }; - let (sender, mut receiver) = watch::channel(()); *CHILD_SIGNAL_SENDER.lock() = Some(sender); diff --git a/crates/itsi_server/src/server/serve_strategy/single_mode.rs b/crates/itsi_server/src/server/serve_strategy/single_mode.rs index 2595dfaf..d5849564 100644 --- a/crates/itsi_server/src/server/serve_strategy/single_mode.rs +++ b/crates/itsi_server/src/server/serve_strategy/single_mode.rs @@ -262,7 +262,14 @@ impl SingleMode { let shutdown_timeout = self.server_config.server_params.read().shutdown_timeout; let (shutdown_sender, _) = watch::channel(RunningPhase::Running); let monitor_thread = self.clone().start_monitors(thread_workers.clone()); + + // If we're on Linux with reuse_port enabled, we can use + // kernel level load balancing across processes sharing a port. + // To take advantage of this, these forks will rebind to the same port upon boot. + // Worker 0 is special (this one just inherits the bind from the master process). let is_zero_worker = self.is_zero_worker(); + let should_rebind = !is_zero_worker && self.server_config.use_reuse_port_load_balancing(); + if monitor_thread.is_none() { error!("Failed to start monitor thread"); return Err(ItsiError::new("Failed to start monitor thread")); @@ -283,7 +290,7 @@ impl SingleMode { .listeners .lock() .drain(..) - .map(|list| Arc::new(list.into_tokio_listener(is_zero_worker))) + .map(|list| Arc::new(list.into_tokio_listener(should_rebind))) .collect::>(); tokio_listeners.iter().cloned().for_each(|listener| { @@ -311,7 +318,7 @@ impl SingleMode { let mut after_accept_wait: Option = None::; if cfg!(target_os = "macos") { - after_accept_wait = if server_params.workers > 1 { + after_accept_wait = if server_params.workers > 1 && !(server_params.socket_opts.reuse_port && server_params.socket_opts.reuse_address) { Some(Duration::from_nanos(10 * server_params.workers as u64)) } else { None @@ -434,6 +441,7 @@ impl SingleMode { if self.is_single_mode() { self.invoke_hook("before_restart"); } + self.server_config.stop_watcher()?; self.server_config.dup_fds()?; self.server_config.reload_exec()?; Ok(()) diff --git a/docs/content/acknowledgements/_index.md b/docs/content/acknowledgements/_index.md index ec60c10a..881c5762 100644 --- a/docs/content/acknowledgements/_index.md +++ b/docs/content/acknowledgements/_index.md @@ -34,7 +34,7 @@ It's mature, stable and rock solid. Many features and interfaces of Itsi have be It's highly scalable and a great choice for high-traffic websites. Many of Itsi's proxy and static file server design decisions and features have been inspired by their NGINX equivalents. -* The [Async](https://github.com/socketry/async) ecosystem and [Falcon](https://github.com/socketry/falcon), championed by [@ioaquatix](https://github.com/ioquatix) - a fellow Kiwi. +* The [Async](https://github.com/socketry/async) ecosystem and [Falcon](https://github.com/socketry/falcon), championed by [@ioquatix](https://github.com/ioquatix) - a fellow Kiwi. These tools and efforts in driving forward Ruby's cooperative multitasking have been a great inspiration and source of learning for Itsi's async IO design. * [Iodine](https://github.com/boazsegev/iodine), [Agoo](https://github.com/ohler55/agoo). Two class-leading options when it comes to blazing fast Ruby servers written in native code. diff --git a/docs/content/itsi_scheduler/_index.md b/docs/content/itsi_scheduler/_index.md index c498e177..c13a67bc 100644 --- a/docs/content/itsi_scheduler/_index.md +++ b/docs/content/itsi_scheduler/_index.md @@ -105,8 +105,8 @@ to run many blocking operations simultaneously all while occupying only a single ### 3 (Optional) - Enable Scheduler Refinements You can opt-in to a tiny set of Ruby refinements provided by the `Itsi::Scheduler` to make usage even more ergonomic. -By opting in to this refinement (using `using Itsi::Scheduler`) you gain access to the top-level #schedule(&block) method, as well -as enumerable methods #schedule_each, and #schedule_map. +By opting in to this refinement (using `using Itsi::ScheduleRefinement`) you gain access to the top-level `#schedule(&block)` method, as well +as enumerable methods `#schedule_each`, and `#schedule_map`. ```ruby require 'net/http' diff --git a/docs/hugo.yaml b/docs/hugo.yaml index a9be6230..a7a3bf42 100644 --- a/docs/hugo.yaml +++ b/docs/hugo.yaml @@ -71,7 +71,13 @@ menu: params: navbar: displayTitle: true - displayLogo: false + displayLogo: true + logo: + path: favicon.svg + dark: favicon-96x96.png + link: / + width: 35 + height: 15 footer: displayCopyright: false diff --git a/gems/scheduler/Cargo.lock b/gems/scheduler/Cargo.lock index dcf8f3a8..b6afcc8d 100644 --- a/gems/scheduler/Cargo.lock +++ b/gems/scheduler/Cargo.lock @@ -213,7 +213,7 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "itsi-scheduler" -version = "0.2.17" +version = "0.2.18" dependencies = [ "bytes", "derive_more", diff --git a/gems/scheduler/lib/itsi/scheduler/version.rb b/gems/scheduler/lib/itsi/scheduler/version.rb index d747b526..20a7b299 100644 --- a/gems/scheduler/lib/itsi/scheduler/version.rb +++ b/gems/scheduler/lib/itsi/scheduler/version.rb @@ -2,6 +2,6 @@ module Itsi class Scheduler - VERSION = "0.2.17" + VERSION = "0.2.18" end end diff --git a/gems/server/Cargo.lock b/gems/server/Cargo.lock index ea181405..df68f5ca 100644 --- a/gems/server/Cargo.lock +++ b/gems/server/Cargo.lock @@ -1644,7 +1644,7 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "itsi-server" -version = "0.2.17" +version = "0.2.18" dependencies = [ "argon2", "async-channel", diff --git a/gems/server/exe/itsi b/gems/server/exe/itsi index 25c75c88..f4293e1f 100755 --- a/gems/server/exe/itsi +++ b/gems/server/exe/itsi @@ -4,7 +4,6 @@ require "itsi/server" require "optparse" - COMMANDS = { "init" => "Initialize a new Itsi.rb server configuration file", "status" => "Show the status of the server", @@ -119,7 +118,6 @@ parser = OptionParser.new do |opts| options[:shutdown_timeout] = shutdown_timeout end - opts.on("--stream-body", TrueClass, "Stream body frames (default: false for best compatibility)") do |stream_body| options[:stream_body] = stream_body end @@ -159,7 +157,7 @@ parser = OptionParser.new do |opts| end end -if ENV['COMP_LINE'] || ARGV.include?('--completion') +if ENV["COMP_LINE"] || ARGV.include?("--completion") puts COMMANDS.keys exit end @@ -173,7 +171,7 @@ end case (command = ARGV.shift) when *COMMANDS.keys - required_arity = Itsi::Server.method(command).parameters&.select{|c| c.first == :req }&.length&.succ || 2 + required_arity = Itsi::Server.method(command).parameters&.select { |c| c.first == :req }&.length&.succ || 2 case required_arity when 1 then Itsi::Server.send(command) when 2 then Itsi::Server.send(command, options) diff --git a/gems/server/lib/itsi/server/config.rb b/gems/server/lib/itsi/server/config.rb index e7376514..79ec76a2 100644 --- a/gems/server/lib/itsi/server/config.rb +++ b/gems/server/lib/itsi/server/config.rb @@ -97,7 +97,7 @@ def self.build_config(args, config_file_path, builder_proc = nil) errors << [e, e.backtrace[0]] end # If we're just preloading a specific gem group, we'll do that here too - when Symbol + when Symbol, String Itsi.log_debug("Preloading gem group #{preload}") Bundler.require(preload) end diff --git a/gems/server/lib/itsi/server/config/options/auto_reload_config.rb b/gems/server/lib/itsi/server/config/options/auto_reload_config.rb index eaf31642..b4bf4582 100644 --- a/gems/server/lib/itsi/server/config/options/auto_reload_config.rb +++ b/gems/server/lib/itsi/server/config/options/auto_reload_config.rb @@ -2,9 +2,8 @@ module Itsi class Server module Config class AutoReloadConfig < Option - insert_text <<~SNIPPET - auto_reload_config! # Auto-reload the server configuration each time it changes. + auto_reload_config! # Auto-reload the server configuration each time it changes. SNIPPET detail "Auto-reload the server configuration each time it changes." @@ -15,18 +14,20 @@ def self.option_name def build! return if @auto_reloading - src = caller.find{|l| !(l =~ /lib\/itsi\/server\/config/) }.split(":").first + + src = caller.find { |l| !(l =~ %r{lib/itsi/server/config}) }.split(":").first location.instance_eval do return if @auto_reloading if @included @included.each do |file| - next if "#{file}.rb" == src + next if "#{file}" == src + if ENV["BUNDLE_BIN_PATH"] - watch "#{file}.rb", [%w[bundle exec itsi restart]] + watch "#{file}", [%w[bundle exec itsi restart]] else - watch "#{file}.rb", [%w[itsi restart]] + watch "#{file}", [%w[itsi restart]] end end end diff --git a/gems/server/lib/itsi/server/config/options/include.md b/gems/server/lib/itsi/server/config/options/include.md index c16db081..86a5b880 100644 --- a/gems/server/lib/itsi/server/config/options/include.md +++ b/gems/server/lib/itsi/server/config/options/include.md @@ -7,6 +7,7 @@ Use the `include` option to load additional files to be evaluated within the cur You can use this option to split a large configuration file into multiple smaller files. Files required using `include` are also subject to auto-reloading, when using the [auto_reload_config](/options/auto_reload_config) option. +The path of the included file is evaluated relative to the current configuration file. ## Examples ```ruby {filename="Itsi.rb"} diff --git a/gems/server/lib/itsi/server/config/options/include.rb b/gems/server/lib/itsi/server/config/options/include.rb index 452aa2d9..b4cc97a8 100644 --- a/gems/server/lib/itsi/server/config/options/include.rb +++ b/gems/server/lib/itsi/server/config/options/include.rb @@ -2,37 +2,39 @@ module Itsi class Server module Config class Include < Option - insert_text "include \"${1|other_file|}\" # Include another file to be loaded within the current configuration" detail "Include another file to be loaded within the current configuration" schema do - Type(String) + Type(String) & Required() end def build! - included_file = @params + caller_location = caller_locations(2, 1).first.path + included_file = \ + if caller_location =~ %r{lib/itsi/server} + File.expand_path("#{@params}.rb") + else + File.expand_path("#{@params}.rb", File.dirname(caller_location)) + end + location.instance_eval do @included ||= [] @included << included_file if @auto_reloading if ENV["BUNDLE_BIN_PATH"] - watch "#{included_file}.rb", [%w[bundle exec itsi restart]] + watch "#{included_file}", [%w[bundle exec itsi restart]] else - watch "#{included_file}.rb", [%w[itsi restart]] + watch "#{included_file}", [%w[itsi restart]] end end end - filename = File.expand_path("#{included_file}.rb") - - code = IO.read(filename) - location.instance_eval(code, filename, 1) - + code = IO.read(included_file) + location.instance_eval(code, included_file, 1) end - end end end diff --git a/gems/server/lib/itsi/server/config/options/reuse_port.rb b/gems/server/lib/itsi/server/config/options/reuse_port.rb index d08eb1a1..3de22d8e 100644 --- a/gems/server/lib/itsi/server/config/options/reuse_port.rb +++ b/gems/server/lib/itsi/server/config/options/reuse_port.rb @@ -2,17 +2,15 @@ module Itsi class Server module Config class ReusePort < Option - insert_text <<~SNIPPET - reuse_port ${1|true,false|} + reuse_port ${1|true,false|} SNIPPET detail "Configures whether the server should set the reuse_port option on the underlying socket." schema do - (Bool() & Required()).default(false) + (Bool() & Required()).default(true) end - end end end diff --git a/gems/server/lib/itsi/server/default_config/Itsi.rb b/gems/server/lib/itsi/server/default_config/Itsi.rb index 4d466f28..a622046b 100644 --- a/gems/server/lib/itsi/server/default_config/Itsi.rb +++ b/gems/server/lib/itsi/server/default_config/Itsi.rb @@ -10,7 +10,7 @@ # Number of worker processes to spawn # If more than 1, Itsi will be booted in Cluster mode -workers ENV["ITSI_WORKERS"]&.to_i || env == "development" ? 1 : nil +workers ENV["ITSI_WORKERS"]&.to_i || (env == "development" ? 1 : nil) # Number of threads to spawn per worker process # For pure CPU bound applicationss, you'll get the best results keeping this number low @@ -27,11 +27,13 @@ fiber_scheduler nil # If you bind to https, without specifying a certificate, Itsi will use a self-signed certificate. -# The self-signed certificate will use a CA generated for your host and stored inside `ITSI_LOCAL_CA_DIR` (Defaults to ~/.itsi) +# The self-signed certificate will use a CA generated for your +# host and stored inside `ITSI_LOCAL_CA_DIR` (Defaults to ~/.itsi) # bind "https://0.0.0.0:3000" # bind "https://0.0.0.0:3000?domains=dev.itsi.fyi" # -# If you want to use let's encrypt to generate you a real certificate you and pass cert=acme and an acme_email address to generate one. +# If you want to use let's encrypt to generate you a real certificate you +# and pass cert=acme and an acme_email address to generate one. # bind "https://itsi.fyi?cert=acme&acme_email=admin@itsi.fyi" # You can generate certificates for multiple domains at once, by passing a comma-separated list of domains # bind "https://0.0.0.0?domains=foo.itsi.fyi,bar.itsi.fyi&cert=acme&acme_email=admin@itsi.fyi" @@ -68,7 +70,8 @@ # all of them at once, if they reach the threshold simultaneously. worker_memory_limit 1024 * 1024 * 1024 -# You can provide an optional block of code to run, when a worker hits its memory threshold (Use this to send yourself an alert, +# You can provide an optional block of code to run, when a worker hits its memory threshold +# (Use this to send yourself an alert, # write metrics to disk etc. etc.) after_memory_limit_reached do |pid| puts "Worker #{pid} has reached its memory threshold and will restart" @@ -85,7 +88,8 @@ shutdown_timeout 5 # Set this to false for application environments that require rack.input to be a rewindable body -# (like Rails). For rack applications that can stream inputs, you can set this to true for a more memory-efficient approach. +# (like Rails). For rack applications that can stream inputs, you can set this to true for a more +# memory-efficient approach. stream_body false # OOB GC responses threshold diff --git a/gems/server/lib/itsi/server/version.rb b/gems/server/lib/itsi/server/version.rb index ee5359f2..634d1022 100644 --- a/gems/server/lib/itsi/server/version.rb +++ b/gems/server/lib/itsi/server/version.rb @@ -2,6 +2,6 @@ module Itsi class Server - VERSION = "0.2.17" + VERSION = "0.2.18" end end diff --git a/itsi.gemspec b/itsi.gemspec index 9aa15f43..7b52a3bf 100644 --- a/itsi.gemspec +++ b/itsi.gemspec @@ -33,6 +33,6 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] - spec.add_dependency 'itsi-scheduler', '~> 0.2.17' - spec.add_dependency 'itsi-server', '~> 0.2.17' + spec.add_dependency 'itsi-scheduler', '~> 0.2.18' + spec.add_dependency 'itsi-server', '~> 0.2.18' end diff --git a/lib/itsi/version.rb b/lib/itsi/version.rb index 21b43b8c..0d243a0f 100644 --- a/lib/itsi/version.rb +++ b/lib/itsi/version.rb @@ -1,3 +1,3 @@ module Itsi - VERSION = '0.2.17' + VERSION = '0.2.18' end