Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

* Configures CI to run on pull request.

* Add `Kaffe.MessageHandler` behaviour. To utilize it, add the behaviour to your configured `message_handler` and `@impl Kaffe.MessageHandler` on `handle_messages/1`.

### Fixes

* Stops compiler warnings on duplicate doc definitions
Expand Down
22 changes: 9 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,7 @@ An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com
end
```

2. Ensure `kaffe` is started with your application:

```elixir
def application do
[applications: [:logger, :kaffe]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a dumb question, but is this no longer needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is super outdated.

Elixir as 1.4 infers which applications to include based on your deps. See https://elixir-lang.org/blog/2017/01/05/elixir-v1-4-0-released/

There's now the extra_applications tag, which is only needed when you use something that is not in the deps list (typically things from erlang, like :ssl or :logger).

end
```

3. Configure a Kaffe Consumer and/or Producer
2. Configure a Kaffe Consumer and/or Producer

## Kaffe Consumer Usage

Expand All @@ -61,14 +53,15 @@ There is also legacy support for single message consumers, which process one mes

### Kaffe GroupMember - Batch Message Consumer

1. Define a `handle_messages/1` function in the provided module.

`handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
1. Define a `handle_messages/1` function in the provided module implementing the `Kaffe.MessageHandler` behaviour.

The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be helpful to have this kind of comment somewhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the README is a good place for it, unless we want to make a separate Getting Started.md or similar. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the README is a fine place for it. A Getting Started.md file might be a good idea, but can probably be a separate PR if we do it

`handle_messages/1` will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

```elixir
defmodule MessageProcessor do
@behaviour Kaffe.MessageHandler

@impl Kaffe.MessageHandler
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
Expand Down Expand Up @@ -154,6 +147,9 @@ Example:

```elixir
defmodule MessageProcessor do
@behaviour Kaffe.MessageHandler

@impl Kaffe.MessageHandler
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
Expand Down
12 changes: 12 additions & 0 deletions lib/kaffe/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ defmodule Kaffe.Consumer do
@kafka Application.compile_env(:kaffe, :kafka_mod, :brod)
@group_subscriber Application.compile_env(:kaffe, :group_subscriber_mod, :brod_group_subscriber)

# See kafka_message in "brod/include/brod.hrl"
@type message() :: %{
key: binary(),
value: binary(),
topic: binary(),
offset: non_neg_integer(),
partition: non_neg_integer(),
ts: non_neg_integer(),
ts_type: :undefined | :create | :append,
headers: list()
}

require Record
import Record, only: [defrecord: 2, extract: 2]
defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")
Expand Down
12 changes: 6 additions & 6 deletions lib/kaffe/consumer_group/subscriber/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ defmodule Kaffe.Subscriber do

The subscriber reads the following options out of the configuration:

- `max_bytes` - The maximum number of message bytes to receive in a batch
- `min_bytes` - The minimum number of message bytes to receive in a batch
- `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
to be collected
- `offset_reset_policy` - The native `auto.offset.reset` option,
either `:reset_to_earliest` or `:reset_to_latest`.
- `max_bytes` - The maximum number of message bytes to receive in a batch
- `min_bytes` - The minimum number of message bytes to receive in a batch
- `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
to be collected
- `offset_reset_policy` - The native `auto.offset.reset` option,
either `:reset_to_earliest` or `:reset_to_latest`.

See: https://github.com/klarna/brucke/blob/master/src/brucke_member.erl
Also: https://github.com/klarna/brod/blob/master/src/brod_consumer.erl
Expand Down
24 changes: 10 additions & 14 deletions lib/kaffe/consumer_group/worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ defmodule Kaffe.Worker do
@moduledoc """
A worker receives messages for a single topic for a single partition.

Processing the message set is delegated to the configured message handler. It's
responsible for any error handling as well. The message handler must define a
`handle_messages` function (*note* the pluralization!) to accept a list of messages.

The result of `handle_messages` is sent back to the subscriber. Additionally, the
message handler should inform the subscriber on what to do with the offsets after
processing the message set.
Processing the message set is delegated to the configured message
handler (See `Kaffe.MessageHandler`). The result of `handle_messages`
is sent back to the subscriber. Additionally, the message handler should
inform the subscriber on what to do with the offsets after processing the
message set.
"""

use GenServer
Expand Down Expand Up @@ -39,13 +37,11 @@ defmodule Kaffe.Worker do
{:ok, %{message_handler: message_handler, worker_name: worker_name}}
end

@doc """
Entry point for processing a message set received by a subscriber.

Note that the response from the message handler is what dictates how a
subscriber should deal with the message offset. Depending on the situation,
a message processor may not want to have it's most recent offsets committed.
"""
# Entry point for processing a message set received by a subscriber.
#
# Note that the response from the message handler is what dictates how a
# subscriber should deal with the message offset. Depending on the situation,
# a message processor may not want to have it's most recent offsets committed.
@impl GenServer
def handle_cast(
{:process_messages, subscriber_pid, topic, partition, generation_id, messages},
Expand Down
49 changes: 49 additions & 0 deletions lib/kaffe/message_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Kaffe.MessageHandler do
@moduledoc """
The behaviour for a message handler.

The module implementing this behaviour needs to be configured for the consumer
under Kaffe config.

```
config :kaffe,
consumers: %{
"subscriber_1" => [
...
message_handler: MyApp.MessageHandler
]
}
```
"""
alias Kaffe.Consumer

@doc """
The functionality responsible for handling the message set from Kaffe.

Each message will include the topic and partition in addition to the normal Kafka
message metadata.

The response from the message handler is what dictates how a
subscriber should deal with the message offset. Depending on the situation,
a message processor may not want to have its most recent offsets committed.

In some cases you may not want to commit back the most recent offset after
processing a list of messages. For example, if you're batching messages to be
sent elsewhere and want to ensure that a batch can be rebuilt should there be
an error further downstream. In that example you might want to keep the offset
of the first message in your batch so your consumer can restart back at that point
to reprocess and rebatch the messages.

The following returns are supported:
- `:ok` - commit back the most recent offset and request more messages
- `{:ok, :no_commit}` - do _not_ commit back the most recent offset and
request more messages from the offset of the last message
- `{:ok, offset}` - commit back at the offset specified and request
messages from that point forward

Because the return types are only success based, the message handler needs
to handle errors.
"""
@callback handle_messages(messages :: list(Consumer.message())) ::
:ok | {:ok, :no_commit} | {:ok, offset :: :brod.offset()}
end