Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

## [0.11.0]

### Breaking

- Renamed `spawn_lock_manager` to `spawn_lock_manager_with_lease_man`.
- Replaced `ManagedLease::new` with `ManagedLease::spawn` and `ManagedLease::spawn_on` as it better indicates the intent the the user.

### Changes

- Refactored the `ManagedLease` implementation to use a background task to handle lease creation and lifecycle management.
Removing the need to use `Arc<Mutex<...>>`.

## [0.10.0]

- Removed pinned dependencies for "^" in mature dependencies and "~" for non-mature deps.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rust-etcd-utils"
version = "0.10.0+pre1"
version = "0.11.0"
authors = [
"Triton One",
"Louis-Vincent Boudreault"
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ Utility library for common ETCD management in Rust, it covers:
2. Automatic lock lifecycle managemented: auto-revoke and auto-keep-alive.
3. Builtin retry logic for every public function to make robust in-production application.

## Add to library

```
cargo add rust-etcd-utils
```


## How to test

Expand Down
240 changes: 189 additions & 51 deletions src/lease.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::retry::retry_etcd,
futures::StreamExt,
std::{sync::Arc, time::Duration},
std::time::Duration,
tokio::{
sync::{broadcast, oneshot, Mutex},
task::JoinSet,
sync::{broadcast, mpsc, oneshot},
task::{JoinHandle, JoinSet},
time::Instant,
},
tracing::{error, warn},
Expand Down Expand Up @@ -128,8 +128,7 @@ impl ManagedLease {
///
#[derive(Clone)]
pub struct ManagedLeaseFactory {
etcd: etcd_client::Client,
js: Arc<Mutex<JoinSet<()>>>,
cnc_tx: mpsc::Sender<ManagedLeaseRuntimeCommand>,
}

///
Expand All @@ -156,58 +155,75 @@ impl Clone for LeaseExpiredNotify {
}
}

impl ManagedLeaseFactory {
pub fn new(etcd: etcd_client::Client) -> Self {
let js = Arc::new(Mutex::new(JoinSet::new()));
let js2 = Arc::clone(&js);

tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
{
let mut lock = js2.lock().await;
while let Some(result) = lock.try_join_next() {
if let Err(e) = result {
error!("detected managed lease thread failed with: {e:?}");
} else {
tracing::info!("detected managed lease thread finished");
}
}
}
}
});
struct CreateLeaseCommand {
ttl: Duration,
keepalive_interval: Option<Duration>,
auto_refresh_limit: Option<usize>,
callback: oneshot::Sender<Result<ManagedLease, CreateLeaseError>>,
}

Self { etcd, js }
}
enum ManagedLeaseRuntimeCommand {
CreateLease(CreateLeaseCommand),
}

