From 04f4b6b3c5dce6e539ebc9b59813f1118847b905 Mon Sep 17 00:00:00 2001 From: ucwong Date: Fri, 15 Aug 2025 01:29:54 +0800 Subject: [PATCH] optimized --- monitor.go | 129 +++++++++++++++++++++++++++++------------------------ 1 file changed, 70 insertions(+), 59 deletions(-) diff --git a/monitor.go b/monitor.go index 198194f..8a1845a 100644 --- a/monitor.go +++ b/monitor.go @@ -549,117 +549,128 @@ func (m *Monitor) skip(i uint64) bool { }*/ func (m *Monitor) syncLastBlock() uint64 { - /*currentNumber, err := m.currentBlock() - if err != nil { - return 0 - }*/ - currentNumber := m.currentNumber.Load() + lastNumber := m.lastNumber.Load() - if currentNumber < m.lastNumber.Load() { - log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber.Load(), "offset", m.lastNumber.Load()-currentNumber) + // Step 1: Handle rollback logic if current block number is less than the last processed number. + if currentNumber < lastNumber { + log.Warn("Fs sync rollback detected", "current", currentNumber, "last", lastNumber, "offset", lastNumber-currentNumber) + rollbackNumber := uint64(0) if currentNumber > 65536 { - m.lastNumber.Store(currentNumber - 65536) - } else { - m.lastNumber.Store(0) + rollbackNumber = currentNumber - 65536 } - m.startNumber.Store(m.lastNumber.Load()) + m.lastNumber.Store(rollbackNumber) + m.startNumber.Store(rollbackNumber) } + // Step 2: Determine the block range for this sync batch. minNumber := m.lastNumber.Load() + 1 - maxNumber := uint64(0) + maxNumber := currentNumber if currentNumber > delay { maxNumber = currentNumber - delay - //maxNumber = currentNumber } + // If the last processed block is unexpectedly higher than the current block (after rollback check), + // this indicates a need to sync backward from the last number. if m.lastNumber.Load() > currentNumber { - if m.lastNumber.Load() > batch { - minNumber = m.lastNumber.Load() - batch + minNumber = m.lastNumber.Load() - batch + if m.lastNumber.Load() < batch { + minNumber = 0 // Avoids underflow if lastNumber is smaller than batch } } - if maxNumber > batch+minNumber { + // Adjust maxNumber to not exceed the batch size. + if maxNumber > minNumber+batch { maxNumber = minNumber + batch } - // replay - if minNumber >= delay { - //minNumber = minNumber - delay - } - if maxNumber < minNumber { return 0 } - //if m.start == 0 { m.start = mclock.Now() - //} - counter := 0 - for i := minNumber; i <= maxNumber; { // i++ { + processedCount := 0 + for i := minNumber; i <= maxNumber; { if m.terminated.Load() { - log.Warn("Fs scan terminated", "number", i) + log.Warn("Fs scan terminated by signal", "lastProcessed", i-1) m.lastNumber.Store(i - 1) return 0 } - //if maxNumber > minNumber && i%2048 == 0 { - // log.Info("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "count", maxNumber-minNumber, "progress", float64(i)/float64(currentNumber)) - //} + if m.ckp != nil && m.skip(i) { i++ continue } - if maxNumber-i >= m.scope { - blocks, rpcErr := m.rpcBatchBlockByNumber(i, i+m.scope) - if rpcErr != nil { - log.Error("Batch sync old block failed", "current", maxNumber, "number", i, "batch", m.scope, "error", rpcErr) - m.lastNumber.Store(i - 1) - return 0 + // Step 3: Fetch blocks in a batch or individually based on remaining scope. + remainingScope := maxNumber - i + if remainingScope >= m.scope { + // Process a batch of blocks + blocks, err := m.rpcBatchBlockByNumber(i, i+m.scope) + if err != nil { + return m.handleSyncError("Batch sync old block failed", err, i-1) } - // batch blocks operation according service category - for _, rpcBlock := range blocks { - m.taskCh <- rpcBlock + // Send blocks to the processing channel + for _, block := range blocks { + m.taskCh <- block } - size := len(blocks) - for n := 0; n < size; n++ { + // Wait for the processing results for the entire batch + for range blocks { select { case err := <-m.errCh: if err != nil { - m.lastNumber.Store(i - 1) - log.Error("solve err", "err", err, "last", m.lastNumber.Load(), "i", i, "scope", m.scope, "min", minNumber, "max", maxNumber, "cur", currentNumber) - return 0 + return m.handleSyncError("Processing error", err, i-1) } case <-m.exitCh: + log.Info("Task checker quit signal received") m.lastNumber.Store(i - 1) - log.Info("Task checker quit") return 0 } } - i += uint64(size) - counter += size + + i += uint64(len(blocks)) + processedCount += len(blocks) } else { - rpcBlock, rpcErr := m.rpcBlockByNumber(i) - if rpcErr != nil { - log.Error("Sync old block failed", "number", i, "error", rpcErr) - m.lastNumber.Store(i - 1) - return 0 + // Process a single block + block, err := m.rpcBlockByNumber(i) + if err != nil { + return m.handleSyncError("Sync old block failed", err, i-1) } - if err := m.solve(rpcBlock); err != nil { - log.Error("solve err", "err", err) - m.lastNumber.Store(i - 1) - return 0 + + if err := m.solve(block); err != nil { + return m.handleSyncError("solve err", err, i-1) } + i++ - counter++ + processedCount++ } } - //log.Debug("Last number changed", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch) + + // Step 4: Finalize the sync operation. m.lastNumber.Store(maxNumber) - elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start) - log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber+1), "counter", counter, "scope", m.scope, "current", m.CurrentNumber(), "prog", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(counter)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA)) + elapsed := time.Duration(mclock.Now()) - time.Duration(m.start) + + log.Debug("Chain segment frozen", + "from", minNumber, + "to", maxNumber, + "totalBlocks", uint64(maxNumber-minNumber+1), + "processed", processedCount, + "scope", m.scope, + "current", m.CurrentNumber(), + "last", m.lastNumber.Load(), + "elapsed", common.PrettyDuration(elapsed), + "bps", float64(processedCount)/elapsed.Seconds(), + ) + return uint64(maxNumber - minNumber) } + +// handleSyncError is a helper function to log an error, update the last processed block number, and return 0. +func (m *Monitor) handleSyncError(msg string, err error, lastBlock uint64) uint64 { + log.Error(msg, "error", err, "lastProcessed", lastBlock) + m.lastNumber.Store(lastBlock) + return 0 +}