Skip to content

Conversation

@elenagaljak-db
Copy link
Collaborator

@elenagaljak-db elenagaljak-db commented Dec 29, 2025

What changes are proposed in this pull request?

Adds alternative ingestion API methods (ingest_record_v2() and ingest_records_v2()) that return logical offset IDs directly instead of wrapped in Futures. Includes new wait_for_offset() method to explicitly wait for acknowledgment of specific offsets. Refactors flush() and wait_for_offset() to share common waiting logic.

How is this tested?

Tests Added:

  • test_ingest_record_v2_single_record - Basic single record ingestion
  • test_ingest_record_v2_multiple_records - Multiple sequential records (10 records)
  • test_ingest_records_v2_batch - Batch ingestion (3 records in one batch)
  • test_ingest_records_v2_empty_batch - Empty batch edge case (returns None)
  • test_mixed_v1_v2_api_usage - Mixing v1 and v2 APIs (backward compatibility)
  • test_ingest_record_v2_with_json - JSON record type with v2 API
  • test_ingest_records_v2_with_json_batch - JSON batch with v2 API
  • test_ingest_record_v2_after_close_fails - Error handling after close
  • test_ingest_records_v2_after_close_fails - Batch error handling after close
  • test_ingest_record_v2_blocks_on_inflight_limit - Backpressure/blocking behavior

Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
@elenagaljak-db elenagaljak-db marked this pull request as ready for review December 29, 2025 17:02
@elenagaljak-db elenagaljak-db self-assigned this Dec 29, 2025
@elenagaljak-db elenagaljak-db linked an issue Dec 29, 2025 that may be closed by this pull request

### API Changes

- Added `ingest_record_v2()` method to `ZerobusStream` for immediate offset return without Future wrapping
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe something that we can directly pair up with wait_for_offset.

Eg ingest_record_async_offset or ingest_record_offset_ack.
ingest_record_offset_ack indicates that you need to pair this with wait_for_offset so I would vote for that.

Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
now, now
);
let offset_id = stream.ingest_record_v2(json_record_2).await.unwrap();
println!("Record ingested with offset Id: {}", offset_id);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we call it "record ingested" if only after ack we can say that the record was ingested?
Should be "Record sent with offset id: "

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines 70 to 71
// Example 2: Using v2 API, ingest_record_v2 returns offset immediately.
let json_record_2 = format!(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is not in line with Example 1 – there, we define the json record first and then have the comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

if let Some(offset) = offset {
if offset >= offset_to_wait {
info!(stream_id = %stream_id, "Stream is caught up to the given offset. Flushing complete.");
info!(stream_id = %stream_id, "Stream is caught up to the given offset. {} complete.", operation_name);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Waiting for acknowledgement completed"?

Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
/// # Ok(())
/// # }
/// ```
pub async fn ingest_record_v2(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's then just rename this to what Atomic proposed, mark previously existing methods as deprecated and change all the examples/documentation to primarily show this new API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Task] Introduce ingest methods which don't return futures

5 participants