diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..6109ab6 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,19 @@ +# Changelog + +## v2.0.0 - 2024-12-29 + +- Switch to a builder pattern for pool configuration. +- Change the `shutdown` API to take a `force` argument instead of having a separate + `force_shutdown` function. +- Improve reliability by tracking live resources and monitoring the calling process, + re-adding resources to the pool if the caller dies. +- Add checkout strategies (`FIFO` and `LIFO`). +- Add resource creation strategies (`Lazy` and `Eager`). +- Added `child_spec` and `from_subject` functions for creating a pool as part of a + supervision tree. + +Thanks to @lpil for their help debugging this release! + +## v1.0.0 - 2024-12-12 + +- Initial release diff --git a/README.md b/README.md index 1b238ef..4d54ed4 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ any value, such as database connections, file handles, or other resources. ## Installation ```sh -gleam add bath@1 +gleam add bath@2 ``` ## Usage @@ -20,7 +20,11 @@ import fake_db pub fn main() { // Create a pool of 10 connections to some fictional database. - let assert Ok(pool) = bath.init(10, fn() { fake_db.connect() }) + let assert Ok(pool) = + bath.new(fn() { fake_db.connect() }) + |> bath.with_size(10) + |> bath.with_shutdown(fn(conn) { fake_db.close(conn) }) + |> bath.start(1000) // Use the pool. Shown here in a block to use `use`. let assert Ok("Hello!") = { diff --git a/gleam.toml b/gleam.toml index df85b3b..c8b1ade 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,14 +1,16 @@ name = "bath" -version = "1.0.0" +version = "2.0.0" description = "A resource pool for Gleam!" licences = ["MIT"] repository = { type = "github", user = "Pevensie", repo = "bath" } # links = [{ title = "Website", href = "" }] [dependencies] -gleam_stdlib = ">= 0.34.0 and < 2.0.0" +gleam_stdlib = ">= 0.51.0 and < 2.0.0" gleam_otp = ">= 0.16.0 and < 1.0.0" gleam_erlang = ">= 0.33.0 and < 1.0.0" +gleam_deque = ">= 1.0.0 and < 2.0.0" +logging = ">= 1.3.0 and < 2.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 848c8ba..583c2bf 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,14 +2,18 @@ # You typically do not need to edit this file packages = [ + { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, { name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" }, { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, ] [requirements] +gleam_deque = { version = ">= 1.0.0 and < 2.0.0" } gleam_erlang = { version = ">= 0.33.0 and < 1.0.0" } gleam_otp = { version = ">= 0.16.0 and < 1.0.0" } -gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } +gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } +logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/bath.gleam b/src/bath.gleam index 05a1b45..47bd436 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,43 +1,127 @@ +import gleam/deque +import gleam/dict.{type Dict} import gleam/dynamic -import gleam/erlang/process +import gleam/erlang/process.{type Pid, type Subject} +import gleam/function import gleam/list import gleam/otp/actor import gleam/result +import gleam/string +import logging -type Msg(resource_type) { - CheckIn(resource_type) - CheckOut(process.Subject(Result(resource_type, ApplyError))) - Shutdown( - fn(resource_type) -> Nil, - process.Subject(Result(Nil, ShutdownError)), +// ---- Pool config ----- // + +/// The strategy used to check out a resource from the pool. +pub type CheckoutStrategy { + FIFO + LIFO +} + +/// How to create resources in the pool. `Lazy` will create resources when +/// required (i.e. the pool is empty but has extra capacity), while `Eager` will +/// create the maximum number of resources upfront. +pub type CreationStrategy { + Lazy + Eager +} + +/// Configuration for a [`Pool`](#Pool). +pub opaque type PoolConfig(resource_type, resource_create_error) { + PoolConfig( + size: Int, + create_resource: fn() -> Result(resource_type, resource_create_error), + shutdown_resource: fn(resource_type) -> Nil, + checkout_strategy: CheckoutStrategy, + creation_strategy: CreationStrategy, ) - ForceShutdown(fn(resource_type) -> Nil) } -type PoolSubject(resource_type) = - process.Subject(Msg(resource_type)) +/// Create a new [`PoolConfig`](#PoolConfig) for creating a pool of resources. +/// +/// ```gleam +/// import bath +/// import fake_db +/// +/// pub fn main() { +/// // Create a pool of 10 connections to some fictional database. +/// let assert Ok(pool) = +/// bath.new(fn() { fake_db.connect() }) +/// |> bath.with_size(10) +/// |> bath.start(1000) +/// } +/// ``` +/// +/// ### Default values +/// +/// | Config | Default | +/// |--------|---------| +/// | `size` | 10 | +/// | `shutdown_resource` | `fn(_resource) { Nil }` | +/// | `checkout_strategy` | `FIFO` | +/// | `creation_strategy` | `Lazy` | +pub fn new( + resource create_resource: fn() -> Result(resource_type, resource_create_error), +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig( + size: 10, + create_resource:, + shutdown_resource: fn(_) { Nil }, + checkout_strategy: FIFO, + creation_strategy: Lazy, + ) +} -type PoolState(resource_type) { - PoolState(checked_in: List(resource_type), checked_out: List(resource_type)) +/// Set the pool size. Defaults to 10. +pub fn with_size( + config pool_config: PoolConfig(resource_type, resource_create_error), + size size: Int, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, size:) } -/// A resource pool. -pub opaque type Pool(resource_type) { - Pool(size: Int, subject: PoolSubject(resource_type)) +/// Set a shutdown function to be run for each resource when the pool exits. +pub fn with_shutdown( + config pool_config: PoolConfig(resource_type, resource_create_error), + shutdown shutdown_resource: fn(resource_type) -> Nil, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, shutdown_resource:) } +/// Change the checkout strategy for the pool. Defaults to `FIFO`. +pub fn with_checkout_strategy( + config pool_config: PoolConfig(resource_type, resource_create_error), + strategy checkout_strategy: CheckoutStrategy, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, checkout_strategy:) +} + +/// Change the resource creation strategy for the pool. Defaults to `Lazy`. +pub fn with_creation_strategy( + config pool_config: PoolConfig(resource_type, resource_create_error), + strategy creation_strategy: CreationStrategy, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, creation_strategy:) +} + +// ----- Lifecycle functions ---- // + /// An error returned when creating a [`Pool`](#Pool). -pub type InitError(resource_create_error) { - /// The actor failed to start. - StartError(actor.StartError) - /// The resource creation function failed. - ResourceCreateError(resource_create_error) +pub type StartError(resource_create_error) { + PoolStartResourceCreateError(resource_create_error) + ActorStartError(actor.StartError) +} + +/// An error returned when failing to apply a function to a pooled resource. +pub type ApplyError(resource_create_error) { + NoResourcesAvailable + CheckOutResourceCreateError(resource_create_error) + CheckOutTimeout } /// An error returned when the resource pool fails to shut down. pub type ShutdownError { /// There are still resources checked out. Ignore this failure case by - /// calling [`force_shutdown`](#force_shutdown) function. + /// calling [`shutdown`](#shutdown) function with `force` set to `True`. ResourcesInUse /// The shutdown timeout expired. ShutdownTimeout @@ -45,77 +129,121 @@ pub type ShutdownError { CalleeDown(reason: dynamic.Dynamic) } -/// An error returned when failing to apply a function to a pooled resource. -pub type ApplyError { - /// There are no resources available in the pool. - NoResourcesAvailable - /// The checkout timeout expired. - CheckoutTimeout +/// Create a child actor spec pool actor, for use in your application's supervision tree, +/// using the given [`PoolConfig`](#PoolConfig). Once the pool is started, use +/// [`from_subject`](#from_subject) to create a [`Pool`](#Pool) from the +/// `Subject(bath.Msg(a, b))`. +pub fn child_spec( + config pool_config: PoolConfig(resource_type, resource_create_error), + timeout init_timeout: Int, +) -> Result( + actor.Spec( + State(resource_type, resource_create_error), + Msg(resource_type, resource_create_error), + ), + StartError(resource_create_error), +) { + let #(resources_result, current_size) = case pool_config.creation_strategy { + Lazy -> #(Ok(deque.new()), 0) + Eager -> #( + list.repeat("", pool_config.size) + |> list.try_map(fn(_) { pool_config.create_resource() }) + |> result.map(deque.from_list) + |> result.map_error(PoolStartResourceCreateError), + pool_config.size, + ) + } + + use resources <- result.try(resources_result) + + Ok(pool_spec(pool_config, resources, current_size, init_timeout)) } -/// Start a new resource pool. -/// -/// ```gleam -/// // Creates a pool with 10 strings. -/// let assert Ok(pool) = bath.init(10, fn() { Ok("Some pooled resource") }) -/// ``` -pub fn init( - size: Int, - resource_create_function: fn() -> Result(resource_type, resource_create_error), -) -> Result(Pool(resource_type), InitError(resource_create_error)) { - let resources_result = - list.repeat("", size) - |> list.try_map(fn(_) { - resource_create_function() |> result.map_error(ResourceCreateError) - }) +/// Create a [`Pool`](#Pool) from a `Subject(bath.Msg(a, b))`. Useful when +/// creating a pool as part of a supervision tree via the +/// [`child_spec`](#child_spec) function. +pub fn from_subject( + subject subject: Subject(Msg(resource_type, resource_create_error)), +) -> Pool(resource_type, resource_create_error) { + Pool(subject:) +} - use resources <- result.try(resources_result) +/// Start a pool actor using the given [`PoolConfig`](#PoolConfig) and return a +/// [`Pool`](#Pool). +pub fn start( + config pool_config: PoolConfig(resource_type, resource_create_error), + timeout init_timeout: Int, +) -> Result( + Pool(resource_type, resource_create_error), + StartError(resource_create_error), +) { + use spec <- result.try(child_spec(pool_config, init_timeout)) - let actor_result = - actor.start( - PoolState(checked_in: resources, checked_out: []), - handle_message, - ) - |> result.map_error(StartError) + actor.start_spec(spec) + |> result.map(fn(subject) { Pool(subject:) }) + |> result.map_error(ActorStartError) +} - use subject <- result.try(actor_result) - Ok(Pool(size:, subject:)) +/// Checks out a resource from the pool, sending the caller Pid for the pool to +/// monitor in case the client dies. This allows the pool to create a new resource +/// later if required. +fn check_out( + pool: Pool(resource_type, resource_create_error), + caller: Pid, + timeout: Int, +) -> Result(resource_type, ApplyError(resource_create_error)) { + process.try_call(pool.subject, CheckOut(_, caller:), timeout) + |> result.replace_error(CheckOutTimeout) + |> result.flatten +} + +fn check_in( + pool: Pool(resource_type, resource_create_error), + resource: resource_type, + caller: Pid, +) { + process.send(pool.subject, CheckIn(resource:, caller:)) } /// Check out a resource from the pool, apply the `next` function, then check /// the resource back in. /// /// ```gleam -/// let assert Ok(pool) = bath.init(10, fn() { Ok("Some pooled resource") }) +/// let assert Ok(pool) = +/// bath.new(fn() { Ok("Some pooled resource") }) +/// |> bath.start(1000) /// /// use resource <- bath.apply(pool, 1000) /// // Do stuff with resource... /// ``` pub fn apply( - pool: Pool(resource_type), - timeout: Int, - next: fn(resource_type) -> result_type, -) { - use resource <- result.try(check_out(pool.subject, timeout)) - let result = next(resource) - check_in(pool.subject, resource) - Ok(result) + pool pool: Pool(resource_type, resource_create_error), + timeout timeout: Int, + next next: fn(resource_type) -> result_type, +) -> Result(result_type, ApplyError(resource_create_error)) { + let self = process.self() + use resource <- result.try(check_out(pool, self, timeout)) + + let usage_result = next(resource) + + check_in(pool, resource, self) + + Ok(usage_result) } -/// Shut down the pool, calling the `resource_shutdown_function` on each -/// resource in the pool. +/// Shut down the pool, calling the `shutdown_function` on each +/// resource in the pool. Calling with `force` set to `True` will +/// force the shutdown, not calling the `shutdown_function` on any +/// resources. /// -/// Will fail if there are still resources checked out. +/// Will fail if there are still resources checked out, unless `force` is +/// `True`. pub fn shutdown( - pool: Pool(resource_type), - resource_shutdown_function: fn(resource_type) -> Nil, - timeout: Int, + pool pool: Pool(resource_type, resource_create_error), + force force: Bool, + timeout timeout: Int, ) { - process.try_call( - pool.subject, - Shutdown(resource_shutdown_function, _), - timeout, - ) + process.try_call(pool.subject, Shutdown(_, force), timeout) |> result.map_error(fn(err) { case err { process.CallTimeout -> ShutdownTimeout @@ -125,78 +253,294 @@ pub fn shutdown( |> result.flatten } -/// Shut down the pool, calling the `resource_shutdown_function` on each -/// resource in the pool. -/// -/// Will not fail, even if resources are checked out, and will call the -/// `resource_shutdown_function` on both checked in and checked out resources. -pub fn force_shutdown( - pool: Pool(resource_type), - resource_shutdown_function: fn(resource_type) -> Nil, -) { - process.send(pool.subject, ForceShutdown(resource_shutdown_function)) +// ----- Pool ----- // + +/// The interface for interacting with a pool of resources in Bath. +pub opaque type Pool(resource_type, resource_create_error) { + Pool(subject: Subject(Msg(resource_type, resource_create_error))) } -fn check_out(pool_subject: PoolSubject(resource_type), timeout: Int) { - process.try_call(pool_subject, CheckOut, timeout) - |> result.replace_error(CheckoutTimeout) - |> result.flatten +/// Pool actor state. +pub opaque type State(resource_type, resource_create_error) { + State( + // Config + checkout_strategy: CheckoutStrategy, + creation_strategy: CreationStrategy, + max_size: Int, + create_resource: fn() -> Result(resource_type, resource_create_error), + shutdown_resource: fn(resource_type) -> Nil, + // State + resources: deque.Deque(resource_type), + current_size: Int, + live_resources: LiveResources(resource_type), + selector: process.Selector(Msg(resource_type, resource_create_error)), + ) } -fn check_in( - pool_subject: PoolSubject(resource_type), - item: resource_type, -) -> Nil { - process.send(pool_subject, CheckIn(item)) +type LiveResources(resource_type) = + Dict(Pid, LiveResource(resource_type)) + +type LiveResource(resource_type) { + LiveResource(resource: resource_type, monitor: process.ProcessMonitor) +} + +/// A message sent to the pool actor. +pub opaque type Msg(resource_type, resource_create_error) { + CheckIn(resource: resource_type, caller: Pid) + CheckOut( + reply_to: Subject(Result(resource_type, ApplyError(resource_create_error))), + caller: Pid, + ) + PoolExit(process.ExitMessage) + CallerDown(process.ProcessDown) + Shutdown(reply_to: process.Subject(Result(Nil, ShutdownError)), force: Bool) } -fn handle_message(msg: Msg(resource_type), pool_state: PoolState(resource_type)) { +fn handle_pool_message( + msg: Msg(resource_type, resource_create_error), + state: State(resource_type, resource_create_error), +) { case msg { - CheckIn(resource) -> { - let checked_out = - pool_state.checked_out - |> list.filter(fn(item) { item != resource }) + CheckIn(resource:, caller:) -> { + // If the checked-in process currently has a live resource, remove it from + // the live_resources dict + let caller_live_resource = dict.get(state.live_resources, caller) + let live_resources = dict.delete(state.live_resources, caller) - let checked_in = [resource, ..pool_state.checked_in] - actor.continue(PoolState(checked_in:, checked_out:)) + let selector = case caller_live_resource { + Ok(live_resource) -> { + demonitor_process(state.selector, live_resource.monitor) + } + Error(_) -> state.selector + } + + let new_resources = deque.push_back(state.resources, resource) + + actor.with_selector( + actor.continue( + State(..state, resources: new_resources, live_resources:, selector:), + ), + selector, + ) } - CheckOut(client) -> { - case pool_state.checked_in { - [] -> { - actor.send(client, Error(NoResourcesAvailable)) - actor.continue(pool_state) + CheckOut(reply_to:, caller:) -> { + // We always push to the back, so for FIFO, we pop front, + // and for LIFO, we pop back + let get_result = case state.checkout_strategy { + FIFO -> deque.pop_front(state.resources) + LIFO -> deque.pop_back(state.resources) + } + + // Try to get a new resource, either from the pool or by creating a new one + // if we still have capacity + let resource_result = case get_result { + // Use an existing resource - current size hasn't changed + Ok(#(resource, new_resources)) -> + Ok(#(resource, new_resources, state.current_size)) + Error(_) -> { + // Nothing in the pool. Create a new resource if we can + case state.current_size < state.max_size { + True -> { + use resource <- result.try( + state.create_resource() + |> result.map_error(fn(err) { + log_resource_creation_error(err) + CheckOutResourceCreateError(err) + }), + ) + // Checked-in resources queue hasn't changed, but we've added a new resource + // so current size has increased + Ok(#(resource, state.resources, state.current_size + 1)) + } + False -> Error(NoResourcesAvailable) + } + } + } + + case resource_result { + Error(err) -> { + // Nothing has changed + actor.send(reply_to, Error(err)) + actor.continue(state) } - [chosen, ..checked_in] -> { - actor.send(client, Ok(chosen)) - actor.continue( - PoolState(checked_in:, checked_out: [ - chosen, - ..pool_state.checked_out - ]), + Ok(#(resource, new_resources, new_current_size)) -> { + // Monitor the caller process + let #(monitor, selector) = monitor_process(state.selector, caller) + + let live_resources = + dict.insert( + state.live_resources, + caller, + LiveResource(resource:, monitor:), + ) + + actor.send(reply_to, Ok(resource)) + actor.with_selector( + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources:, + ), + ), + selector, ) } } } - Shutdown(resource_shutdown_function, reply_to) -> { - case pool_state.checked_out { - [] -> { - pool_state.checked_in - |> list.each(resource_shutdown_function) + PoolExit(exit_message) -> { + // Don't clean up live resources, as they may be in use + state.resources + |> deque.to_list + |> list.each(state.shutdown_resource) + + actor.Stop(exit_message.reason) + } + Shutdown(reply_to:, force:) -> { + case dict.size(state.live_resources), force { + // No live resource, shut down + 0, _ -> { + state.resources + |> deque.to_list + |> list.each(state.shutdown_resource) - process.send(reply_to, Ok(Nil)) + actor.send(reply_to, Ok(Nil)) actor.Stop(process.Normal) } - _ -> { - process.send(reply_to, Error(ResourcesInUse)) - actor.continue(pool_state) + _, True -> { + // Force shutdown + actor.send(reply_to, Ok(Nil)) + actor.Stop(process.Normal) + } + _, False -> { + actor.send(reply_to, Error(ResourcesInUse)) + actor.continue(state) } } } - ForceShutdown(resource_shutdown_function) -> { - list.append(pool_state.checked_in, pool_state.checked_out) - |> list.each(resource_shutdown_function) + CallerDown(process_down) -> { + // If the caller was a live resource, either create a new one or + // decrement the current size depending on the creation strategy + case dict.get(state.live_resources, process_down.pid) { + // Continue as normal, ignoring this message + Error(_) -> actor.continue(state) + Ok(live_resource) -> { + // Demonitor the process + let selector = + demonitor_process(state.selector, live_resource.monitor) + + // Shutdown the old resource + state.shutdown_resource(live_resource.resource) + + let #(new_resources, new_current_size) = case + state.creation_strategy + { + // If we create lazily, just decrement the current size - a new resource + // will be created when required + Lazy -> #(state.resources, state.current_size - 1) + // Otherwise, create a new resource, warning if resource creation fails + Eager -> { + case state.create_resource() { + // Size hasn't changed + Ok(resource) -> #( + deque.push_back(state.resources, resource), + state.current_size, + ) + // Size has changed + Error(resource_create_error) -> { + log_resource_creation_error(resource_create_error) + #(state.resources, state.current_size) + } + } + } + } - actor.Stop(process.Normal) + actor.with_selector( + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources: dict.delete( + state.live_resources, + process_down.pid, + ), + ), + ), + selector, + ) + } + } } } } + +fn pool_spec( + pool_config: PoolConfig(resource_type, resource_create_error), + resources: deque.Deque(resource_type), + current_size: Int, + init_timeout: Int, +) -> actor.Spec( + State(resource_type, resource_create_error), + Msg(resource_type, resource_create_error), +) { + actor.Spec(init_timeout:, loop: handle_pool_message, init: fn() { + let self = process.new_subject() + + // Trap exits + process.trap_exits(True) + + let selector = + process.new_selector() + |> process.selecting(self, function.identity) + |> process.selecting_trapped_exits(PoolExit) + + let state = + State( + resources:, + checkout_strategy: pool_config.checkout_strategy, + creation_strategy: pool_config.creation_strategy, + live_resources: dict.new(), + selector:, + current_size:, + max_size: pool_config.size, + create_resource: pool_config.create_resource, + shutdown_resource: pool_config.shutdown_resource, + ) + + actor.Ready(state, selector) + }) +} + +// ----- Utils ----- // + +fn monitor_process( + selector: process.Selector(Msg(resource_type, resource_create_error)), + pid: Pid, +) { + let monitor = process.monitor_process(pid) + let selector = + selector + |> process.selecting_process_down(monitor, CallerDown) + #(monitor, selector) +} + +fn demonitor_process( + selector: process.Selector(Msg(resource_type, resource_create_error)), + monitor: process.ProcessMonitor, +) { + let selector = + selector + |> process.deselecting_process_down(monitor) + selector +} + +fn log_resource_creation_error(resource_create_error: resource_create_error) { + logging.log( + logging.Error, + "Bath: Resource creation failed: " <> string.inspect(resource_create_error), + ) +} diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 8838a88..2c725c3 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -1,28 +1,45 @@ import bath +import gleam/erlang/process import gleam/io import gleeunit import gleeunit/should +import logging pub fn main() { + logging.configure() gleeunit.main() } // gleeunit test functions end in `_test` pub fn lifecycle_test() { - let assert Ok(pool) = bath.init(10, fn() { Ok(10) }) - let assert Ok(_) = bath.apply(pool, 1000, fn(n) { io.debug(n) }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.with_shutdown(fn(res) { + io.debug("Shutting down") + io.debug(res) + Nil + }) + |> bath.start(1000) + let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 }) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn empty_pool_fails_to_apply_test() { - let assert Ok(pool) = bath.init(0, fn() { Ok(10) }) + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(0) + |> bath.start(1000) let assert Error(bath.NoResourcesAvailable) = bath.apply(pool, 1000, fn(_) { Nil }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn pool_has_correct_capacity_test() { - let assert Ok(pool) = bath.init(1, fn() { Ok(10) }) + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.start(1000) let assert Ok(_) = bath.apply(pool, 1000, fn(_) { // Only one capacity, so attempting to check out another resource @@ -31,16 +48,49 @@ pub fn pool_has_correct_capacity_test() { bath.apply(pool, 1000, fn(_) { Nil }) Nil }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn pool_has_correct_resources_test() { - let assert Ok(pool) = bath.init(1, fn() { Ok(10) }) + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(10) + |> bath.start(1000) + let assert Ok(_) = bath.apply(pool, 1000, fn(n) { // Check we have the right values n |> should.equal(10) }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) + + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) +} + +pub fn pool_handles_caller_crash_test() { + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.start(1000) + + // Expect an error message here + logging.set_level(logging.Critical) + + process.start( + fn() { + use _ <- bath.apply(pool, 1000) + panic as "Oh no, the caller crashed!" + }, + False, + ) + + process.sleep(100) + + // Reset level + logging.configure() + + // Ensure the pool still has an available resource + let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r }) + + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) }