Skip to content

Conversation

@edgarrmondragon
Copy link
Collaborator

@edgarrmondragon edgarrmondragon commented Dec 3, 2025

Summary by Sourcery

Introduce a modern, type-safe sink loading architecture while preserving backwards compatibility with existing dict-based sink implementations.

New Features:

  • Add load_batch and load_record entry points as the recommended modern APIs for batch and record sinks, alongside legacy process_* methods.
  • Introduce BatchContext as a strongly-typed batch context object, including helpers to convert to and from legacy dict contexts.
  • Provide pluggable batch accumulation strategies (count, time window, size, hybrid, and no-batch) via a new BatchStrategy abstraction.
  • Expose new sink protocols (IRecordLoader, IBatchLoader, IPreprocessable, IVersionable, IBatchable) for clearer, typed interfaces and extension points.
  • Extend the CSV target sink to demonstrate the modern load_batch pattern using BatchContext.

Enhancements:

  • Delay SQL sink table preparation to the first batch rather than during setup to avoid data loss when multiple schema messages arrive for the same stream.
  • Export the new BatchContext, batch strategy classes, and sink protocol interfaces from the sinks package for easier consumption.
  • Improve sink base class docs and behavior so that subclasses must override either the modern load_* methods or the legacy process_* methods, with clearer error messages and guidance.

Tests:

  • Add tests covering modern vs legacy load methods, BatchContext conversion, and error behavior when sinks fail to override any loading method.

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Dec 3, 2025

Reviewer's Guide

Refactors sink loading architecture to introduce modern, type-safe load_record/load_batch APIs alongside legacy methods, adds pluggable batch strategies and batch context types, defers SQL table creation to first batch, and updates CSV target and tests to exercise the new behavior and compatibility guarantees.

Sequence diagram for modern vs legacy record loading in RecordSink

sequenceDiagram
    actor Tap
    participant Framework as Framework
    participant Base as RecordSink
    participant Modern as ModernRecordSink
    participant Legacy as LegacyRecordSink

    Tap->>Framework: emit_record(record)
    Framework->>Modern: process_record(record, context)
    alt Modern sink overrides load_record
        Modern->>Base: process_record(record, context)
        Base->>Base: type(self).load_record is not RecordSink.load_record
        Base->>Modern: load_record(record)
        Modern-->>Framework: record persisted via modern path
    else Legacy sink does not override load_record
        Framework->>Legacy: process_record(record, context)
        Legacy->>Base: process_record(record, context)
        Base->>Base: type(self).load_record is RecordSink.load_record
        Base->>Base: raise NotImplementedError
    end
Loading

Sequence diagram for modern vs legacy batch loading in BatchSink

sequenceDiagram
    actor Tap
    participant Framework as Framework
    participant Base as BatchSink
    participant Modern as ModernBatchSink
    participant Legacy as LegacyBatchSink

    Tap->>Framework: emit_batch(context)
    Framework->>Modern: process_batch(context)
    alt Modern sink overrides load_batch
        Modern->>Base: process_batch(context)
        Base->>Base: type(self).load_batch is not BatchSink.load_batch
        Base->>Base: batch = BatchContext.from_legacy_dict(context)
        Base->>Modern: load_batch(batch)
        Modern-->>Framework: batch persisted via modern path
    else Legacy sink does not override load_batch
        Framework->>Legacy: process_batch(context)
        Legacy->>Base: process_batch(context)
        Base->>Base: type(self).load_batch is BatchSink.load_batch
        Base->>Base: raise NotImplementedError
    end
Loading

File-Level Changes

Change Details Files
Introduce a modern, type-safe batch loading API and context while maintaining backward compatibility with legacy dict-based batch sinks.
  • Extend BatchSink with a new load_batch(batch: BatchContext) method that defaults to delegating to process_batch with a legacy dict for backwards compatibility.
  • Add typed BatchContext creation and storage in _get_context, and provide a helper _get_batch_context_typed plus conversion between BatchContext and legacy dict contexts.
  • Change process_batch to route to an overridden load_batch when present and raise NotImplementedError if neither load_batch nor process_batch is overridden, with updated docstrings describing legacy vs modern patterns.
