Changes from all commits
54 commits
37c03e8
add TimescaleDB hypertable
aditya1702 Feb 2, 2026
a963d23
update docker and test files to use timescaledb
aditya1702 Feb 2, 2026
ce9d7d0
Fix tests
aditya1702 Feb 2, 2026
8badbef
convert junction tables to hypertables
aditya1702 Feb 2, 2026
f2bff57
fix failing tests
aditya1702 Feb 2, 2026
2bc2180
remove comments
aditya1702 Feb 2, 2026
34124ee
changes to indexes
aditya1702 Feb 2, 2026
63de994
Merge branch 'db-indexes' into timescale
aditya1702 Feb 2, 2026
e51e697
Update 2025-06-10.2-transactions.sql
aditya1702 Feb 2, 2026
3fb3a6c
Update 2025-06-10.2-transactions.sql
aditya1702 Feb 2, 2026
66c39be
compress chunks at the end
aditya1702 Feb 3, 2026
1bf6580
compress chunks in parallel
aditya1702 Feb 3, 2026
2fffe5d
Update ingest_backfill.go
aditya1702 Feb 3, 2026
fc165bf
Update ingest_backfill.go
aditya1702 Feb 3, 2026
38d08c1
Update ingest_backfill.go
aditya1702 Feb 3, 2026
369e79a
Cherry-pick opxdr-bytea-2 files (types, indexer, graphql, processors,…
aditya1702 Feb 10, 2026
1224c17
Migrate TEXT columns to BYTEA in transaction/operation/statechange sc…
aditya1702 Feb 10, 2026
9ef4af8
Apply BYTEA type conversions to data layer (transactions, operations,…
aditya1702 Feb 10, 2026
864d23b
Update test files for BYTEA types while preserving TimescaleDB patterns
aditya1702 Feb 10, 2026
ec91fe7
Fix shadow variable warnings in BatchCopy address conversions
aditya1702 Feb 10, 2026
d013efa
Update go.yaml
aditya1702 Feb 10, 2026
235aa88
merge branch opxr-bytea-2
aditya1702 Feb 10, 2026
3abf801
Update go.yaml
aditya1702 Feb 10, 2026
2b9e98e
Update go.yaml
aditya1702 Feb 10, 2026
6c55228
Update docker-compose.yaml
aditya1702 Feb 10, 2026
9f41d08
Update query_utils.go
aditya1702 Feb 10, 2026
d34dc37
remove flaky RPC test
aditya1702 Feb 10, 2026
516251b
Update helpers.go
aditya1702 Feb 10, 2026
bb35d75
Update helpers.go
aditya1702 Feb 10, 2026
71869d7
add bloom filter on account_id
aditya1702 Feb 10, 2026
147a67f
remove BatchInsert
aditya1702 Feb 11, 2026
1561630
update hypertable config - 1
aditya1702 Feb 11, 2026
ff75bf7
remove Duplicate failure tests
aditya1702 Feb 11, 2026
e85fb63
Add extra orderby columns and other hypertable configuration
aditya1702 Feb 11, 2026
7a6334b
Update normal B-tree indexes
aditya1702 Feb 11, 2026
6768f8d
enable chunk skipping
aditya1702 Feb 11, 2026
a415099
Update dbtest.go
aditya1702 Feb 11, 2026
d0e76df
Update containers.go
aditya1702 Feb 11, 2026
ed71815
Compress backfill chunks in parallel using goroutines
aditya1702 Feb 11, 2026
b9305e5
remove schemas for mainnet and testnet
aditya1702 Feb 12, 2026
5a5ac6e
Enable direct compress and recompression for backfilling
aditya1702 Feb 12, 2026
77dbbc7
Add command line configuration to set retention policy and chunk size
aditya1702 Feb 12, 2026
7ef1866
Update ingest.go
aditya1702 Feb 12, 2026
6488155
add batch index tracking to logs
aditya1702 Feb 12, 2026
3db78c8
Update ingest_backfill.go
aditya1702 Feb 13, 2026
7f578ea
update auto-vacuum settings for balances tables
aditya1702 Feb 13, 2026
cefebf5
add explanations for the values
aditya1702 Feb 13, 2026
22d6f33
Disable FK checks during checkpoint population
aditya1702 Feb 13, 2026
4c7afba
Update ingest_live.go
aditya1702 Feb 13, 2026
88586d1
Update ingest_live.go
aditya1702 Feb 13, 2026
5896323
Update ingest_live.go
aditya1702 Feb 13, 2026
b4e4abe
Update ingest_backfill.go
aditya1702 Feb 13, 2026
0183370
fix ledger_created_at bug
aditya1702 Feb 13, 2026
5ccac7f
update oldest ledger using timescaledb job
aditya1702 Feb 13, 2026
4 changes: 2 additions & 2 deletions .github/workflows/go.yaml
@@ -87,12 +87,12 @@ jobs:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:12-alpine
image: timescale/timescaledb:2.25.0-pg17
env:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
PGHOST: localhost
PGHOST: /var/run/postgresql
Copilot AI Feb 11, 2026
The PGHOST environment variable is set to /var/run/postgresql in the postgres service (line 95) but overridden to localhost in the test environment (line 104). This configuration mismatch is confusing.

The PGHOST=/var/run/postgresql setting in the service configuration is typically used for Unix domain socket connections within the container, but since the service exposes port 5432:5432 and the tests connect via TCP from the host, the test environment correctly uses localhost. The service-level PGHOST setting appears unnecessary and potentially misleading.

Consider removing line 95 (PGHOST: /var/run/postgresql) from the postgres service configuration as it's not needed for the GitHub Actions workflow where connections are made via TCP from the host.

Suggested change
PGHOST: /var/run/postgresql

options: >-
--health-cmd pg_isready
--health-interval 10s
16 changes: 16 additions & 0 deletions cmd/ingest.go
@@ -140,6 +140,22 @@ func (c *ingestCmd) Command() *cobra.Command {
FlagDefault: true,
Required: false,
},
{
Name: "chunk-interval",
Usage: "TimescaleDB chunk time interval for hypertables. Only affects future chunks. Uses PostgreSQL INTERVAL syntax.",
OptType: types.String,
ConfigKey: &cfg.ChunkInterval,
FlagDefault: "1 day",
Required: false,
},
{
Name: "retention-period",
Usage: "TimescaleDB data retention period. Chunks older than this are automatically dropped. Empty disables retention. Uses PostgreSQL INTERVAL syntax.",
OptType: types.String,
ConfigKey: &cfg.RetentionPeriod,
FlagDefault: "",
Required: false,
},
}

cmd := &cobra.Command{
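The two new flags only carry INTERVAL strings; TimescaleDB applies such values through its configuration functions. Below is a minimal sketch of how they could be wired up, assuming a pgx connection and a hypertable named transactions; the helper applyHypertableConfig and its wiring are illustrative, not this PR's actual code.

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// applyHypertableConfig applies a chunk interval and an optional retention
// policy to a single hypertable. Both values use PostgreSQL INTERVAL syntax,
// matching the flag defaults above ("1 day" and "").
// Illustrative sketch; table and function names are assumptions.
func applyHypertableConfig(ctx context.Context, conn *pgx.Conn, table, chunkInterval, retentionPeriod string) error {
	// set_chunk_time_interval only affects chunks created after this call,
	// which is why the flag usage says "Only affects future chunks".
	if _, err := conn.Exec(ctx,
		`SELECT set_chunk_time_interval($1::regclass, $2::interval)`,
		table, chunkInterval); err != nil {
		return fmt.Errorf("setting chunk interval on %s: %w", table, err)
	}
	// An empty retention period means no policy is added, matching the
	// "Empty disables retention" behavior documented on the flag.
	if retentionPeriod != "" {
		if _, err := conn.Exec(ctx,
			`SELECT add_retention_policy($1::regclass, $2::interval, if_not_exists => true)`,
			table, retentionPeriod); err != nil {
			return fmt.Errorf("adding retention policy on %s: %w", table, err)
		}
	}
	return nil
}
```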
6 changes: 3 additions & 3 deletions docker-compose.yaml
@@ -7,7 +7,8 @@ services:

db:
container_name: db
image: postgres:14-alpine
image: timescale/timescaledb:2.25.0-pg17
command: ["postgres", "-c", "timescaledb.enable_chunk_skipping=on"]
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d wallet-backend"]
interval: 10s
@@ -268,5 +269,4 @@ volumes:
configs:
postgres_init:
content: |
CREATE SCHEMA IF NOT EXISTS wallet_backend_mainnet;
CREATE SCHEMA IF NOT EXISTS wallet_backend_testnet;
CREATE EXTENSION IF NOT EXISTS timescaledb;
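The compose change turns on timescaledb.enable_chunk_skipping at the server level and creates the extension at init time. The GUC alone only unlocks the feature; chunk skipping still has to be enabled per column so TimescaleDB tracks min/max ranges for it. A sketch of that call, where the table and column names are assumptions for illustration and the PR may instead do this in its SQL migrations:

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// enableChunkSkipping turns on min/max range tracking for one column of a
// hypertable, letting the planner prune chunks on predicates over that
// column. Hypothetical helper, e.g. enableChunkSkipping(ctx, conn,
// "transactions", "ledger_number").
func enableChunkSkipping(ctx context.Context, conn *pgx.Conn, table, column string) error {
	if _, err := conn.Exec(ctx,
		`SELECT enable_chunk_skipping($1::regclass, $2::name)`,
		table, column); err != nil {
		return fmt.Errorf("enabling chunk skipping on %s.%s: %w", table, column, err)
	}
	return nil
}
```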
4 changes: 2 additions & 2 deletions internal/data/accounts_test.go
@@ -273,7 +273,7 @@ func TestAccountModelBatchGetByToIDs(t *testing.T) {
require.NoError(t, err)

// Insert test transactions_accounts links
_, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2), ($3, $4)",
_, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (ledger_created_at, tx_to_id, account_id) VALUES (NOW(), $1, $2), (NOW(), $3, $4)",
toID1, types.AddressBytea(address1), toID2, types.AddressBytea(address2))
require.NoError(t, err)

@@ -333,7 +333,7 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) {
require.NoError(t, err)

// Insert test operations_accounts links (account_id is BYTEA)
_, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (operation_id, account_id) VALUES ($1, $2), ($3, $4)",
_, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (ledger_created_at, operation_id, account_id) VALUES (NOW(), $1, $2), (NOW(), $3, $4)",
operationID1, types.AddressBytea(address1), operationID2, types.AddressBytea(address2))
require.NoError(t, err)

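These test edits follow directly from the junction tables becoming hypertables: the time partitioning column must be supplied on every insert, so the INSERT statements now pass ledger_created_at explicitly. A sketch of the presumed conversion behind this; the helper name, the migrate_data flag, and the choice of ledger_created_at as the partitioning column for both junction tables are assumptions:

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// toHypertable converts an existing table into a hypertable partitioned by
// its ledger_created_at column. Hypothetical helper mirroring what the
// migrations presumably do for transactions_accounts and operations_accounts.
func toHypertable(ctx context.Context, conn *pgx.Conn, table string) error {
	if _, err := conn.Exec(ctx,
		`SELECT create_hypertable($1::regclass, 'ledger_created_at', migrate_data => true)`,
		table); err != nil {
		return fmt.Errorf("converting %s to a hypertable: %w", table, err)
	}
	return nil
}
```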
14 changes: 6 additions & 8 deletions internal/data/native_balances.go
@@ -129,23 +129,21 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance

start := time.Now()

rows := make([][]any, len(balances))
for i, nb := range balances {
rows[i] = []any{nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber}
}

copyCount, err := dbTx.CopyFrom(
ctx,
pgx.Identifier{"native_balances"},
[]string{"account_address", "balance", "minimum_balance", "buying_liabilities", "selling_liabilities", "last_modified_ledger"},
pgx.CopyFromRows(rows),
pgx.CopyFromSlice(len(balances), func(i int) ([]any, error) {
nb := balances[i]
return []any{nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber}, nil
}),
)
if err != nil {
return fmt.Errorf("bulk inserting native balances via COPY: %w", err)
}

if int(copyCount) != len(rows) {
return fmt.Errorf("expected %d rows copied, got %d", len(rows), copyCount)
if int(copyCount) != len(balances) {
return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount)
}

m.MetricsService.ObserveDBQueryDuration("BatchCopy", "native_balances", time.Since(start).Seconds())
149 changes: 20 additions & 129 deletions internal/data/operations.go
@@ -269,133 +269,13 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [
return operationsWithStateChanges, nil
}

// BatchInsert inserts the operations and the operations_accounts links.
// It returns the IDs of the successfully inserted operations.
func (m *OperationModel) BatchInsert(
ctx context.Context,
sqlExecuter db.SQLExecuter,
operations []*types.Operation,
stellarAddressesByOpID map[int64]set.Set[string],
) ([]int64, error) {
if sqlExecuter == nil {
sqlExecuter = m.DB
}

// 1. Flatten the operations into parallel slices
ids := make([]int64, len(operations))
operationTypes := make([]string, len(operations))
operationXDRs := make([][]byte, len(operations))
resultCodes := make([]string, len(operations))
successfulFlags := make([]bool, len(operations))
ledgerNumbers := make([]uint32, len(operations))
ledgerCreatedAts := make([]time.Time, len(operations))

for i, op := range operations {
ids[i] = op.ID
operationTypes[i] = string(op.OperationType)
operationXDRs[i] = []byte(op.OperationXDR)
resultCodes[i] = op.ResultCode
successfulFlags[i] = op.Successful
ledgerNumbers[i] = op.LedgerNumber
ledgerCreatedAts[i] = op.LedgerCreatedAt
}

// 2. Flatten the stellarAddressesByOpID into parallel slices, converting to BYTEA
var opIDs []int64
var stellarAddressBytes [][]byte
for opID, addresses := range stellarAddressesByOpID {
for address := range addresses.Iter() {
opIDs = append(opIDs, opID)
addrBytes, err := types.AddressBytea(address).Value()
if err != nil {
return nil, fmt.Errorf("converting address %s to bytes: %w", address, err)
}
stellarAddressBytes = append(stellarAddressBytes, addrBytes.([]byte))
}
}

// Insert operations and operations_accounts links.
const insertQuery = `
WITH
-- Insert operations
inserted_operations AS (
INSERT INTO operations
(id, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at)
SELECT
o.id, o.operation_type, o.operation_xdr, o.result_code, o.successful, o.ledger_number, o.ledger_created_at
FROM (
SELECT
UNNEST($1::bigint[]) AS id,
UNNEST($2::text[]) AS operation_type,
UNNEST($3::bytea[]) AS operation_xdr,
UNNEST($4::text[]) AS result_code,
UNNEST($5::boolean[]) AS successful,
UNNEST($6::bigint[]) AS ledger_number,
UNNEST($7::timestamptz[]) AS ledger_created_at
) o
ON CONFLICT (id) DO NOTHING
RETURNING id
),

-- Insert operations_accounts links
inserted_operations_accounts AS (
INSERT INTO operations_accounts
(operation_id, account_id)
SELECT
oa.op_id, oa.account_id
FROM (
SELECT
UNNEST($8::bigint[]) AS op_id,
UNNEST($9::bytea[]) AS account_id
) oa
ON CONFLICT DO NOTHING
)

-- Return the IDs of successfully inserted operations
SELECT id FROM inserted_operations;
`

start := time.Now()
var insertedIDs []int64
err := sqlExecuter.SelectContext(ctx, &insertedIDs, insertQuery,
pq.Array(ids),
pq.Array(operationTypes),
pq.Array(operationXDRs),
pq.Array(resultCodes),
pq.Array(successfulFlags),
pq.Array(ledgerNumbers),
pq.Array(ledgerCreatedAts),
pq.Array(opIDs),
pq.Array(stellarAddressBytes),
)
duration := time.Since(start).Seconds()
for _, dbTableName := range []string{"operations", "operations_accounts"} {
m.MetricsService.ObserveDBQueryDuration("BatchInsert", dbTableName, duration)
if dbTableName == "operations" {
m.MetricsService.ObserveDBBatchSize("BatchInsert", dbTableName, len(operations))
}
if err == nil {
m.MetricsService.IncDBQuery("BatchInsert", dbTableName)
}
}
if err != nil {
for _, dbTableName := range []string{"operations", "operations_accounts"} {
m.MetricsService.IncDBQueryError("BatchInsert", dbTableName, utils.GetDBErrorType(err))
}
return nil, fmt.Errorf("batch inserting operations and operations_accounts: %w", err)
}

return insertedIDs, nil
}

// BatchCopy inserts operations using pgx's binary COPY protocol.
// Uses pgx.Tx for binary format which is faster than lib/pq's text format.
// Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763).
//
// IMPORTANT: Unlike BatchInsert which uses ON CONFLICT DO NOTHING, BatchCopy will FAIL
// if any duplicate records exist. The PostgreSQL COPY protocol does not support conflict
// handling. Callers must ensure no duplicates exist before calling this method, or handle
// the unique constraint violation error appropriately.
// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY
// protocol does not support conflict handling. Callers must ensure no duplicates exist
// before calling this method, or handle the unique constraint violation error appropriately.
func (m *OperationModel) BatchCopy(
ctx context.Context,
pgxTx pgx.Tx,
@@ -436,23 +316,34 @@ func (m *OperationModel) BatchCopy(

// COPY operations_accounts using pgx binary format with native pgtype types
if len(stellarAddressesByOpID) > 0 {
// Build OpID -> LedgerCreatedAt lookup from operations
ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations))
for _, op := range operations {
ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt
}

var oaRows [][]any
for opID, addresses := range stellarAddressesByOpID {
ledgerCreatedAt := ledgerCreatedAtByOpID[opID]
ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true}
opIDPgtype := pgtype.Int8{Int64: opID, Valid: true}
for _, addr := range addresses.ToSlice() {
var addrBytes any
addrBytes, err = types.AddressBytea(addr).Value()
if err != nil {
return 0, fmt.Errorf("converting address %s to bytes: %w", addr, err)
addrBytes, addrErr := types.AddressBytea(addr).Value()
if addrErr != nil {
return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr)
}
oaRows = append(oaRows, []any{opIDPgtype, addrBytes})
oaRows = append(oaRows, []any{
ledgerCreatedAtPgtype,
opIDPgtype,
addrBytes,
})
}
}

_, err = pgxTx.CopyFrom(
ctx,
pgx.Identifier{"operations_accounts"},
[]string{"operation_id", "account_id"},
[]string{"ledger_created_at", "operation_id", "account_id"},
pgx.CopyFromRows(oaRows),
)
if err != nil {
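Because COPY cannot express ON CONFLICT, a caller that wants the old BatchInsert tolerance for duplicates has to detect the unique-violation itself. A minimal sketch of that detection; whether callers then retry, dedupe up front, or fail hard is a policy choice this PR leaves to the ingestion paths:

```go
package main

import (
	"errors"

	"github.com/jackc/pgx/v5/pgconn"
)

// isUniqueViolation reports whether err is PostgreSQL's unique_violation
// (SQLSTATE 23505), the error BatchCopy surfaces when duplicate rows hit
// a unique constraint during COPY.
func isUniqueViolation(err error) bool {
	var pgErr *pgconn.PgError
	return errors.As(err, &pgErr) && pgErr.Code == "23505"
}
```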