diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index b8a07d4d..631a923b 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -7,69 +7,38 @@ // modified, or distributed except according to those terms. use std::{ - future::Future, - pin::Pin, + future::poll_fn, + sync::atomic, task::{Context, Poll}, }; -use futures_core::ready; -use tokio::sync::mpsc::UnboundedSender; - use crate::{ - conn::pool::{Inner, Pool, QUEUE_END_ID}, + conn::pool::{Pool, QUEUE_END_ID}, error::Error, - Conn, }; -use std::sync::{atomic, Arc}; - -/// Future that disconnects this pool from a server and resolves to `()`. +/// Disconnect this pool from a server and resolves to `()`. /// -/// **Note:** This Future won't resolve until all active connections, taken from it, +/// **Note:** This won't resolve until all active connections, taken from the poll, /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct DisconnectPool { - pool_inner: Arc, - drop: Option>>, -} +pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { + let inner = pool.inner; + let drop = pool.drop; -impl DisconnectPool { - pub(crate) fn new(pool: Pool) -> Self { - Self { - pool_inner: pool.inner, - drop: Some(pool.drop), - } - } -} - -impl Future for DisconnectPool { - type Output = Result<(), Error>; + inner.close.store(true, atomic::Ordering::Release); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.pool_inner.close.store(true, atomic::Ordering::Release); - let mut exchange = self.pool_inner.exchange.lock().unwrap(); - exchange.spawn_futures_if_needed(&self.pool_inner); + let f = |cx: &mut Context| { + let mut exchange = inner.exchange.lock().unwrap(); + exchange.spawn_futures_if_needed(&inner); exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID); - drop(exchange); + Poll::Ready(()) + }; + poll_fn(f).await; - if self.pool_inner.closed.load(atomic::Ordering::Acquire) { - Poll::Ready(Ok(())) - } else { - match self.drop.take() { - Some(drop) => match drop.send(None) { - Ok(_) => { - // Recycler is alive. Waiting for it to finish. - ready!(Box::pin(drop.closed()).as_mut().poll(cx)); - Poll::Ready(Ok(())) - } - Err(_) => { - // Recycler seem dead. No one will wake us. - Poll::Ready(Ok(())) - } - }, - None => Poll::Pending, - } - } + if !inner.closed.load(atomic::Ordering::Acquire) && drop.send(None).is_ok() { + // Recycler is alive. Wait for it to finish. + drop.closed().await; } + + Ok(()) } diff --git a/src/conn/pool/futures/mod.rs b/src/conn/pool/futures/mod.rs index 00842994..0ef2f698 100644 --- a/src/conn/pool/futures/mod.rs +++ b/src/conn/pool/futures/mod.rs @@ -6,8 +6,9 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +pub use self::get_conn::GetConn; pub(super) use self::get_conn::GetConnInner; -pub use self::{disconnect_pool::DisconnectPool, get_conn::GetConn}; +pub(crate) use disconnect_pool::disconnect_pool; mod disconnect_pool; mod get_conn; diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index de685441..8690f15b 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -287,8 +287,8 @@ impl Pool { /// /// **Note:** This Future won't resolve until all active connections, taken from it, /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. - pub fn disconnect(self) -> DisconnectPool { - DisconnectPool::new(self) + pub async fn disconnect(self) -> Result<()> { + disconnect_pool(self).await } /// A way to return connection taken from a pool. diff --git a/src/lib.rs b/src/lib.rs index 4ad9d735..8060d837 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -558,7 +558,7 @@ pub use crate::connection_like::{Connection, ToConnectionResult}; /// Futures used in this crate pub mod futures { - pub use crate::conn::pool::futures::{DisconnectPool, GetConn}; + pub use crate::conn::pool::futures::GetConn; } /// Traits used in this crate diff --git a/tests/exports.rs b/tests/exports.rs index 6f9feef8..2e736f40 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -1,7 +1,7 @@ #[allow(unused_imports)] use mysql_async::{ consts, from_row, from_row_opt, from_value, from_value_opt, - futures::{DisconnectPool, GetConn}, + futures::GetConn, params, prelude::{ BatchQuery, FromRow, FromValue, GlobalHandler, Protocol, Query, Queryable, StatementLike,