From 228582f25c4145cae6a939d99c08f03b1f3a3cc2 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Wed, 25 Dec 2024 21:00:44 +0000 Subject: [PATCH 1/7] attempt to integrate OTP --- gleam.toml | 3 + manifest.toml | 6 + src/bath.gleam | 505 +++++++++++++++++++++++++++++++++++-------- src/bath_ffi.erl | 6 + test/bath_test.gleam | 62 +++--- 5 files changed, 464 insertions(+), 118 deletions(-) create mode 100644 src/bath_ffi.erl diff --git a/gleam.toml b/gleam.toml index df85b3b..954921f 100644 --- a/gleam.toml +++ b/gleam.toml @@ -9,6 +9,9 @@ repository = { type = "github", user = "Pevensie", repo = "bath" } gleam_stdlib = ">= 0.34.0 and < 2.0.0" gleam_otp = ">= 0.16.0 and < 1.0.0" gleam_erlang = ">= 0.33.0 and < 1.0.0" +gleam_deque = ">= 1.0.0 and < 2.0.0" +lamb = ">= 0.6.1 and < 1.0.0" +bravo = ">= 4.0.1 and < 5.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 848c8ba..147c21b 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,14 +2,20 @@ # You typically do not need to edit this file packages = [ + { name = "bravo", version = "4.0.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "bravo", source = "hex", outer_checksum = "D450DCD5A896ADDE442A93A7D6B5B2785374579A35416819E29E024887C2A872" }, + { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, { name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" }, { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, + { name = "lamb", version = "0.6.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "lamb", source = "hex", outer_checksum = "A74714DE60B3BADB623DFFF910C843793AE660222A9AD63C70053D33C0C3D311" }, ] [requirements] +bravo = { version = ">= 4.0.1 and < 5.0.0" } +gleam_deque = { version = ">= 1.0.0 and < 2.0.0" } gleam_erlang = { version = ">= 0.33.0 and < 1.0.0" } gleam_otp = { version = ">= 0.16.0 and < 1.0.0" } gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } +lamb = { version = ">= 0.6.1 and < 1.0.0" } diff --git a/src/bath.gleam b/src/bath.gleam index 05a1b45..58f9b2b 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,44 +1,67 @@ +import bravo +import bravo/uset +import gleam/deque +import gleam/dict.{type Dict} import gleam/dynamic import gleam/erlang/process +import gleam/function +import gleam/int +import gleam/io import gleam/list import gleam/otp/actor +import gleam/otp/static_supervisor import gleam/result - -type Msg(resource_type) { - CheckIn(resource_type) - CheckOut(process.Subject(Result(resource_type, ApplyError))) - Shutdown( - fn(resource_type) -> Nil, - process.Subject(Result(Nil, ShutdownError)), - ) - ForceShutdown(fn(resource_type) -> Nil) -} +import gleam/string +import lamb +import lamb/query type PoolSubject(resource_type) = - process.Subject(Msg(resource_type)) - -type PoolState(resource_type) { - PoolState(checked_in: List(resource_type), checked_out: List(resource_type)) -} + process.Subject(PoolMsg(resource_type)) /// A resource pool. pub opaque type Pool(resource_type) { - Pool(size: Int, subject: PoolSubject(resource_type)) + Pool( + size: Int, + subject: PoolSubject(resource_type), + supervisor: process.Pid, + resource_shutdown_function: fn(resource_type) -> Nil, + ) } +type LiveWorker(resource_type) { + LiveWorker( + // worker_pid: process.Pid, + worker: ResourceSubject(resource_type), + worker_monitor: process.ProcessMonitor, + caller: process.Pid, + ) +} + +type LiveWorkers(resource_type) = + Dict(process.Pid, LiveWorker(resource_type)) + /// An error returned when creating a [`Pool`](#Pool). pub type InitError(resource_create_error) { - /// The actor failed to start. - StartError(actor.StartError) + /// The pool actor failed to start. + PoolStartError(actor.StartError) + /// The monitor actor failed to start. + MonitorStartError(actor.StartError) + /// A worker actor failed to start. + WorkerStartError(actor.StartError) /// The resource creation function failed. ResourceCreateError(resource_create_error) + // TableCreateError(lamb.Error) + /// ETS table creation failed. + TableCreateError(bravo.BravoError) + /// The supervisor failed to start. + SupervisorStartError(dynamic.Dynamic) } /// An error returned when the resource pool fails to shut down. pub type ShutdownError { /// There are still resources checked out. Ignore this failure case by /// calling [`force_shutdown`](#force_shutdown) function. - ResourcesInUse + ResourcesInUse(remaining: Int) /// The shutdown timeout expired. ShutdownTimeout /// The pool was already down or failed to send the response message. @@ -51,6 +74,8 @@ pub type ApplyError { NoResourcesAvailable /// The checkout timeout expired. CheckoutTimeout + /// The worker failed to be called. + WorkerCallError(process.CallError(dynamic.Dynamic)) } /// Start a new resource pool. @@ -62,24 +87,68 @@ pub type ApplyError { pub fn init( size: Int, resource_create_function: fn() -> Result(resource_type, resource_create_error), + resource_shutdown_function: fn(resource_type) -> Nil, + pool_init_timeout: Int, + worker_init_timeout: Int, ) -> Result(Pool(resource_type), InitError(resource_create_error)) { - let resources_result = + // use live_workers <- result.try( + // // lamb.create( + // // name: "bath_live_workers", + // // access: lamb.Public, + // // kind: lamb.Set, + // // registered: False, + // // ) + // // TODO: use random name + // uset.new(name: "bath_live_workers", access: bravo.Public, keypos: 1) + // |> result.map_error(TableCreateError), + // ) + + let live_workers = dict.new() + + let actor_result = + actor.start_spec(pool_spec(live_workers, pool_init_timeout)) + |> result.map_error(PoolStartError) + + use subject <- result.try(actor_result) + + let workers_result = list.repeat("", size) |> list.try_map(fn(_) { - resource_create_function() |> result.map_error(ResourceCreateError) + use subject <- result.try( + actor.start_spec(worker_spec( + resource_create_function, + resource_shutdown_function, + subject, + worker_init_timeout, + )) + |> result.map_error(WorkerStartError), + ) + Ok(subject) }) - use resources <- result.try(resources_result) + use workers <- result.try(workers_result) - let actor_result = - actor.start( - PoolState(checked_in: resources, checked_out: []), - handle_message, - ) - |> result.map_error(StartError) + let sup = + static_supervisor.new(static_supervisor.OneForOne) + |> static_supervisor.auto_shutdown(static_supervisor.AllSignificant) + let sup_result = + workers + |> list.index_fold(sup, fn(sup, actor, idx) { + static_supervisor.add( + sup, + static_supervisor.worker_child("worker_" <> int.to_string(idx), fn() { + process.subject_owner(actor) |> Ok + }) + |> static_supervisor.significant(True) + |> static_supervisor.restart(static_supervisor.Transient), + ) + }) + |> static_supervisor.start_link() + |> result.map_error(SupervisorStartError) - use subject <- result.try(actor_result) - Ok(Pool(size:, subject:)) + use sup <- result.try(sup_result) + + Ok(Pool(size:, subject:, supervisor: sup, resource_shutdown_function:)) } /// Check out a resource from the pool, apply the `next` function, then check @@ -95,34 +164,40 @@ pub fn apply( pool: Pool(resource_type), timeout: Int, next: fn(resource_type) -> result_type, -) { - use resource <- result.try(check_out(pool.subject, timeout)) - let result = next(resource) - check_in(pool.subject, resource) - Ok(result) +) -> Result(result_type, ApplyError) { + let self = process.self() + use worker <- result.try(check_out(pool.subject, self, timeout)) + let next = unsafe_coerce_to_dynamic_function(next) + + let result = + process.try_call(worker, UseResource(next, _), timeout) + |> result.map_error(WorkerCallError) + + check_in(pool.subject, worker) + + use return_value <- result.try(result) + Ok(unsafe_coerce_to_return_type(return_value)) } /// Shut down the pool, calling the `resource_shutdown_function` on each /// resource in the pool. /// /// Will fail if there are still resources checked out. -pub fn shutdown( - pool: Pool(resource_type), - resource_shutdown_function: fn(resource_type) -> Nil, - timeout: Int, -) { - process.try_call( - pool.subject, - Shutdown(resource_shutdown_function, _), - timeout, - ) - |> result.map_error(fn(err) { - case err { - process.CallTimeout -> ShutdownTimeout - process.CalleeDown(reason) -> CalleeDown(reason:) - } - }) - |> result.flatten +pub fn shutdown(pool: Pool(resource_type), timeout: Int) { + process.send_exit(pool.supervisor) + Ok(Nil) + // process.try_call( + // pool.subject, + // Shutdown(pool.resource_shutdown_function, pool.supervisor, _), + // timeout, + // ) + // |> result.map_error(fn(err) { + // case err { + // process.CallTimeout -> ShutdownTimeout + // process.CalleeDown(reason) -> CalleeDown(reason:) + // } + // }) + // |> result.flatten } /// Shut down the pool, calling the `resource_shutdown_function` on each @@ -134,69 +209,325 @@ pub fn force_shutdown( pool: Pool(resource_type), resource_shutdown_function: fn(resource_type) -> Nil, ) { - process.send(pool.subject, ForceShutdown(resource_shutdown_function)) + process.send( + pool.subject, + ForceShutdown(resource_shutdown_function, pool.supervisor), + ) } -fn check_out(pool_subject: PoolSubject(resource_type), timeout: Int) { - process.try_call(pool_subject, CheckOut, timeout) +fn check_out( + pool_subject: PoolSubject(resource_type), + caller: process.Pid, + timeout: Int, +) { + process.try_call(pool_subject, CheckOut(_, caller:), timeout) |> result.replace_error(CheckoutTimeout) |> result.flatten } fn check_in( pool_subject: PoolSubject(resource_type), - item: resource_type, + worker: ResourceSubject(resource_type), ) -> Nil { - process.send(pool_subject, CheckIn(item)) + process.send(pool_subject, CheckIn(worker)) +} + +// ----- Pool actor ----- // + +type PoolState(resource_type) { + PoolState( + workers: deque.Deque(ResourceSubject(resource_type)), + // live_workers: lamb.Table(process.Pid, LiveWorker(resource_type)), + // live_workers: uset.USet(LiveWorker(resource_type)), + live_workers: LiveWorkers(resource_type), + // supervisor: process.Pid, + selector: process.Selector(PoolMsg(resource_type)), + ) +} + +type PoolMsg(resource_type) { + ProcessDown(process.ProcessDown) + CheckIn(worker: ResourceSubject(resource_type)) + CheckOut( + reply_to: process.Subject( + Result(ResourceSubject(resource_type), ApplyError), + ), + caller: process.Pid, + ) + Shutdown( + resource_shutdown_function: fn(resource_type) -> Nil, + supervisor: process.Pid, + reply_to: process.Subject(Result(Nil, ShutdownError)), + ) + ForceShutdown( + resource_shutdown_function: fn(resource_type) -> Nil, + supervisor: process.Pid, + ) } -fn handle_message(msg: Msg(resource_type), pool_state: PoolState(resource_type)) { +fn handle_pool_message( + msg: PoolMsg(resource_type), + pool_state: PoolState(resource_type), +) { case msg { - CheckIn(resource) -> { - let checked_out = - pool_state.checked_out - |> list.filter(fn(item) { item != resource }) + CheckIn(worker:) -> { + let new_workers = deque.push_back(pool_state.workers, worker) - let checked_in = [resource, ..pool_state.checked_in] - actor.continue(PoolState(checked_in:, checked_out:)) - } - CheckOut(client) -> { - case pool_state.checked_in { - [] -> { - actor.send(client, Error(NoResourcesAvailable)) - actor.continue(pool_state) + // let query = + // query.new() + // |> query.index(worker |> process.subject_owner) + + // lamb.remove(pool_state.live_workers, query) + + // uset.delete_key(pool_state.live_workers, worker |> process.subject_owner) + let selector = case + dict.get(pool_state.live_workers, worker |> process.subject_owner) + { + Ok(live_worker) -> { + // Demonitor the process + let selector = + pool_state.selector + |> process.deselecting_process_down(live_worker.worker_monitor) + + process.demonitor_process(live_worker.worker_monitor) + + selector } - [chosen, ..checked_in] -> { - actor.send(client, Ok(chosen)) - actor.continue( - PoolState(checked_in:, checked_out: [ - chosen, - ..pool_state.checked_out - ]), + Error(_) -> pool_state.selector + } + + let live_workers = + dict.delete(pool_state.live_workers, worker |> process.subject_owner) + + actor.with_selector( + actor.continue(PoolState(workers: new_workers, live_workers:, selector:)), + selector, + ) + } + CheckOut(reply_to:, caller:) -> { + case deque.pop_front(pool_state.workers) { + Ok(#(worker, new_workers)) -> { + // Add the worker to the live_workers table + let worker_pid = process.subject_owner(worker) + let worker_monitor = process.monitor_process(worker_pid) + + // lamb.insert( + // pool_state.live_workers, + // worker |> process.subject_owner, + // LiveWorker(worker:, worker_monitor:, caller:), + // ) + // uset.insert(pool_state.live_workers, [ + // LiveWorker(worker_pid:, worker:, worker_monitor:, caller:), + // ]) + let live_workers = + dict.insert( + pool_state.live_workers, + worker |> process.subject_owner, + LiveWorker(worker:, worker_monitor:, caller:), + ) + + actor.send(reply_to, Ok(worker)) + + io.debug("live workers: check out") + io.debug(pool_state.live_workers) + + let selector = + pool_state.selector + |> process.selecting_process_down(worker_monitor, ProcessDown) + + actor.with_selector( + actor.continue(PoolState( + workers: new_workers, + selector:, + live_workers:, + )), + selector, ) } + Error(_) -> { + actor.send(reply_to, Error(NoResourcesAvailable)) + actor.continue(pool_state) + } } } - Shutdown(resource_shutdown_function, reply_to) -> { - case pool_state.checked_out { - [] -> { - pool_state.checked_in - |> list.each(resource_shutdown_function) + Shutdown(resource_shutdown_function:, supervisor:, reply_to:) -> { + io.debug("Shutdown") + case dict.size(pool_state.live_workers) { + 0 -> { + pool_state.workers + |> deque.to_list + |> list.each(actor.send(_, ShutdownResource( + resource_shutdown_function, + supervisor:, + ))) + + // TODO: handle this better + // process.kill(supervisor) process.send(reply_to, Ok(Nil)) actor.Stop(process.Normal) } - _ -> { - process.send(reply_to, Error(ResourcesInUse)) + remaining -> { + process.send(reply_to, Error(ResourcesInUse(remaining:))) actor.continue(pool_state) } } } - ForceShutdown(resource_shutdown_function) -> { - list.append(pool_state.checked_in, pool_state.checked_out) - |> list.each(resource_shutdown_function) + ForceShutdown(resource_shutdown_function:, supervisor:) -> { + // static_super + // list.append(pool_state.checked_in, pool_state.checked_out) + // |> list.each(resource_shutdown_function) + + // TODO: handle this better + // process.kill(supervisor) actor.Stop(process.Normal) } + ProcessDown(process_down) -> { + // TODO: remove let_assert + // let assert [live_worker] = + // lamb.lookup(pool_state.live_workers, process_down.pid) + // let assert Ok(live_worker) = + // uset.lookup(pool_state.live_workers, process_down.pid) + io.debug("Pool live workers") + io.debug(pool_state.live_workers) + let #(workers, selector) = case + dict.get(pool_state.live_workers, process_down.pid) + { + Ok(live_worker) -> { + // Demonitor the process + let selector = + pool_state.selector + |> process.deselecting_process_down(live_worker.worker_monitor) + + process.demonitor_process(live_worker.worker_monitor) + + // Remove the worker from the workers deque + let new_workers = + pool_state.workers + |> deque.to_list + |> list.filter(fn(worker) { worker != live_worker.worker }) + |> deque.from_list + + #(new_workers, selector) + } + Error(_) -> #(pool_state.workers, pool_state.selector) + } + + // Remove the process from the live_workers set + // let query = query.new() |> query.index(process_down.pid) + // lamb.remove(pool_state.live_workers, query) + // uset.delete_key(pool_state.live_workers, process_down.pid) + + let live_workers = dict.delete(pool_state.live_workers, process_down.pid) + + actor.with_selector( + actor.continue(PoolState(workers:, selector:, live_workers:)), + pool_state.selector, + ) + } } } + +fn pool_spec( + live_workers: LiveWorkers(resource_type), + init_timeout: Int, +) -> actor.Spec(PoolState(resource_type), PoolMsg(resource_type)) { + actor.Spec( + init: fn() { + let selector = process.new_selector() + actor.Ready(PoolState(deque.new(), selector:, live_workers:), selector) + }, + init_timeout:, + loop: handle_pool_message, + ) +} + +// ----- Resource actor ----- // + +type ResourceMessage(resource_type) { + UseResource( + next: fn(resource_type) -> dynamic.Dynamic, + reply_to: process.Subject(dynamic.Dynamic), + ) + // Takes a shutdown function + ShutdownResource( + resource_shutdown_function: fn(resource_type) -> Nil, + supervisor: process.Pid, + ) + ResourceExit( + process.ExitMessage, + resource_shutdown_function: fn(resource_type) -> Nil, + ) +} + +type ResourceSubject(resource_type) = + process.Subject(ResourceMessage(resource_type)) + +fn worker_spec( + resource_create_function: fn() -> Result(resource_type, resource_create_error), + resource_shutdown_function: fn(resource_type) -> Nil, + pool_subject: PoolSubject(resource_type), + init_timeout: Int, +) -> actor.Spec(resource_type, ResourceMessage(resource_type)) { + actor.Spec( + init: fn() { + case resource_create_function() { + Ok(resource) -> { + // Check in the worker + let self = process.new_subject() + process.send(pool_subject, CheckIn(self)) + // process.trap_exits(True) + + let selector = + process.new_selector() + |> process.selecting(self, function.identity) + |> process.selecting_trapped_exits(ResourceExit( + _, + resource_shutdown_function, + )) + + actor.Ready(resource, selector) + } + Error(resource_create_error) -> { + actor.Failed(resource_create_error |> string.inspect) + } + } + }, + init_timeout:, + loop: handle_resource_message, + ) +} + +fn handle_resource_message( + msg: ResourceMessage(resource_type), + resource: resource_type, +) { + case msg { + UseResource(next:, reply_to:) -> { + actor.send(reply_to, next(resource)) + actor.continue(resource) + } + ShutdownResource(resource_shutdown_function:, supervisor:) -> { + io.debug("Worker shutting down") + resource_shutdown_function(resource) + actor.Stop(process.Normal) + } + ResourceExit(exit_message, resource_shutdown_function:) -> { + io.debug("exit") + io.debug(exit_message) + resource_shutdown_function(resource) + actor.Stop(process.Normal) + } + } +} + +// ----- Utils ----- // + +@external(erlang, "bath_ffi", "unsafe_coerce") +fn unsafe_coerce_to_dynamic_function( + next: fn(resource_type) -> return_type, +) -> fn(resource_type) -> dynamic.Dynamic + +@external(erlang, "bath_ffi", "unsafe_coerce") +fn unsafe_coerce_to_return_type(term: dynamic.Dynamic) -> return_type diff --git a/src/bath_ffi.erl b/src/bath_ffi.erl new file mode 100644 index 0000000..c838425 --- /dev/null +++ b/src/bath_ffi.erl @@ -0,0 +1,6 @@ +-module(bath_ffi). + +-export([unsafe_coerce/1]). + +unsafe_coerce(Term) -> + Term. diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 8838a88..64a79c6 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -1,4 +1,5 @@ import bath +import gleam/erlang/process import gleam/io import gleeunit import gleeunit/should @@ -9,38 +10,37 @@ pub fn main() { // gleeunit test functions end in `_test` pub fn lifecycle_test() { - let assert Ok(pool) = bath.init(10, fn() { Ok(10) }) + let assert Ok(pool) = bath.init(3, fn() { Ok(10) }, fn(_) { Nil }, 1000, 1000) let assert Ok(_) = bath.apply(pool, 1000, fn(n) { io.debug(n) }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) + let assert Ok(_) = bath.shutdown(pool, 1000) } +// pub fn empty_pool_fails_to_apply_test() { +// let assert Ok(pool) = bath.init(0, fn() { Ok(10) }, 1000) +// let assert Error(bath.NoResourcesAvailable) = +// bath.apply(pool, 1000, fn(_) { Nil }) +// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) +// } -pub fn empty_pool_fails_to_apply_test() { - let assert Ok(pool) = bath.init(0, fn() { Ok(10) }) - let assert Error(bath.NoResourcesAvailable) = - bath.apply(pool, 1000, fn(_) { Nil }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -} - -pub fn pool_has_correct_capacity_test() { - let assert Ok(pool) = bath.init(1, fn() { Ok(10) }) - let assert Ok(_) = - bath.apply(pool, 1000, fn(_) { - // Only one capacity, so attempting to check out another resource - // should fail - let assert Error(bath.NoResourcesAvailable) = - bath.apply(pool, 1000, fn(_) { Nil }) - Nil - }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -} +// pub fn pool_has_correct_capacity_test() { +// let assert Ok(pool) = bath.init(1, fn() { Ok(10) }, 1000) +// let assert Ok(_) = +// bath.apply(pool, 1000, fn(_) { +// // Only one capacity, so attempting to check out another resource +// // should fail +// let assert Error(bath.NoResourcesAvailable) = +// bath.apply(pool, 1000, fn(_) { Nil }) +// Nil +// }) +// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) +// } -pub fn pool_has_correct_resources_test() { - let assert Ok(pool) = bath.init(1, fn() { Ok(10) }) - let assert Ok(_) = - bath.apply(pool, 1000, fn(n) { - // Check we have the right values - n - |> should.equal(10) - }) - let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -} +// pub fn pool_has_correct_resources_test() { +// let assert Ok(pool) = bath.init(1, fn() { Ok(10) }, 1000) +// let assert Ok(_) = +// bath.apply(pool, 1000, fn(n) { +// // Check we have the right values +// n +// |> should.equal(10) +// }) +// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) +// } From 575a302322f965cd69a699eabbf100c66b38832e Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Thu, 26 Dec 2024 22:46:41 +0000 Subject: [PATCH 2/7] more v2 stuff --- src/bath_v2.gleam | 446 +++++++++++++++++++++++++++++++++++++++++++ test/bath_test.gleam | 55 +++--- 2 files changed, 479 insertions(+), 22 deletions(-) create mode 100644 src/bath_v2.gleam diff --git a/src/bath_v2.gleam b/src/bath_v2.gleam new file mode 100644 index 0000000..633b5cd --- /dev/null +++ b/src/bath_v2.gleam @@ -0,0 +1,446 @@ +import gleam/deque +import gleam/dict.{type Dict} +import gleam/dynamic +import gleam/erlang/process.{type Pid, type Subject} +import gleam/function +import gleam/int +import gleam/io +import gleam/list +import gleam/otp/actor +import gleam/otp/static_supervisor as sup +import gleam/result +import gleam/string + +// ---- Pool config ----- // + +pub type Strategy { + FIFO + LIFO +} + +pub opaque type PoolConfig(resource_type, resource_create_error) { + PoolConfig( + size: Int, + create_resource: fn() -> Result(resource_type, resource_create_error), + shutdown_resource: fn(resource_type) -> Nil, + strategy: Strategy, + ) +} + +pub fn new( + resource create_resource: fn() -> Result(resource_type, resource_create_error), +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig( + size: 10, + create_resource: create_resource, + shutdown_resource: fn(_) { Nil }, + strategy: FIFO, + ) +} + +pub fn with_size( + config pool_config: PoolConfig(resource_type, resource_create_error), + size size: Int, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, size:) +} + +pub fn with_shutdown( + config pool_config: PoolConfig(resource_type, resource_create_error), + shutdown shutdown_resource: fn(resource_type) -> Nil, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, shutdown_resource:) +} + +pub fn with_strategy( + config pool_config: PoolConfig(resource_type, resource_create_error), + strategy strategy: Strategy, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, strategy:) +} + +// ----- Lifecycle functions ---- // + +pub type StartError { + PoolActorStartError(actor.StartError) + WorkerStartError(actor.StartError) + PoolSupervisorStartError(dynamic.Dynamic) + WorkerSupervisorStartError(dynamic.Dynamic) +} + +pub type ApplyError { + NoResourcesAvailable + CheckOutTimeout + WorkerCallTimeout + WorkerCrashed(process.ProcessDown) +} + +pub fn start( + config pool_config: PoolConfig(resource_type, resource_create_error), + timeout init_timeout: Int, + // TODO: errors +) -> Result(Pool(resource_type), StartError) { + // The supervision tree for pools looks like this: + // supervisor (probably rest for 1?) + // | | + // | | + // pool supervisor (one for one) + // | | | + // / | \ + // / | \ + // worker worker worker + + let main_supervisor = sup.new(sup.RestForOne) + let worker_supervisor = sup.new(sup.OneForOne) + + let pool_start_result = + actor.start_spec(pool_spec(pool_config, init_timeout)) + |> result.map_error(PoolActorStartError) + + use pool_subject <- result.try(pool_start_result) + + let workers_result = + list.repeat("", pool_config.size) + |> list.try_map(fn(_) { + use subject <- result.try( + actor.start_spec(worker_spec( + pool_subject, + pool_config.create_resource, + pool_config.shutdown_resource, + init_timeout, + )) + |> result.map_error(WorkerStartError), + ) + Ok(subject) + }) + + use workers <- result.try(workers_result) + + // Add workers to the worker supervisor and start it + let worker_supervisor_result = + workers + |> list.index_fold(worker_supervisor, fn(worker_supervisor, actor, idx) { + sup.add( + worker_supervisor, + sup.worker_child("worker_" <> int.to_string(idx), fn() { + process.subject_owner(actor) |> Ok + }) + |> sup.restart(sup.Transient), + ) + }) + |> sup.start_link() + |> result.map_error(WorkerSupervisorStartError) + + use worker_supervisor <- result.try(worker_supervisor_result) + + // Add the pool and worker supervisors to the main supervisor + let main_supervisor_result = + sup.add( + main_supervisor, + sup.worker_child("pool", fn() { + process.subject_owner(pool_subject) |> Ok + }) + |> sup.restart(sup.Transient), + ) + |> sup.add( + sup.supervisor_child("worker_supervisor", fn() { Ok(worker_supervisor) }) + |> sup.restart(sup.Transient), + ) + |> sup.start_link() + |> result.map_error(PoolSupervisorStartError) + + use main_supervisor <- result.try(main_supervisor_result) + + Ok(Pool(subject: pool_subject, supervisor: main_supervisor)) +} + +fn check_out( + pool: Pool(resource_type), + caller: Subject(UsageResult), + timeout: Int, +) -> Result(WorkerSubject(resource_type), ApplyError) { + process.try_call(pool.subject, CheckOut(_, caller:), timeout) + |> result.replace_error(CheckOutTimeout) + |> result.flatten +} + +fn check_in( + pool: Pool(resource_type), + worker_subject: WorkerSubject(resource_type), +) { + process.send(pool.subject, CheckIn(worker_subject)) +} + +pub fn apply( + pool: Pool(resource_type), + timeout: Int, + next: fn(resource_type) -> result_type, +) -> Result(result_type, ApplyError) { + let self = process.new_subject() + use worker_subject <- result.try(check_out(pool, self, timeout)) + + // Use manual send instead of try_call so we can use the same caller subject + // we sent with the check out message + process.send( + worker_subject, + UseResource(self, unsafe_coerce_to_dynamic_function(next)), + ) + + let usage_result = + process.receive(self, timeout) + |> result.map(fn(result) { + result + |> result.map(unsafe_coerce_to_return_type) + |> result.map_error(WorkerCrashed) + }) + + let usage_result = case usage_result { + // Timeout + Error(Nil) -> Error(WorkerCallTimeout) + Ok(Error(err)) -> Error(err) + Ok(Ok(result)) -> Ok(result) + } + + check_in(pool, worker_subject) + + usage_result +} + +// ----- Pool ----- // + +pub opaque type Pool(resource_type) { + Pool(subject: Subject(PoolMsg(resource_type)), supervisor: Pid) +} + +type PoolState(resource_type) { + PoolState( + workers: deque.Deque(Worker(resource_type)), + strategy: Strategy, + live_workers: LiveWorkers(resource_type), + selector: process.Selector(PoolMsg(resource_type)), + ) +} + +type LiveWorkers(resource_type) = + Dict(Pid, LiveWorker(resource_type)) + +type LiveWorker(resource_type) { + LiveWorker(worker: Worker(resource_type), caller: Subject(UsageResult)) +} + +type PoolMsg(resource_type) { + CheckIn(WorkerSubject(resource_type)) + CheckOut( + reply_to: Subject(Result(WorkerSubject(resource_type), ApplyError)), + caller: Subject(UsageResult), + ) + PoolExit(process.ExitMessage) + WorkerDown(process.ProcessDown) +} + +fn handle_pool_message( + msg: PoolMsg(resource_type), + state: PoolState(resource_type), +) { + // TODO: process monitoring + case msg { + CheckIn(worker_subject) -> { + // If the checked-in process is a currently live worker, remove it from + // the live_workers dict + let live_workers = + dict.delete(state.live_workers, worker_subject |> process.subject_owner) + + // Monitor the new worker process + let monitor = + process.monitor_process(worker_subject |> process.subject_owner) + + let new_workers = + deque.push_back( + state.workers, + Worker(subject: worker_subject, monitor:), + ) + + let selector = + state.selector + |> process.selecting_process_down(monitor, WorkerDown) + + actor.continue( + PoolState(..state, workers: new_workers, live_workers:, selector:), + ) + } + CheckOut(reply_to:, caller:) -> { + // We always push to the back, so for FIFO, we pop front, + // and for LIFO, we pop back + let get_result = case state.strategy { + FIFO -> deque.pop_front(state.workers) + LIFO -> deque.pop_back(state.workers) + } + + case get_result { + Ok(#(worker, new_workers)) -> { + let live_workers = + dict.insert( + state.live_workers, + worker.subject |> process.subject_owner, + LiveWorker(worker:, caller:), + ) + actor.send(reply_to, Ok(worker.subject)) + actor.continue(PoolState(..state, workers: new_workers)) + } + Error(_) -> { + actor.send(reply_to, Error(NoResourcesAvailable)) + actor.continue(state) + } + } + } + PoolExit(exit_message) -> { + io.debug("Pool exited") + // TODO: cleanup + actor.Stop(process.Normal) + } + WorkerDown(process_down) -> { + let #(maybe_worker, new_workers) = + state.workers + |> deque.to_list + |> list.partition(fn(worker) { + process.subject_owner(worker.subject) == process_down.pid + }) + + // If the worker exists, demonitor it + let selector = case maybe_worker { + [worker] -> { + process.demonitor_process(worker.monitor) + + state.selector + |> process.deselecting_process_down(worker.monitor) + } + _ -> state.selector + } + + // If the process was a live worker, send an error message back + // to the caller + case dict.get(state.live_workers, process_down.pid) { + Ok(live_worker) -> { + process.send(live_worker.caller, Error(process_down)) + } + Error(_) -> Nil + } + + // Delete the process from the live_workers dict + let live_workers = dict.delete(state.live_workers, process_down.pid) + + actor.continue( + PoolState( + ..state, + live_workers:, + selector:, + workers: new_workers |> deque.from_list, + ), + ) + } + } +} + +fn pool_spec( + pool_config: PoolConfig(resource_type, resource_create_error), + init_timeout: Int, +) -> actor.Spec(PoolState(resource_type), PoolMsg(resource_type)) { + actor.Spec(init_timeout:, loop: handle_pool_message, init: fn() { + let self = process.new_subject() + + let selector = + process.new_selector() + |> process.selecting(self, function.identity) + |> process.selecting_trapped_exits(PoolExit) + + let state = + PoolState( + workers: deque.new(), + strategy: pool_config.strategy, + live_workers: dict.new(), + selector:, + ) + + // Trap exits + // process.trap_exits(True) + + actor.Ready(state, selector) + }) +} + +// ----- Worker ---- // + +type Worker(resource_type) { + Worker(subject: WorkerSubject(resource_type), monitor: process.ProcessMonitor) +} + +type WorkerSubject(resource_type) = + Subject(WorkerMsg(resource_type)) + +type WorkerMsg(resource_type) { + UseResource( + reply_to: Subject(UsageResult), + function: fn(resource_type) -> dynamic.Dynamic, + ) + WorkerExit( + process.ExitMessage, + resource_shutdown_function: fn(resource_type) -> Nil, + ) +} + +type UsageResult = + Result(dynamic.Dynamic, process.ProcessDown) + +fn worker_spec( + pool_subject: Subject(PoolMsg(resource_type)), + resource_create_function: fn() -> Result(resource_type, resource_create_error), + resource_shutdown_function: fn(resource_type) -> Nil, + init_timeout: Int, +) -> actor.Spec(resource_type, WorkerMsg(resource_type)) { + actor.Spec(init_timeout:, loop: handle_worker_message, init: fn() { + case resource_create_function() { + Ok(resource) -> { + // Check in the worker + let self = process.new_subject() + process.send(pool_subject, CheckIn(self)) + process.trap_exits(True) + + let selector = + process.new_selector() + |> process.selecting(self, function.identity) + |> process.selecting_trapped_exits(WorkerExit( + _, + resource_shutdown_function, + )) + + actor.Ready(resource, selector) + } + Error(resource_create_error) -> { + actor.Failed(resource_create_error |> string.inspect) + } + } + }) +} + +fn handle_worker_message(msg: WorkerMsg(resource_type), resource: resource_type) { + case msg { + UseResource(reply_to:, function:) -> { + actor.send(reply_to, Ok(function(resource))) + actor.continue(resource) + } + WorkerExit(exit_message, resource_shutdown_function:) -> { + resource_shutdown_function(resource) + actor.Stop(exit_message.reason) + } + } +} + +// ----- Utils ----- // + +@external(erlang, "bath_ffi", "unsafe_coerce") +fn unsafe_coerce_to_dynamic_function( + next: fn(resource_type) -> return_type, +) -> fn(resource_type) -> dynamic.Dynamic + +@external(erlang, "bath_ffi", "unsafe_coerce") +fn unsafe_coerce_to_return_type(term: dynamic.Dynamic) -> return_type diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 64a79c6..688bf9b 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -1,4 +1,4 @@ -import bath +import bath_v2 as bath import gleam/erlang/process import gleam/io import gleeunit @@ -10,30 +10,41 @@ pub fn main() { // gleeunit test functions end in `_test` pub fn lifecycle_test() { - let assert Ok(pool) = bath.init(3, fn() { Ok(10) }, fn(_) { Nil }, 1000, 1000) - let assert Ok(_) = bath.apply(pool, 1000, fn(n) { io.debug(n) }) - let assert Ok(_) = bath.shutdown(pool, 1000) + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.with_shutdown(fn(res) { + io.debug("Shutting down") + io.debug(res) + Nil + }) + |> bath.start(1000) + let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 }) } -// pub fn empty_pool_fails_to_apply_test() { -// let assert Ok(pool) = bath.init(0, fn() { Ok(10) }, 1000) -// let assert Error(bath.NoResourcesAvailable) = -// bath.apply(pool, 1000, fn(_) { Nil }) -// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -// } -// pub fn pool_has_correct_capacity_test() { -// let assert Ok(pool) = bath.init(1, fn() { Ok(10) }, 1000) -// let assert Ok(_) = -// bath.apply(pool, 1000, fn(_) { -// // Only one capacity, so attempting to check out another resource -// // should fail -// let assert Error(bath.NoResourcesAvailable) = -// bath.apply(pool, 1000, fn(_) { Nil }) -// Nil -// }) -// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -// } +pub fn empty_pool_fails_to_apply_test() { + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(0) + |> bath.start(1000) + let assert Error(bath.NoResourcesAvailable) = + bath.apply(pool, 1000, fn(_) { Nil }) +} +pub fn pool_has_correct_capacity_test() { + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.start(1000) + let assert Ok(_) = + bath.apply(pool, 1000, fn(_) { + // Only one capacity, so attempting to check out another resource + // should fail + let assert Error(bath.NoResourcesAvailable) = + bath.apply(pool, 1000, fn(_) { Nil }) + Nil + }) +} // pub fn pool_has_correct_resources_test() { // let assert Ok(pool) = bath.init(1, fn() { Ok(10) }, 1000) // let assert Ok(_) = From 99a1523272d2273f2334e37da51a3a3adf41efda Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 29 Dec 2024 19:41:47 +0000 Subject: [PATCH 3/7] turn bath back into a proper resource pool --- src/bath.gleam | 762 ++++++++++++++++++------------------------- src/bath_v2.gleam | 446 ------------------------- test/bath_test.gleam | 53 ++- 3 files changed, 354 insertions(+), 907 deletions(-) delete mode 100644 src/bath_v2.gleam diff --git a/src/bath.gleam b/src/bath.gleam index 58f9b2b..19c9a36 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,533 +1,395 @@ -import bravo -import bravo/uset import gleam/deque import gleam/dict.{type Dict} -import gleam/dynamic -import gleam/erlang/process +import gleam/erlang/process.{type Pid, type Subject} import gleam/function -import gleam/int import gleam/io import gleam/list import gleam/otp/actor -import gleam/otp/static_supervisor import gleam/result -import gleam/string -import lamb -import lamb/query -type PoolSubject(resource_type) = - process.Subject(PoolMsg(resource_type)) +// ---- Pool config ----- // -/// A resource pool. -pub opaque type Pool(resource_type) { - Pool( +pub type CheckoutStrategy { + FIFO + LIFO +} + +pub type CreationStrategy { + Lazy + Eager +} + +pub opaque type PoolConfig(resource_type, resource_create_error) { + PoolConfig( size: Int, - subject: PoolSubject(resource_type), - supervisor: process.Pid, - resource_shutdown_function: fn(resource_type) -> Nil, + create_resource: fn() -> Result(resource_type, resource_create_error), + shutdown_resource: fn(resource_type) -> Nil, + checkout_strategy: CheckoutStrategy, + creation_strategy: CreationStrategy, ) } -type LiveWorker(resource_type) { - LiveWorker( - // worker_pid: process.Pid, - worker: ResourceSubject(resource_type), - worker_monitor: process.ProcessMonitor, - caller: process.Pid, +pub fn new( + resource create_resource: fn() -> Result(resource_type, resource_create_error), +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig( + size: 10, + create_resource: create_resource, + shutdown_resource: fn(_) { Nil }, + checkout_strategy: FIFO, + creation_strategy: Lazy, ) } -type LiveWorkers(resource_type) = - Dict(process.Pid, LiveWorker(resource_type)) - -/// An error returned when creating a [`Pool`](#Pool). -pub type InitError(resource_create_error) { - /// The pool actor failed to start. - PoolStartError(actor.StartError) - /// The monitor actor failed to start. - MonitorStartError(actor.StartError) - /// A worker actor failed to start. - WorkerStartError(actor.StartError) - /// The resource creation function failed. - ResourceCreateError(resource_create_error) - // TableCreateError(lamb.Error) - /// ETS table creation failed. - TableCreateError(bravo.BravoError) - /// The supervisor failed to start. - SupervisorStartError(dynamic.Dynamic) +pub fn with_size( + config pool_config: PoolConfig(resource_type, resource_create_error), + size size: Int, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, size:) } -/// An error returned when the resource pool fails to shut down. -pub type ShutdownError { - /// There are still resources checked out. Ignore this failure case by - /// calling [`force_shutdown`](#force_shutdown) function. - ResourcesInUse(remaining: Int) - /// The shutdown timeout expired. - ShutdownTimeout - /// The pool was already down or failed to send the response message. - CalleeDown(reason: dynamic.Dynamic) +pub fn with_shutdown( + config pool_config: PoolConfig(resource_type, resource_create_error), + shutdown shutdown_resource: fn(resource_type) -> Nil, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, shutdown_resource:) } -/// An error returned when failing to apply a function to a pooled resource. -pub type ApplyError { - /// There are no resources available in the pool. - NoResourcesAvailable - /// The checkout timeout expired. - CheckoutTimeout - /// The worker failed to be called. - WorkerCallError(process.CallError(dynamic.Dynamic)) +pub fn with_checkout_strategy( + config pool_config: PoolConfig(resource_type, resource_create_error), + strategy checkout_strategy: CheckoutStrategy, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, checkout_strategy:) } -/// Start a new resource pool. -/// -/// ```gleam -/// // Creates a pool with 10 strings. -/// let assert Ok(pool) = bath.init(10, fn() { Ok("Some pooled resource") }) -/// ``` -pub fn init( - size: Int, - resource_create_function: fn() -> Result(resource_type, resource_create_error), - resource_shutdown_function: fn(resource_type) -> Nil, - pool_init_timeout: Int, - worker_init_timeout: Int, -) -> Result(Pool(resource_type), InitError(resource_create_error)) { - // use live_workers <- result.try( - // // lamb.create( - // // name: "bath_live_workers", - // // access: lamb.Public, - // // kind: lamb.Set, - // // registered: False, - // // ) - // // TODO: use random name - // uset.new(name: "bath_live_workers", access: bravo.Public, keypos: 1) - // |> result.map_error(TableCreateError), - // ) - - let live_workers = dict.new() - - let actor_result = - actor.start_spec(pool_spec(live_workers, pool_init_timeout)) - |> result.map_error(PoolStartError) - - use subject <- result.try(actor_result) - - let workers_result = - list.repeat("", size) - |> list.try_map(fn(_) { - use subject <- result.try( - actor.start_spec(worker_spec( - resource_create_function, - resource_shutdown_function, - subject, - worker_init_timeout, - )) - |> result.map_error(WorkerStartError), - ) - Ok(subject) - }) - - use workers <- result.try(workers_result) - - let sup = - static_supervisor.new(static_supervisor.OneForOne) - |> static_supervisor.auto_shutdown(static_supervisor.AllSignificant) - let sup_result = - workers - |> list.index_fold(sup, fn(sup, actor, idx) { - static_supervisor.add( - sup, - static_supervisor.worker_child("worker_" <> int.to_string(idx), fn() { - process.subject_owner(actor) |> Ok - }) - |> static_supervisor.significant(True) - |> static_supervisor.restart(static_supervisor.Transient), - ) - }) - |> static_supervisor.start_link() - |> result.map_error(SupervisorStartError) +pub fn with_creation_strategy( + config pool_config: PoolConfig(resource_type, resource_create_error), + strategy creation_strategy: CreationStrategy, +) -> PoolConfig(resource_type, resource_create_error) { + PoolConfig(..pool_config, creation_strategy:) +} - use sup <- result.try(sup_result) +// ----- Lifecycle functions ---- // - Ok(Pool(size:, subject:, supervisor: sup, resource_shutdown_function:)) +pub type StartError(resource_create_error) { + PoolStartResourceCreateError(resource_create_error) + ActorStartError(actor.StartError) } -/// Check out a resource from the pool, apply the `next` function, then check -/// the resource back in. -/// -/// ```gleam -/// let assert Ok(pool) = bath.init(10, fn() { Ok("Some pooled resource") }) -/// -/// use resource <- bath.apply(pool, 1000) -/// // Do stuff with resource... -/// ``` -pub fn apply( - pool: Pool(resource_type), - timeout: Int, - next: fn(resource_type) -> result_type, -) -> Result(result_type, ApplyError) { - let self = process.self() - use worker <- result.try(check_out(pool.subject, self, timeout)) - let next = unsafe_coerce_to_dynamic_function(next) - - let result = - process.try_call(worker, UseResource(next, _), timeout) - |> result.map_error(WorkerCallError) +pub type ApplyError(resource_create_error) { + NoResourcesAvailable + CheckOutResourceCreateError(resource_create_error) + CheckOutTimeout +} - check_in(pool.subject, worker) +pub fn child_spec( + config pool_config: PoolConfig(resource_type, resource_create_error), + timeout init_timeout: Int, +) -> Result( + actor.Spec( + State(resource_type, resource_create_error), + Msg(resource_type, resource_create_error), + ), + StartError(resource_create_error), +) { + let #(resources_result, current_size) = case pool_config.creation_strategy { + Lazy -> #(Ok(deque.new()), 0) + Eager -> #( + list.repeat("", pool_config.size) + |> list.try_map(fn(_) { pool_config.create_resource() }) + |> result.map(deque.from_list) + |> result.map_error(PoolStartResourceCreateError), + pool_config.size, + ) + } - use return_value <- result.try(result) - Ok(unsafe_coerce_to_return_type(return_value)) -} + use resources <- result.try(resources_result) -/// Shut down the pool, calling the `resource_shutdown_function` on each -/// resource in the pool. -/// -/// Will fail if there are still resources checked out. -pub fn shutdown(pool: Pool(resource_type), timeout: Int) { - process.send_exit(pool.supervisor) - Ok(Nil) - // process.try_call( - // pool.subject, - // Shutdown(pool.resource_shutdown_function, pool.supervisor, _), - // timeout, - // ) - // |> result.map_error(fn(err) { - // case err { - // process.CallTimeout -> ShutdownTimeout - // process.CalleeDown(reason) -> CalleeDown(reason:) - // } - // }) - // |> result.flatten + Ok(pool_spec(pool_config, resources, current_size, init_timeout)) } -/// Shut down the pool, calling the `resource_shutdown_function` on each -/// resource in the pool. -/// -/// Will not fail, even if resources are checked out, and will call the -/// `resource_shutdown_function` on both checked in and checked out resources. -pub fn force_shutdown( - pool: Pool(resource_type), - resource_shutdown_function: fn(resource_type) -> Nil, +pub fn start( + config pool_config: PoolConfig(resource_type, resource_create_error), + timeout init_timeout: Int, +) -> Result( + Pool(resource_type, resource_create_error), + StartError(resource_create_error), ) { - process.send( - pool.subject, - ForceShutdown(resource_shutdown_function, pool.supervisor), - ) + use spec <- result.try(child_spec(pool_config, init_timeout)) + + actor.start_spec(spec) + |> result.map(fn(subject) { Pool(subject:) }) + |> result.map_error(ActorStartError) } +/// Checks out a resource from the pool, sending the caller Pid for the pool to +/// monitor in case the client dies. This allows the pool to create a new resource +/// later if required. fn check_out( - pool_subject: PoolSubject(resource_type), - caller: process.Pid, + pool: Pool(resource_type, resource_create_error), + caller: Pid, timeout: Int, -) { - process.try_call(pool_subject, CheckOut(_, caller:), timeout) - |> result.replace_error(CheckoutTimeout) +) -> Result(resource_type, ApplyError(resource_create_error)) { + process.try_call(pool.subject, CheckOut(_, caller:), timeout) + |> result.replace_error(CheckOutTimeout) |> result.flatten } fn check_in( - pool_subject: PoolSubject(resource_type), - worker: ResourceSubject(resource_type), -) -> Nil { - process.send(pool_subject, CheckIn(worker)) + pool: Pool(resource_type, resource_create_error), + resource: resource_type, + caller: Pid, +) { + process.send(pool.subject, CheckIn(resource:, caller:)) +} + +pub fn apply( + pool: Pool(resource_type, resource_create_error), + timeout: Int, + next: fn(resource_type) -> result_type, +) -> Result(result_type, ApplyError(resource_create_error)) { + let self = process.self() + use resource <- result.try(check_out(pool, self, timeout)) + + let usage_result = next(resource) + + check_in(pool, resource, self) + + Ok(usage_result) +} + +pub fn shutdown(pool: Pool(resource_type, resource_create_error)) { + process.send_exit(pool.subject |> process.subject_owner) } -// ----- Pool actor ----- // +// ----- Pool ----- // + +pub opaque type Pool(resource_type, resource_create_error) { + Pool(subject: Subject(Msg(resource_type, resource_create_error))) +} -type PoolState(resource_type) { - PoolState( - workers: deque.Deque(ResourceSubject(resource_type)), - // live_workers: lamb.Table(process.Pid, LiveWorker(resource_type)), - // live_workers: uset.USet(LiveWorker(resource_type)), - live_workers: LiveWorkers(resource_type), - // supervisor: process.Pid, - selector: process.Selector(PoolMsg(resource_type)), +pub opaque type State(resource_type, resource_create_error) { + State( + // Config + checkout_strategy: CheckoutStrategy, + creation_strategy: CreationStrategy, + max_size: Int, + create_resource: fn() -> Result(resource_type, resource_create_error), + shutdown_resource: fn(resource_type) -> Nil, + // State + resources: deque.Deque(resource_type), + current_size: Int, + live_resources: LiveResources(resource_type), + selector: process.Selector(Msg(resource_type, resource_create_error)), ) } -type PoolMsg(resource_type) { - ProcessDown(process.ProcessDown) - CheckIn(worker: ResourceSubject(resource_type)) +type LiveResources(resource_type) = + Dict(Pid, LiveResource(resource_type)) + +type LiveResource(resource_type) { + LiveResource(resource: resource_type, monitor: process.ProcessMonitor) +} + +pub opaque type Msg(resource_type, resource_create_error) { + CheckIn(resource: resource_type, caller: Pid) CheckOut( - reply_to: process.Subject( - Result(ResourceSubject(resource_type), ApplyError), - ), - caller: process.Pid, - ) - Shutdown( - resource_shutdown_function: fn(resource_type) -> Nil, - supervisor: process.Pid, - reply_to: process.Subject(Result(Nil, ShutdownError)), - ) - ForceShutdown( - resource_shutdown_function: fn(resource_type) -> Nil, - supervisor: process.Pid, + reply_to: Subject(Result(resource_type, ApplyError(resource_create_error))), + caller: Pid, ) + PoolExit(process.ExitMessage) + CallerDown(process.ProcessDown) } fn handle_pool_message( - msg: PoolMsg(resource_type), - pool_state: PoolState(resource_type), + msg: Msg(resource_type, resource_create_error), + state: State(resource_type, resource_create_error), ) { + io.debug(state.live_resources) case msg { - CheckIn(worker:) -> { - let new_workers = deque.push_back(pool_state.workers, worker) - - // let query = - // query.new() - // |> query.index(worker |> process.subject_owner) - - // lamb.remove(pool_state.live_workers, query) - - // uset.delete_key(pool_state.live_workers, worker |> process.subject_owner) - let selector = case - dict.get(pool_state.live_workers, worker |> process.subject_owner) - { - Ok(live_worker) -> { - // Demonitor the process - let selector = - pool_state.selector - |> process.deselecting_process_down(live_worker.worker_monitor) - - process.demonitor_process(live_worker.worker_monitor) - - selector + CheckIn(resource:, caller:) -> { + // If the checked-in process currently has a live resource, remove it from + // the live_resources dict + let caller_live_resource = dict.get(state.live_resources, caller) + let live_resources = dict.delete(state.live_resources, caller) + + let selector = case caller_live_resource { + Ok(live_resource) -> { + process.demonitor_process(live_resource.monitor) + + state.selector + |> process.deselecting_process_down(live_resource.monitor) } - Error(_) -> pool_state.selector + Error(_) -> state.selector } - let live_workers = - dict.delete(pool_state.live_workers, worker |> process.subject_owner) + let new_resources = deque.push_back(state.resources, resource) - actor.with_selector( - actor.continue(PoolState(workers: new_workers, live_workers:, selector:)), - selector, + actor.continue( + State(..state, resources: new_resources, live_resources:, selector:), ) } CheckOut(reply_to:, caller:) -> { - case deque.pop_front(pool_state.workers) { - Ok(#(worker, new_workers)) -> { - // Add the worker to the live_workers table - let worker_pid = process.subject_owner(worker) - let worker_monitor = process.monitor_process(worker_pid) - - // lamb.insert( - // pool_state.live_workers, - // worker |> process.subject_owner, - // LiveWorker(worker:, worker_monitor:, caller:), - // ) - // uset.insert(pool_state.live_workers, [ - // LiveWorker(worker_pid:, worker:, worker_monitor:, caller:), - // ]) - let live_workers = - dict.insert( - pool_state.live_workers, - worker |> process.subject_owner, - LiveWorker(worker:, worker_monitor:, caller:), - ) - - actor.send(reply_to, Ok(worker)) + // We always push to the back, so for FIFO, we pop front, + // and for LIFO, we pop back + let get_result = case state.checkout_strategy { + FIFO -> deque.pop_front(state.resources) + LIFO -> deque.pop_back(state.resources) + } - io.debug("live workers: check out") - io.debug(pool_state.live_workers) + // Try to get a new resource, either from the pool or by creating a new one + // if we still have capacity + let resource_result = case get_result { + // Use an existing resource - current size hasn't changed + Ok(#(resource, new_resources)) -> + Ok(#(resource, new_resources, state.current_size)) + Error(_) -> { + // Nothing in the pool. Create a new resource if we can + case state.current_size < state.max_size { + True -> { + use resource <- result.try( + state.create_resource() + |> result.map_error(CheckOutResourceCreateError), + ) + // Checked-in resources queue hasn't changed, but we've added a new resource + // so current size has increased + Ok(#(resource, state.resources, state.current_size + 1)) + } + False -> Error(NoResourcesAvailable) + } + } + } + case resource_result { + Error(err) -> { + // Nothing has changed + actor.send(reply_to, Error(err)) + actor.continue(state) + } + Ok(#(resource, new_resources, new_current_size)) -> { + // Monitor the caller process + let monitor = process.monitor_process(caller) let selector = - pool_state.selector - |> process.selecting_process_down(worker_monitor, ProcessDown) + state.selector + |> process.selecting_process_down(monitor, CallerDown) - actor.with_selector( - actor.continue(PoolState( - workers: new_workers, + let live_resources = + dict.insert( + state.live_resources, + caller, + LiveResource(resource:, monitor:), + ) + + actor.send(reply_to, Ok(resource)) + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, selector:, - live_workers:, - )), - selector, + live_resources:, + ), ) } - Error(_) -> { - actor.send(reply_to, Error(NoResourcesAvailable)) - actor.continue(pool_state) - } } } - Shutdown(resource_shutdown_function:, supervisor:, reply_to:) -> { - io.debug("Shutdown") - case dict.size(pool_state.live_workers) { - 0 -> { - pool_state.workers - |> deque.to_list - |> list.each(actor.send(_, ShutdownResource( - resource_shutdown_function, - supervisor:, - ))) - - // TODO: handle this better - // process.kill(supervisor) - - process.send(reply_to, Ok(Nil)) - actor.Stop(process.Normal) - } - remaining -> { - process.send(reply_to, Error(ResourcesInUse(remaining:))) - actor.continue(pool_state) - } - } - } - ForceShutdown(resource_shutdown_function:, supervisor:) -> { - // static_super - // list.append(pool_state.checked_in, pool_state.checked_out) - // |> list.each(resource_shutdown_function) - - // TODO: handle this better - // process.kill(supervisor) + PoolExit(exit_message) -> { + // Don't clean up live resources, as they may be in use + state.resources + |> deque.to_list + |> list.each(state.shutdown_resource) - actor.Stop(process.Normal) + actor.Stop(exit_message.reason) } - ProcessDown(process_down) -> { - // TODO: remove let_assert - // let assert [live_worker] = - // lamb.lookup(pool_state.live_workers, process_down.pid) - // let assert Ok(live_worker) = - // uset.lookup(pool_state.live_workers, process_down.pid) - io.debug("Pool live workers") - io.debug(pool_state.live_workers) - let #(workers, selector) = case - dict.get(pool_state.live_workers, process_down.pid) - { - Ok(live_worker) -> { + CallerDown(process_down) -> { + io.debug("Caller down") + // If the caller was a live resource, either create a new one or + // decrement the current size depending on the creation strategy + case dict.get(state.live_resources, process_down.pid) { + // Continue as normal, ignoring this message + Error(_) -> actor.continue(state) + Ok(live_resource) -> { // Demonitor the process + process.demonitor_process(live_resource.monitor) let selector = - pool_state.selector - |> process.deselecting_process_down(live_worker.worker_monitor) - - process.demonitor_process(live_worker.worker_monitor) - - // Remove the worker from the workers deque - let new_workers = - pool_state.workers - |> deque.to_list - |> list.filter(fn(worker) { worker != live_worker.worker }) - |> deque.from_list - - #(new_workers, selector) + state.selector + |> process.deselecting_process_down(live_resource.monitor) + + let #(new_resources, new_current_size) = case + state.creation_strategy + { + // If we create lazily, just decrement the current size - a new resource + // will be created when required + Lazy -> #(state.resources, state.current_size - 1) + // Otherwise, create a new resource, warning if resource creation fails + Eager -> { + case state.create_resource() { + // Size hasn't changed + Ok(resource) -> #( + deque.push_back(state.resources, resource), + state.current_size, + ) + // Size has changed + Error(resource_create_error) -> { + io.debug("Bath: Resource creation failed") + io.debug(resource_create_error) + #(state.resources, state.current_size) + } + } + } + } + + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources: dict.delete( + state.live_resources, + process_down.pid, + ), + ), + ) } - Error(_) -> #(pool_state.workers, pool_state.selector) } - - // Remove the process from the live_workers set - // let query = query.new() |> query.index(process_down.pid) - // lamb.remove(pool_state.live_workers, query) - // uset.delete_key(pool_state.live_workers, process_down.pid) - - let live_workers = dict.delete(pool_state.live_workers, process_down.pid) - - actor.with_selector( - actor.continue(PoolState(workers:, selector:, live_workers:)), - pool_state.selector, - ) } } } fn pool_spec( - live_workers: LiveWorkers(resource_type), - init_timeout: Int, -) -> actor.Spec(PoolState(resource_type), PoolMsg(resource_type)) { - actor.Spec( - init: fn() { - let selector = process.new_selector() - actor.Ready(PoolState(deque.new(), selector:, live_workers:), selector) - }, - init_timeout:, - loop: handle_pool_message, - ) -} - -// ----- Resource actor ----- // - -type ResourceMessage(resource_type) { - UseResource( - next: fn(resource_type) -> dynamic.Dynamic, - reply_to: process.Subject(dynamic.Dynamic), - ) - // Takes a shutdown function - ShutdownResource( - resource_shutdown_function: fn(resource_type) -> Nil, - supervisor: process.Pid, - ) - ResourceExit( - process.ExitMessage, - resource_shutdown_function: fn(resource_type) -> Nil, - ) -} - -type ResourceSubject(resource_type) = - process.Subject(ResourceMessage(resource_type)) - -fn worker_spec( - resource_create_function: fn() -> Result(resource_type, resource_create_error), - resource_shutdown_function: fn(resource_type) -> Nil, - pool_subject: PoolSubject(resource_type), + pool_config: PoolConfig(resource_type, resource_create_error), + resources: deque.Deque(resource_type), + current_size: Int, init_timeout: Int, -) -> actor.Spec(resource_type, ResourceMessage(resource_type)) { - actor.Spec( - init: fn() { - case resource_create_function() { - Ok(resource) -> { - // Check in the worker - let self = process.new_subject() - process.send(pool_subject, CheckIn(self)) - // process.trap_exits(True) - - let selector = - process.new_selector() - |> process.selecting(self, function.identity) - |> process.selecting_trapped_exits(ResourceExit( - _, - resource_shutdown_function, - )) - - actor.Ready(resource, selector) - } - Error(resource_create_error) -> { - actor.Failed(resource_create_error |> string.inspect) - } - } - }, - init_timeout:, - loop: handle_resource_message, - ) -} - -fn handle_resource_message( - msg: ResourceMessage(resource_type), - resource: resource_type, +) -> actor.Spec( + State(resource_type, resource_create_error), + Msg(resource_type, resource_create_error), ) { - case msg { - UseResource(next:, reply_to:) -> { - actor.send(reply_to, next(resource)) - actor.continue(resource) - } - ShutdownResource(resource_shutdown_function:, supervisor:) -> { - io.debug("Worker shutting down") - resource_shutdown_function(resource) - actor.Stop(process.Normal) - } - ResourceExit(exit_message, resource_shutdown_function:) -> { - io.debug("exit") - io.debug(exit_message) - resource_shutdown_function(resource) - actor.Stop(process.Normal) - } - } -} - -// ----- Utils ----- // - -@external(erlang, "bath_ffi", "unsafe_coerce") -fn unsafe_coerce_to_dynamic_function( - next: fn(resource_type) -> return_type, -) -> fn(resource_type) -> dynamic.Dynamic + actor.Spec(init_timeout:, loop: handle_pool_message, init: fn() { + let self = process.new_subject() + + // Trap exits + process.trap_exits(True) + + let selector = + process.new_selector() + |> process.selecting(self, function.identity) + |> process.selecting_trapped_exits(PoolExit) + + let state = + State( + resources:, + checkout_strategy: pool_config.checkout_strategy, + creation_strategy: pool_config.creation_strategy, + live_resources: dict.new(), + selector:, + current_size:, + max_size: pool_config.size, + create_resource: pool_config.create_resource, + shutdown_resource: pool_config.shutdown_resource, + ) -@external(erlang, "bath_ffi", "unsafe_coerce") -fn unsafe_coerce_to_return_type(term: dynamic.Dynamic) -> return_type + actor.Ready(state, selector) + }) +} diff --git a/src/bath_v2.gleam b/src/bath_v2.gleam deleted file mode 100644 index 633b5cd..0000000 --- a/src/bath_v2.gleam +++ /dev/null @@ -1,446 +0,0 @@ -import gleam/deque -import gleam/dict.{type Dict} -import gleam/dynamic -import gleam/erlang/process.{type Pid, type Subject} -import gleam/function -import gleam/int -import gleam/io -import gleam/list -import gleam/otp/actor -import gleam/otp/static_supervisor as sup -import gleam/result -import gleam/string - -// ---- Pool config ----- // - -pub type Strategy { - FIFO - LIFO -} - -pub opaque type PoolConfig(resource_type, resource_create_error) { - PoolConfig( - size: Int, - create_resource: fn() -> Result(resource_type, resource_create_error), - shutdown_resource: fn(resource_type) -> Nil, - strategy: Strategy, - ) -} - -pub fn new( - resource create_resource: fn() -> Result(resource_type, resource_create_error), -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig( - size: 10, - create_resource: create_resource, - shutdown_resource: fn(_) { Nil }, - strategy: FIFO, - ) -} - -pub fn with_size( - config pool_config: PoolConfig(resource_type, resource_create_error), - size size: Int, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, size:) -} - -pub fn with_shutdown( - config pool_config: PoolConfig(resource_type, resource_create_error), - shutdown shutdown_resource: fn(resource_type) -> Nil, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, shutdown_resource:) -} - -pub fn with_strategy( - config pool_config: PoolConfig(resource_type, resource_create_error), - strategy strategy: Strategy, -) -> PoolConfig(resource_type, resource_create_error) { - PoolConfig(..pool_config, strategy:) -} - -// ----- Lifecycle functions ---- // - -pub type StartError { - PoolActorStartError(actor.StartError) - WorkerStartError(actor.StartError) - PoolSupervisorStartError(dynamic.Dynamic) - WorkerSupervisorStartError(dynamic.Dynamic) -} - -pub type ApplyError { - NoResourcesAvailable - CheckOutTimeout - WorkerCallTimeout - WorkerCrashed(process.ProcessDown) -} - -pub fn start( - config pool_config: PoolConfig(resource_type, resource_create_error), - timeout init_timeout: Int, - // TODO: errors -) -> Result(Pool(resource_type), StartError) { - // The supervision tree for pools looks like this: - // supervisor (probably rest for 1?) - // | | - // | | - // pool supervisor (one for one) - // | | | - // / | \ - // / | \ - // worker worker worker - - let main_supervisor = sup.new(sup.RestForOne) - let worker_supervisor = sup.new(sup.OneForOne) - - let pool_start_result = - actor.start_spec(pool_spec(pool_config, init_timeout)) - |> result.map_error(PoolActorStartError) - - use pool_subject <- result.try(pool_start_result) - - let workers_result = - list.repeat("", pool_config.size) - |> list.try_map(fn(_) { - use subject <- result.try( - actor.start_spec(worker_spec( - pool_subject, - pool_config.create_resource, - pool_config.shutdown_resource, - init_timeout, - )) - |> result.map_error(WorkerStartError), - ) - Ok(subject) - }) - - use workers <- result.try(workers_result) - - // Add workers to the worker supervisor and start it - let worker_supervisor_result = - workers - |> list.index_fold(worker_supervisor, fn(worker_supervisor, actor, idx) { - sup.add( - worker_supervisor, - sup.worker_child("worker_" <> int.to_string(idx), fn() { - process.subject_owner(actor) |> Ok - }) - |> sup.restart(sup.Transient), - ) - }) - |> sup.start_link() - |> result.map_error(WorkerSupervisorStartError) - - use worker_supervisor <- result.try(worker_supervisor_result) - - // Add the pool and worker supervisors to the main supervisor - let main_supervisor_result = - sup.add( - main_supervisor, - sup.worker_child("pool", fn() { - process.subject_owner(pool_subject) |> Ok - }) - |> sup.restart(sup.Transient), - ) - |> sup.add( - sup.supervisor_child("worker_supervisor", fn() { Ok(worker_supervisor) }) - |> sup.restart(sup.Transient), - ) - |> sup.start_link() - |> result.map_error(PoolSupervisorStartError) - - use main_supervisor <- result.try(main_supervisor_result) - - Ok(Pool(subject: pool_subject, supervisor: main_supervisor)) -} - -fn check_out( - pool: Pool(resource_type), - caller: Subject(UsageResult), - timeout: Int, -) -> Result(WorkerSubject(resource_type), ApplyError) { - process.try_call(pool.subject, CheckOut(_, caller:), timeout) - |> result.replace_error(CheckOutTimeout) - |> result.flatten -} - -fn check_in( - pool: Pool(resource_type), - worker_subject: WorkerSubject(resource_type), -) { - process.send(pool.subject, CheckIn(worker_subject)) -} - -pub fn apply( - pool: Pool(resource_type), - timeout: Int, - next: fn(resource_type) -> result_type, -) -> Result(result_type, ApplyError) { - let self = process.new_subject() - use worker_subject <- result.try(check_out(pool, self, timeout)) - - // Use manual send instead of try_call so we can use the same caller subject - // we sent with the check out message - process.send( - worker_subject, - UseResource(self, unsafe_coerce_to_dynamic_function(next)), - ) - - let usage_result = - process.receive(self, timeout) - |> result.map(fn(result) { - result - |> result.map(unsafe_coerce_to_return_type) - |> result.map_error(WorkerCrashed) - }) - - let usage_result = case usage_result { - // Timeout - Error(Nil) -> Error(WorkerCallTimeout) - Ok(Error(err)) -> Error(err) - Ok(Ok(result)) -> Ok(result) - } - - check_in(pool, worker_subject) - - usage_result -} - -// ----- Pool ----- // - -pub opaque type Pool(resource_type) { - Pool(subject: Subject(PoolMsg(resource_type)), supervisor: Pid) -} - -type PoolState(resource_type) { - PoolState( - workers: deque.Deque(Worker(resource_type)), - strategy: Strategy, - live_workers: LiveWorkers(resource_type), - selector: process.Selector(PoolMsg(resource_type)), - ) -} - -type LiveWorkers(resource_type) = - Dict(Pid, LiveWorker(resource_type)) - -type LiveWorker(resource_type) { - LiveWorker(worker: Worker(resource_type), caller: Subject(UsageResult)) -} - -type PoolMsg(resource_type) { - CheckIn(WorkerSubject(resource_type)) - CheckOut( - reply_to: Subject(Result(WorkerSubject(resource_type), ApplyError)), - caller: Subject(UsageResult), - ) - PoolExit(process.ExitMessage) - WorkerDown(process.ProcessDown) -} - -fn handle_pool_message( - msg: PoolMsg(resource_type), - state: PoolState(resource_type), -) { - // TODO: process monitoring - case msg { - CheckIn(worker_subject) -> { - // If the checked-in process is a currently live worker, remove it from - // the live_workers dict - let live_workers = - dict.delete(state.live_workers, worker_subject |> process.subject_owner) - - // Monitor the new worker process - let monitor = - process.monitor_process(worker_subject |> process.subject_owner) - - let new_workers = - deque.push_back( - state.workers, - Worker(subject: worker_subject, monitor:), - ) - - let selector = - state.selector - |> process.selecting_process_down(monitor, WorkerDown) - - actor.continue( - PoolState(..state, workers: new_workers, live_workers:, selector:), - ) - } - CheckOut(reply_to:, caller:) -> { - // We always push to the back, so for FIFO, we pop front, - // and for LIFO, we pop back - let get_result = case state.strategy { - FIFO -> deque.pop_front(state.workers) - LIFO -> deque.pop_back(state.workers) - } - - case get_result { - Ok(#(worker, new_workers)) -> { - let live_workers = - dict.insert( - state.live_workers, - worker.subject |> process.subject_owner, - LiveWorker(worker:, caller:), - ) - actor.send(reply_to, Ok(worker.subject)) - actor.continue(PoolState(..state, workers: new_workers)) - } - Error(_) -> { - actor.send(reply_to, Error(NoResourcesAvailable)) - actor.continue(state) - } - } - } - PoolExit(exit_message) -> { - io.debug("Pool exited") - // TODO: cleanup - actor.Stop(process.Normal) - } - WorkerDown(process_down) -> { - let #(maybe_worker, new_workers) = - state.workers - |> deque.to_list - |> list.partition(fn(worker) { - process.subject_owner(worker.subject) == process_down.pid - }) - - // If the worker exists, demonitor it - let selector = case maybe_worker { - [worker] -> { - process.demonitor_process(worker.monitor) - - state.selector - |> process.deselecting_process_down(worker.monitor) - } - _ -> state.selector - } - - // If the process was a live worker, send an error message back - // to the caller - case dict.get(state.live_workers, process_down.pid) { - Ok(live_worker) -> { - process.send(live_worker.caller, Error(process_down)) - } - Error(_) -> Nil - } - - // Delete the process from the live_workers dict - let live_workers = dict.delete(state.live_workers, process_down.pid) - - actor.continue( - PoolState( - ..state, - live_workers:, - selector:, - workers: new_workers |> deque.from_list, - ), - ) - } - } -} - -fn pool_spec( - pool_config: PoolConfig(resource_type, resource_create_error), - init_timeout: Int, -) -> actor.Spec(PoolState(resource_type), PoolMsg(resource_type)) { - actor.Spec(init_timeout:, loop: handle_pool_message, init: fn() { - let self = process.new_subject() - - let selector = - process.new_selector() - |> process.selecting(self, function.identity) - |> process.selecting_trapped_exits(PoolExit) - - let state = - PoolState( - workers: deque.new(), - strategy: pool_config.strategy, - live_workers: dict.new(), - selector:, - ) - - // Trap exits - // process.trap_exits(True) - - actor.Ready(state, selector) - }) -} - -// ----- Worker ---- // - -type Worker(resource_type) { - Worker(subject: WorkerSubject(resource_type), monitor: process.ProcessMonitor) -} - -type WorkerSubject(resource_type) = - Subject(WorkerMsg(resource_type)) - -type WorkerMsg(resource_type) { - UseResource( - reply_to: Subject(UsageResult), - function: fn(resource_type) -> dynamic.Dynamic, - ) - WorkerExit( - process.ExitMessage, - resource_shutdown_function: fn(resource_type) -> Nil, - ) -} - -type UsageResult = - Result(dynamic.Dynamic, process.ProcessDown) - -fn worker_spec( - pool_subject: Subject(PoolMsg(resource_type)), - resource_create_function: fn() -> Result(resource_type, resource_create_error), - resource_shutdown_function: fn(resource_type) -> Nil, - init_timeout: Int, -) -> actor.Spec(resource_type, WorkerMsg(resource_type)) { - actor.Spec(init_timeout:, loop: handle_worker_message, init: fn() { - case resource_create_function() { - Ok(resource) -> { - // Check in the worker - let self = process.new_subject() - process.send(pool_subject, CheckIn(self)) - process.trap_exits(True) - - let selector = - process.new_selector() - |> process.selecting(self, function.identity) - |> process.selecting_trapped_exits(WorkerExit( - _, - resource_shutdown_function, - )) - - actor.Ready(resource, selector) - } - Error(resource_create_error) -> { - actor.Failed(resource_create_error |> string.inspect) - } - } - }) -} - -fn handle_worker_message(msg: WorkerMsg(resource_type), resource: resource_type) { - case msg { - UseResource(reply_to:, function:) -> { - actor.send(reply_to, Ok(function(resource))) - actor.continue(resource) - } - WorkerExit(exit_message, resource_shutdown_function:) -> { - resource_shutdown_function(resource) - actor.Stop(exit_message.reason) - } - } -} - -// ----- Utils ----- // - -@external(erlang, "bath_ffi", "unsafe_coerce") -fn unsafe_coerce_to_dynamic_function( - next: fn(resource_type) -> return_type, -) -> fn(resource_type) -> dynamic.Dynamic - -@external(erlang, "bath_ffi", "unsafe_coerce") -fn unsafe_coerce_to_return_type(term: dynamic.Dynamic) -> return_type diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 688bf9b..32a4fbc 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -1,4 +1,4 @@ -import bath_v2 as bath +import bath import gleam/erlang/process import gleam/io import gleeunit @@ -20,6 +20,7 @@ pub fn lifecycle_test() { }) |> bath.start(1000) let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 }) + bath.shutdown(pool) } pub fn empty_pool_fails_to_apply_test() { @@ -29,6 +30,7 @@ pub fn empty_pool_fails_to_apply_test() { |> bath.start(1000) let assert Error(bath.NoResourcesAvailable) = bath.apply(pool, 1000, fn(_) { Nil }) + bath.shutdown(pool) } pub fn pool_has_correct_capacity_test() { @@ -44,14 +46,43 @@ pub fn pool_has_correct_capacity_test() { bath.apply(pool, 1000, fn(_) { Nil }) Nil }) + bath.shutdown(pool) +} + +pub fn pool_has_correct_resources_test() { + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(10) + |> bath.start(1000) + + let assert Ok(_) = + bath.apply(pool, 1000, fn(n) { + // Check we have the right values + n + |> should.equal(10) + }) + + bath.shutdown(pool) +} + +pub fn pool_handles_caller_crash_test() { + let assert Ok(pool) = + bath.new(fn() { Ok(10) }) + |> bath.with_size(1) + |> bath.start(1000) + + process.start( + fn() { + use _ <- bath.apply(pool, 1000) + panic as "Oh no, the caller crashed!" + }, + False, + ) + + process.sleep(1000) + + // Ensure the pool still has an available resource + let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r }) + + bath.shutdown(pool) } -// pub fn pool_has_correct_resources_test() { -// let assert Ok(pool) = bath.init(1, fn() { Ok(10) }, 1000) -// let assert Ok(_) = -// bath.apply(pool, 1000, fn(n) { -// // Check we have the right values -// n -// |> should.equal(10) -// }) -// let assert Ok(_) = bath.shutdown(pool, fn(_) { Nil }, 1000) -// } From 73cb05bbb6c676e1d4f5001be24948f23c92ddb5 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 29 Dec 2024 20:07:59 +0000 Subject: [PATCH 4/7] add erlang logging --- gleam.toml | 1 + manifest.toml | 2 + src/bath.gleam | 102 ++++++++++++++++++++++++++++--------------- test/bath_test.gleam | 10 ++++- 4 files changed, 80 insertions(+), 35 deletions(-) diff --git a/gleam.toml b/gleam.toml index 954921f..56b4e32 100644 --- a/gleam.toml +++ b/gleam.toml @@ -12,6 +12,7 @@ gleam_erlang = ">= 0.33.0 and < 1.0.0" gleam_deque = ">= 1.0.0 and < 2.0.0" lamb = ">= 0.6.1 and < 1.0.0" bravo = ">= 4.0.1 and < 5.0.0" +logging = ">= 1.3.0 and < 2.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 147c21b..d147529 100644 --- a/manifest.toml +++ b/manifest.toml @@ -9,6 +9,7 @@ packages = [ { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, { name = "lamb", version = "0.6.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "lamb", source = "hex", outer_checksum = "A74714DE60B3BADB623DFFF910C843793AE660222A9AD63C70053D33C0C3D311" }, + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, ] [requirements] @@ -19,3 +20,4 @@ gleam_otp = { version = ">= 0.16.0 and < 1.0.0" } gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } lamb = { version = ">= 0.6.1 and < 1.0.0" } +logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/bath.gleam b/src/bath.gleam index 19c9a36..213771d 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,11 +1,14 @@ import gleam/deque import gleam/dict.{type Dict} +import gleam/erlang/charlist.{type Charlist} import gleam/erlang/process.{type Pid, type Subject} import gleam/function import gleam/io import gleam/list import gleam/otp/actor import gleam/result +import gleam/string +import logging // ---- Pool config ----- // @@ -205,7 +208,6 @@ fn handle_pool_message( msg: Msg(resource_type, resource_create_error), state: State(resource_type, resource_create_error), ) { - io.debug(state.live_resources) case msg { CheckIn(resource:, caller:) -> { // If the checked-in process currently has a live resource, remove it from @@ -215,18 +217,18 @@ fn handle_pool_message( let selector = case caller_live_resource { Ok(live_resource) -> { - process.demonitor_process(live_resource.monitor) - - state.selector - |> process.deselecting_process_down(live_resource.monitor) + demonitor_process(state.selector, live_resource.monitor) } Error(_) -> state.selector } let new_resources = deque.push_back(state.resources, resource) - actor.continue( - State(..state, resources: new_resources, live_resources:, selector:), + actor.with_selector( + actor.continue( + State(..state, resources: new_resources, live_resources:, selector:), + ), + selector, ) } CheckOut(reply_to:, caller:) -> { @@ -249,7 +251,10 @@ fn handle_pool_message( True -> { use resource <- result.try( state.create_resource() - |> result.map_error(CheckOutResourceCreateError), + |> result.map_error(fn(err) { + log_resource_creation_error(err) + CheckOutResourceCreateError(err) + }), ) // Checked-in resources queue hasn't changed, but we've added a new resource // so current size has increased @@ -268,10 +273,7 @@ fn handle_pool_message( } Ok(#(resource, new_resources, new_current_size)) -> { // Monitor the caller process - let monitor = process.monitor_process(caller) - let selector = - state.selector - |> process.selecting_process_down(monitor, CallerDown) + let #(monitor, selector) = monitor_process(state.selector, caller) let live_resources = dict.insert( @@ -281,14 +283,17 @@ fn handle_pool_message( ) actor.send(reply_to, Ok(resource)) - actor.continue( - State( - ..state, - resources: new_resources, - current_size: new_current_size, - selector:, - live_resources:, + actor.with_selector( + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources:, + ), ), + selector, ) } } @@ -302,7 +307,6 @@ fn handle_pool_message( actor.Stop(exit_message.reason) } CallerDown(process_down) -> { - io.debug("Caller down") // If the caller was a live resource, either create a new one or // decrement the current size depending on the creation strategy case dict.get(state.live_resources, process_down.pid) { @@ -310,10 +314,8 @@ fn handle_pool_message( Error(_) -> actor.continue(state) Ok(live_resource) -> { // Demonitor the process - process.demonitor_process(live_resource.monitor) let selector = - state.selector - |> process.deselecting_process_down(live_resource.monitor) + demonitor_process(state.selector, live_resource.monitor) let #(new_resources, new_current_size) = case state.creation_strategy @@ -331,25 +333,27 @@ fn handle_pool_message( ) // Size has changed Error(resource_create_error) -> { - io.debug("Bath: Resource creation failed") - io.debug(resource_create_error) + log_resource_creation_error(resource_create_error) #(state.resources, state.current_size) } } } } - actor.continue( - State( - ..state, - resources: new_resources, - current_size: new_current_size, - selector:, - live_resources: dict.delete( - state.live_resources, - process_down.pid, + actor.with_selector( + actor.continue( + State( + ..state, + resources: new_resources, + current_size: new_current_size, + selector:, + live_resources: dict.delete( + state.live_resources, + process_down.pid, + ), ), ), + selector, ) } } @@ -393,3 +397,33 @@ fn pool_spec( actor.Ready(state, selector) }) } + +// ----- Utils ----- // + +fn monitor_process( + selector: process.Selector(Msg(resource_type, resource_create_error)), + pid: Pid, +) { + let monitor = process.monitor_process(pid) + let selector = + selector + |> process.selecting_process_down(monitor, CallerDown) + #(monitor, selector) +} + +fn demonitor_process( + selector: process.Selector(Msg(resource_type, resource_create_error)), + monitor: process.ProcessMonitor, +) { + let selector = + selector + |> process.deselecting_process_down(monitor) + selector +} + +fn log_resource_creation_error(resource_create_error: resource_create_error) { + logging.log( + logging.Error, + "Bath: Resource creation failed: " <> string.inspect(resource_create_error), + ) +} diff --git a/test/bath_test.gleam b/test/bath_test.gleam index 32a4fbc..fcc40ed 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -3,8 +3,10 @@ import gleam/erlang/process import gleam/io import gleeunit import gleeunit/should +import logging pub fn main() { + logging.configure() gleeunit.main() } @@ -71,6 +73,9 @@ pub fn pool_handles_caller_crash_test() { |> bath.with_size(1) |> bath.start(1000) + // Expect an error message here + logging.set_level(logging.Critical) + process.start( fn() { use _ <- bath.apply(pool, 1000) @@ -79,7 +84,10 @@ pub fn pool_handles_caller_crash_test() { False, ) - process.sleep(1000) + process.sleep(100) + + // Reset level + logging.configure() // Ensure the pool still has an available resource let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r }) From ce64c15841cf59c70c2cd849aef399d9da3b7a28 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 29 Dec 2024 21:55:03 +0000 Subject: [PATCH 5/7] docs and cleanup --- README.md | 8 ++- gleam.toml | 6 +- manifest.toml | 6 +- src/bath.gleam | 127 +++++++++++++++++++++++++++++++++++++++++-- src/bath_ffi.erl | 6 -- test/bath_test.gleam | 10 ++-- 6 files changed, 136 insertions(+), 27 deletions(-) delete mode 100644 src/bath_ffi.erl diff --git a/README.md b/README.md index 1b238ef..4d54ed4 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ any value, such as database connections, file handles, or other resources. ## Installation ```sh -gleam add bath@1 +gleam add bath@2 ``` ## Usage @@ -20,7 +20,11 @@ import fake_db pub fn main() { // Create a pool of 10 connections to some fictional database. - let assert Ok(pool) = bath.init(10, fn() { fake_db.connect() }) + let assert Ok(pool) = + bath.new(fn() { fake_db.connect() }) + |> bath.with_size(10) + |> bath.with_shutdown(fn(conn) { fake_db.close(conn) }) + |> bath.start(1000) // Use the pool. Shown here in a block to use `use`. let assert Ok("Hello!") = { diff --git a/gleam.toml b/gleam.toml index 56b4e32..c8b1ade 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,17 +1,15 @@ name = "bath" -version = "1.0.0" +version = "2.0.0" description = "A resource pool for Gleam!" licences = ["MIT"] repository = { type = "github", user = "Pevensie", repo = "bath" } # links = [{ title = "Website", href = "" }] [dependencies] -gleam_stdlib = ">= 0.34.0 and < 2.0.0" +gleam_stdlib = ">= 0.51.0 and < 2.0.0" gleam_otp = ">= 0.16.0 and < 1.0.0" gleam_erlang = ">= 0.33.0 and < 1.0.0" gleam_deque = ">= 1.0.0 and < 2.0.0" -lamb = ">= 0.6.1 and < 1.0.0" -bravo = ">= 4.0.1 and < 5.0.0" logging = ">= 1.3.0 and < 2.0.0" [dev-dependencies] diff --git a/manifest.toml b/manifest.toml index d147529..583c2bf 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,22 +2,18 @@ # You typically do not need to edit this file packages = [ - { name = "bravo", version = "4.0.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "bravo", source = "hex", outer_checksum = "D450DCD5A896ADDE442A93A7D6B5B2785374579A35416819E29E024887C2A872" }, { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, { name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" }, { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, - { name = "lamb", version = "0.6.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "lamb", source = "hex", outer_checksum = "A74714DE60B3BADB623DFFF910C843793AE660222A9AD63C70053D33C0C3D311" }, { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, ] [requirements] -bravo = { version = ">= 4.0.1 and < 5.0.0" } gleam_deque = { version = ">= 1.0.0 and < 2.0.0" } gleam_erlang = { version = ">= 0.33.0 and < 1.0.0" } gleam_otp = { version = ">= 0.16.0 and < 1.0.0" } -gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } +gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } -lamb = { version = ">= 0.6.1 and < 1.0.0" } logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/bath.gleam b/src/bath.gleam index 213771d..d53c0ef 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -1,9 +1,8 @@ import gleam/deque import gleam/dict.{type Dict} -import gleam/erlang/charlist.{type Charlist} +import gleam/dynamic import gleam/erlang/process.{type Pid, type Subject} import gleam/function -import gleam/io import gleam/list import gleam/otp/actor import gleam/result @@ -12,16 +11,21 @@ import logging // ---- Pool config ----- // +/// The strategy used to check out a resource from the pool. pub type CheckoutStrategy { FIFO LIFO } +/// How to create resources in the pool. `Lazy` will create resources when +/// required (i.e. the pool is empty but has extra capacity), while `Eager` will +/// create the maximum number of resources upfront. pub type CreationStrategy { Lazy Eager } +/// Configuration for a [`Pool`](#Pool). pub opaque type PoolConfig(resource_type, resource_create_error) { PoolConfig( size: Int, @@ -32,18 +36,42 @@ pub opaque type PoolConfig(resource_type, resource_create_error) { ) } +/// Create a new [`PoolConfig`](#PoolConfig) for creating a pool of resources. +/// +/// ```gleam +/// import bath +/// import fake_db +/// +/// pub fn main() { +/// // Create a pool of 10 connections to some fictional database. +/// let assert Ok(pool) = +/// bath.new(fn() { fake_db.connect() }) +/// |> bath.with_size(10) +/// |> bath.start(1000) +/// } +/// ``` +/// +/// ### Default values +/// +/// | Config | Default | +/// |--------|---------| +/// | `size` | 10 | +/// | `shutdown_resource` | `fn(_resource) { Nil }` | +/// | `checkout_strategy` | `FIFO` | +/// | `creation_strategy` | `Lazy` | pub fn new( resource create_resource: fn() -> Result(resource_type, resource_create_error), ) -> PoolConfig(resource_type, resource_create_error) { PoolConfig( size: 10, - create_resource: create_resource, + create_resource:, shutdown_resource: fn(_) { Nil }, checkout_strategy: FIFO, creation_strategy: Lazy, ) } +/// Set the pool size. Defaults to 10. pub fn with_size( config pool_config: PoolConfig(resource_type, resource_create_error), size size: Int, @@ -51,6 +79,7 @@ pub fn with_size( PoolConfig(..pool_config, size:) } +/// Set a shutdown function to be run for each resource when the pool exits. pub fn with_shutdown( config pool_config: PoolConfig(resource_type, resource_create_error), shutdown shutdown_resource: fn(resource_type) -> Nil, @@ -58,6 +87,7 @@ pub fn with_shutdown( PoolConfig(..pool_config, shutdown_resource:) } +/// Change the checkout strategy for the pool. Defaults to `FIFO`. pub fn with_checkout_strategy( config pool_config: PoolConfig(resource_type, resource_create_error), strategy checkout_strategy: CheckoutStrategy, @@ -65,6 +95,7 @@ pub fn with_checkout_strategy( PoolConfig(..pool_config, checkout_strategy:) } +/// Change the resource creation strategy for the pool. Defaults to `Lazy`. pub fn with_creation_strategy( config pool_config: PoolConfig(resource_type, resource_create_error), strategy creation_strategy: CreationStrategy, @@ -74,17 +105,34 @@ pub fn with_creation_strategy( // ----- Lifecycle functions ---- // +/// An error returned when creating a [`Pool`](#Pool). pub type StartError(resource_create_error) { PoolStartResourceCreateError(resource_create_error) ActorStartError(actor.StartError) } +/// An error returned when failing to apply a function to a pooled resource. pub type ApplyError(resource_create_error) { NoResourcesAvailable CheckOutResourceCreateError(resource_create_error) CheckOutTimeout } +/// An error returned when the resource pool fails to shut down. +pub type ShutdownError { + /// There are still resources checked out. Ignore this failure case by + /// calling [`force_shutdown`](#force_shutdown) function. + ResourcesInUse + /// The shutdown timeout expired. + ShutdownTimeout + /// The pool was already down or failed to send the response message. + CalleeDown(reason: dynamic.Dynamic) +} + +/// Create a child actor spec pool actor, for use in your application's supervision tree, +/// using the given [`PoolConfig`](#PoolConfig). Once the pool is started, use +/// [`from_subject`](#from_subject) to create a [`Pool`](#Pool) from the +/// `Subject(bath.Msg)`. pub fn child_spec( config pool_config: PoolConfig(resource_type, resource_create_error), timeout init_timeout: Int, @@ -111,6 +159,17 @@ pub fn child_spec( Ok(pool_spec(pool_config, resources, current_size, init_timeout)) } +/// Create a [`Pool`](#Pool) from a `Subject(bath.Msg)`. Useful when +/// creating a pool as part of a supervision tree via the +/// [`child_spec`](#child_spec) function. +pub fn from_subject( + subject: Subject(Msg(resource_type, resource_create_error)), +) -> Pool(resource_type, resource_create_error) { + Pool(subject:) +} + +/// Start a pool actor using the given [`PoolConfig`](#PoolConfig) and return a +/// [`Pool`](#Pool). pub fn start( config pool_config: PoolConfig(resource_type, resource_create_error), timeout init_timeout: Int, @@ -146,6 +205,17 @@ fn check_in( process.send(pool.subject, CheckIn(resource:, caller:)) } +/// Check out a resource from the pool, apply the `next` function, then check +/// the resource back in. +/// +/// ```gleam +/// let assert Ok(pool) = +/// bath.new(fn() { Ok("Some pooled resource") }) +/// |> bath.start(1000) +/// +/// use resource <- bath.apply(pool, 1000) +/// // Do stuff with resource... +/// ``` pub fn apply( pool: Pool(resource_type, resource_create_error), timeout: Int, @@ -161,16 +231,36 @@ pub fn apply( Ok(usage_result) } -pub fn shutdown(pool: Pool(resource_type, resource_create_error)) { - process.send_exit(pool.subject |> process.subject_owner) +/// Shut down the pool, calling the `shutdown_function` on each +/// resource in the pool. Calling with `force` set to `True` will +/// force the shutdown, not calling the `shutdown_function` on any +/// resources. +/// +/// Will fail if there are still resources checked out, unless `force` is +/// `True`. +pub fn shutdown( + pool: Pool(resource_type, resource_create_error), + force: Bool, + timeout: Int, +) { + process.try_call(pool.subject, Shutdown(_, force), timeout) + |> result.map_error(fn(err) { + case err { + process.CallTimeout -> ShutdownTimeout + process.CalleeDown(reason) -> CalleeDown(reason:) + } + }) + |> result.flatten } // ----- Pool ----- // +/// The interface for interacting with a pool of resources in Bath. pub opaque type Pool(resource_type, resource_create_error) { Pool(subject: Subject(Msg(resource_type, resource_create_error))) } +/// Pool actor state. pub opaque type State(resource_type, resource_create_error) { State( // Config @@ -194,6 +284,7 @@ type LiveResource(resource_type) { LiveResource(resource: resource_type, monitor: process.ProcessMonitor) } +/// A message sent to the pool actor. pub opaque type Msg(resource_type, resource_create_error) { CheckIn(resource: resource_type, caller: Pid) CheckOut( @@ -202,6 +293,7 @@ pub opaque type Msg(resource_type, resource_create_error) { ) PoolExit(process.ExitMessage) CallerDown(process.ProcessDown) + Shutdown(reply_to: process.Subject(Result(Nil, ShutdownError)), force: Bool) } fn handle_pool_message( @@ -306,6 +398,28 @@ fn handle_pool_message( actor.Stop(exit_message.reason) } + Shutdown(reply_to:, force:) -> { + case dict.size(state.live_resources), force { + // No live resource, shut down + 0, _ -> { + state.resources + |> deque.to_list + |> list.each(state.shutdown_resource) + + actor.send(reply_to, Ok(Nil)) + actor.Stop(process.Normal) + } + _, True -> { + // Force shutdown + actor.send(reply_to, Ok(Nil)) + actor.Stop(process.Normal) + } + _, False -> { + actor.send(reply_to, Error(ResourcesInUse)) + actor.continue(state) + } + } + } CallerDown(process_down) -> { // If the caller was a live resource, either create a new one or // decrement the current size depending on the creation strategy @@ -317,6 +431,9 @@ fn handle_pool_message( let selector = demonitor_process(state.selector, live_resource.monitor) + // Shutdown the old resource + state.shutdown_resource(live_resource.resource) + let #(new_resources, new_current_size) = case state.creation_strategy { diff --git a/src/bath_ffi.erl b/src/bath_ffi.erl deleted file mode 100644 index c838425..0000000 --- a/src/bath_ffi.erl +++ /dev/null @@ -1,6 +0,0 @@ --module(bath_ffi). - --export([unsafe_coerce/1]). - -unsafe_coerce(Term) -> - Term. diff --git a/test/bath_test.gleam b/test/bath_test.gleam index fcc40ed..2c725c3 100644 --- a/test/bath_test.gleam +++ b/test/bath_test.gleam @@ -22,7 +22,7 @@ pub fn lifecycle_test() { }) |> bath.start(1000) let assert Ok(20) = bath.apply(pool, 1000, fn(n) { n * 2 }) - bath.shutdown(pool) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn empty_pool_fails_to_apply_test() { @@ -32,7 +32,7 @@ pub fn empty_pool_fails_to_apply_test() { |> bath.start(1000) let assert Error(bath.NoResourcesAvailable) = bath.apply(pool, 1000, fn(_) { Nil }) - bath.shutdown(pool) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn pool_has_correct_capacity_test() { @@ -48,7 +48,7 @@ pub fn pool_has_correct_capacity_test() { bath.apply(pool, 1000, fn(_) { Nil }) Nil }) - bath.shutdown(pool) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn pool_has_correct_resources_test() { @@ -64,7 +64,7 @@ pub fn pool_has_correct_resources_test() { |> should.equal(10) }) - bath.shutdown(pool) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } pub fn pool_handles_caller_crash_test() { @@ -92,5 +92,5 @@ pub fn pool_handles_caller_crash_test() { // Ensure the pool still has an available resource let assert Ok(10) = bath.apply(pool, 1000, fn(r) { r }) - bath.shutdown(pool) + let assert Ok(Nil) = bath.shutdown(pool, False, 1000) } From 28f2d6b9b5cc77ade267057a6ae8ef2d995a0f39 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 29 Dec 2024 21:57:53 +0000 Subject: [PATCH 6/7] docs corrections --- src/bath.gleam | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/bath.gleam b/src/bath.gleam index d53c0ef..47bd436 100644 --- a/src/bath.gleam +++ b/src/bath.gleam @@ -121,7 +121,7 @@ pub type ApplyError(resource_create_error) { /// An error returned when the resource pool fails to shut down. pub type ShutdownError { /// There are still resources checked out. Ignore this failure case by - /// calling [`force_shutdown`](#force_shutdown) function. + /// calling [`shutdown`](#shutdown) function with `force` set to `True`. ResourcesInUse /// The shutdown timeout expired. ShutdownTimeout @@ -132,7 +132,7 @@ pub type ShutdownError { /// Create a child actor spec pool actor, for use in your application's supervision tree, /// using the given [`PoolConfig`](#PoolConfig). Once the pool is started, use /// [`from_subject`](#from_subject) to create a [`Pool`](#Pool) from the -/// `Subject(bath.Msg)`. +/// `Subject(bath.Msg(a, b))`. pub fn child_spec( config pool_config: PoolConfig(resource_type, resource_create_error), timeout init_timeout: Int, @@ -159,11 +159,11 @@ pub fn child_spec( Ok(pool_spec(pool_config, resources, current_size, init_timeout)) } -/// Create a [`Pool`](#Pool) from a `Subject(bath.Msg)`. Useful when +/// Create a [`Pool`](#Pool) from a `Subject(bath.Msg(a, b))`. Useful when /// creating a pool as part of a supervision tree via the /// [`child_spec`](#child_spec) function. pub fn from_subject( - subject: Subject(Msg(resource_type, resource_create_error)), + subject subject: Subject(Msg(resource_type, resource_create_error)), ) -> Pool(resource_type, resource_create_error) { Pool(subject:) } @@ -217,9 +217,9 @@ fn check_in( /// // Do stuff with resource... /// ``` pub fn apply( - pool: Pool(resource_type, resource_create_error), - timeout: Int, - next: fn(resource_type) -> result_type, + pool pool: Pool(resource_type, resource_create_error), + timeout timeout: Int, + next next: fn(resource_type) -> result_type, ) -> Result(result_type, ApplyError(resource_create_error)) { let self = process.self() use resource <- result.try(check_out(pool, self, timeout)) @@ -239,9 +239,9 @@ pub fn apply( /// Will fail if there are still resources checked out, unless `force` is /// `True`. pub fn shutdown( - pool: Pool(resource_type, resource_create_error), - force: Bool, - timeout: Int, + pool pool: Pool(resource_type, resource_create_error), + force force: Bool, + timeout timeout: Int, ) { process.try_call(pool.subject, Shutdown(_, force), timeout) |> result.map_error(fn(err) { From ce32ac4a7eb321d67185cf9613b30ea3769f7d85 Mon Sep 17 00:00:00 2001 From: Isaac Harris-Holt Date: Sun, 29 Dec 2024 22:02:48 +0000 Subject: [PATCH 7/7] changelog --- CHANGELOG.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..6109ab6 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,19 @@ +# Changelog + +## v2.0.0 - 2024-12-29 + +- Switch to a builder pattern for pool configuration. +- Change the `shutdown` API to take a `force` argument instead of having a separate + `force_shutdown` function. +- Improve reliability by tracking live resources and monitoring the calling process, + re-adding resources to the pool if the caller dies. +- Add checkout strategies (`FIFO` and `LIFO`). +- Add resource creation strategies (`Lazy` and `Eager`). +- Added `child_spec` and `from_subject` functions for creating a pool as part of a + supervision tree. + +Thanks to @lpil for their help debugging this release! + +## v1.0.0 - 2024-12-12 + +- Initial release