Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## v4.0.0 - 2025-06-20

- The function passed to `bath.apply` must now return a `bath.Next(return)` value to
indicate whether the checked-out resource should be returned to the pool or
discarded. This allows users to control the lifecycle of resources more precisely.
For example, you can now shut down and dispose of pooled TCP sockets for which the
connection has been closed.

## v3.0.0 - 2025-06-14

Bath has been updated to use the new stable versions of `gleam/erlang` and
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ pub fn main() {
// Use the pool. Shown here in a block to use `use`.
let assert Ok("Hello!") = {
use conn <- bath.apply(pool, 1000)

// Do stuff with the connection...
"Hello!"

// Return the connection to the pool, returning "Hello!" to the caller.
bath.keep()
|> bath.returning("Hello!")
}

// Close the pool.
Expand Down
2 changes: 1 addition & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name = "bath"
version = "3.0.0"
version = "4.0.0"
description = "A resource pool for Gleam!"
licences = ["MIT"]
repository = { type = "github", user = "Pevensie", repo = "bath" }
Expand Down
111 changes: 83 additions & 28 deletions src/bath.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gleam/deque
import gleam/dict.{type Dict}
import gleam/erlang/process.{type Pid, type Subject}
import gleam/int
import gleam/list
import gleam/otp/actor
import gleam/otp/supervision
Expand Down Expand Up @@ -73,12 +74,12 @@ pub fn new(
)
}

/// Set the pool size. Defaults to 10.
/// Set the pool size. Defaults to 10. Will be clamped to a minimum of 1.
pub fn size(
builder builder: Builder(resource_type),
size size: Int,
) -> Builder(resource_type) {
Builder(..builder, size:)
Builder(..builder, size: int.max(size, 1))
}

/// Set a shutdown function to be run for each resource when the pool exits.
Expand Down Expand Up @@ -189,6 +190,36 @@ pub fn start(
Ok(Pool(started.data))
}

/// The type used to indicate what to do with a resource after use.
pub opaque type Next(return) {
/// Return the resource to the pool.
Keep(return)
/// Discard the resource, running the shutdown function on it.
Discard(return)
}

/// Instruct Bath to keep the checked out resource, returning it to the pool.
pub fn keep() -> Next(Nil) {
Keep(Nil)
}

/// Instruct Bath to discard the checked out resource, running the shutdown function on
/// it.
///
/// Discarded resources will be recreated lazily, regardless of the pool's creation
/// strategy.
pub fn discard() -> Next(Nil) {
Discard(Nil)
}

/// Return a value from a use of [`apply`](#apply).
pub fn returning(next: Next(old), value: new) -> Next(new) {
case next {
Keep(_) -> Keep(value)
Discard(_) -> Discard(value)
}
}

/// Checks out a resource from the pool, sending the caller Pid for the pool to
/// monitor in case the client dies. This allows the pool to create a new resource
/// later if required.
Expand All @@ -200,8 +231,13 @@ fn check_out(
process.call(pool.subject, timeout, CheckOut(_, caller:))
}

