diff --git a/lib/mxpanel/batcher.ex b/lib/mxpanel/batcher.ex index 5940cf1..4135a14 100644 --- a/lib/mxpanel/batcher.ex +++ b/lib/mxpanel/batcher.ex @@ -60,6 +60,11 @@ defmodule Mxpanel.Batcher do type: :pos_integer, doc: "The size of the pool of event buffers. Defaults to `System.schedulers_online()`." ], + graceful_shutdown: [ + type: :boolean, + doc: "Gracefully flush all in-flight messages before terminating.", + default: true + ], flush_interval: [ type: :pos_integer, doc: "Interval in milliseconds which the event buffer are processed.", diff --git a/lib/mxpanel/batcher/buffer.ex b/lib/mxpanel/batcher/buffer.ex index c709661..266b1b0 100644 --- a/lib/mxpanel/batcher/buffer.ex +++ b/lib/mxpanel/batcher/buffer.ex @@ -18,6 +18,7 @@ defmodule Mxpanel.Batcher.Buffer do :client, :flush_interval, :flush_jitter, + :graceful_shutdown, :retry_max_attempts, :retry_base_backoff, :import_timeout, @@ -48,6 +49,7 @@ defmodule Mxpanel.Batcher.Buffer do client: client, flush_interval: opts[:flush_interval], flush_jitter: opts[:flush_jitter], + graceful_shutdown: opts[:graceful_shutdown], retry_max_attempts: opts[:retry_max_attempts], retry_base_backoff: opts[:retry_base_backoff], import_timeout: opts[:import_timeout], @@ -56,6 +58,10 @@ defmodule Mxpanel.Batcher.Buffer do endpoint: opts[:endpoint] } + if state.graceful_shutdown do + Process.flag(:trap_exit, true) + end + Manager.register(batcher_name, state.endpoint) {:ok, state, {:continue, :schedule_flush}} @@ -154,4 +160,16 @@ defmodule Mxpanel.Batcher.Buffer do Process.send_after(self(), :flush, flush_interval + jitter) end + + @impl true + @doc "Flush all events before shutting down unless cleanup: false" + def terminate(_reason, %State{graceful_shutdown: false}) do + :ok + end + + @impl true + def terminate(_reason, state) do + flush(state) + :ok + end end diff --git a/test/mxpanel/batcher_test.exs b/test/mxpanel/batcher_test.exs index 583cb99..b6eee38 100644 --- a/test/mxpanel/batcher_test.exs +++ b/test/mxpanel/batcher_test.exs @@ -72,6 +72,7 @@ defmodule Mxpanel.BatcherTest do token: "token", pool_size: 5, flush_interval: @one_year, + graceful_shutdown: false, http_client: {HTTPClientMock, []}} ) @@ -136,6 +137,7 @@ defmodule Mxpanel.BatcherTest do token: "token", telemetry_buffers_info_interval: 1, flush_interval: @one_year, + graceful_shutdown: false, pool_size: 10, http_client: {HTTPClientMock, []}} ) @@ -384,6 +386,40 @@ defmodule Mxpanel.BatcherTest do Batcher.drain_buffers(name) end + + test "flushes all queues on graceful shutdown" do + name = gen_name() + + pid = + start_supervised!( + {Batcher, + name: name, + token: "token", + pool_size: 1, + telemetry_buffers_info_interval: 1, + http_client: {HTTPClientMock, []}, + flush_interval: 100, + flush_jitter: 100} + ) + + expect(HTTPClientMock, :request, 3, fn :post, url, headers, body, opts -> + {:ok, %{body: "1", headers: [], status: 200}} + end) + + for i <- 1..3 do + Batcher.enqueue(name, build_track_operation("signup", "#{i}")) + end + + for i <- 1..3 do + Batcher.enqueue(name, build_engage_operation("#{i}")) + end + + for i <- 1..3 do + Batcher.enqueue(name, build_groups_operation("Company", "#{i}")) + end + + assert :ok = Supervisor.stop(pid) + end end defp build_track_operation(event, distinct_id) do