From 0c8592c50269f7f2363b3558e306b93c70ab3e35 Mon Sep 17 00:00:00 2001
From: John Downey
Date: Tue, 23 Dec 2025 09:56:12 -0600
Subject: [PATCH 1/4] Add blocking checkout with waiter queue

Introduce apply_blocking that waits for resources when pool is
exhausted instead of returning NoResourcesAvailable immediately.
---
 src/bath.gleam       | 385 ++++++++++++++++++++++++++++++++++++++-----
 test/bath_test.gleam | 161 ++++++++++++++++++
 2 files changed, 501 insertions(+), 45 deletions(-)

diff --git a/src/bath.gleam b/src/bath.gleam
index aef941d..f627ed8 100644
--- a/src/bath.gleam
+++ b/src/bath.gleam
@@ -136,6 +136,8 @@ pub fn log_errors(
 pub type ApplyError {
   NoResourcesAvailable
   CheckOutResourceCreateError(error: String)
+  CheckoutTimedOut
+  PoolShuttingDown
 }
 
 /// An error returned when the resource pool fails to shut down.
@@ -299,6 +301,73 @@ pub fn apply(
   Ok(usage_result)
 }
 
+/// Like [`apply`](#apply), but blocks waiting for a resource if the pool is
+/// exhausted instead of returning `NoResourcesAvailable` immediately.
+///
+/// The timeout covers both waiting in the queue and the call to the pool actor.
+/// If the timeout expires while waiting, `Error(CheckoutTimedOut)` is returned.
+/// If the pool shuts down while waiting, `Error(PoolShuttingDown)` is returned.
+///
+/// ```gleam
+/// let assert Ok(pool) =
+///   bath.new(fn() { Ok("Some pooled resource") })
+///   |> bath.size(1)
+///   |> bath.start(1000)
+///
+/// // This will block until a resource is available or timeout
+/// use resource <- bath.apply_blocking(pool, 5000)
+///
+/// // Do stuff with resource...
+///
+/// bath.keep()
+/// |> bath.returning("Hello!")
+/// ```
+pub fn apply_blocking(
+  pool pool: process.Subject(Msg(resource_type)),
+  timeout timeout: Int,
+  next next: fn(resource_type) -> Next(result_type),
+) -> Result(result_type, ApplyError) {
+  let self = process.self()
+  let checkout_result = blocking_checkout(pool, self, timeout)
+
+  case checkout_result {
+    Error(err) -> Error(err)
+    Ok(resource) -> {
+      let next_action = next(resource)
+
+      let #(usage_result, next_action) = case next_action {
+        Keep(return) -> #(return, Keep(Nil))
+        Discard(return) -> #(return, Discard(Nil))
+      }
+
+      check_in(pool, resource, self, next_action)
+
+      Ok(usage_result)
+    }
+  }
+}
+
+/// Internal function to perform blocking checkout with graceful timeout handling.
+/// Unlike process.call, this returns an error on timeout instead of panicking.
+fn blocking_checkout(
+  pool: process.Subject(Msg(resource_type)),
+  caller: Pid,
+  timeout: Int,
+) -> Result(resource_type, ApplyError) {
+  let reply_subject = process.new_subject()
+  process.send(pool, CheckOutBlocking(reply_to: reply_subject, caller:))
+
+  let selector =
+    process.new_selector()
+    |> process.select(reply_subject)
+
+  case process.selector_receive(selector, timeout) {
+    Error(Nil) -> Error(CheckoutTimedOut)
+    Ok(Error(err)) -> Error(err)
+    Ok(Ok(resource)) -> Ok(resource)
+  }
+}
+
 /// Shut down the pool, calling the shutdown function on each
 /// resource in the pool. Calling with `force` set to `True` will
 /// force the shutdown, not calling the shutdown function on any
@@ -332,6 +401,7 @@ pub opaque type State(resource_type) {
     resources: deque.Deque(resource_type),
     current_size: Int,
     live_resources: LiveResources(resource_type),
+    waiting: deque.Deque(WaitingRequest(resource_type)),
     selector: process.Selector(Msg(resource_type)),
     log_errors: Bool,
   )
