
A CDC pipeline to sync changes from PostgreSQL to OpenSearch via Debezium and Kafka


diivi/postgres-cdc-os


Postgres CDC to OpenSearch Sync

A production-ready Change Data Capture (CDC) system that synchronizes PostgreSQL data to OpenSearch in real time using Debezium and Kafka, with full reconciliation capabilities.


πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              β”‚      β”‚              β”‚      β”‚              β”‚
β”‚  PostgreSQL  │─────▢│   Debezium   │─────▢│    Kafka     β”‚
β”‚              β”‚ CDC  β”‚  Connector   β”‚      β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                                     β”‚
                                                     β”‚ Events
                                                     β–Ό
                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                      β”‚        Sync Worker                  β”‚
                      β”‚  - Consumes CDC events              β”‚
                      β”‚  - Batches updates (50/2s)          β”‚
                      β”‚  - Deduplicates project IDs         β”‚
                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                     β”‚
                                     β–Ό
                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                      β”‚        OpenSearch                   β”‚
                      β”‚  - Real-time search index           β”‚
                      β”‚  - Nested user/hashtag queries      β”‚
                      β”‚  - Full-text search with fuzzy      β”‚
                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                     β–²
                                     β”‚
                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                      β”‚                              β”‚
           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
           β”‚    API Server       β”‚    β”‚  Reconciliation Worker   β”‚
           β”‚  - Search projects  β”‚    β”‚  - Daily full reindex    β”‚
           β”‚  - Filter by user   β”‚    β”‚  - Zero-downtime         β”‚
           β”‚  - Filter by tag    β”‚    β”‚  - Index retention       β”‚
           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

✨ Features

Real-time Synchronization

  • CDC-based updates: Captures all database changes via Debezium
  • Smart batching: Configurable batch size (default 50) and interval (default 2s)
  • Deduplication: Automatically deduplicates multiple updates to the same project
  • Immediate deletes: Delete operations bypass batching for instant removal
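The batching and deduplication above fall out naturally from accumulating project IDs in a map. A minimal sketch (illustrative names, not the actual types in cmd/workers/sync-worker; the real worker also flushes on the 2s interval timer, omitted here for brevity):

```go
package main

import "fmt"

// batcher accumulates project IDs and flushes once the batch size is
// reached. Keying the pending set by project ID means duplicate updates
// to the same project collapse onto one entry — deduplication for free.
type batcher struct {
	pending   map[int64]struct{}
	batchSize int
	flush     func(ids []int64)
}

func (b *batcher) add(id int64) {
	b.pending[id] = struct{}{} // same ID twice -> same key, no duplicate work
	if len(b.pending) >= b.batchSize {
		b.flushNow()
	}
}

func (b *batcher) flushNow() {
	if len(b.pending) == 0 {
		return
	}
	ids := make([]int64, 0, len(b.pending))
	for id := range b.pending {
		ids = append(ids, id)
	}
	b.pending = make(map[int64]struct{})
	b.flush(ids)
}

func main() {
	var got []int64
	b := &batcher{pending: make(map[int64]struct{}), batchSize: 50,
		flush: func(ids []int64) { got = ids }}
	b.add(1)
	b.add(1) // duplicate update to project 1
	b.add(2)
	b.flushNow()
	fmt.Println(len(got)) // prints 2: the duplicate was folded away
}
```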

Search Capabilities

  • Full-text search with fuzzy matching
  • Multi-field queries (slug, name, description)
  • Nested queries for users and hashtags
  • Pagination with configurable limits
  • Sorting by date, score, name, or slug
  • Date range filtering
  • Custom field selection

Reconciliation

  • Daily full reindex (default: 3 AM)
  • Zero-downtime switching via atomic alias updates
  • Index versioning with dated indices (e.g., projects-2026-01-24)
  • Automatic cleanup of old indices (7-day retention)
  • Retry logic with exponential backoff

πŸ“¦ Project Structure

.
β”œβ”€β”€ cmd/
β”‚   β”œβ”€β”€ server/                    # API server
β”‚   β”‚   └── main.go
β”‚   β”œβ”€β”€ workers/
β”‚   β”‚   β”œβ”€β”€ sync-worker/          # CDC sync worker
β”‚   β”‚   β”‚   └── main.go
β”‚   β”‚   └── reconciliation-worker/ # Daily reconciliation
β”‚   β”‚       └── main.go
β”‚   └── tools/
β”‚       └── mapping-json/         # Utility to print mapping
β”‚           └── main.go
β”œβ”€β”€ internal/
β”‚   β”œβ”€β”€ config/                   # Configuration management
β”‚   β”œβ”€β”€ database/                 # PostgreSQL client
β”‚   β”œβ”€β”€ dto/                      # Request/Response DTOs
β”‚   β”œβ”€β”€ errors/                   # Custom error types
β”‚   β”œβ”€β”€ handlers/                 # HTTP handlers
β”‚   β”œβ”€β”€ kafka/                    # Kafka consumer
β”‚   β”œβ”€β”€ models/                   # Domain models
β”‚   β”œβ”€β”€ opensearch/               # OpenSearch client
β”‚   β”œβ”€β”€ processor/                # CDC message processor
β”‚   β”œβ”€β”€ repository/               # Repository interfaces & implementations
β”‚   └── service/                  # Business logic layer
β”œβ”€β”€ pkg/
β”‚   └── constants/                # Application constants
β”œβ”€β”€ postgres/
β”‚   β”œβ”€β”€ migrations/               # Database schema
β”‚   └── seeds/                    # Seed data
β”œβ”€β”€ opensearch/
β”‚   └── init-index.sh            # Index initialization
β”œβ”€β”€ scripts/
β”‚   β”œβ”€β”€ setup.sh                 # Main setup script
β”‚   β”œβ”€β”€ setup-debezium.sh        # Debezium connector setup
β”‚   └── bulk-load.sh             # Bulk load utility
β”œβ”€β”€ docker-compose.yml           # All services
└── README.md

πŸš€ Quick Start

Prerequisites

  • Docker & Docker Compose
  • Go 1.24+ (for local development)

Setup

  1. Clone the repository
git clone <repo-url>
cd postgres-cdc-os
  2. Make scripts executable
chmod +x ./scripts/*.sh
chmod +x ./opensearch/init-index.sh
  3. Start all services
./scripts/setup.sh

This will:

  • Start PostgreSQL, Kafka, Debezium, and OpenSearch
  • Run database migrations
  • Seed initial data
  • Create OpenSearch index
  • Configure Debezium connector
  • Start sync worker, reconciliation worker, and API server
  4. Verify the setup
# Health check
curl http://localhost:8080/health

# Search projects
curl "http://localhost:8080/api/v1/projects/search?q=project"

πŸ”Œ API Endpoints

Search Projects

GET /api/v1/projects/search?q=<query>&fuzzy=true&limit=20&offset=0&sort_by=created_at&sort_order=desc

Query Parameters:

  • q (required): Search query
  • fuzzy (optional): Enable fuzzy matching (default: true)
  • operator (optional): "and" or "or" (default: "or")
  • fields (optional): Fields to search (default: slug, name, description)
  • limit (optional): Results per page (default: 20, max: 100)
  • offset (optional): Pagination offset (max: 10000)
  • sort_by (optional): Sort field (created_at, updated_at, name, slug, _score)
  • sort_order (optional): "asc" or "desc" (default: "desc")
  • date_from (optional): Filter by created date from (ISO 8601)
  • date_to (optional): Filter by created date to (ISO 8601)

Example:

curl "http://localhost:8080/api/v1/projects/search?q=awesome&fuzzy=true&sort_by=created_at&sort_order=desc"

Get Projects by User

GET /api/v1/projects/by-user/:userId?limit=20&offset=0

Example:

curl http://localhost:8080/api/v1/projects/by-user/1

Get Projects by Hashtag(s)

# Single or multiple hashtags (query parameter - comma-separated)
GET /api/v1/projects/by-hashtag?hashtags=hashtag1,hashtag2,hashtag3&match=any&limit=20&offset=0

# Multiple hashtags (query parameter - multiple values)
GET /api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&hashtags=nodejs&match=all&limit=20&offset=0

Query Parameters:

  • hashtags (required): Comma-separated hashtag names or multiple query parameters
  • match (optional): Match mode - "any" (OR logic, default) or "all" (AND logic)
  • limit (optional): Results per page (default: 100, max: 100)
  • offset (optional): Offset for pagination (default: 0)

Examples:

# Single hashtag
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang"

# Multiple hashtags - ANY match (OR logic, default)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs"
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=any"

# Multiple hashtags - ALL match (AND logic)
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust,nodejs&match=all"

# Multiple hashtags (multiple query params) with ALL match
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang&hashtags=rust&match=all"

# With pagination
curl "http://localhost:8080/api/v1/projects/by-hashtag?hashtags=golang,rust&match=any&limit=10&offset=0"

Match Modes:

  • match=any (default): Returns projects that have any of the specified hashtags (OR logic)
  • match=all: Returns projects that have all of the specified hashtags (AND logic)
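The two match modes map cleanly onto nested bool clauses: one nested term query per hashtag, combined with "should" (OR) or "must" (AND). A sketch under assumed field names ("hashtags.name" is a guess at the mapping, not taken from the repo):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hashtagQuery builds the kind of nested query behind match=any vs
// match=all: every hashtag becomes a nested term clause, and the bool
// combinator decides whether one hit suffices or all are required.
func hashtagQuery(tags []string, matchAll bool) map[string]any {
	clauses := make([]any, 0, len(tags))
	for _, t := range tags {
		clauses = append(clauses, map[string]any{
			"nested": map[string]any{
				"path":  "hashtags",
				"query": map[string]any{"term": map[string]any{"hashtags.name": t}},
			},
		})
	}
	key := "should" // match=any: at least one clause must hit (OR)
	if matchAll {
		key = "must" // match=all: every clause must hit (AND)
	}
	boolq := map[string]any{key: clauses}
	if !matchAll {
		boolq["minimum_should_match"] = 1
	}
	return map[string]any{"query": map[string]any{"bool": boolq}}
}

func main() {
	b, _ := json.Marshal(hashtagQuery([]string{"golang", "rust"}, true))
	fmt.Println(string(b))
}
```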

Health Check

GET /health

βš™οΈ Configuration

Configuration is managed via environment variables:

# Database
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=projectsdb

# Kafka
KAFKA_BROKER=localhost:9092

# OpenSearch
OPENSEARCH_URL=http://localhost:9200
OPENSEARCH_INDEX=projects

# API Server
PORT=8080

# Worker Settings
BATCH_SIZE=50
BATCH_INTERVAL=2s

πŸ§ͺ Testing

This project includes API integration testing and end-to-end testing of the CDC flow.

Quick Start

Run E2E tests:

cd e2e && ./run-tests.sh

Run API tests:

go test -v ./tests/

Test Coverage

  • API Tests: Various search and filter endpoints tested via a dedicated test index
  • E2E Tests: Complete CDC flow with 15 comprehensive scenarios including:
    • Create/Update/Delete operations
    • Junction table operations
    • Related table updates triggering reindexing
    • API search functionality
    • Cascade deletes
    • Bulk operations

Manual Testing

1. Insert a new project

INSERT INTO projects (name, slug, description) 
VALUES ('Test Project', 'test-project', 'A test project');

2. Search for it (should appear within 2 seconds)

curl "http://localhost:8080/api/v1/projects/search?q=test-project"

3. Update the project

UPDATE projects SET description = 'Updated description' 
WHERE slug = 'test-project';

4. Add users to the project

INSERT INTO user_projects (user_id, project_id)
SELECT 1, id FROM projects WHERE slug = 'test-project';

5. Search by user

curl http://localhost:8080/api/v1/projects/by-user/1

6. Delete the project

DELETE FROM projects WHERE slug = 'test-project';

πŸ—οΈ Design Decisions

Batching Strategy

  • Why: Reduces OpenSearch load and improves throughput
  • How: Accumulates project IDs for 2 seconds or until batch size (50) is reached
  • Benefit: Can handle bursts of updates efficiently

Deduplication

  • Why: Multiple related tables (users, hashtags) can trigger updates for the same project
  • How: Uses a map to track pending project IDs
  • Benefit: Prevents redundant reindexing of the same project

Immediate Deletes

  • Why: Ensures deleted projects don't appear in search results
  • How: Bypasses batching for delete operations
  • Benefit: Consistent user experience

Reconciliation Worker

  • Why: Handles edge cases where CDC might miss events
  • How: Daily full reindex with zero-downtime alias switching
  • Benefit: Guarantees eventual consistency

Repository Pattern

  • Why: Separates data access logic from business logic
  • How: Interfaces define contracts, implementations are swappable
  • Benefit: Testable, maintainable, follows SOLID principles

πŸ“Š Performance Characteristics

  • Sync latency: < 2 seconds (configurable)
  • Batch size: 50 projects (configurable)
  • Reconciliation time: ~2-5 minutes for 10K projects
  • Search latency: < 50ms for most queries
  • Throughput: ~1000 updates/second with default batching

πŸ“ Development

Run locally

# API Server
go run cmd/server/main.go

# Sync Worker
go run cmd/workers/sync-worker/main.go

# Reconciliation Worker
go run cmd/workers/reconciliation-worker/main.go

Build Docker images

docker build -f Dockerfile.server -t cdc-api-server .
docker build -f Dockerfile.sync-worker -t cdc-sync-worker .
docker build -f Dockerfile.reconciliation-worker -t cdc-reconciliation-worker .

View OpenSearch mapping

go run cmd/tools/mapping-json/main.go | jq .

Provision infrastructure with Terraform

cd terraform
terraform init
terraform plan -out=tfplan
terraform apply tfplan
