From c6ddcf91c9e1dbfd2d9cfafb5f341ccb0dd68643 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 15 Aug 2024 11:55:58 -0700 Subject: [PATCH 01/12] Replace :max_concurrency option with :strategy --- lib/flame/pool.ex | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 05401a3..16275b8 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -26,7 +26,7 @@ defmodule FLAME.Pool do children = [ ..., - {FLAME.Pool, name: MyRunner, min: 1, max: 10, max_concurrency: 100} + {FLAME.Pool, name: MyRunner, min: 1, max: 10} ] See `start_link/1` for supported options. @@ -40,7 +40,7 @@ defmodule FLAME.Pool do alias FLAME.{Pool, Runner, Queue, CodeSync} alias FLAME.Pool.{RunnerState, WaitingState, Caller} - @default_max_concurrency 100 + @default_strategy {Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 100]} @boot_timeout 30_000 @idle_shutdown_after 30_000 @async_boot_debounce 1_000 @@ -55,7 +55,7 @@ defmodule FLAME.Pool do min_idle_shutdown_after: nil, min: nil, max: nil, - max_concurrency: nil, + strategy: nil, callers: %{}, waiting: Queue.new(), runners: %{}, @@ -89,8 +89,7 @@ defmodule FLAME.Pool do * `:max` - The maximum number of runners to elastically grow to in the pool. - * `:max_concurrency` - The maximum number of concurrent executions per runner before - booting new runners or queueing calls. Defaults to `100`. + * `:strategy` - The strategy to use. Defaults to `FLAME.Pool.PerRunnerMaxConcurrencyStrategy`. * `:single_use` - if `true`, runners will be terminated after each call completes. Defaults `false`. @@ -183,7 +182,7 @@ defmodule FLAME.Pool do ], min: 1, max: 1, - max_concurrency: 10, + strategy: {FLAME.Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 10]}, backend: {FLAME.FlyBackend, cpu_kind: "performance", cpus: 4, memory_mb: 8192, token: System.fetch_env!("FLY_API_TOKEN"), @@ -203,7 +202,7 @@ defmodule FLAME.Pool do :min_idle_shutdown_after, :min, :max, - :max_concurrency, + :strategy, :backend, :log, :single_use, @@ -417,7 +416,7 @@ defmodule FLAME.Pool do boot_timeout: boot_timeout, idle_shutdown_after: Keyword.get(opts, :idle_shutdown_after, @idle_shutdown_after), min_idle_shutdown_after: Keyword.get(opts, :min_idle_shutdown_after, :infinity), - max_concurrency: Keyword.get(opts, :max_concurrency, @default_max_concurrency), + strategy: Keyword.get(opts, :strategy, @default_strategy), on_grow_start: opts[:on_grow_start], on_grow_end: opts[:on_grow_end], on_shrink: opts[:on_shrink], From d0a47e071cdcd2dd462072a732d99d53aca13e2f Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 15 Aug 2024 13:38:33 -0700 Subject: [PATCH 02/12] Update tests to use :strategy instead of :max_concurrency --- test/flame_test.exs | 95 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 79 insertions(+), 16 deletions(-) diff --git a/test/flame_test.exs b/test/flame_test.exs index 97ea75e..6abc76f 100644 --- a/test/flame_test.exs +++ b/test/flame_test.exs @@ -28,7 +28,11 @@ defmodule FLAME.FLAMETest do {:ok, runner_sup: runner_sup, pool_pid: pool_pid} end - @tag runner: [min: 1, max: 2, max_concurrency: 2] + @tag runner: [ + min: 1, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2} + ] test "init boots min runners synchronously and grows on demand", %{runner_sup: runner_sup} = config do min_pool = Supervisor.which_children(runner_sup) @@ -58,7 +62,11 @@ defmodule FLAME.FLAMETest do assert new_pool == Supervisor.which_children(runner_sup) end - @tag runner: [min: 0, max: 
1, max_concurrency: 2] + @tag runner: [ + min: 0, + max: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2} + ] test "concurrent calls on fully pending runners", %{runner_sup: runner_sup} = config do assert Supervisor.which_children(runner_sup) == [] @@ -97,7 +105,7 @@ defmodule FLAME.FLAMETest do @tag runner: [ min: 1, max: 2, - max_concurrency: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 1}, on_grow_start: &__MODULE__.on_grow_start/1, on_grow_end: &__MODULE__.on_grow_end/2 ] @@ -139,7 +147,12 @@ defmodule FLAME.FLAMETest do end) end - @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500] + @tag runner: [ + min: 1, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 500 + ] test "idle shutdown", %{runner_sup: runner_sup} = config do sim_long_running(config.test, 100) sim_long_running(config.test, 100) @@ -160,7 +173,12 @@ defmodule FLAME.FLAMETest do Supervisor.which_children(runner_sup) end - @tag runner: [min: 1, max: 1, max_concurrency: 2, idle_shutdown_after: 500] + @tag runner: [ + min: 1, + max: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 500 + ] test "pool runner DOWN exits any active checkouts", %{runner_sup: runner_sup} = config do {:ok, active_checkout} = sim_long_running(config.test, 10_000) Process.unlink(active_checkout) @@ -170,7 +188,12 @@ defmodule FLAME.FLAMETest do assert_receive {:DOWN, _ref, :process, ^active_checkout, :killed} end - @tag runner: [min: 0, max: 1, max_concurrency: 2, idle_shutdown_after: 50] + @tag runner: [ + min: 0, + max: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 50 + ] test "call links", %{runner_sup: runner_sup} = config do ExUnit.CaptureLog.capture_log(fn -> parent = self() @@ -226,7 +249,12 @@ defmodule FLAME.FLAMETest do end) end - @tag runner: [min: 0, max: 1, max_concurrency: 2, idle_shutdown_after: 50] + @tag runner: [ + min: 0, + max: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 50 + ] test "cast with link false", %{runner_sup: runner_sup} = config do ExUnit.CaptureLog.capture_log(fn -> assert Supervisor.which_children(runner_sup) == [] @@ -252,7 +280,12 @@ defmodule FLAME.FLAMETest do end describe "cast" do - @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500] + @tag runner: [ + min: 1, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 500 + ] test "normal execution", %{} = config do sim_long_running(config.test, 100) parent = self() @@ -278,7 +311,7 @@ defmodule FLAME.FLAMETest do @tag runner: [ min: 0, max: 2, - max_concurrency: 1, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 1}, on_grow_start: &__MODULE__.growth_grow_start/1 ] test "pool growth", %{} = config do @@ -303,7 +336,12 @@ defmodule FLAME.FLAMETest do refute_receive {:grow_start, _}, 1000 end - @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500] + @tag runner: [ + min: 1, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 500 + ] test "with exit and default link", %{} = config do ExUnit.CaptureLog.capture_log(fn -> Process.flag(:trap_exit, true) @@ -327,7 +365,12 @@ defmodule FLAME.FLAMETest do end describe "process placement" do - @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100] + @tag 
runner: [ + min: 0, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 100 + ] test "place_child/2", %{runner_sup: runner_sup} = config do assert [] = Supervisor.which_children(runner_sup) assert {:ok, pid} = FLAME.place_child(config.test, {Agent, fn -> 1 end}) @@ -355,7 +398,12 @@ defmodule FLAME.FLAMETest do assert_receive {:DOWN, _ref, :process, ^runner, _}, 1000 end - @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100] + @tag runner: [ + min: 0, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 100 + ] test "place_child links", %{runner_sup: runner_sup} = config do # links by default Process.flag(:trap_exit, true) @@ -389,7 +437,12 @@ defmodule FLAME.FLAMETest do assert_receive {:DOWN, _ref, :process, ^runner, _}, 1000 end - @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100] + @tag runner: [ + min: 0, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 100 + ] test "place_child when caller exits", %{runner_sup: runner_sup} = config do # links by default parent = self() @@ -464,7 +517,12 @@ defmodule FLAME.FLAMETest do assert_receive {:DOWN, ^monitor_ref, _, _, :normal} end - @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100] + @tag runner: [ + min: 0, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 100 + ] test "remote without tracking", config do name = :"#{config.test}_trackable" non_trackable = URI.new!("/") @@ -481,7 +539,12 @@ defmodule FLAME.FLAMETest do assert %MyTrackable{pid: nil} = map["yes"] end - @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100] + @tag runner: [ + min: 0, + max: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, + idle_shutdown_after: 100 + ] test "remote with tracking", %{runner_sup: runner_sup} = config do name = :"#{config.test}_trackable" non_trackable = URI.new!("/") @@ -513,7 +576,7 @@ defmodule FLAME.FLAMETest do @tag runner: [ min: 0, max: 2, - max_concurrency: 2, + strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}, idle_shutdown_after: 100, track_resources: true ] From 989d1931da2ebead47ab202bda6ba94c26ca9265 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 15 Aug 2024 16:18:54 -0700 Subject: [PATCH 03/12] Add FLAME.Pool.Strategy --- lib/flame/pool/strategy.ex | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 lib/flame/pool/strategy.ex diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex new file mode 100644 index 0000000..3ea102a --- /dev/null +++ b/lib/flame/pool/strategy.ex @@ -0,0 +1,15 @@ +defmodule FLAME.Pool.Strategy do + alias FLAME.Pool + + @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: + {:checkout, Pool.RunnerState.t()} | :wait | :scale + + @callback assign_waiting_callers( + state :: Pool.t(), + runner :: Pool.RunnerState.t(), + opts :: Keyword.t() + ) :: + Pool.t() + + @callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer() +end From 460ff2a4b3e51b303fb448bb97018ae7e81d59ff Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 15 Aug 2024 16:19:35 -0700 Subject: [PATCH 04/12] Implement the existing "max concurrency per worker" strategy --- lib/flame/pool.ex | 95 ++++++++----------- .../per_worker_max_concurrency_strategy.ex | 73 ++++++++++++++ 2 files changed, 111 insertions(+), 57 
deletions(-) create mode 100644 lib/flame/pool/per_worker_max_concurrency_strategy.ex diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 16275b8..088aa25 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -1,6 +1,7 @@ defmodule FLAME.Pool.RunnerState do @moduledoc false + @type t :: %__MODULE__{} defstruct count: nil, pid: nil, monitor_ref: nil end @@ -45,6 +46,7 @@ defmodule FLAME.Pool do @idle_shutdown_after 30_000 @async_boot_debounce 1_000 + @type t :: %__MODULE__{} defstruct name: nil, runner_sup: nil, task_sup: nil, @@ -488,23 +490,14 @@ defmodule FLAME.Pool do {:noreply, checkout_runner(state, deadline, from)} end - defp runner_count(state) do + def runner_count(state) do map_size(state.runners) + map_size(state.pending_runners) end - defp waiting_count(%Pool{waiting: %Queue{} = waiting}) do + def waiting_count(%Pool{waiting: %Queue{} = waiting}) do Queue.size(waiting) end - defp min_runner(state) do - if map_size(state.runners) == 0 do - nil - else - {_ref, min} = Enum.min_by(state.runners, fn {_, %RunnerState{count: count}} -> count end) - min - end - end - defp replace_caller(state, checkout_ref, caller_pid, child_pid) do # replace caller with child pid and do not inc concurrency counts since we are replacing %{^caller_pid => %Caller{checkout_ref: ^checkout_ref} = caller} = state.callers @@ -543,29 +536,23 @@ defmodule FLAME.Pool do end defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do - min_runner = min_runner(state) - runner_count = runner_count(state) + {strategy_module, strategy_opts} = state.strategy - cond do - min_runner && min_runner.count < state.max_concurrency -> - reply_runner_checkout(state, min_runner, from, monitor_ref) + case strategy_module.checkout_runner(state, strategy_opts) do + :wait -> + waiting_in(state, deadline, from) - runner_count < state.max -> - if state.async_boot_timer || - map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do - waiting_in(state, deadline, from) - else - state - |> async_boot_runner() - |> waiting_in(deadline, from) - end + :scale -> + state + |> async_boot_runner() + |> waiting_in(deadline, from) - true -> - waiting_in(state, deadline, from) + {:checkout, runner} -> + reply_runner_checkout(state, runner, from, monitor_ref) end end - defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do + def reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do # we pass monitor_ref down from waiting so we don't need to remonitor if already monitoring {from_pid, checkout_ref} = from @@ -629,17 +616,28 @@ defmodule FLAME.Pool do end defp async_boot_runner(%Pool{on_grow_start: on_grow_start, name: name} = state) do - new_count = runner_count(state) + 1 - - task = - Task.Supervisor.async_nolink(state.task_sup, fn -> - if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()}) + {strategy_module, strategy_opts} = state.strategy + current_count = runner_count(state) + new_count = strategy_module.desired_count(state, strategy_opts) + + num_tasks = max(new_count - current_count, 0) + + if num_tasks do + tasks = + for _ <- 1..num_tasks do + Task.Supervisor.async_nolink(state.task_sup, fn -> + if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()}) + start_child_runner(state) + end) + end - start_child_runner(state) - end) + pending_runners = Map.new(tasks, &{&1.ref, &1.pid}) + new_pending = Map.merge(state.pending_runners, pending_runners) - new_pending = Map.put(state.pending_runners, 
task.ref, task.pid) - %Pool{state | pending_runners: new_pending} + %Pool{state | pending_runners: new_pending} + else + state + end end defp start_child_runner(%Pool{} = state, runner_opts \\ []) do @@ -719,7 +717,7 @@ defmodule FLAME.Pool do %Pool{state | waiting: Queue.delete_by_key(state.waiting, caller_pid)} end - defp pop_next_waiting_caller(%Pool{} = state) do + def pop_next_waiting_caller(%Pool{} = state) do result = Queue.pop_until(state.waiting, fn _pid, %WaitingState{} = waiting -> %WaitingState{from: {pid, _}, monitor_ref: ref, deadline: deadline} = waiting @@ -817,25 +815,8 @@ defmodule FLAME.Pool do {runner, new_state} = put_runner(new_state, pid) new_state = maybe_on_grow_end(new_state, task_pid, :ok) - # pop waiting callers up to max_concurrency, but we must handle: - # 1. the case where we have no waiting callers - # 2. the case where we process a DOWN for the new runner as we pop DOWNs - # looking for fresh waiting - # 3. if we still have waiting callers at the end, boot more runners if we have capacity - Enum.reduce_while(1..state.max_concurrency, new_state, fn i, acc -> - with {:ok, %RunnerState{} = runner} <- Map.fetch(acc.runners, runner.monitor_ref), - true <- i <= acc.max_concurrency do - case pop_next_waiting_caller(acc) do - {%WaitingState{} = next, acc} -> - {:cont, reply_runner_checkout(acc, runner, next.from, next.monitor_ref)} - - {nil, acc} -> - {:halt, acc} - end - else - _ -> {:halt, acc} - end - end) + {strategy_module, strategy_opts} = state.strategy + strategy_module.assign_waiting_callers(new_state, runner, strategy_opts) end defp deadline(timeout) when is_integer(timeout) do diff --git a/lib/flame/pool/per_worker_max_concurrency_strategy.ex b/lib/flame/pool/per_worker_max_concurrency_strategy.ex new file mode 100644 index 0000000..9247a5a --- /dev/null +++ b/lib/flame/pool/per_worker_max_concurrency_strategy.ex @@ -0,0 +1,73 @@ +defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do + alias FLAME.Pool + @behaviour FLAME.Pool.Strategy + + def checkout_runner(%Pool{} = pool, opts) do + min_runner = min_runner(pool) + runner_count = Pool.runner_count(pool) + max_concurrency = Keyword.fetch!(opts, :max_concurrency) + + cond do + min_runner && min_runner.count < state.max_concurrency -> + reply_runner_checkout(state, min_runner, from, monitor_ref) + {:checkout, min_runner} + + runner_count < state.max -> + if state.async_boot_timer || + map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do + :wait + else + :scale + + state + |> async_boot_runner() + |> waiting_in(deadline, from) + end + + true -> + :wait + end + end + + def assign_waiting_callers(%Pool{} = pool, %Pool.RunnerState{} = runner, opts) do + max_concurrency = Keyword.fetch!(opts, :max_concurrency) + + # pop waiting callers up to max_concurrency, but we must handle: + # 1. the case where we have no waiting callers + # 2. 
the case where we process a DOWN for the new runner as we pop DOWNs + # looking for fresh waiting + {pool, _assigned_concurrency} = + Enum.reduce_while(1..max_concurrency, {pool, 0}, fn _i, {pool, assigned_concurrency} -> + with {:ok, %Pool.RunnerState{} = runner} <- Map.fetch(pool.runners, runner.monitor_ref), + true <- assigned_concurrency <= max_concurrency do + case Pool.pop_next_waiting_caller(pool) do + {%Pool.WaitingState{} = next, pool} -> + pool = Pool.reply_runner_checkout(pool, runner, next.from, next.monitor_ref) + {:cont, {pool, assigned_concurrency + 1}} + + {nil, pool} -> + {:halt, {pool, assigned_concurrency}} + end + else + _ -> {:halt, {pool, assigned_concurrency}} + end + end) + + pool + end + + def desired_count(%Pool{} = pool, _opts) do + Pool.runner_count(pool) + 1 + end + + defp min_runner(pool) do + if map_size(pool.runners) == 0 do + nil + else + {_ref, min} = + Enum.min_by(pool.runners, fn {_, %Pool.RunnerState{count: count}} -> count end) + + min + end + end +end From 8753a6e567c7f94a73163b4fece1cb4aba9e2135 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 15 Aug 2024 17:07:02 -0700 Subject: [PATCH 05/12] Add :checkout_and_scale action --- lib/flame/pool.ex | 6 ++++++ lib/flame/pool/strategy.ex | 9 +++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 088aa25..fba3c22 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -549,6 +549,12 @@ defmodule FLAME.Pool do {:checkout, runner} -> reply_runner_checkout(state, runner, from, monitor_ref) + + {{:checkout, runner}, :scale} -> + state + |> reply_runner_checkout(runner, from, monitor_ref) + |> async_boot_runner() + |> waiting_in(deadline, from) end end diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex index 3ea102a..9384406 100644 --- a/lib/flame/pool/strategy.ex +++ b/lib/flame/pool/strategy.ex @@ -1,8 +1,13 @@ defmodule FLAME.Pool.Strategy do alias FLAME.Pool - @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: - {:checkout, Pool.RunnerState.t()} | :wait | :scale + @type action :: + :wait + | :scale + | {:checkout, Pool.RunnerState.t()} + | {{:checkout, Pool.RunnerState.t()}, :scale} + + @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: action @callback assign_waiting_callers( state :: Pool.t(), From 58538d3ae0f557338c02d904db717ac962ada7d2 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Fri, 16 Aug 2024 14:34:37 -0700 Subject: [PATCH 06/12] Seperate the pending count from the runner count --- lib/flame/pool.ex | 17 +++++++++++------ .../per_worker_max_concurrency_strategy.ex | 19 +++++++------------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index fba3c22..b9bc5cf 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -491,13 +491,17 @@ defmodule FLAME.Pool do end def runner_count(state) do - map_size(state.runners) + map_size(state.pending_runners) + map_size(state.runners) end def waiting_count(%Pool{waiting: %Queue{} = waiting}) do Queue.size(waiting) end + def pending_count(state) do + map_size(state.pending_runners) + end + defp replace_caller(state, checkout_ref, caller_pid, child_pid) do # replace caller with child pid and do not inc concurrency counts since we are replacing %{^caller_pid => %Caller{checkout_ref: ^checkout_ref} = caller} = state.callers @@ -623,7 +627,8 @@ defmodule FLAME.Pool do defp async_boot_runner(%Pool{on_grow_start: on_grow_start, name: name} = state) do {strategy_module, strategy_opts} = 
state.strategy - current_count = runner_count(state) + + current_count = runner_count(state) + pending_count(state) new_count = strategy_module.desired_count(state, strategy_opts) num_tasks = max(new_count - current_count, 0) @@ -790,7 +795,7 @@ defmodule FLAME.Pool do end defp maybe_on_grow_end(%Pool{on_grow_end: on_grow_end} = state, pid, result) do - new_count = runner_count(state) + new_count = runner_count(state) + pending_count(state) meta = %{count: new_count, name: state.name, pid: pid} case result do @@ -802,15 +807,15 @@ defmodule FLAME.Pool do end defp maybe_on_shrink(%Pool{} = state) do - new_count = runner_count(state) + new_count = runner_count(state) + pending_count(state) if state.on_shrink, do: state.on_shrink.(%{count: new_count, name: state.name}) state end defp has_unmet_servicable_demand?(%Pool{} = state) do - waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and - runner_count(state) < state.max + runner_count = runner_count(state) + pending_count(state) + waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and runner_count < state.max end defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do diff --git a/lib/flame/pool/per_worker_max_concurrency_strategy.ex b/lib/flame/pool/per_worker_max_concurrency_strategy.ex index 9247a5a..5f792b1 100644 --- a/lib/flame/pool/per_worker_max_concurrency_strategy.ex +++ b/lib/flame/pool/per_worker_max_concurrency_strategy.ex @@ -4,24 +4,19 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do def checkout_runner(%Pool{} = pool, opts) do min_runner = min_runner(pool) - runner_count = Pool.runner_count(pool) + runner_count = Pool.runner_count(pool) + Pool.pending_count(pool) max_concurrency = Keyword.fetch!(opts, :max_concurrency) cond do - min_runner && min_runner.count < state.max_concurrency -> - reply_runner_checkout(state, min_runner, from, monitor_ref) + min_runner && min_runner.count < max_concurrency -> {:checkout, min_runner} - runner_count < state.max -> - if state.async_boot_timer || - map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do + runner_count < pool.max -> + if pool.async_boot_timer || + map_size(pool.pending_runners) * max_concurrency > Pool.waiting_count(pool) do :wait else - :scale - - state - |> async_boot_runner() - |> waiting_in(deadline, from) + {{:checkout, min_runner}, :scale} end true -> @@ -57,7 +52,7 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do end def desired_count(%Pool{} = pool, _opts) do - Pool.runner_count(pool) + 1 + Pool.runner_count(pool) + Pool.pending_count(pool) + 1 end defp min_runner(pool) do From afe85350ddc0c57b75b80a43bcb5cad6a157d3ec Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Fri, 16 Aug 2024 14:59:38 -0700 Subject: [PATCH 07/12] Hand the strategy implementations a closure to pop waiters and assign runners --- lib/flame/pool.ex | 17 +++++++++++++---- .../pool/per_worker_max_concurrency_strategy.ex | 12 +++++++++--- lib/flame/pool/strategy.ex | 6 ++++++ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index b9bc5cf..04d4c21 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -562,7 +562,7 @@ defmodule FLAME.Pool do end end - def reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do + defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do # we pass monitor_ref down from waiting so we don't need to remonitor if already monitoring 
{from_pid, checkout_ref} = from @@ -728,7 +728,7 @@ defmodule FLAME.Pool do %Pool{state | waiting: Queue.delete_by_key(state.waiting, caller_pid)} end - def pop_next_waiting_caller(%Pool{} = state) do + defp pop_next_waiting_caller(%Pool{} = state) do result = Queue.pop_until(state.waiting, fn _pid, %WaitingState{} = waiting -> %WaitingState{from: {pid, _}, monitor_ref: ref, deadline: deadline} = waiting @@ -815,7 +815,9 @@ defmodule FLAME.Pool do defp has_unmet_servicable_demand?(%Pool{} = state) do runner_count = runner_count(state) + pending_count(state) - waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and runner_count < state.max + + waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and + runner_count < state.max end defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do @@ -827,7 +829,14 @@ defmodule FLAME.Pool do new_state = maybe_on_grow_end(new_state, task_pid, :ok) {strategy_module, strategy_opts} = state.strategy - strategy_module.assign_waiting_callers(new_state, runner, strategy_opts) + + pop = fn state -> pop_next_waiting_caller(state) end + + checkout = fn state, runner, from, monitor_ref -> + reply_runner_checkout(state, runner, from, monitor_ref) + end + + strategy_module.assign_waiting_callers(new_state, runner, pop, checkout, strategy_opts) end defp deadline(timeout) when is_integer(timeout) do diff --git a/lib/flame/pool/per_worker_max_concurrency_strategy.ex b/lib/flame/pool/per_worker_max_concurrency_strategy.ex index 5f792b1..2f61beb 100644 --- a/lib/flame/pool/per_worker_max_concurrency_strategy.ex +++ b/lib/flame/pool/per_worker_max_concurrency_strategy.ex @@ -24,7 +24,13 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do end end - def assign_waiting_callers(%Pool{} = pool, %Pool.RunnerState{} = runner, opts) do + def assign_waiting_callers( + %Pool{} = pool, + %Pool.RunnerState{} = runner, + pop_next_waiting_caller, + reply_runner_checkout, + opts + ) do max_concurrency = Keyword.fetch!(opts, :max_concurrency) # pop waiting callers up to max_concurrency, but we must handle: @@ -35,9 +41,9 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do Enum.reduce_while(1..max_concurrency, {pool, 0}, fn _i, {pool, assigned_concurrency} -> with {:ok, %Pool.RunnerState{} = runner} <- Map.fetch(pool.runners, runner.monitor_ref), true <- assigned_concurrency <= max_concurrency do - case Pool.pop_next_waiting_caller(pool) do + case pop_next_waiting_caller.(pool) do {%Pool.WaitingState{} = next, pool} -> - pool = Pool.reply_runner_checkout(pool, runner, next.from, next.monitor_ref) + pool = reply_runner_checkout.(pool, runner, next.from, next.monitor_ref) {:cont, {pool, assigned_concurrency + 1}} {nil, pool} -> diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex index 9384406..2c547ff 100644 --- a/lib/flame/pool/strategy.ex +++ b/lib/flame/pool/strategy.ex @@ -9,9 +9,15 @@ defmodule FLAME.Pool.Strategy do @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: action + @type pop_next_waiting_caller_fun :: (Pool.t() -> {Pool.WaitingState.t() | nil, Pool.t()}) + @type reply_runner_checkout_fun :: + (Pool.t(), Pool.RunnerState.t(), pid(), reference() -> Pool.t()) + @callback assign_waiting_callers( state :: Pool.t(), runner :: Pool.RunnerState.t(), + pop_next_waiting_caller :: pop_next_waiting_caller_fun(), + reply_runner_checkout :: reply_runner_checkout_fun(), opts :: Keyword.t() ) :: Pool.t() From d216f51a1cba880d5c41bf069d7cd937b07e3490 
Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Sun, 18 Aug 2024 12:12:59 -0700 Subject: [PATCH 08/12] Change checkout_runner API to return a list of actions so caller can premptively scale without waiting --- lib/flame/pool.ex | 25 ++++++------------- .../per_worker_max_concurrency_strategy.ex | 8 +++--- lib/flame/pool/strategy.ex | 3 +-- 3 files changed, 13 insertions(+), 23 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 04d4c21..90df358 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -542,24 +542,15 @@ defmodule FLAME.Pool do defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do {strategy_module, strategy_opts} = state.strategy - case strategy_module.checkout_runner(state, strategy_opts) do - :wait -> - waiting_in(state, deadline, from) + actions = strategy_module.checkout_runner(state, strategy_opts) - :scale -> - state - |> async_boot_runner() - |> waiting_in(deadline, from) - - {:checkout, runner} -> - reply_runner_checkout(state, runner, from, monitor_ref) - - {{:checkout, runner}, :scale} -> - state - |> reply_runner_checkout(runner, from, monitor_ref) - |> async_boot_runner() - |> waiting_in(deadline, from) - end + Enum.reduce(actions, state, fn action, acc -> + case action do + :wait -> waiting_in(acc, deadline, from) + :scale -> async_boot_runner(acc) + {:checkout, runner} -> reply_runner_checkout(acc, runner, from, monitor_ref) + end + end) end defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do diff --git a/lib/flame/pool/per_worker_max_concurrency_strategy.ex b/lib/flame/pool/per_worker_max_concurrency_strategy.ex index 2f61beb..779591e 100644 --- a/lib/flame/pool/per_worker_max_concurrency_strategy.ex +++ b/lib/flame/pool/per_worker_max_concurrency_strategy.ex @@ -9,18 +9,18 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do cond do min_runner && min_runner.count < max_concurrency -> - {:checkout, min_runner} + [{:checkout, min_runner}] runner_count < pool.max -> if pool.async_boot_timer || map_size(pool.pending_runners) * max_concurrency > Pool.waiting_count(pool) do - :wait + [:wait] else - {{:checkout, min_runner}, :scale} + [:scale, :wait] end true -> - :wait + [:wait] end end diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex index 2c547ff..accb66f 100644 --- a/lib/flame/pool/strategy.ex +++ b/lib/flame/pool/strategy.ex @@ -5,9 +5,8 @@ defmodule FLAME.Pool.Strategy do :wait | :scale | {:checkout, Pool.RunnerState.t()} - | {{:checkout, Pool.RunnerState.t()}, :scale} - @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: action + @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: list(action) @type pop_next_waiting_caller_fun :: (Pool.t() -> {Pool.WaitingState.t() | nil, Pool.t()}) @type reply_runner_checkout_fun :: From e8a0df232234528f76ad15f03ca48090607e7e5a Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Sun, 18 Aug 2024 12:51:59 -0700 Subject: [PATCH 09/12] Add t() for Pool.WaitingState --- lib/flame/pool.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 90df358..7379d81 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -8,6 +8,7 @@ end defmodule FLAME.Pool.WaitingState do @moduledoc false + @type t :: %__MODULE__{} defstruct from: nil, monitor_ref: nil, deadline: nil end From 0c68dfe151dc7468e1d447ae697f21e14f6b936c Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Sun, 18 Aug 2024 12:59:38 -0700 Subject: [PATCH 10/12] Rename PerRunnerMaxConcurrencyStrategy filename to 
match module --- ...urrency_strategy.ex => per_runner_max_concurrency_strategy.ex} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename lib/flame/pool/{per_worker_max_concurrency_strategy.ex => per_runner_max_concurrency_strategy.ex} (100%) diff --git a/lib/flame/pool/per_worker_max_concurrency_strategy.ex b/lib/flame/pool/per_runner_max_concurrency_strategy.ex similarity index 100% rename from lib/flame/pool/per_worker_max_concurrency_strategy.ex rename to lib/flame/pool/per_runner_max_concurrency_strategy.ex From 992b4a63216cbee0c8640623a3f70021d12c5ec1 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Mon, 19 Aug 2024 15:48:00 -0700 Subject: [PATCH 11/12] Allow strategy implementations to implement has_unment_servicable_demand?/2 --- lib/flame/pool.ex | 22 +++++++++---------- .../per_runner_max_concurrency_strategy.ex | 5 +++++ lib/flame/pool/strategy.ex | 2 +- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 7379d81..3389035 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -754,8 +754,15 @@ defmodule FLAME.Pool do defp handle_down(%Pool{} = state, {:DOWN, ref, :process, pid, reason}) do state = maybe_drop_waiting(state, pid) + %{ + callers: callers, + runners: runners, + pending_runners: pending_runners, + strategy: {strategy_module, strategy_opts} + } = state + state = - case state.callers do + case callers do %{^pid => %Caller{monitor_ref: ^ref} = caller} -> drop_caller(state, pid, caller) @@ -764,16 +771,16 @@ defmodule FLAME.Pool do end state = - case state.runners do + case runners do %{^ref => _} -> drop_child_runner(state, ref) %{} -> state end - case state.pending_runners do + case pending_runners do %{^ref => _} -> state = %Pool{state | pending_runners: Map.delete(state.pending_runners, ref)} # we rate limit this to avoid many failed async boot attempts - if has_unmet_servicable_demand?(state) do + if strategy_module.has_unmet_servicable_demand?(state, strategy_opts) do state |> maybe_on_grow_end(pid, {:exit, reason}) |> schedule_async_boot_runner() @@ -805,13 +812,6 @@ defmodule FLAME.Pool do state end - defp has_unmet_servicable_demand?(%Pool{} = state) do - runner_count = runner_count(state) + pending_count(state) - - waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and - runner_count < state.max - end - defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do %{^ref => task_pid} = state.pending_runners Process.demonitor(ref, [:flush]) diff --git a/lib/flame/pool/per_runner_max_concurrency_strategy.ex b/lib/flame/pool/per_runner_max_concurrency_strategy.ex index 779591e..9d61be3 100644 --- a/lib/flame/pool/per_runner_max_concurrency_strategy.ex +++ b/lib/flame/pool/per_runner_max_concurrency_strategy.ex @@ -61,6 +61,11 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do Pool.runner_count(pool) + Pool.pending_count(pool) + 1 end + def has_unmet_servicable_demand?(%Pool{} = pool, _opts) do + runner_count = Pool.runner_count(pool) + Pool.pending_count(pool) + Pool.waiting_count(pool) > 0 and runner_count < pool.max + end + defp min_runner(pool) do if map_size(pool.runners) == 0 do nil diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex index accb66f..79f9a36 100644 --- a/lib/flame/pool/strategy.ex +++ b/lib/flame/pool/strategy.ex @@ -21,5 +21,5 @@ defmodule FLAME.Pool.Strategy do ) :: Pool.t() - @callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer() + @callback 
has_unmet_servicable_demand?(state :: Pool.t(), opts :: Keyword.t()) :: boolean()
 end

From 0270d43aa7c5ce5466aaaa936bf4e8b456d431dd Mon Sep 17 00:00:00 2001
From: Nick Dichev
Date: Mon, 19 Aug 2024 15:50:32 -0700
Subject: [PATCH 12/12] Add the missing desired_count/2 callback to the behaviour

---
 lib/flame/pool/strategy.ex | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex
index 79f9a36..7389d86 100644
--- a/lib/flame/pool/strategy.ex
+++ b/lib/flame/pool/strategy.ex
@@ -21,5 +21,7 @@ defmodule FLAME.Pool.Strategy do
             ) ::
               Pool.t()
 
+  @callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer()
+
   @callback has_unmet_servicable_demand?(state :: Pool.t(), opts :: Keyword.t()) :: boolean()
 end
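
After this series, FLAME.Pool.Strategy exposes four callbacks (checkout_runner/2, assign_waiting_callers/5, desired_count/2, and has_unmet_servicable_demand?/2) plus the pop/reply closures handed to assign_waiting_callers in patch 07. As a rough sketch of how a custom implementation might plug into that behaviour, the module below allows at most one active caller per runner. The MyApp.SingleCallerStrategy name and its one-caller-per-runner policy are illustrative assumptions, not part of these patches; it relies only on the public helpers the series exposes on FLAME.Pool (runner_count/1, pending_count/1, waiting_count/1).

    defmodule MyApp.SingleCallerStrategy do
      # Hypothetical strategy sketch: allow at most one active caller per runner.
      @behaviour FLAME.Pool.Strategy

      alias FLAME.Pool

      @impl true
      def checkout_runner(%Pool{} = pool, _opts) do
        # A runner with count == 0 has no active callers and can be checked out directly.
        idle_runner =
          pool.runners
          |> Map.values()
          |> Enum.find(fn %Pool.RunnerState{count: count} -> count == 0 end)

        cond do
          idle_runner -> [{:checkout, idle_runner}]
          Pool.runner_count(pool) + Pool.pending_count(pool) < pool.max -> [:scale, :wait]
          true -> [:wait]
        end
      end

      @impl true
      def assign_waiting_callers(%Pool{} = pool, runner, pop_next, reply_checkout, _opts) do
        # A freshly booted runner services exactly one waiting caller. The runner may
        # have gone DOWN while booting, so re-fetch it from the pool before replying.
        case Map.fetch(pool.runners, runner.monitor_ref) do
          {:ok, %Pool.RunnerState{} = runner} ->
            case pop_next.(pool) do
              {%Pool.WaitingState{} = next, pool} ->
                reply_checkout.(pool, runner, next.from, next.monitor_ref)

              {nil, pool} ->
                pool
            end

          :error ->
            pool
        end
      end

      @impl true
      def desired_count(%Pool{} = pool, _opts) do
        # Boot one runner beyond what is already running or pending.
        Pool.runner_count(pool) + Pool.pending_count(pool) + 1
      end

      @impl true
      def has_unmet_servicable_demand?(%Pool{} = pool, _opts) do
        Pool.waiting_count(pool) > 0 and
          Pool.runner_count(pool) + Pool.pending_count(pool) < pool.max
      end
    end

A pool would then opt into it with the option shape introduced in patch 01, e.g. {FLAME.Pool, name: MyRunner, min: 1, max: 10, strategy: {MyApp.SingleCallerStrategy, []}}, and the second tuple element is passed as the opts keyword list to every callback. Note this sketch omits the rate-limiting that PerRunnerMaxConcurrencyStrategy performs by consulting pool.pending_runners and pool.async_boot_timer before returning :scale, so a production strategy would likely add a similar check.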