@siiddhantt
Contributor

Implements parallel WAL processing for PostgreSQL CDC to improve throughput and reduce latency during change data capture.

The implementation uses transaction batching to maintain ACID properties while allowing independent transactions to be processed in parallel. A configurable worker pool processes changes concurrently, with each worker handling complete transactions sequentially to preserve ordering within transactions.

Fixes #553

Type of change

  • New feature (non-breaking change which adds functionality)
  • Bug fix (non-breaking change which fixes an issue)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

Implementation Details

Configuration: New optional WALWorkerCount field in PostgreSQL CDC config (defaults to 1 for backward compatibility)

Architecture:

  • Transaction batching: Groups all changes within a PostgreSQL transaction
  • Parallel workers: Process independent transactions concurrently via errgroup
  • Bounded channels: Natural backpressure to prevent memory exhaustion
  • Thread-safe relation map: sync.RWMutex for concurrent reads from multiple workers

Code Changes:

  • pkg/waljs/pgoutput.go - Core parallel implementation (~300 lines added)
  • pkg/waljs/types.go - Added WorkerCount config field
  • drivers/postgres/internal/config.go - Added WALWorkerCount JSON field
  • drivers/postgres/internal/cdc.go - Passed config through to waljs
  • pkg/waljs/pgoutput_test.go - 4 unit tests (all passing)

How Has This Been Tested?

  • Unit tests (4 tests - all passing)
  • Backward compatibility (sequential path still works with workerCount=1)
  • Concurrent processing verification
  • Transaction ordering verification

Benchmark Results

Performance measured with a realistic workload (100µs of processing per record):

Workers          Records/sec   Speedup
1 (sequential)   1,857         1.0x
2 (parallel)     3,725         2.01x
4 (parallel)     7,158         3.85x

Test Evidence

✓ Unit tests pass without database
✓ Integration tests pass with PostgreSQL (docker-compose)
✓ Data integrity verified: 1,000 records with zero duplicates/missing records
✓ Full codebase compiles: go build ./...
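For readers who want to repeat the duplicate/missing check, one possible shape is sketched below. It is not the script used for the evidence above: the `checkIntegrity` helper is hypothetical and assumes the source records carry integer IDs 1 through n.

```go
package main

import "fmt"

// checkIntegrity compares synced IDs against the expected IDs 1..n and
// returns the IDs seen more than once and the IDs never seen.
// IDs outside 1..n are ignored.
func checkIntegrity(n int, synced []int) (dups, missing []int) {
	counts := make(map[int]int, n)
	for _, id := range synced {
		counts[id]++
	}
	for id := 1; id <= n; id++ {
		switch c := counts[id]; {
		case c == 0:
			missing = append(missing, id)
		case c > 1:
			dups = append(dups, id)
		}
	}
	return dups, missing
}

func main() {
	// Arrival order is arbitrary, as it would be with parallel workers.
	synced := []int{3, 1, 2, 2, 5}
	dups, missing := checkIntegrity(5, synced)
	fmt.Println(dups, missing) // prints "[2] [4]"
}
```

A clean run returns two empty slices regardless of the order in which workers delivered the records.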

Documentation

  • N/A (backward compatible feature, uses existing config pattern)

Related PRs

None

@siiddhantt changed the base branch from master to staging on December 18, 2025 13:45
@nayanj98
Collaborator

nayanj98 commented Jan 9, 2026

@siiddhantt Hey, sorry for the delay in assigning a reviewer for your work. Could you please add a working demo showing that your changes work by running a sync using OLake?

@siiddhantt
Contributor Author

Hi @nayanj98, no worries. I'll attach a video demo here or send it to you on Slack.
Meanwhile, could you assign issue #553 to me? Thanks!

Development

Successfully merging this pull request may close these issues.

improvement: parallelize pgoutput wal log processing