fn check_in(pool: Pool(resource_type), resource: resource_type, caller: Pid) {
process.send(pool.subject, CheckIn(resource:, caller:))
fn check_in(
pool: Pool(resource_type),
resource: resource_type,
caller: Pid,
next: Next(Nil),
) {
process.send(pool.subject, CheckIn(resource:, caller:, next:))
}

/// Check out a resource from the pool, apply the `next` function, then check
Expand All @@ -213,26 +249,36 @@ fn check_in(pool: Pool(resource_type), resource: resource_type, caller: Pid) {
/// |> bath.start(1000)
///
/// use resource <- bath.apply(pool, 1000)
///
/// // Do stuff with resource...
///
/// // Return the resource to the pool, returning "Hello!" to the caller.
/// bath.keep()
/// |> bath.returning("Hello!")
/// ```
pub fn apply(
pool pool: Pool(resource_type),
timeout timeout: Int,
next next: fn(resource_type) -> result_type,
next next: fn(resource_type) -> Next(result_type),
) -> Result(result_type, ApplyError) {
let self = process.self()
use resource <- result.try(check_out(pool, self, timeout))

let usage_result = next(resource)
let next_action = next(resource)

check_in(pool, resource, self)
let #(usage_result, next_action) = case next_action {
Keep(return) -> #(return, Keep(Nil))
Discard(return) -> #(return, Discard(Nil))
}

check_in(pool, resource, self, next_action)

Ok(usage_result)
}

/// Shut down the pool, calling the `shutdown_function` on each
/// Shut down the pool, calling the shutdown function on each
/// resource in the pool. Calling with `force` set to `True` will
/// force the shutdown, not calling the `shutdown_function` on any
/// force the shutdown, not calling the shutdown function on any
/// resources.
///
/// Will fail if there are still resources checked out, unless `force` is
Expand Down Expand Up @@ -279,7 +325,7 @@ type LiveResource(resource_type) {

/// A message sent to the pool actor.
pub opaque type Msg(resource_type) {
CheckIn(resource: resource_type, caller: Pid)
CheckIn(resource: resource_type, caller: Pid, next: Next(Nil))
CheckOut(reply_to: Subject(Result(resource_type, ApplyError)), caller: Pid)
PoolExit(process.ExitMessage)
CallerDown(process.Down)
Expand All @@ -288,7 +334,7 @@ pub opaque type Msg(resource_type) {

fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
case msg {
CheckIn(resource:, caller:) -> {
CheckIn(resource:, caller:, next:) -> {
// If the checked-in process currently has a live resource, remove it from
// the live_resources dict
let caller_live_resource = dict.get(state.live_resources, caller)
Expand All @@ -301,14 +347,26 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
Error(_) -> state.selector
}

let new_resources = deque.push_back(state.resources, resource)
let #(new_resources, current_size) = case next {
Keep(_) -> #(
deque.push_back(state.resources, resource),
state.current_size,
)
Discard(_) -> {
state.shutdown_resource(resource)
#(state.resources, state.current_size - 1)
}
}

actor.with_selector(
actor.continue(
State(..state, resources: new_resources, live_resources:, selector:),
),
selector,
State(
..state,
current_size:,
resources: new_resources,
live_resources:,
selector:,
)
|> actor.continue
|> actor.with_selector(selector)
}
CheckOut(reply_to:, caller:) -> {
// We always push to the back, so for FIFO, we pop front,
Expand Down Expand Up @@ -362,18 +420,15 @@ fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) {
)

actor.send(reply_to, Ok(resource))
actor.with_selector(
actor.continue(
State(
..state,
resources: new_resources,
current_size: new_current_size,
selector:,
live_resources:,
),
),
selector,
State(
..state,
resources: new_resources,
current_size: new_current_size,
selector:,
live_resources:,
)
|> actor.continue
|> actor.with_selector(selector)
}
}
}
Expand Down
62 changes: 55 additions & 7 deletions test/bath_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,29 @@ pub fn lifecycle_test() {
Nil
})
|> bath.start(1000)
let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 })
let assert Ok(20) =
bath.apply(pool, 1000, fn(n) { bath.keep() |> bath.returning(n * 2) })
let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}

pub fn empty_pool_fails_to_apply_test() {
let assert Ok(pool) =
bath.new(fn() { Ok(10) })
|> bath.size(0)
|> bath.size(1)
|> bath.start(1000)

process.spawn(fn() {
use _ <- bath.apply(pool, 1000)
process.sleep(1000)
bath.keep()
})

process.sleep(100)

let assert Error(bath.NoResourcesAvailable) =
bath.apply(pool, 1000, fn(_) { Nil })
let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
bath.apply(pool, 1000, fn(_) { bath.keep() })

let assert Ok(Nil) = bath.shutdown(pool, True, 1000)
}

pub fn pool_has_correct_capacity_test() {
Expand All @@ -47,8 +58,8 @@ pub fn pool_has_correct_capacity_test() {
// Only one capacity, so attempting to check out another resource
// should fail
let assert Error(bath.NoResourcesAvailable) =
bath.apply(pool, 1000, fn(_) { Nil })
Nil
bath.apply(pool, 1000, fn(_) { bath.keep() })
bath.keep()
})
let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}
Expand All @@ -64,6 +75,8 @@ pub fn pool_has_correct_resources_test() {
// Check we have the right values
n
|> should.equal(10)

bath.keep()
})

let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
Expand All @@ -89,7 +102,8 @@ pub fn pool_handles_caller_crash_test() {
logging.configure()

// Ensure the pool still has an available resource
let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r })
let assert Ok(10) =
bath.apply(pool, 1000, fn(r) { bath.keep() |> bath.returning(r) })

let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}
Expand All @@ -116,6 +130,40 @@ pub fn shutdown_function_gets_called_test() {
let assert Ok("Shut down") = process.receive(self, 1000)
}

pub fn shutdown_function_gets_called_on_discard_test() {
let self = process.new_subject()

let assert Ok(pool) =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.creation_strategy(bath.Eager)
|> bath.on_shutdown(fn(_) { process.send(self, "Shut down") })
|> bath.start(1000)

let assert Ok(_) = bath.apply(pool, 1000, fn(_) { bath.discard() })

let assert Ok("Shut down") = process.receive(self, 1000)

let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}

pub fn shutdown_function_doesnt_get_called_on_keep_test() {
let self = process.new_subject()

let assert Ok(pool) =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.creation_strategy(bath.Eager)
|> bath.on_shutdown(fn(_) { process.send(self, "Shut down") })
|> bath.start(1000)

let assert Ok(_) = bath.apply(pool, 1000, fn(_) { bath.keep() })

let assert Error(Nil) = process.receive(self, 100)

let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}

// ----- Util tests ----- //

pub fn try_map_returning_succeeds_if_no_errors_test() {
Expand Down