diff --git a/pkg/global/global_ctx.go b/pkg/global/global_ctx.go index 0c3f347d..a2fcc376 100644 --- a/pkg/global/global_ctx.go +++ b/pkg/global/global_ctx.go @@ -35,7 +35,6 @@ type GlobalCtx struct { cachedStakeEntries []accountsdb.StakeIndexEntry // Parsed+sorted index, populated on first load entriesFlushedSinceCompact int // Appended entries since last compaction voteCache map[solana.PublicKey]*sealevel.VoteStateVersions - voteStakeTotals map[solana.PublicKey]uint64 // Aggregated stake totals per vote account (replaces full stake cache at startup) epochStakes *epochstakes.EpochStakesCache epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache forkChoice *forkchoice.ForkChoiceService @@ -46,7 +45,6 @@ type GlobalCtx struct { manageBlockHeight bool pendingStakeMutex sync.Mutex // Protects pendingNewStakePubkeys voteCacheMutex sync.RWMutex - voteStakeTotalsMu sync.RWMutex slotsConfirmedMutex sync.Mutex mu sync.Mutex } @@ -161,14 +159,6 @@ func VoteCacheSnapshot() map[solana.PublicKey]*sealevel.VoteStateVersions { return snapshot } -// SetVoteStakeTotals sets the aggregated stake totals per vote account. -// Called at startup after scanning all stake accounts. -func SetVoteStakeTotals(m map[solana.PublicKey]uint64) { - instance.voteStakeTotalsMu.Lock() - instance.voteStakeTotals = m - instance.voteStakeTotalsMu.Unlock() -} - func PutEpochStakesEntry(epoch uint64, pubkey solana.PublicKey, stake uint64, voteAcct *epochstakes.VoteAccount) { if instance.epochStakes == nil { instance.epochStakes = epochstakes.NewEpochStakesCache() diff --git a/pkg/replay/block.go b/pkg/replay/block.go index 00a278b8..cc84af16 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "maps" "math" "os" "path/filepath" @@ -786,18 +787,37 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b wg.Wait() stakeAcctWorkerPool.Release() - // Store aggregated totals in global for later use - global.SetVoteStakeTotals(voteAcctStakes) - // Load vote accounts from AccountsDB into vote cache if err := RebuildVoteCacheFromAccountsDB(acctsDb, block.Slot, voteAcctStakes, 0); err != nil { mlog.Log.Warnf("vote cache rebuild had errors: %v", err) } - // Derive EpochStakesPerVoteAcct and TotalEpochStake from aggregated totals - for pk, stake := range voteAcctStakes { - block.EpochStakesPerVoteAcct[pk] = stake - block.TotalEpochStake += stake + // Seed EpochStakesPerVoteAcct and TotalEpochStake from the epoch stakes cache, + // loaded by buildInitialEpochStakesCache() from the manifest. These are + // epoch-effective stakes (warmup/cooldown applied), matching Agave's + // get_epoch_stake syscall behavior. The raw AccountsDB scan above uses + // delegation.StakeLamports which can differ from effective stake. + epochStakes := global.EpochStakes(block.Epoch) + if len(epochStakes) == 0 { + mlog.Log.Errorf("FATAL: no epoch stakes in cache for epoch %d - "+ + "buildInitialEpochStakesCache should have loaded these from manifest", block.Epoch) + mlog.Log.Errorf("Available cached epochs: %v", global.GetAllCachedEpochs()) + os.Exit(1) + } + maps.Copy(block.EpochStakesPerVoteAcct, epochStakes) + block.TotalEpochStake = global.EpochTotalStake(block.Epoch) + + // Diagnostic: one-time startup comparison of raw scan vs epoch-effective totals + var rawScanTotal uint64 + for _, stake := range voteAcctStakes { + rawScanTotal += stake + } + if rawScanTotal != block.TotalEpochStake { + mlog.Log.Infof("startup stake check: rawScanTotal=%d epochEffectiveTotal=%d delta=%d "+ + "(expected difference from warmup/cooldown)", + rawScanTotal, block.TotalEpochStake, int64(rawScanTotal)-int64(block.TotalEpochStake)) + } else { + mlog.Log.Infof("startup stake check: rawScanTotal=%d matches epochEffectiveTotal (all stakes fully warmed up)", rawScanTotal) } // Derive VoteTimestamps from ALL vote accounts in cache (including zero-stake) diff --git a/pkg/replay/rewards.go b/pkg/replay/rewards.go index 667d6d7a..31511a0f 100644 --- a/pkg/replay/rewards.go +++ b/pkg/replay/rewards.go @@ -9,6 +9,7 @@ import ( "github.com/Overclock-Validator/mithril/pkg/block" "github.com/Overclock-Validator/mithril/pkg/features" "github.com/Overclock-Validator/mithril/pkg/global" + "github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/Overclock-Validator/mithril/pkg/rewards" "github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/Overclock-Validator/wide" @@ -85,10 +86,12 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep partitionIdx := currentBlockHeight - epochRewards.DistributionStartingBlockHeight - distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsFromSpool(acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, partitionIdx, currentSlot) + distributedAccts, parentDistributedAccts, distributedLamports, burnedLamports := rewards.DistributeStakingRewardsFromSpool(acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, partitionIdx, currentSlot) parentDistributedAccts = append(parentDistributedAccts, epochRewardsAcct.Clone()) - epochRewards.Distribute(distributedLamports) + // EpochRewards sysvar advances by distributed + burned (matching Agave/FD), + // but capitalization only increases by distributed. + epochRewards.Distribute(distributedLamports + burnedLamports) partitionedEpochRewardsInfo.NumRewardPartitionsRemaining-- if partitionedEpochRewardsInfo.NumRewardPartitionsRemaining == 0 { @@ -111,5 +114,9 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep distributedAccts = append(distributedAccts, epochRewardsAcct.Clone()) epochCtx.Capitalization += distributedLamports + if burnedLamports > 0 { + mlog.Log.Warnf("partition %d: distributed=%d burned=%d", partitionIdx, distributedLamports, burnedLamports) + } + return distributedAccts, parentDistributedAccts } diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index d683e863..dd5588ec 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -1,6 +1,7 @@ package rewards import ( + "errors" "fmt" "io" "math" @@ -166,13 +167,13 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma return updatedAccts, parentUpdatedAccts, totalVotingRewards.Load() } -func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir string, spoolSlot uint64, partitionIdx uint64, currentSlot uint64) ([]*accounts.Account, []*accounts.Account, uint64) { +func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir string, spoolSlot uint64, partitionIdx uint64, currentSlot uint64) ([]*accounts.Account, []*accounts.Account, uint64, uint64) { reader, err := NewPartitionReader(spoolDir, spoolSlot, uint32(partitionIdx)) if err != nil { panic(fmt.Sprintf("unable to open partition %d spool for distribution: %s", partitionIdx, err)) } if reader == nil { - return nil, nil, 0 + return nil, nil, 0, 0 } defer reader.Close() @@ -189,12 +190,13 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir } if len(records) == 0 { - return nil, nil, 0 + return nil, nil, 0, 0 } accts := make([]*accounts.Account, len(records)) parentAccts := make([]*accounts.Account, len(records)) var distributedLamports atomic.Uint64 + var burnedLamports atomic.Uint64 var wg sync.WaitGroup workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { @@ -202,14 +204,36 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir idx := i.(int) rec := records[idx] + // Per-record failures burn rewards instead of panicking, matching Agave + // distribution.rs:282-294 and Firedancer fd_rewards.c:958-968. + // Burned rewards advance the EpochRewards sysvar but do NOT increase + // capitalization. + stakeAcct, err := acctsDb.GetAccount(currentSlot, rec.StakePubkey) if err != nil { - panic(fmt.Sprintf("unable to get acct %s from acctsdb for spool distribution in slot %d", rec.StakePubkey, currentSlot)) + if errors.Is(err, accountsdb.ErrNoAccount) { + // AccountNotFound — matches Agave DistributionError::AccountNotFound + mlog.Log.Warnf("spool distribution: account %s not found in slot %d, %d lamports burned", rec.StakePubkey, currentSlot, rec.RewardLamports) + burnedLamports.Add(rec.RewardLamports) + return + } + // Storage/IO error — hard fail, do not burn + panic(fmt.Sprintf("spool distribution: GetAccount failed for %s in slot %d: %v", rec.StakePubkey, currentSlot, err)) } parentAccts[idx] = stakeAcct.Clone() stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) if err != nil { + // Stake state decode failure — matches FD fd_stake_get_state != 0 + mlog.Log.Warnf("spool distribution: stake state decode failed for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err) + burnedLamports.Add(rec.RewardLamports) + return + } + + if stakeState.Status != sealevel.StakeStateV2StatusStake { + // Non-stake state — matches FD !fd_stake_state_v2_is_stake + mlog.Log.Warnf("spool distribution: account %s not in Stake state (status=%d), %d lamports burned", rec.StakePubkey, stakeState.Status, rec.RewardLamports) + burnedLamports.Add(rec.RewardLamports) return } @@ -218,12 +242,18 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) if err != nil { - panic(fmt.Sprintf("unable to serialize stake state in spool distribution: %s", err)) + // Re-encode failure — matches Agave DistributionError::UnableToSetState + mlog.Log.Warnf("spool distribution: stake state encode failed for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err) + burnedLamports.Add(rec.RewardLamports) + return } stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, rec.RewardLamports) if err != nil { - panic(fmt.Sprintf("overflow in spool distribution in slot %d to acct %s: %s", currentSlot, rec.StakePubkey, err)) + // Arithmetic overflow — matches Agave DistributionError::ArithmeticOverflow + mlog.Log.Warnf("spool distribution: lamports overflow for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err) + burnedLamports.Add(rec.RewardLamports) + return } accts[idx] = stakeAcct @@ -237,12 +267,22 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir wg.Wait() workerPool.Release() - err = acctsDb.StoreAccounts(accts, currentSlot, nil) - if err != nil { - panic(fmt.Sprintf("error updating accounts for spool distribution in slot %d: %s", currentSlot, err)) + // Filter out nil entries (burned records) before storing + var filteredAccts []*accounts.Account + for _, a := range accts { + if a != nil { + filteredAccts = append(filteredAccts, a) + } + } + + if len(filteredAccts) > 0 { + err = acctsDb.StoreAccounts(filteredAccts, currentSlot, nil) + if err != nil { + panic(fmt.Sprintf("error updating accounts for spool distribution in slot %d: %s", currentSlot, err)) + } } - return accts, parentAccts, distributedLamports.Load() + return accts, parentAccts, distributedLamports.Load(), burnedLamports.Load() } func minimumStakeDelegation(slotCtx *sealevel.SlotCtx) uint64 {