diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml
index 7aed3f3a0..3ad8c5886 100644
--- a/.github/workflows/go.yaml
+++ b/.github/workflows/go.yaml
@@ -87,12 +87,12 @@ jobs:
     runs-on: ubuntu-latest
     services:
       postgres:
-        image: postgres:12-alpine
+        image: timescale/timescaledb:2.25.0-pg17
         env:
           POSTGRES_USER: postgres
           POSTGRES_DB: postgres
           POSTGRES_PASSWORD: postgres
-          PGHOST: localhost
+          PGHOST: /var/run/postgresql
         options: >-
           --health-cmd pg_isready
           --health-interval 10s
diff --git a/cmd/ingest.go b/cmd/ingest.go
index e33ce6cce..db039b157 100644
--- a/cmd/ingest.go
+++ b/cmd/ingest.go
@@ -140,6 +140,22 @@ func (c *ingestCmd) Command() *cobra.Command {
 			FlagDefault: true,
 			Required:    false,
 		},
+		{
+			Name:        "chunk-interval",
+			Usage:       "TimescaleDB chunk time interval for hypertables. Only affects future chunks. Uses PostgreSQL INTERVAL syntax.",
+			OptType:     types.String,
+			ConfigKey:   &cfg.ChunkInterval,
+			FlagDefault: "1 day",
+			Required:    false,
+		},
+		{
+			Name:        "retention-period",
+			Usage:       "TimescaleDB data retention period. Chunks older than this are automatically dropped. Empty disables retention. Uses PostgreSQL INTERVAL syntax.",
+			OptType:     types.String,
+			ConfigKey:   &cfg.RetentionPeriod,
+			FlagDefault: "",
+			Required:    false,
+		},
 	}
 
 	cmd := &cobra.Command{
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 0a251049a..a6ad38372 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -7,7 +7,8 @@ services:
 
   db:
     container_name: db
-    image: postgres:14-alpine
+    image: timescale/timescaledb:2.25.0-pg17
+    command: ["postgres", "-c", "timescaledb.enable_chunk_skipping=on"]
     healthcheck:
       test: ["CMD-SHELL", "pg_isready -U postgres -d wallet-backend"]
       interval: 10s
@@ -268,5 +269,4 @@ volumes:
 configs:
   postgres_init:
     content: |
-      CREATE SCHEMA IF NOT EXISTS wallet_backend_mainnet;
-      CREATE SCHEMA IF NOT EXISTS wallet_backend_testnet;
+      CREATE EXTENSION IF NOT EXISTS timescaledb;
diff --git a/internal/data/accounts_test.go b/internal/data/accounts_test.go
index 2e1abd439..65729f27e 100644
--- a/internal/data/accounts_test.go
+++ b/internal/data/accounts_test.go
@@ -273,7 +273,7 @@ func TestAccountModelBatchGetByToIDs(t *testing.T) {
 	require.NoError(t, err)
 
 	// Insert test transactions_accounts links
-	_, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2), ($3, $4)",
+	_, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (ledger_created_at, tx_to_id, account_id) VALUES (NOW(), $1, $2), (NOW(), $3, $4)",
 		toID1, types.AddressBytea(address1), toID2, types.AddressBytea(address2))
 	require.NoError(t, err)
 
@@ -333,7 +333,7 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) {
 	require.NoError(t, err)
 
 	// Insert test operations_accounts links (account_id is BYTEA)
-	_, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (operation_id, account_id) VALUES ($1, $2), ($3, $4)",
+	_, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (ledger_created_at, operation_id, account_id) VALUES (NOW(), $1, $2), (NOW(), $3, $4)",
 		operationID1, types.AddressBytea(address1), operationID2, types.AddressBytea(address2))
 	require.NoError(t, err)
 
diff --git a/internal/data/native_balances.go b/internal/data/native_balances.go
index 32b480087..a1b5091c7 100644
--- a/internal/data/native_balances.go
+++ b/internal/data/native_balances.go
@@ -129,23 +129,21 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance
 
 	start := time.Now()
 
-	rows := make([][]any, len(balances))
-	for i, nb := range balances {
-		rows[i] = []any{nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber}
-	}
-
 	copyCount, err := dbTx.CopyFrom(
 		ctx,
 		pgx.Identifier{"native_balances"},
 		[]string{"account_address", "balance", "minimum_balance", "buying_liabilities", "selling_liabilities", "last_modified_ledger"},
-		pgx.CopyFromRows(rows),
+		pgx.CopyFromSlice(len(balances), func(i int) ([]any, error) {
+			nb := balances[i]
+			return []any{nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber}, nil
+		}),
 	)
 	if err != nil {
 		return fmt.Errorf("bulk inserting native balances via COPY: %w", err)
 	}
 
-	if int(copyCount) != len(rows) {
-		return fmt.Errorf("expected %d rows copied, got %d", len(rows), copyCount)
+	if int(copyCount) != len(balances) {
+		return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount)
 	}
 
 	m.MetricsService.ObserveDBQueryDuration("BatchCopy", "native_balances", time.Since(start).Seconds())
diff --git a/internal/data/operations.go b/internal/data/operations.go
index 2d862bd31..ad4ddda34 100644
--- a/internal/data/operations.go
+++ b/internal/data/operations.go
@@ -269,133 +269,13 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [
 	return operationsWithStateChanges, nil
 }
 
-// BatchInsert inserts the operations and the operations_accounts links.
-// It returns the IDs of the successfully inserted operations.
-func (m *OperationModel) BatchInsert(
-	ctx context.Context,
-	sqlExecuter db.SQLExecuter,
-	operations []*types.Operation,
-	stellarAddressesByOpID map[int64]set.Set[string],
-) ([]int64, error) {
-	if sqlExecuter == nil {
-		sqlExecuter = m.DB
-	}
-
-	// 1. Flatten the operations into parallel slices
-	ids := make([]int64, len(operations))
-	operationTypes := make([]string, len(operations))
-	operationXDRs := make([][]byte, len(operations))
-	resultCodes := make([]string, len(operations))
-	successfulFlags := make([]bool, len(operations))
-	ledgerNumbers := make([]uint32, len(operations))
-	ledgerCreatedAts := make([]time.Time, len(operations))
-
-	for i, op := range operations {
-		ids[i] = op.ID
-		operationTypes[i] = string(op.OperationType)
-		operationXDRs[i] = []byte(op.OperationXDR)
-		resultCodes[i] = op.ResultCode
-		successfulFlags[i] = op.Successful
-		ledgerNumbers[i] = op.LedgerNumber
-		ledgerCreatedAts[i] = op.LedgerCreatedAt
-	}
-
-	// 2. Flatten the stellarAddressesByOpID into parallel slices, converting to BYTEA
-	var opIDs []int64
-	var stellarAddressBytes [][]byte
-	for opID, addresses := range stellarAddressesByOpID {
-		for address := range addresses.Iter() {
-			opIDs = append(opIDs, opID)
-			addrBytes, err := types.AddressBytea(address).Value()
-			if err != nil {
-				return nil, fmt.Errorf("converting address %s to bytes: %w", address, err)
-			}
-			stellarAddressBytes = append(stellarAddressBytes, addrBytes.([]byte))
-		}
-	}
-
-	// Insert operations and operations_accounts links.
- const insertQuery = ` - WITH - -- Insert operations - inserted_operations AS ( - INSERT INTO operations - (id, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) - SELECT - o.id, o.operation_type, o.operation_xdr, o.result_code, o.successful, o.ledger_number, o.ledger_created_at - FROM ( - SELECT - UNNEST($1::bigint[]) AS id, - UNNEST($2::text[]) AS operation_type, - UNNEST($3::bytea[]) AS operation_xdr, - UNNEST($4::text[]) AS result_code, - UNNEST($5::boolean[]) AS successful, - UNNEST($6::bigint[]) AS ledger_number, - UNNEST($7::timestamptz[]) AS ledger_created_at - ) o - ON CONFLICT (id) DO NOTHING - RETURNING id - ), - - -- Insert operations_accounts links - inserted_operations_accounts AS ( - INSERT INTO operations_accounts - (operation_id, account_id) - SELECT - oa.op_id, oa.account_id - FROM ( - SELECT - UNNEST($8::bigint[]) AS op_id, - UNNEST($9::bytea[]) AS account_id - ) oa - ON CONFLICT DO NOTHING - ) - - -- Return the IDs of successfully inserted operations - SELECT id FROM inserted_operations; - ` - - start := time.Now() - var insertedIDs []int64 - err := sqlExecuter.SelectContext(ctx, &insertedIDs, insertQuery, - pq.Array(ids), - pq.Array(operationTypes), - pq.Array(operationXDRs), - pq.Array(resultCodes), - pq.Array(successfulFlags), - pq.Array(ledgerNumbers), - pq.Array(ledgerCreatedAts), - pq.Array(opIDs), - pq.Array(stellarAddressBytes), - ) - duration := time.Since(start).Seconds() - for _, dbTableName := range []string{"operations", "operations_accounts"} { - m.MetricsService.ObserveDBQueryDuration("BatchInsert", dbTableName, duration) - if dbTableName == "operations" { - m.MetricsService.ObserveDBBatchSize("BatchInsert", dbTableName, len(operations)) - } - if err == nil { - m.MetricsService.IncDBQuery("BatchInsert", dbTableName) - } - } - if err != nil { - for _, dbTableName := range []string{"operations", "operations_accounts"} { - m.MetricsService.IncDBQueryError("BatchInsert", dbTableName, utils.GetDBErrorType(err)) - } - return nil, fmt.Errorf("batch inserting operations and operations_accounts: %w", err) - } - - return insertedIDs, nil -} - // BatchCopy inserts operations using pgx's binary COPY protocol. // Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // -// IMPORTANT: Unlike BatchInsert which uses ON CONFLICT DO NOTHING, BatchCopy will FAIL -// if any duplicate records exist. The PostgreSQL COPY protocol does not support conflict -// handling. Callers must ensure no duplicates exist before calling this method, or handle -// the unique constraint violation error appropriately. +// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY +// protocol does not support conflict handling. Callers must ensure no duplicates exist +// before calling this method, or handle the unique constraint violation error appropriately. 
func (m *OperationModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, @@ -436,23 +316,34 @@ func (m *OperationModel) BatchCopy( // COPY operations_accounts using pgx binary format with native pgtype types if len(stellarAddressesByOpID) > 0 { + // Build OpID -> LedgerCreatedAt lookup from operations + ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) + for _, op := range operations { + ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt + } + var oaRows [][]any for opID, addresses := range stellarAddressesByOpID { + ledgerCreatedAt := ledgerCreatedAtByOpID[opID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} for _, addr := range addresses.ToSlice() { - var addrBytes any - addrBytes, err = types.AddressBytea(addr).Value() - if err != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, err) + addrBytes, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) } - oaRows = append(oaRows, []any{opIDPgtype, addrBytes}) + oaRows = append(oaRows, []any{ + ledgerCreatedAtPgtype, + opIDPgtype, + addrBytes, + }) } } _, err = pgxTx.CopyFrom( ctx, pgx.Identifier{"operations_accounts"}, - []string{"operation_id", "account_id"}, + []string{"ledger_created_at", "operation_id", "account_id"}, pgx.CopyFromRows(oaRows), ) if err != nil { diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index 40a5181e3..fe27139d0 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -43,193 +43,6 @@ func generateTestOperations(n int, startID int64) ([]*types.Operation, map[int64 return ops, addressesByOpID } -func Test_OperationModel_BatchInsert(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test data - kp1 := keypair.MustRandom() - kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) - require.NoError(t, err) - - // Create referenced transactions first with specific ToIDs - // Operations IDs must be in TOID range for each transaction: (to_id, to_id + 4096) - meta1, meta2 := "meta1", "meta2" - envelope1, envelope2 := "envelope1", "envelope2" - tx1 := types.Transaction{ - Hash: "d176b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa4877", - ToID: 4096, - EnvelopeXDR: &envelope1, - FeeCharged: 100, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta1, - LedgerNumber: 1, - LedgerCreatedAt: now, - IsFeeBump: false, - } - tx2 := types.Transaction{ - Hash: "e176b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa4877", - ToID: 8192, - EnvelopeXDR: &envelope2, - FeeCharged: 200, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta2, - LedgerNumber: 2, - LedgerCreatedAt: now, - IsFeeBump: true, - } - - // Insert transactions - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ - tx1.ToID: set.NewSet(kp1.Address()), - tx2.ToID: set.NewSet(kp2.Address()), 
- }) - require.NoError(t, err) - - // Operations IDs must be in TOID range: (to_id, to_id + 4096) - op1 := types.Operation{ - ID: 4097, // in range (4096, 8192) - OperationType: types.OperationTypePayment, - OperationXDR: types.XDRBytea([]byte("operation1")), - LedgerCreatedAt: now, - } - op2 := types.Operation{ - ID: 8193, // in range (8192, 12288) - OperationType: types.OperationTypeCreateAccount, - OperationXDR: types.XDRBytea([]byte("operation2")), - LedgerCreatedAt: now, - } - - testCases := []struct { - name string - useDBTx bool - operations []*types.Operation - stellarAddressesByOpID map[int64]set.Set[string] - wantAccountLinks map[int64][]string - wantErrContains string - wantIDs []int64 - }{ - { - name: "🟢successful_insert_without_dbTx", - useDBTx: false, - operations: []*types.Operation{&op1, &op2}, - stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address(), kp1.Address(), kp1.Address(), kp1.Address()), op2.ID: set.NewSet(kp2.Address(), kp2.Address())}, - wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}, op2.ID: {kp2.Address()}}, - wantErrContains: "", - wantIDs: []int64{op1.ID, op2.ID}, - }, - { - name: "🟢successful_insert_with_dbTx", - useDBTx: true, - operations: []*types.Operation{&op1}, - stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())}, - wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}}, - wantErrContains: "", - wantIDs: []int64{op1.ID}, - }, - { - name: "🟢empty_input", - useDBTx: false, - operations: []*types.Operation{}, - stellarAddressesByOpID: map[int64]set.Set[string]{}, - wantAccountLinks: map[int64][]string{}, - wantErrContains: "", - wantIDs: nil, - }, - { - name: "🟡duplicate_operation", - useDBTx: false, - operations: []*types.Operation{&op1, &op1}, - stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())}, - wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}}, - wantErrContains: "", - wantIDs: []int64{op1.ID}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Clear the database before each test - _, err = dbConnectionPool.ExecContext(ctx, "TRUNCATE operations, operations_accounts CASCADE") - require.NoError(t, err) - - // Create fresh mock for each test case - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService. - On("ObserveDBQueryDuration", "BatchInsert", "operations", mock.Anything).Return().Once(). - On("ObserveDBQueryDuration", "BatchInsert", "operations_accounts", mock.Anything).Return().Once(). - On("ObserveDBBatchSize", "BatchInsert", "operations", mock.Anything).Return().Once(). - On("IncDBQuery", "BatchInsert", "operations").Return().Once(). 
- On("IncDBQuery", "BatchInsert", "operations_accounts").Return().Once() - defer mockMetricsService.AssertExpectations(t) - - m := &OperationModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - var sqlExecuter db.SQLExecuter = dbConnectionPool - if tc.useDBTx { - tx, err := dbConnectionPool.BeginTxx(ctx, nil) - require.NoError(t, err) - defer tx.Rollback() - sqlExecuter = tx - } - - gotInsertedIDs, err := m.BatchInsert(ctx, sqlExecuter, tc.operations, tc.stellarAddressesByOpID) - - if tc.wantErrContains != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), tc.wantErrContains) - return - } - - // Verify the results - require.NoError(t, err) - var dbInsertedIDs []int64 - err = sqlExecuter.SelectContext(ctx, &dbInsertedIDs, "SELECT id FROM operations") - require.NoError(t, err) - assert.ElementsMatch(t, tc.wantIDs, dbInsertedIDs) - assert.ElementsMatch(t, tc.wantIDs, gotInsertedIDs) - - // Verify the account links - if len(tc.wantAccountLinks) > 0 { - var accountLinks []struct { - OperationID int64 `db:"operation_id"` - AccountID types.AddressBytea `db:"account_id"` - } - err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT operation_id, account_id FROM operations_accounts ORDER BY operation_id, account_id") - require.NoError(t, err) - - // Create a map of operation_id -> set of account_ids for O(1) lookups - accountLinksMap := make(map[int64][]string) - for _, link := range accountLinks { - accountLinksMap[link.OperationID] = append(accountLinksMap[link.OperationID], string(link.AccountID)) - } - - // Verify each operation has its expected account links - require.Equal(t, len(tc.wantAccountLinks), len(accountLinksMap), "number of elements in the maps don't match") - for key, expectedSlice := range tc.wantAccountLinks { - actualSlice, exists := accountLinksMap[key] - require.True(t, exists, "key %s not found in actual map", key) - assert.ElementsMatch(t, expectedSlice, actualSlice, "slices for key %s don't match", key) - } - } - }) - } -} - func Test_OperationModel_BatchCopy(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -274,14 +87,12 @@ func Test_OperationModel_BatchCopy(t *testing.T) { IsFeeBump: true, } - // Insert transactions - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ - tx1.ToID: set.NewSet(kp1.Address()), - tx2.ToID: set.NewSet(kp2.Address()), - }) + // Insert transactions using direct SQL + _, err = dbConnectionPool.ExecContext(ctx, ` + INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9), ($10, $11, $12, $13, $14, $15, $16, $17, $18) + `, tx1.Hash, tx1.ToID, *tx1.EnvelopeXDR, tx1.FeeCharged, tx1.ResultCode, *tx1.MetaXDR, tx1.LedgerNumber, tx1.LedgerCreatedAt, tx1.IsFeeBump, + tx2.Hash, tx2.ToID, *tx2.EnvelopeXDR, tx2.FeeCharged, tx2.ResultCode, *tx2.MetaXDR, tx2.LedgerNumber, tx2.LedgerCreatedAt, tx2.IsFeeBump) require.NoError(t, err) // Operations IDs must be in TOID range: (to_id, to_id + 4096) @@ -406,82 +217,6 @@ func Test_OperationModel_BatchCopy(t *testing.T) { } } -func Test_OperationModel_BatchCopy_DuplicateFails(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer 
dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test accounts - kp1 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) - require.NoError(t, err) - - // Create a parent transaction that the operation will reference - _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) - VALUES ('tx_for_dup_test', 1, 'env', 100, 'TransactionResultCodeTxSuccess', 'meta', 1, $1, false) - `, now) - require.NoError(t, err) - - op1 := types.Operation{ - ID: 999, - OperationType: types.OperationTypePayment, - OperationXDR: types.XDRBytea([]byte("operation_xdr_dup_test")), - LedgerNumber: 1, - LedgerCreatedAt: now, - } - - // Pre-insert the operation using BatchInsert (which uses ON CONFLICT DO NOTHING) - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - opModel := &OperationModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = opModel.BatchInsert(ctx, nil, []*types.Operation{&op1}, map[int64]set.Set[string]{ - op1.ID: set.NewSet(kp1.Address()), - }) - require.NoError(t, err) - - // Verify the operation was inserted - var count int - err = dbConnectionPool.GetContext(ctx, &count, "SELECT COUNT(*) FROM operations WHERE id = $1", op1.ID) - require.NoError(t, err) - require.Equal(t, 1, count) - - // Now try to insert the same operation using BatchCopy - this should FAIL - // because COPY does not support ON CONFLICT handling - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("IncDBQueryError", "BatchCopy", "operations", mock.Anything).Return().Once() - defer mockMetricsService.AssertExpectations(t) - - m := &OperationModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - conn, err := pgx.Connect(ctx, dbt.DSN) - require.NoError(t, err) - defer conn.Close(ctx) - - pgxTx, err := conn.Begin(ctx) - require.NoError(t, err) - - _, err = m.BatchCopy(ctx, pgxTx, []*types.Operation{&op1}, map[int64]set.Set[string]{ - op1.ID: set.NewSet(kp1.Address()), - }) - - // BatchCopy should fail with a unique constraint violation - require.Error(t, err) - assert.Contains(t, err.Error(), "pgx CopyFrom operations: ERROR: duplicate key value violates unique constraint \"operations_pkey\"") - - // Rollback the failed transaction - require.NoError(t, pgxTx.Rollback(ctx)) -} - func TestOperationModel_GetAll(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -845,12 +580,12 @@ func TestOperationModel_BatchGetByAccountAddresses(t *testing.T) { // Create test operations_accounts links _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO operations_accounts (operation_id, account_id) + INSERT INTO operations_accounts (ledger_created_at, operation_id, account_id) VALUES - (4097, $1), - (8193, $1), - (12289, $2) - `, types.AddressBytea(address1), types.AddressBytea(address2)) + ($3, 4097, $1), + ($3, 8193, $1), + ($3, 12289, $2) + `, types.AddressBytea(address1), types.AddressBytea(address2), now) require.NoError(t, err) // Test BatchGetByAccount @@ -988,51 +723,6 @@ func TestOperationModel_BatchGetByStateChangeIDs(t *testing.T) { assert.Equal(t, int64(4097), stateChangeIDsFound["12288-4097-1"]) } -func BenchmarkOperationModel_BatchInsert(b *testing.B) { - dbt := dbtest.OpenB(b) - defer dbt.Close() - dbConnectionPool, err := 
db.OpenDBConnectionPool(dbt.DSN) - if err != nil { - b.Fatalf("failed to open db connection pool: %v", err) - } - defer dbConnectionPool.Close() - - ctx := context.Background() - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - if err != nil { - b.Fatalf("failed to get sqlx db: %v", err) - } - metricsService := metrics.NewMetricsService(sqlxDB) - - m := &OperationModel{ - DB: dbConnectionPool, - MetricsService: metricsService, - } - - batchSizes := []int{1000, 5000, 10000, 50000, 100000} - - for _, size := range batchSizes { - b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - b.StopTimer() - // Clean up operations before each iteration - //nolint:errcheck // truncate is best-effort cleanup in benchmarks - dbConnectionPool.ExecContext(ctx, "TRUNCATE operations, operations_accounts CASCADE") - // Generate fresh test data for each iteration - ops, addressesByOpID := generateTestOperations(size, int64(i*size)) - b.StartTimer() - - _, err := m.BatchInsert(ctx, nil, ops, addressesByOpID) - if err != nil { - b.Fatalf("BatchInsert failed: %v", err) - } - } - }) - } -} - // BenchmarkOperationModel_BatchCopy benchmarks bulk insert using pgx's binary COPY protocol. func BenchmarkOperationModel_BatchCopy(b *testing.B) { dbt := dbtest.OpenB(b) diff --git a/internal/data/query_utils.go b/internal/data/query_utils.go index d2304136a..35fbd70d4 100644 --- a/internal/data/query_utils.go +++ b/internal/data/query_utils.go @@ -162,7 +162,12 @@ func prepareColumnsWithID(columns string, model any, prefix string, idColumns .. dbColumns = getDBColumns(model) } else { dbColumns = set.NewSet[string]() - dbColumns.Add(columns) + for _, col := range strings.Split(columns, ",") { + col = strings.TrimSpace(col) + if col != "" { + dbColumns.Add(col) + } + } } if prefix != "" { diff --git a/internal/data/statechanges.go b/internal/data/statechanges.go index ae0aee543..0560e0b44 100644 --- a/internal/data/statechanges.go +++ b/internal/data/statechanges.go @@ -168,236 +168,13 @@ func (m *StateChangeModel) GetAll(ctx context.Context, columns string, limit *in return stateChanges, nil } -func (m *StateChangeModel) BatchInsert( - ctx context.Context, - sqlExecuter db.SQLExecuter, - stateChanges []types.StateChange, -) ([]string, error) { - if sqlExecuter == nil { - sqlExecuter = m.DB - } - - // Flatten the state changes into parallel slices - stateChangeOrders := make([]int64, len(stateChanges)) - toIDs := make([]int64, len(stateChanges)) - categories := make([]string, len(stateChanges)) - reasons := make([]*string, len(stateChanges)) - ledgerCreatedAts := make([]time.Time, len(stateChanges)) - ledgerNumbers := make([]int, len(stateChanges)) - accountIDBytes := make([][]byte, len(stateChanges)) - operationIDs := make([]int64, len(stateChanges)) - tokenIDs := make([]*string, len(stateChanges)) - amounts := make([]*string, len(stateChanges)) - signerAccountIDBytes := make([][]byte, len(stateChanges)) - spenderAccountIDBytes := make([][]byte, len(stateChanges)) - sponsoredAccountIDBytes := make([][]byte, len(stateChanges)) - sponsorAccountIDBytes := make([][]byte, len(stateChanges)) - deployerAccountIDBytes := make([][]byte, len(stateChanges)) - funderAccountIDBytes := make([][]byte, len(stateChanges)) - claimableBalanceIDs := make([]*string, len(stateChanges)) - liquidityPoolIDs := make([]*string, len(stateChanges)) - sponsoredDataValues := make([]*string, len(stateChanges)) - signerWeightOlds := make([]*int16, len(stateChanges)) - signerWeightNews := make([]*int16, 
len(stateChanges)) - thresholdOlds := make([]*int16, len(stateChanges)) - thresholdNews := make([]*int16, len(stateChanges)) - trustlineLimitOlds := make([]*string, len(stateChanges)) - trustlineLimitNews := make([]*string, len(stateChanges)) - flags := make([]*int16, len(stateChanges)) - keyValues := make([]*types.NullableJSONB, len(stateChanges)) - - for i, sc := range stateChanges { - stateChangeOrders[i] = sc.StateChangeOrder - toIDs[i] = sc.ToID - categories[i] = string(sc.StateChangeCategory) - ledgerCreatedAts[i] = sc.LedgerCreatedAt - ledgerNumbers[i] = int(sc.LedgerNumber) - operationIDs[i] = sc.OperationID - - // Convert account_id to BYTEA (required field) - addrBytes, err := sc.AccountID.Value() - if err != nil { - return nil, fmt.Errorf("converting account_id: %w", err) - } - accountIDBytes[i] = addrBytes.([]byte) - - // Nullable fields - if sc.StateChangeReason != nil { - reason := string(*sc.StateChangeReason) - reasons[i] = &reason - } - if sc.TokenID.Valid { - tokenIDs[i] = &sc.TokenID.String - } - if sc.Amount.Valid { - amounts[i] = &sc.Amount.String - } - - // Convert nullable account_id fields to BYTEA - signerAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SignerAccountID) - if err != nil { - return nil, fmt.Errorf("converting signer_account_id: %w", err) - } - spenderAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SpenderAccountID) - if err != nil { - return nil, fmt.Errorf("converting spender_account_id: %w", err) - } - sponsoredAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SponsoredAccountID) - if err != nil { - return nil, fmt.Errorf("converting sponsored_account_id: %w", err) - } - sponsorAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SponsorAccountID) - if err != nil { - return nil, fmt.Errorf("converting sponsor_account_id: %w", err) - } - deployerAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.DeployerAccountID) - if err != nil { - return nil, fmt.Errorf("converting deployer_account_id: %w", err) - } - funderAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.FunderAccountID) - if err != nil { - return nil, fmt.Errorf("converting funder_account_id: %w", err) - } - if sc.ClaimableBalanceID.Valid { - claimableBalanceIDs[i] = &sc.ClaimableBalanceID.String - } - if sc.LiquidityPoolID.Valid { - liquidityPoolIDs[i] = &sc.LiquidityPoolID.String - } - if sc.SponsoredData.Valid { - sponsoredDataValues[i] = &sc.SponsoredData.String - } - if sc.SignerWeightOld.Valid { - signerWeightOlds[i] = &sc.SignerWeightOld.Int16 - } - if sc.SignerWeightNew.Valid { - signerWeightNews[i] = &sc.SignerWeightNew.Int16 - } - if sc.ThresholdOld.Valid { - thresholdOlds[i] = &sc.ThresholdOld.Int16 - } - if sc.ThresholdNew.Valid { - thresholdNews[i] = &sc.ThresholdNew.Int16 - } - if sc.TrustlineLimitOld.Valid { - trustlineLimitOlds[i] = &sc.TrustlineLimitOld.String - } - if sc.TrustlineLimitNew.Valid { - trustlineLimitNews[i] = &sc.TrustlineLimitNew.String - } - if sc.Flags.Valid { - flags[i] = &sc.Flags.Int16 - } - if sc.KeyValue != nil { - keyValues[i] = &sc.KeyValue - } - } - - const insertQuery = ` - -- Insert state changes - WITH input_data AS ( - SELECT - UNNEST($1::bigint[]) AS state_change_order, - UNNEST($2::bigint[]) AS to_id, - UNNEST($3::text[]) AS state_change_category, - UNNEST($4::text[]) AS state_change_reason, - UNNEST($5::timestamptz[]) AS ledger_created_at, - UNNEST($6::integer[]) AS ledger_number, - UNNEST($7::bytea[]) AS account_id, - UNNEST($8::bigint[]) AS operation_id, - UNNEST($9::text[]) AS 
token_id, - UNNEST($10::text[]) AS amount, - UNNEST($11::bytea[]) AS signer_account_id, - UNNEST($12::bytea[]) AS spender_account_id, - UNNEST($13::bytea[]) AS sponsored_account_id, - UNNEST($14::bytea[]) AS sponsor_account_id, - UNNEST($15::bytea[]) AS deployer_account_id, - UNNEST($16::bytea[]) AS funder_account_id, - UNNEST($17::text[]) AS claimable_balance_id, - UNNEST($18::text[]) AS liquidity_pool_id, - UNNEST($19::text[]) AS sponsored_data, - UNNEST($20::smallint[]) AS signer_weight_old, - UNNEST($21::smallint[]) AS signer_weight_new, - UNNEST($22::smallint[]) AS threshold_old, - UNNEST($23::smallint[]) AS threshold_new, - UNNEST($24::text[]) AS trustline_limit_old, - UNNEST($25::text[]) AS trustline_limit_new, - UNNEST($26::smallint[]) AS flags, - UNNEST($27::jsonb[]) AS key_value - ), - inserted_state_changes AS ( - INSERT INTO state_changes - (state_change_order, to_id, state_change_category, state_change_reason, ledger_created_at, - ledger_number, account_id, operation_id, token_id, amount, - signer_account_id, spender_account_id, sponsored_account_id, sponsor_account_id, - deployer_account_id, funder_account_id, claimable_balance_id, liquidity_pool_id, sponsored_data, - signer_weight_old, signer_weight_new, threshold_old, threshold_new, - trustline_limit_old, trustline_limit_new, flags, key_value) - SELECT - sc.state_change_order, sc.to_id, sc.state_change_category, sc.state_change_reason, sc.ledger_created_at, - sc.ledger_number, sc.account_id, sc.operation_id, sc.token_id, sc.amount, - sc.signer_account_id, sc.spender_account_id, sc.sponsored_account_id, sc.sponsor_account_id, - sc.deployer_account_id, sc.funder_account_id, sc.claimable_balance_id, sc.liquidity_pool_id, sc.sponsored_data, - sc.signer_weight_old, sc.signer_weight_new, sc.threshold_old, sc.threshold_new, - sc.trustline_limit_old, sc.trustline_limit_new, sc.flags, sc.key_value - FROM input_data sc - ON CONFLICT (to_id, operation_id, state_change_order) DO NOTHING - RETURNING to_id, operation_id, state_change_order - ) - SELECT CONCAT(to_id, '-', operation_id, '-', state_change_order) FROM inserted_state_changes; - ` - - start := time.Now() - var insertedIDs []string - err := sqlExecuter.SelectContext(ctx, &insertedIDs, insertQuery, - pq.Array(stateChangeOrders), - pq.Array(toIDs), - pq.Array(categories), - pq.Array(reasons), - pq.Array(ledgerCreatedAts), - pq.Array(ledgerNumbers), - pq.Array(accountIDBytes), - pq.Array(operationIDs), - pq.Array(tokenIDs), - pq.Array(amounts), - pq.Array(signerAccountIDBytes), - pq.Array(spenderAccountIDBytes), - pq.Array(sponsoredAccountIDBytes), - pq.Array(sponsorAccountIDBytes), - pq.Array(deployerAccountIDBytes), - pq.Array(funderAccountIDBytes), - pq.Array(claimableBalanceIDs), - pq.Array(liquidityPoolIDs), - pq.Array(sponsoredDataValues), - pq.Array(signerWeightOlds), - pq.Array(signerWeightNews), - pq.Array(thresholdOlds), - pq.Array(thresholdNews), - pq.Array(trustlineLimitOlds), - pq.Array(trustlineLimitNews), - pq.Array(flags), - pq.Array(keyValues), - ) - duration := time.Since(start).Seconds() - m.MetricsService.ObserveDBQueryDuration("BatchInsert", "state_changes", duration) - m.MetricsService.ObserveDBBatchSize("BatchInsert", "state_changes", len(stateChanges)) - if err != nil { - m.MetricsService.IncDBQueryError("BatchInsert", "state_changes", utils.GetDBErrorType(err)) - return nil, fmt.Errorf("batch inserting state changes: %w", err) - } - m.MetricsService.IncDBQuery("BatchInsert", "state_changes") - - return insertedIDs, nil -} - // BatchCopy inserts state 
changes using pgx's binary COPY protocol. // Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // -// IMPORTANT: Unlike BatchInsert which uses ON CONFLICT DO NOTHING, BatchCopy will FAIL -// if any duplicate records exist. The PostgreSQL COPY protocol does not support conflict -// handling. Callers must ensure no duplicates exist before calling this method, or handle -// the unique constraint violation error appropriately. +// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY +// protocol does not support conflict handling. Callers must ensure no duplicates exist +// before calling this method, or handle the unique constraint violation error appropriately. func (m *StateChangeModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, diff --git a/internal/data/statechanges_test.go b/internal/data/statechanges_test.go index 8267b5fa5..e2cf539fc 100644 --- a/internal/data/statechanges_test.go +++ b/internal/data/statechanges_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/stellar/go-stellar-sdk/keypair" "github.com/stretchr/testify/assert" @@ -65,162 +64,6 @@ func generateTestStateChanges(n int, accountID string, startToID int64, auxAddre return scs } -func TestStateChangeModel_BatchInsert(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test data - kp1 := keypair.MustRandom() - kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) - require.NoError(t, err) - - // Create referenced transactions first - meta1, meta2 := "meta1", "meta2" - envelope1, envelope2 := "envelope1", "envelope2" - tx1 := types.Transaction{ - Hash: "f176b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa4877", - ToID: 1, - EnvelopeXDR: &envelope1, - FeeCharged: 100, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta1, - LedgerNumber: 1, - LedgerCreatedAt: now, - IsFeeBump: false, - } - tx2 := types.Transaction{ - Hash: "0276b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa4877", - ToID: 2, - EnvelopeXDR: &envelope2, - FeeCharged: 200, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta2, - LedgerNumber: 2, - LedgerCreatedAt: now, - IsFeeBump: true, - } - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ - tx1.ToID: set.NewSet(kp1.Address()), - tx2.ToID: set.NewSet(kp2.Address()), - }) - require.NoError(t, err) - - reason := types.StateChangeReasonAdd - sc1 := types.StateChange{ - ToID: 1, - StateChangeOrder: 1, - StateChangeCategory: types.StateChangeCategoryBalance, - StateChangeReason: &reason, - LedgerCreatedAt: now, - LedgerNumber: 1, - AccountID: types.AddressBytea(kp1.Address()), - OperationID: 123, - TokenID: sql.NullString{String: "token1", Valid: true}, - Amount: sql.NullString{String: "100", Valid: true}, - } - sc2 := types.StateChange{ - ToID: 2, - StateChangeOrder: 1, - 
StateChangeCategory: types.StateChangeCategoryBalance, - StateChangeReason: &reason, - LedgerCreatedAt: now, - LedgerNumber: 2, - AccountID: types.AddressBytea(kp2.Address()), - OperationID: 456, - } - - testCases := []struct { - name string - useDBTx bool - stateChanges []types.StateChange - wantIDs []string - wantErrContains string - }{ - { - name: "🟢successful_insert_without_dbTx", - useDBTx: false, - stateChanges: []types.StateChange{sc1, sc2}, - wantIDs: []string{fmt.Sprintf("%d-%d-%d", sc1.ToID, sc1.OperationID, sc1.StateChangeOrder), fmt.Sprintf("%d-%d-%d", sc2.ToID, sc2.OperationID, sc2.StateChangeOrder)}, - }, - { - name: "🟢successful_insert_with_dbTx", - useDBTx: true, - stateChanges: []types.StateChange{sc1}, - wantIDs: []string{fmt.Sprintf("%d-%d-%d", sc1.ToID, sc1.OperationID, sc1.StateChangeOrder)}, - }, - { - name: "🟢empty_input", - useDBTx: false, - stateChanges: []types.StateChange{}, - wantIDs: nil, - }, - { - name: "🟡duplicate_state_change", - useDBTx: false, - stateChanges: []types.StateChange{sc1, sc1}, - wantIDs: []string{fmt.Sprintf("%d-%d-%d", sc1.ToID, sc1.OperationID, sc1.StateChangeOrder)}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - _, err = dbConnectionPool.ExecContext(ctx, "TRUNCATE state_changes CASCADE") - require.NoError(t, err) - - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService. - On("ObserveDBQueryDuration", "BatchInsert", "state_changes", mock.Anything).Return().Once() - mockMetricsService. - On("ObserveDBBatchSize", "BatchInsert", "state_changes", mock.Anything).Return().Once() - mockMetricsService. - On("IncDBQuery", "BatchInsert", "state_changes").Return().Once() - - m := &StateChangeModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - var sqlExecuter db.SQLExecuter = dbConnectionPool - if tc.useDBTx { - tx, err := dbConnectionPool.BeginTxx(ctx, nil) - require.NoError(t, err) - defer tx.Rollback() // nolint: errcheck - sqlExecuter = tx - } - - gotInsertedIDs, err := m.BatchInsert(ctx, sqlExecuter, tc.stateChanges) - - if tc.wantErrContains != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), tc.wantErrContains) - return - } - - require.NoError(t, err) - assert.ElementsMatch(t, tc.wantIDs, gotInsertedIDs) - - // Verify from DB - var dbInsertedIDs []string - err = sqlExecuter.SelectContext(ctx, &dbInsertedIDs, "SELECT CONCAT(to_id, '-', operation_id, '-', state_change_order) FROM state_changes") - require.NoError(t, err) - assert.ElementsMatch(t, tc.wantIDs, dbInsertedIDs) - - mockMetricsService.AssertExpectations(t) - }) - } -} - func TestStateChangeModel_BatchCopy(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -263,13 +106,12 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { LedgerCreatedAt: now, IsFeeBump: true, } - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ - tx1.ToID: set.NewSet(kp1.Address()), - tx2.ToID: set.NewSet(kp2.Address()), - }) + // Insert transactions using direct SQL + _, err = dbConnectionPool.ExecContext(ctx, ` + INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9), ($10, $11, $12, $13, $14, $15, $16, $17, $18) + `, tx1.Hash, tx1.ToID, *tx1.EnvelopeXDR, 
tx1.FeeCharged, tx1.ResultCode, *tx1.MetaXDR, tx1.LedgerNumber, tx1.LedgerCreatedAt, tx1.IsFeeBump, + tx2.Hash, tx2.ToID, *tx2.EnvelopeXDR, tx2.FeeCharged, tx2.ResultCode, *tx2.MetaXDR, tx2.LedgerNumber, tx2.LedgerCreatedAt, tx2.IsFeeBump) require.NoError(t, err) reason := types.StateChangeReasonAdd @@ -394,82 +236,6 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { } } -func TestStateChangeModel_BatchCopy_DuplicateFails(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test account - kp1 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) - require.NoError(t, err) - - // Create parent transaction - _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) - VALUES ('tx_for_sc_dup_test', 1, 'env', 100, 'TransactionResultCodeTxSuccess', 'meta', 1, $1, false) - `, now) - require.NoError(t, err) - - reason := types.StateChangeReasonCredit - sc1 := types.StateChange{ - ToID: 1, // Must reference the transaction created above with to_id=1 - StateChangeOrder: 1, - StateChangeCategory: types.StateChangeCategoryBalance, - StateChangeReason: &reason, - LedgerCreatedAt: now, - LedgerNumber: 1, - AccountID: types.AddressBytea(kp1.Address()), - OperationID: 123, - } - - // Pre-insert the state change using BatchInsert (which uses ON CONFLICT DO NOTHING) - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - scModel := &StateChangeModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = scModel.BatchInsert(ctx, nil, []types.StateChange{sc1}) - require.NoError(t, err) - - // Verify the state change was inserted - var count int - err = dbConnectionPool.GetContext(ctx, &count, "SELECT COUNT(*) FROM state_changes WHERE to_id = $1 AND state_change_order = $2", sc1.ToID, sc1.StateChangeOrder) - require.NoError(t, err) - require.Equal(t, 1, count) - - // Now try to insert the same state change using BatchCopy - this should FAIL - // because COPY does not support ON CONFLICT handling - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("IncDBQueryError", "BatchCopy", "state_changes", mock.Anything).Return().Once() - defer mockMetricsService.AssertExpectations(t) - - m := &StateChangeModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - conn, err := pgx.Connect(ctx, dbt.DSN) - require.NoError(t, err) - defer conn.Close(ctx) - - pgxTx, err := conn.Begin(ctx) - require.NoError(t, err) - - _, err = m.BatchCopy(ctx, pgxTx, []types.StateChange{sc1}) - - // BatchCopy should fail with a unique constraint violation - require.Error(t, err) - assert.Contains(t, err.Error(), "pgx CopyFrom state_changes: ERROR: duplicate key value violates unique constraint \"state_changes_pkey\"") - - // Rollback the failed transaction - require.NoError(t, pgxTx.Rollback(ctx)) -} - func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -1151,69 +917,6 @@ func TestStateChangeModel_BatchGetByToID(t *testing.T) { }) } -func BenchmarkStateChangeModel_BatchInsert(b *testing.B) { - dbt := dbtest.OpenB(b) - defer dbt.Close() - dbConnectionPool, err := 
db.OpenDBConnectionPool(dbt.DSN) - if err != nil { - b.Fatalf("failed to open db connection pool: %v", err) - } - defer dbConnectionPool.Close() - - ctx := context.Background() - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - if err != nil { - b.Fatalf("failed to get sqlx db: %v", err) - } - metricsService := metrics.NewMetricsService(sqlxDB) - - m := &StateChangeModel{ - DB: dbConnectionPool, - MetricsService: metricsService, - } - - // Create a parent transaction that state changes will reference - const txHash = "benchmark_tx_hash" - accountID := keypair.MustRandom().Address() - now := time.Now() - _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) - VALUES ($1, 1, 'env', 100, 'TransactionResultCodeTxSuccess', 'meta', 1, $2, false) - `, txHash, now) - if err != nil { - b.Fatalf("failed to create parent transaction: %v", err) - } - - // Pre-generate auxiliary addresses for nullable account_id fields - auxAddresses := make([]string, 10) - for i := range auxAddresses { - auxAddresses[i] = keypair.MustRandom().Address() - } - - batchSizes := []int{1000, 5000, 10000, 50000, 100000} - - for _, size := range batchSizes { - b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - b.StopTimer() - // Clean up state changes before each iteration (keep the parent transaction) - //nolint:errcheck // truncate is best-effort cleanup in benchmarks - dbConnectionPool.ExecContext(ctx, "TRUNCATE state_changes CASCADE") - // Generate fresh test data for each iteration - scs := generateTestStateChanges(size, accountID, int64(i*size), auxAddresses) - b.StartTimer() - - _, err := m.BatchInsert(ctx, nil, scs) - if err != nil { - b.Fatalf("BatchInsert failed: %v", err) - } - } - }) - } -} - // BenchmarkStateChangeModel_BatchCopy benchmarks bulk insert using pgx's binary COPY protocol. func BenchmarkStateChangeModel_BatchCopy(b *testing.B) { dbt := dbtest.OpenB(b) diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 7e51a892d..5209ffcfc 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -178,151 +178,13 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs return transactions, nil } -// BatchInsert inserts the transactions and the transactions_accounts links. -// It returns the hashes of the successfully inserted transactions. -func (m *TransactionModel) BatchInsert( - ctx context.Context, - sqlExecuter db.SQLExecuter, - txs []*types.Transaction, - stellarAddressesByToID map[int64]set.Set[string], -) ([]string, error) { - if sqlExecuter == nil { - sqlExecuter = m.DB - } - - // 1. 
Flatten the transactions into parallel slices - hashes := make([][]byte, len(txs)) - toIDs := make([]int64, len(txs)) - envelopeXDRs := make([]*string, len(txs)) - feesCharged := make([]int64, len(txs)) - resultCodes := make([]string, len(txs)) - metaXDRs := make([]*string, len(txs)) - ledgerNumbers := make([]int, len(txs)) - ledgerCreatedAts := make([]time.Time, len(txs)) - isFeeBumps := make([]bool, len(txs)) - - for i, t := range txs { - hashBytes, err := t.Hash.Value() - if err != nil { - return nil, fmt.Errorf("converting hash %s to bytes: %w", t.Hash, err) - } - hashes[i] = hashBytes.([]byte) - toIDs[i] = t.ToID - envelopeXDRs[i] = t.EnvelopeXDR - feesCharged[i] = t.FeeCharged - resultCodes[i] = t.ResultCode - metaXDRs[i] = t.MetaXDR - ledgerNumbers[i] = int(t.LedgerNumber) - ledgerCreatedAts[i] = t.LedgerCreatedAt - isFeeBumps[i] = t.IsFeeBump - } - - // 2. Flatten the stellarAddressesByToID into parallel slices, converting to BYTEA - var txToIDs []int64 - var stellarAddressBytes [][]byte - for toID, addresses := range stellarAddressesByToID { - for address := range addresses.Iter() { - txToIDs = append(txToIDs, toID) - addrBytes, err := types.AddressBytea(address).Value() - if err != nil { - return nil, fmt.Errorf("converting address %s to bytes: %w", address, err) - } - stellarAddressBytes = append(stellarAddressBytes, addrBytes.([]byte)) - } - } - - // Insert transactions and transactions_accounts links with minimal account validation. - const insertQuery = ` - WITH - -- Insert transactions - inserted_transactions AS ( - INSERT INTO transactions - (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at, is_fee_bump) - SELECT - t.hash, t.to_id, t.envelope_xdr, t.fee_charged, t.result_code, t.meta_xdr, t.ledger_number, t.ledger_created_at, t.is_fee_bump - FROM ( - SELECT - UNNEST($1::bytea[]) AS hash, - UNNEST($2::bigint[]) AS to_id, - UNNEST($3::text[]) AS envelope_xdr, - UNNEST($4::bigint[]) AS fee_charged, - UNNEST($5::text[]) AS result_code, - UNNEST($6::text[]) AS meta_xdr, - UNNEST($7::bigint[]) AS ledger_number, - UNNEST($8::timestamptz[]) AS ledger_created_at, - UNNEST($9::boolean[]) AS is_fee_bump - ) t - ON CONFLICT (to_id) DO NOTHING - RETURNING hash - ), - - -- Insert transactions_accounts links - inserted_transactions_accounts AS ( - INSERT INTO transactions_accounts - (tx_to_id, account_id) - SELECT - ta.tx_to_id, ta.account_id - FROM ( - SELECT - UNNEST($10::bigint[]) AS tx_to_id, - UNNEST($11::bytea[]) AS account_id - ) ta - ON CONFLICT DO NOTHING - ) - - -- Return the hashes of successfully inserted transactions - SELECT hash FROM inserted_transactions; - ` - - start := time.Now() - var insertedHashes []types.HashBytea - err := sqlExecuter.SelectContext(ctx, &insertedHashes, insertQuery, - pq.Array(hashes), - pq.Array(toIDs), - pq.Array(envelopeXDRs), - pq.Array(feesCharged), - pq.Array(resultCodes), - pq.Array(metaXDRs), - pq.Array(ledgerNumbers), - pq.Array(ledgerCreatedAts), - pq.Array(isFeeBumps), - pq.Array(txToIDs), - pq.Array(stellarAddressBytes), - ) - duration := time.Since(start).Seconds() - for _, dbTableName := range []string{"transactions", "transactions_accounts"} { - m.MetricsService.ObserveDBQueryDuration("BatchInsert", dbTableName, duration) - if dbTableName == "transactions" { - m.MetricsService.ObserveDBBatchSize("BatchInsert", dbTableName, len(txs)) - } - if err == nil { - m.MetricsService.IncDBQuery("BatchInsert", dbTableName) - } - } - if err != nil { - for _, dbTableName := range 
[]string{"transactions", "transactions_accounts"} { - m.MetricsService.IncDBQueryError("BatchInsert", dbTableName, utils.GetDBErrorType(err)) - } - return nil, fmt.Errorf("batch inserting transactions and transactions_accounts: %w", err) - } - - // Convert HashBytea to string for the return value - result := make([]string, len(insertedHashes)) - for i, h := range insertedHashes { - result[i] = h.String() - } - - return result, nil -} - // BatchCopy inserts transactions using pgx's binary COPY protocol. // Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // -// IMPORTANT: Unlike BatchInsert which uses ON CONFLICT DO NOTHING, BatchCopy will FAIL -// if any duplicate records exist. The PostgreSQL COPY protocol does not support conflict -// handling. Callers must ensure no duplicates exist before calling this method, or handle -// the unique constraint violation error appropriately. +// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY +// protocol does not support conflict handling. Callers must ensure no duplicates exist +// before calling this method, or handle the unique constraint violation error appropriately. func (m *TransactionModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, @@ -369,22 +231,34 @@ func (m *TransactionModel) BatchCopy( // COPY transactions_accounts using pgx binary format with native pgtype types if len(stellarAddressesByToID) > 0 { + // Build ToID -> LedgerCreatedAt lookup from transactions + ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) + for _, tx := range txs { + ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt + } + var taRows [][]any for toID, addresses := range stellarAddressesByToID { + ledgerCreatedAt := ledgerCreatedAtByToID[toID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} for _, addr := range addresses.ToSlice() { - addrBytes, err := types.AddressBytea(addr).Value() - if err != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, err) + addrBytes, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) } - taRows = append(taRows, []any{toIDPgtype, addrBytes}) + taRows = append(taRows, []any{ + ledgerCreatedAtPgtype, + toIDPgtype, + addrBytes, + }) } } _, err = pgxTx.CopyFrom( ctx, pgx.Identifier{"transactions_accounts"}, - []string{"tx_to_id", "account_id"}, + []string{"ledger_created_at", "tx_to_id", "account_id"}, pgx.CopyFromRows(taRows), ) if err != nil { diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index 489e5691c..3e1b952c6 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -53,176 +53,6 @@ func generateTestTransactions(n int, startLedger int32) ([]*types.Transaction, m return txs, addressesByToID } -func Test_TransactionModel_BatchInsert(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test data - kp1 := keypair.MustRandom() - kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), 
types.AddressBytea(kp2.Address())) - require.NoError(t, err) - - meta1, meta2 := "meta1", "meta2" - envelope1, envelope2 := "envelope1", "envelope2" - tx1 := types.Transaction{ - Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", - ToID: 1, - EnvelopeXDR: &envelope1, - FeeCharged: 100, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta1, - LedgerNumber: 1, - LedgerCreatedAt: now, - IsFeeBump: false, - } - tx2 := types.Transaction{ - Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", - ToID: 2, - EnvelopeXDR: &envelope2, - FeeCharged: 200, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta2, - LedgerNumber: 2, - LedgerCreatedAt: now, - IsFeeBump: true, - } - - testCases := []struct { - name string - useDBTx bool - txs []*types.Transaction - stellarAddressesByToID map[int64]set.Set[string] - wantAccountLinks map[int64][]string - wantErrContains string - wantHashes []string - }{ - { - name: "🟢successful_insert_without_dbTx", - useDBTx: false, - txs: []*types.Transaction{&tx1, &tx2}, - stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address()), tx2.ToID: set.NewSet(kp2.Address())}, - wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}, tx2.ToID: {kp2.Address()}}, - wantErrContains: "", - wantHashes: []string{tx1.Hash.String(), tx2.Hash.String()}, - }, - { - name: "🟢successful_insert_with_dbTx", - useDBTx: true, - txs: []*types.Transaction{&tx1}, - stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address())}, - wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}}, - wantErrContains: "", - wantHashes: []string{tx1.Hash.String()}, - }, - { - name: "🟢empty_input", - useDBTx: false, - txs: []*types.Transaction{}, - stellarAddressesByToID: map[int64]set.Set[string]{}, - wantAccountLinks: map[int64][]string{}, - wantErrContains: "", - wantHashes: nil, - }, - { - name: "🟡duplicate_transaction", - useDBTx: false, - txs: []*types.Transaction{&tx1, &tx1}, - stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address())}, - wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}}, - wantErrContains: "", - wantHashes: []string{tx1.Hash.String()}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Clear the database before each test - _, err = dbConnectionPool.ExecContext(ctx, "TRUNCATE transactions, transactions_accounts CASCADE") - require.NoError(t, err) - - // Create fresh mock for each test case - mockMetricsService := metrics.NewMockMetricsService() - // The implementation always loops through both tables and calls ObserveDBQueryDuration for each - mockMetricsService. - On("ObserveDBQueryDuration", "BatchInsert", "transactions", mock.Anything).Return().Once(). - On("ObserveDBQueryDuration", "BatchInsert", "transactions_accounts", mock.Anything).Return().Once() - // ObserveDBBatchSize is only called for transactions table (not transactions_accounts) - mockMetricsService.On("ObserveDBBatchSize", "BatchInsert", "transactions", mock.Anything).Return().Once() - // IncDBQuery is called for both tables on success - mockMetricsService. - On("IncDBQuery", "BatchInsert", "transactions").Return().Once(). 
- On("IncDBQuery", "BatchInsert", "transactions_accounts").Return().Once() - defer mockMetricsService.AssertExpectations(t) - - m := &TransactionModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - var sqlExecuter db.SQLExecuter = dbConnectionPool - if tc.useDBTx { - tx, err := dbConnectionPool.BeginTxx(ctx, nil) - require.NoError(t, err) - defer tx.Rollback() - sqlExecuter = tx - } - - gotInsertedHashes, err := m.BatchInsert(ctx, sqlExecuter, tc.txs, tc.stellarAddressesByToID) - - if tc.wantErrContains != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), tc.wantErrContains) - return - } - - // Verify the results - require.NoError(t, err) - var dbInsertedHashes []types.HashBytea - err = sqlExecuter.SelectContext(ctx, &dbInsertedHashes, "SELECT hash FROM transactions") - require.NoError(t, err) - // Convert HashBytea to string for comparison - dbHashStrings := make([]string, len(dbInsertedHashes)) - for i, h := range dbInsertedHashes { - dbHashStrings[i] = h.String() - } - assert.ElementsMatch(t, tc.wantHashes, dbHashStrings) - assert.ElementsMatch(t, tc.wantHashes, gotInsertedHashes) - - // Verify the account links - if len(tc.wantAccountLinks) > 0 { - var accountLinks []struct { - TxToID int64 `db:"tx_to_id"` - AccountID types.AddressBytea `db:"account_id"` - } - err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT tx_to_id, account_id FROM transactions_accounts ORDER BY tx_to_id, account_id") - require.NoError(t, err) - - // Create a map of tx_to_id -> set of account_ids for O(1) lookups - accountLinksMap := make(map[int64][]string) - for _, link := range accountLinks { - accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], string(link.AccountID)) - } - - // Verify each transaction has its expected account links - require.Equal(t, len(tc.wantAccountLinks), len(accountLinksMap), "number of elements in the maps don't match") - for key, expectedSlice := range tc.wantAccountLinks { - actualSlice, exists := accountLinksMap[key] - require.True(t, exists, "key %d not found in actual map", key) - assert.ElementsMatch(t, expectedSlice, actualSlice, "slices for key %d don't match", key) - } - } - }) - } -} - func Test_TransactionModel_BatchCopy(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -391,81 +221,6 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { } } -func Test_TransactionModel_BatchCopy_DuplicateFails(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - now := time.Now() - - // Create test account - kp1 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) - require.NoError(t, err) - - meta := "meta1" - envelope := "envelope1" - txDup := types.Transaction{ - Hash: "f76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48766", - ToID: 100, - EnvelopeXDR: &envelope, - FeeCharged: 100, - ResultCode: "TransactionResultCodeTxSuccess", - MetaXDR: &meta, - LedgerNumber: 1, - LedgerCreatedAt: now, - IsFeeBump: false, - } - - // Pre-insert the transaction using BatchInsert (which uses ON CONFLICT DO NOTHING) - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - require.NoError(t, err) - txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, 
[]*types.Transaction{&txDup}, map[int64]set.Set[string]{ - txDup.ToID: set.NewSet(kp1.Address()), - }) - require.NoError(t, err) - - // Verify the transaction was inserted - var count int - err = dbConnectionPool.GetContext(ctx, &count, "SELECT COUNT(*) FROM transactions WHERE hash = $1", txDup.Hash) - require.NoError(t, err) - require.Equal(t, 1, count) - - // Now try to insert the same transaction using BatchCopy - this should FAIL - // because COPY does not support ON CONFLICT handling - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("IncDBQueryError", "BatchCopy", "transactions", mock.Anything).Return().Once() - defer mockMetricsService.AssertExpectations(t) - - m := &TransactionModel{ - DB: dbConnectionPool, - MetricsService: mockMetricsService, - } - - conn, err := pgx.Connect(ctx, dbt.DSN) - require.NoError(t, err) - defer conn.Close(ctx) - - pgxTx, err := conn.Begin(ctx) - require.NoError(t, err) - - _, err = m.BatchCopy(ctx, pgxTx, []*types.Transaction{&txDup}, map[int64]set.Set[string]{ - txDup.ToID: set.NewSet(kp1.Address()), - }) - - // BatchCopy should fail with a unique constraint violation - require.Error(t, err) - assert.Contains(t, err.Error(), "pgx CopyFrom transactions: ERROR: duplicate key value violates unique constraint \"transactions_pkey\"") - - // Rollback the failed transaction - require.NoError(t, pgxTx.Rollback(ctx)) -} - func TestTransactionModel_GetByHash(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -595,12 +350,12 @@ func TestTransactionModel_BatchGetByAccountAddress(t *testing.T) { // Create test transactions_accounts links (account_id is BYTEA) _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO transactions_accounts (tx_to_id, account_id) + INSERT INTO transactions_accounts (ledger_created_at, tx_to_id, account_id) VALUES - (1, $1), - (2, $1), - (3, $2) - `, types.AddressBytea(address1), types.AddressBytea(address2)) + ($3, 1, $1), + ($3, 2, $1), + ($3, 3, $2) + `, types.AddressBytea(address1), types.AddressBytea(address2), now) require.NoError(t, err) // Test BatchGetByAccount @@ -742,51 +497,6 @@ func TestTransactionModel_BatchGetByStateChangeIDs(t *testing.T) { assert.Equal(t, scTestHash3, stateChangeIDsFound["3-3-1"]) // to_id=3 -> scTestHash3 (to_id=3) } -func BenchmarkTransactionModel_BatchInsert(b *testing.B) { - dbt := dbtest.OpenB(b) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - if err != nil { - b.Fatalf("failed to open db connection pool: %v", err) - } - defer dbConnectionPool.Close() - - ctx := context.Background() - sqlxDB, err := dbConnectionPool.SqlxDB(ctx) - if err != nil { - b.Fatalf("failed to get sqlx db: %v", err) - } - metricsService := metrics.NewMetricsService(sqlxDB) - - m := &TransactionModel{ - DB: dbConnectionPool, - MetricsService: metricsService, - } - - batchSizes := []int{1000, 5000, 10000, 50000, 100000} - - for _, size := range batchSizes { - b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - b.StopTimer() - // Clean up before each iteration - //nolint:errcheck // truncate is best-effort cleanup in benchmarks - dbConnectionPool.ExecContext(ctx, "TRUNCATE transactions, transactions_accounts CASCADE") - // Generate fresh test data for each iteration - txs, addressesByToID := generateTestTransactions(size, int32(i*size)) - b.StartTimer() - - _, err := m.BatchInsert(ctx, nil, txs, addressesByToID) - if err != nil { - b.Fatalf("BatchInsert failed: %v", err) - } - } - }) - } -} - // 
BenchmarkTransactionModel_BatchCopy benchmarks bulk insert using pgx's binary COPY protocol. func BenchmarkTransactionModel_BatchCopy(b *testing.B) { dbt := dbtest.OpenB(b) diff --git a/internal/db/dbtest/dbtest.go b/internal/db/dbtest/dbtest.go index 9cbc9917e..67adc9e9b 100644 --- a/internal/db/dbtest/dbtest.go +++ b/internal/db/dbtest/dbtest.go @@ -1,3 +1,4 @@ +// Package dbtest provides test database utilities with TimescaleDB support. package dbtest import ( @@ -11,14 +12,28 @@ import ( "github.com/stellar/wallet-backend/internal/db/migrations" ) +// Open opens a test database with migrations applied. +// Requires TimescaleDB extension to be available on the PostgreSQL server. func Open(t *testing.T) *dbtest.DB { db := dbtest.Postgres(t) conn := db.Open() defer conn.Close() + // Enable TimescaleDB extension before running migrations + _, err := conn.Exec("CREATE EXTENSION IF NOT EXISTS timescaledb") + if err != nil { + t.Fatal(err) + } + + // Also set on the current session since ALTER DATABASE only affects new connections + _, err = conn.Exec("SET timescaledb.enable_chunk_skipping = on") + if err != nil { + t.Fatal(err) + } + migrateDirection := schema.MigrateUp m := migrate.HttpFileSystemMigrationSource{FileSystem: http.FS(migrations.FS)} - _, err := schema.Migrate(conn.DB, m, migrateDirection, 0) + _, err = schema.Migrate(conn.DB, m, migrateDirection, 0) if err != nil { t.Fatal(err) } @@ -26,20 +41,51 @@ func Open(t *testing.T) *dbtest.DB { return db } +// OpenWithoutMigrations opens a test database without running migrations +// but still enables TimescaleDB extension for manual migration testing. func OpenWithoutMigrations(t *testing.T) *dbtest.DB { db := dbtest.Postgres(t) + conn := db.Open() + defer conn.Close() + + // Enable TimescaleDB extension + _, err := conn.Exec("CREATE EXTENSION IF NOT EXISTS timescaledb") + if err != nil { + t.Fatal(err) + } + + // Enable chunk skipping at the database level so all connections (including + // those opened by the Migrate function) inherit this setting. + _, err = conn.Exec("DO $$ BEGIN EXECUTE format('ALTER DATABASE %I SET timescaledb.enable_chunk_skipping = on', current_database()); END $$") + if err != nil { + t.Fatal(err) + } + return db } // OpenB opens a test database for benchmarks with migrations applied. +// Requires TimescaleDB extension to be available on the PostgreSQL server. 
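+// OpenB mirrors Open but takes *testing.B, so the same TimescaleDB setup and migrations
+// can be reused from benchmarks.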
func OpenB(b *testing.B) *dbtest.DB { db := dbtest.Postgres(b) conn := db.Open() defer conn.Close() + // Enable TimescaleDB extension before running migrations + _, err := conn.Exec("CREATE EXTENSION IF NOT EXISTS timescaledb") + if err != nil { + b.Fatal(err) + } + + // Also set on the current session since ALTER DATABASE only affects new connections + _, err = conn.Exec("SET timescaledb.enable_chunk_skipping = on") + if err != nil { + b.Fatal(err) + } + migrateDirection := schema.MigrateUp m := migrate.HttpFileSystemMigrationSource{FileSystem: http.FS(migrations.FS)} - _, err := schema.Migrate(conn.DB, m, migrateDirection, 0) + _, err = schema.Migrate(conn.DB, m, migrateDirection, 0) if err != nil { b.Fatal(err) } diff --git a/internal/db/migrations/2025-06-10.2-transactions.sql b/internal/db/migrations/2025-06-10.2-transactions.sql index 44ee11ae1..3424e7e2c 100644 --- a/internal/db/migrations/2025-06-10.2-transactions.sql +++ b/internal/db/migrations/2025-06-10.2-transactions.sql @@ -1,30 +1,46 @@ -- +migrate Up --- Table: transactions +-- Table: transactions (TimescaleDB hypertable with columnstore) CREATE TABLE transactions ( - to_id BIGINT PRIMARY KEY, - hash BYTEA NOT NULL UNIQUE, + to_id BIGINT NOT NULL, + hash BYTEA NOT NULL, envelope_xdr TEXT, fee_charged BIGINT NOT NULL, result_code TEXT NOT NULL, meta_xdr TEXT, ledger_number INTEGER NOT NULL, - ledger_created_at TIMESTAMPTZ NOT NULL, is_fee_bump BOOLEAN NOT NULL DEFAULT false, - ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ledger_created_at TIMESTAMPTZ NOT NULL +) WITH ( + tsdb.hypertable, + tsdb.partition_column = 'ledger_created_at', + tsdb.chunk_interval = '1 day', + tsdb.orderby = 'ledger_created_at DESC, to_id DESC' ); -CREATE INDEX idx_transactions_ledger_created_at ON transactions(ledger_created_at); +SELECT enable_chunk_skipping('transactions', 'to_id'); --- Table: transactions_accounts +CREATE INDEX idx_transactions_hash ON transactions(hash); +CREATE INDEX idx_transactions_to_id ON transactions(to_id); + +-- Table: transactions_accounts (TimescaleDB hypertable for automatic cleanup with retention) CREATE TABLE transactions_accounts ( - tx_to_id BIGINT NOT NULL REFERENCES transactions(to_id) ON DELETE CASCADE, + tx_to_id BIGINT NOT NULL, account_id BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - PRIMARY KEY (account_id, tx_to_id) + ledger_created_at TIMESTAMPTZ NOT NULL +) WITH ( + tsdb.hypertable, + tsdb.partition_column = 'ledger_created_at', + tsdb.chunk_interval = '1 day', + tsdb.orderby = 'ledger_created_at DESC, tx_to_id DESC', + tsdb.segmentby = 'account_id' ); +SELECT enable_chunk_skipping('transactions_accounts', 'tx_to_id'); + CREATE INDEX idx_transactions_accounts_tx_to_id ON transactions_accounts(tx_to_id); +CREATE INDEX idx_transactions_accounts_account_id ON transactions_accounts(account_id, ledger_created_at DESC, tx_to_id DESC); -- +migrate Down diff --git a/internal/db/migrations/2025-06-10.3-operations.sql b/internal/db/migrations/2025-06-10.3-operations.sql index 11fc2cbb5..497f9f1f2 100644 --- a/internal/db/migrations/2025-06-10.3-operations.sql +++ b/internal/db/migrations/2025-06-10.3-operations.sql @@ -1,8 +1,8 @@ -- +migrate Up --- Table: operations +-- Table: operations (TimescaleDB hypertable with columnstore) CREATE TABLE operations ( - id BIGINT PRIMARY KEY, + id BIGINT NOT NULL, operation_type TEXT NOT NULL CHECK ( operation_type IN ( 'CREATE_ACCOUNT', 'PAYMENT', 'PATH_PAYMENT_STRICT_RECEIVE', @@ -21,21 +21,36 @@ 
CREATE TABLE operations ( result_code TEXT NOT NULL, successful BOOLEAN NOT NULL, ledger_number INTEGER NOT NULL, - ledger_created_at TIMESTAMPTZ NOT NULL, - ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ledger_created_at TIMESTAMPTZ NOT NULL +) WITH ( + tsdb.hypertable, + tsdb.partition_column = 'ledger_created_at', + tsdb.chunk_interval = '1 day', + tsdb.orderby = 'ledger_created_at DESC, id DESC' ); -CREATE INDEX idx_operations_ledger_created_at ON operations(ledger_created_at); +SELECT enable_chunk_skipping('operations', 'id'); --- Table: operations_accounts +CREATE INDEX idx_operations_id ON operations(id); + +-- Table: operations_accounts (TimescaleDB hypertable for automatic cleanup with retention) CREATE TABLE operations_accounts ( - operation_id BIGINT NOT NULL REFERENCES operations(id) ON DELETE CASCADE, + operation_id BIGINT NOT NULL, account_id BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - PRIMARY KEY (account_id, operation_id) + ledger_created_at TIMESTAMPTZ NOT NULL +) WITH ( + tsdb.hypertable, + tsdb.partition_column = 'ledger_created_at', + tsdb.chunk_interval = '1 day', + tsdb.orderby = 'ledger_created_at DESC, operation_id DESC', + tsdb.segmentby = 'account_id' ); +SELECT enable_chunk_skipping('operations_accounts', 'operation_id'); + CREATE INDEX idx_operations_accounts_operation_id ON operations_accounts(operation_id); +CREATE INDEX idx_operations_accounts_account_id ON operations_accounts(account_id, ledger_created_at DESC, operation_id DESC); -- +migrate Down diff --git a/internal/db/migrations/2025-06-10.4-statechanges.sql b/internal/db/migrations/2025-06-10.4-statechanges.sql index eb41b37ac..a6b33ea5d 100644 --- a/internal/db/migrations/2025-06-10.4-statechanges.sql +++ b/internal/db/migrations/2025-06-10.4-statechanges.sql @@ -1,8 +1,10 @@ -- +migrate Up --- Table: state_changes +-- Table: state_changes (TimescaleDB hypertable with columnstore) +-- Note: FK to transactions removed (hypertable FKs not supported) CREATE TABLE state_changes ( - to_id BIGINT NOT NULL REFERENCES transactions(to_id) ON DELETE CASCADE, + to_id BIGINT NOT NULL, + operation_id BIGINT NOT NULL, state_change_order BIGINT NOT NULL CHECK (state_change_order >= 1), state_change_category TEXT NOT NULL CHECK ( state_change_category IN ( @@ -19,10 +21,8 @@ CREATE TABLE state_changes ( ) ), ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - ledger_created_at TIMESTAMPTZ NOT NULL, ledger_number INTEGER NOT NULL, account_id BYTEA NOT NULL, - operation_id BIGINT NOT NULL, token_id TEXT, amount TEXT, signer_account_id BYTEA, @@ -42,13 +42,21 @@ CREATE TABLE state_changes ( trustline_limit_new TEXT, flags SMALLINT, key_value JSONB, - - PRIMARY KEY (to_id, operation_id, state_change_order) + ledger_created_at TIMESTAMPTZ NOT NULL +) WITH ( + tsdb.hypertable, + tsdb.partition_column = 'ledger_created_at', + tsdb.chunk_interval = '1 day', + tsdb.orderby = 'ledger_created_at DESC, to_id DESC, operation_id DESC, state_change_order DESC', + tsdb.segmentby = 'account_id' ); -CREATE INDEX idx_state_changes_account_id ON state_changes(account_id); +SELECT enable_chunk_skipping('state_changes', 'to_id'); +SELECT enable_chunk_skipping('state_changes', 'operation_id'); + +CREATE INDEX idx_state_changes_account_id ON state_changes(account_id, ledger_created_at DESC, to_id DESC, operation_id DESC, state_change_order DESC); +CREATE INDEX idx_state_changes_to_id ON state_changes(to_id, operation_id DESC, state_change_order DESC); CREATE INDEX 
idx_state_changes_operation_id ON state_changes(operation_id); -CREATE INDEX idx_state_changes_ledger_created_at ON state_changes(ledger_created_at); -- +migrate Down diff --git a/internal/db/migrations/2026-01-12.0-trustline_balances.sql b/internal/db/migrations/2026-01-12.0-trustline_balances.sql index a55c20b25..358b8bbfa 100644 --- a/internal/db/migrations/2026-01-12.0-trustline_balances.sql +++ b/internal/db/migrations/2026-01-12.0-trustline_balances.sql @@ -3,6 +3,9 @@ -- +migrate Up +-- Storage parameters tuned for heavy UPSERT/DELETE during ledger ingestion. +-- UPSERTs only modify non-indexed columns (balance, trust_limit, liabilities, flags, +-- last_modified_ledger) while PK columns (account_address, asset_id) are never changed. CREATE TABLE trustline_balances ( account_address TEXT NOT NULL, asset_id UUID NOT NULL, @@ -16,6 +19,28 @@ CREATE TABLE trustline_balances ( CONSTRAINT fk_trustline_asset FOREIGN KEY (asset_id) REFERENCES trustline_assets(id) DEFERRABLE INITIALLY DEFERRED +) WITH ( + -- Reserve 20% free space per page so PostgreSQL can do HOT (Heap-Only Tuple) updates. + -- HOT updates rewrite the row in-place on the same page without creating dead tuples + -- or new index entries, since no indexed column is modified during UPSERTs. + fillfactor = 80, + -- Trigger vacuum when 2% of rows are dead (default 20%). For a 500K-row table, + -- this means vacuum starts at ~10K dead rows instead of waiting for 100K. + autovacuum_vacuum_scale_factor = 0.02, + -- Base dead-row count added to (scale_factor * total_rows). Default is fine here + -- since the scale factor already keeps the threshold low. + autovacuum_vacuum_threshold = 50, + -- Refresh planner statistics at 1% change (default 10%). Balances shift every ledger, + -- so stale stats can cause bad query plans (e.g. on the GetByAccount JOIN). + autovacuum_analyze_scale_factor = 0.01, + autovacuum_analyze_threshold = 50, + -- No sleep between vacuum page-processing cycles (default 2ms). Per-table setting, + -- so only workers on this table run full-speed; other tables are unaffected. + autovacuum_vacuum_cost_delay = 0, + -- 5x the default page-processing budget per cycle (default 200). Combined with + -- cost_delay=0, vacuum finishes quickly. Per-table cost settings exempt this worker + -- from global cost balancing, so other tables' vacuum workers keep their full budget. + autovacuum_vacuum_cost_limit = 1000 ); -- +migrate Down diff --git a/internal/db/migrations/2026-01-15.0-native_balances.sql b/internal/db/migrations/2026-01-15.0-native_balances.sql index 698cece48..cc393e040 100644 --- a/internal/db/migrations/2026-01-15.0-native_balances.sql +++ b/internal/db/migrations/2026-01-15.0-native_balances.sql @@ -2,6 +2,9 @@ -- Table: native_balances -- Stores native XLM balance data for accounts during ingestion. +-- Storage parameters tuned for heavy UPSERT/DELETE during ledger ingestion. +-- UPSERTs only modify non-indexed columns (balance, minimum_balance, liabilities, +-- last_modified_ledger) while the PK column (account_address) is never changed. CREATE TABLE native_balances ( account_address TEXT PRIMARY KEY, balance BIGINT NOT NULL DEFAULT 0, @@ -9,6 +12,28 @@ CREATE TABLE native_balances ( buying_liabilities BIGINT NOT NULL DEFAULT 0, selling_liabilities BIGINT NOT NULL DEFAULT 0, last_modified_ledger BIGINT NOT NULL DEFAULT 0 +) WITH ( + -- Reserve 20% free space per page so PostgreSQL can do HOT (Heap-Only Tuple) updates. 
+ -- HOT updates rewrite the row in-place on the same page without creating dead tuples + -- or new index entries, since no indexed column is modified during UPSERTs. + fillfactor = 80, + -- Trigger vacuum when 2% of rows are dead (default 20%). For a 500K-row table, + -- this means vacuum starts at ~10K dead rows instead of waiting for 100K. + autovacuum_vacuum_scale_factor = 0.02, + -- Base dead-row count added to (scale_factor * total_rows). Default is fine here + -- since the scale factor already keeps the threshold low. + autovacuum_vacuum_threshold = 50, + -- Refresh planner statistics at 1% change (default 10%). Balances shift every ledger, + -- so stale stats can cause bad query plans. + autovacuum_analyze_scale_factor = 0.01, + autovacuum_analyze_threshold = 50, + -- No sleep between vacuum page-processing cycles (default 2ms). Per-table setting, + -- so only workers on this table run full-speed; other tables are unaffected. + autovacuum_vacuum_cost_delay = 0, + -- 5x the default page-processing budget per cycle (default 200). Combined with + -- cost_delay=0, vacuum finishes quickly. Per-table cost settings exempt this worker + -- from global cost balancing, so other tables' vacuum workers keep their full budget. + autovacuum_vacuum_cost_limit = 1000 ); -- +migrate Down diff --git a/internal/db/migrations/2026-01-16.0-sac-balances.sql b/internal/db/migrations/2026-01-16.0-sac-balances.sql index 40c03ca8a..b3bc5f99a 100644 --- a/internal/db/migrations/2026-01-16.0-sac-balances.sql +++ b/internal/db/migrations/2026-01-16.0-sac-balances.sql @@ -3,6 +3,9 @@ -- Table: sac_balances -- Stores SAC (Stellar Asset Contract) balance data for contract addresses (C...) during ingestion. -- Classic Stellar accounts (G...) have SAC balances in their trustlines, so only contract holders are stored here. +-- Storage parameters tuned for heavy UPSERT/DELETE during ledger ingestion. +-- UPSERTs only modify non-indexed columns (balance, is_authorized, is_clawback_enabled, +-- last_modified_ledger) while PK columns (account_address, contract_id) are never changed. CREATE TABLE sac_balances ( account_address TEXT NOT NULL, contract_id UUID NOT NULL, @@ -14,6 +17,28 @@ CREATE TABLE sac_balances ( CONSTRAINT fk_contract_token FOREIGN KEY (contract_id) REFERENCES contract_tokens(id) DEFERRABLE INITIALLY DEFERRED +) WITH ( + -- Reserve 20% free space per page so PostgreSQL can do HOT (Heap-Only Tuple) updates. + -- HOT updates rewrite the row in-place on the same page without creating dead tuples + -- or new index entries, since no indexed column is modified during UPSERTs. + fillfactor = 80, + -- Trigger vacuum when 2% of rows are dead (default 20%). For a 500K-row table, + -- this means vacuum starts at ~10K dead rows instead of waiting for 100K. + autovacuum_vacuum_scale_factor = 0.02, + -- Base dead-row count added to (scale_factor * total_rows). Default is fine here + -- since the scale factor already keeps the threshold low. + autovacuum_vacuum_threshold = 50, + -- Refresh planner statistics at 1% change (default 10%). Balances shift every ledger, + -- so stale stats can cause bad query plans. + autovacuum_analyze_scale_factor = 0.01, + autovacuum_analyze_threshold = 50, + -- No sleep between vacuum page-processing cycles (default 2ms). Per-table setting, + -- so only workers on this table run full-speed; other tables are unaffected. + autovacuum_vacuum_cost_delay = 0, + -- 5x the default page-processing budget per cycle (default 200). Combined with + -- cost_delay=0, vacuum finishes quickly. 
Per-table cost settings exempt this worker + -- from global cost balancing, so other tables' vacuum workers keep their full budget. + autovacuum_vacuum_cost_limit = 1000 ); -- +migrate Down diff --git a/internal/indexer/processors/participants.go b/internal/indexer/processors/participants.go index c94626ff3..fc293d98a 100644 --- a/internal/indexer/processors/participants.go +++ b/internal/indexer/processors/participants.go @@ -134,6 +134,7 @@ func (p *ParticipantsProcessor) GetOperationsParticipants(transaction ingest.Led Operation: xdrOp, LedgerSequence: ledgerSequence, Network: p.networkPassphrase, + LedgerClosed: transaction.Ledger.ClosedAt(), } opID := op.ID() diff --git a/internal/indexer/processors/participants_test.go b/internal/indexer/processors/participants_test.go index 3d6e63beb..b535df0ac 100644 --- a/internal/indexer/processors/participants_test.go +++ b/internal/indexer/processors/participants_test.go @@ -615,6 +615,7 @@ func TestParticipantsProcessor_GetOperationsParticipants(t *testing.T) { Network: network.TestNetworkPassphrase, Transaction: ingestTx, LedgerSequence: 4873, + LedgerClosed: ingestTx.Ledger.ClosedAt(), } assert.Equal(t, tc.wantParticipantsFn(t, opWrapper), gotParticipants) }) diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index a6863b3cb..b205fb5ef 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -87,6 +87,12 @@ type Configs struct { // CatchupThreshold is the number of ledgers behind network tip that triggers fast catchup. // Defaults to 100. CatchupThreshold int + // ChunkInterval sets the TimescaleDB chunk time interval for hypertables. + // Only affects future chunks. Uses PostgreSQL INTERVAL syntax (e.g., "1 day", "7 days"). + ChunkInterval string + // RetentionPeriod configures automatic data retention. Chunks older than this are dropped. + // Empty string disables retention. Uses PostgreSQL INTERVAL syntax (e.g., "30 days", "6 months"). + RetentionPeriod string } func Ingest(cfg Configs) error { @@ -122,7 +128,7 @@ func setupDeps(cfg Configs) (services.IngestService, error) { log.Ctx(ctx).Warnf("Could not disable FK checks (may require superuser privileges): %v", fkErr) // Continue anyway - other optimizations (async commit, work_mem) still apply } else { - log.Ctx(ctx).Info("Backfill session configured: FK checks disabled, async commit enabled, work_mem=256MB") + log.Ctx(ctx).Info("Backfill session configured: FK checks disabled, async commit enabled") } default: dbConnectionPool, err = db.OpenDBConnectionPool(cfg.DatabaseURL) @@ -135,6 +141,10 @@ func setupDeps(cfg Configs) (services.IngestService, error) { return nil, fmt.Errorf("getting sqlx db: %w", err) } + if err := configureHypertableSettings(ctx, dbConnectionPool, cfg.ChunkInterval, cfg.RetentionPeriod, cfg.OldestLedgerCursorName); err != nil { + return nil, fmt.Errorf("configuring hypertable settings: %w", err) + } + metricsService := metrics.NewMetricsService(sqlxDB) models, err := data.NewModels(dbConnectionPool, metricsService) if err != nil { diff --git a/internal/ingest/timescaledb.go b/internal/ingest/timescaledb.go new file mode 100644 index 000000000..65c371b5c --- /dev/null +++ b/internal/ingest/timescaledb.go @@ -0,0 +1,96 @@ +// Package ingest - configureHypertableSettings applies TimescaleDB chunk interval +// and retention policy settings to hypertables at startup. 
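+//
+// A minimal usage sketch (hedged; it mirrors the call added to setupDeps in
+// internal/ingest/ingest.go, using the new Configs fields):
+//
+//	if err := configureHypertableSettings(ctx, dbConnectionPool, cfg.ChunkInterval, cfg.RetentionPeriod, cfg.OldestLedgerCursorName); err != nil {
+//		return fmt.Errorf("configuring hypertable settings: %w", err)
+//	}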
+package ingest + +import ( + "context" + "fmt" + + "github.com/stellar/go-stellar-sdk/support/log" + + "github.com/stellar/wallet-backend/internal/db" +) + +// hypertables lists all TimescaleDB hypertables managed by the ingestion system. +var hypertables = []string{ + "transactions", + "transactions_accounts", + "operations", + "operations_accounts", + "state_changes", +} + +// configureHypertableSettings applies chunk interval and retention policy settings +// to all hypertables. Chunk interval only affects future chunks. Retention policy +// is idempotent: any existing policy is removed before re-adding. When retention +// is enabled, a reconciliation job keeps oldest_ingest_ledger in sync with the +// actual minimum ledger remaining after chunk drops. +func configureHypertableSettings(ctx context.Context, pool db.ConnectionPool, chunkInterval, retentionPeriod, oldestCursorName string) error { + for _, table := range hypertables { + if _, err := pool.ExecContext(ctx, + "SELECT set_chunk_time_interval($1::regclass, $2::interval)", + table, chunkInterval, + ); err != nil { + return fmt.Errorf("setting chunk interval on %s: %w", table, err) + } + log.Ctx(ctx).Infof("Set chunk interval %q on %s", chunkInterval, table) + } + + if retentionPeriod != "" { + for _, table := range hypertables { + if _, err := pool.ExecContext(ctx, + "SELECT remove_retention_policy($1::regclass, if_exists => true)", + table, + ); err != nil { + return fmt.Errorf("removing retention policy on %s: %w", table, err) + } + + if _, err := pool.ExecContext(ctx, + "SELECT add_retention_policy($1::regclass, drop_after => $2::interval)", + table, retentionPeriod, + ); err != nil { + return fmt.Errorf("adding retention policy on %s: %w", table, err) + } + log.Ctx(ctx).Infof("Set retention policy %q on %s", retentionPeriod, table) + } + + // Reconciliation job: keeps oldestCursorName in sync after retention drops chunks. + // Remove any existing job first (idempotent re-registration on every restart). + if _, err := pool.ExecContext(ctx, + "SELECT delete_job(job_id) FROM timescaledb_information.jobs WHERE proc_name = 'reconcile_oldest_cursor'", + ); err != nil { + return fmt.Errorf("removing existing reconciliation job: %w", err) + } + + // Create or replace the PL/pgSQL function that advances the cursor. + if _, err := pool.ExecContext(ctx, ` + CREATE OR REPLACE FUNCTION reconcile_oldest_cursor(job_id INT, config JSONB) + RETURNS VOID LANGUAGE plpgsql AS $$ + DECLARE + actual_min INTEGER; + stored INTEGER; + BEGIN + SELECT ledger_number INTO actual_min FROM transactions + ORDER BY ledger_created_at ASC, to_id ASC LIMIT 1; + IF actual_min IS NULL THEN RETURN; END IF; + SELECT value::integer INTO stored FROM ingest_store WHERE key = config->>'cursor_name'; + IF stored IS NULL OR actual_min <= stored THEN RETURN; END IF; + UPDATE ingest_store SET value = actual_min::text WHERE key = config->>'cursor_name'; + RAISE LOG 'reconcile_oldest_cursor: advanced % from % to %', config->>'cursor_name', stored, actual_min; + END $$; + `); err != nil { + return fmt.Errorf("creating reconcile_oldest_cursor function: %w", err) + } + + // Schedule the reconciliation job with the same cadence as the chunk interval. 
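+ // The JSONB config passed to add_job is handed to reconcile_oldest_cursor as its second
+ // argument, so the cursor key is read from config->>'cursor_name' rather than hard-coded.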
+ if _, err := pool.ExecContext(ctx, + "SELECT add_job('reconcile_oldest_cursor', $1::interval, config => $2::jsonb)", + chunkInterval, fmt.Sprintf(`{"cursor_name":"%s"}`, oldestCursorName), + ); err != nil { + return fmt.Errorf("scheduling reconciliation job: %w", err) + } + log.Ctx(ctx).Infof("Scheduled reconcile_oldest_cursor job every %s for cursor %q", chunkInterval, oldestCursorName) + } + + return nil +} diff --git a/internal/ingest/timescaledb_test.go b/internal/ingest/timescaledb_test.go new file mode 100644 index 000000000..4dc165d52 --- /dev/null +++ b/internal/ingest/timescaledb_test.go @@ -0,0 +1,225 @@ +// Package ingest - tests for configureHypertableSettings verifying chunk interval +// and retention policy configuration against a real TimescaleDB instance. +package ingest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/db/dbtest" +) + +func TestConfigureHypertableSettings(t *testing.T) { + t.Run("chunk_interval", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "7 days", "", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify chunk interval was updated for all hypertables + for _, table := range hypertables { + var intervalSecs float64 + err := dbConnectionPool.GetContext(ctx, &intervalSecs, + `SELECT EXTRACT(EPOCH FROM d.time_interval) + FROM timescaledb_information.dimensions d + WHERE d.hypertable_name = $1 AND d.column_name = 'ledger_created_at'`, + table, + ) + require.NoError(t, err, "querying dimensions for %s", table) + // 7 days in seconds = 7 * 24 * 60 * 60 + assert.Equal(t, float64(7*24*60*60), intervalSecs, "chunk interval for %s", table) + } + }) + + t.Run("retention_policy", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "30 days", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify retention policy was created for all hypertables + for _, table := range hypertables { + var count int + err := dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs j + WHERE j.proc_name = 'policy_retention' + AND j.hypertable_name = $1`, + table, + ) + require.NoError(t, err, "querying retention policy for %s", table) + assert.Equal(t, 1, count, "expected exactly 1 retention policy for %s", table) + } + }) + + t.Run("no_retention_when_empty", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify no retention policies were created + var count int + err = dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention'`, + ) + require.NoError(t, err) + assert.Equal(t, 0, count, "expected no retention policies when retention period 
is empty") + }) + + t.Run("retention_policy_idempotent", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + // Apply retention policy twice with different values to simulate restarts + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "30 days", "oldest_ledger_cursor") + require.NoError(t, err) + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "90 days", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify exactly 1 retention policy per table (not duplicated) + for _, table := range hypertables { + var count int + err := dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs j + WHERE j.proc_name = 'policy_retention' + AND j.hypertable_name = $1`, + table, + ) + require.NoError(t, err, "querying retention policy for %s", table) + assert.Equal(t, 1, count, "expected exactly 1 retention policy for %s after re-application", table) + } + }) + + t.Run("reconciliation_job_created", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "30 days", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify reconciliation job was created + var count int + err = dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs + WHERE proc_name = 'reconcile_oldest_cursor'`, + ) + require.NoError(t, err) + assert.Equal(t, 1, count, "expected exactly 1 reconciliation job") + }) + + t.Run("reconciliation_job_idempotent", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + // Apply twice to simulate restarts + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "30 days", "oldest_ledger_cursor") + require.NoError(t, err) + + err = configureHypertableSettings(ctx, dbConnectionPool, "7 days", "90 days", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify exactly 1 reconciliation job (not duplicated) + var count int + err = dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs + WHERE proc_name = 'reconcile_oldest_cursor'`, + ) + require.NoError(t, err) + assert.Equal(t, 1, count, "expected exactly 1 reconciliation job after re-application") + }) + + t.Run("no_reconciliation_job_without_retention", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "", "oldest_ledger_cursor") + require.NoError(t, err) + + // Verify no reconciliation job was created + var count int + err = dbConnectionPool.GetContext(ctx, &count, + `SELECT COUNT(*) + FROM timescaledb_information.jobs + WHERE proc_name = 'reconcile_oldest_cursor'`, + ) + require.NoError(t, err) + assert.Equal(t, 0, count, "expected no reconciliation job when retention is disabled") + }) + + t.Run("invalid_chunk_interval", func(t *testing.T) { + dbt := dbtest.Open(t) + defer 
dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "not-an-interval", "", "oldest_ledger_cursor") + assert.Error(t, err) + assert.Contains(t, err.Error(), "setting chunk interval") + }) + + t.Run("invalid_retention_period", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + err = configureHypertableSettings(ctx, dbConnectionPool, "1 day", "not-an-interval", "oldest_ledger_cursor") + assert.Error(t, err) + assert.Contains(t, err.Error(), "adding retention policy") + }) +} diff --git a/internal/integrationtests/infrastructure/containers.go b/internal/integrationtests/infrastructure/containers.go index 6c295cf1c..9316d87e8 100644 --- a/internal/integrationtests/infrastructure/containers.go +++ b/internal/integrationtests/infrastructure/containers.go @@ -317,11 +317,12 @@ func createRPCContainer(ctx context.Context, testNetwork *testcontainers.DockerN }, nil } -// createWalletDBContainer starts a PostgreSQL container for wallet-backend +// createWalletDBContainer starts a TimescaleDB container for wallet-backend func createWalletDBContainer(ctx context.Context, testNetwork *testcontainers.DockerNetwork) (*TestContainer, error) { containerRequest := testcontainers.ContainerRequest{ Name: walletBackendDBContainerName, - Image: "postgres:14-alpine", + Image: "timescale/timescaledb:latest-pg17", + Cmd: []string{"postgres", "-c", "timescaledb.enable_chunk_skipping=on"}, Labels: map[string]string{ "org.testcontainers.session-id": "wallet-backend-integration-tests", }, diff --git a/internal/integrationtests/infrastructure/helpers.go b/internal/integrationtests/infrastructure/helpers.go index fde006c11..283938309 100644 --- a/internal/integrationtests/infrastructure/helpers.go +++ b/internal/integrationtests/infrastructure/helpers.go @@ -43,27 +43,27 @@ func WaitForRPCHealthAndRun(ctx context.Context, rpcService services.RPCService, defer cancel() log.Ctx(ctx).Info("⏳ Waiting for RPC service to become healthy...") - rpcHeartbeatChannel := rpcService.GetHeartbeatChannel() signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) defer signal.Stop(signalChan) - select { - case <-ctx.Done(): - return fmt.Errorf("context canceled while waiting for RPC service to become healthy: %w", ctx.Err()) + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled while waiting for RPC service to become healthy: %w", ctx.Err()) - case sig := <-signalChan: - return fmt.Errorf("received signal %s while waiting for RPC service to become healthy", sig) + case sig := <-signalChan: + return fmt.Errorf("received signal %s while waiting for RPC service to become healthy", sig) - case <-rpcHeartbeatChannel: - log.Ctx(ctx).Info("👍 RPC service is healthy") - if onReady != nil { - if err := onReady(); err != nil { - return fmt.Errorf("executing onReady after RPC became healthy: %w", err) + default: + healthRes, err := rpcService.GetHealth() + if err == nil { + if healthRes.Status == "healthy" { + return nil + } } } - return nil } } diff --git a/internal/integrationtests/infrastructure/main_setup.go b/internal/integrationtests/infrastructure/main_setup.go index ab6dccff7..6d3f89d53 100644 --- 
a/internal/integrationtests/infrastructure/main_setup.go +++ b/internal/integrationtests/infrastructure/main_setup.go @@ -577,12 +577,6 @@ func createRPCService(ctx context.Context, containers *SharedContainers) (servic // This prevents the immediate health check from failing due to transient unavailability. time.Sleep(3 * time.Second) - // Start tracking RPC health - go func() { - //nolint:errcheck // Error is expected on context cancellation during shutdown - rpcService.TrackRPCServiceHealth(ctx, nil) - }() - return rpcService, nil } diff --git a/internal/serve/graphql/resolvers/test_utils.go b/internal/serve/graphql/resolvers/test_utils.go index 099ee9abc..bed41d477 100644 --- a/internal/serve/graphql/resolvers/test_utils.go +++ b/internal/serve/graphql/resolvers/test_utils.go @@ -153,8 +153,8 @@ func setupDB(ctx context.Context, t *testing.T, dbConnectionPool db.ConnectionPo require.NoError(t, err) _, err = tx.ExecContext(ctx, - `INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2)`, - txn.ToID, parentAccount.StellarAddress) + `INSERT INTO transactions_accounts (ledger_created_at, tx_to_id, account_id) VALUES ($1, $2, $3)`, + txn.LedgerCreatedAt, txn.ToID, parentAccount.StellarAddress) require.NoError(t, err) } @@ -165,8 +165,8 @@ func setupDB(ctx context.Context, t *testing.T, dbConnectionPool db.ConnectionPo require.NoError(t, err) _, err = tx.ExecContext(ctx, - `INSERT INTO operations_accounts (operation_id, account_id) VALUES ($1, $2)`, - op.ID, parentAccount.StellarAddress) + `INSERT INTO operations_accounts (ledger_created_at, operation_id, account_id) VALUES ($1, $2, $3)`, + op.LedgerCreatedAt, op.ID, parentAccount.StellarAddress) require.NoError(t, err) } diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index a996d717d..4ec697f9b 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -48,6 +48,8 @@ type BackfillResult struct { Duration time.Duration Error error BatchChanges *BatchChanges // Only populated for catchup mode + StartTime time.Time // First ledger close time in batch (for compression) + EndTime time.Time // Last ledger close time in batch (for compression) } // BatchChanges holds data collected from a backfill batch for catchup mode. @@ -176,6 +178,24 @@ func (m *ingestService) startBackfilling(ctx context.Context, startLedger, endLe numFailedBatches := analyzeBatchResults(ctx, results) + // Compress backfilled chunks for historical mode (no batches failed) + if mode.isHistorical() && numFailedBatches == 0 { + var minTime, maxTime time.Time + for _, result := range results { + if result.Error == nil { + if minTime.IsZero() || result.StartTime.Before(minTime) { + minTime = result.StartTime + } + if result.EndTime.After(maxTime) { + maxTime = result.EndTime + } + } + } + if !minTime.IsZero() { + m.recompressBackfilledChunks(ctx, minTime, maxTime) + } + } + // Update latest ledger cursor and process catchup data for catchup mode if mode.isCatchup() { if numFailedBatches > 0 { @@ -311,25 +331,27 @@ func (m *ingestService) splitGapsIntoBatches(gaps []data.LedgerRange) []Backfill } // processBackfillBatchesParallel processes backfill batches in parallel using a worker pool. +// For historical mode, direct compress handles compression during COPY; a single recompression +// pass runs after all batches complete (in startBackfilling). 
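+// Results are written into a pre-sized slice indexed by batch position, so concurrent
+// workers never write to the same element and no additional locking is needed.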
func (m *ingestService) processBackfillBatchesParallel(ctx context.Context, mode BackfillMode, batches []BackfillBatch) []BackfillResult { results := make([]BackfillResult, len(batches)) group := m.backfillPool.NewGroupContext(ctx) for i, batch := range batches { group.Submit(func() { - result := m.processSingleBatch(ctx, mode, batch) - results[i] = result + results[i] = m.processSingleBatch(ctx, mode, batch, i, len(batches)) }) } if err := group.Wait(); err != nil { log.Ctx(ctx).Warnf("Backfill batch group wait returned error: %v", err) } + return results } // processSingleBatch processes a single backfill batch with its own ledger backend. -func (m *ingestService) processSingleBatch(ctx context.Context, mode BackfillMode, batch BackfillBatch) BackfillResult { +func (m *ingestService) processSingleBatch(ctx context.Context, mode BackfillMode, batch BackfillBatch, batchIndex, totalBatches int) BackfillResult { start := time.Now() result := BackfillResult{Batch: batch} @@ -347,9 +369,11 @@ func (m *ingestService) processSingleBatch(ctx context.Context, mode BackfillMod }() // Process all ledgers in batch (cursor is updated atomically with final flush for historical mode) - ledgersCount, batchChanges, err := m.processLedgersInBatch(ctx, backend, batch, mode) + ledgersCount, batchChanges, batchStartTime, batchEndTime, err := m.processLedgersInBatch(ctx, backend, batch, mode) result.LedgersCount = ledgersCount result.BatchChanges = batchChanges + result.StartTime = batchStartTime + result.EndTime = batchEndTime if err != nil { result.Error = err result.Duration = time.Since(start) @@ -362,8 +386,8 @@ func (m *ingestService) processSingleBatch(ctx context.Context, mode BackfillMod } result.Duration = time.Since(start) - log.Ctx(ctx).Infof("Batch [%d - %d] completed: %d ledgers in %v", - batch.StartLedger, batch.EndLedger, result.LedgersCount, result.Duration) + log.Ctx(ctx).Infof("Batch %d/%d [%d - %d] completed: %d ledgers in %v", + batchIndex+1, totalBatches, batch.StartLedger, batch.EndLedger, result.LedgersCount, result.Duration) return result } @@ -386,7 +410,8 @@ func (m *ingestService) setupBatchBackend(ctx context.Context, batch BackfillBat // flushBatchBufferWithRetry persists buffered data to the database within a transaction. // If updateCursorTo is non-nil, it also updates the oldest cursor atomically. -func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *indexer.IndexerBuffer, updateCursorTo *uint32, batchChanges *BatchChanges) error { +// If directCompress is true, enables TimescaleDB direct compress for COPY operations. 
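+// The flag is applied via SET LOCAL inside the pgx transaction, so it only lasts for that
+// transaction and reverts on commit or rollback. Callers pass mode.isHistorical() as the
+// flag, e.g.:
+//
+//	err := m.flushBatchBufferWithRetry(ctx, batchBuffer, nil, batchChanges, mode.isHistorical())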
+func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *indexer.IndexerBuffer, updateCursorTo *uint32, batchChanges *BatchChanges, directCompress bool) error { var lastErr error for attempt := 0; attempt < maxIngestProcessedDataRetries; attempt++ { select { @@ -396,6 +421,11 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i } err := db.RunInPgxTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { + if directCompress { + if _, err := dbTx.Exec(ctx, "SET LOCAL timescaledb.enable_direct_compress_copy = on"); err != nil { + return fmt.Errorf("enabling direct compress: %w", err) + } + } filteredData, err := m.filterParticipantData(ctx, dbTx, buffer) if err != nil { return fmt.Errorf("filtering participant data: %w", err) @@ -458,16 +488,17 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i // processLedgersInBatch processes all ledgers in a batch, flushing to DB periodically. // For historical backfill mode, the cursor is updated atomically with the final data flush. // For catchup mode, returns collected batch changes for post-catchup processing. -// Returns the count of ledgers processed and batch changes (nil for historical mode). +// Returns the count of ledgers processed, batch changes (nil for historical mode), and time range. func (m *ingestService) processLedgersInBatch( ctx context.Context, backend ledgerbackend.LedgerBackend, batch BackfillBatch, mode BackfillMode, -) (int, *BatchChanges, error) { +) (int, *BatchChanges, time.Time, time.Time, error) { batchBuffer := indexer.NewIndexerBuffer() ledgersInBuffer := uint32(0) ledgersProcessed := 0 + var startTime, endTime time.Time // Initialize batch changes collector for catchup mode var batchChanges *BatchChanges @@ -485,19 +516,26 @@ func (m *ingestService) processLedgersInBatch( for ledgerSeq := batch.StartLedger; ledgerSeq <= batch.EndLedger; ledgerSeq++ { ledgerMeta, err := m.getLedgerWithRetry(ctx, backend, ledgerSeq) if err != nil { - return ledgersProcessed, nil, fmt.Errorf("getting ledger %d: %w", ledgerSeq, err) + return ledgersProcessed, nil, startTime, endTime, fmt.Errorf("getting ledger %d: %w", ledgerSeq, err) + } + + // Track time range for compression + ledgerTime := ledgerMeta.ClosedAt() + if startTime.IsZero() { + startTime = ledgerTime } + endTime = ledgerTime if err := m.processLedger(ctx, ledgerMeta, batchBuffer); err != nil { - return ledgersProcessed, nil, fmt.Errorf("processing ledger %d: %w", ledgerSeq, err) + return ledgersProcessed, nil, startTime, endTime, fmt.Errorf("processing ledger %d: %w", ledgerSeq, err) } ledgersProcessed++ ledgersInBuffer++ // Flush buffer periodically to control memory usage (intermediate flushes, no cursor update) if ledgersInBuffer >= m.backfillDBInsertBatchSize { - if err := m.flushBatchBufferWithRetry(ctx, batchBuffer, nil, batchChanges); err != nil { - return ledgersProcessed, batchChanges, err + if err := m.flushBatchBufferWithRetry(ctx, batchBuffer, nil, batchChanges, mode.isHistorical()); err != nil { + return ledgersProcessed, batchChanges, startTime, endTime, err } batchBuffer.Clear() ledgersInBuffer = 0 @@ -510,18 +548,18 @@ func (m *ingestService) processLedgersInBatch( if mode.isHistorical() { cursorUpdate = &batch.StartLedger } - if err := m.flushBatchBufferWithRetry(ctx, batchBuffer, cursorUpdate, batchChanges); err != nil { - return ledgersProcessed, batchChanges, err + if err := m.flushBatchBufferWithRetry(ctx, batchBuffer, cursorUpdate, batchChanges, mode.isHistorical()); err 
!= nil { + return ledgersProcessed, batchChanges, startTime, endTime, err } } else if mode.isHistorical() { // All data was flushed in intermediate batches, but we still need to update the cursor // This happens when ledgersInBuffer == 0 (exact multiple of batch size) if err := m.updateOldestCursor(ctx, batch.StartLedger); err != nil { - return ledgersProcessed, nil, err + return ledgersProcessed, nil, startTime, endTime, err } } - return ledgersProcessed, batchChanges, nil + return ledgersProcessed, batchChanges, startTime, endTime, nil } // updateOldestCursor updates the oldest ledger cursor to the given ledger. @@ -584,3 +622,71 @@ func (m *ingestService) processBatchChanges( return nil } + +// recompressBackfilledChunks recompresses already-compressed chunks overlapping the backfill range. +// Direct compress produces compressed chunks during COPY; recompression optimizes compression ratios. +// Tables are compressed sequentially to avoid CPU spikes; chunks within each table also stay sequential to avoid OOM. +// Skips chunks where range_end >= NOW() to avoid compressing active live ingestion chunks. +func (m *ingestService) recompressBackfilledChunks(ctx context.Context, startTime, endTime time.Time) { + tables := []string{"transactions", "transactions_accounts", "operations", "operations_accounts", "state_changes"} + + tableCounts := make([]int, len(tables)) + + for i, table := range tables { + tableCounts[i] = m.compressTableChunks(ctx, table, startTime, endTime) + } + + totalCompressed := 0 + for _, count := range tableCounts { + totalCompressed += count + } + log.Ctx(ctx).Infof("Recompressed %d total chunks for time range [%s - %s]", + totalCompressed, startTime.Format(time.RFC3339), endTime.Format(time.RFC3339)) +} + +// compressTableChunks recompresses already-compressed chunks for a single hypertable. +// Direct compress produces compressed chunks during COPY; this pass optimizes compression ratios. +// Chunks are recompressed sequentially within the table to avoid OOM errors. +func (m *ingestService) compressTableChunks(ctx context.Context, table string, startTime, endTime time.Time) int { + rows, err := m.models.DB.PgxPool().Query(ctx, + `SELECT chunk_schema || '.' 
|| chunk_name FROM timescaledb_information.chunks + WHERE hypertable_name = $1 AND is_compressed + AND range_start < $2::timestamptz AND range_end > $3::timestamptz + AND range_end < NOW()`, + table, endTime, startTime) + if err != nil { + log.Ctx(ctx).Warnf("Failed to get chunks for %s: %v", table, err) + return 0 + } + + var chunks []string + for rows.Next() { + var chunk string + if err := rows.Scan(&chunk); err != nil { + continue + } + chunks = append(chunks, chunk) + } + rows.Close() + + compressed := 0 + for i, chunk := range chunks { + select { + case <-ctx.Done(): + log.Ctx(ctx).Warnf("Recompression cancelled for %s after %d chunks", table, compressed) + return compressed + default: + } + + _, err := m.models.DB.PgxPool().Exec(ctx, + `CALL _timescaledb_functions.rebuild_columnstore($1::regclass)`, chunk) + if err != nil { + log.Ctx(ctx).Warnf("Failed to recompress chunk %s: %v", chunk, err) + continue + } + compressed++ + log.Ctx(ctx).Debugf("Recompressed chunk %d/%d for %s: %s", i+1, len(chunks), table, chunk) + } + log.Ctx(ctx).Infof("Recompressed %d chunks for table %s", len(chunks), table) + return compressed +} diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 4b3fe2077..d3c608d80 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -20,6 +20,7 @@ import ( const ( maxIngestProcessedDataRetries = 5 maxIngestProcessedDataRetryBackoff = 10 * time.Second + oldestLedgerSyncInterval = 100 ) // PersistLedgerData persists processed ledger data to the database in a single atomic transaction. @@ -124,7 +125,17 @@ func (m *ingestService) startLiveIngestion(ctx context.Context) error { if err != nil { return fmt.Errorf("populating account tokens and initializing cursors: %w", err) } + m.metricsService.SetLatestLedgerIngested(float64(startLedger)) + m.metricsService.SetOldestLedgerIngested(float64(startLedger)) } else { + // Initialize metrics from DB state so Prometheus reflects backfill progress after restart + oldestIngestedLedger, oldestErr := m.models.IngestStore.Get(ctx, m.oldestLedgerCursorName) + if oldestErr != nil { + return fmt.Errorf("getting oldest ledger cursor: %w", oldestErr) + } + m.metricsService.SetOldestLedgerIngested(float64(oldestIngestedLedger)) + m.metricsService.SetLatestLedgerIngested(float64(latestIngestedLedger)) + // If we already have data in the DB, we will do an optimized catchup by parallely backfilling the ledgers. 
health, err := m.rpcService.GetHealth() if err != nil { @@ -194,6 +205,12 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 m.metricsService.IncIngestionOperationsProcessed(numOperationProcessed) m.metricsService.IncIngestionLedgersProcessed(1) m.metricsService.SetLatestLedgerIngested(float64(currentLedger)) + // Periodically sync oldest ledger metric from DB (picks up changes from backfill jobs) + if currentLedger%oldestLedgerSyncInterval == 0 { + if oldest, syncErr := m.models.IngestStore.Get(ctx, m.oldestLedgerCursorName); syncErr == nil { + m.metricsService.SetOldestLedgerIngested(float64(oldest)) + } + } log.Ctx(ctx).Infof("Ingested ledger %d in %.4fs", currentLedger, totalIngestionDuration) currentLedger++ diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index c558abb59..b1571ae47 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1317,7 +1317,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { buffer := tc.setupBuffer() // Call flushBatchBuffer - err = svc.flushBatchBufferWithRetry(ctx, buffer, tc.updateCursorTo, nil) + err = svc.flushBatchBufferWithRetry(ctx, buffer, tc.updateCursorTo, nil, false) require.NoError(t, err) // Verify the cursor value @@ -2820,7 +2820,7 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { buffer := tc.setupBuffer() - err = svc.flushBatchBufferWithRetry(ctx, buffer, nil, tc.batchChanges) + err = svc.flushBatchBufferWithRetry(ctx, buffer, nil, tc.batchChanges, false) require.NoError(t, err) // Verify collected token changes match expected values @@ -2925,7 +2925,7 @@ func Test_ingestService_processLedgersInBatch_catchupMode(t *testing.T) { require.NoError(t, err) batch := BackfillBatch{StartLedger: 4599, EndLedger: 4599} - ledgersProcessed, batchChanges, err := svc.processLedgersInBatch(ctx, mockLedgerBackend, batch, tc.mode) + ledgersProcessed, batchChanges, _, _, err := svc.processLedgersInBatch(ctx, mockLedgerBackend, batch, tc.mode) require.NoError(t, err) assert.Equal(t, 1, ledgersProcessed) @@ -3094,3 +3094,101 @@ func Test_ingestService_startBackfilling_CatchupMode_ProcessesBatchChanges(t *te }) } } + +// Test_ingestService_processBackfillBatchesParallel_BothModes verifies +// that both historical and catchup modes process batches successfully. 
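+// Each batch gets a fresh LedgerBackendMock from the factory; it returns a minimal V0
+// LedgerCloseMeta so the test exercises the batch plumbing without real ledger XDR.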
+func Test_ingestService_processBackfillBatchesParallel_BothModes(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + testCases := []struct { + name string + mode BackfillMode + }{ + { + name: "historical_mode_processes_batches", + mode: BackfillModeHistorical, + }, + { + name: "catchup_mode_processes_batches", + mode: BackfillModeCatchup, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockMetricsService := metrics.NewMockMetricsService() + mockMetricsService.On("RegisterPoolMetrics", "ledger_indexer", mock.Anything).Return() + mockMetricsService.On("RegisterPoolMetrics", "backfill", mock.Anything).Return() + mockMetricsService.On("SetOldestLedgerIngested", mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("IncDBTransaction", mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveDBTransactionDuration", mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveDBBatchSize", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveIngestionParticipantsCount", mock.Anything).Return().Maybe() + mockMetricsService.On("IncStateChanges", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + defer mockMetricsService.AssertExpectations(t) + + models, modelsErr := data.NewModels(dbConnectionPool, mockMetricsService) + require.NoError(t, modelsErr) + + mockRPCService := &RPCServiceMock{} + mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() + + // Factory that returns a backend with minimal valid ledger data + factory := func(ctx context.Context) (ledgerbackend.LedgerBackend, error) { + mockBackend := &LedgerBackendMock{} + mockBackend.On("PrepareRange", mock.Anything, mock.Anything).Return(nil) + mockBackend.On("GetLedger", mock.Anything, mock.Anything).Return(xdr.LedgerCloseMeta{ + V: 0, + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(100), + }, + }, + }, + }, nil) + mockBackend.On("Close").Return(nil) + return mockBackend, nil + } + + svc, svcErr := NewIngestService(IngestServiceConfig{ + IngestionMode: IngestionModeBackfill, + Models: models, + LatestLedgerCursorName: "latest_ledger_cursor", + OldestLedgerCursorName: "oldest_ledger_cursor", + AppTracker: &apptracker.MockAppTracker{}, + RPCService: mockRPCService, + LedgerBackend: &LedgerBackendMock{}, + LedgerBackendFactory: factory, + MetricsService: mockMetricsService, + GetLedgersLimit: defaultGetLedgersLimit, + Network: network.TestNetworkPassphrase, + NetworkPassphrase: network.TestNetworkPassphrase, + Archive: &HistoryArchiveMock{}, + BackfillBatchSize: 10, + }) + require.NoError(t, svcErr) + + batches := []BackfillBatch{ + {StartLedger: 100, EndLedger: 100}, + {StartLedger: 101, EndLedger: 101}, + } + + results := svc.processBackfillBatchesParallel(ctx, tc.mode, batches) + + // All batches should succeed + require.Len(t, results, 2) + for i, result := range results { + assert.NoError(t, result.Error, "batch %d should succeed", i) + } + }) + } +} diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index b7792ec1a..cecd00209 
100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -10,7 +10,6 @@ import ( "net/http" "time" - "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/stellar-rpc/protocol" @@ -34,14 +33,6 @@ type RPCService interface { GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error) GetAccountLedgerSequence(address string) (int64, error) GetHeartbeatChannel() chan entities.RPCGetHealthResult - // TrackRPCServiceHealth continuously monitors the health of the RPC service and updates metrics. - // It runs health checks at regular intervals and can be triggered on-demand via immediateHealthCheckTrigger. - // - // The immediateHealthCheckTrigger channel allows external components to request an immediate health check, - // which is particularly useful when the ingestor needs to catch up with the RPC service. - // - // Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals. - TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error) NetworkPassphrase() string } @@ -327,72 +318,6 @@ func (r *rpcService) HealthCheckTickInterval() time.Duration { return r.healthCheckTickInterval } -// TrackRPCServiceHealth continuously monitors the health of the RPC service and updates metrics. -// It runs health checks at regular intervals and can be triggered on-demand via immediateHealthCheckTrigger. -// -// The immediateHealthCheckTrigger channel allows external components to request an immediate health check, -// which is particularly useful when the ingestor needs to catch up with the RPC service. -// -// Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals. -func (r *rpcService) TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error { - // Handle nil channel by creating a never-firing channel - if immediateHealthCheckTrigger == nil { - immediateHealthCheckTrigger = make(chan any) - } - - healthCheckTicker := time.NewTicker(r.HealthCheckTickInterval()) - unhealthyWarningTicker := time.NewTicker(r.HealthCheckWarningInterval()) - defer func() { - healthCheckTicker.Stop() - unhealthyWarningTicker.Stop() - close(r.heartbeatChannel) - }() - - // performHealthCheck is a function that performs a health check and updates the metrics. 
- performHealthCheck := func() { - health, err := r.GetHealth() - if err != nil { - log.Ctx(ctx).Warnf("RPC health check failed: %v", err) - r.metricsService.SetRPCServiceHealth(false) - return - } - - unhealthyWarningTicker.Reset(r.HealthCheckWarningInterval()) - select { - case r.heartbeatChannel <- health: - // sent successfully - default: - // channel is full, clear it and send latest - <-r.heartbeatChannel - r.heartbeatChannel <- health - } - r.metricsService.SetRPCServiceHealth(true) - r.metricsService.SetRPCLatestLedger(int64(health.LatestLedger)) - } - - // Perform immediate health check at startup to avoid 5-second delay - performHealthCheck() - - for { - select { - case <-ctx.Done(): - log.Ctx(ctx).Infof("RPC health tracking stopped due to context cancellation: %v", ctx.Err()) - return fmt.Errorf("context cancelled: %w", ctx.Err()) - - case <-unhealthyWarningTicker.C: - log.Ctx(ctx).Warnf("RPC service unhealthy for over %s", r.HealthCheckWarningInterval()) - r.metricsService.SetRPCServiceHealth(false) - - case <-healthCheckTicker.C: - performHealthCheck() - - case <-immediateHealthCheckTrigger: - healthCheckTicker.Reset(r.HealthCheckTickInterval()) - performHealthCheck() - } - } -} - func (r *rpcService) sendRPCRequest(method string, params entities.RPCParams) (json.RawMessage, error) { startTime := time.Now() r.metricsService.IncRPCRequests(method) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index d29331f97..6d14fbbf0 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -2,7 +2,6 @@ package services import ( "bytes" - "context" "encoding/json" "errors" "fmt" @@ -13,7 +12,6 @@ import ( "time" "github.com/stellar/go-stellar-sdk/network" - "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/stellar-rpc/protocol" "github.com/stretchr/testify/assert" @@ -876,242 +874,3 @@ func Test_rpcService_GetLedgers(t *testing.T) { assert.Equal(t, "sending getLedgers request: sending POST request to RPC: connection failed", err.Error()) }) } - -func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Once() - mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Once() - mockMetricsService.On("IncRPCRequests", "getHealth").Once() - mockMetricsService.On("IncRPCEndpointSuccess", "getHealth").Once() - mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Once() - mockMetricsService.On("SetRPCServiceHealth", true).Once() - mockMetricsService.On("SetRPCLatestLedger", int64(100)).Once() - defer mockMetricsService.AssertExpectations(t) - - mockHTTPClient := &utils.MockHTTPClient{} - rpcURL := "http://test-url-track-rpc-service-health" - rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) - require.NoError(t, err) - - healthResult := entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 1, - LedgerRetentionWindow: 0, - } - - // Mock the HTTP response for GetHealth - ctx, cancel := context.WithTimeout(context.Background(), rpcService.HealthCheckTickInterval()*2) - defer cancel() - mockResponse := 
&http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewBuffer([]byte(`{ - "jsonrpc": "2.0", - "id": 1, - "result": { - "status": "healthy", - "latestLedger": 100, - "oldestLedger": 1, - "ledgerRetentionWindow": 0 - } - }`))), - } - mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil).Run(func(args mock.Arguments) { - cancel() - }) - err = rpcService.TrackRPCServiceHealth(ctx, nil) - require.Error(t, err) - - // Get result from heartbeat channel - select { - case result := <-rpcService.GetHeartbeatChannel(): - assert.Equal(t, healthResult, result) - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for heartbeat") - } - - mockHTTPClient.AssertExpectations(t) -} - -func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { - healthCheckTickInterval := 300 * time.Millisecond - healthCheckWarningInterval := 400 * time.Millisecond - contextTimeout := healthCheckWarningInterval + time.Millisecond*190 - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - dbt := dbtest.Open(t) - defer dbt.Close() - - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Once() - mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Maybe() - mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Once() - mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("IncRPCRequests", "getHealth").Once() - mockMetricsService.On("IncRPCRequests", "getHealth").Maybe() - mockMetricsService.On("IncRPCEndpointFailure", "getHealth").Once() - mockMetricsService.On("IncRPCEndpointFailure", "getHealth").Maybe() - mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Once() - mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("IncRPCMethodErrors", "GetHealth", "rpc_error").Once() - mockMetricsService.On("IncRPCMethodErrors", "GetHealth", "rpc_error").Maybe() - mockMetricsService.On("SetRPCServiceHealth", false).Once() - mockMetricsService.On("SetRPCServiceHealth", false).Maybe() - defer mockMetricsService.AssertExpectations(t) - getLogs := log.DefaultLogger.StartTest(log.WarnLevel) - - mockHTTPClient := &utils.MockHTTPClient{} - defer mockHTTPClient.AssertExpectations(t) - rpcURL := "http://test-url-track-rpc-service-health" - rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) - require.NoError(t, err) - rpcService.healthCheckTickInterval = healthCheckTickInterval - rpcService.healthCheckWarningInterval = healthCheckWarningInterval - - // Mock error response for GetHealth with a valid http.Response - getHealthRequestBody, err := json.Marshal(map[string]any{"jsonrpc": "2.0", "id": 1, "method": "getHealth"}) - require.NoError(t, err) - getHealthResponseBody := `{ - "jsonrpc": "2.0", - "id": 1, - "error": { - "code": -32601, - "message": "rpc error" - } - }` - mockResponse := &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(strings.NewReader(getHealthResponseBody)), - } - mockHTTPClient.On("Post", rpcURL, "application/json", bytes.NewBuffer(getHealthRequestBody)). 
- Return(mockResponse, nil) - - // The ctx will timeout after {contextTimeout}, which is enough for the warning to trigger - err = rpcService.TrackRPCServiceHealth(ctx, nil) - require.Error(t, err) - - entries := getLogs() - testSucceeded := false - logMessages := []string{} - for _, entry := range entries { - logMessages = append(logMessages, entry.Message) - if strings.Contains(entry.Message, "RPC service unhealthy for over "+healthCheckWarningInterval.String()) { - testSucceeded = true - break - } - } - assert.Truef(t, testSucceeded, "couldn't find log entry containing %q in %v", "rpc service unhealthy for over "+healthCheckWarningInterval.String(), logMessages) -} - -func TestTrackRPCService_ContextCancelled(t *testing.T) { - // Create and immediately cancel context to test cancellation handling - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - dbt := dbtest.Open(t) - defer dbt.Close() - - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - mockMetricsService := metrics.NewMockMetricsService() - mockHTTPClient := &utils.MockHTTPClient{} - rpcURL := "http://test-url-track-rpc-service-health" - rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) - require.NoError(t, err) - - // Mock metrics for the initial health check that happens before context check - mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Maybe() - mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("IncRPCRequests", "getHealth").Maybe() - mockMetricsService.On("IncRPCEndpointFailure", "getHealth").Maybe() - mockMetricsService.On("IncRPCMethodErrors", "GetHealth", "rpc_error").Maybe() - mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("SetRPCServiceHealth", false).Maybe() - - // Mock HTTP client to return error (simulating cancelled context) - mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything). 
- Return(&http.Response{}, context.Canceled).Maybe() - - err = rpcService.TrackRPCServiceHealth(ctx, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "context") - - // Verify channel is closed after context cancellation - _, ok := <-rpcService.GetHeartbeatChannel() - assert.False(t, ok, "channel should be closed") -} - -func TestTrackRPCService_DeadlockPrevention(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - // Create fresh RPC service for each test case - mockMetricsService := metrics.NewMockMetricsService() - mockHTTPClient := &utils.MockHTTPClient{} - rpcURL := "http://test-url-deadlock-prevention" - rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) - require.NoError(t, err) - rpcService.healthCheckTickInterval = 10 * time.Millisecond - - // Mock metrics expectations - mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Maybe() - mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("IncRPCRequests", "getHealth").Maybe() - mockMetricsService.On("IncRPCEndpointSuccess", "getHealth").Maybe() - mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Maybe() - mockMetricsService.On("SetRPCServiceHealth", true).Maybe() - mockMetricsService.On("SetRPCLatestLedger", mock.AnythingOfType("int64")).Maybe() - defer mockMetricsService.AssertExpectations(t) - - // Mock successful health response - create fresh response for each call - mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(func(url, contentType string, body io.Reader) *http.Response { - return &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(strings.NewReader(`{ - "jsonrpc": "2.0", - "id": 8675309, - "result": { - "status": "healthy", - "latestLedger": 100, - "oldestLedger": 1, - "ledgerRetentionWindow": 0 - } - }`)), - } - }, nil).Maybe() - - // Start health tracking - this should not deadlock - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - manualTriggerChan := make(chan any, 1) - go func() { - //nolint:errcheck // Error is expected on context cancellation - rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan) - }() - time.Sleep(20 * time.Millisecond) - manualTriggerChan <- nil - - select { - case <-ctx.Done(): - require.Fail(t, "😵 deadlock occurred!") - case manualTriggerChan <- nil: - t.Log("🎉 Deadlock prevented!") - } -} diff --git a/internal/services/token_ingestion.go b/internal/services/token_ingestion.go index 1402beea5..b868ee591 100644 --- a/internal/services/token_ingestion.go +++ b/internal/services/token_ingestion.go @@ -28,7 +28,7 @@ import ( const ( // FlushBatchSize is the number of entries to buffer before flushing to DB. - flushBatchSize = 100_000 + flushBatchSize = 250_000 ) // checkpointData holds all data collected from processing a checkpoint ledger. @@ -268,6 +268,15 @@ func (s *tokenIngestionService) PopulateAccountTokens(ctx context.Context, check return fmt.Errorf("setting synchronous_commit=off: %w", txErr) } + // Disable FK constraint checking for this transaction only. 
Data integrity is + // guaranteed by the code: trustline assets are collected in uniqueAssets and SAC + // contracts get minimal stubs created when balance entries are encountered (even + // when contract instance entries are missing from the checkpoint). All parent rows + // are inserted via storeTokensInDB before commit. Requires superuser privileges. + if _, txErr := dbTx.Exec(ctx, "SET LOCAL session_replication_role = 'replica'"); txErr != nil { + log.Ctx(ctx).Warnf("Could not disable FK checks for checkpoint population (may require superuser): %v", txErr) + } + // Stream trustlines and collect contracts from checkpoint cpData, txErr := s.streamCheckpointData(ctx, dbTx, reader, checkpointLedger) if txErr != nil {
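For context on the SET LOCAL statements in this transaction: session_replication_role = 'replica' suppresses the referential-integrity triggers PostgreSQL uses to enforce foreign keys, and because it is applied with SET LOCAL (like synchronous_commit above) it reverts automatically when the transaction commits or rolls back, leaving other connections untouched. A minimal standalone sketch of the same pattern, assuming pgx v5 import paths, a *pgxpool.Pool, and hypothetical table/column names (an illustration, not the wallet-backend implementation):

package example

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// bulkLoadWithoutFKChecks copies rows inside a single transaction with FK-trigger
// checks and synchronous commit disabled for that transaction only.
func bulkLoadWithoutFKChecks(ctx context.Context, pool *pgxpool.Pool, rows [][]any) (err error) {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("beginning transaction: %w", err)
	}
	defer func() {
		if err != nil {
			_ = tx.Rollback(ctx)
		}
	}()

	// SET LOCAL is transaction-scoped: both settings revert on COMMIT or ROLLBACK.
	if _, err = tx.Exec(ctx, "SET LOCAL synchronous_commit = 'off'"); err != nil {
		return fmt.Errorf("disabling synchronous commit: %w", err)
	}
	// Skips FK (referential-integrity) triggers; needs superuser, or on PostgreSQL 15+
	// a role granted SET on the session_replication_role parameter.
	if _, err = tx.Exec(ctx, "SET LOCAL session_replication_role = 'replica'"); err != nil {
		return fmt.Errorf("disabling FK checks: %w", err)
	}

	// Hypothetical target table and columns, used only to make the sketch runnable.
	if _, err = tx.CopyFrom(ctx,
		pgx.Identifier{"account_tokens"},
		[]string{"account_id", "token_id"},
		pgx.CopyFromRows(rows),
	); err != nil {
		return fmt.Errorf("copying rows: %w", err)
	}

	return tx.Commit(ctx)
}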