diff --git a/debridge_node/src/modules/chain/scanning/services/AddNewEventsAction.ts b/debridge_node/src/modules/chain/scanning/services/AddNewEventsAction.ts index 3fc10c5..008a8b6 100644 --- a/debridge_node/src/modules/chain/scanning/services/AddNewEventsAction.ts +++ b/debridge_node/src/modules/chain/scanning/services/AddNewEventsAction.ts @@ -11,7 +11,6 @@ import { SubmissionProcessingService } from './SubmissionProcessingService'; import { TransformService } from './TransformService'; import { ProcessNewTransferResultStatusEnum } from '../enums/ProcessNewTransferResultStatusEnum'; import Contract from 'web3-eth-contract'; -import { SubmissionEntity } from 'src/entities/SubmissionEntity'; @Injectable() export class AddNewEventsAction { @@ -21,8 +20,6 @@ export class AddNewEventsAction { constructor( @InjectRepository(SupportedChainEntity) private readonly supportedChainRepository: Repository, - @InjectRepository(SubmissionEntity) - private readonly submissionsRepository: Repository, private readonly chainConfigService: ChainConfigService, private readonly web3Service: Web3Service, private readonly solanaReaderService: SolanaReaderService, @@ -76,24 +73,14 @@ export class AddNewEventsAction { logger.debug(`Getting events from block ${fromBlock} to ${toBlock} on ${supportedChain.network}`); // Handle invalid block range (fromBlock > toBlock) + // This can happen when the RPC returns a stale block number due to load balancing, + // network issues, or node sync delays. We reset to toBlock to prevent massive resyncs. if (fromBlock > toBlock) { - logger.error(`Invalid block range: fromBlock (${fromBlock}) > toBlock (${toBlock})`); - - // Find the latest block number for the given chainId from the submissions repository - const lastEvent = await this.submissionsRepository.findOne({ - where: { chainFrom: chainId }, - order: { blockNumber: 'DESC' }, // Get the highest block number - }); - - const newLatestBlock = lastEvent?.blockNumber ?? toBlock; - if (!lastEvent) { - logger.warn(`No events found for chainId ${chainId}. Using toBlock (${toBlock}) as latest.`); - } else { - logger.debug(`Found last event block number: ${newLatestBlock} for chainId ${chainId}`); - } - - await this.supportedChainRepository.update(chainId, { latestBlock: newLatestBlock }); - logger.log(`Updated latestBlock for chainId ${chainId} to ${newLatestBlock}`); + logger.warn( + `Stale RPC response detected: fromBlock (${fromBlock}) > toBlock (${toBlock}). ` + + `Resetting latestBlock to ${toBlock} to prevent resync from old blocks.` + ); + await this.supportedChainRepository.update(chainId, { latestBlock: toBlock }); return; } diff --git a/debridge_node/src/modules/chain/scanning/services/tests/AddNewEventsAction.spec.ts b/debridge_node/src/modules/chain/scanning/services/tests/AddNewEventsAction.spec.ts index 5c10a72..a4804c5 100644 --- a/debridge_node/src/modules/chain/scanning/services/tests/AddNewEventsAction.spec.ts +++ b/debridge_node/src/modules/chain/scanning/services/tests/AddNewEventsAction.spec.ts @@ -4,7 +4,6 @@ import { Web3Service } from '../../../../web3/services/Web3Service'; import { SolanaReaderService } from '../SolanaReaderService'; import { SubmissionProcessingService } from '../SubmissionProcessingService'; import { TransformService } from '../TransformService'; -import { SubmissionEntity } from '../../../../../entities/SubmissionEntity'; import { getRepositoryToken } from '@nestjs/typeorm'; import { Test, TestingModule } from '@nestjs/testing'; import { SupportedChainEntity } from '../../../../../entities/SupportedChainEntity'; @@ -50,31 +49,17 @@ describe('AddNewEventsAction', () => { const module: TestingModule = await Test.createTestingModule({ providers: [ - { - provide: getRepositoryToken(SubmissionEntity), - useValue: { - find: async () => { - return [ - { - submissionId: '123', - }, - ]; - }, - update: async () => { - return; - }, - }, - }, { provide: getRepositoryToken(SupportedChainEntity), useValue: { findOne: async chainId => { return { chainId, - latestBlock: 0, + latestBlock: 98, network: 'eth', } as SupportedChainEntity; }, + update: jest.fn().mockResolvedValue({}), }, }, ChainConfigService, @@ -159,4 +144,101 @@ describe('AddNewEventsAction', () => { ); expect(processMock).toBeCalledTimes(1); }); + + describe('stale RPC response handling', () => { + let staleRpcService: AddNewEventsAction; + let staleWeb3; + let staleGetPastEventsMock; + let staleProcessMock; + let updateMock; + + beforeEach(async () => { + staleProcessMock = jest.fn().mockResolvedValue({}); + staleGetPastEventsMock = jest.fn().mockResolvedValue([]); + updateMock = jest.fn().mockResolvedValue({}); + + // Simulate stale RPC: getBlockNumber returns 100, but latestBlock in DB is 200 + // This means fromBlock (200) > toBlock (99) after blockConfirmation + staleWeb3 = { + eth: { + setProvider: jest.fn().mockResolvedValue({}), + Contract: jest.fn().mockImplementation(() => { + return { + setProvider: jest.fn().mockResolvedValue({}), + getPastEvents: staleGetPastEventsMock, + }; + }), + getBlockNumber: jest.fn().mockResolvedValue(100), // RPC returns block 100 + }, + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + { + provide: getRepositoryToken(SupportedChainEntity), + useValue: { + findOne: async () => { + return { + chainId: 1, + latestBlock: 200, // DB has synced to block 200 (ahead of RPC) + network: 'eth', + } as SupportedChainEntity; + }, + update: updateMock, + }, + }, + { + provide: ChainConfigService, + useValue: { + get(chainId) { + return { + chainId, + isSolana: false, + maxBlockRange: 200, + blockConfirmation: 1, + debridgeAddr: 'debridgeAddr', + providers: 'providers', + }; + }, + }, + }, + { + provide: Web3Service, + useValue: { + web3HttpProvider: jest.fn().mockImplementation(() => staleWeb3), + }, + }, + { + provide: SolanaReaderService, + useValue: { + syncTransactions: jest.fn().mockResolvedValue({}), + }, + }, + { + provide: SubmissionProcessingService, + useValue: { + process: staleProcessMock, + }, + }, + TransformService, + AddNewEventsAction, + ], + }).compile(); + staleRpcService = module.get(AddNewEventsAction); + }); + + it('should reset latestBlock to toBlock when RPC returns stale block number', async () => { + await staleRpcService.action(1); + + // Should NOT fetch events when fromBlock > toBlock + expect(staleGetPastEventsMock).not.toBeCalled(); + + // Should NOT process any submissions + expect(staleProcessMock).not.toBeCalled(); + + // Should update latestBlock to toBlock (99) to prevent massive resync + // toBlock = getBlockNumber(100) - blockConfirmation(1) = 99 + expect(updateMock).toBeCalledWith(1, { latestBlock: 99 }); + }); + }); });