Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v4.1.0 - 2025-06-21

- Added `bath.supervised_map` to create a supervised pool of resources while mapping
the `Pool` value to a new type.
- Fixed an issue where Bath would fail to demonitor processes once a resource had been
checked back into the pool.

## v4.0.0 - 2025-06-20

- The function passed to `bath.apply` must now return a `bath.Next(return)` value to
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ any value, such as database connections, file handles, or other resources.
## Installation

```sh
gleam add bath@3
gleam add bath
```

## Usage
Expand Down
2 changes: 1 addition & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name = "bath"
version = "4.0.0"
version = "4.1.0"
description = "A resource pool for Gleam!"
licences = ["MIT"]
repository = { type = "github", user = "Pevensie", repo = "bath" }
Expand Down
16 changes: 15 additions & 1 deletion src/bath.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gleam/deque
import gleam/dict.{type Dict}
import gleam/erlang/process.{type Pid, type Subject}
import gleam/function
import gleam/int
import gleam/list
import gleam/otp/actor
Expand Down Expand Up @@ -163,14 +164,26 @@ pub fn supervised(
builder builder: Builder(resource_type),
receiver pool_receiver: Subject(Pool(resource_type)),
timeout init_timeout: Int,
) {
supervised_map(builder, pool_receiver, function.identity, init_timeout)
}

/// Like [`supervised`](#supervised), but allows you to pass a mapping function to
/// transform the pool handler before sending it to the receiver. This is mostly
/// useful for library authors who wish to use Bath to create a pool of resources.
pub fn supervised_map(
builder builder: Builder(resource_type),
receiver pool_receiver: Subject(a),
using mapper: fn(Pool(resource_type)) -> a,
timeout init_timeout: Int,
) {
supervision.worker(fn() {
use started <- result.try(
actor_builder(builder, init_timeout)
|> actor.start,
)

process.send(pool_receiver, Pool(started.data))
process.send(pool_receiver, mapper(Pool(started.data)))
Ok(started)
})
}
Expand Down Expand Up @@ -607,6 +620,7 @@ fn demonitor_process(
selector: process.Selector(Msg(resource_type)),
monitor: process.Monitor,
) {
process.demonitor_process(monitor)
let selector =
selector
|> process.deselect_specific_monitor(monitor)
Expand Down
41 changes: 41 additions & 0 deletions test/bath_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import gleam/function
import gleam/int
import gleam/io
import gleam/otp/actor
import gleam/otp/static_supervisor
import gleeunit
import gleeunit/should
import logging
Expand All @@ -28,6 +29,46 @@ pub fn lifecycle_test() {
let assert Ok(Nil) = bath.shutdown(pool, False, 1000)
}

pub fn supervised_lifecycle_test() {
let pool_receiver = process.new_subject()

let bath_child_spec =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.supervised(pool_receiver, 1000)

let assert Ok(_started) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(bath_child_spec)
|> static_supervisor.start

let assert Ok(pool) = process.receive(pool_receiver, 1000)

let assert Ok(_) = bath.shutdown(pool, False, 1000)
}

type Mapped(a) {
Mapped(a)
}

pub fn supervised_map_test() {
let number_receiver = process.new_subject()

let bath_child_spec =
bath.new(fn() { Ok(10) })
|> bath.size(1)
|> bath.supervised_map(number_receiver, Mapped, 1000)

let assert Ok(_started) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(bath_child_spec)
|> static_supervisor.start

let assert Ok(Mapped(pool)) = process.receive(number_receiver, 1000)

let assert Ok(_) = bath.shutdown(pool, False, 1000)
}

pub fn empty_pool_fails_to_apply_test() {
let assert Ok(pool) =
bath.new(fn() { Ok(10) })
Expand Down