diff --git a/CHANGELOG.md b/CHANGELOG.md
index 701488c..39c3209 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,8 @@
 
 * Configures CI to run on pull request.
 
+* Add `Kaffe.MessageHandler` behaviour. To utilize it, add the behaviour to your configured `message_handler` and `@impl Kaffe.MessageHandler` on `handle_messages/1`.
+
 ### Fixes
 
 * Stops compiler warnings on duplicate doc definitions
diff --git a/README.md b/README.md
index e58b817..664d552 100644
--- a/README.md
+++ b/README.md
@@ -39,15 +39,7 @@ An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com
       end
       ```
 
-  2. Ensure `kaffe` is started with your application:
-
-      ```elixir
-      def application do
-        [applications: [:logger, :kaffe]]
-      end
-      ```
-
-  3. Configure a Kaffe Consumer and/or Producer
+  2. Configure a Kaffe Consumer and/or Producer
 
 ## Kaffe Consumer Usage
 
@@ -61,14 +53,15 @@ There is also legacy support for single message consumers, which process one mes
 
 ### Kaffe GroupMember - Batch Message Consumer
 
-1. Define a `handle_messages/1` function in the provided module.
-
-    `handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
+1. Define a `handle_messages/1` function in the provided module implementing the `Kaffe.MessageHandler` behaviour.
 
-    The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
+    `handle_messages/1` will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
 
     ```elixir
     defmodule MessageProcessor do
+      @behaviour Kaffe.MessageHandler
+
+      @impl Kaffe.MessageHandler
       def handle_messages(messages) do
         for %{key: key, value: value} = message <- messages do
           IO.inspect message
@@ -154,6 +147,9 @@ Example:
 
 ```elixir
 defmodule MessageProcessor do
+  @behaviour Kaffe.MessageHandler
+
+  @impl Kaffe.MessageHandler
   def handle_messages(messages) do
     for %{key: key, value: value} = message <- messages do
       IO.inspect message
diff --git a/lib/kaffe/consumer.ex b/lib/kaffe/consumer.ex
index 884dd65..2724490 100644
--- a/lib/kaffe/consumer.ex
+++ b/lib/kaffe/consumer.ex
@@ -18,6 +18,18 @@ defmodule Kaffe.Consumer do
   @kafka Application.compile_env(:kaffe, :kafka_mod, :brod)
   @group_subscriber Application.compile_env(:kaffe, :group_subscriber_mod, :brod_group_subscriber)
 
+  # See kafka_message in "brod/include/brod.hrl"
+  @type message() :: %{
+          key: binary(),
+          value: binary(),
+          topic: binary(),
+          offset: non_neg_integer(),
+          partition: non_neg_integer(),
+          ts: non_neg_integer(),
+          ts_type: :undefined | :create | :append,
+          headers: list()
+        }
+
   require Record
   import Record, only: [defrecord: 2, extract: 2]
   defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")
diff --git a/lib/kaffe/consumer_group/subscriber/subscriber.ex b/lib/kaffe/consumer_group/subscriber/subscriber.ex
index 218bda2..68593b3 100644
--- a/lib/kaffe/consumer_group/subscriber/subscriber.ex
+++ b/lib/kaffe/consumer_group/subscriber/subscriber.ex
@@ -12,12 +12,12 @@ defmodule Kaffe.Subscriber do
 
   The subscriber reads the following options out of the configuration:
 
-      - `max_bytes` - The maximum number of message bytes to receive in a batch
-      - `min_bytes` - The minimum number of message bytes to receive in a batch
-      - `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
-          to be collected
-      - `offset_reset_policy` - The native `auto.offset.reset` option,
-          either `:reset_to_earliest` or `:reset_to_latest`.
+  - `max_bytes` - The maximum number of message bytes to receive in a batch
+  - `min_bytes` - The minimum number of message bytes to receive in a batch
+  - `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
+    to be collected
+  - `offset_reset_policy` - The native `auto.offset.reset` option,
+    either `:reset_to_earliest` or `:reset_to_latest`.
 
   See: https://github.com/klarna/brucke/blob/master/src/brucke_member.erl
   Also: https://github.com/klarna/brod/blob/master/src/brod_consumer.erl
diff --git a/lib/kaffe/consumer_group/worker/worker.ex b/lib/kaffe/consumer_group/worker/worker.ex
index 73ea2c1..59c0485 100644
--- a/lib/kaffe/consumer_group/worker/worker.ex
+++ b/lib/kaffe/consumer_group/worker/worker.ex
@@ -2,13 +2,11 @@ defmodule Kaffe.Worker do
   @moduledoc """
   A worker receives messages for a single topic for a single partition.
 
-  Processing the message set is delegated to the configured message handler. It's
-  responsible for any error handling as well. The message handler must define a
-  `handle_messages` function (*note* the pluralization!) to accept a list of messages.
-
-  The result of `handle_messages` is sent back to the subscriber. Additionally, the
-  message handler should inform the subscriber on what to do with the offsets after
-  processing the message set.
+  Processing the message set is delegated to the configured message
+  handler (See `Kaffe.MessageHandler`). The result of `handle_messages`
+  is sent back to the subscriber. Additionally, the message handler should
+  inform the subscriber on what to do with the offsets after processing the
+  message set.
   """
 
   use GenServer
@@ -39,13 +37,11 @@ defmodule Kaffe.Worker do
     {:ok, %{message_handler: message_handler, worker_name: worker_name}}
   end
 
-  @doc """
-  Entry point for processing a message set received by a subscriber.
-
-  Note that the response from the message handler is what dictates how a
-  subscriber should deal with the message offset. Depending on the situation,
-  a message processor may not want to have it's most recent offsets committed.
-  """
+  # Entry point for processing a message set received by a subscriber.
+  #
+  # Note that the response from the message handler is what dictates how a
+  # subscriber should deal with the message offset. Depending on the situation,
+  # a message processor may not want to have it's most recent offsets committed.
   @impl GenServer
   def handle_cast(
         {:process_messages, subscriber_pid, topic, partition, generation_id, messages},
diff --git a/lib/kaffe/message_handler.ex b/lib/kaffe/message_handler.ex
new file mode 100644
index 0000000..674b83b
--- /dev/null
+++ b/lib/kaffe/message_handler.ex
@@ -0,0 +1,49 @@
+defmodule Kaffe.MessageHandler do
+  @moduledoc """
+  The behaviour for a message handler.
+
+  The module implementing this behaviour needs to be configured for the consumer
+  under Kaffe config.
+
+  ```
+  config :kaffe,
+    consumers: %{
+      "subscriber_1" => [
+        ...
+        message_handler: MyApp.MessageHandler
+      ]
+    }
+  ```
+  """
+  alias Kaffe.Consumer
+
+  @doc """
+  The functionality responsible for handling the message set from Kaffe.
+
+  Each message will include the topic and partition in addition to the normal Kafka
+  message metadata.
+
+  The response from the message handler is what dictates how a
+  subscriber should deal with the message offset. Depending on the situation,
+  a message processor may not want to have its most recent offsets committed.
+
+  In some cases you may not want to commit back the most recent offset after
+  processing a list of messages. For example, if you're batching messages to be
+  sent elsewhere and want to ensure that a batch can be rebuilt should there be
+  an error further downstream. In that example you might want to keep the offset
+  of the first message in your batch so your consumer can restart back at that point
+  to reprocess and rebatch the messages.
+
+  The following returns are supported:
+  - `:ok` - commit back the most recent offset and request more messages
+  - `{:ok, :no_commit}` - do _not_ commit back the most recent offset and
+    request more messages from the offset of the last message
+  - `{:ok, offset}` - commit back at the offset specified and request
+    messages from that point forward
+
+  Because the return types are only success based, the message handler needs
+  to handle errors.
+  """
+  @callback handle_messages(messages :: list(Consumer.message())) ::
+              :ok | {:ok, :no_commit} | {:ok, offset :: :brod.offset()}
+end
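
Reviewer note: the `{:ok, offset}` return documented in the new callback may be easier to follow with a concrete handler. The following is a minimal sketch, not part of this patch. `MyApp.BatchingHandler` and `forward_batch/1` are hypothetical names, and the "downstream" check is a stand-in for a real sink. It assumes `kaffe` with this change is a dependency; without it the `@behaviour` line only produces a compiler warning.

```elixir
defmodule MyApp.BatchingHandler do
  @moduledoc false
  # Hypothetical handler illustrating the return values of
  # Kaffe.MessageHandler.handle_messages/1 described in this patch.
  @behaviour Kaffe.MessageHandler

  @impl Kaffe.MessageHandler
  def handle_messages([]), do: :ok

  def handle_messages([first | _rest] = messages) do
    case forward_batch(messages) do
      # Batch delivered: commit the most recent offset and fetch more.
      :ok ->
        :ok

      # Delivery failed: return the offset of the first message so the
      # consumer resumes from the start of this batch and can rebuild it.
      {:error, _reason} ->
        {:ok, first.offset}
    end
  end

  # Stand-in for a real downstream call (assumption): it "fails" whenever
  # a message value is not a binary.
  defp forward_batch(messages) do
    if Enum.all?(messages, &is_binary(&1.value)) do
      :ok
    else
      {:error, :invalid_payload}
    end
  end
end
```

Because the callback only has success-shaped returns, a handler along these lines would also need its own retry limit or dead-letter path; otherwise a batch that always fails would be replayed indefinitely.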