Add abstractions for tiered storage of stream data #196
Conversation
`src/osiris.erl` (outdated)
```diff
+-type retention_fun() :: fun((IdxFile :: file:filename_all()) -> boolean()).
 -type retention_spec() ::
-    {max_bytes, non_neg_integer()} | {max_age, milliseconds()}.
+    {max_bytes, non_neg_integer()} |
+    {max_age, milliseconds()} |
+    {'fun', retention_fun()}.
```
The retention spec part of this PR could probably be split out on its own. It could also be used to provide a feature unrelated to tiered storage, like rabbitmq/rabbitmq-server#14413 (retention up to an offset).
```diff
 -spec overview(file:filename_all()) ->
-          {range(), [{epoch(), offset()}]}.
+          overview().
 overview(Dir) ->
```
The change to return a map from `overview/1` is a big (but boring) part of the diff - it means lots of changes to `osiris_log_SUITE`. I think returning a map is the right API here, but we will probably need to be careful with it since it's a breaking change. The way the return type is changed now would probably cause errors in mixed-version clusters.
This part (1cb566e) can actually be dropped. We only need to adjust `osiris_log:range()` to start a replica further along in the log. That removes quite a lot of the diff.
Dropping these changes reduced the diff from +596 −293 to +364 −99.
Force-pushed from 5f5ed04 to 6ce7e82.
Force-pushed from 6ce7e82 to d579d06.
This change introduces a behaviour `osiris_log_reader` which can be implemented externally to read from a stream at a given offset spec. This closes over the high-level reading operations `send_file/3` and `chunk_iterator/3`. `osiris:init_reader/4` selects the reader module based on application env, and then callers use `osiris_log_reader` to interact with the reader. By default all of these functions delegate to `osiris_log`. `osiris_log` doesn't need any meaningful changes this way. The only change is to expose the `header_map()` type.
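As a rough illustration, a default implementation of such a behaviour could simply delegate to `osiris_log`, as the description above says the defaults do. The callback arities below follow the functions named in this PR (`init_offset_reader/2`, `send_file/3`, `chunk_iterator/3`, `iterator_next/1`), but the argument names and exact signatures are assumptions, not the PR's actual definitions:

```erlang
%% Hypothetical sketch of an osiris_log_reader implementation that
%% delegates everything to osiris_log. Signatures are assumptions
%% based on the function names mentioned in the PR description.
-module(my_log_reader).
-behaviour(osiris_log_reader).

-export([init_offset_reader/2, send_file/3, chunk_iterator/3,
         iterator_next/1, close/1]).

%% Open a reader positioned according to the offset spec.
init_offset_reader(OffsetSpec, Config) ->
    osiris_log:init_offset_reader(OffsetSpec, Config).

%% Send chunks directly to the socket.
send_file(Socket, State, Callback) ->
    osiris_log:send_file(Socket, State, Callback).

%% Produce an iterator over the entries of the next chunk.
chunk_iterator(State, Credit, Options) ->
    osiris_log:chunk_iterator(State, Credit, Options).

iterator_next(Iterator) ->
    osiris_log:iterator_next(Iterator).

close(State) ->
    osiris_log:close(State).
```

An external tiered-storage reader would replace the `osiris_log` calls with reads from the remote tier while keeping the same surface.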
Force-pushed from d579d06 to 02ca62b.
This can be used flexibly to evaluate retention depending on the name or contents of index files. You pass in a function which returns a tuple with the index files split into two lists: to delete and to keep. This could be used as a way to truncate everything up to an offset or to guarantee that an offset (for example an uncommitted one) won't be truncated. Since these files are sorted, some retention functions could operate just on the names (deriving the offset of the segment with `erlang:binary_to_integer/1`).
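For illustration, a retention function of this shape could implement "retain everything from a given offset onward" using only the sorted index file names, relying on osiris naming index files after the first offset in the segment. This is a hypothetical sketch, not code from the PR:

```erlang
%% Hypothetical retention fun: delete index/segment pairs that lie
%% entirely below Offset, keep the rest. A segment can be deleted
%% only if the *next* segment already starts at or before Offset,
%% so the segment containing Offset is always kept.
retention_up_to(Offset) ->
    fun(IdxFiles) ->
            Split = fun S([A, B | Rest], Del) ->
                            case first_offset(B) =< Offset of
                                true  -> S([B | Rest], [A | Del]);
                                false -> {lists:reverse(Del), [A, B | Rest]}
                            end;
                        S(Keep, Del) ->
                            {lists:reverse(Del), Keep}
                    end,
            Split(IdxFiles, [])
    end.

%% Index files are named after the first offset in the segment,
%% e.g. "00000000000000012345.index", so the name alone is enough.
first_offset(IdxFile) ->
    Base = filename:basename(IdxFile, ".index"),
    binary_to_integer(iolist_to_binary(Base)).
```

The same shape could pin an uncommitted offset by making the predicate compare against the smallest offset that must survive.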
Force-pushed from 02ca62b to 09e7e09.
```erlang
-type retention_fun() :: fun((IdxFiles :: [file:filename_all()]) ->
                                 {ToDelete :: [file:filename_all()],
                                  ToKeep :: [file:filename_all()]}).
```
The force pushes were to rebase and modify this retention spec. Before, it was:

```erlang
-type retention_fun() :: fun((IdxFile :: file:filename_all()) -> boolean()).
```

Going through the sorted set of index files, you could return `true` to delete index+segment file pairs and `false` to stop evaluating the retention function.
For retention where you want to keep some offset, though, this meant that you had to read the index file contents and a chunk from the segment too, since you don't know the final offset in the segment just from the index file. Passing the full list of index files lets us determine which files to keep and which to delete just from the index file names.
Here is where it's used in rabbitmq_stream_s3 to clean up any segments which have been fully uploaded to the remote tier and are not the current segment: https://github.com/amazon-mq/rabbitmq-stream-s3/blob/57d0a0a93bdb7d3da1e6f03f828e0dfce31019a6/src/rabbitmq_stream_s3_log_manifest.erl#L968-L1069
This change refactors `parse_header/2` to take the chunk header binary and the position at which it was read and return a `header_map()`. This is useful for other readers - so that they do not need to duplicate the binary match code and `next_position` calculation.
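A custom reader might use the refactored function along these lines. The header size constant and the `header_map()` key shown here are assumptions for illustration; only `next_position` is named in the description above:

```erlang
%% Hypothetical caller of osiris_log:parse_header/2. The header size
%% and the header_map() keys are assumptions, not the PR's actual API.
-define(HEADER_SIZE, 56).

read_header(Fd, Pos) ->
    {ok, HeaderBin} = file:pread(Fd, Pos, ?HEADER_SIZE),
    %% parse_header/2 performs the binary match on the chunk header
    %% and computes where the following chunk starts, so callers need
    %% not duplicate either.
    #{next_position := NextPos} = Header =
        osiris_log:parse_header(HeaderBin, Pos),
    {Header, NextPos}.
```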
`osiris_log_manifest` adds a few callbacks which are executed during initialization of writers and at events like rolling segments and writing chunks.
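A rough idea of the shape such callbacks could take, with the events taken from the description above (writer/acceptor initialization, chunks written, segments rolled); the callback names and signatures here are guesses for illustration, not the PR's actual behaviour definition:

```erlang
%% Hypothetical sketch of osiris_log_manifest-style callbacks; the
%% real names and signatures in the PR may differ.
-callback init_writer(Config :: map()) -> State :: term().
-callback init_acceptor(Config :: map()) -> State :: term().
%% Invoked after each chunk is written, e.g. to track what can be
%% uploaded to a remote tier.
-callback handle_chunk_written(ChunkInfo :: map(), State :: term()) ->
    term().
%% Invoked when the current segment rolls over to a new file.
-callback handle_segment_rolled(SegmentFile :: file:filename_all(),
                                State :: term()) -> term().
-callback close(State :: term()) -> ok.
```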
Force-pushed from 09e7e09 to c78cfc0.
This is an evolution of #194 with a different approach to adding the necessary hooks for #184. It's quite different, so I wanted to make a new PR to preserve the old history.
The main difference is that the "log reader" part is at a higher level. Instead of file-like functions with callbacks like `open/1`, `pread/3` and `close/1`, we have callbacks more like what's already in `osiris_log`: `init_offset_reader/2`, `send_file/3`, `chunk_iterator/3` and `iterator_next/1`. @kjnilsson suggested something higher level when we last discussed the reader. It's much more natural than wrapping file operations after rebasing on #192. Implementors have more responsibilities, but it makes for much less invasive changes to `osiris_log`, and it doesn't need many changes for callers: just search-and-replace `osiris_log` with `osiris_log_reader` (see here). It also makes the manifest behaviour less complex and invasive. The manifest mainly provides hooks into events like whenever a chunk is written or a segment file is rolled over. It also has hooks into writer and acceptor start-up and closing.

Use of these abstractions for S3 tiered storage can be found in amazon-mq/rabbitmq-stream-s3#2 - not everything works well yet, but it shows a good sketch of using the reader and manifest behaviours.