Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
144 commits
Select commit Hold shift + click to select a range
ea14019
adds ledgerbackend and refactors ingestion to use ledger backend inte…
aristidesstaffieri Nov 21, 2025
b2b1cf1
adds datastore config and backend type, re-adds LedgerLimit as the le…
aristidesstaffieri Nov 21, 2025
e33351d
prepare range when using buffered storage backend
aristidesstaffieri Nov 24, 2025
f858338
prepare range based on backend type
aristidesstaffieri Nov 24, 2025
3f73e47
removes schema config from public data lake config
aristidesstaffieri Nov 24, 2025
ffc15ae
renames datastore-config flag to datastore-config-path
aristidesstaffieri Nov 24, 2025
a179cf3
cleans up ledger backend initialization
aristidesstaffieri Nov 24, 2025
1dc8916
removes ingestion logs, now tracked by metrics
aristidesstaffieri Nov 24, 2025
f638db5
prepare range for rpc ledgerbackend at initialization
aristidesstaffieri Nov 24, 2025
f0879f3
formats ingest and go mod files
aristidesstaffieri Nov 24, 2025
2c3de46
updates ledger backend test to match config
aristidesstaffieri Nov 24, 2025
c058540
formats octal properly in tests
aristidesstaffieri Nov 24, 2025
7bfa5ff
removes dead mock, no longer needed
aristidesstaffieri Nov 24, 2025
e2bc12d
Add ledger backend interface - 1
aditya1702 Nov 25, 2025
8970ab3
Use switch-case block
aditya1702 Nov 25, 2025
d72ff23
Update ingest.go
aditya1702 Nov 25, 2025
3893aea
use the same ingestion loop
aditya1702 Nov 25, 2025
97de9a3
refactor to process single ledger
aditya1702 Nov 25, 2025
5f0ee71
remove deprecated run function
aditya1702 Nov 25, 2025
a65b137
Update ingest_test.go
aditya1702 Nov 25, 2025
0b1ce7b
Update ingest.go
aditya1702 Nov 25, 2025
cce30a2
Initialize checkpoint ledger properly for token cache
aditya1702 Nov 25, 2025
7ebc9e1
change buckets
aditya1702 Nov 25, 2025
e10a62b
allow backfilling old data using datastore
aditya1702 Nov 26, 2025
02efa3d
Update ingest.go
aditya1702 Nov 26, 2025
7db166c
Update ingest.go
aditya1702 Nov 26, 2025
eb5ee39
Update ingest_store_test.go
aditya1702 Nov 26, 2025
80bd8f7
remove participant filtering
aditya1702 Nov 26, 2025
93a1f55
Refactor - 1
aditya1702 Nov 26, 2025
2ea4238
Add comments
aditya1702 Nov 26, 2025
2c7f351
Update ingest.go
aditya1702 Nov 26, 2025
fa6f973
Update ingest.go
aditya1702 Nov 26, 2025
08fed31
Merge branch 'feature/ingest-ledger-backend' into store-all-data
aditya1702 Nov 26, 2025
30a8a0d
Skip storing txmeta via config variable
aditya1702 Nov 26, 2025
fed1fa9
Log time for getting ledger
aditya1702 Nov 26, 2025
f1d9989
Add more granular buckets for ingest phase duration
aditya1702 Nov 26, 2025
91ea056
fix state changes query
aditya1702 Nov 26, 2025
07432c0
Update metrics.go
aditya1702 Nov 26, 2025
2ea32ce
Update ingest.go
aditya1702 Nov 26, 2025
03862e3
fix integration tests - 1
aditya1702 Nov 26, 2025
975b937
Update account_tokens.go
aditya1702 Nov 26, 2025
a60daec
Make metaXDR nullable in graphql schema
aditya1702 Nov 26, 2025
3986905
Fix tests again
aditya1702 Nov 26, 2025
696673e
refactor - 2
aditya1702 Nov 26, 2025
44cbd8b
insert all participants in the account table
aditya1702 Nov 26, 2025
6785756
remove the account register test
aditya1702 Nov 26, 2025
dfe8cd2
Add 1 extra state change count since we not ingest all state changes …
aditya1702 Nov 26, 2025
aaf4285
remove account registration integration tests
aditya1702 Nov 26, 2025
e3962a4
set default options
aditya1702 Nov 26, 2025
13c9a8c
Update Dockerfile
aditya1702 Nov 26, 2025
39cdcf3
Update containers.go
aditya1702 Nov 26, 2025
73445ab
Update containers.go
aditya1702 Nov 26, 2025
2ff58e4
Dont append empty state changes
aditya1702 Nov 26, 2025
3103d05
resolve review comments
aditya1702 Nov 30, 2025
9c87b2f
Fix cherry pick issues
aditya1702 Dec 4, 2025
5765f38
Update global_options.go
aditya1702 Dec 4, 2025
2ae137f
Merge branch 'main' into remove-participant-filtering
aditya1702 Dec 4, 2025
abbe4d1
insert all participants in the account table
aditya1702 Nov 26, 2025
0588242
Merge branch 'remove-participant-filtering' of https://github.com/ste…
aditya1702 Dec 4, 2025
48d965d
Update indexer.go
aditya1702 Dec 4, 2025
4fa2f34
Add 1 extra state change count since we not ingest all state changes …
aditya1702 Nov 26, 2025
9363b3b
remove account registration integration tests
aditya1702 Nov 26, 2025
fa1d9e6
remove account register test from integration tests
aditya1702 Dec 4, 2025
971f89f
Uncomment populate account tokens
aditya1702 Dec 4, 2025
5b95818
Update indexer.go
aditya1702 Dec 4, 2025
1865284
Merge branch 'remove-participant-filtering' into skip-tx-meta
aditya1702 Dec 4, 2025
a9e5c7a
Skip storing tx envelope with config param
aditya1702 Dec 11, 2025
4a6bc86
Handle unlocking channel accounts
aditya1702 Dec 11, 2025
8676ea7
Fix failing unit tests
aditya1702 Dec 16, 2025
5dca8b3
fix failing integration tests - 1
aditya1702 Dec 16, 2025
b1a87da
run make gql-generate
aditya1702 Dec 16, 2025
707e126
add participant filtering flag
aditya1702 Dec 17, 2025
0bb37f7
Update 2025-06-10.4-create_indexer_table_state_changes.sql
aditya1702 Dec 17, 2025
8a4e6fa
Return virtual object if no address found
aditya1702 Dec 17, 2025
b12279a
remove batch insert function for accounts
aditya1702 Dec 17, 2025
d50833e
Add integration tests for filtering participants
aditya1702 Dec 17, 2025
a673114
make check
aditya1702 Dec 17, 2025
72e4fb9
Merge branch 'remove-participant-filtering' into skip-tx-meta
aditya1702 Dec 17, 2025
b8bca17
make check
aditya1702 Dec 17, 2025
438bf31
Merge branch 'skip-tx-meta' into skip-tx-envelope
aditya1702 Dec 17, 2025
6f0af1b
make check
aditya1702 Dec 17, 2025
27ee2f3
Add env flags
aditya1702 Dec 17, 2025
dc16e46
Use a config to pass params to ingestion service
aditya1702 Dec 17, 2025
b719652
Add ledgerbackend factory
aditya1702 Dec 17, 2025
4c29562
Add live ingestion code - 1
aditya1702 Dec 17, 2025
d7f52da
Add live ingestion code - 2
aditya1702 Dec 17, 2025
ae06962
Add backfill code - 1
aditya1702 Dec 17, 2025
f4aba70
Add new methods to ingest store to calculate gaps
aditya1702 Dec 17, 2025
6ea3194
remove unused functions
aditya1702 Dec 17, 2025
bb92ac7
Add metrics function for oldest ledger ingested
aditya1702 Dec 17, 2025
271729b
Add common functions for ingestion for backfilling and live
aditya1702 Dec 17, 2025
1c3250b
Add special db connection pool settings for backfilling
aditya1702 Dec 17, 2025
9f6f02f
use optimized connection pool settings for backfilling
aditya1702 Dec 17, 2025
2f48828
Wait for ingest container to sync with network before starting API
aditya1702 Dec 18, 2025
d0c86da
fix flaky unit test for account balance resolver
aditya1702 Dec 18, 2025
6c1449d
fix health test
aditya1702 Dec 18, 2025
eecd1ec
move CREATE SCHEMA to separate config in docker compose
aditya1702 Dec 18, 2025
ed18524
make check
aditya1702 Dec 18, 2025
c35f3bd
fix make check
aditya1702 Dec 18, 2025
0b209b8
Add backfilling tests to integration tests
aditya1702 Dec 18, 2025
61d3b5a
make check
aditya1702 Dec 18, 2025
d73265c
remove unused account tokens cursor
aditya1702 Dec 18, 2025
d40611d
remove
aditya1702 Dec 18, 2025
5049319
Add new env flags for backfilling workers, batch size etc...
aditya1702 Dec 18, 2025
52298cc
Do fast catchup using parallel backfilling
aditya1702 Dec 18, 2025
a3e0aae
Ledger buffer: Add Clear() method and use pointers for txns and opera…
aditya1702 Dec 18, 2025
1974b9f
Use pointers to batch inserting txns and ops
aditya1702 Dec 18, 2025
45eca0a
Flush ledgers after some batch size during backfilling
aditya1702 Dec 18, 2025
7b78c7c
fix failing tests
aditya1702 Dec 18, 2025
db795bd
make check
aditya1702 Dec 18, 2025
2995cc7
Update ingest_store_test.go
aditya1702 Dec 18, 2025
13abefc
Update ingest_backfill.go
aditya1702 Dec 18, 2025
cf21714
Add log line
aditya1702 Dec 18, 2025
869443a
Add integration tests for catchup
aditya1702 Dec 18, 2025
688699c
Update ingest_store_test.go
aditya1702 Dec 18, 2025
b461474
Update ingest_test.go
aditya1702 Dec 18, 2025
fa8fce1
Merge branch 'add-backfilling' into optimize-catchup
aditya1702 Dec 18, 2025
a003099
Add catchup mode to do optimized catchup
aditya1702 Dec 18, 2025
2f21414
Update ingest_test.go
aditya1702 Dec 18, 2025
f3d30bb
Update catchup_test.go
aditya1702 Dec 19, 2025
99ab73f
Update health_test.go
aditya1702 Dec 19, 2025
cc15033
Update catchup_test.go
aditya1702 Dec 19, 2025
bfff201
Update catchup_test.go
aditya1702 Dec 19, 2025
2827279
Update catchup_test.go
aditya1702 Dec 19, 2025
15194c4
Add BatchCopy to operations
aditya1702 Dec 19, 2025
afcfc46
Add BatchCopy functions
aditya1702 Dec 19, 2025
402770d
Add tests for BatchCopy
aditya1702 Dec 19, 2025
08a49be
Update statechanges.go
aditya1702 Dec 19, 2025
88baa0a
Add benchmarking - 1
aditya1702 Dec 19, 2025
509706e
Update statechanges_test.go
aditya1702 Dec 19, 2025
3ad234a
use real mainnet xdr data for test txns benchmark
aditya1702 Dec 19, 2025
b02055f
increase batch sizes for txn benchmark
aditya1702 Dec 19, 2025
eaf7acb
Use pgx Copy which uses binary format
aditya1702 Dec 19, 2025
390eb63
Add pgx copy benchmarking
aditya1702 Dec 19, 2025
2965a18
Use pgx CopyFrom for binary format copy
aditya1702 Dec 19, 2025
e7782f2
Use pgxTx for unlocking channel accounts
aditya1702 Dec 19, 2025
1c3cd09
Update channel_accounts_model_test.go
aditya1702 Dec 19, 2025
b107506
Change BatchCopyPgx naming
aditya1702 Dec 19, 2025
77c1895
Add backfilling metrics
aditya1702 Dec 19, 2025
c0bed1d
Add backfilling metrics - 2
aditya1702 Dec 19, 2025
5228055
Only log backfilling metrics during backfilling and not catchup mode
aditya1702 Dec 19, 2025
15695a9
Merge branch 'optimize-catchup' into use-copy-inserts
aditya1702 Dec 21, 2025
fbd8215
Merge branch 'use-copy-inserts' into backfilling-metrics
aditya1702 Dec 21, 2025
9d12758
Update ingest_test.go
aditya1702 Dec 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ RUN apt-get update && \
echo "deb https://apt.stellar.org jammy unstable" >/etc/apt/sources.list.d/SDF-unstable.list

COPY --from=api-build /bin/wallet-backend /app/
COPY --from=api-build /src/wallet-backend/config /app/config

EXPOSE 8001
WORKDIR /app
Expand Down
82 changes: 73 additions & 9 deletions cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,61 @@ func (c *ingestCmd) Command() *cobra.Command {
utils.RedisHostOption(&cfg.RedisHost),
utils.RedisPortOption(&cfg.RedisPort),
{
Name: "ledger-cursor-name",
Name: "ingestion-mode",
Usage: "What mode to run ingestion in - live or backfill",
OptType: types.String,
ConfigKey: &cfg.IngestionMode,
FlagDefault: string(ingest.IngestionModeLive),
Required: true,
},
{
Name: "latest-ledger-cursor-name",
Usage: "Name of last synced ledger cursor, used to keep track of the last ledger ingested by the service. When starting up, ingestion will resume from the ledger number stored in this record. It should be an unique name per container as different containers would overwrite the cursor value of its peers when using the same cursor name.",
OptType: types.String,
ConfigKey: &cfg.LedgerCursorName,
FlagDefault: "live_ingest_cursor",
ConfigKey: &cfg.LatestLedgerCursorName,
FlagDefault: "latest_ingest_ledger",
Required: true,
},
{
Name: "account-tokens-cursor-name",
Usage: "Name of last synced account tokens ledger cursor, used to keep track of the last ledger ingested by the service.",
Name: "oldest-ledger-cursor-name",
Usage: "Name of the oldest ledger cursor, used to track the earliest ledger ingested by the service. Used for backfill operations to know where historical data begins.",
OptType: types.String,
ConfigKey: &cfg.AccountTokensCursorName,
FlagDefault: "live_account_tokens_ingest_cursor",
ConfigKey: &cfg.OldestLedgerCursorName,
FlagDefault: "oldest_ingest_ledger",
Required: true,
},
{
Name: "backfill-workers",
Usage: "Maximum concurrent workers for backfill processing. Defaults to number of CPUs. Lower values reduce RAM usage at cost of throughput.",
OptType: types.Int,
ConfigKey: &cfg.BackfillWorkers,
FlagDefault: 0,
Required: false,
},
{
Name: "backfill-batch-size",
Usage: "Number of ledgers per batch during backfill. Defaults to 250. Lower values reduce RAM usage at cost of more DB transactions.",
OptType: types.Int,
ConfigKey: &cfg.BackfillBatchSize,
FlagDefault: 250,
Required: false,
},
{
Name: "backfill-db-insert-batch-size",
Usage: "Number of ledgers to process before flushing buffer to DB during backfill. Defaults to 50. Lower values reduce RAM usage at cost of more DB transactions.",
OptType: types.Int,
ConfigKey: &cfg.BackfillDBInsertBatchSize,
FlagDefault: 100,
Required: false,
},
{
Name: "catchup-threshold",
Usage: "Number of ledgers behind network tip that triggers fast catchup via backfilling. Defaults to 100.",
OptType: types.Int,
ConfigKey: &cfg.CatchupThreshold,
FlagDefault: 100,
Required: false,
},
{
Name: "archive-url",
Usage: "Archive URL for history archives",
Expand All @@ -73,15 +113,39 @@ func (c *ingestCmd) Command() *cobra.Command {
Usage: "Type of ledger backend to use for fetching ledgers. Options: 'rpc' (default) or 'datastore'",
OptType: types.String,
ConfigKey: &ledgerBackendType,
FlagDefault: string(ingest.LedgerBackendTypeRPC),
FlagDefault: string(ingest.LedgerBackendTypeDatastore),
Required: false,
},
{
Name: "datastore-config-path",
Usage: "Path to TOML config file for datastore backend. Required when ledger-backend-type is 'datastore'",
OptType: types.String,
ConfigKey: &cfg.DatastoreConfigPath,
FlagDefault: "",
FlagDefault: "config/datastore-pubnet.toml",
Required: false,
},
{
Name: "skip-tx-meta",
Usage: "Skip storing transaction metadata (meta_xdr) to reduce storage space and improve insertion performance.",
OptType: types.Bool,
ConfigKey: &cfg.SkipTxMeta,
FlagDefault: true,
Required: false,
},
{
Name: "skip-tx-envelope",
Usage: "Skip storing transaction envelope (envelope_xdr) to reduce storage space and improve insertion performance.",
OptType: types.Bool,
ConfigKey: &cfg.SkipTxEnvelope,
FlagDefault: true,
Required: false,
},
{
Name: "enable-participant-filtering",
Usage: "When enabled, only store transactions, operations, and state changes for pre-registered accounts. When disabled (default), store all data.",
OptType: types.Bool,
ConfigKey: &cfg.EnableParticipantFiltering,
FlagDefault: false,
Required: false,
},
}
Expand Down
27 changes: 9 additions & 18 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,9 @@ services:
- postgres-db:/data/postgres
ports:
- 5432:5432
command: |
bash -c "
# Run default postgres initialization
/usr/local/bin/docker-entrypoint.sh postgres &

# Wait for startup
sleep 10

# Create schemas (only on first run)
if [ ! -f /data/postgres/schemas_created ]; then
psql -U postgres -d wallet-backend -c 'CREATE SCHEMA IF NOT EXISTS wallet_backend_mainnet;'
psql -U postgres -d wallet-backend -c 'CREATE SCHEMA IF NOT EXISTS wallet_backend_testnet;'
touch /data/postgres/schemas_created
fi

# Keep postgres running
wait
"
configs:
- source: postgres_init
target: /docker-entrypoint-initdb.d/init-schemas.sql

redis:
container_name: redis
Expand Down Expand Up @@ -305,3 +290,9 @@ volumes:
driver: local
redis-data:
driver: local

configs:
postgres_init:
content: |
CREATE SCHEMA IF NOT EXISTS wallet_backend_mainnet;
CREATE SCHEMA IF NOT EXISTS wallet_backend_testnet;
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-playground/validator/v10 v10.27.0
github.com/golang-jwt/jwt/v5 v5.2.3
github.com/jackc/pgx/v5 v5.7.6
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.28
Expand Down Expand Up @@ -122,6 +123,9 @@ require (
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jarcoal/httpmock v0.0.0-20161210151336-4442edb3db31 h1:Aw95BEvxJ3K6o9GGv5ppCd1P8hkeIeEJ30FO+OhOJpM=
github.com/jarcoal/httpmock v0.0.0-20161210151336-4442edb3db31/go.mod h1:ks+b9deReOc7jgqp+e7LuFiCBH6Rm5hL32cLcEAArb4=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand Down
31 changes: 11 additions & 20 deletions internal/data/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,10 @@ func (m *AccountModel) IsAccountFeeBumpEligible(ctx context.Context, address str

// BatchGetByTxHashes gets the accounts that are associated with the given transaction hashes.
func (m *AccountModel) BatchGetByTxHashes(ctx context.Context, txHashes []string, columns string) ([]*types.AccountWithTxHash, error) {
columns = prepareColumnsWithID(columns, types.Account{}, "accounts", "stellar_address")
query := fmt.Sprintf(`
SELECT %s, transactions_accounts.tx_hash
query := `
SELECT account_id AS stellar_address, tx_hash
FROM transactions_accounts
INNER JOIN accounts
ON transactions_accounts.account_id = accounts.stellar_address
WHERE transactions_accounts.tx_hash = ANY($1)`, columns)
WHERE tx_hash = ANY($1)`
var accounts []*types.AccountWithTxHash
start := time.Now()
err := m.DB.SelectContext(ctx, &accounts, query, pq.Array(txHashes))
Expand All @@ -173,13 +170,10 @@ func (m *AccountModel) BatchGetByTxHashes(ctx context.Context, txHashes []string

// BatchGetByOperationIDs gets the accounts that are associated with the given operation IDs.
func (m *AccountModel) BatchGetByOperationIDs(ctx context.Context, operationIDs []int64, columns string) ([]*types.AccountWithOperationID, error) {
columns = prepareColumnsWithID(columns, types.Account{}, "accounts", "stellar_address")
query := fmt.Sprintf(`
SELECT %s, operations_accounts.operation_id
query := `
SELECT account_id AS stellar_address, operation_id
FROM operations_accounts
INNER JOIN accounts
ON operations_accounts.account_id = accounts.stellar_address
WHERE operations_accounts.operation_id = ANY($1)`, columns)
WHERE operation_id = ANY($1)`
var accounts []*types.AccountWithOperationID
start := time.Now()
err := m.DB.SelectContext(ctx, &accounts, query, pq.Array(operationIDs))
Expand All @@ -196,8 +190,6 @@ func (m *AccountModel) BatchGetByOperationIDs(ctx context.Context, operationIDs

// BatchGetByStateChangeIDs gets the accounts that are associated with the given state change IDs.
func (m *AccountModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs []int64, scOrders []int64, columns string) ([]*types.AccountWithStateChangeID, error) {
columns = prepareColumnsWithID(columns, types.Account{}, "accounts", "stellar_address")

// Build tuples for the IN clause. Since (to_id, state_change_order) is the primary key of state_changes,
// it will be faster to search on this tuple.
tuples := make([]string, len(scOrders))
Expand All @@ -206,12 +198,11 @@ func (m *AccountModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs []i
}

query := fmt.Sprintf(`
SELECT %s, CONCAT(state_changes.to_id, '-', state_changes.state_change_order) AS state_change_id
FROM accounts
INNER JOIN state_changes ON accounts.stellar_address = state_changes.account_id
WHERE (state_changes.to_id, state_changes.state_change_order) IN (%s)
ORDER BY accounts.created_at DESC
`, columns, strings.Join(tuples, ", "))
SELECT account_id AS stellar_address, CONCAT(to_id, '-', state_change_order) AS state_change_id
FROM state_changes
WHERE (to_id, state_change_order) IN (%s)
ORDER BY ledger_created_at DESC
`, strings.Join(tuples, ", "))

var accountsWithStateChanges []*types.AccountWithStateChangeID
start := time.Now()
Expand Down
55 changes: 48 additions & 7 deletions internal/data/ingest_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/stellar/wallet-backend/internal/utils"
)

type LedgerRange struct {
GapStart uint32 `db:"gap_start"`
GapEnd uint32 `db:"gap_end"`
}

type IngestStoreModel struct {
DB db.ConnectionPool
MetricsService metrics.MetricsService
Expand All @@ -22,17 +27,17 @@ func (m *IngestStoreModel) Get(ctx context.Context, cursorName string) (uint32,
start := time.Now()
err := m.DB.GetContext(ctx, &lastSyncedLedger, `SELECT value FROM ingest_store WHERE key = $1`, cursorName)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("GetLatestLedgerSynced", "ingest_store", duration)
m.MetricsService.ObserveDBQueryDuration("Get", "ingest_store", duration)
// First run, key does not exist yet
if errors.Is(err, sql.ErrNoRows) {
m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store")
m.MetricsService.IncDBQuery("Get", "ingest_store")
return 0, nil
}
if err != nil {
m.MetricsService.IncDBQueryError("GetLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err))
m.MetricsService.IncDBQueryError("Get", "ingest_store", utils.GetDBErrorType(err))
return 0, fmt.Errorf("getting latest ledger synced for cursor %s: %w", cursorName, err)
}
m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store")
m.MetricsService.IncDBQuery("Get", "ingest_store")

return lastSyncedLedger, nil
}
Expand All @@ -45,12 +50,48 @@ func (m *IngestStoreModel) Update(ctx context.Context, dbTx db.Transaction, curs
start := time.Now()
_, err := dbTx.ExecContext(ctx, query, cursorName, ledger)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("UpdateLatestLedgerSynced", "ingest_store", duration)
m.MetricsService.ObserveDBQueryDuration("Update", "ingest_store", duration)
if err != nil {
m.MetricsService.IncDBQueryError("UpdateLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err))
m.MetricsService.IncDBQueryError("Update", "ingest_store", utils.GetDBErrorType(err))
return fmt.Errorf("updating last synced ledger to %d: %w", ledger, err)
}
m.MetricsService.IncDBQuery("UpdateLatestLedgerSynced", "ingest_store")
m.MetricsService.IncDBQuery("Update", "ingest_store")
return nil
}

func (m *IngestStoreModel) UpdateMin(ctx context.Context, dbTx db.Transaction, cursorName string, ledger uint32) error {
const query = `
UPDATE ingest_store
SET value = LEAST(value::integer, $2)::text
WHERE key = $1
`
_, err := dbTx.ExecContext(ctx, query, cursorName, ledger)
if err != nil {
return fmt.Errorf("updating minimum ledger for cursor %s: %w", cursorName, err)
}
return nil
}

func (m *IngestStoreModel) GetLedgerGaps(ctx context.Context) ([]LedgerRange, error) {
const query = `
SELECT gap_start, gap_end FROM (
SELECT
ledger_number + 1 AS gap_start,
LEAD(ledger_number) OVER (ORDER BY ledger_number) - 1 AS gap_end
FROM (SELECT DISTINCT ledger_number FROM transactions) t
) gaps
WHERE gap_start <= gap_end
ORDER BY gap_start
`
start := time.Now()
var ledgerGaps []LedgerRange
err := m.DB.SelectContext(ctx, &ledgerGaps, query)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("GetLedgerGaps", "transactions", duration)
if err != nil {
m.MetricsService.IncDBQueryError("GetLedgerGaps", "transactions", utils.GetDBErrorType(err))
return nil, fmt.Errorf("getting ledger gaps: %w", err)
}
m.MetricsService.IncDBQuery("GetLedgerGaps", "transactions")
return ledgerGaps, nil
}
Loading