///
/// Managed lease factory runtime that will handle the lease creation and keep alive.
/// This is a separate task that will run in the background.
///
struct ManagedLeaseFactoryRuntime {
///
/// Shutdown the lease factory and revoke all leases.
/// The etcd client to use.
///
/// Becareful calling this method as it will wait for all lease to be revoked.
etcd: etcd_client::Client,

///
pub async fn shutdown(self, timeout: Duration) {
let mut lock = self.js.lock().await;
let _ = tokio::time::timeout(timeout, lock.shutdown()).await;
}
/// The runtime handle to spawn tasks on.
///
rt: tokio::runtime::Handle,

pub async fn new_lease_with_auto_refresh_limit(
&self,
ttl: Duration,
keepalive_interval: Option<Duration>,
auto_refresh_limit: Option<usize>,
) -> Result<ManagedLease, etcd_client::Error> {
let ttl_secs: i64 = ttl.as_secs() as i64;
assert!(ttl_secs >= 2, "lease ttl must be at least two (2) seconds");
let lease_id = retry_etcd(self.etcd.clone(), (), move |mut etcd, _| async move {
///
/// The join set to manage the tasks.
///
js: JoinSet<()>,

///
/// The channel to notify the runtime to shutdown.
///
cnc_rx: mpsc::Receiver<ManagedLeaseRuntimeCommand>,
}

#[derive(Debug, thiserror::Error)]
pub enum CreateLeaseError {
#[error("lease creation failed")]
EtcdError(#[from] etcd_client::Error),
#[error("invalid lease ttl, must be at least 2 seconds")]
InvalidTTL,
}

impl ManagedLeaseFactoryRuntime {
async fn handle_create_lease(&mut self, cmd: CreateLeaseCommand) {
let CreateLeaseCommand {
ttl,
keepalive_interval,
auto_refresh_limit,
callback,
} = cmd;
let ttl_secs = ttl.as_secs() as i64;
let lease_result = retry_etcd(self.etcd.clone(), (), move |mut etcd, _| async move {
etcd.lease_grant(ttl_secs, None).await
})
.await?
.id();
let (stop_tx, mut stop_rx) = oneshot::channel();
.await;
let lease_id = match lease_result {
Ok(lease) => lease.id(),
Err(e) => {
let _ = callback.send(Err(e.into()));
return;
}
};
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let client = self.etcd.clone();
let mut lock = self.js.lock().await;
let (tx_expired, rx_expired) = broadcast::channel(1);
let _ = lock.spawn(async move {
let _ah = self.js.spawn_on(async move {
let mut refresh_count = 0;
'outer: loop {
let first_keep_alive = Instant::now();
Expand Down Expand Up @@ -305,14 +321,131 @@ impl ManagedLeaseFactory {
}
}
let _ = tx_expired.send(());
});

Ok(ManagedLease {
}, &self.rt);
let lease = ManagedLease {
etcd: self.etcd.clone(),
lease_id,
_tx_terminate: stop_tx,
rx_lease_expire: rx_expired,
})
};
let _ = callback.send(Ok(lease));
}

async fn handle_command(&mut self, cmd: ManagedLeaseRuntimeCommand) {
match cmd {
ManagedLeaseRuntimeCommand::CreateLease(cmd) => {
self.handle_create_lease(cmd).await;
}
}
}

async fn run(mut self) {
loop {
// Loops ends when both the command channel and the join set are closed.
// When command-and-control channel is closed, it means no `ManagedLease` exists anymore.
// However, the join set may still have tasks running, we must wait for them to finish.
tokio::select! {
Some(cmd) = self.cnc_rx.recv() => {
self.handle_command(cmd).await;
}
Some(res) = self.js.join_next() => {
match res {
Ok(_) => {
// task completed successfully
tracing::trace!("managed lease task completed");
}
Err(e) => {
tracing::warn!("task failed: {e:?}");
}
}
}
else => {
break;
}
}
}
tracing::trace!("managed lease factory runtime exiting");
}
}

impl ManagedLeaseFactory {
///
/// Create a new managed lease factory.
/// This will spawn a new task that will handle the lease creation and keep alive.
///
pub fn spawn(etcd: etcd_client::Client) -> (Self, JoinHandle<()>) {
Self::spawn_on(etcd, tokio::runtime::Handle::current())
}

///
/// Create a new managed lease factory.
/// This will spawn a new task that will handle the lease creation and keep alive.
///
/// Arguments:
/// * `etcd` - The etcd client to use.
/// * `rt` - The runtime handle to spawn tasks on.
pub fn spawn_on(
etcd: etcd_client::Client,
rt: tokio::runtime::Handle,
) -> (Self, JoinHandle<()>) {
let (cnc_tx, cnc_rx) = mpsc::channel(100);
let lease_rt = ManagedLeaseFactoryRuntime {
etcd,
rt: rt.clone(),
js: JoinSet::new(),
cnc_rx,
};
let jh = rt.spawn(lease_rt.run());
(
Self {
cnc_tx: cnc_tx.clone(),
},
jh,
)
}

///
/// Create a new managed lease with the given time-to-live (TTL), keepalive interval and auto refresh limit.
/// The lease will be kept alive until it is dropped OR until the lease has been refresh `auto_refresh_limit` times.
///
/// Arguments:
///
/// * `ttl` - The time-to-live for the lease.
/// * `keepalive_interval` - The interval to keep the lease alive.
/// * `auto_refresh_limit` - The number of times to auto refresh the lease.
///
pub async fn new_lease_with_auto_refresh_limit(
&self,
ttl: Duration,
keepalive_interval: Option<Duration>,
auto_refresh_limit: Option<usize>,
) -> Result<ManagedLease, etcd_client::Error> {
let ttl_secs: i64 = ttl.as_secs() as i64;
assert!(ttl_secs >= 2, "lease ttl must be at least two (2) seconds");
let (callback_tx, callback_rx) = oneshot::channel();
let command = CreateLeaseCommand {
ttl,
keepalive_interval,
auto_refresh_limit,
callback: callback_tx,
};
self.cnc_tx
.send(ManagedLeaseRuntimeCommand::CreateLease(command))
.await
.expect("failed to send command to managed lease factory");

let result = callback_rx
.await
.expect("failed to receive result from managed lease factory");
match result {
Ok(lease) => Ok(lease),
Err(e) => match e {
CreateLeaseError::EtcdError(e) => Err(e),
CreateLeaseError::InvalidTTL => {
panic!("lease ttl must be at least two (2) seconds");
}
},
}
}

///
Expand All @@ -324,6 +457,11 @@ impl ManagedLeaseFactory {
///
/// Keepalive interval is optional, if not provided it will be half of the ttl.
///
/// Arguments:
///
/// * `ttl` - The time-to-live for the lease.
/// * `keepalive_interval` - The interval to keep the lease alive.
///
pub async fn new_lease(
&self,
ttl: Duration,
Expand Down
Loading
Loading