diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 450d914f..3f4fa369 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,9 +16,11 @@ jobs: fail-fast: false matrix: include: - # Earliest-supported versions. - - elixir: "1.7.4" - otp: "21.3.8.17" + # Minimum supported versions on ubuntu-22.04. + # OTP 24.2 is the earliest available on ubuntu-22.04. + # Elixir 1.12 is the earliest version compatible with OTP 24. + - elixir: "1.12" + otp: "24.2" # Latest versions. - elixir: "1.18" @@ -34,6 +36,7 @@ jobs: with: otp-version: ${{matrix.otp}} elixir-version: ${{matrix.elixir}} + version-type: strict - name: Cache Mix dependencies uses: actions/cache@v3 diff --git a/lib/broadway/process.ex b/lib/broadway/process.ex new file mode 100644 index 00000000..1539ff4a --- /dev/null +++ b/lib/broadway/process.ex @@ -0,0 +1,20 @@ +defmodule Broadway.Process do + @moduledoc false + + # TODO: Remove this module once we require Elixir 1.17+. + # Process.set_label/1 was added in Elixir 1.17.0. + + if function_exported?(Process, :set_label, 1) do + def set_label(label) do + Process.set_label(label) + end + + def labels_supported?, do: true + else + def set_label(_label) do + :ok + end + + def labels_supported?, do: false + end +end diff --git a/lib/broadway/topology/batch_processor_stage.ex b/lib/broadway/topology/batch_processor_stage.ex index 25a532ee..5a36af99 100644 --- a/lib/broadway/topology/batch_processor_stage.ex +++ b/lib/broadway/topology/batch_processor_stage.ex @@ -29,6 +29,8 @@ defmodule Broadway.Topology.BatchProcessorStage do producer: args[:producer] } + Broadway.Process.set_label({:broadway_batch_processor, state.topology_name, state.partition}) + {:consumer, state, []} end diff --git a/lib/broadway/topology/batcher_stage.ex b/lib/broadway/topology/batcher_stage.ex index dd54266e..2c37eeb5 100644 --- a/lib/broadway/topology/batcher_stage.ex +++ b/lib/broadway/topology/batcher_stage.ex @@ -45,6 +45,8 @@ defmodule Broadway.Topology.BatcherStage do context: args[:context] } + Broadway.Process.set_label({:broadway_batcher, state.topology_name, state.batcher}) + {:producer_consumer, state, dispatcher: dispatcher} end diff --git a/lib/broadway/topology/processor_stage.ex b/lib/broadway/topology/processor_stage.ex index e0e1014f..5cdc862b 100644 --- a/lib/broadway/topology/processor_stage.ex +++ b/lib/broadway/topology/processor_stage.ex @@ -33,6 +33,10 @@ defmodule Broadway.Topology.ProcessorStage do producer: args[:producer] } + Broadway.Process.set_label( + {:broadway_processor, state.topology_name, state.processor_key, state.partition} + ) + case type do :consumer -> {:consumer, state, []} diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index 3b74231a..1d2395b7 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -61,6 +61,9 @@ defmodule Broadway.Topology.ProducerStage do rate_limiting: rate_limiting_state } + topology_name = args[:broadway][:name] + Broadway.Process.set_label({:broadway_producer, topology_name, index}) + case module.init(arg) do {:producer, module_state} -> {:producer, %{state | module_state: module_state}, dispatcher: dispatcher} diff --git a/test/broadway/topology/batch_processor_stage_test.exs b/test/broadway/topology/batch_processor_stage_test.exs new file mode 100644 index 00000000..7205130d --- /dev/null +++ b/test/broadway/topology/batch_processor_stage_test.exs @@ -0,0 +1,32 @@ +defmodule Broadway.Topology.BatchProcessorStageTest do + use ExUnit.Case, async: true + + test "sets process label with topology name and partition" do + topology_name = :test_topology + partition = 3 + + {:ok, pid} = + Broadway.Topology.BatchProcessorStage.start_link( + [ + topology_name: topology_name, + name: :test_batch_processor, + module: __MODULE__, + context: %{}, + terminator: __MODULE__, + resubscribe: :never, + partition: partition, + batcher: :test_batcher, + producer: nil + ], + [] + ) + + if Broadway.Process.labels_supported?() do + label = Process.info(pid, :label) + assert label == {:label, {:broadway_batch_processor, topology_name, partition}} + else + # Labels not supported in this Elixir version, skip assertion + :ok + end + end +end diff --git a/test/broadway/topology/batcher_stage_test.exs b/test/broadway/topology/batcher_stage_test.exs index 2a72e7a2..ce917e0e 100644 --- a/test/broadway/topology/batcher_stage_test.exs +++ b/test/broadway/topology/batcher_stage_test.exs @@ -22,4 +22,34 @@ defmodule Broadway.Topology.BatcherStageTest do %{state: state} = :sys.get_state(pid) assert state.subscription_options[:max_demand] == 123 end + + test "sets process label with topology name and batcher key" do + topology_name = :test_topology + batcher_key = :my_batcher + + {:ok, pid} = + Broadway.Topology.BatcherStage.start_link( + [ + topology_name: topology_name, + name: :test_batcher, + context: %{}, + terminator: __MODULE__, + resubscribe: :never, + batcher: batcher_key, + processors: [:some_processor], + batch_size: 10, + batch_timeout: 1000, + partition: batcher_key + ], + [] + ) + + if Broadway.Process.labels_supported?() do + label = Process.info(pid, :label) + assert label == {:label, {:broadway_batcher, topology_name, batcher_key}} + else + # Labels not supported in this Elixir version, skip assertion + :ok + end + end end diff --git a/test/broadway/topology/processor_stage_test.exs b/test/broadway/topology/processor_stage_test.exs index c16ec88d..ec2091ec 100644 --- a/test/broadway/topology/processor_stage_test.exs +++ b/test/broadway/topology/processor_stage_test.exs @@ -22,4 +22,38 @@ defmodule Broadway.Topology.ProcessorStageTest do assert state.subscription_options[:min_demand] == 3 assert state.subscription_options[:max_demand] == 6 end + + test "sets process label with topology name, processor key, and partition" do + topology_name = :test_topology + processor_key = :default + partition = 5 + + {:ok, pid} = + Broadway.Topology.ProcessorStage.start_link( + [ + topology_name: topology_name, + name: :test_processor, + module: __MODULE__, + context: %{}, + type: :consumer, + terminator: __MODULE__, + resubscribe: :never, + processor_config: [min_demand: 1, max_demand: 10], + processor_key: processor_key, + producers: [:sample], + partition: partition, + batchers: :none, + producer: nil + ], + [] + ) + + if Broadway.Process.labels_supported?() do + label = Process.info(pid, :label) + assert label == {:label, {:broadway_processor, topology_name, processor_key, partition}} + else + # Labels not supported in this Elixir version, skip assertion + :ok + end + end end diff --git a/test/broadway/topology/producer_stage_test.exs b/test/broadway/topology/producer_stage_test.exs index bbf65dde..6d4621d8 100644 --- a/test/broadway/topology/producer_stage_test.exs +++ b/test/broadway/topology/producer_stage_test.exs @@ -184,4 +184,27 @@ defmodule Broadway.Topology.ProducerStageTest do assert ProducerStage.terminate(:normal, state) == :ok end end + + test "sets process label with topology name and index" do + topology_name = :test_topology + index = 2 + + args = %{ + module: {FakeProducer, []}, + broadway: [name: topology_name, index: index], + transformer: nil, + dispatcher: GenStage.DemandDispatcher, + rate_limiter: nil + } + + {:ok, pid} = ProducerStage.start_link(args, index) + + if Broadway.Process.labels_supported?() do + label = Process.info(pid, :label) + assert label == {:label, {:broadway_producer, topology_name, index}} + else + # Labels not supported in this Elixir version, skip assertion + :ok + end + end end