Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 70 additions & 59 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,117 +549,128 @@ func (m *Monitor) skip(i uint64) bool {
}*/

func (m *Monitor) syncLastBlock() uint64 {
/*currentNumber, err := m.currentBlock()
if err != nil {
return 0
}*/

currentNumber := m.currentNumber.Load()
lastNumber := m.lastNumber.Load()

if currentNumber < m.lastNumber.Load() {
log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber.Load(), "offset", m.lastNumber.Load()-currentNumber)
// Step 1: Handle rollback logic if current block number is less than the last processed number.
if currentNumber < lastNumber {
log.Warn("Fs sync rollback detected", "current", currentNumber, "last", lastNumber, "offset", lastNumber-currentNumber)
rollbackNumber := uint64(0)
if currentNumber > 65536 {
m.lastNumber.Store(currentNumber - 65536)
} else {
m.lastNumber.Store(0)
rollbackNumber = currentNumber - 65536
}
m.startNumber.Store(m.lastNumber.Load())
m.lastNumber.Store(rollbackNumber)
m.startNumber.Store(rollbackNumber)
}

// Step 2: Determine the block range for this sync batch.
minNumber := m.lastNumber.Load() + 1
maxNumber := uint64(0)
maxNumber := currentNumber
if currentNumber > delay {
maxNumber = currentNumber - delay
//maxNumber = currentNumber
}

// If the last processed block is unexpectedly higher than the current block (after rollback check),
// this indicates a need to sync backward from the last number.
if m.lastNumber.Load() > currentNumber {
if m.lastNumber.Load() > batch {
minNumber = m.lastNumber.Load() - batch
minNumber = m.lastNumber.Load() - batch
if m.lastNumber.Load() < batch {
minNumber = 0 // Avoids underflow if lastNumber is smaller than batch
}
}

if maxNumber > batch+minNumber {
// Adjust maxNumber to not exceed the batch size.
if maxNumber > minNumber+batch {
maxNumber = minNumber + batch
}

// replay
if minNumber >= delay {
//minNumber = minNumber - delay
}

if maxNumber < minNumber {
return 0
}

//if m.start == 0 {
m.start = mclock.Now()
//}

counter := 0
for i := minNumber; i <= maxNumber; { // i++ {
processedCount := 0
for i := minNumber; i <= maxNumber; {
if m.terminated.Load() {
log.Warn("Fs scan terminated", "number", i)
log.Warn("Fs scan terminated by signal", "lastProcessed", i-1)
m.lastNumber.Store(i - 1)
return 0
}
//if maxNumber > minNumber && i%2048 == 0 {
// log.Info("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "count", maxNumber-minNumber, "progress", float64(i)/float64(currentNumber))
//}

if m.ckp != nil && m.skip(i) {
i++
continue
}

if maxNumber-i >= m.scope {
blocks, rpcErr := m.rpcBatchBlockByNumber(i, i+m.scope)
if rpcErr != nil {
log.Error("Batch sync old block failed", "current", maxNumber, "number", i, "batch", m.scope, "error", rpcErr)
m.lastNumber.Store(i - 1)
return 0
// Step 3: Fetch blocks in a batch or individually based on remaining scope.
remainingScope := maxNumber - i
if remainingScope >= m.scope {
// Process a batch of blocks
blocks, err := m.rpcBatchBlockByNumber(i, i+m.scope)
if err != nil {
return m.handleSyncError("Batch sync old block failed", err, i-1)
}

// batch blocks operation according service category
for _, rpcBlock := range blocks {
m.taskCh <- rpcBlock
// Send blocks to the processing channel
for _, block := range blocks {
m.taskCh <- block
}

size := len(blocks)
for n := 0; n < size; n++ {
// Wait for the processing results for the entire batch
for range blocks {
select {
case err := <-m.errCh:
if err != nil {
m.lastNumber.Store(i - 1)
log.Error("solve err", "err", err, "last", m.lastNumber.Load(), "i", i, "scope", m.scope, "min", minNumber, "max", maxNumber, "cur", currentNumber)
return 0
return m.handleSyncError("Processing error", err, i-1)
}
case <-m.exitCh:
log.Info("Task checker quit signal received")
m.lastNumber.Store(i - 1)
log.Info("Task checker quit")
return 0
}
}
i += uint64(size)
counter += size

i += uint64(len(blocks))
processedCount += len(blocks)
} else {
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
if rpcErr != nil {
log.Error("Sync old block failed", "number", i, "error", rpcErr)
m.lastNumber.Store(i - 1)
return 0
// Process a single block
block, err := m.rpcBlockByNumber(i)
if err != nil {
return m.handleSyncError("Sync old block failed", err, i-1)
}
if err := m.solve(rpcBlock); err != nil {
log.Error("solve err", "err", err)
m.lastNumber.Store(i - 1)
return 0

if err := m.solve(block); err != nil {
return m.handleSyncError("solve err", err, i-1)
}

i++
counter++
processedCount++
}
}
//log.Debug("Last number changed", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch)

// Step 4: Finalize the sync operation.
m.lastNumber.Store(maxNumber)
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber+1), "counter", counter, "scope", m.scope, "current", m.CurrentNumber(), "prog", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(counter)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)

log.Debug("Chain segment frozen",
"from", minNumber,
"to", maxNumber,
"totalBlocks", uint64(maxNumber-minNumber+1),
"processed", processedCount,
"scope", m.scope,
"current", m.CurrentNumber(),
"last", m.lastNumber.Load(),
"elapsed", common.PrettyDuration(elapsed),
"bps", float64(processedCount)/elapsed.Seconds(),
)

return uint64(maxNumber - minNumber)
}

// handleSyncError is a helper function to log an error, update the last processed block number, and return 0.
func (m *Monitor) handleSyncError(msg string, err error, lastBlock uint64) uint64 {
log.Error(msg, "error", err, "lastProcessed", lastBlock)
m.lastNumber.Store(lastBlock)
return 0
}