diff --git a/notify/src/config.rs b/notify/src/config.rs index 9054b401..417882ef 100644 --- a/notify/src/config.rs +++ b/notify/src/config.rs @@ -86,7 +86,7 @@ pub enum TargetMode { /// If the underlying physical entity (inode/File ID) is replaced /// (e.g., by a move/rename operation), the watch stops monitoring. /// - /// TODO: fsevents backend and Windows backend does not unwatch on physical entity change yet. + /// TODO: fsevents backend and Windows backend and polling backend does not unwatch on physical entity change yet. NoTrack, } diff --git a/notify/src/consolidating_path_trie.rs b/notify/src/consolidating_path_trie.rs index 50ad8810..6c731de3 100644 --- a/notify/src/consolidating_path_trie.rs +++ b/notify/src/consolidating_path_trie.rs @@ -172,14 +172,17 @@ impl<'a, T> PathTrieIter<'a, T> { } pub struct ConsolidatingPathTrie { + children_consolidation: bool, + trie: PathTrie<()>, } impl ConsolidatingPathTrie { const CHILDREN_CONSOLIDATION_THRESHOLD: usize = 10; - pub fn new() -> Self { + pub fn new(children_consolidation: bool) -> Self { Self { + children_consolidation, trie: PathTrie::new(), } } @@ -192,14 +195,16 @@ impl ConsolidatingPathTrie { let inserted = self.trie.insert(path, ()); inserted.remove_children(); - for ancestor_path in path.ancestors().skip(1) { - if let Some(parent_node) = self.trie.get_node_mut(ancestor_path) - && parent_node.children_len() >= Self::CHILDREN_CONSOLIDATION_THRESHOLD - { - parent_node.remove_children(); - parent_node.set_value(()); - } else { - break; + if self.children_consolidation { + for ancestor_path in path.ancestors().skip(1) { + if let Some(parent_node) = self.trie.get_node_mut(ancestor_path) + && parent_node.children_len() >= Self::CHILDREN_CONSOLIDATION_THRESHOLD + { + parent_node.remove_children(); + parent_node.set_value(()); + } else { + break; + } } } } @@ -235,54 +240,68 @@ mod tests { #[test] fn consolidate_no_siblings() { - let mut ct = ConsolidatingPathTrie::new(); - ct.insert(PathBuf::from("/a/b")); - ct.insert(PathBuf::from("/a/c")); - assert_eq!( - ct.values(), - vec![PathBuf::from("/a/b"), PathBuf::from("/a/c")] - ); + for children_consolidation in [true, false] { + let mut ct = ConsolidatingPathTrie::new(children_consolidation); + ct.insert(PathBuf::from("/a/b")); + ct.insert(PathBuf::from("/a/c")); + assert_eq!( + ct.values(), + vec![PathBuf::from("/a/b"), PathBuf::from("/a/c")] + ); + } } #[test] fn consolidate_no_siblings2() { - let mut ct = ConsolidatingPathTrie::new(); - ct.insert(PathBuf::from("/a/b1")); - ct.insert(PathBuf::from("/a/b2")); - assert_eq!( - ct.values(), - vec![PathBuf::from("/a/b1"), PathBuf::from("/a/b2")] - ); + for children_consolidation in [true, false] { + let mut ct = ConsolidatingPathTrie::new(children_consolidation); + ct.insert(PathBuf::from("/a/b1")); + ct.insert(PathBuf::from("/a/b2")); + assert_eq!( + ct.values(), + vec![PathBuf::from("/a/b1"), PathBuf::from("/a/b2")] + ); + } } #[test] fn consolidate_children() { - let mut ct = ConsolidatingPathTrie::new(); - ct.insert(PathBuf::from("/a/b")); - ct.insert(PathBuf::from("/a/b/c")); - assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]); + for children_consolidation in [true, false] { + let mut ct = ConsolidatingPathTrie::new(children_consolidation); + ct.insert(PathBuf::from("/a/b")); + ct.insert(PathBuf::from("/a/b/c")); + assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]); + } } #[test] fn consolidate_parent() { - let mut ct = ConsolidatingPathTrie::new(); - ct.insert(PathBuf::from("/a/b/c")); - ct.insert(PathBuf::from("/a/b")); - assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]); + for children_consolidation in [true, false] { + let mut ct = ConsolidatingPathTrie::new(children_consolidation); + ct.insert(PathBuf::from("/a/b/c")); + ct.insert(PathBuf::from("/a/b")); + assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]); + } } #[test] fn consolidate_to_single_parent() { - let mut cr = ConsolidatingPathTrie::new(); + let mut cr = ConsolidatingPathTrie::new(true); for i in 1..=ConsolidatingPathTrie::CHILDREN_CONSOLIDATION_THRESHOLD { cr.insert(PathBuf::from(format!("/a/b/c{i}"))); } assert_eq!(cr.values(), vec![PathBuf::from("/a/b")]); + + let mut cr = ConsolidatingPathTrie::new(false); + for i in 1..=ConsolidatingPathTrie::CHILDREN_CONSOLIDATION_THRESHOLD { + cr.insert(PathBuf::from(format!("/a/b/c{i}"))); + } + assert!(cr.values().len() > 1); } #[test] fn consolidate_to_single_parent_nested1() { - let mut cr = ConsolidatingPathTrie::new(); + let mut cr = ConsolidatingPathTrie::new(true); for i in 1..ConsolidatingPathTrie::CHILDREN_CONSOLIDATION_THRESHOLD { cr.insert(PathBuf::from(format!("/a/b/c{i}"))); } @@ -294,7 +313,7 @@ mod tests { #[test] fn consolidate_to_single_parent_nested2() { - let mut cr = ConsolidatingPathTrie::new(); + let mut cr = ConsolidatingPathTrie::new(true); cr.insert(PathBuf::from("/a/b/c1")); cr.insert(PathBuf::from("/a/b/c2")); for i in 1..=ConsolidatingPathTrie::CHILDREN_CONSOLIDATION_THRESHOLD { diff --git a/notify/src/fsevent.rs b/notify/src/fsevent.rs index 57b0baa5..51021ea8 100644 --- a/notify/src/fsevent.rs +++ b/notify/src/fsevent.rs @@ -364,7 +364,7 @@ impl FsEventWatcher { fn update_paths_based_on_watches(&mut self) { let paths_to_watch = { - let mut trie = ConsolidatingPathTrie::new(); + let mut trie = ConsolidatingPathTrie::new(true); for path in self.watches.keys() { trie.insert(path.clone()); } diff --git a/notify/src/lib.rs b/notify/src/lib.rs index 64deab53..ef7b70d6 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -234,7 +234,6 @@ pub mod poll; mod bimap; mod config; -#[cfg(all(target_os = "macos", not(feature = "macos_kqueue")))] mod consolidating_path_trie; mod error; diff --git a/notify/src/poll.rs b/notify/src/poll.rs index 2ce9710a..fd11c847 100644 --- a/notify/src/poll.rs +++ b/notify/src/poll.rs @@ -3,15 +3,13 @@ //! Checks the `watch`ed paths periodically to detect changes. This implementation only uses //! Rust stdlib APIs and should work on all of the platforms it supports. -use crate::{Config, Error, EventHandler, Receiver, Sender, WatchMode, Watcher, unbounded}; +use crate::{ + Config, Error, EventHandler, PathsMut, Receiver, Result, Sender, WatchMode, Watcher, + poll::data::WatchData, unbounded, +}; use std::{ - collections::HashMap, path::{Path, PathBuf}, - sync::{ - Arc, Mutex, - atomic::{AtomicBool, Ordering}, - mpsc, - }, + sync::mpsc, thread, time::Duration, }; @@ -70,17 +68,18 @@ impl ScanEventHandler for () { fn handle_event(&mut self, _event: ScanEvent) {} } -use data::{DataBuilder, WatchData}; +use data::DataBuilder; mod data { use crate::{ - EventHandler, + Error, EventHandler, Result, WatchMode, + consolidating_path_trie::ConsolidatingPathTrie, event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind}, }; use std::{ cell::RefCell, collections::{HashMap, hash_map::RandomState}, fmt::{self, Debug}, - fs::{self, File, Metadata}, + fs::{File, Metadata}, hash::{BuildHasher, Hasher}, io::{self, Read}, path::{Path, PathBuf}, @@ -143,19 +142,6 @@ mod data { self.now = Instant::now(); } - /// Create [`WatchData`]. - /// - /// This function will return `Err(_)` if can not retrieve metadata from - /// the path location. (e.g., not found). - pub(super) fn build_watch_data( - &self, - root: PathBuf, - is_recursive: bool, - follow_symlinks: bool, - ) -> Option { - WatchData::new(self, root, is_recursive, follow_symlinks) - } - /// Create [`PathData`]. fn build_path_data(&self, meta_path: &MetaPath) -> PathData { PathData::new(self, meta_path) @@ -171,67 +157,109 @@ mod data { } } + #[derive(Debug)] + struct WatchHandlers { + current: HashMap, + next: HashMap, + is_stale: bool, + } + + impl WatchHandlers { + fn new() -> Self { + Self { + current: HashMap::new(), + next: HashMap::new(), + is_stale: false, + } + } + + /// Recalculate from `watches`. + fn recalculate(&mut self, watches: &HashMap) { + self.next.clear(); + self.is_stale = true; + + let mut trie = ConsolidatingPathTrie::new(false); + for (path, mode) in watches { + if mode.recursive_mode == crate::RecursiveMode::Recursive { + trie.insert(path); + } + } + // insert non-recursive watches that are not covered by recursive watches + for (path, mode) in watches { + if mode.recursive_mode != crate::RecursiveMode::Recursive { + self.next.insert(path.clone(), false); + } + } + // insert recursive watches + for path in trie.values() { + self.next.insert(path, true); + } + } + + fn use_handlers( + &mut self, + ) -> ( + &HashMap, + Option>, + ) { + if self.is_stale { + let old_next = std::mem::take(&mut self.next); + let old_current = std::mem::replace(&mut self.current, old_next); + self.is_stale = false; + return (&self.current, Some(old_current)); + } + (&self.current, None) + } + } + #[derive(Debug)] pub(super) struct WatchData { // config part, won't change. - root: PathBuf, - is_recursive: bool, follow_symlinks: bool, // current status part. + watches: HashMap, + watch_handlers: WatchHandlers, all_path_data: HashMap, } impl WatchData { - /// Scan filesystem and create a new `WatchData`. - /// - /// # Side effect - /// - /// This function may send event by `data_builder.emitter`. - fn new( - data_builder: &DataBuilder, - root: PathBuf, - is_recursive: bool, - follow_symlinks: bool, - ) -> Option { - // If metadata read error at `root` path, it will emit - // a error event and stop to create the whole `WatchData`. - // - // QUESTION: inconsistent? - // - // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`, - // if `root` path hit an io error, then watcher will reject to - // create this new watch. - // - // This may inconsistent with *POLLING* a watch. When watcher - // continue polling, io error at root path will not delete - // a existing watch. polling still working. - // - // So, consider a config file may not exists at first time but may - // create after a while, developer cannot watch it. - // - // FIXME: Can we always allow to watch a path, even file not - // found at this path? - if let Err(e) = fs::metadata(&root) { - data_builder.emitter.emit_io_err(e, Some(&root)); - return None; + /// Create a new `WatchData`. + pub fn new(follow_symlinks: bool) -> Self { + Self { + follow_symlinks, + watches: HashMap::new(), + watch_handlers: WatchHandlers::new(), + all_path_data: HashMap::new(), } + } - let all_path_data = Self::scan_all_path_data( - data_builder, - root.clone(), - is_recursive, - follow_symlinks, - true, - ) - .collect(); + pub fn add_watch(&mut self, path: PathBuf, mode: WatchMode) -> Result<()> { + if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() { + return Err(crate::Error::path_not_found().add_path(path)); + } - Some(Self { - root, - is_recursive, - follow_symlinks, - all_path_data, - }) + self.watches.insert(path, mode); + self.watch_handlers.recalculate(&self.watches); + Ok(()) + } + + pub fn add_watch_multiple(&mut self, paths: Vec<(PathBuf, WatchMode)>) -> Result<()> { + for (path, mode) in paths { + if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() { + return Err(crate::Error::path_not_found().add_path(path)); + } + + self.watches.insert(path, mode); + } + self.watch_handlers.recalculate(&self.watches); + Ok(()) + } + + pub fn remove_watch(&mut self, path: &Path) -> Result<()> { + self.watches.remove(path).ok_or(Error::watch_not_found())?; + self.watch_handlers.recalculate(&self.watches); + Ok(()) } /// Rescan filesystem and update this `WatchData`. @@ -240,23 +268,41 @@ mod data { /// /// This function may emit event by `data_builder.emitter`. pub(super) fn rescan(&mut self, data_builder: &DataBuilder) { + let (watch_handlers, old_watch_handlers) = self.watch_handlers.use_handlers(); + // scan current filesystem. - for (path, new_path_data) in Self::scan_all_path_data( - data_builder, - self.root.clone(), - self.is_recursive, - self.follow_symlinks, - false, - ) { + for (path, new_path_data) in + Self::scan_all_path_data(data_builder, watch_handlers, self.follow_symlinks) + { let old_path_data = self .all_path_data .insert(path.clone(), new_path_data.clone()); - // emit event - let event = - PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data)); - if let Some(event) = event { - data_builder.emitter.emit_ok(event); + let is_initial = old_watch_handlers + .as_ref() + .is_some_and(|old_watch_handlers| { + !old_watch_handlers.contains_key(&path) + && !path.ancestors().skip(1).any(|ancestor| { + old_watch_handlers + .get(ancestor) + .is_some_and(|is_recursive| *is_recursive) + }) + }); + if is_initial { + // emit initial scans + if let Some(ref emitter) = data_builder.scan_emitter { + emitter.borrow_mut().handle_event(Ok(path.clone())); + } + } else { + // emit event + let event = PathData::compare_to_event( + path, + old_path_data.as_ref(), + Some(&new_path_data), + ); + if let Some(event) = event { + data_builder.emitter.emit_ok(event); + } } } @@ -286,65 +332,66 @@ mod data { /// /// This function may emit some IO Error events by `data_builder.emitter`. fn scan_all_path_data( - data_builder: &'_ DataBuilder, - root: PathBuf, - is_recursive: bool, + data_builder: &DataBuilder, + watch_handlers: &HashMap, follow_symlinks: bool, - // whether this is an initial scan, used only for events - is_initial: bool, - ) -> impl Iterator + '_ { - tracing::trace!("rescanning {root:?}"); - // WalkDir return only one entry if root is a file (not a folder), - // so we can use single logic to do the both file & dir's jobs. - // - // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new - WalkDir::new(root) - .follow_links(follow_symlinks) - .max_depth(Self::dir_scan_depth(is_recursive)) - .into_iter() - .filter_map(|entry_res| match entry_res { - Ok(entry) => Some(entry), - Err(err) => { - tracing::warn!("walkdir error scanning {err:?}"); - - if let Some(io_error) = err.io_error() { - // clone an io::Error, so we have to create a new one. - let new_io_error = io::Error::new(io_error.kind(), err.to_string()); - data_builder.emitter.emit_io_err(new_io_error, err.path()); - } else { - let crate_err = - crate::Error::new(crate::ErrorKind::Generic(err.to_string())); - data_builder.emitter.emit(Err(crate_err)); - } - None - } - }) - .filter_map(move |entry| match entry.metadata() { - Ok(metadata) => { - let path = entry.into_path(); - if is_initial { - // emit initial scans - if let Some(ref emitter) = data_builder.scan_emitter { - emitter.borrow_mut().handle_event(Ok(path.clone())); + ) -> impl Iterator { + tracing::trace!("rescanning"); + + watch_handlers.iter().flat_map(move |(path, is_recursive)| { + tracing::trace!(?path, is_recursive, "scanning watch handler"); + + // WalkDir return only one entry if root is a file (not a folder), + // so we can use single logic to do the both file & dir's jobs. + // + // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new + WalkDir::new(path) + .follow_links(follow_symlinks) + .max_depth(if *is_recursive { usize::MAX } else { 1 }) + .into_iter() + .filter_map(|entry_res| match entry_res { + Ok(entry) => Some(entry), + Err(err) => { + tracing::warn!("walkdir error scanning {err:?}"); + + if let Some(io_error) = err.io_error() { + if io_error.kind() == io::ErrorKind::NotFound { + return None; + } + // clone an io::Error, so we have to create a new one. + let new_io_error = io::Error::new(io_error.kind(), err.to_string()); + data_builder.emitter.emit_io_err(new_io_error, err.path()); + } else { + let crate_err = + Error::new(crate::ErrorKind::Generic(err.to_string())); + data_builder.emitter.emit(Err(crate_err)); } + None + } + }) + .filter_map(move |entry| match entry.metadata() { + Ok(metadata) => { + let path = entry.into_path(); + let meta_path = MetaPath::from_parts_unchecked(path, metadata); + let data_path = data_builder.build_path_data(&meta_path); + + Some((meta_path.into_path(), data_path)) } - let meta_path = MetaPath::from_parts_unchecked(path, metadata); - let data_path = data_builder.build_path_data(&meta_path); + Err(err) => { + if let Some(io_error) = err.io_error() + && io_error.kind() == io::ErrorKind::NotFound + { + return None; + } - Some((meta_path.into_path(), data_path)) - } - Err(e) => { - // emit event. - let path = entry.into_path(); - data_builder.emitter.emit_io_err(e, Some(path)); + // emit event. + let path = entry.into_path(); + data_builder.emitter.emit_io_err(err, Some(path)); - None - } - }) - } - - fn dir_scan_depth(is_recursive: bool) -> usize { - if is_recursive { usize::MAX } else { 1 } + None + } + }) + }) } } @@ -504,6 +551,48 @@ mod data { } } +enum EventLoopMsg { + AddWatch(PathBuf, WatchMode, Sender>), + AddWatchMultiple(Vec<(PathBuf, WatchMode)>, Sender>), + RemoveWatch(PathBuf, Sender>), + #[cfg(test)] + WaitNextScan(Sender>), + /// currently used only for manual polling + Poll, + Shutdown, +} + +struct PollPathsMut<'a> { + inner: &'a mut PollWatcher, + add_paths: Vec<(PathBuf, WatchMode)>, +} +impl<'a> PollPathsMut<'a> { + fn new(watcher: &'a mut PollWatcher) -> Self { + Self { + inner: watcher, + add_paths: Vec::new(), + } + } +} +impl PathsMut for PollPathsMut<'_> { + #[tracing::instrument(level = "debug", skip(self))] + fn add(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> { + self.add_paths.push((path.to_owned(), watch_mode)); + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + fn remove(&mut self, path: &Path) -> Result<()> { + self.inner.unwatch_inner(path) + } + + #[tracing::instrument(level = "debug", skip(self))] + fn commit(self: Box) -> Result<()> { + let paths = self.add_paths; + self.inner.watch_multiple_inner(paths) + } +} + /// Polling based `Watcher` implementation. /// /// By default scans through all files and checks for changed entries based on their change date. @@ -512,14 +601,10 @@ mod data { /// See [Config] for more details. #[derive(Debug)] pub struct PollWatcher { - watches: Arc>>, - data_builder: Arc>, - want_to_stop: Arc, - /// channel to the poll loop - /// currently used only for manual polling - message_channel: Sender<()>, delay: Option, - follow_sylinks: bool, + follow_symlinks: bool, + + event_loop_tx: Sender, } impl PollWatcher { @@ -530,16 +615,37 @@ impl PollWatcher { /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling. pub fn poll(&self) -> crate::Result<()> { - self.message_channel - .send(()) + self.event_loop_tx + .send(EventLoopMsg::Poll) .map_err(|_| Error::generic("failed to send poll message"))?; Ok(()) } + #[cfg(test)] + pub(crate) fn wait_next_scan(&self) -> crate::Result<()> { + let (tx, rx) = unbounded(); + self.event_loop_tx + .send(EventLoopMsg::WaitNextScan(tx)) + .map_err(|_| Error::generic("failed to send WaitNextScan message"))?; + rx.recv().unwrap() + } + /// Returns a sender to initiate changes detection. #[cfg(test)] pub(crate) fn poll_sender(&self) -> Sender<()> { - self.message_channel.clone() + let inner_tx = self.event_loop_tx.clone(); + let (tx, rx) = unbounded(); + thread::Builder::new() + .name("notify-rs poll loop".to_string()) + .spawn(move || { + for () in &rx { + if let Err(err) = inner_tx.send(EventLoopMsg::Poll) { + tracing::error!(?err, "failed to send poll message"); + } + } + }) + .unwrap(); + tx } /// Create a new [`PollWatcher`] with an scan event handler. @@ -559,64 +665,79 @@ impl PollWatcher { config: Config, scan_callback: Option, ) -> PollWatcher { - let data_builder = - DataBuilder::new(event_handler, config.compare_contents(), scan_callback); - let (tx, rx) = unbounded(); let poll_watcher = PollWatcher { - watches: Arc::default(), - data_builder: Arc::new(Mutex::new(data_builder)), - want_to_stop: Arc::new(AtomicBool::new(false)), delay: config.poll_interval(), - follow_sylinks: config.follow_symlinks(), - message_channel: tx, + follow_symlinks: config.follow_symlinks(), + + event_loop_tx: tx, }; - poll_watcher.run(rx); + let data_builder = + DataBuilder::new(event_handler, config.compare_contents(), scan_callback); + poll_watcher.run(rx, data_builder); poll_watcher } - fn run(&self, rx: Receiver<()>) { - let watches = Arc::clone(&self.watches); - let data_builder = Arc::clone(&self.data_builder); - let want_to_stop = Arc::clone(&self.want_to_stop); + fn run(&self, rx: Receiver, mut data_builder: DataBuilder) { let delay = self.delay; + let follow_symlinks = self.follow_symlinks; let result = thread::Builder::new() .name("notify-rs poll loop".to_string()) .spawn(move || { + let mut watch_data = WatchData::new(follow_symlinks); + loop { - if want_to_stop.load(Ordering::SeqCst) { - break; - } + data_builder.update_timestamp(); + watch_data.rescan(&data_builder); - // HINT: Make sure always lock in the same order to avoid deadlock. - // - // FIXME: inconsistent: some place mutex poison cause panic, - // some place just ignore. - if let (Ok(mut watches), Ok(mut data_builder)) = - (watches.lock(), data_builder.lock()) - { - data_builder.update_timestamp(); - - let vals = watches.values_mut(); - for watch_data in vals { - watch_data.rescan(&data_builder); - } - } // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start)) let result = if let Some(delay) = delay { rx.recv_timeout(delay).or_else(|e| match e { - mpsc::RecvTimeoutError::Timeout => Ok(()), + mpsc::RecvTimeoutError::Timeout => Ok(EventLoopMsg::Poll), mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError), }) } else { rx.recv() }; - if let Err(e) = result { - tracing::error!(?e, "failed to receive poll message"); + match result { + Ok(EventLoopMsg::AddWatch(path, mode, resp_tx)) => { + let result = resp_tx.send(watch_data.add_watch(path, mode)); + if let Err(e) = result { + tracing::error!(?e, "failed to send AddWatch response"); + } + } + Ok(EventLoopMsg::AddWatchMultiple(paths, resp_tx)) => { + let result = resp_tx.send(watch_data.add_watch_multiple(paths)); + if let Err(e) = result { + tracing::error!(?e, "failed to send AddWatchMultiple response"); + } + } + Ok(EventLoopMsg::RemoveWatch(path, resp_tx)) => { + let result = resp_tx.send(watch_data.remove_watch(&path)); + if let Err(e) = result { + tracing::error!(?e, "failed to send RemoveWatch response"); + } + } + Ok(EventLoopMsg::Poll) => { + // continue the loop + } + #[cfg(test)] + Ok(EventLoopMsg::WaitNextScan(resp_tx)) => { + let result = resp_tx.send(Ok(())); + if let Err(e) = result { + tracing::error!(?e, "failed to send WaitNextScan response"); + } + } + Ok(EventLoopMsg::Shutdown) => { + break; + } + Err(e) => { + tracing::error!(?e, "failed to receive poll message"); + } } } }); @@ -626,42 +747,28 @@ impl PollWatcher { } /// Watch a path location. - /// - /// QUESTION: this function never return an Error, is it as intend? - /// Please also consider the IO Error event problem. - fn watch_inner(&self, path: &Path, watch_mode: WatchMode) { - // HINT: Make sure always lock in the same order to avoid deadlock. - // - // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. - if let (Ok(mut watches), Ok(mut data_builder)) = - (self.watches.lock(), self.data_builder.lock()) - { - data_builder.update_timestamp(); - - let watch_data = data_builder.build_watch_data( - path.to_path_buf(), - watch_mode.recursive_mode.is_recursive(), - self.follow_sylinks, - ); + fn watch_inner(&self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> { + let (tx, rx) = unbounded(); + self.event_loop_tx + .send(EventLoopMsg::AddWatch(path.to_path_buf(), watch_mode, tx))?; + rx.recv().unwrap() + } - // if create watch_data successful, add it to watching list. - if let Some(watch_data) = watch_data { - watches.insert(path.to_path_buf(), watch_data); - } - } + fn watch_multiple_inner(&self, paths: Vec<(PathBuf, WatchMode)>) -> crate::Result<()> { + let (tx, rx) = unbounded(); + self.event_loop_tx + .send(EventLoopMsg::AddWatchMultiple(paths, tx))?; + rx.recv().unwrap() } /// Unwatch a path. /// /// Return `Err(_)` if given path has't be monitored. fn unwatch_inner(&self, path: &Path) -> crate::Result<()> { - // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. - self.watches - .lock() - .unwrap() - .remove(path) - .map(|_| ()) - .ok_or_else(crate::Error::watch_not_found) + let (tx, rx) = unbounded(); + self.event_loop_tx + .send(EventLoopMsg::RemoveWatch(path.to_path_buf(), tx))?; + rx.recv().unwrap() } } @@ -674,9 +781,12 @@ impl Watcher for PollWatcher { #[tracing::instrument(level = "debug", skip(self))] fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> { - self.watch_inner(path, watch_mode); + self.watch_inner(path, watch_mode) + } - Ok(()) + #[tracing::instrument(level = "debug", skip(self))] + fn paths_mut<'me>(&'me mut self) -> Box { + Box::new(PollPathsMut::new(self)) } #[tracing::instrument(level = "debug", skip(self))] @@ -691,7 +801,10 @@ impl Watcher for PollWatcher { impl Drop for PollWatcher { fn drop(&mut self) { - self.want_to_stop.store(true, Ordering::Relaxed); + let result = self.event_loop_tx.send(EventLoopMsg::Shutdown); + if let Err(e) = result { + tracing::error!(?e, "failed to send shutdown message to poll watcher thread"); + } } } @@ -715,6 +828,7 @@ mod tests { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); watcher.watch_recursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); let path = tmpdir.path().join("entry"); std::fs::File::create_new(&path).expect("Unable to create"); @@ -724,7 +838,6 @@ mod tests { } #[test] - #[ignore = "not implemented"] fn create_self_file() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -732,6 +845,7 @@ mod tests { let path = tmpdir.path().join("entry"); watcher.watch_nonrecursively(&path); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::File::create_new(&path).expect("create"); @@ -740,7 +854,6 @@ mod tests { } #[test] - #[ignore = "not implemented"] fn create_self_file_no_track() { let tmpdir = testdir(); let (mut watcher, _) = watcher(); @@ -764,7 +877,6 @@ mod tests { } #[test] - #[ignore = "TODO: not implemented"] fn create_self_file_nested() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -772,11 +884,12 @@ mod tests { let path = tmpdir.path().join("entry/nested"); watcher.watch_nonrecursively(&path); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::create_dir_all(path.parent().unwrap()).expect("create"); std::fs::File::create_new(&path).expect("create"); - rx.wait_ordered_exact([expected(&path).create_file()]); + rx.wait_ordered_exact([expected(&path).create_any()]); } #[test] @@ -784,6 +897,7 @@ mod tests { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); watcher.watch_recursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); let path = tmpdir.path().join("entry"); std::fs::create_dir(&path).expect("Unable to create"); @@ -800,6 +914,7 @@ mod tests { std::fs::File::create_new(&path).expect("Unable to create"); watcher.watch_recursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::write(&path, b"123").expect("Unable to write"); assert!( @@ -818,6 +933,7 @@ mod tests { std::fs::File::create_new(&path).expect("Unable to create"); watcher.watch_recursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::rename(&path, &new_path).expect("Unable to remove"); rx.sleep_while_exists(&path); @@ -830,7 +946,6 @@ mod tests { } #[test] - #[ignore = "TODO: not implemented"] fn rename_self_file() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -839,6 +954,7 @@ mod tests { std::fs::File::create_new(&path).expect("create"); watcher.watch_nonrecursively(&path); + watcher.watcher.wait_next_scan().expect("wait next scan"); let new_path = tmpdir.path().join("renamed"); std::fs::rename(&path, &new_path).expect("rename"); @@ -851,15 +967,14 @@ mod tests { std::fs::rename(&new_path, &path).expect("rename2"); - rx.sleep_until_exists(&new_path); - rx.sleep_while_exists(&path); + rx.sleep_while_exists(&new_path); + rx.sleep_until_exists(&path); rx.wait_unordered_exact([expected(&path).create_any()]) .ensure_no_tail(); } #[test] - #[ignore = "TODO: not implemented"] fn rename_self_file_no_track() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -874,6 +989,7 @@ mod tests { target_mode: TargetMode::NoTrack, }, ); + watcher.watcher.wait_next_scan().expect("wait next scan"); let new_path = tmpdir.path().join("renamed"); @@ -909,14 +1025,17 @@ mod tests { std::fs::File::create_new(&path).expect("Unable to create"); watcher.watch_recursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::remove_file(&path).expect("Unable to remove"); rx.sleep_while_exists(&path); - rx.wait_ordered_exact([expected(&path).remove_any()]); + rx.wait_ordered_exact([ + expected(&path).modify_data_any().optional(), + expected(&path).remove_any(), + ]); } #[test] - #[ignore = "TODO: not implemented"] fn delete_self_file() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -924,20 +1043,23 @@ mod tests { std::fs::File::create_new(&path).expect("Unable to create"); watcher.watch_nonrecursively(&path); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::remove_file(&path).expect("Unable to remove"); rx.sleep_while_exists(&path); - rx.wait_ordered_exact([expected(&path).remove_any()]); + rx.wait_ordered_exact([ + expected(&path).modify_data_any().optional(), + expected(&path).remove_any(), + ]); std::fs::write(&path, "").expect("write"); rx.sleep_until_exists(&path); - rx.wait_ordered_exact([expected(&path).create_file()]); + rx.wait_ordered_exact([expected(&path).create_any()]); } #[test] - #[ignore = "TODO: not implemented"] fn delete_self_file_no_track() { let tmpdir = testdir(); let (mut watcher, rx) = watcher(); @@ -951,11 +1073,15 @@ mod tests { target_mode: TargetMode::NoTrack, }, ); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::remove_file(&path).expect("Unable to remove"); rx.sleep_while_exists(&path); - rx.wait_ordered_exact([expected(&path).remove_any()]); + rx.wait_ordered_exact([ + expected(&path).modify_data_any().optional(), + expected(&path).remove_any(), + ]); std::fs::write(&path, "").expect("write"); @@ -971,6 +1097,7 @@ mod tests { std::fs::write(&overwritten_file, "123").expect("write1"); watcher.watch_nonrecursively(&tmpdir); + watcher.watcher.wait_next_scan().expect("wait next scan"); std::fs::File::create(&overwriting_file).expect("create"); std::fs::write(&overwriting_file, "321").expect("write2");