From 8f61ebbc9fc7f092c495c4e8f4707c65fa5eeaab Mon Sep 17 00:00:00 2001 From: ucwong Date: Thu, 14 Aug 2025 02:32:14 +0800 Subject: [PATCH 1/3] sync latest block --- monitor.go | 99 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/monitor.go b/monitor.go index d08fad7..efd108f 100644 --- a/monitor.go +++ b/monitor.go @@ -20,6 +20,7 @@ import ( "context" "errors" "math" + //"sort" "path/filepath" "runtime" "sync" @@ -444,52 +445,59 @@ func (m *Monitor) syncLatestBlock() { defer m.wg.Done() timer := time.NewTimer(time.Second * params.QueryTimeInterval) defer timer.Stop() - progress, counter, end := uint64(0), 0, false + + counter := 0 + + // Helper function to determine the next delay based on sync progress + getNextDelay := func(progress uint64) time.Duration { + if progress >= delay { + return 0 // Trigger immediately + } + if progress > 1 { + return time.Millisecond * 2000 + } + if progress == 1 { + return time.Millisecond * 6750 + } + + // If progress is 0, check for listener and checkpoint status + if !m.listen && ((m.ckp != nil && m.currentNumber.Load() >= m.ckp.TfsCheckPoint) || (m.ckp == nil && m.currentNumber.Load() > 0)) { + // This part seems to have a specific termination or pause logic + // The original code has some commented-out `return`, so I'm assuming it's a "steady state" delay. + return time.Millisecond * 6750 + } + + return time.Millisecond * 6750 // Default case for other conditions + } + for { select { case sv := <-m.srvCh: if err := m.doSwitch(sv); err != nil { log.Error("Service switch failed", "srv", sv, "err", err) } + case <-timer.C: - progress = m.syncLastBlock() - // Avoid sync in full mode, fresh interval may be less. - if progress >= delay { - end = false - timer.Reset(time.Millisecond * 0) - } else if progress > 1 { - end = false - timer.Reset(time.Millisecond * 2000) - } else if progress == 1 { - end = true - timer.Reset(time.Millisecond * 6750) - } else { - if !m.listen { - if (m.ckp != nil && m.currentNumber.Load() >= m.ckp.TfsCheckPoint) || (m.ckp == nil && m.currentNumber.Load() > 0) { - if !end { - end = true - timer.Reset(time.Millisecond * 6750) - continue - } - m.fs.Flush() - //elapsed := time.Duration(mclock.Now()) - time.Duration(m.start) - //log.Debug("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber.Load()) - //return - timer.Reset(time.Millisecond * 6750) - end = false - continue - } - } - timer.Reset(time.Millisecond * 6750) - } + progress := m.syncLastBlock() + + // Determine the next delay and reset the timer + nextDelay := getNextDelay(progress) + timer.Reset(nextDelay) + + // Log status periodically counter++ if counter%100 == 0 { - log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "end", end, "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber()) + log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber()) counter = 0 } + + // Always flush at the end of a timer cycle m.fs.Flush() + case <-m.exitCh: log.Info("Block syncer stopped") + // Flush one last time before returning + m.fs.Flush() return } } @@ -513,6 +521,33 @@ func (m *Monitor) skip(i uint64) bool { return false } +/*func (m *Monitor) skip(i uint64) bool { + if m.srv.Load() != SRV_MODEL { + return false + } + + if len(m.ckp.Skips) == 0 || i > m.ckp.Skips[len(m.ckp.Skips)-1].To || i < m.ckp.Skips[0].From { + return false + } + + // Use sort.Search to find the index of the first skip interval + // whose 'From' field is greater than or equal to i. + idx := sort.Search(len(m.ckp.Skips), func(j int) bool { + return m.ckp.Skips[j].From > i + }) + + // Adjust the index to check the interval that might contain i. + // If idx is 0, no interval starts at or before i. + if idx > 0 { + interval := m.ckp.Skips[idx-1] + if i >= interval.From && i < interval.To { + return true + } + } + + return false +}*/ + func (m *Monitor) syncLastBlock() uint64 { /*currentNumber, err := m.currentBlock() if err != nil { From 8bf5391ff67f05019d021b4ba89f350cc28a0d91 Mon Sep 17 00:00:00 2001 From: ucwong Date: Thu, 14 Aug 2025 04:09:06 +0800 Subject: [PATCH 2/3] model & service --- model_srv.go | 107 ++++++++++++++++++++++++++++++++++----------------- monitor.go | 6 +-- service.go | 73 +++++++++++++++++++++-------------- 3 files changed, 120 insertions(+), 66 deletions(-) diff --git a/model_srv.go b/model_srv.go index cfa4f35..355edab 100644 --- a/model_srv.go +++ b/model_srv.go @@ -29,107 +29,141 @@ import ( ) func (m *Monitor) parseFileMeta(tx *types.Transaction, meta *types.FileMeta, b *types.Block) error { - log.Debug("Monitor", "FileMeta", meta) + log.Debug("Parsing FileMeta", "infoHash", meta.InfoHash) + // Step 1: Get transaction receipt receipt, err := m.getReceipt(tx.Hash.String()) if err != nil { + log.Error("Failed to get receipt", "txHash", tx.Hash.String(), "err", err) return err } + // Step 2: Validate receipt if receipt.ContractAddr == nil { - log.Warn("contract address is nil, waiting for indexing", "tx.Hash.String()", tx.Hash.String()) - return errors.New("contract address is nil") + // More descriptive error message is better + err = errors.New("contract address is nil") + log.Warn("Contract address is nil, unable to proceed", "txHash", tx.Hash.String()) + return err } - log.Debug("Transaction Receipt", "address", receipt.ContractAddr.String(), "gas", receipt.GasUsed, "status", receipt.Status) //, "tx", receipt.TxHash.String()) - if receipt.Status != 1 { - log.Warn("receipt.Status is wrong", "receipt.Status", receipt.Status) - return nil + log.Warn("Transaction receipt status is not successful", "txHash", tx.Hash.String(), "status", receipt.Status) + return nil // Return nil for unsuccessful transactions as it's a valid state } - log.Debug("Meta data", "meta", meta) + log.Debug("Transaction receipt OK", "address", receipt.ContractAddr.String(), "gas", receipt.GasUsed) + // Step 3: Create and add file information info := m.fs.NewFileInfo(meta) - info.LeftSize = meta.RawSize info.ContractAddr = receipt.ContractAddr info.Relate = append(info.Relate, *info.ContractAddr) + op, update, err := m.fs.AddFile(info) if err != nil { - log.Warn("Create file failed", "err", err) + log.Warn("Failed to add file to filesystem", "infoHash", meta.InfoHash, "err", err) return err } + + // Step 4: Handle file download if it's a new file if update && op == 1 { - log.Debug("Create new file", "ih", meta.InfoHash, "op", op) + log.Debug("New file created, initiating download", "infoHash", meta.InfoHash) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() + sizeLimit := uint64(0) if m.mode == params.FULL { - return m.download(ctx, meta.InfoHash, 512*1024) - } else { - return m.download(ctx, meta.InfoHash, 0) + sizeLimit = 512 * 1024 // Set a specific size limit for full mode } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + return m.download(ctx, meta.InfoHash, sizeLimit) } + return nil } func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { + start := mclock.Now() var ( record bool - start = mclock.Now() final []types.Transaction ) for _, tx := range b.Txs { + // Attempt to parse transaction for file metadata if meta := tx.Parse(); meta != nil { - log.Debug("Data encounter", "ih", meta.InfoHash, "number", b.Number, "meta", meta) + log.Debug("Data encounter", "infoHash", meta.InfoHash, "blockNumber", b.Number) if err := m.parseFileMeta(&tx, meta, b); err != nil { - log.Error("Parse file meta error", "err", err, "number", b.Number) + log.Error("Parse file meta failed", "error", err, "blockNumber", b.Number, "txHash", tx.Hash) return false, err } record = true final = append(final, tx) - } else if tx.IsFlowControl() { + continue + } + + // Handle flow control transactions + if tx.IsFlowControl() { + // Use guard clauses to reduce nesting if tx.Recipient == nil { continue } + file := m.fs.GetFileByAddr(*tx.Recipient) if file == nil { continue } + receipt, err := m.getReceipt(tx.Hash.String()) if err != nil { + log.Error("Failed to get receipt for flow control tx", "error", err, "txHash", tx.Hash) return false, err } + if receipt.Status != 1 || receipt.GasUsed != params.UploadGas { continue } + + // All checks passed, process the flow control transaction remainingSize, err := m.getRemainingSize((*tx.Recipient).String()) if err != nil { - log.Error("Get remain failed", "err", err, "addr", (*tx.Recipient).String()) + log.Error("Get remaining size failed", "error", err, "addr", (*tx.Recipient).String()) return false, err } + if file.LeftSize > remainingSize { file.LeftSize = remainingSize - if _, progress, err := m.fs.AddFile(file); err != nil { + _, progress, err := m.fs.AddFile(file) + if err != nil { return false, err - } else if progress { - log.Debug("Update storage success", "ih", file.Meta.InfoHash, "left", file.LeftSize) - var bytesRequested uint64 + } + + if progress { + bytesRequested := uint64(0) if file.Meta.RawSize > file.LeftSize { bytesRequested = file.Meta.RawSize - file.LeftSize } + + logMsg := "Data processing..." if file.LeftSize == 0 { - log.Debug("Data processing completed !!!", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number) - } else { - log.Debug("Data processing ...", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number) + logMsg = "Data processing completed!" } + + log.Debug(logMsg, + "infoHash", file.Meta.InfoHash, + "addr", (*tx.Recipient).String(), + "remain", common.StorageSize(remainingSize), + "request", common.StorageSize(bytesRequested), + "raw", common.StorageSize(file.Meta.RawSize), + "blockNumber", b.Number) + ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() if err := m.download(ctx, file.Meta.InfoHash, bytesRequested); err != nil { + cancel() // Ensure cancel is called on error return false, err } + cancel() // Call cancel when download is successful } } record = true @@ -137,22 +171,25 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { } } + // Update block transactions if necessary if len(final) > 0 && len(final) < len(b.Txs) { - log.Debug("Final txs layout", "total", len(b.Txs), "final", len(final), "num", b.Number, "txs", m.fs.Txs()) + log.Debug("Txs filtered", "total", len(b.Txs), "final", len(final), "blockNumber", b.Number) b.Txs = final } + // Add block to filesystem if any relevant transactions were found if record { - if err := m.fs.AddBlock(b); err == nil { - log.Debug("Root has been changed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root()) - } else { - log.Warn("Block added failed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root(), "err", err) + if err := m.fs.AddBlock(b); err != nil { + log.Warn("Block added failed", "blockNumber", b.Number, "blockHash", b.Hash, "root", m.fs.Root(), "error", err) + return false, err // Return the error if adding the block fails } + log.Debug("Block added to filesystem", "blockNumber", b.Number, "blockHash", b.Hash, "root", m.fs.Root()) } + // Log transaction scanning time if len(b.Txs) > 0 { elapsed := time.Duration(mclock.Now()) - time.Duration(start) - log.Trace("Transactions scanning", "count", len(b.Txs), "number", b.Number, "elapsed", common.PrettyDuration(elapsed)) + log.Trace("Transaction scanning complete", "count", len(b.Txs), "blockNumber", b.Number, "elapsed", common.PrettyDuration(elapsed)) } return record, nil diff --git a/monitor.go b/monitor.go index efd108f..a81360e 100644 --- a/monitor.go +++ b/monitor.go @@ -485,9 +485,9 @@ func (m *Monitor) syncLatestBlock() { timer.Reset(nextDelay) // Log status periodically - counter++ - if counter%100 == 0 { - log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber()) + counter += int(progress) + if counter > 65536 { + log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber(), "progress", progress) counter = 0 } diff --git a/service.go b/service.go index 1b86e3b..5938172 100644 --- a/service.go +++ b/service.go @@ -138,41 +138,58 @@ func (m *Monitor) forRecordService(block *types.Block) error { } func (m *Monitor) forModelService(block *types.Block) error { - i := block.Number - if i%65536 == 0 { - defer func() { - //elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start) - //log.Info("Nas monitor", "start", m.startNumber.Load(), "max", uint64(m.CurrentNumber()), "last", m.lastNumber.Load(), "cur", i, "bps", math.Abs(float64(i)-float64(m.startNumber.Load()))*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA), "scope", m.scope, "db", common.PrettyDuration(m.fs.Metrics()), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "root", m.fs.Root()) - m.fs.SkipPrint() - }() + blockNumber := block.Number + + // Step 1: Handle periodic operations (e.g., every 65536 blocks) + if blockNumber%65536 == 0 { + defer m.fs.SkipPrint() } - if hash, suc := m.blockCache.Get(i); !suc || hash != block.Hash.Hex() { - if record, parseErr := m.parseBlockTorrentInfo(block); parseErr != nil { - log.Error("Parse new block", "number", block.Number, "block", block, "error", parseErr) - return parseErr - } else if record { - elapsed := time.Duration(mclock.Now()) - time.Duration(m.start) + // Step 2: Check if block is already processed in cache + hashInCache, found := m.blockCache.Get(blockNumber) + if found && hashInCache == block.Hash.Hex() { + return nil // Block already processed, do nothing + } - if m.ckp != nil && m.ckp.TfsCheckPoint > 0 && i == m.ckp.TfsCheckPoint { - if common.BytesToHash(m.fs.GetRoot(i)) == m.ckp.TfsRoot { - log.Warn("FIRST MILESTONE PASS", "number", i, "root", m.fs.Root(), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "elapsed", common.PrettyDuration(elapsed)) - } else { - log.Error("Fs checkpoint failed", "number", i, "root", m.fs.Root(), "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "elapsed", common.PrettyDuration(elapsed), "exp", m.ckp.TfsRoot, "leaves", len(m.fs.Leaves())) - panic("FIRST MILESTONE ERROR, run './cortex removedb' command to solve this problem") - } - } + // Step 3: Parse transactions in the block + record, parseErr := m.parseBlockTorrentInfo(block) + if parseErr != nil { + log.Error("Failed to parse block transactions", "number", blockNumber, "error", parseErr) + return parseErr + } - log.Debug("Seal fs record", "number", i, "record", record, "root", m.fs.Root().Hex(), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "ckp", m.fs.CheckPoint()) - } else { - if m.fs.LastListenBlockNumber() < i { - m.fs.Anchor(i) - } + // Step 4: Handle checkpoint logic if this is a record-carrying block + if record { + m.handleMilestoneCheckpoint(block) + } - log.Trace("Confirm to seal the fs record", "number", i) + // Step 5: Seal the record or anchor the filesystem + if record { + log.Debug("Sealing fs record", "number", blockNumber, "root", m.fs.Root().Hex(), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "ckp", m.fs.CheckPoint()) + } else { + if m.fs.LastListenBlockNumber() < blockNumber { + m.fs.Anchor(blockNumber) } - m.blockCache.Add(i, block.Hash.Hex()) + log.Trace("Confirmed to seal fs record", "number", blockNumber) } + // Step 6: Add the new block to the cache + m.blockCache.Add(blockNumber, block.Hash.Hex()) return nil } + +// handleMilestoneCheckpoint encapsulates the logic for the TFS checkpoint. +func (m *Monitor) handleMilestoneCheckpoint(block *types.Block) { + if m.ckp == nil || m.ckp.TfsCheckPoint == 0 || block.Number != m.ckp.TfsCheckPoint { + return // Not at a checkpoint + } + + elapsed := time.Duration(mclock.Now()) - time.Duration(m.start) + + if common.BytesToHash(m.fs.GetRoot(block.Number)) == m.ckp.TfsRoot { + log.Warn("FIRST MILESTONE PASSED successfully", "number", block.Number, "root", m.fs.Root(), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "elapsed", common.PrettyDuration(elapsed)) + } else { + log.Error("Filesystem checkpoint failed", "number", block.Number, "root", m.fs.Root(), "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "elapsed", common.PrettyDuration(elapsed), "expected", m.ckp.TfsRoot, "leaves", len(m.fs.Leaves())) + panic("FIRST MILESTONE ERROR, run './cortex removedb' command to solve this problem") + } +} From b6cbb9ff7cccb67d805b6ab5f4e1c44419dbb8d1 Mon Sep 17 00:00:00 2001 From: ucwong Date: Thu, 14 Aug 2025 04:29:26 +0800 Subject: [PATCH 3/3] tiny --- monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor.go b/monitor.go index a81360e..198194f 100644 --- a/monitor.go +++ b/monitor.go @@ -487,7 +487,7 @@ func (m *Monitor) syncLatestBlock() { // Log status periodically counter += int(progress) if counter > 65536 { - log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber(), "progress", progress) + log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber(), "progress", progress, "root", m.fs.Root()) counter = 0 }