diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 14ea1bb..9cb065b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - uses: erlef/setup-beam@v1 with: otp-version: "27.1.2" - gleam-version: "1.6.3" + gleam-version: "1.11.1" rebar3-version: "3" # elixir-version: "1.15.4" - run: gleam deps download diff --git a/CHANGELOG.md b/CHANGELOG.md index 6109ab6..d1ab72e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,70 @@ # Changelog +## v3.0.0 - 2025-06-14 + +Bath has been updated to use the new stable versions of `gleam/erlang` and +`gleam/otp`. Bath now follows the conventions laid out in `gleam/otp`, and +creating a resource pool under a supervisor is now much easier. + +### Example + +The recommended way to start a Bath pool is with the `supervised` function. You +can use this to include the Bath pool in your application's supervision tree. + +```gleam +import bath +import fake_db +import gleam/otp/static_supervisor as supervisor + +pub fn main() { + let pool_receiver = process.new_subject() + + // Define a pool of 10 connections to some fictional database, and create a child + // spec to allow it to be supervised. + let bath_child_spec = + bath.new(fn() { fake_db.get_conn() }) + |> bath.size(10) + |> bath.supervised(pool_receiver, 1000) + + // Start the pool under a supervisor + let assert Ok(_started) = + supervisor.new(supervisor.OneForOne) + |> supervisor.add(bath_child_spec) + |> supervisor.start + + // Receive the pool handle now that it's started + let assert Ok(pool) = process.receive(pool_receiver, 1000) + + // Use the pool. Shown here in a block to use `use`. + let assert Ok("Hello!") = { + use conn <- bath.apply(pool, 1000) + // Do stuff with the connection... + "Hello!" + } + + // Close the pool. + let assert Ok(_) = bath.shutdown(pool, False, 1000) +} +``` + +### Behavioural changes + +#### Panics + +Like the new version of `gleam/erlang`, failing to send messages to the Bath pool will +will now panic rather than returning an error result. + +##### Why? + +Previously, Bath used the `process.try_call` function that was present in +`gleam/erlang`. However, this had the potential to cause a memory leak if the +process did not return within the provided timeout. + +The calling process would cancel its receive operation and continue, and the +process would also continue its operation. When the process replied to the +calling process, that message would be stuck in the caller's queue, never to +be received. + ## v2.0.0 - 2024-12-29 - Switch to a builder pattern for pool configuration. diff --git a/README.md b/README.md index 4d54ed4..5bcc3a1 100644 --- a/README.md +++ b/README.md @@ -9,22 +9,37 @@ any value, such as database connections, file handles, or other resources. ## Installation ```sh -gleam add bath@2 +gleam add bath@3 ``` ## Usage +The recommended way to start a Bath pool is with the `supervised` function. You +can use this to include the Bath pool in your application's supervision tree. + ```gleam import bath import fake_db +import gleam/otp/static_supervisor as supervisor pub fn main() { - // Create a pool of 10 connections to some fictional database. - let assert Ok(pool) = - bath.new(fn() { fake_db.connect() }) - |> bath.with_size(10) - |> bath.with_shutdown(fn(conn) { fake_db.close(conn) }) - |> bath.start(1000) + let pool_receiver = process.new_subject() + + // Define a pool of 10 connections to some fictional database, and create a child + // spec to allow it to be supervised. + let bath_child_spec = + bath.new(fn() { fake_db.get_conn() }) + |> bath.size(10) + |> bath.supervised(pool_receiver, 1000) + + // Start the pool under a supervisor + let assert Ok(_started) = + supervisor.new(supervisor.OneForOne) + |> supervisor.add(bath_child_spec) + |> supervisor.start + + // Receive the pool handle now that it's started + let assert Ok(pool) = process.receive(pool_receiver, 1000) // Use the pool. Shown here in a block to use `use`. let assert Ok("Hello!") = { @@ -34,7 +49,7 @@ pub fn main() { } // Close the pool. - let assert Ok(_) = bath.shutdown(pool, fn(conn) { fake_db.close(conn) }, 1000) + let assert Ok(_) = bath.shutdown(pool, False, 1000) } ``` @@ -42,7 +57,12 @@ Further documentation can be found at . ## Development +If you've found any bugs, please open an issue on +[GitHub](https://github.com/Pevensie/bath/issues). + +The code is reasonably well tested and documented, but PRs to improve either are always +welcome. + ```sh -gleam run # Run the project gleam test # Run the tests ``` diff --git a/gleam.toml b/gleam.toml index c8b1ade..0a480c8 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,14 +1,14 @@ name = "bath" -version = "2.0.0" +version = "3.0.0" description = "A resource pool for Gleam!" licences = ["MIT"] repository = { type = "github", user = "Pevensie", repo = "bath" } # links = [{ title = "Website", href = "" }] [dependencies] -gleam_stdlib = ">= 0.51.0 and < 2.0.0" -gleam_otp = ">= 0.16.0 and < 1.0.0" -gleam_erlang = ">= 0.33.0 and < 1.0.0" +gleam_stdlib = ">= 0.60.0 and < 2.0.0" +gleam_otp = ">= 1.0.0 and < 2.0.0" +gleam_erlang = ">= 1.0.0 and < 2.0.0" gleam_deque = ">= 1.0.0 and < 2.0.0" logging = ">= 1.3.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 583c2bf..6ec542c 100644 --- a/manifest.toml +++ b/manifest.toml @@ -3,17 +3,17 @@ packages = [ { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, - { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, - { name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" }, - { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, - { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, + { name = "gleam_erlang", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "7E6A5234F927C4B24F8054AB1E4572206C41F9E6D5C6C02273CB7531E7E5CED0" }, + { name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" }, + { name = "gleam_stdlib", version = "0.60.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "621D600BB134BC239CB2537630899817B1A42E60A1D46C5E9F3FAE39F88C800B" }, + { name = "gleeunit", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D33B7736CF0766ED3065F64A1EBB351E72B2E8DE39BAFC8ADA0E35E92A6A934F" }, { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, ] [requirements] gleam_deque = { version = ">= 1.0.0 and < 2.0.0" } -gleam_erlang = { version = ">= 0.33.0 and < 1.0.0" } -gleam_otp = { version = ">= 0.16.0 and < 1.0.0" } -gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" } +gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } +gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } +gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/bath.gleam b/src/bath.gleam index 47bd436..cfacf0c 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,10 +1,9 @@ import gleam/deque import gleam/dict.{type Dict} -import gleam/dynamic import gleam/erlang/process.{type Pid, type Subject} -import gleam/function import gleam/list import gleam/otp/actor +import gleam/otp/supervision import gleam/result import gleam/string import logging @@ -26,17 +25,18 @@ pub type CreationStrategy { } /// Configuration for a [`Pool`](#Pool). -pub opaque type PoolConfig(resource_type, resource_create_error) { - PoolConfig( +pub opaque type Builder(resource_type) { + Builder( size: Int, - create_resource: fn() -> Result(resource_type, resource_create_error), + create_resource: fn() -> Result(resource_type, String), shutdown_resource: fn(resource_type) -> Nil, checkout_strategy: CheckoutStrategy, creation_strategy: CreationStrategy, + log_errors: Bool, ) } -/// Create a new [`PoolConfig`](#PoolConfig) for creating a pool of resources. +/// Create a new [`Builder`](#Builder) for creating a pool of resources. /// /// ```gleam /// import bath @@ -59,63 +59,66 @@ pub opaque type PoolConfig(resource_type, resource_create_error) { /// | `shutdown_resource` | `fn(_resource) { Nil }` | /// | `checkout_strategy` | `FIFO` | /// | `creation_strategy` | `Lazy` | +/// | `log_errors` | `False` | pub fn new( - resource create_resource: fn() -> Result(resource_type, resource_create_error), -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig( + resource create_resource: fn() -> Result(resource_type, String), +) -> Builder(resource_type) { + Builder( size: 10, create_resource:, shutdown_resource: fn(_) { Nil }, checkout_strategy: FIFO, creation_strategy: Lazy, + log_errors: False, ) } /// Set the pool size. Defaults to 10. -pub fn with_size( - config pool_config: PoolConfig(resource_type, resource_create_error), +pub fn size( + builder builder: Builder(resource_type), size size: Int, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, size:) +) -> Builder(resource_type) { + Builder(..builder, size:) } /// Set a shutdown function to be run for each resource when the pool exits. -pub fn with_shutdown( - config pool_config: PoolConfig(resource_type, resource_create_error), +pub fn on_shutdown( + builder builder: Builder(resource_type), shutdown shutdown_resource: fn(resource_type) -> Nil, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, shutdown_resource:) +) -> Builder(resource_type) { + Builder(..builder, shutdown_resource:) } /// Change the checkout strategy for the pool. Defaults to `FIFO`. -pub fn with_checkout_strategy( - config pool_config: PoolConfig(resource_type, resource_create_error), +pub fn checkout_strategy( + builder builder: Builder(resource_type), strategy checkout_strategy: CheckoutStrategy, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, checkout_strategy:) +) -> Builder(resource_type) { + Builder(..builder, checkout_strategy:) } /// Change the resource creation strategy for the pool. Defaults to `Lazy`. -pub fn with_creation_strategy( - config pool_config: PoolConfig(resource_type, resource_create_error), +pub fn creation_strategy( + builder builder: Builder(resource_type), strategy creation_strategy: CreationStrategy, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, creation_strategy:) +) -> Builder(resource_type) { + Builder(..builder, creation_strategy:) } -// ----- Lifecycle functions ---- // - -/// An error returned when creating a [`Pool`](#Pool). -pub type StartError(resource_create_error) { - PoolStartResourceCreateError(resource_create_error) - ActorStartError(actor.StartError) +/// Set whether the pool logs errors when resources fail to create. +pub fn log_errors( + builder builder: Builder(resource_type), + log_errors log_errors: Bool, +) -> Builder(resource_type) { + Builder(..builder, log_errors:) } +// ----- Lifecycle functions ---- // + /// An error returned when failing to apply a function to a pooled resource. -pub type ApplyError(resource_create_error) { +pub type ApplyError { NoResourcesAvailable - CheckOutResourceCreateError(resource_create_error) - CheckOutTimeout + CheckOutResourceCreateError(error: String) } /// An error returned when the resource pool fails to shut down. @@ -123,85 +126,81 @@ pub type ShutdownError { /// There are still resources checked out. Ignore this failure case by /// calling [`shutdown`](#shutdown) function with `force` set to `True`. ResourcesInUse - /// The shutdown timeout expired. - ShutdownTimeout - /// The pool was already down or failed to send the response message. - CalleeDown(reason: dynamic.Dynamic) } -/// Create a child actor spec pool actor, for use in your application's supervision tree, -/// using the given [`PoolConfig`](#PoolConfig). Once the pool is started, use -/// [`from_subject`](#from_subject) to create a [`Pool`](#Pool) from the -/// `Subject(bath.Msg(a, b))`. -pub fn child_spec( - config pool_config: PoolConfig(resource_type, resource_create_error), +/// Return the [`ChildSpecification`](https://hexdocs.pm/gleam_otp/gleam/otp/supervision.html#ChildSpecification) +/// for creating a supervised resource pool. +/// +/// You must provide a selector to receive the [`Pool`](#Pool) value representing the +/// pool once it has started. +/// +/// ## Example +/// +/// ```gleam +/// import bath +/// import gleam/erlang/process +/// import gleam/otp/static_supervisor as supervisor +/// +/// fn main() { +/// let pool_receiver = process.new_subject() +/// +/// let assert Ok(_started) = +/// supervisor.new(supervisor.OneForOne) +/// |> supervisor.add( +/// bath.new(create_resource) +/// |> bath.supervised(pool_receiver, 1000) +/// ) +/// |> supervisor.start +/// +/// let assert Ok(pool) = +/// process.receive(pool_receiver) +/// +/// let assert Ok(_) = bath.apply(pool, fn(res) { echo res }) +/// } +/// ``` +pub fn supervised( + builder builder: Builder(resource_type), + receiver pool_receiver: Subject(Pool(resource_type)), timeout init_timeout: Int, -) -> Result( - actor.Spec( - State(resource_type, resource_create_error), - Msg(resource_type, resource_create_error), - ), - StartError(resource_create_error), ) { - let #(resources_result, current_size) = case pool_config.creation_strategy { - Lazy -> #(Ok(deque.new()), 0) - Eager -> #( - list.repeat("", pool_config.size) - |> list.try_map(fn(_) { pool_config.create_resource() }) - |> result.map(deque.from_list) - |> result.map_error(PoolStartResourceCreateError), - pool_config.size, + supervision.worker(fn() { + use started <- result.try( + actor_builder(builder, init_timeout) + |> actor.start, ) - } - - use resources <- result.try(resources_result) - Ok(pool_spec(pool_config, resources, current_size, init_timeout)) -} - -/// Create a [`Pool`](#Pool) from a `Subject(bath.Msg(a, b))`. Useful when -/// creating a pool as part of a supervision tree via the -/// [`child_spec`](#child_spec) function. -pub fn from_subject( - subject subject: Subject(Msg(resource_type, resource_create_error)), -) -> Pool(resource_type, resource_create_error) { - Pool(subject:) + process.send(pool_receiver, Pool(started.data)) + Ok(started) + }) } -/// Start a pool actor using the given [`PoolConfig`](#PoolConfig) and return a -/// [`Pool`](#Pool). +/// Start an unsupervised pool using the given [`Builder`](#Builder) and return a +/// [`Pool`](#Pool). In most cases, you should use the [`supervised`](#supervised) +/// function instead. pub fn start( - config pool_config: PoolConfig(resource_type, resource_create_error), + builder builder: Builder(resource_type), timeout init_timeout: Int, -) -> Result( - Pool(resource_type, resource_create_error), - StartError(resource_create_error), -) { - use spec <- result.try(child_spec(pool_config, init_timeout)) +) -> Result(Pool(resource_type), actor.StartError) { + use started <- result.try( + actor_builder(builder, init_timeout) + |> actor.start, + ) - actor.start_spec(spec) - |> result.map(fn(subject) { Pool(subject:) }) - |> result.map_error(ActorStartError) + Ok(Pool(started.data)) } /// Checks out a resource from the pool, sending the caller Pid for the pool to /// monitor in case the client dies. This allows the pool to create a new resource /// later if required. fn check_out( - pool: Pool(resource_type, resource_create_error), + pool: Pool(resource_type), caller: Pid, timeout: Int, -) -> Result(resource_type, ApplyError(resource_create_error)) { - process.try_call(pool.subject, CheckOut(_, caller:), timeout) - |> result.replace_error(CheckOutTimeout) - |> result.flatten +) -> Result(resource_type, ApplyError) { + process.call(pool.subject, timeout, CheckOut(_, caller:)) } -fn check_in( - pool: Pool(resource_type, resource_create_error), - resource: resource_type, - caller: Pid, -) { +fn check_in(pool: Pool(resource_type), resource: resource_type, caller: Pid) { process.send(pool.subject, CheckIn(resource:, caller:)) } @@ -217,10 +216,10 @@ fn check_in( /// // Do stuff with resource... /// ``` pub fn apply( - pool pool: Pool(resource_type, resource_create_error), + pool pool: Pool(resource_type), timeout timeout: Int, next next: fn(resource_type) -> result_type, -) -> Result(result_type, ApplyError(resource_create_error)) { +) -> Result(result_type, ApplyError) { let self = process.self() use resource <- result.try(check_out(pool, self, timeout)) @@ -239,41 +238,35 @@ pub fn apply( /// Will fail if there are still resources checked out, unless `force` is /// `True`. pub fn shutdown( - pool pool: Pool(resource_type, resource_create_error), + pool pool: Pool(resource_type), force force: Bool, timeout timeout: Int, -) { - process.try_call(pool.subject, Shutdown(_, force), timeout) - |> result.map_error(fn(err) { - case err { - process.CallTimeout -> ShutdownTimeout - process.CalleeDown(reason) -> CalleeDown(reason:) - } - }) - |> result.flatten +) -> Result(Nil, ShutdownError) { + process.call(pool.subject, timeout, Shutdown(_, force)) } // ----- Pool ----- // /// The interface for interacting with a pool of resources in Bath. -pub opaque type Pool(resource_type, resource_create_error) { - Pool(subject: Subject(Msg(resource_type, resource_create_error))) +pub opaque type Pool(resource_type) { + Pool(subject: Subject(Msg(resource_type))) } /// Pool actor state. -pub opaque type State(resource_type, resource_create_error) { +pub opaque type State(resource_type) { State( // Config checkout_strategy: CheckoutStrategy, creation_strategy: CreationStrategy, max_size: Int, - create_resource: fn() -> Result(resource_type, resource_create_error), + create_resource: fn() -> Result(resource_type, String), shutdown_resource: fn(resource_type) -> Nil, // State resources: deque.Deque(resource_type), current_size: Int, live_resources: LiveResources(resource_type), - selector: process.Selector(Msg(resource_type, resource_create_error)), + selector: process.Selector(Msg(resource_type)), + log_errors: Bool, ) } @@ -281,25 +274,19 @@ type LiveResources(resource_type) = Dict(Pid, LiveResource(resource_type)) type LiveResource(resource_type) { - LiveResource(resource: resource_type, monitor: process.ProcessMonitor) + LiveResource(resource: resource_type, monitor: process.Monitor) } /// A message sent to the pool actor. -pub opaque type Msg(resource_type, resource_create_error) { +pub opaque type Msg(resource_type) { CheckIn(resource: resource_type, caller: Pid) - CheckOut( - reply_to: Subject(Result(resource_type, ApplyError(resource_create_error))), - caller: Pid, - ) + CheckOut(reply_to: Subject(Result(resource_type, ApplyError)), caller: Pid) PoolExit(process.ExitMessage) - CallerDown(process.ProcessDown) + CallerDown(process.Down) Shutdown(reply_to: process.Subject(Result(Nil, ShutdownError)), force: Bool) } -fn handle_pool_message( - msg: Msg(resource_type, resource_create_error), - state: State(resource_type, resource_create_error), -) { +fn handle_pool_message(state: State(resource_type), msg: Msg(resource_type)) { case msg { CheckIn(resource:, caller:) -> { // If the checked-in process currently has a live resource, remove it from @@ -344,7 +331,7 @@ fn handle_pool_message( use resource <- result.try( state.create_resource() |> result.map_error(fn(err) { - log_resource_creation_error(err) + log_resource_creation_error(state.log_errors, err) CheckOutResourceCreateError(err) }), ) @@ -396,7 +383,13 @@ fn handle_pool_message( |> deque.to_list |> list.each(state.shutdown_resource) - actor.Stop(exit_message.reason) + case exit_message.reason { + process.Abnormal(reason) -> + string.inspect(reason) + |> actor.stop_abnormal + process.Killed -> actor.stop_abnormal("Killed") + process.Normal -> actor.stop() + } } Shutdown(reply_to:, force:) -> { case dict.size(state.live_resources), force { @@ -407,12 +400,12 @@ fn handle_pool_message( |> list.each(state.shutdown_resource) actor.send(reply_to, Ok(Nil)) - actor.Stop(process.Normal) + actor.stop() } _, True -> { // Force shutdown actor.send(reply_to, Ok(Nil)) - actor.Stop(process.Normal) + actor.stop() } _, False -> { actor.send(reply_to, Error(ResourcesInUse)) @@ -421,9 +414,12 @@ fn handle_pool_message( } } CallerDown(process_down) -> { + // We don't monitor ports + let assert process.ProcessDown(pid: process_down_pid, ..) = process_down + // If the caller was a live resource, either create a new one or // decrement the current size depending on the creation strategy - case dict.get(state.live_resources, process_down.pid) { + case dict.get(state.live_resources, process_down_pid) { // Continue as normal, ignoring this message Error(_) -> actor.continue(state) Ok(live_resource) -> { @@ -450,97 +446,146 @@ fn handle_pool_message( ) // Size has changed Error(resource_create_error) -> { - log_resource_creation_error(resource_create_error) + log_resource_creation_error( + state.log_errors, + resource_create_error, + ) #(state.resources, state.current_size) } } } } - actor.with_selector( - actor.continue( - State( - ..state, - resources: new_resources, - current_size: new_current_size, - selector:, - live_resources: dict.delete( - state.live_resources, - process_down.pid, - ), - ), - ), - selector, + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources: dict.delete(state.live_resources, process_down_pid), ) + |> actor.continue + |> actor.with_selector(selector) + } + } + } + } +} + +/// Create the resources for a pool, returning a deque of resources and the number of +/// resources created. +fn create_pool_resources( + builder: Builder(resource_type), +) -> Result(#(deque.Deque(resource_type), Int), String) { + case builder.creation_strategy { + Lazy -> Ok(#(deque.new(), 0)) + Eager -> { + let create_result = + list.repeat("", builder.size) + |> try_map_returning(fn(_) { builder.create_resource() }) + |> result.map(deque.from_list) + + case create_result { + Ok(resources) -> Ok(#(resources, builder.size)) + Error(#(created_resources, error)) -> { + created_resources + |> list.each(builder.shutdown_resource) + + Error(error) } } } } } -fn pool_spec( - pool_config: PoolConfig(resource_type, resource_create_error), - resources: deque.Deque(resource_type), - current_size: Int, +fn actor_builder( + builder: Builder(resource_type), init_timeout: Int, -) -> actor.Spec( - State(resource_type, resource_create_error), - Msg(resource_type, resource_create_error), +) -> actor.Builder( + State(resource_type), + Msg(resource_type), + Subject(Msg(resource_type)), ) { - actor.Spec(init_timeout:, loop: handle_pool_message, init: fn() { - let self = process.new_subject() + actor.new_with_initialiser(init_timeout, fn(self) { + use #(resources, current_size) <- result.try(create_pool_resources(builder)) // Trap exits process.trap_exits(True) let selector = process.new_selector() - |> process.selecting(self, function.identity) - |> process.selecting_trapped_exits(PoolExit) + |> process.select(self) + |> process.select_trapped_exits(PoolExit) let state = State( resources:, - checkout_strategy: pool_config.checkout_strategy, - creation_strategy: pool_config.creation_strategy, + checkout_strategy: builder.checkout_strategy, + creation_strategy: builder.creation_strategy, live_resources: dict.new(), selector:, current_size:, - max_size: pool_config.size, - create_resource: pool_config.create_resource, - shutdown_resource: pool_config.shutdown_resource, + max_size: builder.size, + create_resource: builder.create_resource, + shutdown_resource: builder.shutdown_resource, + log_errors: builder.log_errors, ) - actor.Ready(state, selector) + actor.initialised(state) + |> actor.selecting(selector) + |> actor.returning(self) + |> Ok }) + |> actor.on_message(handle_pool_message) } // ----- Utils ----- // -fn monitor_process( - selector: process.Selector(Msg(resource_type, resource_create_error)), - pid: Pid, -) { - let monitor = process.monitor_process(pid) +fn monitor_process(selector: process.Selector(Msg(resource_type)), pid: Pid) { + let monitor = process.monitor(pid) let selector = selector - |> process.selecting_process_down(monitor, CallerDown) + |> process.select_specific_monitor(monitor, CallerDown) #(monitor, selector) } fn demonitor_process( - selector: process.Selector(Msg(resource_type, resource_create_error)), - monitor: process.ProcessMonitor, + selector: process.Selector(Msg(resource_type)), + monitor: process.Monitor, ) { let selector = selector - |> process.deselecting_process_down(monitor) + |> process.deselect_specific_monitor(monitor) selector } -fn log_resource_creation_error(resource_create_error: resource_create_error) { - logging.log( - logging.Error, - "Bath: Resource creation failed: " <> string.inspect(resource_create_error), - ) +fn log_resource_creation_error( + log_errors: Bool, + resource_create_error: String, +) -> Nil { + case log_errors { + True -> + logging.log( + logging.Error, + "Bath: Resource creation failed: " <> resource_create_error, + ) + False -> Nil + } +} + +/// Iterate over a list, applying a function that returns a result. +/// If an `Error` is returned by the function, iteration stops and all +/// items to this point are returned. +@internal +pub fn try_map_returning( + over list: List(a), + with fun: fn(a) -> Result(b, e), +) -> Result(List(b), #(List(b), e)) { + list + |> list.try_fold([], fn(acc, item) { + case fun(item) { + Ok(value) -> Ok([value, ..acc]) + Error(error) -> Error(#(acc, error)) + } + }) + |> result.map(list.reverse) } diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 2c725c3..d02473d 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -1,6 +1,9 @@ import bath import gleam/erlang/process +import gleam/function +import gleam/int import gleam/io +import gleam/otp/actor import gleeunit import gleeunit/should import logging @@ -14,10 +17,9 @@ pub fn main() { pub fn lifecycle_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.with_size(1) - |> bath.with_shutdown(fn(res) { - io.debug("Shutting down") - io.debug(res) + |> bath.size(1) + |> bath.on_shutdown(fn(res) { + io.println("Shutting down with resource: " <> int.to_string(res)) Nil }) |> bath.start(1000) @@ -28,7 +30,7 @@ pub fn lifecycle_test() { pub fn empty_pool_fails_to_apply_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.with_size(0) + |> bath.size(0) |> bath.start(1000) let assert Error(bath.NoResourcesAvailable) = bath.apply(pool, 1000, fn(_) { Nil }) @@ -38,7 +40,7 @@ pub fn empty_pool_fails_to_apply_test() { pub fn pool_has_correct_capacity_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.with_size(1) + |> bath.size(1) |> bath.start(1000) let assert Ok(_) = bath.apply(pool, 1000, fn(_) { @@ -54,7 +56,7 @@ pub fn pool_has_correct_capacity_test() { pub fn pool_has_correct_resources_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.with_size(10) + |> bath.size(10) |> bath.start(1000) let assert Ok(_) = @@ -70,19 +72,16 @@ pub fn pool_has_correct_resources_test() { pub fn pool_handles_caller_crash_test() { let assert Ok(pool) = bath.new(fn() { Ok(10) }) - |> bath.with_size(1) + |> bath.size(1) |> bath.start(1000) // Expect an error message here logging.set_level(logging.Critical) - process.start( - fn() { - use _ <- bath.apply(pool, 1000) - panic as "Oh no, the caller crashed!" - }, - False, - ) + process.spawn_unlinked(fn() { + use _ <- bath.apply(pool, 1000) + panic as "Oh no, the caller crashed!" + }) process.sleep(100) @@ -94,3 +93,40 @@ pub fn pool_handles_caller_crash_test() { let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } + +pub fn eager_pool_fails_to_start_if_resource_creation_fails_test() { + let assert Error(actor.InitFailed("Failed to create resource")) = + bath.new(fn() { Error("Failed to create resource") }) + |> bath.creation_strategy(bath.Eager) + |> bath.start(1000) +} + +pub fn shutdown_function_gets_called_test() { + let self = process.new_subject() + + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.size(1) + |> bath.creation_strategy(bath.Eager) + |> bath.on_shutdown(fn(_) { process.send(self, "Shut down") }) + |> bath.start(1000) + + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) + + let assert Ok("Shut down") = process.receive(self, 1000) +} + +// ----- Util tests ----- // + +pub fn try_map_returning_succeeds_if_no_errors_test() { + let list = [Ok(1), Ok(2), Ok(3)] + + let assert Ok([1, 2, 3]) = bath.try_map_returning(list, function.identity) +} + +pub fn try_map_returning_fails_early_if_any_errors_test() { + let list = [Ok(1), Error("error"), Ok(3)] + + let assert Error(#([1], "error")) = + bath.try_map_returning(list, function.identity) +}