refactor: Move stream state manipulation into a state management class #3377
Reviewer's Guide
Encapsulates all stream state handling in a new StreamStateManager class and refactors Stream to delegate state-related logic to it. This centralizes replication, partition, and progress marker behavior, and updates constants, a sample tap, and tests accordingly.
Sequence diagram for record processing with StreamStateManager
sequenceDiagram
actor Operator
participant Tap
participant Stream
participant StreamStateManager as StateManager
participant StateWriter
Operator->>Tap: run()
Tap->>Stream: _sync_records()
loop For each record
Stream->>Tap: _generate_record_messages(record)
Tap-->>Stream: record_messages
Stream->>Tap: write_message(record_message)
Stream->>StateManager: increment_state(latest_record, context, replication_key, is_sorted, check_sorted)
Stream->>StateManager: is_flushed = False
end
par Periodic or end-of-partition
Stream->>Stream: _finalize_state(state)
Stream->>StateManager: finalize_state(state)
and At safe checkpoint or end-of-stream
Stream->>Stream: _write_state_message()
Stream->>StateManager: is_flushed?
alt state not flushed
Stream->>StateManager: stream_state
Stream->>StateWriter: write_state(tap_state)
Stream->>StateManager: is_flushed = True
end
end
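The flush-flag handshake in the diagram above can be sketched in isolation. This is a minimal illustration, not the SDK's implementation: `ToyStateManager`, `ToyStream`, and the `emitted` list are hypothetical stand-ins for StreamStateManager, Stream, and the state writer. Only the `is_flushed` flag and the write-only-if-not-flushed order are taken from the diagram.

```python
from __future__ import annotations


class ToyStateManager:
    """Hypothetical stand-in for StreamStateManager (illustration only)."""

    def __init__(self) -> None:
        self.stream_state: dict = {}
        self.is_flushed = True  # nothing is pending until a record is seen

    def increment_state(self, latest_record: dict, replication_key: str) -> None:
        # Track the replication key value of the latest record.
        self.stream_state["replication_key"] = replication_key
        self.stream_state["replication_key_value"] = latest_record[replication_key]


class ToyStream:
    """Hypothetical stand-in for Stream that delegates state to the manager."""

    def __init__(self) -> None:
        self.state_manager = ToyStateManager()
        self.emitted: list[dict] = []  # stands in for the state writer

    def process_record(self, record: dict) -> None:
        self.state_manager.increment_state(record, "updated_at")
        self.state_manager.is_flushed = False  # state changed since last write

    def write_state_message(self) -> None:
        if self.state_manager.is_flushed:
            return  # nothing new to emit
        self.emitted.append(dict(self.state_manager.stream_state))
        self.state_manager.is_flushed = True


stream = ToyStream()
stream.process_record({"id": 1, "updated_at": "2021-01-01"})
stream.write_state_message()
stream.write_state_message()  # no-op: state is already flushed
```

The second `write_state_message()` call emits nothing, which is the point of tracking `is_flushed` on the manager rather than re-emitting state at every checkpoint.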
Class diagram for Stream and StreamStateManager refactor
classDiagram
class Stream {
+tap
+name
+_tap_state
+_state_partitioning_keys
+_state_manager
+_sync_costs
+child_streams
+replication_method
+replication_key
+config
+selected
+partitions
+get_starting_replication_key_value(context)
+get_starting_timestamp(context)
+_write_replication_key_signpost(context, value)
+_write_starting_replication_value(context)
+get_replication_key_signpost(context)
+get_context_state(context) dict
+stream_state dict
+_increment_stream_state(latest_record, context, treat_as_sorted)
+_write_state_message()
+_abort_sync(abort_reason)
+reset_state_progress_markers(state)
+_finalize_state(state)
+finalize_state_progress_markers(state)
+_sync_records()
+_get_state_partition_context(context)
}
class StreamStateManager {
+tap_name
+stream_name
+tap_state
+state_partitioning_keys
+is_flushed
+stream_state dict
+get_context_state(context) dict
+get_state_partition_context(context) Context
+write_starting_replication_value(context, replication_method, replication_key, config, compare_start_date_fn)
+write_replication_key_signpost(context, value)
+increment_state(latest_record, context, replication_key, is_sorted, check_sorted)
+finalize_state(state)
+reset_progress_markers(state, partitions)
+finalize_progress_markers(state, partitions)
+get_state_partitions() list~dict~
+get_starting_replication_value(context, replication_method)
+is_state_non_resumable(context) bool
+log_sort_error(ex, current_context, record_count, partition_record_count)
}
class TapState {
<<interface>>
}
Stream --> StreamStateManager : uses
StreamStateManager --> TapState : tap_state
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3377 +/- ##
==========================================
+ Coverage 94.07% 94.31% +0.23%
==========================================
Files 69 70 +1
Lines 5777 5842 +65
Branches 716 724 +8
==========================================
+ Hits 5435 5510 +75
+ Misses 239 234 -5
+ Partials 103 98 -5
CodSpeed Performance Report
Merging #3377 will not alter performance
338e868 to 61e574a
Documentation build overview
No files changed.
cb124b6 to 3c48308
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `singer_sdk/streams/_state.py:243-244` </location>
<code_context>
+ """
+ if state is None or state == {}:
+ # Reset all partition states
+ partition_list = partitions or []
+ for partition_context in partition_list or [{}]:
+ partition_state = self.get_context_state(partition_context or None)
+ reset_state_progress_markers(partition_state)
</code_context>
<issue_to_address>
**nitpick:** The two-step `partition_list` + `partition_list or [{}]` pattern is a bit confusing and can be simplified.
In `reset_progress_markers`, you set `partition_list = partitions or []` and then iterate over `partition_list or [{}]`. While correct, the nested `or` is non-obvious. Consider either iterating directly over `partitions or [{}]` or explicitly building the default list when `partitions` is `None`, so the fallback-to-`[{}]` behavior is clearer to readers.
</issue_to_address>
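A small sketch of the simplification suggested in Comment 1. The function names are hypothetical and exist only to compare the two forms; the fallback to `[{}]` is the behavior quoted in the comment.

```python
from __future__ import annotations


def contexts_to_reset(partitions: list[dict] | None) -> list[dict]:
    """Return the contexts to iterate over, falling back to one empty context."""
    # A single `or` covers both None and an empty list.
    return partitions or [{}]


def contexts_to_reset_original(partitions: list[dict] | None) -> list[dict]:
    """The two-step form quoted in the review comment."""
    partition_list = partitions or []
    return partition_list or [{}]
```

Both forms return the same result for `None`, an empty list, and a non-empty list, so the one-step version should be a drop-in replacement for the loop's iterable.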
### Comment 2
<location> `tests/core/test_stream_state_manager.py:86-95` </location>
<code_context>
+class TestGetContextState:
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for `get_context_state` when `state_partitioning_keys` is set and multiple partitions are present.
Please extend `TestGetContextState` with cases that cover the `state_partitioning_keys` behavior:
1. When `state_partitioning_keys = ['tenant_id']`, two contexts that share `tenant_id` but differ on other keys should map to the same partition entry and not create duplicates.
2. Contexts with different `tenant_id` values should create two distinct entries in `tap_state['bookmarks']['test_stream']['partitions']`.
This will guard against regressions in partition-key filtering logic as `get_context_state` delegates to `get_state_partition_context`.
</issue_to_address>
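The behavior these tests would guard can be illustrated with a toy model. This is a sketch under assumptions, not the SDK's code: `partition_context` and `get_partition_state` are hypothetical helpers, and the `"context"` key used to identify a partition entry is assumed for illustration.

```python
from __future__ import annotations


def partition_context(context: dict, partitioning_keys: list[str] | None) -> dict:
    """Keep only the keys used for state partitioning (all keys if None)."""
    if partitioning_keys is None:
        return context
    return {k: v for k, v in context.items() if k in partitioning_keys}


def get_partition_state(partitions: list[dict], context: dict) -> dict:
    """Find the entry for this partition context, creating it if missing."""
    for entry in partitions:
        if entry["context"] == context:
            return entry
    entry = {"context": context}
    partitions.append(entry)
    return entry


partitions: list[dict] = []
keys = ["tenant_id"]
a = get_partition_state(partitions, partition_context({"tenant_id": 1, "page": 1}, keys))
b = get_partition_state(partitions, partition_context({"tenant_id": 1, "page": 2}, keys))
c = get_partition_state(partitions, partition_context({"tenant_id": 2, "page": 1}, keys))
```

Contexts that share `tenant_id` resolve to the same entry, while a different `tenant_id` creates a second one. Those are the two cases the comment asks the real tests to cover.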
### Comment 3
<location> `tests/core/test_stream_state_manager.py:464-140` </location>
<code_context>
+ assert len(partitions) == 2
+
+
+class TestGetStartingReplicationValue:
+ """Tests for get_starting_replication_value method."""
+
+ def test_full_table_returns_none(self, tap_state: types.TapState):
+ """Test that FULL_TABLE replication returns None."""
+ tap_state["bookmarks"] = {
+ "test_stream": {"replication_key_value": "2021-01-01"}
+ }
+ manager = StreamStateManager(
+ tap_name="test-tap",
+ stream_name="test_stream",
+ tap_state=tap_state,
+ )
+ result = manager.get_starting_replication_value(None, REPLICATION_FULL_TABLE)
+ assert result is None
+
+ def test_incremental_returns_value(self, tap_state: types.TapState):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding context/partition-based tests for `get_starting_replication_value`.
Existing tests cover FULL_TABLE vs INCREMENTAL and top-level stream state, but not the partitioned case where values come from partition state.
Please add tests that:
- Populate `tap_state['bookmarks']['test_stream']['partitions']` with different `starting_replication_value`s per partition.
- Call `get_starting_replication_value(context=...)` for multiple partitions and assert it returns the correct value for each.
- Assert it returns `None` for a context with no existing partition state.
This will verify that partition-aware behavior and `get_context_state` usage remain correct as state handling evolves.
</issue_to_address>
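A toy lookup showing the partition-aware behavior the comment wants tested. This is an assumption-laden sketch: `starting_value` is a hypothetical function, the `"context"` key identifying each partition entry is assumed, and unlike a real state manager this version never creates missing partition entries.

```python
from __future__ import annotations


def starting_value(stream_state: dict, context: dict | None) -> str | None:
    """Return the starting replication value for a context, if any."""
    if context is None:
        # No partition context: read from the stream-level state.
        return stream_state.get("starting_replication_value")
    for entry in stream_state.get("partitions", []):
        if entry["context"] == context:
            return entry.get("starting_replication_value")
    return None  # no state recorded for this partition


stream_state = {
    "partitions": [
        {"context": {"id": "A"}, "starting_replication_value": "2021-01-01"},
        {"context": {"id": "B"}, "starting_replication_value": "2021-06-01"},
    ]
}
```

Calling `starting_value(stream_state, {"id": "A"})` and `starting_value(stream_state, {"id": "B"})` should return each partition's own value, and an unknown context such as `{"id": "C"}` should return `None`.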
### Comment 4
<location> `tests/core/test_stream_state_manager.py:259-268` </location>
<code_context>
+class TestFinalizeProgressMarkers:
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for `finalize_progress_markers` when `state` already contains partitions.
Right now we only cover:
- `state=None` with stream-level bookmarks and `is_flushed=False`, and
- a non-empty `state` without checking partition-level behavior.
Please add a test where `tap_state` already has `bookmarks['test_stream']['partitions']` populated (with `progress_markers`), and you call `finalize_progress_markers(state=None, partitions=[...])`. The test should assert that each partition’s `progress_markers` is promoted to `replication_key`/`replication_key_value` on that partition and that `is_flushed` is set to `False`, mirroring the partition tests for `reset_progress_markers`.
Suggested implementation:
```python
class TestFinalizeProgressMarkers:
    """Tests for finalize_progress_markers method."""

    def test_finalize_with_no_state_uses_stream_state(
        self,
        state_manager: StreamStateManager,
        tap_state: types.TapState,
    ):
        """Test finalize_progress_markers with no state creates stream bookmark."""
        state_manager.finalize_progress_markers()
        assert "bookmarks" in tap_state

    def test_finalize_with_existing_partition_progress_markers_promotes_to_partition_state(
        self,
        state_manager: StreamStateManager,
        tap_state: types.TapState,
    ):
        """When partitions already have progress_markers, finalize should promote them.

        This mirrors the partition behavior tested for reset_progress_markers, but for
        finalize_progress_markers with state=None and explicit partitions.
        """
        stream_name = "test_stream"

        # Pre-populate tap_state with partition-level progress_markers
        tap_state.setdefault("bookmarks", {}).setdefault(stream_name, {})
        tap_state["bookmarks"][stream_name]["partitions"] = [
            {
                "partition": {"id": "A"},
                "progress_markers": {
                    "replication_key": "updated_at",
                    "replication_key_value": "2021-05-17T20:41:16Z",
                    "is_flushed": True,
                },
            },
            {
                "partition": {"id": "B"},
                "progress_markers": {
                    "replication_key": "updated_at",
                    "replication_key_value": "2021-05-18T10:15:00Z",
                    "is_flushed": True,
                },
            },
        ]
        partitions = [
            p_state["partition"]
            for p_state in tap_state["bookmarks"][stream_name]["partitions"]
        ]

        # Act: finalize using the explicit partitions and no separate state dict
        state_manager.finalize_progress_markers(state=None, partitions=partitions)

        # Assert: each partition's progress_markers are promoted and is_flushed is False
        partition_states = tap_state["bookmarks"][stream_name]["partitions"]
        # Build a quick lookup from partition id to its state for stable assertions
        partition_by_id = {
            p_state["partition"]["id"]: p_state for p_state in partition_states
        }
        for partition_id, expected_value in {
            "A": "2021-05-17T20:41:16Z",
            "B": "2021-05-18T10:15:00Z",
        }.items():
            partition_state = partition_by_id[partition_id]
            # progress_markers should be removed after finalize
            assert "progress_markers" not in partition_state
            # replication key/value should be promoted to the partition state
            assert partition_state["replication_key"] == "updated_at"
            assert partition_state["replication_key_value"] == expected_value
            # finalize_progress_markers should mark these as not yet flushed
            assert partition_state["is_flushed"] is False
```
If the existing partition-level tests for `reset_progress_markers` use a different partition structure (e.g. a different key than `"id"` under `"partition"` or additional required fields), you should align the newly added test's partition dictionaries with that structure. Similarly, if the stream name used elsewhere in the tests differs from `"test_stream"`, update `stream_name` in the new test accordingly to match the existing conventions and fixtures.
</issue_to_address>
### Comment 5
<location> `singer_sdk/streams/_state.py:152-154` </location>
<code_context>
def write_starting_replication_value(
    self,
    context: types.Context | None,
    replication_method: str,
    replication_key: str | None,
    config: dict,
    compare_start_date_fn: t.Callable[[str, str], str] | None = None,
) -> None:
    """Write the starting replication value, if available.

    Args:
        context: Stream partition or context dictionary.
        replication_method: The replication method for the stream.
        replication_key: The replication key for the stream.
        config: Stream configuration containing start_date if applicable.
        compare_start_date_fn: Optional function to compare bookmark value
            with start_date and return the most recent.
    """
    if replication_method == REPLICATION_FULL_TABLE:
        self._logger.debug(
            "Stream '%s' is not configured for incremental replication. "
            "Not writing starting replication value.",
            self.stream_name,
        )
        return

    value = None
    state = self.get_context_state(context)

    if replication_key:
        replication_key_value = state.get("replication_key_value")
        if replication_key_value and replication_key == state.get(
            "replication_key",
        ):
            value = replication_key_value

        # Use start_date if it is more recent than the replication_key state
        start_date_value: str | None = config.get("start_date")
        if start_date_value:
            if not value:
                value = start_date_value
            elif compare_start_date_fn:
                value = compare_start_date_fn(value, start_date_value)

        self._logger.info(
            "Starting incremental sync of '%s' with bookmark value: %s",
            self.stream_name,
            value,
        )

    write_starting_replication_value(state, value)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
```suggestion
if start_date_value := config.get("start_date"):
```
</issue_to_address>
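A self-contained sketch of the named-expression form suggested above. `pick_start` is a hypothetical helper written for illustration; it mirrors only the "use start_date when there is no bookmark" branch and leaves out the comparison callback. Named expressions require Python 3.8 or later.

```python
from __future__ import annotations


def pick_start(config: dict, bookmark: str | None) -> str | None:
    """Return the bookmark, or start_date when no bookmark exists."""
    value = bookmark
    # Assignment and truthiness test happen in one expression.
    if start_date_value := config.get("start_date"):
        if not value:
            value = start_date_value
    return value
```

One trade-off: the `:=` form cannot carry the inline `str | None` annotation that the original two-line version has, so the variable's type is left to inference.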
### Comment 6
<location> `tests/core/test_stream_state_manager.py:664` </location>
<code_context>
def test_empty_value_does_nothing(
    self,
    state_manager: StreamStateManager,
    tap_state: dict,
):
    """Test that empty value does not write signpost."""
    state_manager.write_replication_key_signpost(None, "")
    assert tap_state == {}
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replaces an empty collection equality with a boolean operation ([`simplify-empty-collection-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-empty-collection-comparison/))
```suggestion
assert not tap_state
```
</issue_to_address>
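The two assertion styles are not strictly equivalent, which may matter when deciding whether to accept this suggestion. The helper names below are hypothetical and exist only to contrast the checks.

```python
def is_empty_by_equality(state: object) -> bool:
    """Strict check: true only when the value equals an empty dict."""
    return state == {}


def is_empty_by_truthiness(state: object) -> bool:
    """Loose check: true for any falsy value (None, [], "", {}, ...)."""
    return not state
```

For a fixture that is always a dict, both checks agree. The truthiness form would also pass if the state were `None` or an empty list, so the equality form is the stricter assertion.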
### Comment 7
<location> `tests/core/test_stream_state_manager.py:673` </location>
<code_context>
def test_none_value_does_nothing(
    self,
    state_manager: StreamStateManager,
    tap_state: dict,
):
    """Test that None value does not write signpost."""
    state_manager.write_replication_key_signpost(None, None)
    assert tap_state == {}
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replaces an empty collection equality with a boolean operation ([`simplify-empty-collection-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-empty-collection-comparison/))
```suggestion
assert not tap_state
```
</issue_to_address>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
3c48308 to 113ae7e
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Summary by Sourcery
Extract stream state handling into a dedicated StreamStateManager class and update Stream to delegate all state-related operations to it.