Convert indexer tables to TimescaleDB hypertables #486
aditya1702 wants to merge 54 commits into opxdr-bytea-2
Conversation
… tests) Bring in BYTEA type definitions, indexer changes, GraphQL schema/resolver updates, processor changes, and test files from opxdr-bytea-2 branch. These files had no modifications on the timescale branch.
…hemas Change hash, account_id, operation_xdr, and address columns from TEXT to BYTEA type while preserving all TimescaleDB hypertable configuration, composite primary keys, and chunk settings.
… statechanges) Convert hash, account_id, operation_xdr, and address columns from TEXT to BYTEA in BatchInsert, BatchCopy, and query methods. Uses HashBytea, AddressBytea, and pgtypeBytesFromNullAddressBytea for type conversions. Preserves TimescaleDB junction table patterns (ledger_created_at columns, composite primary keys, parameter numbering).
Adopt BYTEA types (HashBytea, AddressBytea, XDRBytea, NullAddressBytea) in test data while preserving TimescaleDB-specific patterns:
- Keep ledger_created_at in junction table INSERTs
- Use generic "duplicate key value violates unique constraint" assertions (TimescaleDB chunk-based constraint names differ from standard PG)
- Keep 5-value return from processLedgersInBatch (includes startTime/endTime)
Rename inner err to addrErr in address BYTEA conversion loops to avoid shadowing the outer err variable from pgx.CopyFrom.
Pull request overview
This PR converts the indexer tables (transactions, operations, state_changes, and their account junction tables) to TimescaleDB hypertables with columnstore compression for efficient time-series data storage. The changes include significant schema modifications, removal of foreign keys (which aren't supported on hypertables), updates to primary keys to include the partitioning column (ledger_created_at), and implementation of automatic chunk compression for historical backfills. Additionally, the RPC service health tracking functionality has been removed, with health checks now performed on-demand.
Changes:
- Convert transactions, operations, and state_changes tables to TimescaleDB 3.0+ hypertables with ledger_created_at as the partition column
- Add automatic chunk compression logic for backfilled historical data
- Remove RPC health tracking background service, replacing it with on-demand health checks
- Update all primary keys and ON CONFLICT clauses to include ledger_created_at
- Upgrade container images from PostgreSQL 14 to TimescaleDB 2.25.0
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/db/migrations/2025-06-10.2-transactions.sql | Converts transactions and transactions_accounts to TimescaleDB hypertables with new primary keys |
| internal/db/migrations/2025-06-10.3-operations.sql | Converts operations and operations_accounts to TimescaleDB hypertables with segmentation |
| internal/db/migrations/2025-06-10.4-statechanges.sql | Converts state_changes to TimescaleDB hypertable, removes FK constraint |
| internal/data/transactions.go | Updates BatchInsert/BatchCopy to include ledger_created_at in junction table inserts |
| internal/data/operations.go | Updates BatchInsert/BatchCopy to include ledger_created_at in junction table inserts |
| internal/data/statechanges.go | Updates ON CONFLICT clause to match new composite primary key |
| internal/data/query_utils.go | Fixes column parsing to properly handle comma-separated column lists |
| internal/data/*_test.go | Updates test data inserts to include ledger_created_at in junction tables |
| internal/services/ingest_backfill.go | Adds compressBackfilledChunks function to compress historical data chunks |
| internal/services/rpc_service.go | Removes TrackRPCServiceHealth method and related documentation |
| internal/services/rpc_service_test.go | Removes tests for TrackRPCServiceHealth functionality |
| internal/integrationtests/infrastructure/helpers.go | Replaces heartbeat channel with direct health polling |
| internal/integrationtests/infrastructure/main_setup.go | Removes health tracking goroutine startup |
| internal/integrationtests/infrastructure/containers.go | Updates DB container to use TimescaleDB latest |
| internal/db/dbtest/dbtest.go | Adds TimescaleDB extension enablement for test databases |
| docker-compose.yaml | Updates image to TimescaleDB 2.25.0 and adds extension creation |
| .github/workflows/go.yaml | Updates CI image to TimescaleDB 2.25.0 |
Comments suppressed due to low confidence (1)
internal/db/migrations/2025-06-10.2-transactions.sql:48
- The migration file is dated 2025-06-10 (the current date is February 11, 2026), which suggests the migration was created in June 2025. Later migrations (2025-12-14, 2026-01-12, 2026-01-15, 2026-01-16) already exist, so the ordering appears chronologically consistent.
More critically, the TimescaleDB declarative syntax (WITH tsdb.hypertable, tsdb.partition_column, etc.) is only supported in TimescaleDB 3.0+, which was released in November 2024. The docker-compose.yaml specifies timescale/timescaledb:2.25.0-pg17, which is TimescaleDB 2.x and does not support this syntax. This migration will fail.
You should either:
- Update to TimescaleDB 3.x (e.g., timescale/timescaledb:latest-pg17), OR
- Use the traditional SELECT create_hypertable() function-based syntax for TimescaleDB 2.x compatibility
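If staying on TimescaleDB 2.x, the declarative WITH options could be replaced with the function-based API. This is only a sketch of the equivalent calls for the transactions table, using the table and column names from the migration; the interval and compression settings mirror the declarative options and are not a verified drop-in replacement:

```sql
-- Sketch: function-based equivalents for the declarative tsdb.* options,
-- assuming TimescaleDB 2.x where WITH (tsdb.hypertable, ...) is unavailable.
SELECT create_hypertable(
    'transactions',
    'ledger_created_at',
    chunk_time_interval => INTERVAL '1 day'
);

-- Compression ordering, the 2.x counterpart of tsdb.orderby:
ALTER TABLE transactions SET (
    timescaledb.compress,
    timescaledb.compress_orderby = 'ledger_created_at DESC'
);
```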
-- +migrate Up
-- Table: transactions (TimescaleDB hypertable with columnstore)
CREATE TABLE transactions (
ledger_created_at TIMESTAMPTZ NOT NULL,
to_id BIGINT NOT NULL,
hash BYTEA NOT NULL,
envelope_xdr TEXT,
fee_charged BIGINT NOT NULL,
result_code TEXT NOT NULL,
meta_xdr TEXT,
ledger_number INTEGER NOT NULL,
is_fee_bump BOOLEAN NOT NULL DEFAULT false,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (ledger_created_at, to_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column = 'ledger_created_at',
tsdb.chunk_interval = '1 day',
tsdb.orderby = 'ledger_created_at DESC'
);
CREATE INDEX idx_transactions_hash ON transactions(hash);
CREATE INDEX idx_transactions_to_id ON transactions(to_id);
-- Table: transactions_accounts (TimescaleDB hypertable for automatic cleanup with retention)
CREATE TABLE transactions_accounts (
ledger_created_at TIMESTAMPTZ NOT NULL,
tx_to_id BIGINT NOT NULL,
account_id BYTEA NOT NULL,
PRIMARY KEY (ledger_created_at, account_id, tx_to_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column = 'ledger_created_at',
tsdb.chunk_interval = '1 day',
tsdb.orderby = 'ledger_created_at DESC',
tsdb.sparse_index = 'bloom(account_id)'
);
CREATE INDEX idx_transactions_accounts_tx_to_id ON transactions_accounts(tx_to_id);
CREATE INDEX idx_transactions_accounts_account_id ON transactions_accounts(account_id);
-- +migrate Down
-- Tables
DROP TABLE IF EXISTS transactions_accounts CASCADE;
DROP TABLE IF EXISTS transactions CASCADE;
healthRes, err := rpcService.GetHealth()
if err == nil {
	if healthRes.Status == "healthy" {
		return nil
The WaitForRPCHealthAndRun function has a critical bug: after successfully checking RPC health (lines 60-65), it returns nil immediately, but never calls the onReady callback function that was passed as a parameter. This breaks the intended workflow where onReady should be executed after the RPC service becomes healthy.
The function should call onReady() before returning nil when the health check succeeds, similar to the original implementation that was removed in rpc_service_test.go.
Suggested change:
-		return nil
+		return onReady()
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
PGHOST: localhost
PGHOST: /var/run/postgresql
The PGHOST environment variable is set to /var/run/postgresql in the postgres service (line 95) but overridden to localhost in the test environment (line 104). This configuration mismatch is confusing.
The PGHOST=/var/run/postgresql setting in the service configuration is typically used for Unix domain socket connections within the container, but since the service exposes port 5432:5432 and the tests connect via TCP from the host, the test environment correctly uses localhost. The service-level PGHOST setting appears unnecessary and potentially misleading.
Consider removing line 95 (PGHOST: /var/run/postgresql) from the postgres service configuration as it's not needed for the GitHub Actions workflow where connections are made via TCP from the host.
Suggested change:
-PGHOST: /var/run/postgresql
if healthRes.Status == "healthy" {
	return nil
}
}
The infinite polling loop with no delay will cause 100% CPU usage when the RPC service is not yet healthy. The default case in the select statement immediately retries GetHealth() without any sleep/delay between attempts.
Add a time.Sleep() after the health check to prevent CPU spin-looping, for example: time.Sleep(100 * time.Millisecond) or use a ticker instead of the default case.
Suggested change:
 }
 }
+// Avoid a tight spin loop when the RPC service is not yet healthy.
+time.Sleep(100 * time.Millisecond)
internal/services/ingest_backfill.go (outdated)
var chunks []string
for rows.Next() {
	var chunk string
	if err := rows.Scan(&chunk); err != nil {
		continue
	}
	chunks = append(chunks, chunk)
}
rows.Close()
The rows.Close() call on line 645 should be deferred to ensure it's called even if there's an error during iteration. Additionally, the error from rows.Scan() is being silently ignored on line 640. If there's a scanning error, it should be logged at minimum.
Consider replacing line 645 with defer rows.Close() after line 631, and handle the scanning error properly instead of continuing silently.
Suggested change:
-	var chunks []string
-	for rows.Next() {
-		var chunk string
-		if err := rows.Scan(&chunk); err != nil {
-			continue
-		}
-		chunks = append(chunks, chunk)
-	}
-	rows.Close()
+	defer rows.Close()
+	var chunks []string
+	for rows.Next() {
+		var chunk string
+		if err := rows.Scan(&chunk); err != nil {
+			log.Ctx(ctx).Warnf("Failed to scan chunk row for table %s: %v", table, err)
+			continue
+		}
+		chunks = append(chunks, chunk)
+	}
+	if err := rows.Err(); err != nil {
+		log.Ctx(ctx).Warnf("Error encountered while iterating chunks for table %s: %v", table, err)
+	}
What
[TODO: Short statement about what is changing.]
Why
[TODO: Why this change is being made. Include any context required to understand the why.]
Known limitations
[TODO or N/A]
Issue that this PR addresses
[TODO: Attach the link to the GitHub issue or task. Include the priority of the task here in addition to the link.]
Checklist
PR Structure
all if the changes are broad or impact many packages.
Thoroughness
Release