@@ -344,12 +414,25 @@ type LiveResource(resource_type) {
   LiveResource(resource: resource_type, monitor: process.Monitor)
 }
 
+type WaitingRequest(resource_type) {
+  WaitingRequest(
+    reply_to: Subject(Result(resource_type, ApplyError)),
+    caller: Pid,
+    monitor: process.Monitor,
+  )
+}
+
 /// A message sent to the pool actor.
 pub opaque type Msg(resource_type) {
   CheckIn(resource: resource_type, caller: Pid, next: Next(Nil))
   CheckOut(reply_to: Subject(Result(resource_type, ApplyError)), caller: Pid)
+  CheckOutBlocking(
+    reply_to: Subject(Result(resource_type, ApplyError)),
+    caller: Pid,
+  )
   PoolExit(process.ExitMessage)
   CallerDown(process.Down)
+  WaiterDown(process.Down)
   Shutdown(reply_to: process.Subject(Result(Nil, ShutdownError)), force: Bool)
 }
 
@@ -368,26 +451,90 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
         Error(_) -> state.selector
       }
 
-      let #(new_resources, current_size) = case next {
-        Keep(_) -> #(
-          deque.push_back(state.resources, resource),
-          state.current_size,
-        )
+      case next {
+        Keep(_) -> {
+          case deque.pop_front(state.waiting) {
+            Ok(#(waiter, new_waiting)) -> {
+              let selector = demonitor_process(selector, waiter.monitor)
+              let #(monitor, selector) =
+                monitor_process(selector, waiter.caller)
+              let live_resources =
+                dict.insert(
+                  live_resources,
+                  waiter.caller,
+                  LiveResource(resource:, monitor:),
+                )
+              actor.send(waiter.reply_to, Ok(resource))
+              State(..state, live_resources:, waiting: new_waiting, selector:)
+              |> actor.continue
+              |> actor.with_selector(selector)
+            }
+            Error(_) -> {
+              State(
+                ..state,
+                resources: deque.push_back(state.resources, resource),
+                live_resources:,
+                selector:,
+              )
+              |> actor.continue
+              |> actor.with_selector(selector)
+            }
+          }
+        }
         Discard(_) -> {
           state.shutdown_resource(resource)
-          #(state.resources, state.current_size - 1)
+          let current_size = state.current_size - 1
+          case deque.pop_front(state.waiting) {
+            Ok(#(waiter, new_waiting)) -> {
+              case state.create_resource() {
+                Ok(new_resource) -> {
+                  let selector = demonitor_process(selector, waiter.monitor)
+                  let #(monitor, selector) =
+                    monitor_process(selector, waiter.caller)
+                  let live_resources =
+                    dict.insert(
+                      live_resources,
+                      waiter.caller,
+                      LiveResource(resource: new_resource, monitor:),
+                    )
+                  actor.send(waiter.reply_to, Ok(new_resource))
+                  State(
+                    ..state,
+                    current_size: current_size + 1,
+                    live_resources:,
+                    waiting: new_waiting,
+                    selector:,
+                  )
+                  |> actor.continue
+                  |> actor.with_selector(selector)
+                }
+                Error(err) -> {
+                  log_resource_creation_error(state.log_errors, err)
+                  let selector = demonitor_process(selector, waiter.monitor)
+                  actor.send(
+                    waiter.reply_to,
+                    Error(CheckOutResourceCreateError(err)),
+                  )
+                  State(
+                    ..state,
+                    current_size:,
+                    live_resources:,
+                    waiting: new_waiting,
+                    selector:,
+                  )
+                  |> actor.continue
+                  |> actor.with_selector(selector)
+                }
+              }
+            }
+            Error(_) -> {
+              State(..state, current_size:, live_resources:, selector:)
+              |> actor.continue
+              |> actor.with_selector(selector)
+            }
+          }
         }
       }