singer_sdk/sinks/batch.py
singer_sdk/sinks/batch_context.py
Introduce a modern per-record loading API for record sinks while keeping the legacy process_record path available.
  • Add load_record(record: dict) to RecordSink that by default calls legacy process_record with an empty context to preserve existing behavior.
  • Update process_record to call an overridden load_record when present and raise NotImplementedError if neither method is overridden.
  • Expand RecordSink docstrings to document legacy vs modern usage and the routing behavior between the two methods.
singer_sdk/sinks/record.py
Expose new sink-related types, strategies, and protocols from the sinks package to support the redesigned architecture.
  • Export BatchContext, batch strategy classes, and sink protocols via the sinks package all and imports.
  • Add protocols for record loaders, batch loaders, preprocessable sinks, versionable sinks, and batch lifecycle hooks to formalize sink interfaces.
singer_sdk/sinks/__init__.py
singer_sdk/sinks/protocols.py
singer_sdk/sinks/batch_strategies.py
Adjust SQLSink to defer table preparation until the first batch to avoid data loss on multiple schema messages.
  • Add a _table_prepared flag to SQLSink to track whether the table has been created.
  • Change setup() to only prepare the schema; move table preparation logic into start_batch, guarded by the _table_prepared flag.
singer_sdk/sql/sink.py
Update the CSV target sink to use the new BatchContext-based load_batch API and demonstrate modern batch usage.
  • Change CSVSink to import and accept BatchContext and implement load_batch instead of process_batch using the typed batch.records.
  • Keep a commented legacy process_batch implementation as reference for old-style usage.
packages/target-csv/target_csv/sink.py
Add tests covering new load_* methods, BatchContext conversion, and error behavior when sinks do not override required methods.
  • Create dummy modern and legacy batch and record sinks to verify load_batch/load_record routing and compatibility.
  • Test BatchContext.from_legacy_dict and to_legacy_dict round-trip behavior, and ensure base sinks raise NotImplementedError if neither modern nor legacy methods are overridden.
tests/core/sinks/test_load_methods.py

Possibly linked issues

  • #bug: SQL prepare_table runs on setup in stead of drain: PR changes SQLSink to prepare tables at first batch instead of setup, resolving multi-schema data loss bug.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@read-the-docs-community
Copy link

Documentation build overview

📚 Meltano SDK | 🛠️ Build #30545828 | 📁 Comparing de2fdaf against latest (26b30fa)


🔍 Preview build

Show files changed (4 files in total): 📝 4 modified | ➕ 0 added | ➖ 0 deleted
File Status
genindex.html 📝 modified
classes/singer_sdk.BatchSink.html 📝 modified
classes/singer_sdk.RecordSink.html 📝 modified
classes/singer_sdk.sql.SQLSink.html 📝 modified

@codecov
Copy link

codecov bot commented Dec 3, 2025

Codecov Report

❌ Patch coverage is 54.10959% with 67 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.73%. Comparing base (26b30fa) to head (de2fdaf).

Files with missing lines Patch % Lines
singer_sdk/sinks/batch_strategies.py 37.77% 56 Missing ⚠️
singer_sdk/sinks/batch.py 77.77% 4 Missing ⚠️
singer_sdk/sinks/batch_context.py 88.00% 3 Missing ⚠️
singer_sdk/sinks/record.py 40.00% 2 Missing and 1 partial ⚠️
singer_sdk/sql/sink.py 80.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3400      +/-   ##
==========================================
- Coverage   93.90%   92.73%   -1.17%     
==========================================
  Files          69       72       +3     
  Lines        5774     5918     +144     
  Branches      716      726      +10     
==========================================
+ Hits         5422     5488      +66     
- Misses        248      320      +72     
- Partials      104      110       +6     
Flag Coverage Δ
core 80.97% <52.05%> (-0.76%) ⬇️
end-to-end 75.51% <52.05%> (-0.92%) ⬇️
optional-components 43.44% <41.09%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codspeed-hq
Copy link

codspeed-hq bot commented Dec 3, 2025

CodSpeed Performance Report

Merging #3400 will not alter performance

Comparing refactor/new-sink-arch (de2fdaf) with main (a876bda)1

Summary

✅ 8 untouched

Footnotes

  1. No successful run was found on main (26b30fa) during the generation of this report, so a876bda was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.

@edgarrmondragon edgarrmondragon changed the title WIP: Redesisgn sink architecture WIP: Redesign sink architecture Dec 3, 2025
@edgarrmondragon edgarrmondragon added this to the v1.0 Release milestone Dec 3, 2025
@edgarrmondragon edgarrmondragon self-assigned this Dec 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants