Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/btcindexer/src/btcindexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ describe("CFStorage.insertBlockInfo (Stale Block Protection)", () => {
};

const result = await indexer.storage.insertBlockInfo(record);
expect(result).toBe(true);
expect(result).toEqual({ status: "inserted", changed: true });
const row = await suite.db.prepare("SELECT * FROM btc_blocks WHERE height = 100").first();
expect(row).toEqual(
expect.objectContaining({
Expand Down Expand Up @@ -691,7 +691,7 @@ describe("CFStorage.insertBlockInfo (Stale Block Protection)", () => {

const result = await indexer.storage.insertBlockInfo(newerRecord);

expect(result).toBe(true);
expect(result).toEqual({ status: "updated", changed: true });
const row = await suite.db.prepare("SELECT * FROM btc_blocks WHERE height = 100").first();
expect(row).toEqual(
expect.objectContaining({
Expand Down Expand Up @@ -719,7 +719,7 @@ describe("CFStorage.insertBlockInfo (Stale Block Protection)", () => {

const result = await indexer.storage.insertBlockInfo(staleRecord);

expect(result).toBe(false); // Update rejected
expect(result).toEqual({ status: "skipped", changed: false }); // Update rejected
const row = await suite.db.prepare("SELECT * FROM btc_blocks WHERE height = 100").first();
expect(row).toEqual(
expect.objectContaining({
Expand Down
7 changes: 4 additions & 3 deletions packages/btcindexer/src/btcindexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,14 @@ export class Indexer {
const block = Block.fromBuffer(Buffer.from(rawBlockBuffer));
const existingHash = await this.storage.getBlockHash(blockInfo.height, blockInfo.network);

const isNewOrMinted = await this.storage.insertBlockInfo(blockInfo);
if (!isNewOrMinted) {
const result = await this.storage.insertBlockInfo(blockInfo);
if (!result.changed) {
logger.debug({
msg: "Skipping processing already processed block",
msg: "Skipping: block already processed",
method: "Indexer.processBlock",
height: blockInfo.height,
hash: blockInfo.hash,
status: result.status,
});
return;
}
Expand Down
34 changes: 28 additions & 6 deletions packages/btcindexer/src/cf-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
NbtcTxUpdate,
NbtcBroadcastedDeposit,
ConfirmingBlockInfo,
InsertBlockResult,
} from "./models";
import { MintTxStatus } from "./models";
import type { Storage } from "./storage";
Expand Down Expand Up @@ -63,11 +64,12 @@ export class CFStorage implements Storage {
* - AND The incoming `timestamp_ms` is greater than the stored `inserted_at`.
*
* @param b - The block record from the queue.
* @returns `true` if the block was inserted or updated, `false` otherwise.
* @returns InsertBlockResult tells us whether the block was inserted, updated, or skipped.
*/
async insertBlockInfo(b: BlockQueueRecord): Promise<boolean> {
// TODO: we should return here InsertResult {ignored, inserted, updated}
// we need to use batching
async insertBlockInfo(b: BlockQueueRecord): Promise<InsertBlockResult> {
const checkRowStmt = this.d1.prepare(
`SELECT 1 FROM btc_blocks WHERE height = ? AND network = ?`,
);
const insertStmt = this.d1.prepare(
`INSERT INTO btc_blocks (hash, height, network, inserted_at) VALUES (?, ?, ?, ?)
ON CONFLICT(height, network) DO UPDATE SET
Expand All @@ -77,8 +79,28 @@ export class CFStorage implements Storage {
WHERE btc_blocks.hash IS NOT excluded.hash AND excluded.inserted_at > btc_blocks.inserted_at`,
);
try {
const result = await insertStmt.bind(b.hash, b.height, b.network, b.timestamp_ms).run();
return result.meta.changes > 0;
const results = await this.d1.batch([
checkRowStmt.bind(b.height, b.network),
insertStmt.bind(b.hash, b.height, b.network, b.timestamp_ms),
]);

const checkResult = results[0];
const upsertResult = results[1];

if (!checkResult || !upsertResult) {
throw new Error("Batch operation failed");
}

const wasFound = checkResult.results.length > 0;
const rowsChanged = upsertResult.meta.changes > 0;

if (!wasFound) {
return { status: "inserted", changed: true };
} else if (rowsChanged) {
return { status: "updated", changed: true };
} else {
return { status: "skipped", changed: false };
}
} catch (e) {
logError(
{
Expand Down
5 changes: 5 additions & 0 deletions packages/btcindexer/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,8 @@ export interface NbtcDepositAddrVal {

// Maps Bitcoin deposit address to NbtcDepositAddrMapping
export type NbtcDepositAddrsMap = Map<string, NbtcDepositAddrVal>;

export type InsertBlockResult =
| { status: "inserted"; changed: true }
| { status: "updated"; changed: true }
| { status: "skipped"; changed: false };
6 changes: 3 additions & 3 deletions packages/btcindexer/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ describe("CFStorage", () => {
timestamp_ms: 1000,
};
const result = await storage.insertBlockInfo(block);
expect(result).toBe(true);
expect(result).toEqual({ status: "inserted", changed: true });

const saved = await storage.getBlockHash(100, BtcNet.REGTEST);
expect(saved).toBe("0000hash1");
Expand All @@ -155,7 +155,7 @@ describe("CFStorage", () => {
network: BtcNet.REGTEST,
timestamp_ms: 2000,
});
expect(result).toBe(true);
expect(result).toEqual({ status: "updated", changed: true });
const saved = await storage.getBlockHash(100, BtcNet.REGTEST);
expect(saved).toBe("0000hashNew");
});
Expand All @@ -174,7 +174,7 @@ describe("CFStorage", () => {
network: BtcNet.REGTEST,
timestamp_ms: 1000,
});
expect(result).toBe(false);
expect(result).toEqual({ status: "skipped", changed: false });
const saved = await storage.getBlockHash(100, BtcNet.REGTEST);
expect(saved).toBe("0000hashNew");
});
Expand Down
3 changes: 2 additions & 1 deletion packages/btcindexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
NbtcPkgCfg,
NbtcDepositAddrsMap,
ConfirmingBlockInfo,
InsertBlockResult,
} from "./models";
import { D1Database } from "@cloudflare/workers-types";
import type { BlockQueueRecord, BtcNet } from "@gonative-cc/lib/nbtc";
Expand All @@ -18,7 +19,7 @@ import { toSuiNet } from "@gonative-cc/lib/nsui";
export interface Storage {
// Block operations
markBlockAsProcessed(hash: string, network: BtcNet): Promise<void>;
insertBlockInfo(blockMessage: BlockQueueRecord): Promise<boolean>;
insertBlockInfo(blockMessage: BlockQueueRecord): Promise<InsertBlockResult>;
getLatestBlockHeight(network: BtcNet): Promise<number | null>;
getChainTip(network: BtcNet): Promise<number | null>;
setChainTip(height: number, network: BtcNet): Promise<void>;
Expand Down