-
-      State(
-        ..state,
-        current_size:,
-        resources: new_resources,
-        live_resources:,
-        selector:,
-      )
-      |> actor.continue
-      |> actor.with_selector(selector)
     }
     CheckOut(reply_to:, caller:) -> {
       // We always push to the back, so for FIFO, we pop front,
@@ -453,6 +600,74 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
         }
       }
     }
+    CheckOutBlocking(reply_to:, caller:) -> {
+      let get_result = case state.checkout_strategy {
+        FIFO -> deque.pop_front(state.resources)
+        LIFO -> deque.pop_back(state.resources)
+      }
+
+      let resource_result = case get_result {
+        Ok(#(resource, new_resources)) ->
+          Ok(#(resource, new_resources, state.current_size))
+        Error(_) -> {
+          case state.current_size < state.max_size {
+            True -> {
+              use resource <- result.try(
+                state.create_resource()
+                |> result.map_error(fn(err) {
+                  log_resource_creation_error(state.log_errors, err)
+                  CheckOutResourceCreateError(err)
+                }),
+              )
+              Ok(#(resource, state.resources, state.current_size + 1))
+            }
+            False -> Error(NoResourcesAvailable)
+          }
+        }
+      }
+
+      case resource_result {
+        Error(NoResourcesAvailable) -> {
+          let monitor = process.monitor(caller)
+          let selector =
+            state.selector
+            |> process.select_specific_monitor(monitor, WaiterDown)
+          let waiting_request = WaitingRequest(reply_to:, caller:, monitor:)
+          State(
+            ..state,
+            waiting: deque.push_back(state.waiting, waiting_request),
+            selector:,
+          )
+          |> actor.continue
+          |> actor.with_selector(selector)
+        }
+        Error(err) -> {
+          actor.send(reply_to, Error(err))
+          actor.continue(state)
+        }
+        Ok(#(resource, new_resources, new_current_size)) -> {
+          let #(monitor, selector) = monitor_process(state.selector, caller)
+
+          let live_resources =
+            dict.insert(
+              state.live_resources,
+              caller,
+              LiveResource(resource:, monitor:),
+            )
+
+          actor.send(reply_to, Ok(resource))
+          State(
+            ..state,
+            resources: new_resources,
+            current_size: new_current_size,
+            selector:,
+            live_resources:,
+          )
+          |> actor.continue
+          |> actor.with_selector(selector)
+        }
+      }
+    }
     PoolExit(exit_message) -> {
       // Don't clean up live resources, as they may be in use
       state.resources
@@ -468,6 +683,13 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
       }
     }
     Shutdown(reply_to:, force:) -> {
+      state.waiting
+      |> deque.to_list
+      |> list.each(fn(waiter) {
+        process.demonitor_process(waiter.monitor)
+        actor.send(waiter.reply_to, Error(PoolShuttingDown))
+      })
+
       case dict.size(state.live_resources), force {
         // No live resource, shut down
         0, _ -> {
@@ -502,48 +724,120 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
 
           // Demonitor the process
           let selector = demonitor_process(state.selector, live_resource.monitor)
+          let live_resources =
+            dict.delete(state.live_resources, process_down_pid)
 
           // Shutdown the old resource
           state.shutdown_resource(live_resource.resource)
+          let current_size = state.current_size - 1
 
-          let #(new_resources, new_current_size) = case
-            state.creation_strategy
-          {
-            // If we create lazily, just decrement the current size - a new resource
-            // will be created when required
-            Lazy -> #(state.resources, state.current_size - 1)
-            // Otherwise, create a new resource, warning if resource creation fails
-            Eager -> {
+          case deque.pop_front(state.waiting) {
+            Ok(#(waiter, new_waiting)) -> {
               case state.create_resource() {
-                // Size hasn't changed
-                Ok(resource) -> #(
-                  deque.push_back(state.resources, resource),
-                  state.current_size,
-                )
-                // Size has changed
-                Error(resource_create_error) -> {
-                  log_resource_creation_error(
-                    state.log_errors,
-                    resource_create_error,
+                Ok(new_resource) -> {
+                  let selector = demonitor_process(selector, waiter.monitor)
+                  let #(monitor, selector) =
+                    monitor_process(selector, waiter.caller)
+                  let live_resources =
+                    dict.insert(
+                      live_resources,
+                      waiter.caller,
+                      LiveResource(resource: new_resource, monitor:),
+                    )
+                  actor.send(waiter.reply_to, Ok(new_resource))
+                  State(
+                    ..state,
+                    current_size: current_size + 1,
+                    live_resources:,
+                    waiting: new_waiting,
+                    selector:,
+                    resources: state.resources,
+                  )
+                  |> actor.continue
+                  |> actor.with_selector(selector)
+                }
+                Error(err) -> {
+                  log_resource_creation_error(state.log_errors, err)
+                  let selector = demonitor_process(selector, waiter.monitor)
+                  actor.send(
+                    waiter.reply_to,
+                    Error(CheckOutResourceCreateError(err)),
+                  )
+                  State(
+                    ..state,
+                    current_size:,
+                    live_resources:,
+                    waiting: new_waiting,
+                    selector:,
+                    resources: state.resources,
                   )
-                  #(state.resources, state.current_size)
+                  |> actor.continue
+                  |> actor.with_selector(selector)
                 }
               }
             }
-          }
+            Error(_) -> {
+              let #(new_resources, new_current_size) = case
+                state.creation_strategy
+              {
+                // If we create lazily, just decrement the current size - a new resource
+                // will be created when required
+                Lazy -> #(state.resources, current_size)
+                // Otherwise, create a new resource, warning if resource creation fails
+                Eager -> {
+                  case state.create_resource() {
+                    // Size hasn't changed
+                    Ok(resource) -> #(
+                      deque.push_back(state.resources, resource),
+                      current_size + 1,
+                    )
+                    // Size has changed
+                    Error(resource_create_error) -> {
+                      log_resource_creation_error(
+                        state.log_errors,
+                        resource_create_error,
+                      )
+                      #(state.resources, current_size)
+                    }
+                  }
+                }
+              }
 
-          State(
-            ..state,
-            resources: new_resources,
-            current_size: new_current_size,
-            selector:,
-            live_resources: dict.delete(state.live_resources, process_down_pid),
-          )
-          |> actor.continue
-          |> actor.with_selector(selector)
+              State(
+                ..state,
+                resources: new_resources,
+                current_size: new_current_size,
+                selector:,
+                live_resources:,
+              )
+              |> actor.continue
+              |> actor.with_selector(selector)
+            }
+          }
         }
       }
     }
+    WaiterDown(process_down) -> {
+      let assert process.ProcessDown(pid: process_down_pid, ..) = process_down
+
+      let #(new_waiting, monitors_to_remove) =
+        state.waiting
+        |> deque.to_list
+        |> list.partition(fn(waiter) { waiter.caller != process_down_pid })
+        |> fn(partitioned) {
+          let #(kept, removed) = partitioned
+          #(deque.from_list(kept), list.map(removed, fn(w) { w.monitor }))
+        }
+
+      let selector =
+        list.fold(monitors_to_remove, state.selector, fn(sel, mon) {
+          demonitor_process(sel, mon)
+        })
+
+      State(..state, waiting: new_waiting, selector:)
+      |> actor.continue
+      |> actor.with_selector(selector)
+    }
   }
 }
@@ -598,6 +892,7 @@ fn actor_builder(
       checkout_strategy: builder.checkout_strategy,
       creation_strategy: builder.creation_strategy,
       live_resources: dict.new(),
+      waiting: deque.new(),
      selector:,
      current_size:,
      max_size: builder.size,
diff --git a/test/bath_test.gleam b/test/bath_test.gleam
index 57998a2..b2546c3 100644
--- a/test/bath_test.gleam
+++ b/test/bath_test.gleam
@@ -184,6 +184,167 @@ pub fn shutdown_function_doesnt_get_called_on_keep_test() {
   let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
 }
 
+pub fn apply_blocking_waits_for_resource_test() {
+  let assert Ok(pool) =
+    bath.new(fn() { Ok(10) })
+    |> bath.size(1)
+    |> bath.start(1000)
+
+  let result_subject = process.new_subject()
+
+  // First caller takes the only resource
+  process.spawn(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(200)
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  // Second caller uses apply_blocking - should wait and succeed
+  process.spawn(fn() {
+    let result =
+      bath.apply_blocking(pool, 5000, fn(r) { bath.keep() |> bath.returning(r) })
+    process.send(result_subject, result)
+  })
+
+  let assert Ok(Ok(10)) = process.receive(result_subject, 1000)
+  let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
+}
+
+pub fn apply_blocking_timeout_returns_error_test() {
+  let assert Ok(pool) =
+    bath.new(fn() { Ok(10) })
+    |> bath.size(1)
+    |> bath.start(1000)
+
+  // Take the only resource and hold it
+  process.spawn(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(1000)
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  // Try to get resource with short timeout - should timeout
+  let assert Error(bath.CheckoutTimedOut) =
+    bath.apply_blocking(pool, 100, fn(_) { bath.keep() })
+
+  let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
+}
+
+pub fn apply_blocking_fifo_order_test() {
+  let assert Ok(pool) =
+    bath.new(fn() { Ok(10) })
+    |> bath.size(1)
+    |> bath.start(1000)
+
+  let order_subject = process.new_subject()
+
+  // First caller takes the only resource
+  process.spawn(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(300)
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  // Second and third callers queue up
+  process.spawn(fn() {
+    use _ <- bath.apply_blocking(pool, 5000)
+    process.send(order_subject, "second")
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  process.spawn(fn() {
+    use _ <- bath.apply_blocking(pool, 5000)
+    process.send(order_subject, "third")
+    bath.keep()
+  })
+
+  let assert Ok("second") = process.receive(order_subject, 2000)
+  let assert Ok("third") = process.receive(order_subject, 2000)
+  let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
+}
+
+pub fn apply_blocking_shutdown_rejects_waiters_test() {
+  let assert Ok(pool) =
+    bath.new(fn() { Ok(10) })
+    |> bath.size(1)
+    |> bath.start(1000)
+
+  let result_subject = process.new_subject()
+
+  // Take the only resource
+  process.spawn(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(500)
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  // Queue up a waiter
+  process.spawn(fn() {
+    let result = bath.apply_blocking(pool, 5000, fn(_) { bath.keep() })
+    process.send(result_subject, result)
+  })
+
+  process.sleep(50)
+
+  // Force shutdown while waiter is waiting
+  let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
+
+  // Waiter should receive PoolShuttingDown error
+  let assert Ok(Error(bath.PoolShuttingDown)) =
+    process.receive(result_subject, 1000)
+}
+
+pub fn apply_blocking_waiter_crash_removes_from_queue_test() {
+  let assert Ok(pool) =
+    bath.new(fn() { Ok(10) })
+    |> bath.size(1)
+    |> bath.start(1000)
+
+  let result_subject = process.new_subject()
+
+  logging.set_level(logging.Critical)
+
+  // Take the only resource
+  process.spawn(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(300)
+    bath.keep()
+  })
+
+  process.sleep(50)
+
+  // First waiter will crash
+  process.spawn_unlinked(fn() {
+    use _ <- bath.apply_blocking(pool, 5000)
+    panic as "Waiter crashed!"
+  })
+
+  process.sleep(50)
+
+  // Second waiter should still get the resource
+  process.spawn(fn() {
+    let result =
+      bath.apply_blocking(pool, 5000, fn(r) { bath.keep() |> bath.returning(r) })
+    process.send(result_subject, result)
+  })
+
+  logging.configure()
+
+  // Second waiter should succeed
+  let assert Ok(Ok(10)) = process.receive(result_subject, 2000)
+  let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
+}
+
 // ----- Util tests ----- //
 
 pub fn try_map_returning_succeeds_if_no_errors_test() {

From 11221b84ad7920d9887b9fd0a6f225930ad7998f Mon Sep 17 00:00:00 2001
From: John Downey
Date: Sat, 27 Dec 2025 21:40:04 -0600
Subject: [PATCH 2/4] Panic on timeout in apply_blocking to fix race condition

Matches process.call behavior - caller dies on timeout, triggering
cleanup via WaiterDown/CallerDown. Removes CheckoutTimedOut error.
---
 src/bath.gleam       | 74 ++++++++++++++++++++++----------------------
 test/bath_test.gleam | 22 ++++++++++---
 2 files changed, 55 insertions(+), 41 deletions(-)

diff --git a/src/bath.gleam b/src/bath.gleam
index f627ed8..423b4a4 100644
--- a/src/bath.gleam
+++ b/src/bath.gleam
@@ -136,7 +136,6 @@ pub fn log_errors(
 pub type ApplyError {
   NoResourcesAvailable
   CheckOutResourceCreateError(error: String)
-  CheckoutTimedOut
   PoolShuttingDown
 }
 
 /// An error returned when the resource pool fails to shut down.
@@ -304,17 +303,42 @@ pub fn apply(
 /// Like [`apply`](#apply), but blocks waiting for a resource if the pool is
 /// exhausted instead of returning `NoResourcesAvailable` immediately.
 ///
-/// The timeout covers both waiting in the queue and the call to the pool actor.
-/// If the timeout expires while waiting, `Error(CheckoutTimedOut)` is returned.
+/// The timeout covers both waiting in the queue and any internal pool operations.
+///
+/// ## Panics
+///
+/// This function will panic if the timeout expires before a resource becomes
+/// available.
+///
+/// If you need to handle timeouts gracefully, you can:
+/// - Use this function within a supervised process (let it crash, supervisor restarts)
+/// - Wrap the call in a separate process/task that you can monitor
+/// - Use the [`exception`](https://hexdocs.pm/exception) library's `rescue`
+///   function to convert the panic into a `Result`
+///
+/// ```gleam
+/// import exception
+///
+/// let result = exception.rescue(fn() {
+///   bath.apply_blocking(pool, 1000, fn(resource) {
+///     // use resource...
+///     bath.keep()
+///   })
+/// })
+/// // result: Result(Result(a, ApplyError), Exception)
+/// ```
+///
 /// If the pool shuts down while waiting, `Error(PoolShuttingDown)` is returned.
 ///
+/// ## Example
+///
 /// ```gleam
 /// let assert Ok(pool) =
 ///   bath.new(fn() { Ok("Some pooled resource") })
 ///   |> bath.size(1)
 ///   |> bath.start(1000)
 ///
-/// // This will block until a resource is available or timeout
+/// // This will block until a resource is available or panic on timeout
 /// use resource <- bath.apply_blocking(pool, 5000)
 ///
 /// // Do stuff with resource...
@@ -328,44 +352,20 @@ pub fn apply_blocking(
   pool pool: process.Subject(Msg(resource_type)),
   timeout timeout: Int,
   next next: fn(resource_type) -> Next(result_type),
 ) -> Result(result_type, ApplyError) {
   let self = process.self()
-  let checkout_result = blocking_checkout(pool, self, timeout)
-
-  case checkout_result {
-    Error(err) -> Error(err)
-    Ok(resource) -> {
-      let next_action = next(resource)
-
-      let #(usage_result, next_action) = case next_action {
-        Keep(return) -> #(return, Keep(Nil))
-        Discard(return) -> #(return, Discard(Nil))
-      }
+  use resource <- result.try(
+    process.call(pool, timeout, CheckOutBlocking(_, caller: self)),
+  )
 
-      check_in(pool, resource, self, next_action)
+  let next_action = next(resource)
 
-      Ok(usage_result)
-    }
+  let #(usage_result, next_action) = case next_action {
+    Keep(return) -> #(return, Keep(Nil))
+    Discard(return) -> #(return, Discard(Nil))
   }
-}
-
-/// Internal function to perform blocking checkout with graceful timeout handling.
-/// Unlike process.call, this returns an error on timeout instead of panicking.
-fn blocking_checkout(
-  pool: process.Subject(Msg(resource_type)),
-  caller: Pid,
-  timeout: Int,
-) -> Result(resource_type, ApplyError) {
-  let reply_subject = process.new_subject()
-  process.send(pool, CheckOutBlocking(reply_to: reply_subject, caller:))
-
-  let selector =
-    process.new_selector()
-    |> process.select(reply_subject)
+  check_in(pool, resource, self, next_action)
 
-  case process.selector_receive(selector, timeout) {
-    Error(Nil) -> Error(CheckoutTimedOut)
-    Ok(Error(err)) -> Error(err)
-    Ok(Ok(resource)) -> Ok(resource)
-  }
+  Ok(usage_result)
 }
 
 /// Shut down the pool, calling the shutdown function on each
diff --git a/test/bath_test.gleam b/test/bath_test.gleam
index b2546c3..b7ad9f0 100644
--- a/test/bath_test.gleam
+++ b/test/bath_test.gleam
@@ -212,12 +212,14 @@ pub fn apply_blocking_waits_for_resource_test() {
   let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
 }
 
-pub fn apply_blocking_timeout_returns_error_test() {
+pub fn apply_blocking_timeout_panics_test() {
   let assert Ok(pool) =
     bath.new(fn() { Ok(10) })
     |> bath.size(1)
     |> bath.start(1000)
 
+  let result_subject = process.new_subject()
+
   // Take the only resource and hold it
   process.spawn(fn() {
     use _ <- bath.apply(pool, 5000)
@@ -227,9 +229,21 @@ pub fn apply_blocking_timeout_returns_error_test() {
 
   process.sleep(50)
 
-  // Try to get resource with short timeout - should timeout
-  let assert Error(bath.CheckoutTimedOut) =
-    bath.apply_blocking(pool, 100, fn(_) { bath.keep() })
+  // Try to get resource with short timeout in separate process
+  // It should panic (process dies)
+  let pid =
+    process.spawn_unlinked(fn() {
+      bath.apply_blocking(pool, 100, fn(_) { bath.keep() })
+      |> Ok
+      |> process.send(result_subject, _)
+    })
+
+  let monitor = process.monitor(pid)
+
+  let assert Ok(process.ProcessDown(..)) =
+    process.new_selector()
+    |> process.select_specific_monitor(monitor, fn(down) { down })
+    |> process.selector_receive(500)
 
   let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
 }

From dcd81c268a614304de057c539f8b5eca0fb2f8ae Mon Sep 17 00:00:00 2001
From: John Downey
Date: Mon, 29 Dec 2025 07:34:29 -0600
Subject: [PATCH 3/4] Clarify pool size comments and add
 blocking strategy tests

---
 src/bath.gleam       |  4 +--
 test/bath_test.gleam | 69 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/src/bath.gleam b/src/bath.gleam
index 423b4a4..9163d72 100644
--- a/src/bath.gleam
+++ b/src/bath.gleam
@@ -786,12 +786,12 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
                 // Otherwise, create a new resource, warning if resource creation fails
                 Eager -> {
                   case state.create_resource() {
-                    // Size hasn't changed
+                    // Resource replaced: +1 offsets earlier decrement, net pool size unchanged
                     Ok(resource) -> #(
                       deque.push_back(state.resources, resource),
                       current_size + 1,
                     )
-                    // Size has changed
+                    // Resource lost: pool size decreased by 1
                     Error(resource_create_error) -> {
                       log_resource_creation_error(
                         state.log_errors,
diff --git a/test/bath_test.gleam b/test/bath_test.gleam
index b7ad9f0..1c09627 100644
--- a/test/bath_test.gleam
+++ b/test/bath_test.gleam
@@ -359,6 +359,75 @@ pub fn apply_blocking_waiter_crash_removes_from_queue_test() {
   let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
 }
 
+pub fn lazy_defers_resource_creation_on_crash_test() {
+  // Track how many times create_resource is called
+  let counter = process.new_subject()
+
+  let assert Ok(pool) =
+    bath.new(fn() {
+      process.send(counter, Nil)
+      Ok(10)
+    })
+    |> bath.size(1)
+    |> bath.creation_strategy(bath.Lazy)
+    |> bath.start(1000)
+
+  // Lazy: no resource created at start
+  let assert Error(Nil) = process.receive(counter, 100)
+
+  // Checkout creates resource on demand
+  process.spawn_unlinked(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(100)
+    panic as "Holder crashed"
+  })
+
+  // First creation happens on checkout
+  let assert Ok(Nil) = process.receive(counter, 500)
+
+  // Wait for crash to be processed
+  process.sleep(200)
+
+  // Lazy should NOT create replacement when no waiters
+  let assert Error(Nil) = process.receive(counter, 50)
+  let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
+}
+
+pub fn eager_replaces_resource_on_crash_test() {
+  // Track how many times create_resource is called
+  let counter = process.new_subject()
+
+  let assert Ok(pool) =
+    bath.new(fn() {
+      process.send(counter, Nil)
+      Ok(10)
+    })
+    |> bath.size(1)
+    |> bath.creation_strategy(bath.Eager)
+    |> bath.start(1000)
+
+  // Eager: resource created at start
+  let assert Ok(Nil) = process.receive(counter, 500)
+
+  // Checkout uses existing resource (no new creation)
+  process.spawn_unlinked(fn() {
+    use _ <- bath.apply(pool, 5000)
+    process.sleep(200)
+    panic as "Holder crashed"
+  })
+
+  // Wait for checkout to complete, then verify no new creation yet
+  process.sleep(100)
+  let assert Error(Nil) = process.receive(counter, 50)
+
+  // Wait for crash to be processed
+  process.sleep(100)
+
+  // Eager SHOULD create replacement when no waiters
+  let assert Ok(Nil) = process.receive(counter, 500)
+  let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
+}
+
 // ----- Util tests ----- //
 
 pub fn try_map_returning_succeeds_if_no_errors_test() {

From e2fae451be2d3f9ce19bba6c0922a6efc9ad8df7 Mon Sep 17 00:00:00 2001
From: John Downey
Date: Thu, 1 Jan 2026 12:53:22 -0500
Subject: [PATCH 4/4] Move current_size calculations back into each branch

---
 src/bath.gleam | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/bath.gleam b/src/bath.gleam
index 9163d72..7ebc315 100644
--- a/src/bath.gleam
+++ b/src/bath.gleam
@@ -729,7 +729,6 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
 
           // Shutdown the old resource
           state.shutdown_resource(live_resource.resource)
-          let current_size = state.current_size - 1
 
           case deque.pop_front(state.waiting) {
             Ok(#(waiter, new_waiting)) -> {
@@ -747,7 +746,7 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
                   actor.send(waiter.reply_to, Ok(new_resource))
                   State(
                     ..state,
-                    current_size: current_size + 1,
+                    current_size: state.current_size,
                     live_resources:,
                     waiting: new_waiting,
                     selector:,
@@ -765,7 +764,7 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
                   )
                   State(
                     ..state,
-                    current_size:,
+                    current_size: state.current_size - 1,
                     live_resources:,
                     waiting: new_waiting,
                     selector:,
@@ -782,22 +781,22 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
              {
                 // If we create lazily, just decrement the current size - a new resource
                 // will be created when required
-                Lazy -> #(state.resources, current_size)
+                Lazy -> #(state.resources, state.current_size - 1)
                 // Otherwise, create a new resource, warning if resource creation fails
                 Eager -> {
                   case state.create_resource() {
-                    // Resource replaced: +1 offsets earlier decrement, net pool size unchanged
+                    // Size hasn't changed
                     Ok(resource) -> #(
                       deque.push_back(state.resources, resource),
-                      current_size + 1,
+                      state.current_size,
                     )
-                    // Resource lost: pool size decreased by 1
+                    // Size has changed
                     Error(resource_create_error) -> {
                       log_resource_creation_error(
                         state.log_errors,
                         resource_create_error,
                       )
-                      #(state.resources, current_size)
+                      #(state.resources, state.current_size - 1)
                     }
                   }
                 }