Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## v5.0.0 - 2025-06-22

- Remove the `Pool` type in favour of `process.Subject(bath.Msg(msg))`. It was just a
wrapper, anyway.
- Allow pools to be named, avoiding the previous dance required to start a supervised
pool.
- This changes the signature of `bath.supervised` and removes the need for
`bath.supervised_map`.

## v4.1.0 - 2025-06-21

- Added `bath.supervised_map` to create a supervised pool of resources while mapping
Expand Down
15 changes: 6 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ import fake_db
import gleam/otp/static_supervisor as supervisor

pub fn main() {
// Create a subject to receive the pool handler once the supervision tree has been
// started. Use a named subject to make sure we can always receive the pool handler,
// even if our original process crashes.
let pool_receiver_name = process.new_name("bath_pool_receiver")
let assert Ok(_) = process.register(process.self(), pool_receiver_name)

let pool_receiver = process.named_subject(pool_receiver_name)
// Create a name to interact with the pool once it's started under the
// static supervisor.
let pool_name = process.new_name("bath_pool")

// Define a pool of 10 connections to some fictional database, and create a child
// spec to allow it to be supervised.
let bath_child_spec =
bath.new(fn() { fake_db.get_conn() })
|> bath.size(10)
|> bath.supervised(pool_receiver, 1000)
|> bath.name(pool_name)
|> bath.supervised(1000)

// Start the pool under a supervisor
let assert Ok(_started) =
Expand All @@ -45,7 +42,7 @@ pub fn main() {
|> supervisor.start

// Receive the pool handle now that it's started
let assert Ok(pool) = process.receive(pool_receiver, 1000)
let pool = process.named_subject(pool_name)

// Use the pool. Shown here in a block to use `use`.
let assert Ok("Hello!") = {
Expand Down
2 changes: 1 addition & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name = "bath"
version = "4.1.0"
version = "5.0.0"
description = "A resource pool for Gleam!"
licences = ["MIT"]
repository = { type = "github", user = "Pevensie", repo = "bath" }
Expand Down
147 changes: 71 additions & 76 deletions src/bath.gleam
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import gleam/deque
import gleam/dict.{type Dict}
import gleam/erlang/process.{type Pid, type Subject}
import gleam/function
import gleam/int
import gleam/list
import gleam/option
import gleam/otp/actor
import gleam/otp/supervision
import gleam/result
Expand All @@ -29,6 +29,7 @@ pub type CreationStrategy {
/// Configuration for a [`Pool`](#Pool).
pub opaque type Builder(resource_type) {
Builder(
name: option.Option(process.Name(Msg(resource_type))),
size: Int,
create_resource: fn() -> Result(resource_type, String),
shutdown_resource: fn(resource_type) -> Nil,
Expand Down Expand Up @@ -57,6 +58,7 @@ pub opaque type Builder(resource_type) {
///
/// | Config | Default |
/// |--------|---------|
/// | `name` | `option.None` |
/// | `size` | 10 |
/// | `shutdown_resource` | `fn(_resource) { Nil }` |
/// | `checkout_strategy` | `FIFO` |
Expand All @@ -66,6 +68,7 @@ pub fn new(
resource create_resource: fn() -> Result(resource_type, String),
) -> Builder(resource_type) {
Builder(
name: option.None,
size: 10,
create_resource:,
shutdown_resource: fn(_) { Nil },
Expand All @@ -83,6 +86,17 @@ pub fn size(
Builder(..builder, size: int.max(size, 1))
}

/// Set the name for the pool process. Defaults to `None`.
///
/// You will need to provide a name if you plan on using the pool under a static
/// supervisor.
pub fn name(
builder builder: Builder(resource_type),
name name: process.Name(Msg(resource_type)),
) -> Builder(resource_type) {
Builder(..builder, name: option.Some(name))
}

/// Set a shutdown function to be run for each resource when the pool exits.
pub fn on_shutdown(
builder builder: Builder(resource_type),
Expand Down Expand Up @@ -133,8 +147,8 @@ pub type ShutdownError {
/// Return the [`ChildSpecification`](https://hexdocs.pm/gleam_otp/gleam/otp/supervision.html#ChildSpecification)
/// for creating a supervised resource pool.
///
/// You must provide a selector to receive the [`Pool`](#Pool) value representing the
/// pool once it has started.
/// In order to use a supervised pool, your pool _must_ be named, otherwise you will
/// not be able to send messages to your pool. See the [`name`](#name) function.
///
/// ## Example
///
Expand All @@ -144,53 +158,31 @@ pub type ShutdownError {
/// import gleam/otp/static_supervisor as supervisor
///
/// fn main() {
/// // Create a subject to receive the pool handler once the supervision tree has been
/// // started. Use a named subject to make sure we can always receive the pool handler,
/// // even if our original process crashes.
/// let pool_receiver_name = process.new_name("bath_pool_receiver")
/// let assert Ok(_) = process.register(process.self(), pool_receiver_name)
///
/// let pool_receiver = process.named_subject(pool_receiver_name)
/// // Create a name to interact with the pool once it's started under the
/// // static supervisor.
/// let pool_name = process.new_name("bath_pool")
///
/// let assert Ok(_started) =
/// supervisor.new(supervisor.OneForOne)
/// |> supervisor.add(
/// bath.new(create_resource)
/// |> bath.supervised(pool_receiver, 1000)
/// |> bath.name(pool_name)
/// |> bath.supervised(1000)
/// )
/// |> supervisor.start
///
/// let assert Ok(pool) =
/// process.receive(pool_receiver)
/// let pool = process.named_subject(pool_name)
///
/// // Do more stuff...
/// }
/// ```
pub fn supervised(
builder builder: Builder(resource_type),
receiver pool_receiver: Subject(Pool(resource_type)),
timeout init_timeout: Int,
) {
supervised_map(builder, pool_receiver, function.identity, init_timeout)
}

/// Like [`supervised`](#supervised), but allows you to pass a mapping function to
/// transform the pool handler before sending it to the receiver. This is mostly
/// useful for library authors who wish to use Bath to create a pool of resources.
pub fn supervised_map(
builder builder: Builder(resource_type),
receiver pool_receiver: Subject(a),
using mapper: fn(Pool(resource_type)) -> a,
timeout init_timeout: Int,
) {
supervision.worker(fn() {
use started <- result.try(
actor_builder(builder, init_timeout)
|> actor.start,
)

process.send(pool_receiver, mapper(Pool(started.data)))
Ok(started)
actor_builder(builder, init_timeout)
|> actor.start
})
}

Expand All @@ -200,13 +192,13 @@ pub fn supervised_map(
pub fn start(
builder builder: Builder(resource_type),
timeout init_timeout: Int,
) -> Result(Pool(resource_type), actor.StartError) {
) -> Result(process.Subject(Msg(resource_type)), actor.StartError) {
use started <- result.try(
actor_builder(builder, init_timeout)
|> actor.start,
)

Ok(Pool(started.data))
Ok(started.data)
}

/// The type used to indicate what to do with a resource after use.
Expand Down Expand Up @@ -243,20 +235,20 @@ pub fn returning(next: Next(old), value: new) -> Next(new) {
/// monitor in case the client dies. This allows the pool to create a new resource
/// later if required.
fn check_out(
pool: Pool(resource_type),
pool: process.Subject(Msg(resource_type)),
caller: Pid,
timeout: Int,
) -> Result(resource_type, ApplyError) {
process.call(pool.subject, timeout, CheckOut(_, caller:))
process.call(pool, timeout, CheckOut(_, caller:))
}

fn check_in(
pool: Pool(resource_type),
pool: process.Subject(Msg(resource_type)),
resource: resource_type,
caller: Pid,
next: Next(Nil),
) {
process.send(pool.subject, CheckIn(resource:, caller:, next:))
process.send(pool, CheckIn(resource:, caller:, next:))
}

/// Check out a resource from the pool, apply the `next` function, then check
Expand All @@ -276,7 +268,7 @@ fn check_in(
/// |> bath.returning("Hello!")
/// ```
pub fn apply(
pool pool: Pool(resource_type),
pool pool: process.Subject(Msg(resource_type)),
timeout timeout: Int,
next next: fn(resource_type) -> Next(result_type),
) -> Result(result_type, ApplyError) {
Expand Down Expand Up @@ -306,20 +298,15 @@ pub fn apply(
/// You only need to call this when using unsupervised pools. You should let your
/// supervision tree handle the shutdown of supervised resource pools.
pub fn shutdown(
pool pool: Pool(resource_type),
pool pool: process.Subject(Msg(resource_type)),
force force: Bool,
timeout timeout: Int,
) -> Result(Nil, ShutdownError) {
process.call(pool.subject, timeout, Shutdown(_, force))
process.call(pool, timeout, Shutdown(_, force))
}

// ----- Pool ----- //

/// The interface for interacting with a pool of resources in Bath.
pub opaque type Pool(resource_type) {
Pool(subject: Subject(Msg(resource_type)))
}

/// Pool actor state.
pub opaque type State(resource_type) {
State(
Expand Down Expand Up @@ -582,37 +569,45 @@ fn actor_builder(
Msg(resource_type),
Subject(Msg(resource_type)),
) {
actor.new_with_initialiser(init_timeout, fn(self) {
use #(resources, current_size) <- result.try(create_pool_resources(builder))

// Trap exits
process.trap_exits(True)

let selector =
process.new_selector()
|> process.select(self)
|> process.select_trapped_exits(PoolExit)
let pool_builder =
actor.new_with_initialiser(init_timeout, fn(self) {
use #(resources, current_size) <- result.try(create_pool_resources(
builder,
))

// Trap exits
process.trap_exits(True)

let selector =
process.new_selector()
|> process.select(self)
|> process.select_trapped_exits(PoolExit)

let state =
State(
resources:,
checkout_strategy: builder.checkout_strategy,
creation_strategy: builder.creation_strategy,
live_resources: dict.new(),
selector:,
current_size:,
max_size: builder.size,
create_resource: builder.create_resource,
shutdown_resource: builder.shutdown_resource,
log_errors: builder.log_errors,
)

let state =
State(
resources:,
checkout_strategy: builder.checkout_strategy,
creation_strategy: builder.creation_strategy,
live_resources: dict.new(),
selector:,
current_size:,
max_size: builder.size,
create_resource: builder.create_resource,
shutdown_resource: builder.shutdown_resource,
log_errors: builder.log_errors,
)
actor.initialised(state)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
})
|> actor.on_message(handle_pool_message)

actor.initialised(state)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
})
|> actor.on_message(handle_pool_message)
case builder.name {
option.Some(name) -> actor.named(pool_builder, name)
option.None -> pool_builder
}
}

// ----- Utils ----- //
Expand Down
29 changes: 4 additions & 25 deletions test/bath_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,20 @@ pub fn lifecycle_test() {
}

pub fn supervised_lifecycle_test() {
let pool_receiver = process.new_subject()
let pool_name = process.new_name("bath_pool")

let bath_child_spec =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.supervised(pool_receiver, 1000)
|> bath.name(pool_name)
|> bath.supervised(1000)

let assert Ok(_started) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(bath_child_spec)
|> static_supervisor.start

let assert Ok(pool) = process.receive(pool_receiver, 1000)

let assert Ok(_) = bath.shutdown(pool, False, 1000)
}

type Mapped(a) {
Mapped(a)
}

pub fn supervised_map_test() {
let number_receiver = process.new_subject()

let bath_child_spec =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.supervised_map(number_receiver, Mapped, 1000)

let assert Ok(_started) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(bath_child_spec)
|> static_supervisor.start

let assert Ok(Mapped(pool)) = process.receive(number_receiver, 1000)
let pool = process.named_subject(pool_name)

let assert Ok(_) = bath.shutdown(pool, False, 1000)
}
Expand Down