From 098a235a876c967ccac82087dff053b5a648b360 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 22 Jun 2025 19:55:23 +0100 Subject: [PATCH] accept pool names rather than sending subjects everywhere --- CHANGELOG.md | 9 +++ README.md | 15 ++--- gleam.toml | 2 +- src/bath.gleam | 147 +++++++++++++++++++++---------------------- test/bath_test.gleam | 29 ++------- 5 files changed, 91 insertions(+), 111 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1a9b8d..c4570c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v5.0.0 - 2025-06-22 + +- Remove the `Pool` type in favour of `process.Subject(bath.Msg(msg))`. It was just a + wrapper, anyway. +- Allow pools to be named, avoiding the previous dance required to start a supervised + pool. + - This changes the signature of `bath.supervised` and removes the need for + `bath.supervised_map`. + ## v4.1.0 - 2025-06-21 - Added `bath.supervised_map` to create a supervised pool of resources while mapping diff --git a/README.md b/README.md index 5274c84..522e155 100644 --- a/README.md +++ b/README.md @@ -23,20 +23,17 @@ import fake_db import gleam/otp/static_supervisor as supervisor pub fn main() { - // Create a subject to receive the pool handler once the supervision tree has been - // started. Use a named subject to make sure we can always receive the pool handler, - // even if our original process crashes. - let pool_receiver_name = process.new_name("bath_pool_receiver") - let assert Ok(_) = process.register(process.self(), pool_receiver_name) - - let pool_receiver = process.named_subject(pool_receiver_name) + // Create a name to interact with the pool once it's started under the + // static supervisor. + let pool_name = process.new_name("bath_pool") // Define a pool of 10 connections to some fictional database, and create a child // spec to allow it to be supervised. let bath_child_spec = bath.new(fn() { fake_db.get_conn() }) |> bath.size(10) - |> bath.supervised(pool_receiver, 1000) + |> bath.name(pool_name) + |> bath.supervised(1000) // Start the pool under a supervisor let assert Ok(_started) = @@ -45,7 +42,7 @@ pub fn main() { |> supervisor.start // Receive the pool handle now that it's started - let assert Ok(pool) = process.receive(pool_receiver, 1000) + let pool = process.named_subject(pool_name) // Use the pool. Shown here in a block to use `use`. let assert Ok("Hello!") = { diff --git a/gleam.toml b/gleam.toml index 45c2825..f778fab 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "bath" -version = "4.1.0" +version = "5.0.0" description = "A resource pool for Gleam!" licences = ["MIT"] repository = { type = "github", user = "Pevensie", repo = "bath" } diff --git a/src/bath.gleam b/src/bath.gleam index f81b5e7..537bdc9 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,9 +1,9 @@ import gleam/deque import gleam/dict.{type Dict} import gleam/erlang/process.{type Pid, type Subject} -import gleam/function import gleam/int import gleam/list +import gleam/option import gleam/otp/actor import gleam/otp/supervision import gleam/result @@ -29,6 +29,7 @@ pub type CreationStrategy { /// Configuration for a [`Pool`](#Pool). pub opaque type Builder(resource_type) { Builder( + name: option.Option(process.Name(Msg(resource_type))), size: Int, create_resource: fn() -> Result(resource_type, String), shutdown_resource: fn(resource_type) -> Nil, @@ -57,6 +58,7 @@ pub opaque type Builder(resource_type) { /// /// | Config | Default | /// |--------|---------| +/// | `name` | `option.None` | /// | `size` | 10 | /// | `shutdown_resource` | `fn(_resource) { Nil }` | /// | `checkout_strategy` | `FIFO` | @@ -66,6 +68,7 @@ pub fn new( resource create_resource: fn() -> Result(resource_type, String), ) -> Builder(resource_type) { Builder( + name: option.None, size: 10, create_resource:, shutdown_resource: fn(_) { Nil }, @@ -83,6 +86,17 @@ pub fn size( Builder(..builder, size: int.max(size, 1)) } +/// Set the name for the pool process. Defaults to `None`. +/// +/// You will need to provide a name if you plan on using the pool under a static +/// supervisor. +pub fn name( + builder builder: Builder(resource_type), + name name: process.Name(Msg(resource_type)), +) -> Builder(resource_type) { + Builder(..builder, name: option.Some(name)) +} + /// Set a shutdown function to be run for each resource when the pool exits. pub fn on_shutdown( builder builder: Builder(resource_type), @@ -133,8 +147,8 @@ pub type ShutdownError { /// Return the [`ChildSpecification`](https://hexdocs.pm/gleam_otp/gleam/otp/supervision.html#ChildSpecification) /// for creating a supervised resource pool. /// -/// You must provide a selector to receive the [`Pool`](#Pool) value representing the -/// pool once it has started. +/// In order to use a supervised pool, your pool _must_ be named, otherwise you will +/// not be able to send messages to your pool. See the [`name`](#name) function. /// /// ## Example /// @@ -144,53 +158,31 @@ pub type ShutdownError { /// import gleam/otp/static_supervisor as supervisor /// /// fn main() { -/// // Create a subject to receive the pool handler once the supervision tree has been -/// // started. Use a named subject to make sure we can always receive the pool handler, -/// // even if our original process crashes. -/// let pool_receiver_name = process.new_name("bath_pool_receiver") -/// let assert Ok(_) = process.register(process.self(), pool_receiver_name) -/// -/// let pool_receiver = process.named_subject(pool_receiver_name) +/// // Create a name to interact with the pool once it's started under the +/// // static supervisor. +/// let pool_name = process.new_name("bath_pool") /// /// let assert Ok(_started) = /// supervisor.new(supervisor.OneForOne) /// |> supervisor.add( /// bath.new(create_resource) -/// |> bath.supervised(pool_receiver, 1000) +/// |> bath.name(pool_name) +/// |> bath.supervised(1000) /// ) /// |> supervisor.start /// -/// let assert Ok(pool) = -/// process.receive(pool_receiver) +/// let pool = process.named_subject(pool_name) /// /// // Do more stuff... /// } /// ``` pub fn supervised( builder builder: Builder(resource_type), - receiver pool_receiver: Subject(Pool(resource_type)), - timeout init_timeout: Int, -) { - supervised_map(builder, pool_receiver, function.identity, init_timeout) -} - -/// Like [`supervised`](#supervised), but allows you to pass a mapping function to -/// transform the pool handler before sending it to the receiver. This is mostly -/// useful for library authors who wish to use Bath to create a pool of resources. -pub fn supervised_map( - builder builder: Builder(resource_type), - receiver pool_receiver: Subject(a), - using mapper: fn(Pool(resource_type)) -> a, timeout init_timeout: Int, ) { supervision.worker(fn() { - use started <- result.try( - actor_builder(builder, init_timeout) - |> actor.start, - ) - - process.send(pool_receiver, mapper(Pool(started.data))) - Ok(started) + actor_builder(builder, init_timeout) + |> actor.start }) } @@ -200,13 +192,13 @@ pub fn supervised_map( pub fn start( builder builder: Builder(resource_type), timeout init_timeout: Int, -) -> Result(Pool(resource_type), actor.StartError) { +) -> Result(process.Subject(Msg(resource_type)), actor.StartError) { use started <- result.try( actor_builder(builder, init_timeout) |> actor.start, ) - Ok(Pool(started.data)) + Ok(started.data) } /// The type used to indicate what to do with a resource after use. @@ -243,20 +235,20 @@ pub fn returning(next: Next(old), value: new) -> Next(new) { /// monitor in case the client dies. This allows the pool to create a new resource /// later if required. fn check_out( - pool: Pool(resource_type), + pool: process.Subject(Msg(resource_type)), caller: Pid, timeout: Int, ) -> Result(resource_type, ApplyError) { - process.call(pool.subject, timeout, CheckOut(_, caller:)) + process.call(pool, timeout, CheckOut(_, caller:)) } fn check_in( - pool: Pool(resource_type), + pool: process.Subject(Msg(resource_type)), resource: resource_type, caller: Pid, next: Next(Nil), ) { - process.send(pool.subject, CheckIn(resource:, caller:, next:)) + process.send(pool, CheckIn(resource:, caller:, next:)) } /// Check out a resource from the pool, apply the `next` function, then check @@ -276,7 +268,7 @@ fn check_in( /// |> bath.returning("Hello!") /// ``` pub fn apply( - pool pool: Pool(resource_type), + pool pool: process.Subject(Msg(resource_type)), timeout timeout: Int, next next: fn(resource_type) -> Next(result_type), ) -> Result(result_type, ApplyError) { @@ -306,20 +298,15 @@ pub fn apply( /// You only need to call this when using unsupervised pools. You should let your /// supervision tree handle the shutdown of supervised resource pools. pub fn shutdown( - pool pool: Pool(resource_type), + pool pool: process.Subject(Msg(resource_type)), force force: Bool, timeout timeout: Int, ) -> Result(Nil, ShutdownError) { - process.call(pool.subject, timeout, Shutdown(_, force)) + process.call(pool, timeout, Shutdown(_, force)) } // ----- Pool ----- // -/// The interface for interacting with a pool of resources in Bath. -pub opaque type Pool(resource_type) { - Pool(subject: Subject(Msg(resource_type))) -} - /// Pool actor state. pub opaque type State(resource_type) { State( @@ -582,37 +569,45 @@ fn actor_builder( Msg(resource_type), Subject(Msg(resource_type)), ) { - actor.new_with_initialiser(init_timeout, fn(self) { - use #(resources, current_size) <- result.try(create_pool_resources(builder)) - - // Trap exits - process.trap_exits(True) - - let selector = - process.new_selector() - |> process.select(self) - |> process.select_trapped_exits(PoolExit) + let pool_builder = + actor.new_with_initialiser(init_timeout, fn(self) { + use #(resources, current_size) <- result.try(create_pool_resources( + builder, + )) + + // Trap exits + process.trap_exits(True) + + let selector = + process.new_selector() + |> process.select(self) + |> process.select_trapped_exits(PoolExit) + + let state = + State( + resources:, + checkout_strategy: builder.checkout_strategy, + creation_strategy: builder.creation_strategy, + live_resources: dict.new(), + selector:, + current_size:, + max_size: builder.size, + create_resource: builder.create_resource, + shutdown_resource: builder.shutdown_resource, + log_errors: builder.log_errors, + ) - let state = - State( - resources:, - checkout_strategy: builder.checkout_strategy, - creation_strategy: builder.creation_strategy, - live_resources: dict.new(), - selector:, - current_size:, - max_size: builder.size, - create_resource: builder.create_resource, - shutdown_resource: builder.shutdown_resource, - log_errors: builder.log_errors, - ) + actor.initialised(state) + |> actor.selecting(selector) + |> actor.returning(self) + |> Ok + }) + |> actor.on_message(handle_pool_message) - actor.initialised(state) - |> actor.selecting(selector) - |> actor.returning(self) - |> Ok - }) - |> actor.on_message(handle_pool_message) + case builder.name { + option.Some(name) -> actor.named(pool_builder, name) + option.None -> pool_builder + } } // ----- Utils ----- // diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 2208cec..57998a2 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -30,41 +30,20 @@ pub fn lifecycle_test() { } pub fn supervised_lifecycle_test() { - let pool_receiver = process.new_subject() + let pool_name = process.new_name("bath_pool") let bath_child_spec = bath.new(fn() { Ok(10) }) |> bath.size(1) - |> bath.supervised(pool_receiver, 1000) + |> bath.name(pool_name) + |> bath.supervised(1000) let assert Ok(_started) = static_supervisor.new(static_supervisor.OneForOne) |> static_supervisor.add(bath_child_spec) |> static_supervisor.start - let assert Ok(pool) = process.receive(pool_receiver, 1000) - - let assert Ok(_) = bath.shutdown(pool, False, 1000) -} - -type Mapped(a) { - Mapped(a) -} - -pub fn supervised_map_test() { - let number_receiver = process.new_subject() - - let bath_child_spec = - bath.new(fn() { Ok(10) }) - |> bath.size(1) - |> bath.supervised_map(number_receiver, Mapped, 1000) - - let assert Ok(_started) = - static_supervisor.new(static_supervisor.OneForOne) - |> static_supervisor.add(bath_child_spec) - |> static_supervisor.start - - let assert Ok(Mapped(pool)) = process.receive(number_receiver, 1000) + let pool = process.named_subject(pool_name) let assert Ok(_) = bath.shutdown(pool, False, 1000) }