diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go index 0ad38782..444b9f4a 100644 --- a/cmd/mithril/node/node.go +++ b/cmd/mithril/node/node.go @@ -89,18 +89,18 @@ var ( accountsPath string scratchDirectory string rpcEndpoints []string - cluster string // "mainnet-beta", "testnet", "devnet" - blockSource string // "rpc" or "lightbringer" + cluster string // "mainnet-beta", "testnet", "devnet" + blockSource string // "rpc" or "lightbringer" lightbringerEndpoint string - blockMaxRPS int // Rate limit for block fetching - blockMaxInflight int // Max concurrent block fetch workers - blockTipPollIntervalMs int // Tip poll interval in milliseconds - blockTipSafetyMargin int // Don't fetch within N slots of tip + blockMaxRPS int // Rate limit for block fetching + blockMaxInflight int // Max concurrent block fetch workers + blockTipPollIntervalMs int // Tip poll interval in milliseconds + blockTipSafetyMargin int // Don't fetch within N slots of tip // Mode thresholds - blockNearTipThreshold int // Enter near-tip when gap <= this - blockCatchupThreshold int // Exit near-tip when gap >= this - blockCatchupTipGateThreshold int // Only apply safety margin when gap > this + blockNearTipThreshold int // Enter near-tip when gap <= this + blockCatchupThreshold int // Exit near-tip when gap >= this + blockCatchupTipGateThreshold int // Only apply safety margin when gap > this // Near-tip tuning blockNearTipPollMs int // Faster poll in near-tip mode @@ -108,11 +108,11 @@ var ( snapshotDlPath string logDir string - numReplaySlots int64 - endSlot int64 - pprofPort int64 - blockstorePath string - txParallelism int64 + numReplaySlots int64 + endSlot int64 + pprofPort int64 + blockstorePath string + txParallelism int64 debugTxs []string debugAcctWrites []string @@ -856,129 +856,22 @@ func runVerifyRange(c *cobra.Command, args []string) { } defer metricsWriterCleanup() - if paramArenaSizeMB > 0 { - replay.SerializedParameterArena = arena.New[byte](paramArenaSizeMB << 20) - } - if borrowedAccountArenaSize > 0 { - sealevel.BorrowedAccountArenas = make([]*arena.Arena[sealevel.BorrowedAccount], txParallelism) - for i := range txParallelism { - sealevel.BorrowedAccountArenas[i] = arena.New[sealevel.BorrowedAccount](borrowedAccountArenaSize) - } - } + initArenas(paramArenaSizeMB, borrowedAccountArenaSize, txParallelism) - var rpcServer *rpcserver.RpcServer - if rpcPort < 0 || rpcPort > 65535 { - klog.Fatalf("invalid port: %d", rpcPort) - } else if rpcPort != 0 { - rpcServer = rpcserver.NewRpcServer(accountsDb, uint16(rpcPort)) - rpcServer.Start() - mlog.Log.Infof("Started RPC server on port %d", rpcPort) + rpcServer, err := startRPCServer(accountsDb, rpcPort) + if err != nil { + klog.Fatalf("%v", err) } replayStartTime := time.Now() - blockFetchOpts := &replay.BlockFetchOpts{ - MaxRPS: blockMaxRPS, - MaxInflight: blockMaxInflight, - TipPollMs: blockTipPollIntervalMs, - TipSafetyMargin: uint64(blockTipSafetyMargin), - - // Mode thresholds - NearTipThreshold: blockNearTipThreshold, - CatchupThreshold: blockCatchupThreshold, - CatchupTipGateThreshold: blockCatchupTipGateThreshold, - - // Near-tip tuning - NearTipPollMs: blockNearTipPollMs, - NearTipLookahead: blockNearTipLookahead, - } + blockFetchOpts := newBlockFetchOpts() result := runReplayWithRecovery(ctx, accountsDb, accountsDbDir, manifest, resumeState, uint64(startSlot), uint64(endSlot), rpcEndpoints, blockstorePath, int(txParallelism), false, false, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, replayStartTime) // Update state file with last persisted slot and shutdown context - // Skip if already written during cancellation (eliminates timing window) - if result.LastPersistedSlot > 0 && mithrilState != nil && !result.StateWrittenOnCancel { - // Build shutdown context for graceful shutdown - var shutdownCtx *state.ShutdownContext - if result.LastAcctsLtHash != nil { - // Calculate epoch for the last persisted slot - var lastEpoch uint64 - if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { - lastEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) - } - // Determine shutdown reason - shutdownReason := state.ShutdownReasonCompleted - if result.WasCancelled { - shutdownReason = state.ShutdownReasonNormal - } else if result.Error != nil { - if strings.Contains(result.Error.Error(), "stall") { - shutdownReason = state.ShutdownReasonStall - } else if strings.Contains(result.Error.Error(), "leader schedule") { - shutdownReason = state.ShutdownReasonLeaderSchedule - } else { - // Include the actual error for easier debugging - shutdownReason = fmt.Sprintf("%s: %v", state.ShutdownReasonError, result.Error) - } - } - - shutdownCtx = &state.ShutdownContext{ - RunID: replay.CurrentRunID, - WriterVersion: getVersion(), - WriterCommit: getCommit(), - WriterBranch: getBranch(), - ShutdownReason: shutdownReason, - Epoch: lastEpoch, - - // LtHash and fee state - AcctsLtHash: base64.StdEncoding.EncodeToString(result.LastAcctsLtHash.Hash()), - LamportsPerSignature: result.LastLamportsPerSignature, - PrevLamportsPerSig: result.LastPrevLamportsPerSig, - NumSignatures: result.LastNumSignatures, - - // Blockhash context - RecentBlockhashes: encodeRecentBlockhashes(result.LastRecentBlockhashes), - EvictedBlockhash: base58.Encode(result.LastEvictedBlockhash[:]), - LastBlockhash: base58.Encode(result.LastBlockhash[:]), - - // SlotHashes context - SlotHashes: encodeSlotHashes(result.LastSlotHashes), - - // ReplayCtx fields - Capitalization: result.LastCapitalization, - SlotsPerYear: result.LastSlotsPerYear, - InflationInitial: result.LastInflation.Initial, - InflationTerminal: result.LastInflation.Terminal, - InflationTaper: result.LastInflation.Taper, - InflationFoundation: result.LastInflation.FoundationVal, - InflationFoundationTerm: result.LastInflation.FoundationTerm, - - // EpochStakes - required for correct leader schedule on resume - ComputedEpochStakes: result.ComputedEpochStakes, - } - } - if err := mithrilState.UpdateOnShutdown(accountsDbDir, result.LastPersistedSlot, result.LastPersistedBankhash, shutdownCtx); err != nil { - mlog.Log.Errorf("failed to update state file: %v", err) - } - } + updateStateOnShutdown(result, mithrilState, accountsDbDir) // Print shutdown summary if cancelled or error - if (result.WasCancelled || result.Error != nil) && result.LastPersistedSlot > 0 { - // Calculate epoch from slot using epoch schedule - var epoch, snapshotEpoch uint64 - if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { - epoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) - snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(snapshotBaseSlot) - } - progress.PrintShutdownSummary(progress.ShutdownInfo{ - LastSlot: result.LastPersistedSlot, - LastBankhash: result.LastPersistedBankhash, - SnapshotBaseSlot: snapshotBaseSlot, - AccountsDBPath: accountsDbDir, - ReplayDuration: time.Since(replayStartTime), - WasCancelled: result.WasCancelled, - RunID: replay.CurrentRunID, - Epoch: epoch, - SnapshotEpoch: snapshotEpoch, - }) - } + printShutdownSummaryIfNeeded(result, snapshotBaseSlot, accountsDbDir, replayStartTime) mlog.Log.Infof("Done replaying, closing DB") accountsDb.CloseDb() @@ -1619,135 +1512,22 @@ postBootstrap: } defer metricsWriterCleanup() - if paramArenaSizeMB > 0 { - replay.SerializedParameterArena = arena.New[byte](paramArenaSizeMB << 20) - } - if borrowedAccountArenaSize > 0 { - sealevel.BorrowedAccountArenas = make([]*arena.Arena[sealevel.BorrowedAccount], txParallelism) - for i := range txParallelism { - sealevel.BorrowedAccountArenas[i] = arena.New[sealevel.BorrowedAccount](borrowedAccountArenaSize) - } - } + initArenas(paramArenaSizeMB, borrowedAccountArenaSize, txParallelism) - var rpcServer *rpcserver.RpcServer - if rpcPort < 0 || rpcPort > 65535 { - klog.Fatalf("invalid port: %d", rpcPort) - } else if rpcPort != 0 { - rpcServer = rpcserver.NewRpcServer(accountsDb, uint16(rpcPort)) - rpcServer.Start() - mlog.Log.Infof("Started RPC server on port %d", rpcPort) + rpcServer, err := startRPCServer(accountsDb, rpcPort) + if err != nil { + klog.Fatalf("%v", err) } replayStartTime := time.Now() - blockFetchOpts := &replay.BlockFetchOpts{ - MaxRPS: blockMaxRPS, - MaxInflight: blockMaxInflight, - TipPollMs: blockTipPollIntervalMs, - TipSafetyMargin: uint64(blockTipSafetyMargin), - - // Mode thresholds - NearTipThreshold: blockNearTipThreshold, - CatchupThreshold: blockCatchupThreshold, - CatchupTipGateThreshold: blockCatchupTipGateThreshold, - - // Near-tip tuning - NearTipPollMs: blockNearTipPollMs, - NearTipLookahead: blockNearTipLookahead, - } + blockFetchOpts := newBlockFetchOpts() result := runReplayWithRecovery(ctx, accountsDb, accountsPath, manifest, resumeState, uint64(startSlot), liveEndSlot, rpcEndpoints, blockstorePath, int(txParallelism), true, useLightbringer, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, replayStartTime) // Update state file with last persisted slot and shutdown context - // Skip if already written during cancellation (eliminates timing window) - if result.LastPersistedSlot > 0 && mithrilState != nil && !result.StateWrittenOnCancel { - var shutdownCtx *state.ShutdownContext - if result.LastAcctsLtHash != nil { - // Calculate epoch for the last persisted slot - var lastEpoch uint64 - if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { - lastEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) - } - // Determine shutdown reason - shutdownReason := state.ShutdownReasonCompleted - if result.WasCancelled { - shutdownReason = state.ShutdownReasonNormal - } else if result.Error != nil { - if strings.Contains(result.Error.Error(), "stall") { - shutdownReason = state.ShutdownReasonStall - } else if strings.Contains(result.Error.Error(), "leader schedule") { - shutdownReason = state.ShutdownReasonLeaderSchedule - } else { - // Include the actual error for easier debugging - shutdownReason = fmt.Sprintf("%s: %v", state.ShutdownReasonError, result.Error) - } - } - - shutdownCtx = &state.ShutdownContext{ - RunID: replay.CurrentRunID, - WriterVersion: getVersion(), - WriterCommit: getCommit(), - WriterBranch: getBranch(), - ShutdownReason: shutdownReason, - Epoch: lastEpoch, - - // LtHash and fee state - AcctsLtHash: base64.StdEncoding.EncodeToString(result.LastAcctsLtHash.Hash()), - LamportsPerSignature: result.LastLamportsPerSignature, - PrevLamportsPerSig: result.LastPrevLamportsPerSig, - NumSignatures: result.LastNumSignatures, - - // Blockhash context - RecentBlockhashes: encodeRecentBlockhashes(result.LastRecentBlockhashes), - EvictedBlockhash: base58.Encode(result.LastEvictedBlockhash[:]), - LastBlockhash: base58.Encode(result.LastBlockhash[:]), - - // SlotHashes context - SlotHashes: encodeSlotHashes(result.LastSlotHashes), - - // ReplayCtx fields - Capitalization: result.LastCapitalization, - SlotsPerYear: result.LastSlotsPerYear, - InflationInitial: result.LastInflation.Initial, - InflationTerminal: result.LastInflation.Terminal, - InflationTaper: result.LastInflation.Taper, - InflationFoundation: result.LastInflation.FoundationVal, - InflationFoundationTerm: result.LastInflation.FoundationTerm, - - // EpochStakes - required for correct leader schedule on resume - ComputedEpochStakes: result.ComputedEpochStakes, - } - // Record shutdown in history (must be inside this block where shutdownReason is defined) - if err := mithrilState.UpdateOnShutdown(accountsPath, result.LastPersistedSlot, result.LastPersistedBankhash, shutdownCtx); err != nil { - mlog.Log.Errorf("failed to update state file: %v", err) - } - state.RecordShutdown(accountsPath, result.LastPersistedSlot, base58.Encode(result.LastPersistedBankhash), replay.CurrentRunID, getVersion(), getCommit(), getBranch(), shutdownReason) - } else { - // No shutdown context - just update slot - if err := mithrilState.UpdateOnShutdown(accountsPath, result.LastPersistedSlot, result.LastPersistedBankhash, shutdownCtx); err != nil { - mlog.Log.Errorf("failed to update state file: %v", err) - } - } - } + updateStateOnShutdown(result, mithrilState, accountsPath) // Print shutdown summary if cancelled or error - if (result.WasCancelled || result.Error != nil) && result.LastPersistedSlot > 0 { - // Calculate epoch from slot using epoch schedule - var epoch, snapshotEpoch uint64 - if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { - epoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) - snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(snapshotBaseSlot) - } - progress.PrintShutdownSummary(progress.ShutdownInfo{ - LastSlot: result.LastPersistedSlot, - LastBankhash: result.LastPersistedBankhash, - SnapshotBaseSlot: snapshotBaseSlot, - AccountsDBPath: accountsPath, - ReplayDuration: time.Since(replayStartTime), - WasCancelled: result.WasCancelled, - RunID: replay.CurrentRunID, - Epoch: epoch, - SnapshotEpoch: snapshotEpoch, - }) - } + printShutdownSummaryIfNeeded(result, snapshotBaseSlot, accountsPath, replayStartTime) mlog.Log.Infof("Done replaying, closing DB") accountsDb.CloseDb() @@ -2470,6 +2250,161 @@ func encodeSlotHashes(sysvar *sealevel.SysvarSlotHashes) []state.SlotHashEntry { return result } +// initArenas initializes the serialized parameter arena and borrowed account arenas. +// txPar is the transaction parallelism level (0 for sequential execution). +func initArenas(paramArenaSizeMB, borrowedAccountArenaSize uint64, txPar int64) { + if paramArenaSizeMB > 0 { + replay.SerializedParameterArena = arena.New[byte](paramArenaSizeMB << 20) + } + if borrowedAccountArenaSize > 0 { + sealevel.BorrowedAccountArenas = make([]*arena.Arena[sealevel.BorrowedAccount], txPar) + for i := range txPar { + sealevel.BorrowedAccountArenas[i] = arena.New[sealevel.BorrowedAccount](borrowedAccountArenaSize) + } + } +} + +// startRPCServer validates the port and starts the RPC server if configured. +// Returns the server (or nil if port is 0) and any error. +func startRPCServer(accountsDb *accountsdb.AccountsDb, port int) (*rpcserver.RpcServer, error) { + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port: %d", port) + } + if port == 0 { + return nil, nil + } + server := rpcserver.NewRpcServer(accountsDb, uint16(port)) + server.Start() + mlog.Log.Infof("Started RPC server on port %d", port) + return server, nil +} + +// newBlockFetchOpts creates BlockFetchOpts from the global config variables. +func newBlockFetchOpts() *replay.BlockFetchOpts { + return &replay.BlockFetchOpts{ + MaxRPS: blockMaxRPS, + MaxInflight: blockMaxInflight, + TipPollMs: blockTipPollIntervalMs, + TipSafetyMargin: uint64(blockTipSafetyMargin), + + // Mode thresholds + NearTipThreshold: blockNearTipThreshold, + CatchupThreshold: blockCatchupThreshold, + CatchupTipGateThreshold: blockCatchupTipGateThreshold, + + // Near-tip tuning + NearTipPollMs: blockNearTipPollMs, + NearTipLookahead: blockNearTipLookahead, + } +} + +// buildShutdownContext creates a ShutdownContext from replay result for state persistence. +// Returns nil if result doesn't have the necessary data. +func buildShutdownContext(result *replay.ReplayResult, wasCancelled bool) *state.ShutdownContext { + if result.LastAcctsLtHash == nil { + return nil + } + + // Calculate epoch for the last persisted slot + var lastEpoch uint64 + if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { + lastEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) + } + + // Determine shutdown reason + shutdownReason := state.ShutdownReasonCompleted + if wasCancelled { + shutdownReason = state.ShutdownReasonNormal + } else if result.Error != nil { + errStr := result.Error.Error() + if strings.Contains(errStr, "stall") { + shutdownReason = state.ShutdownReasonStall + } else if strings.Contains(errStr, "leader schedule") { + shutdownReason = state.ShutdownReasonLeaderSchedule + } else { + shutdownReason = fmt.Sprintf("%s: %v", state.ShutdownReasonError, result.Error) + } + } + + return &state.ShutdownContext{ + RunID: replay.CurrentRunID, + WriterVersion: getVersion(), + WriterCommit: getCommit(), + WriterBranch: getBranch(), + ShutdownReason: shutdownReason, + Epoch: lastEpoch, + + // LtHash and fee state + AcctsLtHash: base64.StdEncoding.EncodeToString(result.LastAcctsLtHash.Hash()), + LamportsPerSignature: result.LastLamportsPerSignature, + PrevLamportsPerSig: result.LastPrevLamportsPerSig, + NumSignatures: result.LastNumSignatures, + + // Blockhash context + RecentBlockhashes: encodeRecentBlockhashes(result.LastRecentBlockhashes), + EvictedBlockhash: base58.Encode(result.LastEvictedBlockhash[:]), + LastBlockhash: base58.Encode(result.LastBlockhash[:]), + + // SlotHashes context + SlotHashes: encodeSlotHashes(result.LastSlotHashes), + + // ReplayCtx fields + Capitalization: result.LastCapitalization, + SlotsPerYear: result.LastSlotsPerYear, + InflationInitial: result.LastInflation.Initial, + InflationTerminal: result.LastInflation.Terminal, + InflationTaper: result.LastInflation.Taper, + InflationFoundation: result.LastInflation.FoundationVal, + InflationFoundationTerm: result.LastInflation.FoundationTerm, + + // EpochStakes - required for correct leader schedule on resume + ComputedEpochStakes: result.ComputedEpochStakes, + } +} + +// updateStateOnShutdown updates the state file with shutdown context and records to history. +// This is called after replay completes (either normally, cancelled, or error). +func updateStateOnShutdown(result *replay.ReplayResult, mithrilState *state.MithrilState, accountsDbPath string) { + if result.LastPersistedSlot == 0 || mithrilState == nil || result.StateWrittenOnCancel { + return + } + + shutdownCtx := buildShutdownContext(result, result.WasCancelled) + if err := mithrilState.UpdateOnShutdown(accountsDbPath, result.LastPersistedSlot, result.LastPersistedBankhash, shutdownCtx); err != nil { + mlog.Log.Errorf("failed to update state file: %v", err) + } + + // Record shutdown in history if we have shutdown context + if shutdownCtx != nil { + state.RecordShutdown(accountsDbPath, result.LastPersistedSlot, base58.Encode(result.LastPersistedBankhash), replay.CurrentRunID, getVersion(), getCommit(), getBranch(), shutdownCtx.ShutdownReason) + } +} + +// printShutdownSummaryIfNeeded prints the shutdown summary if replay was cancelled or errored. +func printShutdownSummaryIfNeeded(result *replay.ReplayResult, snapshotBaseSlot uint64, accountsDbPath string, replayStartTime time.Time) { + if (!result.WasCancelled && result.Error == nil) || result.LastPersistedSlot == 0 { + return + } + + // Calculate epoch from slot using epoch schedule + var epoch, snapshotEpoch uint64 + if sealevel.SysvarCache.EpochSchedule.Sysvar != nil { + epoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(result.LastPersistedSlot) + snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(snapshotBaseSlot) + } + progress.PrintShutdownSummary(progress.ShutdownInfo{ + LastSlot: result.LastPersistedSlot, + LastBankhash: result.LastPersistedBankhash, + SnapshotBaseSlot: snapshotBaseSlot, + AccountsDBPath: accountsDbPath, + ReplayDuration: time.Since(replayStartTime), + WasCancelled: result.WasCancelled, + RunID: replay.CurrentRunID, + Epoch: epoch, + SnapshotEpoch: snapshotEpoch, + }) +} + func createBufWriter(filename string) (io.Writer, func(), error) { if filename == "" { return nil, func() {}, nil