From 66c0510a77041f7c313f593eb8f1cfe71dd4fe44 Mon Sep 17 00:00:00 2001 From: Louis-Vincent Date: Mon, 12 May 2025 11:27:57 -0400 Subject: [PATCH 1/2] refactor api --- CHANGELOG.md | 12 +++ Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 6 ++ src/lease.rs | 240 ++++++++++++++++++++++++++++++++++---------- src/lib.rs | 65 ++++++++++++ src/lock.rs | 36 +++++-- src/watcher.rs | 9 ++ tests/test_lease.rs | 10 +- tests/test_lock.rs | 50 ++++----- tests/test_log.rs | 18 ++-- 11 files changed, 347 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75e3221..7dcc686 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,18 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +## [0.11.0] + +### Breaking + +- Renamed `spawn_lock_manager` to `spawn_lock_manager_with_lease_man`. +- Replaced `ManagedLease::new` with `ManagedLease::spawn` and `ManagedLease::spawn_on` as it better indicates the intent the the user. + +### Changes + +- Refactored the `ManagedLease` implementation to use a background task to handle lease creation and lifecycle management. +Removing the need to use `Arc>`. + ## [0.10.0] - Removed pinned dependencies for "^" in mature dependencies and "~" for non-mature deps. \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c9602ac..465d86e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,7 +874,7 @@ dependencies = [ [[package]] name = "rust-etcd-utils" -version = "0.10.0+pre1" +version = "0.11.0+pre1" dependencies = [ "async-trait", "etcd-client", diff --git a/Cargo.toml b/Cargo.toml index b43d32d..fb03414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-etcd-utils" -version = "0.10.0+pre1" +version = "0.11.0+pre1" authors = [ "Triton One", "Louis-Vincent Boudreault" diff --git a/README.md b/README.md index 6df3b00..e39be3a 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,12 @@ Utility library for common ETCD management in Rust, it covers: 2. Automatic lock lifecycle managemented: auto-revoke and auto-keep-alive. 3. Builtin retry logic for every public function to make robust in-production application. +## Add to library + +``` +cargo add rust-etcd-utils +``` + ## How to test diff --git a/src/lease.rs b/src/lease.rs index 5c58eae..da8e095 100644 --- a/src/lease.rs +++ b/src/lease.rs @@ -1,10 +1,10 @@ use { crate::retry::retry_etcd, futures::StreamExt, - std::{sync::Arc, time::Duration}, + std::time::Duration, tokio::{ - sync::{broadcast, oneshot, Mutex}, - task::JoinSet, + sync::{broadcast, mpsc, oneshot}, + task::{JoinHandle, JoinSet}, time::Instant, }, tracing::{error, warn}, @@ -128,8 +128,7 @@ impl ManagedLease { /// #[derive(Clone)] pub struct ManagedLeaseFactory { - etcd: etcd_client::Client, - js: Arc>>, + cnc_tx: mpsc::Sender, } /// @@ -156,58 +155,75 @@ impl Clone for LeaseExpiredNotify { } } -impl ManagedLeaseFactory { - pub fn new(etcd: etcd_client::Client) -> Self { - let js = Arc::new(Mutex::new(JoinSet::new())); - let js2 = Arc::clone(&js); - - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - { - let mut lock = js2.lock().await; - while let Some(result) = lock.try_join_next() { - if let Err(e) = result { - error!("detected managed lease thread failed with: {e:?}"); - } else { - tracing::info!("detected managed lease thread finished"); - } - } - } - } - }); +struct CreateLeaseCommand { + ttl: Duration, + keepalive_interval: Option, + auto_refresh_limit: Option, + callback: oneshot::Sender>, +} - Self { etcd, js } - } +enum ManagedLeaseRuntimeCommand { + CreateLease(CreateLeaseCommand), +} +/// +/// Managed lease factory runtime that will handle the lease creation and keep alive. +/// This is a separate task that will run in the background. +/// +struct ManagedLeaseFactoryRuntime { /// - /// Shutdown the lease factory and revoke all leases. + /// The etcd client to use. /// - /// Becareful calling this method as it will wait for all lease to be revoked. + etcd: etcd_client::Client, + /// - pub async fn shutdown(self, timeout: Duration) { - let mut lock = self.js.lock().await; - let _ = tokio::time::timeout(timeout, lock.shutdown()).await; - } + /// The runtime handle to spawn tasks on. + /// + rt: tokio::runtime::Handle, - pub async fn new_lease_with_auto_refresh_limit( - &self, - ttl: Duration, - keepalive_interval: Option, - auto_refresh_limit: Option, - ) -> Result { - let ttl_secs: i64 = ttl.as_secs() as i64; - assert!(ttl_secs >= 2, "lease ttl must be at least two (2) seconds"); - let lease_id = retry_etcd(self.etcd.clone(), (), move |mut etcd, _| async move { + /// + /// The join set to manage the tasks. + /// + js: JoinSet<()>, + + /// + /// The channel to notify the runtime to shutdown. + /// + cnc_rx: mpsc::Receiver, +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateLeaseError { + #[error("lease creation failed")] + EtcdError(#[from] etcd_client::Error), + #[error("invalid lease ttl, must be at least 2 seconds")] + InvalidTTL, +} + +impl ManagedLeaseFactoryRuntime { + async fn handle_create_lease(&mut self, cmd: CreateLeaseCommand) { + let CreateLeaseCommand { + ttl, + keepalive_interval, + auto_refresh_limit, + callback, + } = cmd; + let ttl_secs = ttl.as_secs() as i64; + let lease_result = retry_etcd(self.etcd.clone(), (), move |mut etcd, _| async move { etcd.lease_grant(ttl_secs, None).await }) - .await? - .id(); - let (stop_tx, mut stop_rx) = oneshot::channel(); + .await; + let lease_id = match lease_result { + Ok(lease) => lease.id(), + Err(e) => { + let _ = callback.send(Err(e.into())); + return; + } + }; + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); let client = self.etcd.clone(); - let mut lock = self.js.lock().await; let (tx_expired, rx_expired) = broadcast::channel(1); - let _ = lock.spawn(async move { + let _ah = self.js.spawn_on(async move { let mut refresh_count = 0; 'outer: loop { let first_keep_alive = Instant::now(); @@ -305,14 +321,131 @@ impl ManagedLeaseFactory { } } let _ = tx_expired.send(()); - }); - - Ok(ManagedLease { + }, &self.rt); + let lease = ManagedLease { etcd: self.etcd.clone(), lease_id, _tx_terminate: stop_tx, rx_lease_expire: rx_expired, - }) + }; + let _ = callback.send(Ok(lease)); + } + + async fn handle_command(&mut self, cmd: ManagedLeaseRuntimeCommand) { + match cmd { + ManagedLeaseRuntimeCommand::CreateLease(cmd) => { + self.handle_create_lease(cmd).await; + } + } + } + + async fn run(mut self) { + loop { + // Loops ends when both the command channel and the join set are closed. + // When command-and-control channel is closed, it means no `ManagedLease` exists anymore. + // However, the join set may still have tasks running, we must wait for them to finish. + tokio::select! { + Some(cmd) = self.cnc_rx.recv() => { + self.handle_command(cmd).await; + } + Some(res) = self.js.join_next() => { + match res { + Ok(_) => { + // task completed successfully + tracing::trace!("managed lease task completed"); + } + Err(e) => { + tracing::warn!("task failed: {e:?}"); + } + } + } + else => { + break; + } + } + } + tracing::trace!("managed lease factory runtime exiting"); + } +} + +impl ManagedLeaseFactory { + /// + /// Create a new managed lease factory. + /// This will spawn a new task that will handle the lease creation and keep alive. + /// + pub fn spawn(etcd: etcd_client::Client) -> (Self, JoinHandle<()>) { + Self::spawn_on(etcd, tokio::runtime::Handle::current()) + } + + /// + /// Create a new managed lease factory. + /// This will spawn a new task that will handle the lease creation and keep alive. + /// + /// Arguments: + /// * `etcd` - The etcd client to use. + /// * `rt` - The runtime handle to spawn tasks on. + pub fn spawn_on( + etcd: etcd_client::Client, + rt: tokio::runtime::Handle, + ) -> (Self, JoinHandle<()>) { + let (cnc_tx, cnc_rx) = mpsc::channel(100); + let lease_rt = ManagedLeaseFactoryRuntime { + etcd, + rt: rt.clone(), + js: JoinSet::new(), + cnc_rx, + }; + let jh = rt.spawn(lease_rt.run()); + ( + Self { + cnc_tx: cnc_tx.clone(), + }, + jh, + ) + } + + /// + /// Create a new managed lease with the given time-to-live (TTL), keepalive interval and auto refresh limit. + /// The lease will be kept alive until it is dropped OR until the lease has been refresh `auto_refresh_limit` times. + /// + /// Arguments: + /// + /// * `ttl` - The time-to-live for the lease. + /// * `keepalive_interval` - The interval to keep the lease alive. + /// * `auto_refresh_limit` - The number of times to auto refresh the lease. + /// + pub async fn new_lease_with_auto_refresh_limit( + &self, + ttl: Duration, + keepalive_interval: Option, + auto_refresh_limit: Option, + ) -> Result { + let ttl_secs: i64 = ttl.as_secs() as i64; + assert!(ttl_secs >= 2, "lease ttl must be at least two (2) seconds"); + let (callback_tx, callback_rx) = oneshot::channel(); + let command = CreateLeaseCommand { + ttl, + keepalive_interval, + auto_refresh_limit, + callback: callback_tx, + }; + self.cnc_tx + .send(ManagedLeaseRuntimeCommand::CreateLease(command)) + .await + .expect("failed to send command to managed lease factory"); + + let result = callback_rx + .await + .expect("failed to receive result from managed lease factory"); + match result { + Ok(lease) => Ok(lease), + Err(e) => match e { + CreateLeaseError::EtcdError(e) => Err(e), + CreateLeaseError::InvalidTTL => { + panic!("lease ttl must be at least two (2) seconds"); + } + }, + } } /// @@ -324,6 +457,11 @@ impl ManagedLeaseFactory { /// /// Keepalive interval is optional, if not provided it will be half of the ttl. /// + /// Arguments: + /// + /// * `ttl` - The time-to-live for the lease. + /// * `keepalive_interval` - The interval to keep the lease alive. + /// pub async fn new_lease( &self, ttl: Duration, diff --git a/src/lib.rs b/src/lib.rs index 5469b8f..987cc97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,68 @@ +//! +//! A set of utilities for working with etcd client Rust client. +//! +//! The library provides API to create "managed" etcd resources, such as leases, locks, and logs. +//! +//! Most of the API is built on top of the etcd client library, and provides a higher-level abstraction. +//! +//! It is designed to be used in asynchronous Rust applications, and provides a set of utilities for working with etcd in a concurrent environment. +//! +//! As you start your journey with etcd you will find that the etcd client library is a bit low-level and requires a lot of boilerplate code to work with. +//! You have to implement your own lease renewal, early lock release and you typically wants to run future against a lock lifetime. +//! +//! Moreover, properly managing watcher can be tricky as you have to deal with many edge cases. +//! +//! Also, every object has retry logic to handle transient errors when communicating with etcd. +//! If you are using etcd in a distributed system, you will encounter transient errors such as network partition or server overload. +//! This library provides a set of utilities to handle these errors and retry the operation. +//! +//! Lastly, all modules in this crate uses RAII pattern to manage resources. +//! When you drop a resource, it will automatically release the lock or lease. +//! +//! # Examples +//! +//! For a complete example, see the [`tests`] directory. +//! +//! ```rust +//! use etcd_client::Client; +//! use rust_etcd_utils::{ +//! lock::spawn_lock_manager, +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! +//! let etcd = Client::connect(["http://localhost:2379"], None) +//! .await +//! .expect("failed to connect to etcd"); +//! let (lock_manager, _lock_manager_handle) = spawn_lock_manager(etcd); +//! +//! let lock = lock_manager +//! .lock("my_lock", std::time::Duration::from_secs(10)) +//! .await +//! .expect("failed to lock"); +//! +//! lock.scope_with(|scope| async move { +//! // Do something WHILE YOU HOLD THE LOCK +//! // This is a good place to do some work that requires exclusive access to a resource. +//! // For example, you can write to a log or update a value in etcd. +//! // If the lock is lost while you are doing this work, the future will be cancelled. +//! ... +//! }).await.expect("lock lost"); +//! +//! let revoke_notif = lock.get_revoke_notify(); +//! +//! drop(lock)); // Release the lock early +//! +//! // Wait for the lock to be released +//! let _ = revoke_notif.recv().await; // The lock is released +//! } +//! ``` +//! +//! +//! [`tests`]: https://github.com/rpcpool/rust-etcd-utils/tree/main/tests +//! + /// /// Provides an API over "managed" lease /// diff --git a/src/lock.rs b/src/lock.rs index 9f36754..5139944 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -186,13 +186,11 @@ impl ManagedLockRevokeNotify { /// /// ```no_run /// use etcd_client::Client; -/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, ManagedLock}; +/// use rust_etcd_utils::{lock::spawn_lock_manager, ManagedLock}; /// /// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd"); /// -/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); -/// -/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone()); +/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone()); /// /// // Do something with the lock manager /// @@ -210,11 +208,10 @@ impl ManagedLockRevokeNotify { /// ```no_run /// /// use etcd_client::Client; -/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, lock::ManagedLock, lock::TryLockError}; +/// use rust_etcd_utils::{lock::spawn_lock_manager, lock::ManagedLock, lock::TryLockError}; /// /// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd"); -/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); -/// let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone()); +/// let (_, lock_man) = spawn_lock_manager(etcd.clone()); /// /// let lock_man2 = lock_man.clone(); /// let task1 = tokio::spawn(async move { @@ -229,7 +226,17 @@ impl ManagedLockRevokeNotify { /// assert!(matches!(err, Err(TryLockError::AlreadyTaken))); /// ``` /// -pub fn spawn_lock_manager( +pub fn spawn_lock_manager(etcd: etcd_client::Client) -> (LockManagerHandle, LockManager) { + let (lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + spawn_lock_manager_with_lease_factory(etcd, lease_factory) +} + +/// +/// Creates a lock manager to create "managed" locks. +/// +/// See [`spawn_lock_manager`] for more details. +/// +pub fn spawn_lock_manager_with_lease_factory( etcd: etcd_client::Client, managed_lease_factory: ManagedLeaseFactory, ) -> (LockManagerHandle, LockManager) { @@ -708,9 +715,7 @@ impl ManagedLock { /// /// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd"); /// - /// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - /// - /// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone()); + /// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone()); /// /// // Do something with the lock manager /// @@ -743,9 +748,11 @@ impl ManagedLock { match rx.try_recv() { Ok(_) => { + tracing::trace!("Lock revoked"); return Err(LockError::LockRevoked); } Err(broadcast::error::TryRecvError::Closed) => { + tracing::trace!("Lock revoked"); return Err(LockError::LockRevoked); } _ => {} @@ -762,6 +769,13 @@ impl ManagedLock { pub fn get_managed_lease_weak_ref(&self) -> ManagedLeaseWeak { self.managed_lease.get_weak() } + + /// + /// Convert the managed lock into a signal that will be resolved when the lock is revoked. + /// + pub async fn into_revoked_fut(self) { + let _ = self.scope(futures::future::pending::<()>()).await; + } } /// diff --git a/src/watcher.rs b/src/watcher.rs index 73183ab..638d875 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -40,6 +40,15 @@ pub enum WatchEvent { pub trait WatchClientExt { fn get_watch_client(&self) -> WatchClient; + /// + /// Creates a channel that watches for changes to a key in etcd. + /// + /// The channel will send a [`WatchEvent`] for each change to the key. + /// The channel will be retried on transient errors. + /// + /// The channel will be closed if the watch is cancelled or if the stream is closed. + /// + /// The watch expect value to be JSON encoded. fn json_watch_channel( &self, key: impl Into>, diff --git a/tests/test_lease.rs b/tests/test_lease.rs index cc6589c..a264030 100644 --- a/tests/test_lease.rs +++ b/tests/test_lease.rs @@ -7,7 +7,7 @@ mod common; #[tokio::test] async fn it_should_automatically_refresh_lease() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(2), None) @@ -27,7 +27,7 @@ async fn it_should_automatically_refresh_lease() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn it_should_limit_the_amount_of_lease_refresh() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let lease_created_at = std::time::Instant::now(); let ttl = Duration::from_secs(2); @@ -58,7 +58,7 @@ async fn it_should_limit_the_amount_of_lease_refresh() { #[tokio::test] async fn test_lease_expire_notify_on_drop() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(2), None) @@ -78,7 +78,7 @@ async fn test_lease_expire_notify_on_drop() { #[tokio::test] async fn test_lease_expire_notify_when_etcd_revoke() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(2), None) @@ -102,7 +102,7 @@ async fn test_lease_expire_notify_when_etcd_revoke() { #[tokio::test] async fn lease_revoke_notify_should_return_immediately_if_created_after_delete() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(2), None) diff --git a/tests/test_lock.rs b/tests/test_lock.rs index 06e8598..b244dea 100644 --- a/tests/test_lock.rs +++ b/tests/test_lock.rs @@ -3,15 +3,14 @@ use std::time::Duration; use common::random_str; use rust_etcd_utils::{ lease::ManagedLeaseFactory, - lock::{spawn_lock_manager, TryLockError}, + lock::{spawn_lock_manager, spawn_lock_manager_with_lease_factory, TryLockError}, }; mod common; #[tokio::test] async fn it_should_failed_to_lock_already_taken_lock() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (_lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone()); let lock_name = random_str(10); let _managed_lock1 = lock_man @@ -26,12 +25,13 @@ async fn it_should_failed_to_lock_already_taken_lock() { #[tokio::test] async fn try_lock_should_fail_with_expired_lease() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(10), None) .await .expect("failed to create lease"); - let (_lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (_lock_man_handle, lock_man) = + spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lease_id = managed_lease.lease_id(); let _ = etcd .lease_client() @@ -48,12 +48,13 @@ async fn try_lock_should_fail_with_expired_lease() { #[tokio::test] async fn lock_should_fail_with_expired_lease() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); let managed_lease = managed_lease_factory .new_lease(Duration::from_secs(10), None) .await .expect("failed to create lease"); - let (_lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (_lock_man_handle, lock_man) = + spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lease_id = managed_lease.lease_id(); let _ = etcd .lease_client() @@ -71,8 +72,7 @@ async fn lock_should_fail_with_expired_lease() { #[tokio::test] async fn dropping_managed_lock_should_revoke_etcd_lock() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (_, lock_man) = spawn_lock_manager(etcd.clone()); let lock_name = random_str(10); let managed_lock1 = lock_man @@ -93,8 +93,8 @@ async fn dropping_managed_lock_should_revoke_etcd_lock() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lock_lease_should_be_automatically_refreshed() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -111,8 +111,8 @@ async fn lock_lease_should_be_automatically_refreshed() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_managed_lock_scope() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -148,8 +148,8 @@ async fn test_managed_lock_scope() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_managed_lock_scope_with() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -185,8 +185,8 @@ async fn test_managed_lock_scope_with() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_lock() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); let lock = lock_man @@ -209,8 +209,8 @@ async fn test_lock() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_managed_lock_revoke_notify_clonability() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -252,8 +252,8 @@ async fn test_managed_lock_revoke_notify_clonability() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lock_revoke_notify_should_notify_even_if_lock_is_revoke_before_constructor() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -281,8 +281,8 @@ async fn lock_revoke_notify_should_notify_even_if_lock_is_revoke_before_construc #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn revoke_notify_should_be_completly_independent() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); @@ -338,8 +338,8 @@ async fn revoke_notify_should_be_completly_independent() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn it_should_notify_revoke_when_underlying_lease_expire() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); diff --git a/tests/test_log.rs b/tests/test_log.rs index a42551f..f768c84 100644 --- a/tests/test_log.rs +++ b/tests/test_log.rs @@ -3,7 +3,7 @@ use std::time::Duration; use common::random_str; use rust_etcd_utils::{ lease::ManagedLeaseFactory, - lock::{spawn_lock_manager, ManagedLockGuard}, + lock::{spawn_lock_manager_with_lease_factory, ManagedLockGuard}, log::{ExclusiveLogUpdater, LogWatcher, WriteError}, }; use serde::{Deserialize, Serialize}; @@ -18,8 +18,8 @@ struct DummyValue { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_should_work_within_lock_scope_lifetime() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name = random_str(10); let log_name = random_str(10); let lock = lock_man @@ -54,8 +54,8 @@ async fn write_should_work_within_lock_scope_lifetime() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn releasing_lock_should_release_the_log_too() { let etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name1 = random_str(10); let lock_name2 = random_str(10); let log_name = random_str(10); @@ -115,8 +115,8 @@ async fn releasing_lock_should_release_the_log_too() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_should_fail_if_lock_already_revoked() { let mut etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name1 = random_str(10); let log_name = random_str(10); let lock = lock_man @@ -149,8 +149,8 @@ async fn write_should_fail_if_lock_already_revoked() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn log_watch_should_work_even_if_the_underlying_log_gets_deleted() { let mut etcd = common::get_etcd_client().await; - let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); - let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let (managed_lease_factory, _) = ManagedLeaseFactory::spawn(etcd.clone()); + let (_, lock_man) = spawn_lock_manager_with_lease_factory(etcd.clone(), managed_lease_factory); let lock_name1 = random_str(10); let lock_name2 = random_str(10); let log_name = random_str(10); From 5b07212dd68e6927142ba2a42af9a860a3498f03 Mon Sep 17 00:00:00 2001 From: Louis-Vincent Date: Mon, 12 May 2025 11:42:34 -0400 Subject: [PATCH 2/2] Cargo version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 465d86e..0182e17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,7 +874,7 @@ dependencies = [ [[package]] name = "rust-etcd-utils" -version = "0.11.0+pre1" +version = "0.11.0" dependencies = [ "async-trait", "etcd-client", diff --git a/Cargo.toml b/Cargo.toml index fb03414..f5da006 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-etcd-utils" -version = "0.11.0+pre1" +version = "0.11.0" authors = [ "Triton One", "Louis-Vincent Boudreault"