From 03638e382389b755babac4e542387e9012ffb7d8 Mon Sep 17 00:00:00 2001 From: Ilyak Date: Thu, 16 Oct 2025 13:01:30 -0600 Subject: [PATCH 1/6] [WIP]feat: added core logic for handling errors for transactions and reports --- .../20251016181328_init/migration.sql | 16 +++++ prisma/schema.prisma | 15 ++++ src/managers/Relayer.ts | 71 ++++++++++++++++++- src/utils/RelayerJobQueue.ts | 37 ++++++++++ 4 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 prisma/migrations/20251016181328_init/migration.sql create mode 100644 src/utils/RelayerJobQueue.ts diff --git a/prisma/migrations/20251016181328_init/migration.sql b/prisma/migrations/20251016181328_init/migration.sql new file mode 100644 index 0000000..adbfb68 --- /dev/null +++ b/prisma/migrations/20251016181328_init/migration.sql @@ -0,0 +1,16 @@ +-- CreateTable +CREATE TABLE "RelayerJob" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "jobType" TEXT NOT NULL DEFAULT 'tx-submit', + "txHash" TEXT, + "chainName" TEXT NOT NULL, + "payload" TEXT NOT NULL, + "status" TEXT NOT NULL DEFAULT 'pending', + "attempts" INTEGER NOT NULL DEFAULT 0, + "nextRetryAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL +); + +-- CreateIndex +CREATE INDEX "RelayerJob_status_nextRetryAt_idx" ON "RelayerJob"("status", "nextRetryAt"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f7e738f..e9af795 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -21,3 +21,18 @@ model LogsListenerBlockCheckpoints { @@index([chainSelector, contractAddress]) @@map("logsListener_blockCheckpoints") } + +model RelayerJob { + id Int @id @default(autoincrement()) + jobType String @default("tx-submit") // tx-submit | report-request + txHash String? + chainName String + payload String + status String @default("pending") // pending | processing | done | failed + attempts Int @default(0) + nextRetryAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([status, nextRetryAt]) +} diff --git a/src/managers/Relayer.ts b/src/managers/Relayer.ts index f6c4f0e..e2c1eef 100644 --- a/src/managers/Relayer.ts +++ b/src/managers/Relayer.ts @@ -26,6 +26,9 @@ import { DecodedLog } from '../types/DecodedLog'; import { RelayerConfig } from '../types/ManagerConfigs'; import { decodeCLFReport, decodeMessageReportResult } from '../utils'; import { DecodedMessageReportResult } from '../utils/decoders/types'; +import { PrismaClient } from "@prisma/client"; +import { RelayerJobQueue } from "../utils/RelayerJobQueue"; + export class Relayer extends ManagerBase { private static instance: Relayer | undefined; @@ -39,6 +42,9 @@ export class Relayer extends ManagerBase { private readonly txMonitor: ITxMonitor; private readonly setup: RelayerSetup; private readonly config: RelayerConfig; + private prisma = new PrismaClient(); + private jobQueue = new RelayerJobQueue(this.prisma); + private verifierNetwork!: ConceroNetwork; private verifierAddress!: Address; @@ -144,6 +150,47 @@ export class Relayer extends ManagerBase { await this.setup.executeSetup(); await this.setupEventListeners(); + setInterval(async () => { + const jobs = await this.jobQueue.getDue(5); + for (const job of jobs) { + const ctx = JSON.parse(job.payload); + if (job.jobType === "report-request") { + try { + this.logger.info(`[${ctx.srcChainSelector}] Retrying report request...`); + await this.requestMessageReport(ctx.decodedLog, ctx.srcChainSelector); + await this.jobQueue.markSuccess(job.id); + } catch (err) { + this.logger.error(`[${ctx.srcChainSelector}] Report retry error: ${err}`); + await this.jobQueue.markFailed(job.id, job.attempts + 1); + } + continue; + } + const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = ctx; + const dstChain = this.networkManager.getNetworkByName(chainName); + if (!dstChain) { + this.logger.error(`[${chainName}] Retry failed: no network`); + await this.jobQueue.markFailed(job.id, job.attempts); + continue; + } + + try { + this.logger.info(`[${chainName}] Retrying job ${job.id} (attempt ${job.attempts + 1})`); + const newTxHash = await this.submitBatchToDestination( + dstChain, + reportSubmission, + messages, + indexes, + results, + totalGasLimit, + ); + this.logger.info(`[${chainName}] Retry success: ${newTxHash}`); + await this.jobQueue.markSuccess(job.id); + } catch (err) { + this.logger.error(`[${chainName}] Retry error: ${err}`); + await this.jobQueue.markFailed(job.id, job.attempts + 1); + } + } + }, 15_000); this.logger.info('initialized'); } @@ -464,7 +511,14 @@ export class Relayer extends ManagerBase { } catch (error) { const messageId = decodedLog.args?.messageId; this.logger.error( - `[${this.verifierNetwork.name}] Error requesting CLF message report for messageId ${messageId || 'unknown'}: ${error}`, + `[${this.verifierNetwork.name}] Error requesting CLF message report for messageId ${messageId || 'unknown'}: ${error}`, + ); + + await this.jobQueue.add( + "report-request", + null, + this.verifierNetwork.name, + { decodedLog, srcChainSelector } ); } } @@ -642,7 +696,7 @@ export class Relayer extends ManagerBase { if (isFinalized) { this.destinationChainFinalityMap.delete(txHash); } else { - this.retryDestinationSubmission(txHash); + this.retryDestinationSubmissionWithQueue(txHash); } return; } @@ -707,4 +761,17 @@ export class Relayer extends ManagerBase { ); this.destinationChainFinalityMap.delete(originalTxHash); } + + private async retryDestinationSubmissionWithQueue(originalTxHash: string): Promise { + const context = this.destinationChainFinalityMap.get(originalTxHash); + if (!context) { + this.logger.error(`Cannot retry: no context for ${originalTxHash}`); + return; + } + + const { chainName } = context; + await this.jobQueue.add(chainName, originalTxHash, context); + this.logger.info(`[${chainName}] Queued failed tx ${originalTxHash} for retry`); + } + } diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts new file mode 100644 index 0000000..73236f9 --- /dev/null +++ b/src/utils/RelayerJobQueue.ts @@ -0,0 +1,37 @@ +import { PrismaClient } from "@prisma/client"; + +export class RelayerJobQueue { + constructor(private prisma: PrismaClient) {} + + async add(jobType: "tx-submit" | "report-request", chainName: string | null, txHash: string | null, payload: any) { + await this.prisma.relayerJob.create({ + data: { + jobType, + chainName, + txHash, + payload: JSON.stringify(payload), + }, + }); + } + + async getDue(limit = 5) { + return this.prisma.relayerJob.findMany({ + where: { status: "pending", nextRetryAt: { lte: new Date() } }, + orderBy: { id: "asc" }, + take: limit, + }); + } + + async markSuccess(id: number) { + await this.prisma.relayerJob.delete({ where: { id } }); + } + + async markFailed(id: number, attempts: number) { + const delaySec = Math.min(60 * Math.pow(2, attempts), 600); // максимум 10 минут + const next = new Date(Date.now() + delaySec * 1000); + await this.prisma.relayerJob.update({ + where: { id }, + data: { attempts: { increment: 1 }, nextRetryAt: next }, + }); + } +} From c0d504e845b4e0aa42f32c6e219531d6826a5331 Mon Sep 17 00:00:00 2001 From: Ilyak Date: Thu, 16 Oct 2025 13:04:06 -0600 Subject: [PATCH 2/6] fix: translation --- src/utils/RelayerJobQueue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts index 73236f9..adf6991 100644 --- a/src/utils/RelayerJobQueue.ts +++ b/src/utils/RelayerJobQueue.ts @@ -27,7 +27,7 @@ export class RelayerJobQueue { } async markFailed(id: number, attempts: number) { - const delaySec = Math.min(60 * Math.pow(2, attempts), 600); // максимум 10 минут + const delaySec = Math.min(60 * Math.pow(2, attempts), 600); // 10 minutes max const next = new Date(Date.now() + delaySec * 1000); await this.prisma.relayerJob.update({ where: { id }, From 1672c75058a03e4a24d186ee0f642122f68cdcc2 Mon Sep 17 00:00:00 2001 From: Ilyak Date: Mon, 20 Oct 2025 11:26:22 -0600 Subject: [PATCH 3/6] chore: core logic for re-request reports, fixed docker db path for linux --- .dockerignore | 3 + bun.lock | 2 +- docker/Dockerfile | 3 +- package.json | 2 +- .../migration.sql | 8 + prisma/schema.prisma | 3 +- src/managers/Relayer.ts | 232 ++++++++---------- src/types/envPrefixes.ts | 15 +- src/utils/RelayerJobQueue.ts | 114 ++++++--- src/utils/configureDotEnv.ts | 53 +++- src/utils/resolvePrivateKey.ts | 18 ++ 11 files changed, 270 insertions(+), 183 deletions(-) create mode 100644 prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql create mode 100644 src/utils/resolvePrivateKey.ts diff --git a/.dockerignore b/.dockerignore index e41b6d4..fda475d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -9,3 +9,6 @@ logs *.db ./prisma/db/* ./prisma/generated +prisma/dev.db +*.sqlite +*.sqlite-journal diff --git a/bun.lock b/bun.lock index 6a20668..fec0aff 100644 --- a/bun.lock +++ b/bun.lock @@ -4,7 +4,7 @@ "": { "name": "concero-v2-operators", "dependencies": { - "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/add-block-checkpoints", + "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/restart-failed-transaction", "@prisma/client": "6.16.2", "@slack/web-api": "7.9.1", "@types/jest": "29.5.14", diff --git a/docker/Dockerfile b/docker/Dockerfile index 69cd0b5..3e67214 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -33,5 +33,6 @@ COPY --from=builder --chown=bunuser:bunuser /app/prisma /app/prisma COPY --chown=bunuser:bunuser ./rpc.extensions.json /app/ COPY --chown=bunuser:bunuser ./rpc.overrides.json /app/ -ENV NODE_ENV=production +ENV NODE_ENV=production \ + DATABASE_URL=file:/app/data/concero.sqlite ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/package.json b/package.json index 4883d6a..081c498 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "typescript": "5.0.0" }, "dependencies": { - "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/add-block-checkpoints", + "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/restart-failed-transaction", "@prisma/client": "6.16.2", "@slack/web-api": "7.9.1", "@types/jest": "29.5.14", diff --git a/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql b/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql new file mode 100644 index 0000000..1521c4c --- /dev/null +++ b/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - A unique constraint covering the columns `[jobType,txHash]` on the table `RelayerJob` will be added. If there are existing duplicate values, this will fail. + +*/ +-- CreateIndex +CREATE UNIQUE INDEX "RelayerJob_jobType_txHash_key" ON "RelayerJob"("jobType", "txHash"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index e9af795..59e49f1 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -25,8 +25,8 @@ model LogsListenerBlockCheckpoints { model RelayerJob { id Int @id @default(autoincrement()) jobType String @default("tx-submit") // tx-submit | report-request - txHash String? chainName String + txHash String? payload String status String @default("pending") // pending | processing | done | failed attempts Int @default(0) @@ -35,4 +35,5 @@ model RelayerJob { updatedAt DateTime @updatedAt @@index([status, nextRetryAt]) + @@unique([jobType, txHash]) } diff --git a/src/managers/Relayer.ts b/src/managers/Relayer.ts index e2c1eef..add47af 100644 --- a/src/managers/Relayer.ts +++ b/src/managers/Relayer.ts @@ -1,5 +1,6 @@ import { Address, + ByteArray, decodeAbiParameters, encodeAbiParameters, getAbiItem, @@ -18,6 +19,7 @@ import { ViemClientManager, } from '@concero/operator-utils'; import { MessagingDeploymentManager, RelayerSetup } from './index'; +import { PrismaClient } from '@prisma/client'; import { eventEmitter } from '../constants'; import { decodeLogs } from '../eventListener/decodeLogs'; @@ -26,9 +28,7 @@ import { DecodedLog } from '../types/DecodedLog'; import { RelayerConfig } from '../types/ManagerConfigs'; import { decodeCLFReport, decodeMessageReportResult } from '../utils'; import { DecodedMessageReportResult } from '../utils/decoders/types'; -import { PrismaClient } from "@prisma/client"; -import { RelayerJobQueue } from "../utils/RelayerJobQueue"; - +import { RelayerJobQueue } from '../utils/RelayerJobQueue'; export class Relayer extends ManagerBase { private static instance: Relayer | undefined; @@ -45,7 +45,6 @@ export class Relayer extends ManagerBase { private prisma = new PrismaClient(); private jobQueue = new RelayerJobQueue(this.prisma); - private verifierNetwork!: ConceroNetwork; private verifierAddress!: Address; @@ -151,46 +150,55 @@ export class Relayer extends ManagerBase { await this.setupEventListeners(); setInterval(async () => { - const jobs = await this.jobQueue.getDue(5); + const jobs = await this.jobQueue.getDue(10); + for (const job of jobs) { - const ctx = JSON.parse(job.payload); - if (job.jobType === "report-request") { + if (job.jobType === 'report-request') { try { - this.logger.info(`[${ctx.srcChainSelector}] Retrying report request...`); - await this.requestMessageReport(ctx.decodedLog, ctx.srcChainSelector); - await this.jobQueue.markSuccess(job.id); + const ctx = JSON.parse(job.payload); + this.logger.info( + `[report-request] retry #${job.attempts + 1} for messageId=${job.txHash}`, + ); + await this.requestMessageReport(ctx.decodedLog, ctx.srcChainSelector); } catch (err) { - this.logger.error(`[${ctx.srcChainSelector}] Report retry error: ${err}`); - await this.jobQueue.markFailed(job.id, job.attempts + 1); + this.logger.error(`[report-request] error: ${err}`); + } finally { + await this.jobQueue.rescheduleReportRequest(job.id, job.attempts); } continue; - } - const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = ctx; + } + + const ctx = JSON.parse(job.payload); + const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = + ctx; const dstChain = this.networkManager.getNetworkByName(chainName); - if (!dstChain) { - this.logger.error(`[${chainName}] Retry failed: no network`); - await this.jobQueue.markFailed(job.id, job.attempts); - continue; - } - - try { - this.logger.info(`[${chainName}] Retrying job ${job.id} (attempt ${job.attempts + 1})`); - const newTxHash = await this.submitBatchToDestination( - dstChain, - reportSubmission, - messages, - indexes, - results, - totalGasLimit, - ); - this.logger.info(`[${chainName}] Retry success: ${newTxHash}`); - await this.jobQueue.markSuccess(job.id); - } catch (err) { - this.logger.error(`[${chainName}] Retry error: ${err}`); - await this.jobQueue.markFailed(job.id, job.attempts + 1); - } + if (!dstChain) { + this.logger.error(`[${chainName}] Retry failed: no network`); + await this.jobQueue.markFailed(job.id, job.attempts); + continue; + } + + try { + this.logger.info( + `[${chainName}] Retrying job ${job.id} (attempt ${job.attempts + 1})`, + ); + const newTxHash = await this.submitBatchToDestination( + dstChain, + reportSubmission, + messages, + indexes, + results, + totalGasLimit, + ); + this.logger.info(`[${chainName}] Retry success: ${newTxHash}`); + await this.jobQueue.markSuccess(job.id); + } catch (err) { + this.logger.error(`[${chainName}] Retry error: ${err}`); + await this.jobQueue.markFailed(job.id, job.attempts + 1); + } } - }, 15_000); + }, 15_000); + this.logger.info('initialized'); } @@ -275,11 +283,11 @@ export class Relayer extends ManagerBase { const decodedLogs = decodeLogs(logs, this.config.abi.CONCERO_ROUTER); const immediateProcessLogs = decodedLogs.filter( - log => log.args.shouldFinaliseSrc === false, + log => log.args?.shouldFinaliseSrc === false, ); const finalityRequiredLogs = decodedLogs.filter( - log => log.args.shouldFinaliseSrc === true, + log => log.args?.shouldFinaliseSrc === true, ); finalityRequiredLogs.forEach(decodedLog => { @@ -339,6 +347,10 @@ export class Relayer extends ManagerBase { ); return; } + const allMessageIds = messageResults.map(r => r.messageId); + await Promise.allSettled( + allMessageIds.map(id => this.jobQueue.cancelReportRequest(id)), + ); const messagesByDstChain = this.groupMessagesByDestination(messageResults); @@ -371,7 +383,7 @@ export class Relayer extends ManagerBase { ); const validMessages: string[] = []; - const validIndexes: bigint[] = []; + const validIndexes: number[] = []; const validResults: any[] = []; let totalGasLimit = 0n; @@ -416,15 +428,15 @@ export class Relayer extends ManagerBase { const messageIds = validResults.map(r => r.messageId); - // this.destinationChainFinalityMap.set(submissionTxHash, { - // chainName: dstChain.name, - // messageIds, - // reportSubmission, - // messages: validMessages, - // indexes: validIndexes, - // results: validResults, - // totalGasLimit, - // }); + this.destinationChainFinalityMap.set(submissionTxHash, { + chainName: dstChain.name, + messageIds, + reportSubmission, + messages: validMessages, + indexes: validIndexes, + results: validResults, + totalGasLimit, + }); this.txMonitor.ensureTxFinality( submissionTxHash, @@ -467,8 +479,11 @@ export class Relayer extends ManagerBase { ): Promise { try { const args = decodedLog.args; - const { messageId, message, sender } = args; - + const { messageId, message, sender } = args as unknown as { + messageId: string; + message: Hash | ByteArray; + sender: Hash; + }; if (!messageId || !message || !sender || !decodedLog.blockNumber) { this.logger.error(`Missing required data in log: ${decodedLog}`); return; @@ -484,12 +499,7 @@ export class Relayer extends ManagerBase { ], }, ], - [ - { - blockNumber: BigInt(decodedLog.blockNumber), - sender, - }, - ], + [{ blockNumber: BigInt(decodedLog.blockNumber), sender }], ); const txHash = await this.txWriter.callContract(this.verifierNetwork, { @@ -500,25 +510,29 @@ export class Relayer extends ManagerBase { chain: this.verifierNetwork.viemChain, }); - if (txHash) { - eventEmitter.emit('requestMessageReport', { - txHash: txHash, - }); - this.logger.info(`Report requested, tx: ${txHash}`); - } else { - this.logger.error(`Failed to submit Report request`); - } + await this.jobQueue.upsertReportRequest( + messageId, + this.verifierNetwork.name, + { decodedLog, srcChainSelector }, + 5, + ); + + eventEmitter.emit('requestMessageReport', { txHash }); + this.logger.info(`Report requested, tx: ${txHash}`); } catch (error) { const messageId = decodedLog.args?.messageId; this.logger.error( - `[${this.verifierNetwork.name}] Error requesting CLF message report for messageId ${messageId || 'unknown'}: ${error}`, + `[${this.verifierNetwork.name}] Error requesting report for messageId ${messageId || 'unknown'}: ${error}`, ); - - await this.jobQueue.add( - "report-request", - null, - this.verifierNetwork.name, - { decodedLog, srcChainSelector } + + await this.jobQueue.upsertReportRequest( + decodedLog.args?.messageId ?? 'unknown', + this.verifierNetwork.name, + { + decodedLog, + srcChainSelector, + }, + 10, ); } } @@ -614,7 +628,10 @@ export class Relayer extends ManagerBase { return { message: null, gasLimit: 0n }; } - const { message, dstChainData } = conceroMessageSentLog.args; + const { message, dstChainData } = conceroMessageSentLog.args as { + message: string; + dstChainData: Hash | ByteArray; + }; const decodedDstChainData = decodeAbiParameters( [ @@ -636,10 +653,10 @@ export class Relayer extends ManagerBase { dstChain: ConceroNetwork, reportSubmission: any, messages: string[], - indexes: number[], + indexes: number[] | bigint[], results: DecodedMessageReportResult[], totalGasLimit: bigint, - ): Promise { + ): Promise { const dstConceroRouter = await this.deploymentManager.getRouterByChainName(dstChain.name); const txHash = await this.txWriter.callContract( @@ -704,74 +721,19 @@ export class Relayer extends ManagerBase { this.logger.error(`No context found for transaction ${txHash} on chain ${chainName}`); } - private async retryDestinationSubmission(originalTxHash: string): Promise { + private async retryDestinationSubmissionWithQueue(originalTxHash: string): Promise { const context = this.destinationChainFinalityMap.get(originalTxHash); if (!context) { - this.logger.error(`Cannot retry: no context found for ${originalTxHash}`); - return; - } - - const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = context; - - const dstChain = this.networkManager.getNetworkByName(chainName); - if (!dstChain) { - this.logger.error(`Cannot retry: network ${chainName} not found`); - this.destinationChainFinalityMap.delete(originalTxHash); + this.logger.error(`Cannot retry: no context for ${originalTxHash}`); return; } - const maxAttempts = 3; - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - this.logger.info( - `[${chainName}] Retrying report submission (attempt ${attempt}/${maxAttempts}). Message IDs: ${context.messageIds.join(', ')}`, - ); - - const newTxHash = await this.submitBatchToDestination( - dstChain, - reportSubmission, - messages, - indexes, - results, - totalGasLimit, - ); - - this.destinationChainFinalityMap.delete(originalTxHash); - this.destinationChainFinalityMap.set(newTxHash, context); - this.txMonitor.ensureTxFinality( - newTxHash, - dstChain.name, - this.onFinalityCallback.bind(this), - ); - return; - } catch (error) { - this.logger.error( - `Error in destination submission attempt ${attempt}/${maxAttempts} for ${originalTxHash}: ${error}`, - ); - } + const { chainName } = context; - if (attempt < maxAttempts) { - await new Promise(resolve => setTimeout(resolve, 5000)); - } - } + await this.jobQueue.add('tx-submit', chainName, originalTxHash, context); - // All attempts threw exceptions, clean up the original failed tx - this.logger.error( - `[${chainName}] All ${maxAttempts} submission attempts threw exceptions. Giving up. Message IDs: ${context.messageIds.join(', ')}`, - ); this.destinationChainFinalityMap.delete(originalTxHash); - } - private async retryDestinationSubmissionWithQueue(originalTxHash: string): Promise { - const context = this.destinationChainFinalityMap.get(originalTxHash); - if (!context) { - this.logger.error(`Cannot retry: no context for ${originalTxHash}`); - return; - } - - const { chainName } = context; - await this.jobQueue.add(chainName, originalTxHash, context); this.logger.info(`[${chainName}] Queued failed tx ${originalTxHash} for retry`); - } - + } } diff --git a/src/types/envPrefixes.ts b/src/types/envPrefixes.ts index faad3b3..f30aaff 100644 --- a/src/types/envPrefixes.ts +++ b/src/types/envPrefixes.ts @@ -1,7 +1,16 @@ export type EnvPrefixes = { + nodeEnv: string; + networkMode: string; + logLevel: string; + operatorAddress: string; + operatorPrivateKey: string; + pollingIntervalMs: string; + dryRun: string; + rpcServiceGitBranch: string; + deploymentsServiceGitBranch: string; router: string; verifier: string; - lpToken: string; - create3Factory: string; - pause: string; + lpToken?: string; + create3Factory?: string; + pause?: string; }; diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts index adf6991..842e26d 100644 --- a/src/utils/RelayerJobQueue.ts +++ b/src/utils/RelayerJobQueue.ts @@ -1,37 +1,83 @@ -import { PrismaClient } from "@prisma/client"; +import { PrismaClient } from '@prisma/client'; + +const DELAYS = [5, 10, 30, 120, 300, 600, 1200, 3600]; +const nextDelay = (attempts: number) => + attempts < DELAYS.length ? DELAYS[attempts] : DELAYS[DELAYS.length - 1]; export class RelayerJobQueue { - constructor(private prisma: PrismaClient) {} - - async add(jobType: "tx-submit" | "report-request", chainName: string | null, txHash: string | null, payload: any) { - await this.prisma.relayerJob.create({ - data: { - jobType, - chainName, - txHash, - payload: JSON.stringify(payload), - }, - }); - } - - async getDue(limit = 5) { - return this.prisma.relayerJob.findMany({ - where: { status: "pending", nextRetryAt: { lte: new Date() } }, - orderBy: { id: "asc" }, - take: limit, - }); - } - - async markSuccess(id: number) { - await this.prisma.relayerJob.delete({ where: { id } }); - } - - async markFailed(id: number, attempts: number) { - const delaySec = Math.min(60 * Math.pow(2, attempts), 600); // 10 minutes max - const next = new Date(Date.now() + delaySec * 1000); - await this.prisma.relayerJob.update({ - where: { id }, - data: { attempts: { increment: 1 }, nextRetryAt: next }, - }); - } + constructor(private prisma: PrismaClient) {} + + async getDue(limit = 10) { + return this.prisma.relayerJob.findMany({ + where: { status: 'pending', nextRetryAt: { lte: new Date() } }, + orderBy: { id: 'asc' }, + take: limit, + }); + } + + async markSuccess(id: number) { + await this.prisma.relayerJob.delete({ where: { id } }); + } + + async markFailed(id: number, attempts: number) { + const delay = nextDelay(attempts); + const next = new Date(Date.now() + delay * 1000); + await this.prisma.relayerJob.update({ + where: { id }, + data: { attempts: { increment: 1 }, nextRetryAt: next }, + }); + } + + async add(jobType: 'tx-submit', chainName: string, txHash: string, payload: any) { + await this.prisma.relayerJob.create({ + data: { + jobType, + chainName, + txHash, + payload: JSON.stringify(payload), + }, + }); + } + + async upsertReportRequest( + messageId: string, + chainName: string, + payload: any, + initialDelaySec = 5, + ) { + const next = new Date(Date.now() + initialDelaySec * 1000); + await this.prisma.relayerJob.upsert({ + where: { jobType_txHash: { jobType: 'report-request', txHash: messageId } }, + update: { + chainName, + payload: JSON.stringify(payload), + status: 'pending', + nextRetryAt: next, + }, + create: { + jobType: 'report-request', + chainName, + txHash: messageId, + payload: JSON.stringify(payload), + status: 'pending', + attempts: 0, + nextRetryAt: next, + }, + }); + } + + async rescheduleReportRequest(id: number, attempts: number) { + const delay = nextDelay(attempts); + const next = new Date(Date.now() + delay * 1000); + await this.prisma.relayerJob.update({ + where: { id }, + data: { attempts: { increment: 1 }, nextRetryAt: next }, + }); + } + + async cancelReportRequest(messageId: string) { + await this.prisma.relayerJob.deleteMany({ + where: { jobType: 'report-request', txHash: messageId }, + }); + } } diff --git a/src/utils/configureDotEnv.ts b/src/utils/configureDotEnv.ts index 58c5fe4..ebad1a5 100644 --- a/src/utils/configureDotEnv.ts +++ b/src/utils/configureDotEnv.ts @@ -4,6 +4,40 @@ import * as path from 'path'; export const ENV_FILES: string[] = ['.env']; +function stripJunk(raw: string) { + return raw + .replace(/^\uFEFF/, '') + .replace(/[\u200B-\u200D\uFEFF]/g, '') + .trim() + .replace(/^['"]|['"]$/g, ''); +} + +function sanitizeHexPrivateKey(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_PRIVATE_KEY is not set'); + + let v = stripJunk(raw); + + if (/^[0-9a-fA-F]{64}$/.test(v)) v = '0x' + v; + + if (!/^0x[0-9a-fA-F]{64}$/.test(v)) { + const sample = v.slice(0, 12); + throw new Error( + `OPERATOR_PRIVATE_KEY has invalid format. Got "${sample}..." (len=${v.length}). Expected 0x + 64 hex.`, + ); + } + return v as `0x${string}`; +} + +function sanitizeEthAddress(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_ADDRESS is not set'); + + const v = stripJunk(raw); + if (!/^0x[0-9a-fA-F]{40}$/.test(v)) { + throw new Error(`OPERATOR_ADDRESS has invalid format: "${v}". Expected 0x + 40 hex.`); + } + return v as `0x${string}`; +} + /** * Configures dotenv loading order: * 1) Base files from ENV_FILES (in order added) @@ -11,12 +45,9 @@ export const ENV_FILES: string[] = ['.env']; * 3) .env.development or .env.production (last, always loaded with override) * * Any other NODE_ENV values are coerced to "development". - * - * @param {string} [basePath='./'] - Base path where .env files are located. */ export function configureDotEnv(basePath = './'): void { const baseDir = basePath.endsWith(path.sep) ? basePath : `${basePath}${path.sep}`; - const nodeEnvNormalized = process.env.NODE_ENV === 'production' ? 'production' : 'development'; for (const file of ENV_FILES) { @@ -24,11 +55,19 @@ export function configureDotEnv(basePath = './'): void { dotenv.config({ path: p, override: false }); } - const localFilePath = path.resolve(baseDir, '.env.local'); - dotenv.config({ path: localFilePath, override: true }); + dotenv.config({ path: path.resolve(baseDir, '.env.local'), override: true }); - const envFilePath = path.resolve(baseDir, `.env.${nodeEnvNormalized}`); - dotenv.config({ path: envFilePath, override: true }); + dotenv.config({ path: path.resolve(baseDir, `.env.${nodeEnvNormalized}`), override: true }); + + try { + process.env.OPERATOR_PRIVATE_KEY = sanitizeHexPrivateKey(process.env.OPERATOR_PRIVATE_KEY); + } catch (e) { + throw e; + } + + if (process.env.OPERATOR_ADDRESS) { + process.env.OPERATOR_ADDRESS = sanitizeEthAddress(process.env.OPERATOR_ADDRESS); + } } configureDotEnv(); diff --git a/src/utils/resolvePrivateKey.ts b/src/utils/resolvePrivateKey.ts new file mode 100644 index 0000000..486f77d --- /dev/null +++ b/src/utils/resolvePrivateKey.ts @@ -0,0 +1,18 @@ +export function resolvePrivateKey(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_PRIVATE_KEY is not set'); + + let t = raw + .trim() + .replace(/^['"]|['"]$/g, '') + .replace(/\s+/g, ''); + + if (/^[0-9a-fA-F]{64}$/.test(t)) t = '0x' + t; + + if (!/^0x[0-9a-fA-F]{64}$/.test(t)) { + const sample = t.slice(0, 10); + throw new Error( + `OPERATOR_PRIVATE_KEY has invalid format. Got: "${sample}..." (len=${t.length}). Expected 0x + 64 hex.`, + ); + } + return t as `0x${string}`; +} From 4f074aa266dde32d9deb782a36a9b0d1ed048dac Mon Sep 17 00:00:00 2001 From: Ilyak Date: Mon, 20 Oct 2025 11:28:51 -0600 Subject: [PATCH 4/6] fix: returned info about base env --- src/utils/configureDotEnv.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/utils/configureDotEnv.ts b/src/utils/configureDotEnv.ts index ebad1a5..38e1973 100644 --- a/src/utils/configureDotEnv.ts +++ b/src/utils/configureDotEnv.ts @@ -45,7 +45,9 @@ function sanitizeEthAddress(raw: string | undefined): `0x${string}` { * 3) .env.development or .env.production (last, always loaded with override) * * Any other NODE_ENV values are coerced to "development". - */ + * + * @param {string} [basePath='./'] - Base path where .env files are located. + * */ export function configureDotEnv(basePath = './'): void { const baseDir = basePath.endsWith(path.sep) ? basePath : `${basePath}${path.sep}`; const nodeEnvNormalized = process.env.NODE_ENV === 'production' ? 'production' : 'development'; From 8f083af0e939e8f0d398b26b5e2e1e924d1df026 Mon Sep 17 00:00:00 2001 From: Ilyak Date: Wed, 22 Oct 2025 14:41:48 -0600 Subject: [PATCH 5/6] fix: errors while inititalize app --- src/managers/Relayer.ts | 10 ++++++---- src/utils/RelayerJobQueue.ts | 4 ++-- src/utils/configureDotEnv.ts | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/managers/Relayer.ts b/src/managers/Relayer.ts index add47af..531dfcb 100644 --- a/src/managers/Relayer.ts +++ b/src/managers/Relayer.ts @@ -297,10 +297,11 @@ export class Relayer extends ManagerBase { chainSelector: network.chainSelector, }); - this.txMonitor.ensureTxFinality( + this.txMonitor.trackTxFinality( txHash, network.name, - this.onFinalityCallback.bind(this), + //todo + 'relayer' ); }); @@ -438,10 +439,11 @@ export class Relayer extends ManagerBase { totalGasLimit, }); - this.txMonitor.ensureTxFinality( + this.txMonitor.trackTxFinality( submissionTxHash, dstChain.name, - this.onFinalityCallback.bind(this), + //todo + 'relayer' ); } catch (err) { this.logger.error( diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts index 842e26d..ab6fde4 100644 --- a/src/utils/RelayerJobQueue.ts +++ b/src/utils/RelayerJobQueue.ts @@ -50,7 +50,7 @@ export class RelayerJobQueue { where: { jobType_txHash: { jobType: 'report-request', txHash: messageId } }, update: { chainName, - payload: JSON.stringify(payload), + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), status: 'pending', nextRetryAt: next, }, @@ -58,7 +58,7 @@ export class RelayerJobQueue { jobType: 'report-request', chainName, txHash: messageId, - payload: JSON.stringify(payload), + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), status: 'pending', attempts: 0, nextRetryAt: next, diff --git a/src/utils/configureDotEnv.ts b/src/utils/configureDotEnv.ts index 38e1973..e0930c1 100644 --- a/src/utils/configureDotEnv.ts +++ b/src/utils/configureDotEnv.ts @@ -54,7 +54,7 @@ export function configureDotEnv(basePath = './'): void { for (const file of ENV_FILES) { const p = path.resolve(baseDir, file); - dotenv.config({ path: p, override: false }); + dotenv.config({ path: p, override: true }); } dotenv.config({ path: path.resolve(baseDir, '.env.local'), override: true }); From 1e4f398346a703da875ca5653bc468454f900081 Mon Sep 17 00:00:00 2001 From: Ilyak Date: Wed, 22 Oct 2025 14:50:47 -0600 Subject: [PATCH 6/6] fix: add JSON serializer --- src/utils/RelayerJobQueue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts index ab6fde4..40a72ed 100644 --- a/src/utils/RelayerJobQueue.ts +++ b/src/utils/RelayerJobQueue.ts @@ -34,7 +34,7 @@ export class RelayerJobQueue { jobType, chainName, txHash, - payload: JSON.stringify(payload), + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), }, }); }