Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/global/global_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type GlobalCtx struct {
cachedStakeEntries []accountsdb.StakeIndexEntry // Parsed+sorted index, populated on first load
entriesFlushedSinceCompact int // Appended entries since last compaction
voteCache map[solana.PublicKey]*sealevel.VoteStateVersions
voteStakeTotals map[solana.PublicKey]uint64 // Aggregated stake totals per vote account (replaces full stake cache at startup)
epochStakes *epochstakes.EpochStakesCache
epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache
forkChoice *forkchoice.ForkChoiceService
Expand All @@ -46,7 +45,6 @@ type GlobalCtx struct {
manageBlockHeight bool
pendingStakeMutex sync.Mutex // Protects pendingNewStakePubkeys
voteCacheMutex sync.RWMutex
voteStakeTotalsMu sync.RWMutex
slotsConfirmedMutex sync.Mutex
mu sync.Mutex
}
Expand Down Expand Up @@ -161,14 +159,6 @@ func VoteCacheSnapshot() map[solana.PublicKey]*sealevel.VoteStateVersions {
return snapshot
}

// SetVoteStakeTotals sets the aggregated stake totals per vote account.
// Called at startup after scanning all stake accounts.
func SetVoteStakeTotals(m map[solana.PublicKey]uint64) {
instance.voteStakeTotalsMu.Lock()
instance.voteStakeTotals = m
instance.voteStakeTotalsMu.Unlock()
}

func PutEpochStakesEntry(epoch uint64, pubkey solana.PublicKey, stake uint64, voteAcct *epochstakes.VoteAccount) {
if instance.epochStakes == nil {
instance.epochStakes = epochstakes.NewEpochStakesCache()
Expand Down
34 changes: 27 additions & 7 deletions pkg/replay/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -786,18 +787,37 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b
wg.Wait()
stakeAcctWorkerPool.Release()

// Store aggregated totals in global for later use
global.SetVoteStakeTotals(voteAcctStakes)

// Load vote accounts from AccountsDB into vote cache
if err := RebuildVoteCacheFromAccountsDB(acctsDb, block.Slot, voteAcctStakes, 0); err != nil {
mlog.Log.Warnf("vote cache rebuild had errors: %v", err)
}

// Derive EpochStakesPerVoteAcct and TotalEpochStake from aggregated totals
for pk, stake := range voteAcctStakes {
block.EpochStakesPerVoteAcct[pk] = stake
block.TotalEpochStake += stake
// Seed EpochStakesPerVoteAcct and TotalEpochStake from the epoch stakes cache,
// loaded by buildInitialEpochStakesCache() from the manifest. These are
// epoch-effective stakes (warmup/cooldown applied), matching Agave's
// get_epoch_stake syscall behavior. The raw AccountsDB scan above uses
// delegation.StakeLamports which can differ from effective stake.
epochStakes := global.EpochStakes(block.Epoch)
if len(epochStakes) == 0 {
mlog.Log.Errorf("FATAL: no epoch stakes in cache for epoch %d - "+
"buildInitialEpochStakesCache should have loaded these from manifest", block.Epoch)
mlog.Log.Errorf("Available cached epochs: %v", global.GetAllCachedEpochs())
os.Exit(1)
}
maps.Copy(block.EpochStakesPerVoteAcct, epochStakes)
block.TotalEpochStake = global.EpochTotalStake(block.Epoch)

// Diagnostic: one-time startup comparison of raw scan vs epoch-effective totals
var rawScanTotal uint64
for _, stake := range voteAcctStakes {
rawScanTotal += stake
}
if rawScanTotal != block.TotalEpochStake {
mlog.Log.Infof("startup stake check: rawScanTotal=%d epochEffectiveTotal=%d delta=%d "+
"(expected difference from warmup/cooldown)",
rawScanTotal, block.TotalEpochStake, int64(rawScanTotal)-int64(block.TotalEpochStake))
} else {
mlog.Log.Infof("startup stake check: rawScanTotal=%d matches epochEffectiveTotal (all stakes fully warmed up)", rawScanTotal)
}

// Derive VoteTimestamps from ALL vote accounts in cache (including zero-stake)
Expand Down
11 changes: 9 additions & 2 deletions pkg/replay/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/Overclock-Validator/mithril/pkg/block"
"github.com/Overclock-Validator/mithril/pkg/features"
"github.com/Overclock-Validator/mithril/pkg/global"
"github.com/Overclock-Validator/mithril/pkg/mlog"
"github.com/Overclock-Validator/mithril/pkg/rewards"
"github.com/Overclock-Validator/mithril/pkg/sealevel"
"github.com/Overclock-Validator/wide"
Expand Down Expand Up @@ -85,10 +86,12 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep

partitionIdx := currentBlockHeight - epochRewards.DistributionStartingBlockHeight

distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsFromSpool(acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, partitionIdx, currentSlot)
distributedAccts, parentDistributedAccts, distributedLamports, burnedLamports := rewards.DistributeStakingRewardsFromSpool(acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, partitionIdx, currentSlot)
parentDistributedAccts = append(parentDistributedAccts, epochRewardsAcct.Clone())

epochRewards.Distribute(distributedLamports)
// EpochRewards sysvar advances by distributed + burned (matching Agave/FD),
// but capitalization only increases by distributed.
epochRewards.Distribute(distributedLamports + burnedLamports)
partitionedEpochRewardsInfo.NumRewardPartitionsRemaining--

if partitionedEpochRewardsInfo.NumRewardPartitionsRemaining == 0 {
Expand All @@ -111,5 +114,9 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep
distributedAccts = append(distributedAccts, epochRewardsAcct.Clone())
epochCtx.Capitalization += distributedLamports

if burnedLamports > 0 {
mlog.Log.Warnf("partition %d: distributed=%d burned=%d", partitionIdx, distributedLamports, burnedLamports)
}

return distributedAccts, parentDistributedAccts
}
60 changes: 50 additions & 10 deletions pkg/rewards/rewards.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rewards

import (
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -166,13 +167,13 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma
return updatedAccts, parentUpdatedAccts, totalVotingRewards.Load()
}

func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir string, spoolSlot uint64, partitionIdx uint64, currentSlot uint64) ([]*accounts.Account, []*accounts.Account, uint64) {
func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir string, spoolSlot uint64, partitionIdx uint64, currentSlot uint64) ([]*accounts.Account, []*accounts.Account, uint64, uint64) {
reader, err := NewPartitionReader(spoolDir, spoolSlot, uint32(partitionIdx))
if err != nil {
panic(fmt.Sprintf("unable to open partition %d spool for distribution: %s", partitionIdx, err))
}
if reader == nil {
return nil, nil, 0
return nil, nil, 0, 0
}
defer reader.Close()

Expand All @@ -189,27 +190,50 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir
}

if len(records) == 0 {
return nil, nil, 0
return nil, nil, 0, 0
}

accts := make([]*accounts.Account, len(records))
parentAccts := make([]*accounts.Account, len(records))
var distributedLamports atomic.Uint64
var burnedLamports atomic.Uint64

var wg sync.WaitGroup
workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) {
defer wg.Done()
idx := i.(int)
rec := records[idx]

// Per-record failures burn rewards instead of panicking, matching Agave
// distribution.rs:282-294 and Firedancer fd_rewards.c:958-968.
// Burned rewards advance the EpochRewards sysvar but do NOT increase
// capitalization.

stakeAcct, err := acctsDb.GetAccount(currentSlot, rec.StakePubkey)
if err != nil {
panic(fmt.Sprintf("unable to get acct %s from acctsdb for spool distribution in slot %d", rec.StakePubkey, currentSlot))
if errors.Is(err, accountsdb.ErrNoAccount) {
// AccountNotFound — matches Agave DistributionError::AccountNotFound
mlog.Log.Warnf("spool distribution: account %s not found in slot %d, %d lamports burned", rec.StakePubkey, currentSlot, rec.RewardLamports)
burnedLamports.Add(rec.RewardLamports)
return
}
// Storage/IO error — hard fail, do not burn
panic(fmt.Sprintf("spool distribution: GetAccount failed for %s in slot %d: %v", rec.StakePubkey, currentSlot, err))
}
parentAccts[idx] = stakeAcct.Clone()

stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data)
if err != nil {
// Stake state decode failure — matches FD fd_stake_get_state != 0
mlog.Log.Warnf("spool distribution: stake state decode failed for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err)
burnedLamports.Add(rec.RewardLamports)
return
}

if stakeState.Status != sealevel.StakeStateV2StatusStake {
// Non-stake state — matches FD !fd_stake_state_v2_is_stake
mlog.Log.Warnf("spool distribution: account %s not in Stake state (status=%d), %d lamports burned", rec.StakePubkey, stakeState.Status, rec.RewardLamports)
burnedLamports.Add(rec.RewardLamports)
return
}

Expand All @@ -218,12 +242,18 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir

err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data)
if err != nil {
panic(fmt.Sprintf("unable to serialize stake state in spool distribution: %s", err))
// Re-encode failure — matches Agave DistributionError::UnableToSetState
mlog.Log.Warnf("spool distribution: stake state encode failed for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err)
burnedLamports.Add(rec.RewardLamports)
return
}

stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, rec.RewardLamports)
if err != nil {
panic(fmt.Sprintf("overflow in spool distribution in slot %d to acct %s: %s", currentSlot, rec.StakePubkey, err))
// Arithmetic overflow — matches Agave DistributionError::ArithmeticOverflow
mlog.Log.Warnf("spool distribution: lamports overflow for %s, %d lamports burned: %v", rec.StakePubkey, rec.RewardLamports, err)
burnedLamports.Add(rec.RewardLamports)
return
}

accts[idx] = stakeAcct
Expand All @@ -237,12 +267,22 @@ func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir
wg.Wait()
workerPool.Release()

err = acctsDb.StoreAccounts(accts, currentSlot, nil)
if err != nil {
panic(fmt.Sprintf("error updating accounts for spool distribution in slot %d: %s", currentSlot, err))
// Filter out nil entries (burned records) before storing
var filteredAccts []*accounts.Account
for _, a := range accts {
if a != nil {
filteredAccts = append(filteredAccts, a)
}
}

if len(filteredAccts) > 0 {
err = acctsDb.StoreAccounts(filteredAccts, currentSlot, nil)
if err != nil {
panic(fmt.Sprintf("error updating accounts for spool distribution in slot %d: %s", currentSlot, err))
}
}

return accts, parentAccts, distributedLamports.Load()
return accts, parentAccts, distributedLamports.Load(), burnedLamports.Load()
}

func minimumStakeDelegation(slotCtx *sealevel.SlotCtx) uint64 {
Expand Down