Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 72 additions & 35 deletions model_srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,130 +29,167 @@ import (
)

func (m *Monitor) parseFileMeta(tx *types.Transaction, meta *types.FileMeta, b *types.Block) error {
log.Debug("Monitor", "FileMeta", meta)
log.Debug("Parsing FileMeta", "infoHash", meta.InfoHash)

// Step 1: Get transaction receipt
receipt, err := m.getReceipt(tx.Hash.String())
if err != nil {
log.Error("Failed to get receipt", "txHash", tx.Hash.String(), "err", err)
return err
}

// Step 2: Validate receipt
if receipt.ContractAddr == nil {
log.Warn("contract address is nil, waiting for indexing", "tx.Hash.String()", tx.Hash.String())
return errors.New("contract address is nil")
// More descriptive error message is better
err = errors.New("contract address is nil")
log.Warn("Contract address is nil, unable to proceed", "txHash", tx.Hash.String())
return err
}

log.Debug("Transaction Receipt", "address", receipt.ContractAddr.String(), "gas", receipt.GasUsed, "status", receipt.Status) //, "tx", receipt.TxHash.String())

if receipt.Status != 1 {
log.Warn("receipt.Status is wrong", "receipt.Status", receipt.Status)
return nil
log.Warn("Transaction receipt status is not successful", "txHash", tx.Hash.String(), "status", receipt.Status)
return nil // Return nil for unsuccessful transactions as it's a valid state
}

log.Debug("Meta data", "meta", meta)
log.Debug("Transaction receipt OK", "address", receipt.ContractAddr.String(), "gas", receipt.GasUsed)

// Step 3: Create and add file information
info := m.fs.NewFileInfo(meta)

info.LeftSize = meta.RawSize
info.ContractAddr = receipt.ContractAddr
info.Relate = append(info.Relate, *info.ContractAddr)

op, update, err := m.fs.AddFile(info)
if err != nil {
log.Warn("Create file failed", "err", err)
log.Warn("Failed to add file to filesystem", "infoHash", meta.InfoHash, "err", err)
return err
}

// Step 4: Handle file download if it's a new file
if update && op == 1 {
log.Debug("Create new file", "ih", meta.InfoHash, "op", op)
log.Debug("New file created, initiating download", "infoHash", meta.InfoHash)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
sizeLimit := uint64(0)
if m.mode == params.FULL {
return m.download(ctx, meta.InfoHash, 512*1024)
} else {
return m.download(ctx, meta.InfoHash, 0)
sizeLimit = 512 * 1024 // Set a specific size limit for full mode
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return m.download(ctx, meta.InfoHash, sizeLimit)
}

return nil
}

func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) {
start := mclock.Now()
var (
record bool
start = mclock.Now()
final []types.Transaction
)

for _, tx := range b.Txs {
// Attempt to parse transaction for file metadata
if meta := tx.Parse(); meta != nil {
log.Debug("Data encounter", "ih", meta.InfoHash, "number", b.Number, "meta", meta)
log.Debug("Data encounter", "infoHash", meta.InfoHash, "blockNumber", b.Number)
if err := m.parseFileMeta(&tx, meta, b); err != nil {
log.Error("Parse file meta error", "err", err, "number", b.Number)
log.Error("Parse file meta failed", "error", err, "blockNumber", b.Number, "txHash", tx.Hash)
return false, err
}
record = true
final = append(final, tx)
} else if tx.IsFlowControl() {
continue
}

// Handle flow control transactions
if tx.IsFlowControl() {
// Use guard clauses to reduce nesting
if tx.Recipient == nil {
continue
}

file := m.fs.GetFileByAddr(*tx.Recipient)
if file == nil {
continue
}

receipt, err := m.getReceipt(tx.Hash.String())
if err != nil {
log.Error("Failed to get receipt for flow control tx", "error", err, "txHash", tx.Hash)
return false, err
}

if receipt.Status != 1 || receipt.GasUsed != params.UploadGas {
continue
}

// All checks passed, process the flow control transaction
remainingSize, err := m.getRemainingSize((*tx.Recipient).String())
if err != nil {
log.Error("Get remain failed", "err", err, "addr", (*tx.Recipient).String())
log.Error("Get remaining size failed", "error", err, "addr", (*tx.Recipient).String())
return false, err
}

if file.LeftSize > remainingSize {
file.LeftSize = remainingSize
if _, progress, err := m.fs.AddFile(file); err != nil {
_, progress, err := m.fs.AddFile(file)
if err != nil {
return false, err
} else if progress {
log.Debug("Update storage success", "ih", file.Meta.InfoHash, "left", file.LeftSize)
var bytesRequested uint64
}

if progress {
bytesRequested := uint64(0)
if file.Meta.RawSize > file.LeftSize {
bytesRequested = file.Meta.RawSize - file.LeftSize
}

logMsg := "Data processing..."
if file.LeftSize == 0 {
log.Debug("Data processing completed !!!", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number)
} else {
log.Debug("Data processing ...", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number)
logMsg = "Data processing completed!"
}

log.Debug(logMsg,
"infoHash", file.Meta.InfoHash,
"addr", (*tx.Recipient).String(),
"remain", common.StorageSize(remainingSize),
"request", common.StorageSize(bytesRequested),
"raw", common.StorageSize(file.Meta.RawSize),
"blockNumber", b.Number)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := m.download(ctx, file.Meta.InfoHash, bytesRequested); err != nil {
cancel() // Ensure cancel is called on error
return false, err
}
cancel() // Call cancel when download is successful
}
}
record = true
final = append(final, tx)
}
}

// Update block transactions if necessary
if len(final) > 0 && len(final) < len(b.Txs) {
log.Debug("Final txs layout", "total", len(b.Txs), "final", len(final), "num", b.Number, "txs", m.fs.Txs())
log.Debug("Txs filtered", "total", len(b.Txs), "final", len(final), "blockNumber", b.Number)
b.Txs = final
}

// Add block to filesystem if any relevant transactions were found
if record {
if err := m.fs.AddBlock(b); err == nil {
log.Debug("Root has been changed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root())
} else {
log.Warn("Block added failed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root(), "err", err)
if err := m.fs.AddBlock(b); err != nil {
log.Warn("Block added failed", "blockNumber", b.Number, "blockHash", b.Hash, "root", m.fs.Root(), "error", err)
return false, err // Return the error if adding the block fails
}
log.Debug("Block added to filesystem", "blockNumber", b.Number, "blockHash", b.Hash, "root", m.fs.Root())
}

// Log transaction scanning time
if len(b.Txs) > 0 {
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
log.Trace("Transactions scanning", "count", len(b.Txs), "number", b.Number, "elapsed", common.PrettyDuration(elapsed))
log.Trace("Transaction scanning complete", "count", len(b.Txs), "blockNumber", b.Number, "elapsed", common.PrettyDuration(elapsed))
}

return record, nil
Expand Down
103 changes: 69 additions & 34 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"math"
//"sort"
"path/filepath"
"runtime"
"sync"
Expand Down Expand Up @@ -444,52 +445,59 @@ func (m *Monitor) syncLatestBlock() {
defer m.wg.Done()
timer := time.NewTimer(time.Second * params.QueryTimeInterval)
defer timer.Stop()
progress, counter, end := uint64(0), 0, false

counter := 0

// Helper function to determine the next delay based on sync progress
getNextDelay := func(progress uint64) time.Duration {
if progress >= delay {
return 0 // Trigger immediately
}
if progress > 1 {
return time.Millisecond * 2000
}
if progress == 1 {
return time.Millisecond * 6750
}

// If progress is 0, check for listener and checkpoint status
if !m.listen && ((m.ckp != nil && m.currentNumber.Load() >= m.ckp.TfsCheckPoint) || (m.ckp == nil && m.currentNumber.Load() > 0)) {
// This part seems to have a specific termination or pause logic
// The original code has some commented-out `return`, so I'm assuming it's a "steady state" delay.
return time.Millisecond * 6750
}

return time.Millisecond * 6750 // Default case for other conditions
}

for {
select {
case sv := <-m.srvCh:
if err := m.doSwitch(sv); err != nil {
log.Error("Service switch failed", "srv", sv, "err", err)
}

case <-timer.C:
progress = m.syncLastBlock()
// Avoid sync in full mode, fresh interval may be less.
if progress >= delay {
end = false
timer.Reset(time.Millisecond * 0)
} else if progress > 1 {
end = false
timer.Reset(time.Millisecond * 2000)
} else if progress == 1 {
end = true
timer.Reset(time.Millisecond * 6750)
} else {
if !m.listen {
if (m.ckp != nil && m.currentNumber.Load() >= m.ckp.TfsCheckPoint) || (m.ckp == nil && m.currentNumber.Load() > 0) {
if !end {
end = true
timer.Reset(time.Millisecond * 6750)
continue
}
m.fs.Flush()
//elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
//log.Debug("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber.Load())
//return
timer.Reset(time.Millisecond * 6750)
end = false
continue
}
}
timer.Reset(time.Millisecond * 6750)
}
counter++
if counter%100 == 0 {
log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "end", end, "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber())
progress := m.syncLastBlock()

// Determine the next delay and reset the timer
nextDelay := getNextDelay(progress)
timer.Reset(nextDelay)

// Log status periodically
counter += int(progress)
if counter > 65536 {
log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber(), "progress", progress, "root", m.fs.Root())
counter = 0
}

// Always flush at the end of a timer cycle
m.fs.Flush()

case <-m.exitCh:
log.Info("Block syncer stopped")
// Flush one last time before returning
m.fs.Flush()
return
}
}
Expand All @@ -513,6 +521,33 @@ func (m *Monitor) skip(i uint64) bool {
return false
}

/*func (m *Monitor) skip(i uint64) bool {
if m.srv.Load() != SRV_MODEL {
return false
}

if len(m.ckp.Skips) == 0 || i > m.ckp.Skips[len(m.ckp.Skips)-1].To || i < m.ckp.Skips[0].From {
return false
}

// Use sort.Search to find the index of the first skip interval
// whose 'From' field is greater than or equal to i.
idx := sort.Search(len(m.ckp.Skips), func(j int) bool {
return m.ckp.Skips[j].From > i
})

// Adjust the index to check the interval that might contain i.
// If idx is 0, no interval starts at or before i.
if idx > 0 {
interval := m.ckp.Skips[idx-1]
if i >= interval.From && i < interval.To {
return true
}
}

return false
}*/

func (m *Monitor) syncLastBlock() uint64 {
/*currentNumber, err := m.currentBlock()
if err != nil {
Expand Down
Loading