From 2d3a9eafc0d7c8f0dec3e99b41c409d3fb220ee1 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Fri, 20 Jun 2025 14:28:13 +0100 Subject: [PATCH] require `bath.apply` callback to return a `bath.Next` value --- CHANGELOG.md | 8 ++++ README.md | 6 ++- gleam.toml | 2 +- src/bath.gleam | 111 ++++++++++++++++++++++++++++++++----------- test/bath_test.gleam | 62 +++++++++++++++++++++--- 5 files changed, 152 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1ab72e..5a79c52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## v4.0.0 - 2025-06-20 + +- The function passed to `bath.apply` must now return a `bath.Next(return)` value to + indicate whether the checked-out resource should be returned to the pool or + discarded. This allows users to control the lifecycle of resources more precisely. + For example, you can now shut down and dispose of pooled TCP sockets for which the + connection has been closed. + ## v3.0.0 - 2025-06-14 Bath has been updated to use the new stable versions of `gleam/erlang` and diff --git a/README.md b/README.md index 5bcc3a1..d40c3bd 100644 --- a/README.md +++ b/README.md @@ -44,8 +44,12 @@ pub fn main() { // Use the pool. Shown here in a block to use `use`. let assert Ok("Hello!") = { use conn <- bath.apply(pool, 1000) + // Do stuff with the connection... - "Hello!" + + // Return the connection to the pool, returning "Hello!" to the caller. + bath.keep() + |> bath.returning("Hello!") } // Close the pool. diff --git a/gleam.toml b/gleam.toml index 0a480c8..83e0e7a 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "bath" -version = "3.0.0" +version = "4.0.0" description = "A resource pool for Gleam!" licences = ["MIT"] repository = { type = "github", user = "Pevensie", repo = "bath" } diff --git a/src/bath.gleam b/src/bath.gleam index cfacf0c..87b37ad 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,6 +1,7 @@ import gleam/deque import gleam/dict.{type Dict} import gleam/erlang/process.{type Pid, type Subject} +import gleam/int import gleam/list import gleam/otp/actor import gleam/otp/supervision @@ -73,12 +74,12 @@ pub fn new( ) } -/// Set the pool size. Defaults to 10. +/// Set the pool size. Defaults to 10. Will be clamped to a minimum of 1. pub fn size( builder builder: Builder(resource_type), size size: Int, ) -> Builder(resource_type) { - Builder(..builder, size:) + Builder(..builder, size: int.max(size, 1)) } /// Set a shutdown function to be run for each resource when the pool exits. @@ -189,6 +190,36 @@ pub fn start( Ok(Pool(started.data)) } +/// The type used to indicate what to do with a resource after use. +pub opaque type Next(return) { + /// Return the resource to the pool. + Keep(return) + /// Discard the resource, running the shutdown function on it. + Discard(return) +} + +/// Instruct Bath to keep the checked out resource, returning it to the pool. +pub fn keep() -> Next(Nil) { + Keep(Nil) +} + +/// Instruct Bath to discard the checked out resource, running the shutdown function on +/// it. +/// +/// Discarded resources will be recreated lazily, regardless of the pool's creation +/// strategy. +pub fn discard() -> Next(Nil) { + Discard(Nil) +} + +/// Return a value from a use of [`apply`](#apply). +pub fn returning(next: Next(old), value: new) -> Next(new) { + case next { + Keep(_) -> Keep(value) + Discard(_) -> Discard(value) + } +} + /// Checks out a resource from the pool, sending the caller Pid for the pool to /// monitor in case the client dies. This allows the pool to create a new resource /// later if required. @@ -200,8 +231,13 @@ fn check_out( process.call(pool.subject, timeout, CheckOut(_, caller:)) } -fn check_in(pool: Pool(resource_type), resource: resource_type, caller: Pid) { - process.send(pool.subject, CheckIn(resource:, caller:)) +fn check_in( + pool: Pool(resource_type), + resource: resource_type, + caller: Pid, + next: Next(Nil), +) { + process.send(pool.subject, CheckIn(resource:, caller:, next:)) } /// Check out a resource from the pool, apply the `next` function, then check @@ -213,26 +249,36 @@ fn check_in(pool: Pool(resource_type), resource: resource_type, caller: Pid) { /// |> bath.start(1000) /// /// use resource <- bath.apply(pool, 1000) +/// /// // Do stuff with resource... +/// +/// // Return the resource to the pool, returning "Hello!" to the caller. +/// bath.keep() +/// |> bath.returning("Hello!") /// ``` pub fn apply( pool pool: Pool(resource_type), timeout timeout: Int, - next next: fn(resource_type) -> result_type, + next next: fn(resource_type) -> Next(result_type), ) -> Result(result_type, ApplyError) { let self = process.self() use resource <- result.try(check_out(pool, self, timeout)) - let usage_result = next(resource) + let next_action = next(resource) - check_in(pool, resource, self) + let #(usage_result, next_action) = case next_action { + Keep(return) -> #(return, Keep(Nil)) + Discard(return) -> #(return, Discard(Nil)) + } + + check_in(pool, resource, self, next_action) Ok(usage_result) } -/// Shut down the pool, calling the `shutdown_function` on each +/// Shut down the pool, calling the shutdown function on each /// resource in the pool. Calling with `force` set to `True` will -/// force the shutdown, not calling the `shutdown_function` on any +/// force the shutdown, not calling the shutdown function on any /// resources. /// /// Will fail if there are still resources checked out, unless `force` is @@ -279,7 +325,7 @@ type LiveResource(resource_type) { /// A message sent to the pool actor. pub opaque type Msg(resource_type) { - CheckIn(resource: resource_type, caller: Pid) + CheckIn(resource: resource_type, caller: Pid, next: Next(Nil)) CheckOut(reply_to: Subject(Result(resource_type, ApplyError)), caller: Pid) PoolExit(process.ExitMessage) CallerDown(process.Down) @@ -288,7 +334,7 @@ pub opaque type Msg(resource_type) { fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) { case msg { - CheckIn(resource:, caller:) -> { + CheckIn(resource:, caller:, next:) -> { // If the checked-in process currently has a live resource, remove it from // the live_resources dict let caller_live_resource = dict.get(state.live_resources, caller) @@ -301,14 +347,26 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) { Error(_) -> state.selector } - let new_resources = deque.push_back(state.resources, resource) + let #(new_resources, current_size) = case next { + Keep(_) -> #( + deque.push_back(state.resources, resource), + state.current_size, + ) + Discard(_) -> { + state.shutdown_resource(resource) + #(state.resources, state.current_size - 1) + } + } - actor.with_selector( - actor.continue( - State(..state, resources: new_resources, live_resources:, selector:), - ), - selector, + State( + ..state, + current_size:, + resources: new_resources, + live_resources:, + selector:, ) + |> actor.continue + |> actor.with_selector(selector) } CheckOut(reply_to:, caller:) -> { // We always push to the back, so for FIFO, we pop front, @@ -362,18 +420,15 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) { ) actor.send(reply_to, Ok(resource)) - actor.with_selector( - actor.continue( - State( - ..state, - resources: new_resources, - current_size: new_current_size, - selector:, - live_resources:, - ), - ), - selector, + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources:, ) + |> actor.continue + |> actor.with_selector(selector) } } } diff --git a/test/bath_test.gleam b/test/bath_test.gleam index d02473d..d563d2d 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -23,18 +23,29 @@ pub fn lifecycle_test() { Nil }) |> bath.start(1000) - let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 }) + let assert Ok(20) = + bath.apply(pool, 1000, fn(n) { bath.keep() |> bath.returning(n * 2) }) let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn empty_pool_fails_to_apply_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.size(0) + |> bath.size(1) |> bath.start(1000) + + process.spawn(fn() { + use _ <- bath.apply(pool, 1000) + process.sleep(1000) + bath.keep() + }) + + process.sleep(100) + let assert Error(bath.NoResourcesAvailable) = - bath.apply(pool, 1000, fn(_) { Nil }) - let assert Ok(Nil) = bath.shutdown(pool, False, 1000) + bath.apply(pool, 1000, fn(_) { bath.keep() }) + + let assert Ok(Nil) = bath.shutdown(pool, True, 1000) } pub fn pool_has_correct_capacity_test() { @@ -47,8 +58,8 @@ pub fn pool_has_correct_capacity_test() { // Only one capacity, so attempting to check out another resource // should fail let assert Error(bath.NoResourcesAvailable) = - bath.apply(pool, 1000, fn(_) { Nil }) - Nil + bath.apply(pool, 1000, fn(_) { bath.keep() }) + bath.keep() }) let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } @@ -64,6 +75,8 @@ pub fn pool_has_correct_resources_test() { // Check we have the right values n |> should.equal(10) + + bath.keep() }) let assert Ok(Nil) = bath.shutdown(pool, False, 1000) @@ -89,7 +102,8 @@ pub fn pool_handles_caller_crash_test() { logging.configure() // Ensure the pool still has an available resource - let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r }) + let assert Ok(10) = + bath.apply(pool, 1000, fn(r) { bath.keep() |> bath.returning(r) }) let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } @@ -116,6 +130,40 @@ pub fn shutdown_function_gets_called_test() { let assert Ok("Shut down") = process.receive(self, 1000) } +pub fn shutdown_function_gets_called_on_discard_test() { + let self = process.new_subject() + + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.size(1) + |> bath.creation_strategy(bath.Eager) + |> bath.on_shutdown(fn(_) { process.send(self, "Shut down") }) + |> bath.start(1000) + + let assert Ok(_) = bath.apply(pool, 1000, fn(_) { bath.discard() }) + + let assert Ok("Shut down") = process.receive(self, 1000) + + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) +} + +pub fn shutdown_function_doesnt_get_called_on_keep_test() { + let self = process.new_subject() + + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.size(1) + |> bath.creation_strategy(bath.Eager) + |> bath.on_shutdown(fn(_) { process.send(self, "Shut down") }) + |> bath.start(1000) + + let assert Ok(_) = bath.apply(pool, 1000, fn(_) { bath.keep() }) + + let assert Error(Nil) = process.receive(self, 100) + + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) +} + // ----- Util tests ----- // pub fn try_map_returning_succeeds_if_no_errors_test() {