diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go index edb7af08..72608a19 100644 --- a/cmd/mithril/node/node.go +++ b/cmd/mithril/node/node.go @@ -809,6 +809,8 @@ func runVerifyRange(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsDbDir); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1175,6 +1177,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1243,6 +1247,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1299,6 +1305,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1369,6 +1377,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1471,6 +1481,8 @@ func runLive(c *cobra.Command, args []string) { WriterVersion: getVersion(), WriterCommit: getCommit(), }) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1582,6 +1594,8 @@ postBootstrap: snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -2604,6 +2618,6 @@ func runReplayWithRecovery( } }() - result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, 
manifest, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState) + result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState) return result } diff --git a/pkg/accountsdb/accountsdb.go b/pkg/accountsdb/accountsdb.go index 24deb284..58d451da 100644 --- a/pkg/accountsdb/accountsdb.go +++ b/pkg/accountsdb/accountsdb.go @@ -13,6 +13,7 @@ import ( "path/filepath" "runtime/trace" "sync" + "time" "sync/atomic" "github.com/Overclock-Validator/mithril/pkg/accounts" @@ -41,11 +42,6 @@ type AccountsDb struct { storeRequestChan chan *list.Element storeWorkerDone chan struct{} - // InRewardsWindow is set during partitioned epoch rewards distribution. - // When true, stake accounts are not cached in CommonAcctsCache since they're - // one-shot reads/writes that would evict genuinely hot accounts. - // Atomic for safe concurrent access from RPC goroutines. - InRewardsWindow atomic.Bool } type storeRequest struct { @@ -123,6 +119,24 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) { return accountsDb, nil } +// DrainStoreQueue waits until all queued async store requests have completed. +// Unlike WaitForStoreWorker, this does NOT shut down the worker — it just +// spins until the in-progress list is empty. +func (accountsDb *AccountsDb) DrainStoreQueue() { + if !StoreAsync { + return + } + for { + accountsDb.inProgressStoreRequestsMu.Lock() + empty := accountsDb.inProgressStoreRequests.Len() == 0 + accountsDb.inProgressStoreRequestsMu.Unlock() + if empty { + return + } + time.Sleep(time.Millisecond) + } +} + // Turns down the store worker. AccountsDb cannot accept writes after this if StoreAsync. // Should not be called concurrently. func (accountsDb *AccountsDb) WaitForStoreWorker() { @@ -231,7 +245,7 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public return nil, ErrNoAccount } - acctIdxEntry, err := unmarshalAcctIdxEntry(acctIdxEntryBytes) + acctIdxEntry, err := UnmarshalAcctIdxEntry(acctIdxEntryBytes) if err != nil { panic("failed to unmarshal AccountIndexEntry from index kv database") } @@ -268,9 +282,6 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public owner := solana.PublicKeyFromBytes(acct.Owner[:]) if owner == addresses.VoteProgramAddr { accountsDb.VoteAcctCache.Set(pubkey, acct) - } else if owner == addresses.StakeProgramAddr && accountsDb.InRewardsWindow.Load() { - // During reward distribution, stake accounts are one-shot reads that would - // evict genuinely hot accounts from the cache. Skip caching them. 
} else { accountsDb.CommonAcctsCache.Set(pubkey, acct) } @@ -343,7 +354,6 @@ func (accountsDb *AccountsDb) storeAccountsSync(accts []*accounts.Account, slot accountsDb.parallelStoreAccounts(StoreAccountsWorkers, accts, slot) } - inRewardsWindow := accountsDb.InRewardsWindow.Load() for _, acct := range accts { if acct == nil { continue @@ -351,9 +361,6 @@ func (accountsDb *AccountsDb) storeAccountsSync(accts []*accounts.Account, slot owner := solana.PublicKeyFromBytes(acct.Owner[:]) if owner == addresses.VoteProgramAddr { accountsDb.VoteAcctCache.Set(acct.Key, acct) - } else if owner == addresses.StakeProgramAddr && inRewardsWindow { - // During reward distribution, stake accounts are one-shot writes that would - // evict genuinely hot accounts from the cache. Skip caching them. } else { accountsDb.CommonAcctsCache.Set(acct.Key, acct) } @@ -365,12 +372,13 @@ func (accountsDb *AccountsDb) storeWorker() { for elt := range accountsDb.storeRequestChan { sr := elt.Value.(storeRequest) accountsDb.storeAccountsSync(sr.accts, sr.slot) - accountsDb.inProgressStoreRequestsMu.Lock() - accountsDb.inProgressStoreRequests.Remove(elt) - accountsDb.inProgressStoreRequestsMu.Unlock() if sr.cb != nil { sr.cb() } + // Remove after callback so DrainStoreQueue waits for callbacks (e.g. index flush) to complete + accountsDb.inProgressStoreRequestsMu.Lock() + accountsDb.inProgressStoreRequests.Remove(elt) + accountsDb.inProgressStoreRequestsMu.Unlock() } } @@ -407,7 +415,7 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s // if not, then we write out a new appendvec. existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(acct.Key[:]) if err == nil { - acctIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf) + acctIdxEntry, err := UnmarshalAcctIdxEntry(existingacctIdxEntryBuf) if err != nil { panic("failed to unmarshal AccountIndexEntry from index kv database") } @@ -509,7 +517,7 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc if err != nil { return fmt.Errorf("reading from index: %w", err) } - existingIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf) + existingIdxEntry, err := UnmarshalAcctIdxEntry(existingacctIdxEntryBuf) c.Close() if err != nil { return fmt.Errorf("unmarshaling index entry: %w", err) diff --git a/pkg/accountsdb/index.go b/pkg/accountsdb/index.go index 1d670b75..4cf25139 100644 --- a/pkg/accountsdb/index.go +++ b/pkg/accountsdb/index.go @@ -1,9 +1,11 @@ package accountsdb import ( + "bufio" "bytes" "encoding/binary" "fmt" + "os" "github.com/Overclock-Validator/mithril/pkg/addresses" "github.com/gagliardetto/solana-go" @@ -27,23 +29,71 @@ func (entry *AccountIndexEntry) Unmarshal(in *[24]byte) { entry.Offset = binary.LittleEndian.Uint64(in[16:24]) } -func unmarshalAcctIdxEntry(data []byte) (*AccountIndexEntry, error) { +func UnmarshalAcctIdxEntry(data []byte) (*AccountIndexEntry, error) { if len(data) < 24 { - return nil, fmt.Errorf("unmarshalAcctIdxEntry: input had %d < 24 minimum bytes", len(data)) + return nil, fmt.Errorf("UnmarshalAcctIdxEntry: input had %d < 24 minimum bytes", len(data)) } out := &AccountIndexEntry{} out.Unmarshal((*[24]byte)(data[:24])) return out, nil } +// StakeIndexEntry stores a stake account pubkey with its appendvec location hint. +// The location (FileId, Offset) is used for sorting to achieve sequential I/O. +// It may be stale; actual reads still go through Pebble for the canonical location. 
+type StakeIndexEntry struct { + Pubkey solana.PublicKey + FileId uint64 + Offset uint64 +} + +// StakeIndexMagic is the magic header for stake pubkey index files. +var StakeIndexMagic = [4]byte{'S', 'T', 'K', 'I'} + +const StakeIndexVersion = uint32(2) +const StakeIndexRecordSize = 48 // 32-byte pubkey + 8-byte fileId + 8-byte offset + +// WriteStakePubkeyIndex writes stake index entries. +// Format: 8-byte header ("STKI" + version uint32 LE) + N × 48-byte records. +func WriteStakePubkeyIndex(path string, entries []StakeIndexEntry) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + buf := bufio.NewWriterSize(f, 1<<20) + + // Write header + var header [8]byte + copy(header[0:4], StakeIndexMagic[:]) + binary.LittleEndian.PutUint32(header[4:8], StakeIndexVersion) + if _, err := buf.Write(header[:]); err != nil { + return fmt.Errorf("writing stake index header: %w", err) + } + + // Write records + var record [StakeIndexRecordSize]byte + for _, e := range entries { + copy(record[0:32], e.Pubkey[:]) + binary.LittleEndian.PutUint64(record[32:40], e.FileId) + binary.LittleEndian.PutUint64(record[40:48], e.Offset) + if _, err := buf.Write(record[:]); err != nil { + return fmt.Errorf("writing stake index record: %w", err) + } + } + + return buf.Flush() +} + // BuildIndexEntriesFromAppendVecs parses an appendvec and returns: // - pubkeys: all account pubkeys // - acctIdxEntries: index entries for each account -// - stakePubkeys: pubkeys of accounts owned by the stake program -func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, []solana.PublicKey, error) { +// - stakeEntries: stake account pubkeys with their appendvec location hints +func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, []StakeIndexEntry, error) { pubkeys := make([]solana.PublicKey, 0, 20000) acctIdxEntries := make([]AccountIndexEntry, 0, 20000) - stakePubkeys := make([]solana.PublicKey, 0, 1000) + stakeEntries := make([]StakeIndexEntry, 0, 1000) var err error parser := &appendVecParser{Buf: data, FileSize: fileSize, FileId: fileId, Slot: slot} @@ -58,11 +108,16 @@ func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, acctIdxEntries = acctIdxEntries[:len(acctIdxEntries)-1] break } - // Collect stake account pubkeys for building stake index + // Collect stake account entries with appendvec location hints if bytes.Equal(owner[:], addresses.StakeProgramAddr[:]) { - stakePubkeys = append(stakePubkeys, pubkeys[len(pubkeys)-1]) + idx := len(acctIdxEntries) - 1 + stakeEntries = append(stakeEntries, StakeIndexEntry{ + Pubkey: pubkeys[len(pubkeys)-1], + FileId: acctIdxEntries[idx].FileId, + Offset: acctIdxEntries[idx].Offset, + }) } } - return pubkeys, acctIdxEntries, stakePubkeys, nil + return pubkeys, acctIdxEntries, stakeEntries, nil } diff --git a/pkg/global/global_ctx.go b/pkg/global/global_ctx.go index 6ea685d2..0c3f347d 100644 --- a/pkg/global/global_ctx.go +++ b/pkg/global/global_ctx.go @@ -3,16 +3,23 @@ package global import ( + "encoding/binary" "fmt" "os" "path/filepath" + "runtime" + "slices" "sync" + "sync/atomic" + "github.com/Overclock-Validator/mithril/pkg/accountsdb" "github.com/Overclock-Validator/mithril/pkg/epochstakes" "github.com/Overclock-Validator/mithril/pkg/forkchoice" "github.com/Overclock-Validator/mithril/pkg/leaderschedule" + "github.com/Overclock-Validator/mithril/pkg/mlog" 
"github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/gagliardetto/solana-go" + "github.com/panjf2000/ants/v2" ) // StakePubkeyIndexFileName is the name of the stake pubkey index file @@ -24,9 +31,11 @@ type GlobalCtx struct { slot uint64 epoch uint64 transactionCount uint64 - stakeCache map[solana.PublicKey]*sealevel.Delegation - pendingNewStakePubkeys []solana.PublicKey // New stake pubkeys to append to index after block commit + pendingNewStakePubkeys []accountsdb.StakeIndexEntry // New stake entries to append to index after block commit + cachedStakeEntries []accountsdb.StakeIndexEntry // Parsed+sorted index, populated on first load + entriesFlushedSinceCompact int // Appended entries since last compaction voteCache map[solana.PublicKey]*sealevel.VoteStateVersions + voteStakeTotals map[solana.PublicKey]uint64 // Aggregated stake totals per vote account (replaces full stake cache at startup) epochStakes *epochstakes.EpochStakesCache epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache forkChoice *forkchoice.ForkChoiceService @@ -35,8 +44,9 @@ type GlobalCtx struct { calcUnixTimeForClockSysvar bool manageLeaderSchedule bool manageBlockHeight bool - stakeCacheMutex sync.Mutex // Changed from RWMutex - simpler, used for both cache and pending + pendingStakeMutex sync.Mutex // Protects pendingNewStakePubkeys voteCacheMutex sync.RWMutex + voteStakeTotalsMu sync.RWMutex slotsConfirmedMutex sync.Mutex mu sync.Mutex } @@ -75,39 +85,13 @@ func IncrTransactionCount(num uint64) { instance.IncrTransactionCount(num) } -// PutStakeCacheItem adds or updates a stake cache entry during replay. -// If this is a NEW pubkey (not already in cache), it's added to pendingNewStakePubkeys -// for later append to the index file via FlushPendingStakePubkeys. -func PutStakeCacheItem(pubkey solana.PublicKey, delegation *sealevel.Delegation) { - instance.stakeCacheMutex.Lock() - defer instance.stakeCacheMutex.Unlock() - if instance.stakeCache == nil { - instance.stakeCache = make(map[solana.PublicKey]*sealevel.Delegation) - } - // Track new pubkeys for index append - _, exists := instance.stakeCache[pubkey] - if !exists { - instance.pendingNewStakePubkeys = append(instance.pendingNewStakePubkeys, pubkey) - } - instance.stakeCache[pubkey] = delegation -} - -// PutStakeCacheItemBulk adds a stake cache entry during bulk population (startup). -// Does NOT track new pubkeys - use this when loading cache from index/snapshot/scan -// to avoid enqueueing the entire cache on rebuild. -func PutStakeCacheItemBulk(pubkey solana.PublicKey, delegation *sealevel.Delegation) { - instance.stakeCacheMutex.Lock() - defer instance.stakeCacheMutex.Unlock() - if instance.stakeCache == nil { - instance.stakeCache = make(map[solana.PublicKey]*sealevel.Delegation) - } - instance.stakeCache[pubkey] = delegation -} - -func DeleteStakeCacheItem(pubkey solana.PublicKey) { - instance.stakeCacheMutex.Lock() - defer instance.stakeCacheMutex.Unlock() - delete(instance.stakeCache, pubkey) +// EnqueuePendingStakePubkey records a stake pubkey for later append to the index file. +// Called during tx processing when a stake account is created or modified. +// Deduplication happens at index load time (LoadStakePubkeyIndex keeps last occurrence). 
+func EnqueuePendingStakePubkey(pubkey solana.PublicKey) { + instance.pendingStakeMutex.Lock() + defer instance.pendingStakeMutex.Unlock() + instance.pendingNewStakePubkeys = append(instance.pendingNewStakePubkeys, accountsdb.StakeIndexEntry{Pubkey: pubkey}) } func PutEpochAuthorizedVoter(voteAcct solana.PublicKey, authorizedVoter solana.PublicKey) { @@ -137,25 +121,6 @@ func SlotConfirmed(slot uint64) bool { return exists } -func StakeCache() map[solana.PublicKey]*sealevel.Delegation { - return instance.stakeCache -} - -func StakeCacheSnapshot() map[solana.PublicKey]*sealevel.Delegation { - instance.stakeCacheMutex.Lock() - defer instance.stakeCacheMutex.Unlock() - - if instance.stakeCache == nil { - return nil - } - - snapshot := make(map[solana.PublicKey]*sealevel.Delegation, len(instance.stakeCache)) - for pk, delegation := range instance.stakeCache { - snapshot[pk] = delegation - } - return snapshot -} - func PutVoteCacheItem(pubkey solana.PublicKey, voteState *sealevel.VoteStateVersions) { instance.voteCacheMutex.Lock() defer instance.voteCacheMutex.Unlock() @@ -196,6 +161,14 @@ func VoteCacheSnapshot() map[solana.PublicKey]*sealevel.VoteStateVersions { return snapshot } +// SetVoteStakeTotals sets the aggregated stake totals per vote account. +// Called at startup after scanning all stake accounts. +func SetVoteStakeTotals(m map[solana.PublicKey]uint64) { + instance.voteStakeTotalsMu.Lock() + instance.voteStakeTotals = m + instance.voteStakeTotalsMu.Unlock() +} + func PutEpochStakesEntry(epoch uint64, pubkey solana.PublicKey, stake uint64, voteAcct *epochstakes.VoteAccount) { if instance.epochStakes == nil { instance.epochStakes = epochstakes.NewEpochStakesCache() @@ -379,20 +352,22 @@ func (globctx *GlobalCtx) TransactionCount() uint64 { return globctx.transactionCount } -// FlushPendingStakePubkeys appends any new stake pubkeys discovered during replay +// FlushPendingStakePubkeys appends any new stake entries discovered during replay // to the stake pubkey index file. Called after each block commit. -// Returns the number of pubkeys flushed. +// Writes 48-byte records to match the header written at snapshot time. +// Returns the number of entries flushed. 
func FlushPendingStakePubkeys(accountsDbDir string) (int, error) { - instance.stakeCacheMutex.Lock() + instance.pendingStakeMutex.Lock() if len(instance.pendingNewStakePubkeys) == 0 { - instance.stakeCacheMutex.Unlock() + instance.pendingStakeMutex.Unlock() return 0, nil } - // Copy pending slice and clear it while holding lock - pending := make([]solana.PublicKey, len(instance.pendingNewStakePubkeys)) + // Copy pending slice, clear it, and invalidate cache while holding lock + pending := make([]accountsdb.StakeIndexEntry, len(instance.pendingNewStakePubkeys)) copy(pending, instance.pendingNewStakePubkeys) instance.pendingNewStakePubkeys = nil - instance.stakeCacheMutex.Unlock() + instance.cachedStakeEntries = nil // file is changing, invalidate cache + instance.pendingStakeMutex.Unlock() // Append to index file (don't hold lock during I/O) indexPath := filepath.Join(accountsDbDir, StakePubkeyIndexFileName) @@ -402,9 +377,27 @@ } defer f.Close() - for _, pk := range pending { - if _, err := f.Write(pk[:]); err != nil { - return 0, fmt.Errorf("writing stake pubkey to index: %w", err) + // If file is empty/new, write header first to avoid a headerless file + info, err := f.Stat() + if err != nil { + return 0, fmt.Errorf("stat stake pubkey index: %w", err) + } + if info.Size() == 0 { + var header [8]byte + copy(header[0:4], accountsdb.StakeIndexMagic[:]) + binary.LittleEndian.PutUint32(header[4:8], accountsdb.StakeIndexVersion) + if _, err := f.Write(header[:]); err != nil { + return 0, fmt.Errorf("writing stake index header: %w", err) + } + } + + var record [accountsdb.StakeIndexRecordSize]byte + for _, e := range pending { + copy(record[0:32], e.Pubkey[:]) + binary.LittleEndian.PutUint64(record[32:40], e.FileId) + binary.LittleEndian.PutUint64(record[40:48], e.Offset) + if _, err := f.Write(record[:]); err != nil { + return 0, fmt.Errorf("writing stake index entry: %w", err) } } @@ -414,76 +407,254 @@ return 0, fmt.Errorf("syncing stake pubkey index: %w", err) } + instance.pendingStakeMutex.Lock() + instance.entriesFlushedSinceCompact += len(pending) + instance.pendingStakeMutex.Unlock() + return len(pending), nil } // ClearPendingStakePubkeys discards any pending stake pubkeys without writing them. // Used for rollback on failed block replay. func ClearPendingStakePubkeys() { - instance.stakeCacheMutex.Lock() - defer instance.stakeCacheMutex.Unlock() + instance.pendingStakeMutex.Lock() + defer instance.pendingStakeMutex.Unlock() instance.pendingNewStakePubkeys = nil } -// LoadStakePubkeyIndex reads the stake pubkey index file and returns deduplicated pubkeys. -// Validates that file length is a multiple of 32 bytes. -func LoadStakePubkeyIndex(accountsDbDir string) ([]solana.PublicKey, error) { +// compactThreshold is the minimum number of appended entries before compaction triggers. +// New or modified stake accounts show up in only a small fraction of blocks, so a 432k-slot epoch typically appends at most a few thousand entries. +// 1000 keeps the file clean without rewriting 24MB every boundary when only a handful changed. +const compactThreshold = 1000 + +// CompactStakePubkeyIndex rewrites the index file from the cached deduplicated entries. +// Only triggers when at least compactThreshold entries have been appended since last compaction. +// Should be called at epoch boundary when the cache is already populated. 
+func CompactStakePubkeyIndex(accountsDbDir string) error { + instance.pendingStakeMutex.Lock() + flushed := instance.entriesFlushedSinceCompact + cached := instance.cachedStakeEntries + instance.pendingStakeMutex.Unlock() + + if flushed < compactThreshold { + return nil // Not enough new entries to justify rewrite + } + + if cached == nil || len(cached) == 0 { + return nil // Nothing to compact + } + + indexPath := filepath.Join(accountsDbDir, StakePubkeyIndexFileName) + if err := accountsdb.WriteStakePubkeyIndex(indexPath, cached); err != nil { + return fmt.Errorf("compacting stake pubkey index: %w", err) + } + + instance.pendingStakeMutex.Lock() + instance.entriesFlushedSinceCompact = 0 + instance.pendingStakeMutex.Unlock() + + mlog.Log.Infof("compacted stake pubkey index: %d entries", len(cached)) + return nil +} + +// LoadStakePubkeyIndex reads the stake pubkey index file, auto-detecting format. +// Returns deduplicated entries sorted by (FileId, Offset) for sequential I/O. +// Results are cached after first load; subsequent calls return the cached slice. +// IMPORTANT: The returned slice is shared — callers must NOT mutate it. +// Legacy format: 32-byte pubkeys with no location hints (FileId=0, Offset=0). +// Current format: 8-byte header ("STKI" + version) + 48-byte records (pubkey + fileId + offset). +func LoadStakePubkeyIndex(accountsDbDir string) ([]accountsdb.StakeIndexEntry, error) { + instance.pendingStakeMutex.Lock() + if instance.cachedStakeEntries != nil { + cached := instance.cachedStakeEntries + instance.pendingStakeMutex.Unlock() + return cached, nil + } + instance.pendingStakeMutex.Unlock() + indexPath := filepath.Join(accountsDbDir, StakePubkeyIndexFileName) data, err := os.ReadFile(indexPath) if err != nil { return nil, err } - // Validate file length - if len(data)%32 != 0 { - return nil, fmt.Errorf("stake pubkey index file corrupt: length %d is not a multiple of 32", len(data)) - } + var entries []accountsdb.StakeIndexEntry - numPubkeys := len(data) / 32 - if numPubkeys == 0 { - return nil, fmt.Errorf("stake pubkey index file is empty (0 pubkeys) - indicates corrupt or incomplete AccountsDB") + // Detect format: current format starts with "STKI" magic + if len(data) >= 8 && string(data[0:4]) == "STKI" { + entries, err = loadStakePubkeyIndexCurrent(data) + } else { + entries, err = loadStakePubkeyIndexLegacy(data) + } + if err != nil { + return nil, err } - // Deduplicate pubkeys using a map - seen := make(map[solana.PublicKey]struct{}, numPubkeys) - pubkeys := make([]solana.PublicKey, 0, numPubkeys) + if len(entries) == 0 { + return nil, fmt.Errorf("stake pubkey index file is empty (0 entries) - indicates corrupt or incomplete AccountsDB") + } - for i := 0; i < len(data); i += 32 { - var pk solana.PublicKey - copy(pk[:], data[i:i+32]) - if _, exists := seen[pk]; !exists { - seen[pk] = struct{}{} - pubkeys = append(pubkeys, pk) + // Deduplicate by pubkey (keep last occurrence = freshest location hint) + seen := make(map[solana.PublicKey]int, len(entries)) + deduped := make([]accountsdb.StakeIndexEntry, 0, len(entries)) + for _, e := range entries { + if idx, exists := seen[e.Pubkey]; exists { + deduped[idx] = e // overwrite with newer entry + } else { + seen[e.Pubkey] = len(deduped) + deduped = append(deduped, e) } } - return pubkeys, nil + // Sort by (FileId, Offset) for sequential appendvec I/O + slices.SortFunc(deduped, func(a, b accountsdb.StakeIndexEntry) int { + if a.FileId != b.FileId { + if a.FileId < b.FileId { + return -1 + } + return 1 + } + if a.Offset < 
b.Offset { + return -1 + } + if a.Offset > b.Offset { + return 1 + } + return 0 + }) + + // Cache for subsequent calls + instance.pendingStakeMutex.Lock() + instance.cachedStakeEntries = deduped + instance.pendingStakeMutex.Unlock() + + return deduped, nil } -// SaveStakePubkeyIndex writes the current stake cache pubkeys to the index file. -// This compacts the index by removing duplicates and closed accounts. -// Used during graceful shutdown. -func SaveStakePubkeyIndex(accountsDbDir string) error { - instance.stakeCacheMutex.Lock() - // Get current cache pubkeys - pubkeys := make([]solana.PublicKey, 0, len(instance.stakeCache)) - for pk := range instance.stakeCache { - pubkeys = append(pubkeys, pk) +func loadStakePubkeyIndexLegacy(data []byte) ([]accountsdb.StakeIndexEntry, error) { + if len(data)%32 != 0 { + return nil, fmt.Errorf("stake pubkey index V1 corrupt: length %d not multiple of 32", len(data)) + } + count := len(data) / 32 + entries := make([]accountsdb.StakeIndexEntry, count) + for i := 0; i < count; i++ { + copy(entries[i].Pubkey[:], data[i*32:(i+1)*32]) + // FileId=0, Offset=0 — no location hint, sort is effectively no-op } - instance.stakeCacheMutex.Unlock() + return entries, nil +} - indexPath := filepath.Join(accountsDbDir, StakePubkeyIndexFileName) - f, err := os.Create(indexPath) +func loadStakePubkeyIndexCurrent(data []byte) ([]accountsdb.StakeIndexEntry, error) { + version := binary.LittleEndian.Uint32(data[4:8]) + if version != accountsdb.StakeIndexVersion { + return nil, fmt.Errorf("stake pubkey index: unsupported version %d", version) + } + body := data[8:] + if len(body)%accountsdb.StakeIndexRecordSize != 0 { + return nil, fmt.Errorf("stake pubkey index corrupt: body length %d not multiple of %d", len(body), accountsdb.StakeIndexRecordSize) + } + count := len(body) / accountsdb.StakeIndexRecordSize + entries := make([]accountsdb.StakeIndexEntry, count) + for i := 0; i < count; i++ { + base := i * accountsdb.StakeIndexRecordSize + copy(entries[i].Pubkey[:], body[base:base+32]) + entries[i].FileId = binary.LittleEndian.Uint64(body[base+32 : base+40]) + entries[i].Offset = binary.LittleEndian.Uint64(body[base+40 : base+48]) + } + return entries, nil +} + +// StreamStakeAccounts iterates all stake accounts from the pubkey index, +// calling fn for each valid delegation. Returns count of processed accounts. +// This streams directly from AccountsDB without building a full stake cache. +// Entries are sorted by (FileId, Offset) and batched by FileId for sequential I/O. 
+func StreamStakeAccounts( + acctsDb *accountsdb.AccountsDb, + slot uint64, + fn func(pubkey solana.PublicKey, delegation *sealevel.Delegation, creditsObserved uint64), +) (int, error) { + // Load stake entries from index file (already sorted by FileId, Offset) + acctsDbDir := filepath.Join(acctsDb.AcctsDir, "..") + stakeEntries, err := LoadStakePubkeyIndex(acctsDbDir) if err != nil { - return fmt.Errorf("creating stake pubkey index: %w", err) + return 0, fmt.Errorf("loading stake pubkey index: %w", err) } - defer f.Close() - for _, pk := range pubkeys { - if _, err := f.Write(pk[:]); err != nil { - return fmt.Errorf("writing stake pubkey to index: %w", err) + var processedCount atomic.Int64 + var getAccountErrors atomic.Int64 + var unmarshalErrors atomic.Int64 + var statusNotStake atomic.Int64 + var wg sync.WaitGroup + + totalEntries := len(stakeEntries) + + // Batched worker pool for parallel processing + const maxBatchSize = 2000 + workerPool, err := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) { + defer wg.Done() + + batch := i.([]accountsdb.StakeIndexEntry) + + for _, entry := range batch { + // Read stake account from AccountsDB + stakeAcct, err := acctsDb.GetAccount(slot, entry.Pubkey) + if err != nil { + getAccountErrors.Add(1) + continue // Account not found or closed + } + + stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) + if err != nil { + unmarshalErrors.Add(1) + continue // Invalid stake state + } + + // Only process delegated stake accounts (status must be "Stake") + if stakeState.Status != sealevel.StakeStateV2StatusStake { + statusNotStake.Add(1) + continue + } + + // Call the callback with delegation data + delegation := &stakeState.Stake.Stake.Delegation + creditsObserved := stakeState.Stake.Stake.CreditsObserved + fn(entry.Pubkey, delegation, creditsObserved) + processedCount.Add(1) + } + }) + if err != nil { + return 0, fmt.Errorf("creating worker pool: %w", err) + } + defer workerPool.Release() + + // Submit FileId-aligned batches for sequential appendvec I/O + var invokeErr error + batchStart := 0 + for i := 1; i <= len(stakeEntries); i++ { + flush := i == len(stakeEntries) || + stakeEntries[i].FileId != stakeEntries[batchStart].FileId || + i-batchStart >= maxBatchSize + if flush && i > batchStart { + wg.Add(1) + if err := workerPool.Invoke(stakeEntries[batchStart:i]); err != nil { + wg.Done() + invokeErr = fmt.Errorf("worker pool invoke failed: %w", err) + break + } + batchStart = i } } - return nil + // Always wait for all queued workers to finish before returning + wg.Wait() + + if invokeErr != nil { + return 0, invokeErr + } + + // Log diagnostic counters for debugging stake account streaming + mlog.Log.Infof("StreamStakeAccounts: totalEntries=%d processed=%d getAccountErrors=%d unmarshalErrors=%d statusNotStake=%d", + totalEntries, processedCount.Load(), getAccountErrors.Load(), unmarshalErrors.Load(), statusNotStake.Load()) + + return int(processedCount.Load()), nil } diff --git a/pkg/replay/block.go b/pkg/replay/block.go index 2f2e7f17..00a278b8 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -3,6 +3,7 @@ package replay import ( "context" "crypto/rand" + "encoding/base64" "encoding/hex" "encoding/json" "fmt" @@ -25,7 +26,6 @@ import ( "github.com/Overclock-Validator/mithril/pkg/base58" b "github.com/Overclock-Validator/mithril/pkg/block" "github.com/Overclock-Validator/mithril/pkg/blockstream" - "github.com/Overclock-Validator/mithril/pkg/epochstakes" "github.com/Overclock-Validator/mithril/pkg/features" 
"github.com/Overclock-Validator/mithril/pkg/fees" "github.com/Overclock-Validator/mithril/pkg/global" @@ -37,7 +37,7 @@ import ( "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/rpcserver" "github.com/Overclock-Validator/mithril/pkg/sealevel" - "github.com/Overclock-Validator/mithril/pkg/snapshot" + "github.com/Overclock-Validator/mithril/pkg/state" "github.com/Overclock-Validator/mithril/pkg/statsd" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" @@ -694,15 +694,17 @@ func scanAndEnableFeatures(acctsDb *accountsdb.AccountsDb, slot uint64, startOfE // 3. Extract delegation fields (VoterPubkey, StakeLamports, epochs, etc.) from AccountsDB // // This ensures the stake cache reflects the actual on-chain state, not potentially outdated -// manifest data. Fatal error if index file is missing - indicates corrupt/incomplete AccountsDB. -func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b.Block, snapshotManifest *snapshot.SnapshotManifest) { +// data. Fatal error if index file is missing - indicates corrupt/incomplete AccountsDB. +// NO manifest parameter - derives everything from AccountsDB. +func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b.Block) { block.VoteTimestamps = make(map[solana.PublicKey]sealevel.BlockTimestamp) block.EpochStakesPerVoteAcct = make(map[solana.PublicKey]uint64) - // Load stake pubkeys from index file built during snapshot processing + // Load stake entries from index file built during snapshot processing // The index is in the accountsDbDir which is parent of AcctsDir + // Entries are sorted by (FileId, Offset) for sequential appendvec I/O acctsDbDir := filepath.Join(acctsDb.AcctsDir, "..") - stakePubkeys, err := global.LoadStakePubkeyIndex(acctsDbDir) + stakeEntries, err := global.LoadStakePubkeyIndex(acctsDbDir) if err != nil { // Fatal error - stake index is required for resume and must exist mlog.Log.Errorf("=======================================================") @@ -719,32 +721,28 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b mlog.Log.Errorf("=======================================================") os.Exit(1) } - mlog.Log.Infof("Loading vote and stake caches") + mlog.Log.Infof("Loading vote and stake caches (aggregate-only mode, %d stake accounts)", len(stakeEntries)) var wg sync.WaitGroup - voteAcctWorkerPool, _ := ants.NewPoolWithFunc(1024, func(i interface{}) { - defer wg.Done() - pk := i.(solana.PublicKey) - voteAcct, err := acctsDb.GetAccount(block.Slot, pk) - if err == nil { - versionedVoteState, err := sealevel.UnmarshalVersionedVoteState(voteAcct.Data) - if err == nil { - global.PutVoteCacheItem(pk, versionedVoteState) - } - } - }) + // Shared aggregated stake totals - built directly from AccountsDB scan + // Thread-safe: each worker builds local map, then merges under mutex + voteAcctStakes := make(map[solana.PublicKey]uint64) + var voteAcctStakesMu sync.Mutex - // Stake worker pool reads ALL delegation fields from AccountsDB (not manifest) - // Uses batched processing to reduce wg.Add/Invoke overhead (1M pubkeys → ~1K batches) - const stakeBatchSize = 1000 + // Stake worker pool reads stake accounts and aggregates totals directly + const maxBatchSize = 2000 stakeAcctWorkerPool, _ := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) { defer wg.Done() - batch := i.([]solana.PublicKey) - for _, pk := range batch { - // Read from AccountsDB - ALL fields, not manifest - 
stakeAcct, err := acctsDb.GetAccount(block.Slot, pk) + batch := i.([]accountsdb.StakeIndexEntry) + + // Build local aggregation for this batch + localStakes := make(map[solana.PublicKey]uint64) + + for _, entry := range batch { + // Read from AccountsDB + stakeAcct, err := acctsDb.GetAccount(block.Slot, entry.Pubkey) if err != nil { continue // Account not found or closed } @@ -754,80 +752,114 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b continue // Invalid stake state } - // Only cache if this is a delegated stake account (status must be "Stake") + // Only count delegated stake accounts (status must be "Stake") if stakeState.Status != sealevel.StakeStateV2StatusStake { continue } - // Use delegation from AccountsDB, not manifest - // Use Bulk variant for startup loading - doesn't track as "new" for index append + // Aggregate stake by vote account delegation := stakeState.Stake.Stake.Delegation - global.PutStakeCacheItemBulk(pk, - &sealevel.Delegation{ - VoterPubkey: delegation.VoterPubkey, - StakeLamports: delegation.StakeLamports, - ActivationEpoch: delegation.ActivationEpoch, - DeactivationEpoch: delegation.DeactivationEpoch, - WarmupCooldownRate: delegation.WarmupCooldownRate, - CreditsObserved: stakeState.Stake.Stake.CreditsObserved, - }) + localStakes[delegation.VoterPubkey] += delegation.StakeLamports } - }) - wg.Add(1) - go func() { - defer wg.Done() - for _, va := range snapshotManifest.Bank.Stakes.VoteAccounts { - ts := sealevel.BlockTimestamp{Slot: va.Value.LastTimestampSlot, Timestamp: va.Value.LastTimestampTs} - block.VoteTimestamps[va.Key] = ts - block.EpochStakesPerVoteAcct[va.Key] = va.Stake - block.TotalEpochStake += va.Stake + // Merge local aggregation into shared map under lock + voteAcctStakesMu.Lock() + for voter, stake := range localStakes { + voteAcctStakes[voter] += stake + } + voteAcctStakesMu.Unlock() + }) + // Submit FileId-aligned batches for sequential appendvec I/O + batchStart := 0 + for i := 1; i <= len(stakeEntries); i++ { + flush := i == len(stakeEntries) || + stakeEntries[i].FileId != stakeEntries[batchStart].FileId || + i-batchStart >= maxBatchSize + if flush && i > batchStart { wg.Add(1) - voteAcctWorkerPool.Invoke(va.Key) + stakeAcctWorkerPool.Invoke(stakeEntries[batchStart:i]) + batchStart = i } - }() - - // Submit stake pubkeys in batches (reduces wg.Add/Invoke calls from ~1M to ~1K) - numBatches := (len(stakePubkeys) + stakeBatchSize - 1) / stakeBatchSize - wg.Add(numBatches) - for i := 0; i < len(stakePubkeys); i += stakeBatchSize { - end := min(i+stakeBatchSize, len(stakePubkeys)) - stakeAcctWorkerPool.Invoke(stakePubkeys[i:end]) } wg.Wait() stakeAcctWorkerPool.Release() - voteAcctWorkerPool.Release() - // After both caches are loaded, ensure vote cache has ALL vote accounts - // referenced by stake cache (catches any vote accounts not in manifest) - voteAcctStakes := make(map[solana.PublicKey]uint64) - for _, delegation := range global.StakeCache() { - voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports - } + // Store aggregated totals in global for later use + global.SetVoteStakeTotals(voteAcctStakes) + + // Load vote accounts from AccountsDB into vote cache if err := RebuildVoteCacheFromAccountsDB(acctsDb, block.Slot, voteAcctStakes, 0); err != nil { mlog.Log.Warnf("vote cache rebuild had errors: %v", err) } + + // Derive EpochStakesPerVoteAcct and TotalEpochStake from aggregated totals + for pk, stake := range voteAcctStakes { + block.EpochStakesPerVoteAcct[pk] = stake + 
block.TotalEpochStake += stake + } + + // Derive VoteTimestamps from ALL vote accounts in cache (including zero-stake) + // This matches original manifest behavior where all vote accounts had timestamps populated + for pk, voteState := range global.VoteCache() { + if voteState != nil { + ts := voteState.LastTimestamp() + if ts != nil { + block.VoteTimestamps[pk] = *ts + } + } + } } func configureInitialBlock(acctsDb *accountsdb.AccountsDb, block *b.Block, - snapshotManifest *snapshot.SnapshotManifest, + mithrilState *state.MithrilState, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, rpcClient *rpcclient.RpcClient, auxBackupEndpoints []string) error { - block.ParentBankhash = snapshotManifest.Bank.Hash - block.ParentSlot = snapshotManifest.Bank.Slot - block.AcctsLtHash = snapshotManifest.LtHash + // Read from state file manifest_* fields (required) + if mithrilState.ManifestParentBankhash == "" { + return fmt.Errorf("state file missing manifest_parent_bankhash - delete AccountsDB and rebuild from snapshot") + } + + parentBankhash, err := base58.DecodeFromString(mithrilState.ManifestParentBankhash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_parent_bankhash: %w", err) + } + block.ParentBankhash = parentBankhash + block.ParentSlot = mithrilState.ManifestParentSlot + + // LtHash: decode base64, restore with InitWithHash + if mithrilState.ManifestAcctsLtHash != "" { + ltHashBytes, err := base64.StdEncoding.DecodeString(mithrilState.ManifestAcctsLtHash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_accts_lt_hash: %w", err) + } + block.AcctsLtHash = new(lthash.LtHash).InitWithHash(ltHashBytes) + } + + block.PrevFeeRateGovernor = reconstructFeeRateGovernor(mithrilState) + if block.PrevFeeRateGovernor == nil { + return fmt.Errorf("state file missing manifest_fee_rate_governor - delete AccountsDB and rebuild from snapshot") + } + block.PrevNumSignatures = mithrilState.ManifestSignatureCount + block.InitialPreviousLamportsPerSignature = mithrilState.ManifestLamportsPerSignature + + if mithrilState.ManifestEvictedBlockhash == "" { + return fmt.Errorf("state file missing manifest_evicted_blockhash - delete AccountsDB and rebuild from snapshot") + } + evictedHash, err := base58.DecodeFromString(mithrilState.ManifestEvictedBlockhash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_evicted_blockhash: %w", err) + } + block.LatestEvictedBlockhash = evictedHash + block.EpochAcctsHash = epochCtx.EpochAcctsHash - block.PrevFeeRateGovernor = &snapshotManifest.Bank.FeeRateGovernor - block.PrevNumSignatures = snapshotManifest.Bank.SignatureCount - block.InitialPreviousLamportsPerSignature = snapshotManifest.LamportsPerSignature - setupInitialVoteAcctsAndStakeAccts(acctsDb, block, snapshotManifest) + setupInitialVoteAcctsAndStakeAccts(acctsDb, block) configureGlobalCtx(block) if global.ManageLeaderSchedule() { @@ -847,18 +879,25 @@ func configureInitialBlock(acctsDb *accountsdb.AccountsDb, block.BlockHeight = global.BlockHeight() } - // we use the RecentBlockhashes sysvar to determine whether a tx has a blockhash of acceptable - // age, but due to how Agave's BlockhashQueue is implemented, the latest 151 blockhashes - // are valid, rather than 150. we therefore need the last blockhash that was evicted from - // the RecentBlockhashes sysvar, and that's what the code below does. 
- ages := snapshotManifest.Bank.BlockhashQueue.HashAndAge - sort.Slice(ages, func(i, j int) bool { return ages[i].Val.HashIndex > ages[j].Val.HashIndex }) - block.LatestEvictedBlockhash = ages[150].Key - - snapshotManifest = nil return nil } +// reconstructFeeRateGovernor creates a FeeRateGovernor from state file manifest_* fields +func reconstructFeeRateGovernor(s *state.MithrilState) *sealevel.FeeRateGovernor { + if s.ManifestFeeRateGovernor == nil { + return nil + } + return &sealevel.FeeRateGovernor{ + TargetLamportsPerSignature: s.ManifestFeeRateGovernor.TargetLamportsPerSignature, + TargetSignaturesPerSlot: s.ManifestFeeRateGovernor.TargetSignaturesPerSlot, + MinLamportsPerSignature: s.ManifestFeeRateGovernor.MinLamportsPerSignature, + MaxLamportsPerSignature: s.ManifestFeeRateGovernor.MaxLamportsPerSignature, + BurnPercent: s.ManifestFeeRateGovernor.BurnPercent, + LamportsPerSignature: s.ManifestLamportsPerSignature, + PrevLamportsPerSignature: s.ManifestLamportsPerSignature, // Initial = current for fresh start + } +} + func configureBlock(block *b.Block, epochCtx *ReplayCtx, lastSlotCtx *sealevel.SlotCtx, @@ -935,7 +974,7 @@ func ensureStakeHistorySysvarCached(acctsDb *accountsdb.AccountsDb, slot uint64) func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, block *b.Block, resumeState *ResumeState, - snapshotManifest *snapshot.SnapshotManifest, // Still needed for static FeeRateGovernor fields + mithrilState *state.MithrilState, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, rpcClient *rpcclient.RpcClient, @@ -947,8 +986,11 @@ func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, block.AcctsLtHash = resumeState.AcctsLtHash block.EpochAcctsHash = epochCtx.EpochAcctsHash - // Reconstruct PrevFeeRateGovernor from manifest static fields + resume dynamic fields - prevFeeRateGovernor := snapshotManifest.Bank.FeeRateGovernor.Clone() + // Reconstruct PrevFeeRateGovernor from state file static fields + resume dynamic fields + prevFeeRateGovernor := reconstructFeeRateGovernor(mithrilState) + if prevFeeRateGovernor == nil { + return fmt.Errorf("cannot resume: state file missing manifest_fee_rate_governor (rebuild AccountsDB required)") + } prevFeeRateGovernor.LamportsPerSignature = resumeState.LamportsPerSignature prevFeeRateGovernor.PrevLamportsPerSignature = resumeState.PrevLamportsPerSignature block.PrevFeeRateGovernor = prevFeeRateGovernor @@ -957,7 +999,7 @@ func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, // Load vote accounts and populate global caches - same as fresh start // This seeds both block.VoteAccts/VoteTimestamps AND global.VoteCache() from AccountsDB // Required because getTimestampEstimate reads from global.VoteCache() - setupInitialVoteAcctsAndStakeAccts(acctsDb, block, snapshotManifest) + setupInitialVoteAcctsAndStakeAccts(acctsDb, block) configureGlobalCtx(block) // On resume, epoch stakes will be loaded from the persisted state file (not manifest or AccountsDB). @@ -1034,35 +1076,42 @@ func configureGlobalCtx(block *b.Block) { global.SetBlockHeight(block.BlockHeight) } -// buildInitialEpochStakesCache seeds the epoch stakes cache from manifest. -// If persistedEpochs is non-nil, skips epochs already loaded from the state file. 
-func buildInitialEpochStakesCache(snapshotManifest *snapshot.SnapshotManifest, persistedEpochs map[uint64]bool) { - for _, epochStake := range snapshotManifest.VersionedEpochStakes { - // Skip epochs already loaded from persisted state file - if persistedEpochs != nil && persistedEpochs[epochStake.Epoch] { - mlog.Log.Debugf("skipping epoch %d stakes from manifest (already loaded from state file)", epochStake.Epoch) - continue - } +// buildInitialEpochStakesCache seeds the epoch stakes cache from the state file's manifest_* fields. +// Requires ManifestEpochStakes and ManifestEpochAuthorizedVoters; returns an error if either is missing. +func buildInitialEpochStakesCache(mithrilState *state.MithrilState) error { + // Require state file ManifestEpochStakes (PersistedEpochStakes JSON format) + if mithrilState == nil || len(mithrilState.ManifestEpochStakes) == 0 { + return fmt.Errorf("state file missing manifest_epoch_stakes - delete AccountsDB and rebuild from snapshot") + } - if epochStake.Epoch == snapshotManifest.Bank.Epoch { - for _, entry := range epochStake.Val.EpochAuthorizedVoters { - global.PutEpochAuthorizedVoter(entry.Key, entry.Val) - } + for epoch, data := range mithrilState.ManifestEpochStakes { + if loadedEpoch, err := global.DeserializeAndLoadEpochStakes([]byte(data)); err != nil { + return fmt.Errorf("failed to load manifest epoch %d stakes from state file: %w", epoch, err) + } else { + mlog.Log.Debugf("loaded epoch %d stakes from state file manifest_epoch_stakes", loadedEpoch) + } } - global.PutEpochTotalStake(epochStake.Epoch, epochStake.Val.TotalStake) - for _, entry := range epochStake.Val.Stakes.VoteAccounts { - voteAcct := &epochstakes.VoteAccount{Lamports: entry.Value.Lamports, - NodePubkey: entry.Value.NodePubkey, - LastTimestampTs: entry.Value.LastTimestampTs, - LastTimestampSlot: entry.Value.LastTimestampSlot, - Owner: entry.Value.Owner, - Executable: entry.Value.Executable, - RentEpoch: entry.Value.RentEpoch} - global.PutEpochStakesEntry(epochStake.Epoch, entry.Key, entry.Stake, voteAcct) + // Load EpochAuthorizedVoters from state file (required) + // Supports multiple authorized voters per vote account (matches original manifest behavior) + if len(mithrilState.ManifestEpochAuthorizedVoters) == 0 { + return fmt.Errorf("state file missing manifest_epoch_authorized_voters - delete AccountsDB and rebuild from snapshot") + } + for voteAcctStr, authorizedVoterStrs := range mithrilState.ManifestEpochAuthorizedVoters { + voteAcct, err := base58.DecodeFromString(voteAcctStr) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters key %s: %w", voteAcctStr, err) + } + for _, authorizedVoterStr := range authorizedVoterStrs { + authorizedVoter, err := base58.DecodeFromString(authorizedVoterStr) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters value %s: %w", authorizedVoterStr, err) + } + global.PutEpochAuthorizedVoter(voteAcct, authorizedVoter) } - mlog.Log.Debugf("loaded epoch %d stakes from manifest", epochStake.Epoch) } + + return nil } type persistedTracker struct { @@ -1088,20 +1137,12 @@ func (t *persistedTracker) Get() (uint64, []byte) { return slot, out } -func nilifySnapshotManifest(manifest *snapshot.SnapshotManifest) { - manifest.Bank = nil - manifest.AccountsDb = nil - manifest.BankIncrementalSnapshotPersistence = nil - manifest.VersionedEpochStakes = nil - manifest.LtHash = nil -} - func ReplayBlocks( ctx context.Context, acctsDb *accountsdb.AccountsDb, acctsDbPath string, - snapshotManifest 
*snapshot.SnapshotManifest, - resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming + mithrilState *state.MithrilState, // State file with manifest_* seed fields + resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming startSlot, endSlot uint64, rpcEndpoints []string, // RPC endpoints in priority order (first = primary, rest = fallbacks) blockDir string, @@ -1159,16 +1200,21 @@ func ReplayBlocks( var featuresActivatedInFirstSlot []*accounts.Account var parentFeaturesActivatedInFirstSlot []*accounts.Account - // Pass resumeState if resuming, so ReplayCtx uses fresh values instead of stale manifest - replayCtx := newReplayCtx(snapshotManifest, resumeState) + // Pass mithrilState + resumeState so ReplayCtx uses state file for seed data + replayCtx, err := newReplayCtx(mithrilState, resumeState) + if err != nil { + result.Error = err + return result + } - global.IncrTransactionCount(snapshotManifest.Bank.TransactionCount) + // Use state file for transaction count (required) + global.IncrTransactionCount(mithrilState.ManifestTransactionCount) isFirstSlotInEpoch := epochSchedule.FirstSlotInEpoch(currentEpoch) == startSlot replayCtx.CurrentFeatures, featuresActivatedInFirstSlot, parentFeaturesActivatedInFirstSlot = scanAndEnableFeatures(acctsDb, startSlot, isFirstSlotInEpoch) partitionedEpochRewardsEnabled = replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochReward) || replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochRewardsSuperfeature) - // Load epoch stakes - persisted stakes on resume, manifest on fresh start - snapshotEpoch := epochSchedule.GetEpoch(snapshotManifest.Bank.Slot) + // Load epoch stakes - persisted stakes on resume, state file on fresh start + snapshotEpoch := epochSchedule.GetEpoch(mithrilState.ManifestParentSlot) if resumeState != nil { // Resume case - check if we've crossed epoch boundaries since snapshot epochsCrossed := currentEpoch > snapshotEpoch @@ -1199,12 +1245,18 @@ func ReplayBlocks( return result } } else { - // Resume in same epoch as snapshot, no boundaries crossed - manifest is still valid - buildInitialEpochStakesCache(snapshotManifest, nil) + // Resume in same epoch as snapshot, no boundaries crossed - state file epoch stakes still valid + if err := buildInitialEpochStakesCache(mithrilState); err != nil { + result.Error = err + return result + } } } else { - // Fresh start: load all epochs from manifest - buildInitialEpochStakesCache(snapshotManifest, nil) + // Fresh start: load all epochs from state file + if err := buildInitialEpochStakesCache(mithrilState); err != nil { + result.Error = err + return result + } } //forkChoice, err := forkchoice.NewForkChoiceService(currentEpoch, global.EpochStakes(currentEpoch), global.EpochTotalStake(currentEpoch), global.EpochAuthorizedVoters(), 4) //forkChoice.Start() @@ -1349,15 +1401,12 @@ func ReplayBlocks( // the first emitted block might have slot > startSlot. 
if lastSlotCtx == nil { if resumeState != nil { - // RESUME: Use resume state + manifest (for static fields) - configErr = configureInitialBlockFromResume(acctsDb, block, resumeState, snapshotManifest, replayCtx, epochSchedule, rpcc, rpcBackups) + // RESUME: Use resume state + state file (for static fields) + configErr = configureInitialBlockFromResume(acctsDb, block, resumeState, mithrilState, replayCtx, epochSchedule, rpcc, rpcBackups) } else { - // FRESH START: Use snapshot manifest - configErr = configureInitialBlock(acctsDb, block, snapshotManifest, replayCtx, epochSchedule, rpcc, rpcBackups) + // FRESH START: Use state file manifest_* fields + configErr = configureInitialBlock(acctsDb, block, mithrilState, replayCtx, epochSchedule, rpcc, rpcBackups) } - // We're done with the SnapshotManifest object. Since these objects are quite large, we hint to the GC to free - // the object's contents by nil'ing the struct's members. - nilifySnapshotManifest(snapshotManifest) } else { configErr = configureBlock(block, replayCtx, lastSlotCtx, epochSchedule, rpcc, rpcBackups) } @@ -1377,12 +1426,14 @@ func ReplayBlocks( // epoch boundary if block.Epoch != currentEpoch { - mlog.Log.Infof("epoch boundary, %d -> %d", currentEpoch, currentEpoch+1) + mlog.Log.Infof("") + mlog.Log.Infof("=== Epoch Boundary ===") + mlog.Log.Infof("%d -> %d", currentEpoch, currentEpoch+1) var newlyActivatedFeatures, parentNewlyActivatedFeatures []*accounts.Account replayCtx.CurrentFeatures, newlyActivatedFeatures, parentNewlyActivatedFeatures = scanAndEnableFeatures(acctsDb, currentSlot, true) partitionedEpochRewardsEnabled = replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochReward) || replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochRewardsSuperfeature) - partitionedRewardsInfo = handleEpochTransition(acctsDb, rpcc, rpcBackups, partitionedEpochRewardsEnabled, lastSlotCtx, replayCtx, epochSchedule, replayCtx.CurrentFeatures, block, currentEpoch) + partitionedRewardsInfo = handleEpochTransition(acctsDb, partitionedEpochRewardsEnabled, lastSlotCtx, replayCtx, epochSchedule, replayCtx.CurrentFeatures, block, currentEpoch) currentEpoch = block.Epoch justCrossedEpochBoundary = true if len(newlyActivatedFeatures) != 0 { @@ -1464,6 +1515,13 @@ func ReplayBlocks( replayCtx.Capitalization -= lastSlotCtx.LamportsBurnt + // Clear ManifestEpochStakes after first replayed slot past snapshot + // This frees memory and ensures we don't use stale manifest data on restart + if block.Slot > mithrilState.SnapshotSlot && len(mithrilState.ManifestEpochStakes) > 0 { + mithrilState.ClearManifestEpochStakes() + mlog.Log.Debugf("cleared manifest_epoch_stakes after replaying past snapshot slot") + } + // Check for cancellation immediately after block completes. // This minimizes the window between bankhash persistence and state file update, // preventing false "corruption" detection on graceful shutdown. 
@@ -1472,6 +1530,7 @@ func ReplayBlocks( result.WasCancelled = true acctsDb.WaitForStoreWorker() + // Populate result immediately for state write result.LastPersistedSlot, result.LastPersistedBankhash = pt.Get() @@ -2130,7 +2189,7 @@ func ProcessBlock( if err != nil { mlog.Log.Infof("unable to store bankhash for slot %d", slotCtx.Slot) } - flushed, err := global.FlushPendingStakePubkeys(acctsDb.AcctsDir) + flushed, err := global.FlushPendingStakePubkeys(filepath.Join(acctsDb.AcctsDir, "..")) if err != nil { mlog.Log.Errorf("failed to flush stake pubkey index: %v", err) } else if flushed > 0 { diff --git a/pkg/replay/epoch.go b/pkg/replay/epoch.go index c4dad0a5..95858ab6 100644 --- a/pkg/replay/epoch.go +++ b/pkg/replay/epoch.go @@ -2,10 +2,12 @@ package replay import ( "bytes" + "encoding/base64" "fmt" "maps" - "runtime" + "path/filepath" "sync" + "time" "sync/atomic" "github.com/Overclock-Validator/mithril/pkg/accounts" @@ -16,12 +18,10 @@ import ( "github.com/Overclock-Validator/mithril/pkg/global" "github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/Overclock-Validator/mithril/pkg/rewards" - "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/sealevel" - "github.com/Overclock-Validator/mithril/pkg/snapshot" + "github.com/Overclock-Validator/mithril/pkg/state" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" - "github.com/panjf2000/ants/v2" ) type ReplayCtx struct { @@ -35,10 +35,10 @@ type ReplayCtx struct { // newReplayCtx creates a new ReplayCtx, preferring values from resumeState if available. // This ensures resume uses fresh values instead of potentially stale manifest data. -func newReplayCtx(snapshotManifest *snapshot.SnapshotManifest, resumeState *ResumeState) *ReplayCtx { +func newReplayCtx(mithrilState *state.MithrilState, resumeState *ResumeState) (*ReplayCtx, error) { epochCtx := new(ReplayCtx) - // Prefer resume state if available (has non-zero capitalization) + // Priority 1: Resume state (has most recent values) if resumeState != nil && resumeState.Capitalization > 0 { epochCtx.Capitalization = resumeState.Capitalization epochCtx.SlotsPerYear = resumeState.SlotsPerYear @@ -49,22 +49,107 @@ func newReplayCtx(snapshotManifest *snapshot.SnapshotManifest, resumeState *Resu FoundationVal: resumeState.InflationFoundation, FoundationTerm: resumeState.InflationFoundationTerm, } + } else if mithrilState != nil && mithrilState.ManifestCapitalization > 0 { + // Priority 2: State file manifest_* fields (fresh start) + epochCtx.Capitalization = mithrilState.ManifestCapitalization + epochCtx.SlotsPerYear = mithrilState.ManifestSlotsPerYear + epochCtx.Inflation = rewards.Inflation{ + Initial: mithrilState.ManifestInflationInitial, + Terminal: mithrilState.ManifestInflationTerminal, + Taper: mithrilState.ManifestInflationTaper, + FoundationVal: mithrilState.ManifestInflationFoundation, + FoundationTerm: mithrilState.ManifestInflationFoundationTerm, + } } else { - // Fallback to manifest (fresh start) - epochCtx.Capitalization = snapshotManifest.Bank.Capitalization - epochCtx.Inflation = snapshotManifest.Bank.Inflation - epochCtx.SlotsPerYear = snapshotManifest.Bank.SlotsPerYear + return nil, fmt.Errorf("state file missing manifest_capitalization - delete AccountsDB and rebuild from snapshot") + } + + // Epoch account hash from state file (required) + if mithrilState != nil && mithrilState.ManifestEpochAcctsHash != "" { + epochAcctsHash, err := 
base64.StdEncoding.DecodeString(mithrilState.ManifestEpochAcctsHash) + if err != nil { + return nil, fmt.Errorf("corrupted state file: failed to decode manifest_epoch_accts_hash: %w", err) + } + if len(epochAcctsHash) == 32 { + epochCtx.HasEpochAcctsHash = true + epochCtx.EpochAcctsHash = epochAcctsHash + } } + // Note: epoch account hash may be empty for snapshots before SIMD-0160 + + return epochCtx, nil +} + +// BoundaryStakeScanResult holds all accumulated data from a single streaming pass +// over stake accounts at the epoch boundary, serving both stake history update +// and epoch stakes/vote cache refresh. +type BoundaryStakeScanResult struct { + // For stake history update + StakeHistoryEffective uint64 + StakeHistoryActivating uint64 + StakeHistoryDeactivating uint64 + // For epoch stakes + vote cache + VoteAcctStakes map[solana.PublicKey]uint64 + EffectiveStakes map[solana.PublicKey]uint64 + TotalEffectiveStake uint64 +} + +// scanStakesForEpochBoundary performs a single streaming pass over all stake accounts, +// accumulating data needed by both updateStakeHistorySysvar and updateEpochStakesAndRefreshVoteCache. +func scanStakesForEpochBoundary(acctsDb *accountsdb.AccountsDb, slot uint64, targetEpoch uint64, leaderScheduleEpoch uint64, stakeHistory *sealevel.SysvarStakeHistory, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features) *BoundaryStakeScanResult { + newRateActivationEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) + + // Stake history accumulators (atomic — high contention from worker pool) + var shEffective atomic.Uint64 + var shActivating atomic.Uint64 + var shDeactivating atomic.Uint64 - if snapshotManifest.EpochAccountHash != [32]byte{} { - epochCtx.HasEpochAcctsHash = true - epochCtx.EpochAcctsHash = snapshotManifest.EpochAccountHash[:] + // Epoch stakes accumulators (mutex-guarded maps) + voteAcctStakes := make(map[solana.PublicKey]uint64) + var voteAcctStakesMu sync.Mutex + effectiveStakes := make(map[solana.PublicKey]uint64) + var effectiveStakesMu sync.Mutex + var totalEffectiveStake atomic.Uint64 + + _, err := global.StreamStakeAccounts(acctsDb, slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + // --- Stake history accumulation --- + if delegation.StakeLamports > 0 { + entry := delegation.StakeActivatingAndDeactivating(targetEpoch, stakeHistory, newRateActivationEpoch) + shEffective.Add(entry.Effective) + shActivating.Add(entry.Activating) + shDeactivating.Add(entry.Deactivating) + } + + // --- Epoch stakes accumulation --- + voteAcctStakesMu.Lock() + voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports + voteAcctStakesMu.Unlock() + + effectiveStake := delegation.Stake(leaderScheduleEpoch, stakeHistory, newRateActivationEpoch) + if effectiveStake > 0 { + effectiveStakesMu.Lock() + effectiveStakes[delegation.VoterPubkey] += effectiveStake + effectiveStakesMu.Unlock() + totalEffectiveStake.Add(effectiveStake) + } + }) + if err != nil { + panic(fmt.Sprintf("error scanning stake accounts at epoch boundary: %s", err)) } - return epochCtx + return &BoundaryStakeScanResult{ + StakeHistoryEffective: shEffective.Load(), + StakeHistoryActivating: shActivating.Load(), + StakeHistoryDeactivating: shDeactivating.Load(), + VoteAcctStakes: voteAcctStakes, + EffectiveStakes: effectiveStakes, + TotalEffectiveStake: totalEffectiveStake.Load(), + } } -func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block, prevSlotCtx *sealevel.SlotCtx, targetEpoch uint64, epochSchedule 
*sealevel.SysvarEpochSchedule, f *features.Features) *sealevel.SysvarStakeHistory { +// updateStakeHistorySysvar applies pre-computed stake history data to the sysvar. +func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block, prevSlotCtx *sealevel.SlotCtx, targetEpoch uint64, scanResult *BoundaryStakeScanResult) *sealevel.SysvarStakeHistory { stakeHistoryAcct, err := prevSlotCtx.GetAccount(sealevel.SysvarStakeHistoryAddr) if err != nil { stakeHistoryAcct, err = acctsDb.GetAccount(prevSlotCtx.Slot, sealevel.SysvarStakeHistoryAddr) @@ -78,41 +163,10 @@ func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block var stakeHistory sealevel.SysvarStakeHistory stakeHistory.MustUnmarshalWithDecoder(decoder) - newRateActivationEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - - var wg sync.WaitGroup - var effective atomic.Uint64 - var activating atomic.Uint64 - var deactivating atomic.Uint64 - - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - delegation := i.(*sealevel.Delegation) - if delegation.StakeLamports == 0 { - return - } - - stakeHistoryEntry := delegation.StakeActivatingAndDeactivating(targetEpoch, &stakeHistory, newRateActivationEpoch) - - effective.Add(stakeHistoryEntry.Effective) - activating.Add(stakeHistoryEntry.Activating) - deactivating.Add(stakeHistoryEntry.Deactivating) - }) - - for _, delegation := range global.StakeCache() { - wg.Add(1) - workerPool.Invoke(delegation) - } - - wg.Wait() - workerPool.Release() - ants.Release() - var accumulatorStakeHistoryEntry sealevel.StakeHistoryEntry - accumulatorStakeHistoryEntry.Activating = activating.Load() - accumulatorStakeHistoryEntry.Effective = effective.Load() - accumulatorStakeHistoryEntry.Deactivating = deactivating.Load() + accumulatorStakeHistoryEntry.Effective = scanResult.StakeHistoryEffective + accumulatorStakeHistoryEntry.Activating = scanResult.StakeHistoryActivating + accumulatorStakeHistoryEntry.Deactivating = scanResult.StakeHistoryDeactivating stakeHistory.Update(targetEpoch, accumulatorStakeHistoryEntry) buf := new(bytes.Buffer) @@ -130,7 +184,16 @@ func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block return &stakeHistory } -func handleEpochTransition(acctsDb *accountsdb.AccountsDb, rpcc *rpcclient.RpcClient, rpcBackups []string, partitionedEpochRewards bool, prevSlotCtx *sealevel.SlotCtx, replayCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features, block *block.Block, epoch uint64) *rewards.PartitionedRewardDistributionInfo { +func handleEpochTransition(acctsDb *accountsdb.AccountsDb, partitionedEpochRewards bool, prevSlotCtx *sealevel.SlotCtx, replayCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features, block *block.Block, epoch uint64) *rewards.PartitionedRewardDistributionInfo { + // Flush any pending stake pubkeys to the index file before scanning. + // The async StoreAccounts callback from the previous block may not have + // run yet, so flush here to ensure the index is complete for the scan. 
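+	// The same base path as ProcessBlock's per-block flush is used here
+	// (one level above AcctsDir); the index compaction at the end of the
+	// boundary pass reuses this acctsDbDir.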
+ acctsDbDir := filepath.Join(acctsDb.AcctsDir, "..") + if _, err := global.FlushPendingStakePubkeys(acctsDbDir); err != nil { + mlog.Log.Errorf("failed to flush stake pubkeys before epoch scan: %v", err) + } + + // Load stake history (used by both scan and rewards) var stakeHistory sealevel.SysvarStakeHistory stakeHistoryAcct, err := prevSlotCtx.GetAccount(sealevel.SysvarStakeHistoryAddr) if err != nil { @@ -145,9 +208,15 @@ func handleEpochTransition(acctsDb *accountsdb.AccountsDb, rpcc *rpcclient.RpcCl var partitionedRewardsInfo *rewards.PartitionedRewardDistributionInfo newEpoch := epoch + 1 firstSlotInEpoch := epochSchedule.FirstSlotInEpoch(newEpoch) - leaderScheduleEpoch := epochSchedule.LeaderScheduleEpoch(block.Slot) - updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch, block, epochSchedule, f, acctsDb, prevSlotCtx.Slot) + + // Single streaming pass for both stake history and epoch stakes + t0 := time.Now() + scanResult := scanStakesForEpochBoundary(acctsDb, prevSlotCtx.Slot, epoch, leaderScheduleEpoch, &stakeHistory, epochSchedule, f) + t1 := time.Now() + + updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch, block, acctsDb, prevSlotCtx.Slot, scanResult) + t2 := time.Now() if global.ManageLeaderSchedule() { if len(global.EpochStakesVoteAccts(newEpoch)) > 0 { @@ -166,101 +235,59 @@ func handleEpochTransition(acctsDb *accountsdb.AccountsDb, rpcc *rpcclient.RpcCl panic(fmt.Sprintf("couldn't find leader for slot %d at epoch boundary", block.Slot)) } } + t3 := time.Now() if partitionedEpochRewards { - partitionedRewardsInfo, block.EpochUpdatedAccts, block.ParentEpochUpdatedAccts = beginPartitionedEpochRewardsDistribution(acctsDb, prevSlotCtx, &stakeHistory, replayCtx, epochSchedule, rpcc, rpcBackups, block, f, newEpoch, firstSlotInEpoch) + partitionedRewardsInfo, block.EpochUpdatedAccts, block.ParentEpochUpdatedAccts = beginPartitionedEpochRewardsDistribution(acctsDb, prevSlotCtx, &stakeHistory, replayCtx, epochSchedule, block, f, newEpoch, firstSlotInEpoch) } else { panic("only partitioned rewards supported") } + t4 := time.Now() - updateStakeHistorySysvar(acctsDb, block, prevSlotCtx, epoch, epochSchedule, f) - mlog.Log.Infof("epoch transition %d -> %d done.", epoch, newEpoch) - - return partitionedRewardsInfo -} - -type epochStakesVoteAcctData struct { - nodePubkey solana.PublicKey - stake atomic.Uint64 -} + updateStakeHistorySysvar(acctsDb, block, prevSlotCtx, epoch, scanResult) + t5 := time.Now() -type epochStakesBuilder struct { - mu sync.Mutex - epoch uint64 - epochStakesMap map[solana.PublicKey]*epochStakesVoteAcctData - totalStake atomic.Uint64 -} - -func newEpochStakesBuilder(epoch uint64, voteCache map[solana.PublicKey]*sealevel.VoteStateVersions) *epochStakesBuilder { - epochStakesMap := make(map[solana.PublicKey]*epochStakesVoteAcctData, len(voteCache)) - for votePk, voteAcct := range voteCache { - epochStakesMap[votePk] = &epochStakesVoteAcctData{nodePubkey: voteAcct.NodePubkey()} + // Compact stake index at epoch boundary — removes duplicates from appends + if err := global.CompactStakePubkeyIndex(acctsDbDir); err != nil { + mlog.Log.Errorf("failed to compact stake pubkey index: %v", err) } - return &epochStakesBuilder{epoch: epoch, epochStakesMap: epochStakesMap} -} -func (esb *epochStakesBuilder) AddStakeForVoteAcct(voteAcct solana.PublicKey, stake uint64) { - info := esb.epochStakesMap[voteAcct] - info.stake.Add(stake) - esb.totalStake.Add(stake) -} + mlog.Log.Infof("Timing: scan=%.1fs epochStakes=%.1fs leaderSched=%.1fs rewards=%.1fs stakeHistory=%.1fs 
total=%.1fs", + t1.Sub(t0).Seconds(), t2.Sub(t1).Seconds(), t3.Sub(t2).Seconds(), + t4.Sub(t3).Seconds(), t5.Sub(t4).Seconds(), t5.Sub(t0).Seconds()) + mlog.Log.Infof("=======================") -func (esb *epochStakesBuilder) Finish() { - for voterPubkey, entry := range esb.epochStakesMap { - global.PutEpochStakesEntry(esb.epoch, voterPubkey, entry.stake.Load(), &epochstakes.VoteAccount{NodePubkey: entry.nodePubkey}) - } - global.PutEpochTotalStake(esb.epoch, esb.totalStake.Load()) -} - -func (esb *epochStakesBuilder) TotalEpochStake() uint64 { - return esb.totalStake.Load() + return partitionedRewardsInfo } -func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.Block, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features, acctsDb *accountsdb.AccountsDb, slot uint64) { - stakes := global.StakeCache() - newRateActivationEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - - // Build vote pubkey set for vote cache refresh (only needs pubkeys, not effective stakes) - voteAcctStakes := make(map[solana.PublicKey]uint64) - for _, delegation := range stakes { - voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports - } +// updateEpochStakesAndRefreshVoteCache applies pre-computed vote/effective stake data +// from the boundary scan to refresh the vote cache and store epoch stakes. +func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.Block, acctsDb *accountsdb.AccountsDb, slot uint64, scanResult *BoundaryStakeScanResult) { + // Check if we need to compute epoch stakes (skip on resume) + hasEpochStakes := global.HasEpochStakes(leaderScheduleEpoch) // ALWAYS refresh vote cache from AccountsDB, even if HasEpochStakes is true // This ensures the vote cache has fresh NodePubkey for leader schedule - if err := RebuildVoteCacheFromAccountsDB(acctsDb, slot, voteAcctStakes, 0); err != nil { + if err := RebuildVoteCacheFromAccountsDB(acctsDb, slot, scanResult.VoteAcctStakes, 0); err != nil { mlog.Log.Errorf("failed to rebuild vote cache at epoch boundary: %v", err) } - // Skip epoch stakes calculation if already cached (resume) - hasEpochStakes := global.HasEpochStakes(leaderScheduleEpoch) + // Skip epoch stakes storage if already cached (resume) if hasEpochStakes { mlog.Log.Infof("already had EpochStakes for epoch %d", leaderScheduleEpoch) return } + // Store epoch stakes computed during scanning voteCache := global.VoteCache() - esb := newEpochStakesBuilder(leaderScheduleEpoch, voteCache) - var wg sync.WaitGroup - - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - delegation := i.(*sealevel.Delegation) - _, exists := voteCache[delegation.VoterPubkey] + for votePk, stake := range scanResult.EffectiveStakes { + voteAcct, exists := voteCache[votePk] if exists { - effectiveStake := delegation.Stake(esb.epoch, sealevel.SysvarCache.StakeHistory.Sysvar, newRateActivationEpoch) - esb.AddStakeForVoteAcct(delegation.VoterPubkey, effectiveStake) + global.PutEpochStakesEntry(leaderScheduleEpoch, votePk, stake, &epochstakes.VoteAccount{NodePubkey: voteAcct.NodePubkey()}) } - }) - - for _, entry := range stakes { - wg.Add(1) - workerPool.Invoke(entry) } - wg.Wait() - esb.Finish() + global.PutEpochTotalStake(leaderScheduleEpoch, scanResult.TotalEffectiveStake) maps.Copy(b.EpochStakesPerVoteAcct, global.EpochStakes(leaderScheduleEpoch)) - b.TotalEpochStake = esb.TotalEpochStake() + b.TotalEpochStake = scanResult.TotalEffectiveStake } diff --git a/pkg/replay/rewards.go 
b/pkg/replay/rewards.go index 806d8aec..667d6d7a 100644 --- a/pkg/replay/rewards.go +++ b/pkg/replay/rewards.go @@ -3,29 +3,16 @@ package replay import ( "bytes" "fmt" - "math" - "sync/atomic" "github.com/Overclock-Validator/mithril/pkg/accounts" "github.com/Overclock-Validator/mithril/pkg/accountsdb" "github.com/Overclock-Validator/mithril/pkg/block" "github.com/Overclock-Validator/mithril/pkg/features" "github.com/Overclock-Validator/mithril/pkg/global" - "github.com/Overclock-Validator/mithril/pkg/mlog" - - //"github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/Overclock-Validator/mithril/pkg/rewards" - "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/Overclock-Validator/wide" bin "github.com/gagliardetto/binary" - "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" -) - -const ( - fnvOffset64 uint64 = 14695981039346656037 - fnvPrime64 uint64 = 1099511628211 ) func newWarmupCooldownRateEpoch(epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features) *uint64 { @@ -37,151 +24,30 @@ func newWarmupCooldownRateEpoch(epochSchedule *sealevel.SysvarEpochSchedule, f * return &epoch } -func validateCalculatedValidatorRewards(rpcRewards []rpc.BlockReward, calculateRewards map[solana.PublicKey]*atomic.Uint64) { - for _, rpcReward := range rpcRewards { - if calculateRewards[rpcReward.Pubkey].Load() != uint64(rpcReward.Lamports) { - mlog.Log.Infof("reward for vote acct %s - rpc reward %d did not match calculated reward %d", rpcReward.Pubkey, rpcReward.Lamports, calculateRewards[rpcReward.Pubkey]) - } - } -} - -func logRewardSnapshots(epoch uint64, slot uint64, stakeCache map[solana.PublicKey]*sealevel.Delegation, voteCache map[solana.PublicKey]*sealevel.VoteStateVersions) { - stakeCount, stakeLamports, stakeSum, stakeXor := stakeSnapshotDigest(stakeCache) - voteCount, voteCredits, voteSum, voteXor := voteSnapshotDigest(voteCache) - - mlog.Log.Infof( - "stake & vote acct snapshots: epoch=%d slot=%d stake_accts=%d vote_accts=%d stake_lamports=%d stake_hash_sum=%016x stake_hash_xor=%016x vote_credits=%d vote_hash_sum=%016x vote_hash_xor=%016x", - epoch, - slot, - stakeCount, - voteCount, - stakeLamports, - stakeSum, - stakeXor, - voteCredits, - voteSum, - voteXor, - ) -} - -func stakeSnapshotDigest(stakeCache map[solana.PublicKey]*sealevel.Delegation) (int, uint64, uint64, uint64) { - var count int - var totalStake uint64 - var sum uint64 - var xor uint64 - - for stakePk, delegation := range stakeCache { - if delegation == nil { - continue - } - h := fnvOffset64 - h = fnv1a64Add(h, stakePk[:]) - h = fnv1a64Add(h, delegation.VoterPubkey[:]) - h = fnv1a64AddUint64(h, delegation.StakeLamports) - h = fnv1a64AddUint64(h, delegation.ActivationEpoch) - h = fnv1a64AddUint64(h, delegation.DeactivationEpoch) - h = fnv1a64AddUint64(h, math.Float64bits(delegation.WarmupCooldownRate)) - h = fnv1a64AddUint64(h, delegation.CreditsObserved) - - sum += h - xor ^= h - totalStake += delegation.StakeLamports - count++ - } - - return count, totalStake, sum, xor -} - -func voteSnapshotDigest(voteCache map[solana.PublicKey]*sealevel.VoteStateVersions) (int, uint64, uint64, uint64) { - var count int - var creditsSum uint64 - var sum uint64 - var xor uint64 - - for votePk, voteState := range voteCache { - if voteState == nil { - continue - } - - var commission byte - var epochCredits []sealevel.EpochCredits - switch voteState.Type { - case sealevel.VoteStateVersionCurrent: - commission = 
voteState.Current.Commission - epochCredits = voteState.Current.EpochCredits - case sealevel.VoteStateVersionV0_23_5: - commission = voteState.V0_23_5.Commission - epochCredits = voteState.V0_23_5.EpochCredits - case sealevel.VoteStateVersionV1_14_11: - commission = voteState.V1_14_11.Commission - epochCredits = voteState.V1_14_11.EpochCredits - default: - continue - } - - var last sealevel.EpochCredits - if len(epochCredits) != 0 { - last = epochCredits[len(epochCredits)-1] - creditsSum += last.Credits - } - - h := fnvOffset64 - h = fnv1a64Add(h, votePk[:]) - nodePk := voteState.NodePubkey() - h = fnv1a64Add(h, nodePk[:]) - h = fnv1a64AddUint64(h, uint64(voteState.Type)) - h = fnv1a64AddUint64(h, uint64(commission)) - h = fnv1a64AddUint64(h, uint64(len(epochCredits))) - h = fnv1a64AddUint64(h, last.Epoch) - h = fnv1a64AddUint64(h, last.Credits) - h = fnv1a64AddUint64(h, last.PrevCredits) - - sum += h - xor ^= h - count++ - } - - return count, creditsSum, sum, xor -} - -func fnv1a64Add(h uint64, b []byte) uint64 { - for _, c := range b { - h ^= uint64(c) - h *= fnvPrime64 - } - return h -} - -func fnv1a64AddUint64(h uint64, v uint64) uint64 { - for i := 0; i < 8; i++ { - h ^= uint64(byte(v)) - h *= fnvPrime64 - v >>= 8 - } - return h -} - -func beginPartitionedEpochRewardsDistribution(acctsDb *accountsdb.AccountsDb, slotCtx *sealevel.SlotCtx, stakeHistory *sealevel.SysvarStakeHistory, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, rpcc *rpcclient.RpcClient, rpcBackups []string, block *block.Block, f *features.Features, epoch uint64, slot uint64) (*rewards.PartitionedRewardDistributionInfo, []*accounts.Account, []*accounts.Account) { - partitionedRewardsInfo := rewards.DeterminePartitionedStakingRewardsInfo(rpcc, rpcBackups, epochSchedule, &epochCtx.Inflation, epochCtx.Capitalization, epoch, epoch-1, slot, epochCtx.SlotsPerYear, f) +func beginPartitionedEpochRewardsDistribution(acctsDb *accountsdb.AccountsDb, slotCtx *sealevel.SlotCtx, stakeHistory *sealevel.SysvarStakeHistory, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, block *block.Block, f *features.Features, epoch uint64, slot uint64) (*rewards.PartitionedRewardDistributionInfo, []*accounts.Account, []*accounts.Account) { + partitionedRewardsInfo := rewards.DeterminePartitionedStakingRewardsInfo(epochSchedule, &epochCtx.Inflation, epochCtx.Capitalization, epoch, epoch-1, slot, epochCtx.SlotsPerYear, f) totalRewards := partitionedRewardsInfo.TotalStakingRewards newWarmupCooldownRateEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - var points wide.Uint128 - var pointsPerStakeAcct map[solana.PublicKey]*rewards.CalculatedStakePoints - stakeCacheSnapshot := global.StakeCacheSnapshot() voteCacheSnapshot := global.VoteCacheSnapshot() - pointsPerStakeAcct, points = rewards.CalculateStakePoints(acctsDb, slotCtx, slot, stakeHistory, newWarmupCooldownRateEpoch, stakeCacheSnapshot, voteCacheSnapshot) - pointValue := rewards.PointValue{Rewards: totalRewards, Points: points} + pointValue := rewards.PointValue{Rewards: totalRewards, Points: wide.Uint128{}} + streamResult, streamErr := rewards.CalculateRewardsStreaming( + acctsDb, slot, stakeHistory, newWarmupCooldownRateEpoch, + voteCacheSnapshot, pointValue, epoch-1, slotCtx.Blockhash, slotCtx, f) + if streamErr != nil { + panic(fmt.Sprintf("streaming rewards calculation failed: %s", streamErr)) + } + + partitionedRewardsInfo.SpoolDir = streamResult.SpoolDir + partitionedRewardsInfo.SpoolSlot = streamResult.SpoolSlot + 
partitionedRewardsInfo.NumRewardPartitionsRemaining = streamResult.NumPartitions - var validatorRewards map[solana.PublicKey]*atomic.Uint64 - partitionedRewardsInfo.StakingRewards, validatorRewards, partitionedRewardsInfo.RewardPartitions = rewards.CalculateStakeRewardsAndPartitions(pointsPerStakeAcct, slotCtx, stakeHistory, slot, epoch-1, pointValue, newWarmupCooldownRateEpoch, slotCtx.Features, stakeCacheSnapshot, voteCacheSnapshot) - updatedAccts, parentUpdatedAccts, voteRewardsDistributed := rewards.DistributeVotingRewards(acctsDb, validatorRewards, slot) - partitionedRewardsInfo.NumRewardPartitionsRemaining = partitionedRewardsInfo.RewardPartitions.NumPartitions() + updatedAccts, parentUpdatedAccts, voteRewardsDistributed := rewards.DistributeVotingRewards(acctsDb, streamResult.ValidatorRewards, slot) newEpochRewards := sealevel.SysvarEpochRewards{DistributionStartingBlockHeight: block.BlockHeight + 1, - NumPartitions: partitionedRewardsInfo.NumRewardPartitionsRemaining, ParentBlockhash: block.LastBlockhash, - TotalRewards: totalRewards, DistributedRewards: voteRewardsDistributed, TotalPoints: points, Active: true} + NumPartitions: streamResult.NumPartitions, ParentBlockhash: block.LastBlockhash, + TotalRewards: totalRewards, DistributedRewards: voteRewardsDistributed, TotalPoints: streamResult.TotalPoints, Active: true} epochRewardsAcct, err := acctsDb.GetAccount(slot, sealevel.SysvarEpochRewardsAddr) if err != nil { @@ -219,16 +85,7 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep partitionIdx := currentBlockHeight - epochRewards.DistributionStartingBlockHeight - // Initialize shared worker pool on first partition (reused across all 243 partitions) - if partitionedEpochRewardsInfo.WorkerPool == nil { - if err := partitionedEpochRewardsInfo.InitWorkerPool(); err != nil { - panic(fmt.Sprintf("unable to initialize reward distribution worker pool: %s", err)) - } - } - - // Set flag to prevent stake account cache pollution during one-shot reward reads/writes - acctsDb.InRewardsWindow.Store(true) - distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsForPartition(acctsDb, partitionedEpochRewardsInfo.RewardPartitions.Partition(partitionIdx), partitionedEpochRewardsInfo.StakingRewards, currentSlot, partitionedEpochRewardsInfo.WorkerPool) + distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsFromSpool(acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, partitionIdx, currentSlot) parentDistributedAccts = append(parentDistributedAccts, epochRewardsAcct.Clone()) epochRewards.Distribute(distributedLamports) @@ -236,8 +93,7 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep if partitionedEpochRewardsInfo.NumRewardPartitionsRemaining == 0 { epochRewards.Active = false - acctsDb.InRewardsWindow.Store(false) - partitionedEpochRewardsInfo.ReleaseWorkerPool() + rewards.CleanupPartitionedSpoolFiles(partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, epochRewards.NumPartitions) } writer := new(bytes.Buffer) diff --git a/pkg/replay/transaction.go b/pkg/replay/transaction.go index 1cb6e32e..f62a1efa 100644 --- a/pkg/replay/transaction.go +++ b/pkg/replay/transaction.go @@ -189,15 +189,9 @@ func recordStakeDelegation(acct *accounts.Account) { isUninitialized = acctType == sealevel.StakeStateV2StatusUninitialized } - if isEmpty || isUninitialized { - global.DeleteStakeCacheItem(acct.Key) - } 
else { - stakeState, err := sealevel.UnmarshalStakeState(acct.Data) - if err == nil { - delegation := stakeState.Stake.Stake.Delegation - delegation.CreditsObserved = stakeState.Stake.Stake.CreditsObserved - global.PutStakeCacheItem(acct.Key, &delegation) - } + if !isEmpty && !isUninitialized { + // Enqueue pubkey for index append so StreamStakeAccounts sees new stake accounts + global.EnqueuePendingStakePubkey(acct.Key) } } diff --git a/pkg/rewards/partitions.go b/pkg/rewards/partitions.go deleted file mode 100644 index ad1e6fc0..00000000 --- a/pkg/rewards/partitions.go +++ /dev/null @@ -1,46 +0,0 @@ -package rewards - -import ( - "sync" - - "github.com/gagliardetto/solana-go" -) - -type Partition struct { - pubkeys []solana.PublicKey - mu sync.Mutex - partitionIdx uint64 -} - -type Partitions []*Partition - -func NewPartitions(numPartitions uint64) Partitions { - p := make(Partitions, numPartitions) - for i := uint64(0); i < numPartitions; i++ { - p[i] = &Partition{pubkeys: make([]solana.PublicKey, 0, 2000), partitionIdx: i} - } - return p -} - -func (partitions Partitions) AddPubkey(partitionIdx uint64, pk solana.PublicKey) { - prt := partitions[partitionIdx] - prt.mu.Lock() - prt.pubkeys = append(prt.pubkeys, pk) - prt.mu.Unlock() -} - -func (partitions Partitions) Partition(partitionIdx uint64) *Partition { - return partitions[partitionIdx] -} - -func (partitions Partitions) NumPartitions() uint64 { - return uint64(len(partitions)) -} - -func (partition *Partition) NumPubkeys() uint64 { - return uint64(len(partition.pubkeys)) -} - -func (partition *Partition) Pubkeys() []solana.PublicKey { - return partition.pubkeys -} diff --git a/pkg/rewards/points.go b/pkg/rewards/points.go deleted file mode 100644 index 4221a076..00000000 --- a/pkg/rewards/points.go +++ /dev/null @@ -1,38 +0,0 @@ -package rewards - -import ( - "github.com/Overclock-Validator/wide" - "github.com/gagliardetto/solana-go" -) - -type CalculatedStakePointsAccumulator struct { - pubkeys []solana.PublicKey - pointsMap map[solana.PublicKey]*CalculatedStakePoints -} - -func NewCalculatedStakePointsAccumulator(pubkeys []solana.PublicKey) *CalculatedStakePointsAccumulator { - stakePointStructs := make([]CalculatedStakePoints, len(pubkeys)) - accum := &CalculatedStakePointsAccumulator{pubkeys: pubkeys, pointsMap: make(map[solana.PublicKey]*CalculatedStakePoints, len(pubkeys))} - for i, pk := range pubkeys { - accum.pointsMap[pk] = &stakePointStructs[i] - } - return accum -} - -func (accum *CalculatedStakePointsAccumulator) Add(pk solana.PublicKey, points CalculatedStakePoints) { - accum.pointsMap[pk].Points = points.Points - accum.pointsMap[pk].NewCreditsObserved = points.NewCreditsObserved - accum.pointsMap[pk].ForceCreditsUpdateWithSkippedReward = points.ForceCreditsUpdateWithSkippedReward -} - -func (accum CalculatedStakePointsAccumulator) TotalPoints() wide.Uint128 { - var totalPoints wide.Uint128 - for _, pk := range accum.pubkeys { - totalPoints = totalPoints.Add(accum.pointsMap[pk].Points) - } - return totalPoints -} - -func (accum CalculatedStakePointsAccumulator) CalculatedStakePoints() map[solana.PublicKey]*CalculatedStakePoints { - return accum.pointsMap -} diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index df72efc4..d683e863 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -2,7 +2,9 @@ package rewards import ( "fmt" + "io" "math" + "path/filepath" "runtime" "sort" "sync" @@ -12,13 +14,12 @@ import ( "github.com/Overclock-Validator/mithril/pkg/accountsdb" 
"github.com/Overclock-Validator/mithril/pkg/features" "github.com/Overclock-Validator/mithril/pkg/global" - "github.com/Overclock-Validator/mithril/pkg/rpcclient" + "github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/Overclock-Validator/mithril/pkg/safemath" "github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/Overclock-Validator/wide" "github.com/dgryski/go-sip13" "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" ) @@ -33,90 +34,8 @@ type PartitionedRewardDistributionInfo struct { TotalStakingRewards uint64 FirstStakingRewardSlot uint64 NumRewardPartitionsRemaining uint64 - Credits map[solana.PublicKey]CalculatedStakePoints - RewardPartitions Partitions - StakingRewards map[solana.PublicKey]*CalculatedStakeRewards - WorkerPool *ants.PoolWithFunc -} - -// rewardDistributionTask carries all context needed for processing one stake account. -// Used with the shared worker pool to avoid per-partition pool creation overhead. -type rewardDistributionTask struct { - acctsDb *accountsdb.AccountsDb - slot uint64 - stakingRewards map[solana.PublicKey]*CalculatedStakeRewards - accts []*accounts.Account - parentAccts []*accounts.Account - distributedLamports *atomic.Uint64 - wg *sync.WaitGroup - idx int - pubkey solana.PublicKey -} - -// rewardDistributionWorker is the shared worker function for stake reward distribution. -func rewardDistributionWorker(i interface{}) { - task := i.(*rewardDistributionTask) - defer task.wg.Done() - - reward, ok := task.stakingRewards[task.pubkey] - if !ok { - return - } - - stakeAcct, err := task.acctsDb.GetAccount(task.slot, task.pubkey) - if err != nil { - panic(fmt.Sprintf("unable to get acct %s from acctsdb for partitioned epoch rewards distribution in slot %d", task.pubkey, task.slot)) - } - task.parentAccts[task.idx] = stakeAcct.Clone() - - stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) - if err != nil { - return - } - - stakeState.Stake.Stake.CreditsObserved = reward.NewCreditsObserved - stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64(stakeState.Stake.Stake.Delegation.StakeLamports, uint64(reward.StakerRewards)) - - err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) - if err != nil { - panic(fmt.Sprintf("unable to serialize new stake account state in distributing partitioned rewards: %s", err)) - } - - stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, uint64(reward.StakerRewards)) - if err != nil { - panic(fmt.Sprintf("overflow in partitioned epoch rewards distribution in slot %d to acct %s: %s", task.slot, task.pubkey, err)) - } - - task.accts[task.idx] = stakeAcct - task.distributedLamports.Add(reward.StakerRewards) - - // update the stake cache - delegationToCache := stakeState.Stake.Stake.Delegation - delegationToCache.CreditsObserved = stakeState.Stake.Stake.CreditsObserved - global.PutStakeCacheItem(task.pubkey, &delegationToCache) -} - -// InitWorkerPool creates the shared worker pool for reward distribution. -// Call once at the start of partitioned rewards, before processing any partition. -func (info *PartitionedRewardDistributionInfo) InitWorkerPool() error { - if info.WorkerPool != nil { - return nil - } - size := runtime.GOMAXPROCS(0) * 8 - pool, err := ants.NewPoolWithFunc(size, rewardDistributionWorker) - if err != nil { - return err - } - info.WorkerPool = pool - return nil -} - -// ReleaseWorkerPool releases the shared pool. Call when NumRewardPartitionsRemaining == 0. 
-func (info *PartitionedRewardDistributionInfo) ReleaseWorkerPool() { - if info.WorkerPool != nil { - info.WorkerPool.Release() - info.WorkerPool = nil - } + SpoolDir string + SpoolSlot uint64 } type CalculatedStakePoints struct { @@ -179,19 +98,13 @@ func IsWithinRewardsPeriod(epoch uint64, slot uint64, epochSchedule *sealevel.Sy } } -// DeterminePartitionedStakingRewardsInfo fetches reward partition info from RPC with failover support. -// It tries the primary RPC first with retries, then falls back to backup endpoints. -func DeterminePartitionedStakingRewardsInfo(rpcc *rpcclient.RpcClient, rpcBackups []string, epochSchedule *sealevel.SysvarEpochSchedule, inflation *Inflation, prevEpochCapitalization uint64, epoch uint64, prevEpoch uint64, slot uint64, slotsPerYear float64, f *features.Features) *PartitionedRewardDistributionInfo { +// DeterminePartitionedStakingRewardsInfo calculates the total staking rewards for the epoch. +func DeterminePartitionedStakingRewardsInfo(epochSchedule *sealevel.SysvarEpochSchedule, inflation *Inflation, prevEpochCapitalization uint64, epoch uint64, prevEpoch uint64, slot uint64, slotsPerYear float64, f *features.Features) *PartitionedRewardDistributionInfo { firstSlotInEpoch := epochSchedule.FirstSlotInEpoch(epoch) totalStakingRewards := CalculatePreviousEpochInflationRewards(epochSchedule, inflation, prevEpochCapitalization, epoch, prevEpoch, slotsPerYear, f) return &PartitionedRewardDistributionInfo{TotalStakingRewards: totalStakingRewards, FirstStakingRewardSlot: firstSlotInEpoch + 1} } -type idxAndReward struct { - idx int - reward rpc.BlockReward -} - type idxAndRewardNew struct { idx int reward uint64 @@ -253,33 +166,80 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma return updatedAccts, parentUpdatedAccts, totalVotingRewards.Load() } -func DistributeStakingRewardsForPartition(acctsDb *accountsdb.AccountsDb, partition *Partition, stakingRewards map[solana.PublicKey]*CalculatedStakeRewards, slot uint64, workerPool *ants.PoolWithFunc) ([]*accounts.Account, []*accounts.Account, uint64) { +func DistributeStakingRewardsFromSpool(acctsDb *accountsdb.AccountsDb, spoolDir string, spoolSlot uint64, partitionIdx uint64, currentSlot uint64) ([]*accounts.Account, []*accounts.Account, uint64) { + reader, err := NewPartitionReader(spoolDir, spoolSlot, uint32(partitionIdx)) + if err != nil { + panic(fmt.Sprintf("unable to open partition %d spool for distribution: %s", partitionIdx, err)) + } + if reader == nil { + return nil, nil, 0 + } + defer reader.Close() + + var records []*SpoolRecord + for { + rec, err := reader.Next() + if err == io.EOF { + break + } + if err != nil { + panic(fmt.Sprintf("unable to read partition %d spool record: %s", partitionIdx, err)) + } + records = append(records, rec) + } + + if len(records) == 0 { + return nil, nil, 0 + } + + accts := make([]*accounts.Account, len(records)) + parentAccts := make([]*accounts.Account, len(records)) var distributedLamports atomic.Uint64 - accts := make([]*accounts.Account, partition.NumPubkeys()) - parentAccts := make([]*accounts.Account, partition.NumPubkeys()) var wg sync.WaitGroup + workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { + defer wg.Done() + idx := i.(int) + rec := records[idx] + + stakeAcct, err := acctsDb.GetAccount(currentSlot, rec.StakePubkey) + if err != nil { + panic(fmt.Sprintf("unable to get acct %s from acctsdb for spool distribution in slot %d", rec.StakePubkey, currentSlot)) + } + parentAccts[idx] = 
stakeAcct.Clone() + + stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) + if err != nil { + return + } + + stakeState.Stake.Stake.CreditsObserved = rec.CreditsObserved + stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64(stakeState.Stake.Stake.Delegation.StakeLamports, rec.RewardLamports) - for idx, stakePk := range partition.Pubkeys() { - task := &rewardDistributionTask{ - acctsDb: acctsDb, - slot: slot, - stakingRewards: stakingRewards, - accts: accts, - parentAccts: parentAccts, - distributedLamports: &distributedLamports, - wg: &wg, - idx: idx, - pubkey: stakePk, + err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) + if err != nil { + panic(fmt.Sprintf("unable to serialize stake state in spool distribution: %s", err)) + } + + stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, rec.RewardLamports) + if err != nil { + panic(fmt.Sprintf("overflow in spool distribution in slot %d to acct %s: %s", currentSlot, rec.StakePubkey, err)) } + + accts[idx] = stakeAcct + distributedLamports.Add(rec.RewardLamports) + }) + + for idx := range records { wg.Add(1) - workerPool.Invoke(task) + workerPool.Invoke(idx) } wg.Wait() + workerPool.Release() - err := acctsDb.StoreAccounts(accts, slot, nil) + err = acctsDb.StoreAccounts(accts, currentSlot, nil) if err != nil { - panic(fmt.Sprintf("error updating accounts for partitioned epoch rewards in slot %d: %s", slot, err)) + panic(fmt.Sprintf("error updating accounts for spool distribution in slot %d: %s", currentSlot, err)) } return accts, parentAccts, distributedLamports.Load() @@ -322,77 +282,6 @@ type CalculatedStakeRewards struct { NewCreditsObserved uint64 } -func CalculateStakeRewardsAndPartitions(pointsPerStakeAcct map[solana.PublicKey]*CalculatedStakePoints, slotCtx *sealevel.SlotCtx, stakeHistory *sealevel.SysvarStakeHistory, slot uint64, rewardedEpoch uint64, pointValue PointValue, newRateActivationEpoch *uint64, f *features.Features, stakeCache map[solana.PublicKey]*sealevel.Delegation, voteCache map[solana.PublicKey]*sealevel.VoteStateVersions) (map[solana.PublicKey]*CalculatedStakeRewards, map[solana.PublicKey]*atomic.Uint64, Partitions) { - stakeInfoResults := make(map[solana.PublicKey]*CalculatedStakeRewards, 1500000) - validatorRewards := make(map[solana.PublicKey]*atomic.Uint64, 2000) - - minimumStakeDelegation := minimumStakeDelegation(slotCtx) - - var stakeMu sync.Mutex - var wg sync.WaitGroup - - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - delegation := i.(*delegationAndPubkey) - - if delegation.delegation.StakeLamports < minimumStakeDelegation { - return - } - - voterPk := delegation.delegation.VoterPubkey - voteStateVersioned := voteCache[voterPk] - if voteStateVersioned == nil { - return - } - - pointsForStakeAcct := pointsPerStakeAcct[delegation.pubkey] - calculatedStakeRewards := CalculateStakeRewardsForAcct(delegation.pubkey, pointsForStakeAcct, delegation.delegation, voteStateVersioned, rewardedEpoch, pointValue, newRateActivationEpoch) - if calculatedStakeRewards != nil { - stakeMu.Lock() - stakeInfoResults[delegation.pubkey] = calculatedStakeRewards - stakeMu.Unlock() - - validatorRewards[voterPk].Add(calculatedStakeRewards.VoterRewards) - } - }) - - for _, delegation := range stakeCache { - _, exists := validatorRewards[delegation.VoterPubkey] - if !exists { - validatorRewards[delegation.VoterPubkey] = &atomic.Uint64{} - } - } - - for pk, delegation := range stakeCache { - d := 
&delegationAndPubkey{delegation: delegation, pubkey: pk} - wg.Add(1) - workerPool.Invoke(d) - } - wg.Wait() - workerPool.Release() - - numRewardPartitions := CalculateNumRewardPartitions(uint64(len(stakeInfoResults))) - partitions := NewPartitions(numRewardPartitions) - - partitionCalcWorkerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - stakePk := i.(solana.PublicKey) - idx := CalculateRewardPartitionForPubkey(stakePk, slotCtx.Blockhash, numRewardPartitions) - partitions.AddPubkey(idx, stakePk) - }) - - for stakePk := range stakeInfoResults { - wg.Add(1) - partitionCalcWorkerPool.Invoke(stakePk) - } - wg.Wait() - partitionCalcWorkerPool.Release() - - return stakeInfoResults, validatorRewards, partitions -} - func CalculateStakeRewardsForAcct(pubkey solana.PublicKey, stakePointsResult *CalculatedStakePoints, delegation *sealevel.Delegation, voteState *sealevel.VoteStateVersions, rewardedEpoch uint64, pointValue PointValue, newRateActivationEpoch *uint64) *CalculatedStakeRewards { if pointValue.Rewards == 0 || delegation.ActivationEpoch == rewardedEpoch { stakePointsResult.ForceCreditsUpdateWithSkippedReward = true @@ -483,62 +372,6 @@ func voteCommissionSplit(voteState *sealevel.VoteStateVersions, rewards uint64) return result } -type delegationAndPubkey struct { - delegation *sealevel.Delegation - pubkey solana.PublicKey -} - -func CalculateStakePoints( - acctsDb *accountsdb.AccountsDb, - slotCtx *sealevel.SlotCtx, - slot uint64, - stakeHistory *sealevel.SysvarStakeHistory, - newWarmupCooldownRateEpoch *uint64, - stakeCache map[solana.PublicKey]*sealevel.Delegation, - voteCache map[solana.PublicKey]*sealevel.VoteStateVersions, -) (map[solana.PublicKey]*CalculatedStakePoints, wide.Uint128) { - minimum := minimumStakeDelegation(slotCtx) - - n := len(stakeCache) - pks := make([]solana.PublicKey, 0, n) - for pk := range stakeCache { - pks = append(pks, pk) - } - - pointsAccum := NewCalculatedStakePointsAccumulator(pks) - var wg sync.WaitGroup - - size := runtime.GOMAXPROCS(0) * 8 - workerPool, _ := ants.NewPoolWithFunc(size, func(i interface{}) { - defer wg.Done() - - t := i.(*delegationAndPubkey) - d := t.delegation - if d.StakeLamports < minimum { - return - } - - voterPk := d.VoterPubkey - voteState := voteCache[voterPk] - if voteState == nil { - return - } - - pcs := calculateStakePointsAndCredits(t.pubkey, stakeHistory, d, voteState, newWarmupCooldownRateEpoch) - pointsAccum.Add(t.pubkey, pcs) - }) - - for pk, delegation := range stakeCache { - wg.Add(1) - workerPool.Invoke(&delegationAndPubkey{delegation: delegation, pubkey: pk}) - } - - wg.Wait() - workerPool.Release() - - return pointsAccum.CalculatedStakePoints(), pointsAccum.TotalPoints() -} - func calculateStakePointsAndCredits( pubkey solana.PublicKey, stakeHistory *sealevel.SysvarStakeHistory, @@ -623,3 +456,343 @@ func CalculateNumRewardPartitions(numStakingRewards uint64) uint64 { return numRewardPartitions } + +// StreamingRewardsResult holds the results from streaming rewards calculation. +type StreamingRewardsResult struct { + SpoolDir string // Base directory for per-partition spool files + SpoolSlot uint64 // Slot for spool file naming + TotalPoints wide.Uint128 + ValidatorRewards map[solana.PublicKey]*atomic.Uint64 + NumStakeRewards uint64 + NumPartitions uint64 +} + +// spoolWriteRequest is sent to the single-writer goroutine for spool writes. 
+type spoolWriteRequest struct { + record *SpoolRecord +} + +// CalculateRewardsStreaming performs a streaming calculation of stake rewards. +// Phase 1: Stream stakes to calculate total points + write points spool (single AccountsDB scan) +// Phase 2: Replay points spool to compute rewards + write temp spool (sequential file I/O only) +// Phase 3: Read temp spool, assign partitions, write per-partition spools +func CalculateRewardsStreaming( + acctsDb *accountsdb.AccountsDb, + slot uint64, + stakeHistory *sealevel.SysvarStakeHistory, + newWarmupCooldownRateEpoch *uint64, + voteCache map[solana.PublicKey]*sealevel.VoteStateVersions, + pointValue PointValue, + rewardedEpoch uint64, + blockhash [32]byte, + slotCtx *sealevel.SlotCtx, + f *features.Features, +) (*StreamingRewardsResult, error) { + minimum := minimumStakeDelegation(slotCtx) + spoolDir := filepath.Join(acctsDb.AcctsDir, "..") + + mlog.Log.Infof("Rewards: voteCache has %d entries, minimum stake delegation=%d", len(voteCache), minimum) + + // ==================== PHASE 1: Calculate total points + write points spool ==================== + pointsWriter, err := NewPointsSpoolWriter(spoolDir, slot) + if err != nil { + return nil, fmt.Errorf("creating points spool: %w", err) + } + pointsPath := pointsWriter.Path() + + var totalPoints wide.Uint128 + var totalPointsMu sync.Mutex + var phase1StakeCount atomic.Int64 + var phase1BelowMinimum atomic.Int64 + var phase1NoVoteState atomic.Int64 + var phase1ZeroPoints atomic.Int64 + var phase1TotalStakeLamports atomic.Uint64 + + // Collect ALL vote pubkeys from delegations (matching in-memory path's pre-population) + var allVotePubkeys sync.Map + + // Channel + single-writer goroutine for points spool writes + type pointsWriteRequest struct { + record *PointsSpoolRecord + } + pointsWriteChan := make(chan pointsWriteRequest, 10000) + var pointsWriteErr atomic.Pointer[error] + var pointsWriterWg sync.WaitGroup + pointsWriterWg.Add(1) + + go func() { + defer pointsWriterWg.Done() + for req := range pointsWriteChan { + if pointsWriteErr.Load() != nil { + continue + } + if err := pointsWriter.WriteRecord(req.record); err != nil { + pointsWriteErr.Store(&err) + } + } + }() + + _, err = global.StreamStakeAccounts(acctsDb, slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + // Always record the vote pubkey, even for below-minimum delegations + allVotePubkeys.Store(delegation.VoterPubkey, struct{}{}) + + if delegation.StakeLamports < minimum { + phase1BelowMinimum.Add(1) + return + } + + voterPk := delegation.VoterPubkey + voteState := voteCache[voterPk] + if voteState == nil { + phase1NoVoteState.Add(1) + return + } + + delegWithCredits := *delegation + delegWithCredits.CreditsObserved = creditsObs + pcs := calculateStakePointsAndCredits(pk, stakeHistory, &delegWithCredits, voteState, newWarmupCooldownRateEpoch) + + zero128 := wide.Uint128FromUint64(0) + if pcs.Points.Eq(zero128) { + phase1ZeroPoints.Add(1) + } + + totalPointsMu.Lock() + totalPoints = totalPoints.Add(pcs.Points) + totalPointsMu.Unlock() + phase1StakeCount.Add(1) + phase1TotalStakeLamports.Add(delegation.StakeLamports) + + // Precompute the full forceCreditsUpdate flag using the same three + // triggers as CalculateStakeRewardsForAcct: + forceCredits := pcs.ForceCreditsUpdateWithSkippedReward || + pointValue.Rewards == 0 || + delegation.ActivationEpoch == rewardedEpoch + + // Only write points records that Phase 2 will actually use: + // - forceCredits records produce spool records with 0 rewards + 
// - Non-zero points records produce actual rewards + // - Zero points without forceCredits → nil reward → skip + if pcs.Points.Eq(zero128) && !forceCredits { + return + } + + pointsWriteChan <- pointsWriteRequest{record: &PointsSpoolRecord{ + StakePubkey: pk, + VotePubkey: delegation.VoterPubkey, + Points: pcs.Points, + NewCreditsObserved: pcs.NewCreditsObserved, + StakeLamports: delegation.StakeLamports, + ForceCreditsUpdateWithSkippedReward: forceCredits, + }} + }) + + close(pointsWriteChan) + pointsWriterWg.Wait() + + if err != nil { + pointsWriter.Close() + CleanupPointsSpoolFile(pointsPath) + return nil, fmt.Errorf("phase 1 streaming stakes for points: %w", err) + } + + if werr := pointsWriteErr.Load(); werr != nil { + pointsWriter.Close() + CleanupPointsSpoolFile(pointsPath) + return nil, fmt.Errorf("points spool write failed: %w", *werr) + } + + if err := pointsWriter.Close(); err != nil { + CleanupPointsSpoolFile(pointsPath) + return nil, fmt.Errorf("points spool close failed: %w", err) + } + + mlog.Log.Infof("Rewards Phase 1: %d stakes (belowMin=%d, noVote=%d, zeroPoints=%d), totalStakeLamports=%d, totalPoints=%s, totalRewards=%d, pointsSpoolRecords=%d", + phase1StakeCount.Load(), phase1BelowMinimum.Load(), phase1NoVoteState.Load(), phase1ZeroPoints.Load(), + phase1TotalStakeLamports.Load(), totalPoints.String(), pointValue.Rewards, pointsWriter.Count()) + + // ==================== PHASE 2: Replay points spool → compute rewards → write temp spool ==================== + pv := PointValue{Rewards: pointValue.Rewards, Points: totalPoints} + + tempWriter, err := NewTempSpoolWriter(spoolDir, slot) + if err != nil { + CleanupPointsSpoolFile(pointsPath) + return nil, fmt.Errorf("creating temp spool: %w", err) + } + tempPath := tempWriter.Path() + + // Pre-populate validatorRewards with ALL vote pubkeys from delegations + validatorRewards := make(map[solana.PublicKey]*atomic.Uint64) + allVotePubkeys.Range(func(key, _ interface{}) bool { + voterPk := key.(solana.PublicKey) + validatorRewards[voterPk] = &atomic.Uint64{} + return true + }) + + pointsReader, err := NewPointsSpoolReader(pointsPath) + if err != nil { + CleanupPointsSpoolFile(pointsPath) + tempWriter.Close() + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("opening points spool reader: %w", err) + } + + var phase2SkippedNilReward int64 + var phase2TotalStakerRewards uint64 + zero128 := wide.Uint128FromUint64(0) + + for { + rec, err := pointsReader.Next() + if err == io.EOF { + break + } + if err != nil { + pointsReader.Close() + CleanupPointsSpoolFile(pointsPath) + tempWriter.Close() + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("reading points spool: %w", err) + } + + // ForceCreditsUpdateWithSkippedReward was fully precomputed in Phase 1 + // (covers pcs.ForceCredits, pointValue.Rewards==0, activationEpoch==rewardedEpoch) + if rec.ForceCreditsUpdateWithSkippedReward { + // Credits update only — write spool record with 0 rewards + if err := tempWriter.WriteRecord(&SpoolRecord{ + StakePubkey: rec.StakePubkey, + VotePubkey: rec.VotePubkey, + StakeLamports: rec.StakeLamports, + CreditsObserved: rec.NewCreditsObserved, + RewardLamports: 0, + }); err != nil { + pointsReader.Close() + CleanupPointsSpoolFile(pointsPath) + tempWriter.Close() + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("temp spool write failed: %w", err) + } + continue + } + + // 2. Zero points or zero totalPoints → nil reward + if rec.Points.Eq(zero128) || pv.Points.Eq(zero128) { + phase2SkippedNilReward++ + continue + } + + // 3. 
Compute reward: (points * totalRewards) / totalPoints + rewards128 := rec.Points.Mul(wide.Uint128FromUint64(pv.Rewards)).Div(pv.Points) + if !rewards128.IsUint64() { + phase2SkippedNilReward++ + continue + } + rewards := rewards128.Uint64() + if rewards == 0 { + phase2SkippedNilReward++ + continue + } + + // 4. Commission split + voteState := voteCache[rec.VotePubkey] + if voteState == nil { + phase2SkippedNilReward++ + continue + } + splitResult := voteCommissionSplit(voteState, rewards) + if splitResult.IsSplit && (splitResult.VoterPortion == 0 || splitResult.StakerPortion == 0) { + phase2SkippedNilReward++ + continue + } + + phase2TotalStakerRewards += splitResult.StakerPortion + + if err := tempWriter.WriteRecord(&SpoolRecord{ + StakePubkey: rec.StakePubkey, + VotePubkey: rec.VotePubkey, + StakeLamports: rec.StakeLamports, + CreditsObserved: rec.NewCreditsObserved, + RewardLamports: splitResult.StakerPortion, + }); err != nil { + pointsReader.Close() + CleanupPointsSpoolFile(pointsPath) + tempWriter.Close() + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("temp spool write failed: %w", err) + } + + if splitResult.VoterPortion > 0 { + validatorRewards[rec.VotePubkey].Add(splitResult.VoterPortion) + } + } + + pointsReader.Close() + CleanupPointsSpoolFile(pointsPath) + + if err := tempWriter.Close(); err != nil { + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("temp spool close failed: %w", err) + } + + // ==================== Calculate numPartitions from ACTUAL count ==================== + actualRewardCount := uint64(tempWriter.Count()) + numPartitions := CalculateNumRewardPartitions(actualRewardCount) + + var totalVotingRewards uint64 + for _, v := range validatorRewards { + totalVotingRewards += v.Load() + } + mlog.Log.Infof("Rewards Phase 2: %d records (skippedNilReward=%d), totalStakerRewards=%d, totalVotingRewards=%d, validatorCount=%d, numPartitions=%d", + actualRewardCount, phase2SkippedNilReward, + phase2TotalStakerRewards, totalVotingRewards, len(validatorRewards), numPartitions) + + // ==================== PHASE 3: Read temp spool, assign partitions, write per-partition spools ==================== + tempReader, err := NewTempSpoolReader(tempPath) + if err != nil { + CleanupTempSpoolFile(tempPath) + return nil, fmt.Errorf("opening temp spool reader: %w", err) + } + + partitionWriters := NewPartitionedSpoolWriters(spoolDir, slot, numPartitions) + + for { + rec, err := tempReader.Next() + if err == io.EOF { + break + } + if err != nil { + tempReader.Close() + partitionWriters.Close() + CleanupTempSpoolFile(tempPath) + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("reading temp spool: %w", err) + } + + rec.PartitionIndex = uint32(CalculateRewardPartitionForPubkey(rec.StakePubkey, blockhash, numPartitions)) + + if err := partitionWriters.WriteRecord(rec); err != nil { + tempReader.Close() + partitionWriters.Close() + CleanupTempSpoolFile(tempPath) + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("partition spool write: %w", err) + } + } + + tempReader.Close() + CleanupTempSpoolFile(tempPath) + + if err := partitionWriters.Close(); err != nil { + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("partition spool close failed: %w", err) + } + + return &StreamingRewardsResult{ + SpoolDir: spoolDir, + SpoolSlot: slot, + TotalPoints: totalPoints, + ValidatorRewards: validatorRewards, + NumStakeRewards: actualRewardCount, + NumPartitions: numPartitions, + }, 
nil +} diff --git a/pkg/rewards/spool.go b/pkg/rewards/spool.go new file mode 100644 index 00000000..47c59eed --- /dev/null +++ b/pkg/rewards/spool.go @@ -0,0 +1,468 @@ +package rewards + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "github.com/Overclock-Validator/wide" + "github.com/gagliardetto/solana-go" +) + +// SpoolRecordSize is the binary size of a spool record. +// Format: stake_pubkey(32) + vote_pubkey(32) + stake_lamports(8) + +// +// credits_observed(8) + reward_lamports(8) = 88 bytes +const SpoolRecordSize = 88 + +// SpoolRecord represents a single stake reward record. +type SpoolRecord struct { + StakePubkey solana.PublicKey + VotePubkey solana.PublicKey + StakeLamports uint64 + CreditsObserved uint64 + RewardLamports uint64 + PartitionIndex uint32 // Only used during calculation, not serialized +} + +// encodeRecord encodes a record into the buffer (without partition index). +func encodeRecord(rec *SpoolRecord, buf []byte) { + copy(buf[0:32], rec.StakePubkey[:]) + copy(buf[32:64], rec.VotePubkey[:]) + binary.LittleEndian.PutUint64(buf[64:72], rec.StakeLamports) + binary.LittleEndian.PutUint64(buf[72:80], rec.CreditsObserved) + binary.LittleEndian.PutUint64(buf[80:88], rec.RewardLamports) +} + +// decodeRecord decodes a record from the buffer. +func decodeRecord(buf []byte, rec *SpoolRecord) { + copy(rec.StakePubkey[:], buf[0:32]) + copy(rec.VotePubkey[:], buf[32:64]) + rec.StakeLamports = binary.LittleEndian.Uint64(buf[64:72]) + rec.CreditsObserved = binary.LittleEndian.Uint64(buf[72:80]) + rec.RewardLamports = binary.LittleEndian.Uint64(buf[80:88]) +} + +// PartitionedSpoolWriters manages per-partition spool files. +// Thread-safe - multiple goroutines can write concurrently. +// Uses buffered I/O for performance. +type PartitionedSpoolWriters struct { + baseDir string + slot uint64 + numPartitions uint64 + writers map[uint32]*partitionWriter + mu sync.Mutex + closed bool +} + +// partitionWriter is a buffered writer for a single partition file. +type partitionWriter struct { + file *os.File + bufw *bufio.Writer + count int +} + +// NewPartitionedSpoolWriters creates a new set of per-partition spool writers. +func NewPartitionedSpoolWriters(baseDir string, slot uint64, numPartitions uint64) *PartitionedSpoolWriters { + return &PartitionedSpoolWriters{ + baseDir: baseDir, + slot: slot, + numPartitions: numPartitions, + writers: make(map[uint32]*partitionWriter), + } +} + +// SpoolDir returns the base directory for spool files. +func (p *PartitionedSpoolWriters) SpoolDir() string { + return p.baseDir +} + +// Slot returns the slot this spool is for. +func (p *PartitionedSpoolWriters) Slot() uint64 { + return p.slot +} + +// WriteRecord writes a record to the appropriate partition file. +// Thread-safe - lazily opens partition files as needed. 
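+// Each partition gets a lazily created file wrapped in a 1 MiB bufio.Writer;
+// records reach disk when the buffer fills or when Close flushes and fsyncs.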
+func (p *PartitionedSpoolWriters) WriteRecord(rec *SpoolRecord) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return fmt.Errorf("spool writers are closed") + } + + partition := rec.PartitionIndex + + // Get or create writer for this partition + w, exists := p.writers[partition] + if !exists { + path := partitionFilePath(p.baseDir, p.slot, partition) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("creating partition %d spool file: %w", partition, err) + } + // 1MB buffer for efficient sequential writes + w = &partitionWriter{file: f, bufw: bufio.NewWriterSize(f, 1<<20)} + p.writers[partition] = w + } + + // Write record to buffer + var buf [SpoolRecordSize]byte + encodeRecord(rec, buf[:]) + if _, err := w.bufw.Write(buf[:]); err != nil { + return fmt.Errorf("writing to partition %d: %w", partition, err) + } + w.count++ + return nil +} + +// Close flushes buffers, syncs, and closes all partition files. +// Returns the first error encountered. +func (p *PartitionedSpoolWriters) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil + } + p.closed = true + + var firstErr error + for partition, w := range p.writers { + // Flush buffer first + if err := w.bufw.Flush(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("flushing partition %d: %w", partition, err) + } + // Sync to disk + if err := w.file.Sync(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("syncing partition %d: %w", partition, err) + } + // Close file + if err := w.file.Close(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("closing partition %d: %w", partition, err) + } + } + return firstErr +} + +// TotalRecords returns the total number of records written across all partitions. +func (p *PartitionedSpoolWriters) TotalRecords() int { + p.mu.Lock() + defer p.mu.Unlock() + + total := 0 + for _, w := range p.writers { + total += w.count + } + return total +} + +// PartitionReader reads records sequentially from a partition spool file. +// Uses buffered I/O for efficient sequential reads. +type PartitionReader struct { + file *os.File + bufr *bufio.Reader + buf [SpoolRecordSize]byte +} + +// NewPartitionReader opens a partition file for sequential reading. +func NewPartitionReader(baseDir string, slot uint64, partition uint32) (*PartitionReader, error) { + path := partitionFilePath(baseDir, slot, partition) + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + // No records for this partition + return nil, nil + } + return nil, fmt.Errorf("opening partition %d spool: %w", partition, err) + } + // 1MB buffer for efficient sequential reads + return &PartitionReader{file: f, bufr: bufio.NewReaderSize(f, 1<<20)}, nil +} + +// Next reads the next record. Returns io.EOF when done. +func (r *PartitionReader) Next() (*SpoolRecord, error) { + _, err := io.ReadFull(r.bufr, r.buf[:]) + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, fmt.Errorf("reading spool record: %w", err) + } + + rec := &SpoolRecord{} + decodeRecord(r.buf[:], rec) + return rec, nil +} + +// Close closes the partition file. +func (r *PartitionReader) Close() error { + return r.file.Close() +} + +// partitionFilePath returns the path for a partition spool file. +func partitionFilePath(baseDir string, slot uint64, partition uint32) string { + return filepath.Join(baseDir, fmt.Sprintf("reward_spool_%d_p%d.bin", slot, partition)) +} + +// CleanupPartitionedSpoolFiles removes all partition spool files for a slot. 
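+// Missing files are expected and ignored: a partition that received no
+// records never had a spool file created (NewPartitionReader likewise
+// returns nil for such partitions).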
+func CleanupPartitionedSpoolFiles(baseDir string, slot uint64, numPartitions uint64) { + for p := uint64(0); p < numPartitions; p++ { + path := partitionFilePath(baseDir, slot, uint32(p)) + os.Remove(path) // Ignore errors - file may not exist + } +} + +// TempSpoolWriter writes reward records to a single temp file (no partition separation). +// Used in the first phase of reward calculation before partition count is known. +// NOT thread-safe - should be used with a single-writer pattern. +type TempSpoolWriter struct { + file *os.File + bufw *bufio.Writer + path string + count int +} + +// NewTempSpoolWriter creates a new temp spool writer. +func NewTempSpoolWriter(baseDir string, slot uint64) (*TempSpoolWriter, error) { + path := tempSpoolPath(baseDir, slot) + f, err := os.Create(path) + if err != nil { + return nil, fmt.Errorf("creating temp spool: %w", err) + } + // 1MB buffer for efficient sequential writes + return &TempSpoolWriter{ + file: f, + bufw: bufio.NewWriterSize(f, 1<<20), + path: path, + }, nil +} + +// WriteRecord writes a record to the temp spool. +func (w *TempSpoolWriter) WriteRecord(rec *SpoolRecord) error { + var buf [SpoolRecordSize]byte + encodeRecord(rec, buf[:]) + if _, err := w.bufw.Write(buf[:]); err != nil { + return fmt.Errorf("writing temp spool record: %w", err) + } + w.count++ + return nil +} + +// Count returns the number of records written. +func (w *TempSpoolWriter) Count() int { + return w.count +} + +// Path returns the temp spool file path. +func (w *TempSpoolWriter) Path() string { + return w.path +} + +// Close flushes and closes the temp spool file. +// NOTE: No Sync() - temp spool is only used in-process and deleted immediately after reading. +func (w *TempSpoolWriter) Close() error { + if err := w.bufw.Flush(); err != nil { + return fmt.Errorf("flushing temp spool: %w", err) + } + return w.file.Close() +} + +// TempSpoolReader reads records sequentially from a temp spool file. +type TempSpoolReader struct { + file *os.File + bufr *bufio.Reader + buf [SpoolRecordSize]byte +} + +// NewTempSpoolReader opens a temp spool file for sequential reading. +func NewTempSpoolReader(path string) (*TempSpoolReader, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("opening temp spool: %w", err) + } + // 1MB buffer for efficient sequential reads + return &TempSpoolReader{ + file: f, + bufr: bufio.NewReaderSize(f, 1<<20), + }, nil +} + +// Next reads the next record. Returns io.EOF when done. +func (r *TempSpoolReader) Next() (*SpoolRecord, error) { + _, err := io.ReadFull(r.bufr, r.buf[:]) + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, fmt.Errorf("reading temp spool record: %w", err) + } + + rec := &SpoolRecord{} + decodeRecord(r.buf[:], rec) + return rec, nil +} + +// Close closes the temp spool file. +func (r *TempSpoolReader) Close() error { + return r.file.Close() +} + +// tempSpoolPath returns the path for a temp spool file. +func tempSpoolPath(baseDir string, slot uint64) string { + return filepath.Join(baseDir, fmt.Sprintf("reward_temp_%d.bin", slot)) +} + +// CleanupTempSpoolFile removes a temp spool file. +func CleanupTempSpoolFile(path string) { + os.Remove(path) // Ignore errors - file may not exist +} + +// PointsSpoolRecordSize is the binary size of a points spool record. 
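The temp spool covers the window before the partition count is known; a sketch of that two-phase flow under the assumption that partition assignment comes from a caller-supplied callback (spoolThenPartition and partitionFor are hypothetical names, and io must be imported):

func spoolThenPartition(baseDir string, slot uint64, recs []*SpoolRecord, numPartitions uint64, partitionFor func(*SpoolRecord) uint32) error {
    // Phase A: spool everything into a single temp file while the partition
    // count is still unknown.
    tw, err := NewTempSpoolWriter(baseDir, slot)
    if err != nil {
        return err
    }
    for _, rec := range recs {
        if err := tw.WriteRecord(rec); err != nil {
            tw.Close()
            return err
        }
    }
    if err := tw.Close(); err != nil {
        return err
    }

    // Phase B: once numPartitions is known, re-read the temp spool and fan the
    // records out into per-partition files.
    tr, err := NewTempSpoolReader(tw.Path())
    if err != nil {
        return err
    }
    defer tr.Close()

    writers := NewPartitionedSpoolWriters(baseDir, slot, numPartitions)
    for {
        rec, err := tr.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            writers.Close()
            return err
        }
        rec.PartitionIndex = partitionFor(rec) // hypothetical partition assignment
        if err := writers.WriteRecord(rec); err != nil {
            writers.Close()
            return err
        }
    }
    if err := writers.Close(); err != nil {
        return err
    }

    CleanupTempSpoolFile(tw.Path())
    return nil
}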
+// Format: stake_pubkey(32) + vote_pubkey(32) + points_lo(8) + points_hi(8) + +// +// new_credits_observed(8) + stake_lamports(8) + force_credits_update(8) = 104 bytes +const PointsSpoolRecordSize = 104 + +// PointsSpoolRecord stores intermediate per-stake data from Phase 1 (points calculation) +// so Phase 2 can compute rewards from sequential file I/O instead of re-scanning AccountsDB. +// ForceCreditsUpdateWithSkippedReward is fully precomputed in Phase 1 using all three triggers +// (pcs.ForceCredits, pointValue.Rewards==0, activationEpoch==rewardedEpoch). +type PointsSpoolRecord struct { + StakePubkey solana.PublicKey + VotePubkey solana.PublicKey + Points wide.Uint128 + NewCreditsObserved uint64 + StakeLamports uint64 + ForceCreditsUpdateWithSkippedReward bool +} + +func encodePointsRecord(rec *PointsSpoolRecord, buf []byte) { + copy(buf[0:32], rec.StakePubkey[:]) + copy(buf[32:64], rec.VotePubkey[:]) + binary.LittleEndian.PutUint64(buf[64:72], rec.Points.Lo) + binary.LittleEndian.PutUint64(buf[72:80], rec.Points.Hi) + binary.LittleEndian.PutUint64(buf[80:88], rec.NewCreditsObserved) + binary.LittleEndian.PutUint64(buf[88:96], rec.StakeLamports) + var flags uint64 + if rec.ForceCreditsUpdateWithSkippedReward { + flags = 1 + } + binary.LittleEndian.PutUint64(buf[96:104], flags) +} + +func decodePointsRecord(buf []byte, rec *PointsSpoolRecord) { + copy(rec.StakePubkey[:], buf[0:32]) + copy(rec.VotePubkey[:], buf[32:64]) + rec.Points.Lo = binary.LittleEndian.Uint64(buf[64:72]) + rec.Points.Hi = binary.LittleEndian.Uint64(buf[72:80]) + rec.NewCreditsObserved = binary.LittleEndian.Uint64(buf[80:88]) + rec.StakeLamports = binary.LittleEndian.Uint64(buf[88:96]) + rec.ForceCreditsUpdateWithSkippedReward = binary.LittleEndian.Uint64(buf[96:104]) != 0 +} + +// PointsSpoolWriter writes points records to a single temp file. +// NOT thread-safe — use with a single-writer goroutine via channel. +type PointsSpoolWriter struct { + file *os.File + bufw *bufio.Writer + path string + count int +} + +// NewPointsSpoolWriter creates a new points spool writer. +func NewPointsSpoolWriter(baseDir string, slot uint64) (*PointsSpoolWriter, error) { + path := pointsSpoolPath(baseDir, slot) + f, err := os.Create(path) + if err != nil { + return nil, fmt.Errorf("creating points spool: %w", err) + } + return &PointsSpoolWriter{ + file: f, + bufw: bufio.NewWriterSize(f, 1<<20), + path: path, + }, nil +} + +// WriteRecord writes a record to the points spool. +func (w *PointsSpoolWriter) WriteRecord(rec *PointsSpoolRecord) error { + var buf [PointsSpoolRecordSize]byte + encodePointsRecord(rec, buf[:]) + if _, err := w.bufw.Write(buf[:]); err != nil { + return fmt.Errorf("writing points spool record: %w", err) + } + w.count++ + return nil +} + +// Count returns the number of records written. +func (w *PointsSpoolWriter) Count() int { + return w.count +} + +// Path returns the points spool file path. +func (w *PointsSpoolWriter) Path() string { + return w.path +} + +// Close flushes and closes the points spool file. +// NOTE: No Sync() — points spool is only used in-process and deleted immediately after reading. +func (w *PointsSpoolWriter) Close() error { + if err := w.bufw.Flush(); err != nil { + return fmt.Errorf("flushing points spool: %w", err) + } + return w.file.Close() +} + +// PointsSpoolReader reads points records sequentially from a points spool file. 
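Because PointsSpoolWriter is not thread-safe, the intended shape is one writer goroutine fed by a channel; a sketch with hypothetical names (writePointsSpool, produce), where produce may fan out to worker goroutines internally but must return only after every send has completed:

func writePointsSpool(w *PointsSpoolWriter, produce func(chan<- *PointsSpoolRecord)) error {
    recCh := make(chan *PointsSpoolRecord, 1024)
    errCh := make(chan error, 1)

    // Exactly one goroutine touches the file.
    go func() {
        var werr error
        for rec := range recCh {
            if werr != nil {
                continue // keep draining so producers never block after a failure
            }
            werr = w.WriteRecord(rec)
        }
        errCh <- werr
    }()

    produce(recCh) // Phase 1 workers send their per-stake results here
    close(recCh)

    if err := <-errCh; err != nil {
        return err
    }
    return w.Close()
}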
+type PointsSpoolReader struct { + file *os.File + bufr *bufio.Reader + buf [PointsSpoolRecordSize]byte +} + +// NewPointsSpoolReader opens a points spool file for sequential reading. +func NewPointsSpoolReader(path string) (*PointsSpoolReader, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("opening points spool: %w", err) + } + return &PointsSpoolReader{ + file: f, + bufr: bufio.NewReaderSize(f, 1<<20), + }, nil +} + +// Next reads the next record. Returns io.EOF when done. +func (r *PointsSpoolReader) Next() (*PointsSpoolRecord, error) { + _, err := io.ReadFull(r.bufr, r.buf[:]) + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, fmt.Errorf("reading points spool record: %w", err) + } + + rec := &PointsSpoolRecord{} + decodePointsRecord(r.buf[:], rec) + return rec, nil +} + +// Close closes the points spool file. +func (r *PointsSpoolReader) Close() error { + return r.file.Close() +} + +func pointsSpoolPath(baseDir string, slot uint64) string { + return filepath.Join(baseDir, fmt.Sprintf("reward_points_%d.bin", slot)) +} + +// CleanupPointsSpoolFile removes a points spool file. +func CleanupPointsSpoolFile(path string) { + os.Remove(path) +} diff --git a/pkg/sealevel/execution_ctx.go b/pkg/sealevel/execution_ctx.go index 7d47fa9a..bd2820ea 100644 --- a/pkg/sealevel/execution_ctx.go +++ b/pkg/sealevel/execution_ctx.go @@ -52,7 +52,6 @@ type SlotCtx struct { VoteTimestampMu *sync.Mutex // VoteTimestampsMu protects VoteTimestamps VoteTimestamps map[solana.PublicKey]BlockTimestamp - StakeCache map[solana.PublicKey]*Delegation VoteAccts map[solana.PublicKey]uint64 TotalEpochStake uint64 FinalBankhash []byte diff --git a/pkg/snapshot/build_db.go b/pkg/snapshot/build_db.go index 9b6605f2..c8c5ca58 100644 --- a/pkg/snapshot/build_db.go +++ b/pkg/snapshot/build_db.go @@ -1,7 +1,6 @@ package snapshot import ( - "bufio" "bytes" "context" "encoding/binary" @@ -19,7 +18,6 @@ import ( "github.com/Overclock-Validator/mithril/pkg/progress" "github.com/Overclock-Validator/mithril/pkg/statsd" "github.com/cockroachdb/pebble" - "github.com/gagliardetto/solana-go" "github.com/panjf2000/ants/v2" ) @@ -202,7 +200,7 @@ func BuildAccountsDbPaths( // Create stake pubkey collector for building stake index during appendvec processing stakeCollector := &stakeIndexCollector{ - pubkeys: make([]solana.PublicKey, 0, 1000000), // Pre-allocate for ~1M stake accounts + entries: make([]accountsdb.StakeIndexEntry, 0, 1000000), // Pre-allocate for ~1M stake accounts } pools, err := initWorkerPools(wg, sl, manifest, incrementalManifest, accountsDbDir, &largestFileId, stakeCollector) @@ -288,9 +286,9 @@ func BuildAccountsDbPaths( pools.Release() - // Write stake pubkey index file + // Write stake pubkey index file (with appendvec location hints) stakeIndexPath := filepath.Join(accountsDbDir, "stake_pubkeys.idx") - if err := WriteStakePubkeyIndex(stakeIndexPath, stakeCollector.pubkeys); err != nil { + if err := accountsdb.WriteStakePubkeyIndex(stakeIndexPath, stakeCollector.entries); err != nil { return nil, nil, fmt.Errorf("writing stake pubkey index: %w", err) } @@ -430,15 +428,15 @@ type snapshotWorkerPools struct { // manifest data. 
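For completeness, a Phase 2 consumer of the points spool defined above could be shaped like the following sketch (applyPointsSpool and the apply callback are hypothetical, and io must be imported):

func applyPointsSpool(path string, apply func(*PointsSpoolRecord)) error {
    r, err := NewPointsSpoolReader(path)
    if err != nil {
        return err
    }
    defer r.Close()

    for {
        rec, err := r.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        // apply computes the reward from rec.Points and rec.StakeLamports and is
        // expected to honor rec.ForceCreditsUpdateWithSkippedReward (update
        // credits observed, skip the payout).
        apply(rec)
    }

    CleanupPointsSpoolFile(path)
    return nil
}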
type stakeIndexCollector struct { mu sync.Mutex - pubkeys []solana.PublicKey + entries []accountsdb.StakeIndexEntry } -func (c *stakeIndexCollector) Add(pks []solana.PublicKey) { - if len(pks) == 0 { +func (c *stakeIndexCollector) Add(entries []accountsdb.StakeIndexEntry) { + if len(entries) == 0 { return } c.mu.Lock() - c.pubkeys = append(c.pubkeys, pks...) + c.entries = append(c.entries, entries...) c.mu.Unlock() } @@ -474,14 +472,14 @@ func initWorkerPools( start := time.Now() defer wg.Done() task := i.(indexEntryBuilderTask) - pubkeys, entries, stakePubkeys, err := accountsdb.BuildIndexEntriesFromAppendVecs(task.Data, task.FileSize, task.Slot, task.FileId) + pubkeys, entries, stakeEntries, err := accountsdb.BuildIndexEntriesFromAppendVecs(task.Data, task.FileSize, task.Slot, task.FileId) if err != nil { mlog.Log.Errorf("BuildIndexEntriesFromAppendVecs: %v", err) return } - // Collect stake pubkeys for building stake index - stakeCollector.Add(stakePubkeys) + // Collect stake entries with appendvec location hints for building stake index + stakeCollector.Add(stakeEntries) indexEntryBuilderInProgress.Add(-1) commitTask := indexEntryCommitterTask{IndexEntries: entries, Pubkeys: pubkeys} @@ -618,35 +616,3 @@ func ingestSSTFiles(indexDir, logsDir string) (*pebble.DB, error) { return db, nil } -// WriteStakePubkeyIndex writes stake pubkeys to a binary index file. -// Format: 32-byte pubkeys appended sequentially, no header. -func WriteStakePubkeyIndex(path string, pubkeys []solana.PublicKey) error { - f, err := os.Create(path) - if err != nil { - return err - } - defer f.Close() - - buf := bufio.NewWriter(f) - for _, pk := range pubkeys { - if _, err := buf.Write(pk[:]); err != nil { - return err - } - } - return buf.Flush() -} - -// LoadStakePubkeyIndex reads stake pubkeys from a binary index file. 
-func LoadStakePubkeyIndex(path string) ([]solana.PublicKey, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - count := len(data) / 32 - pubkeys := make([]solana.PublicKey, count) - for i := 0; i < count; i++ { - copy(pubkeys[i][:], data[i*32:(i+1)*32]) - } - return pubkeys, nil -} diff --git a/pkg/snapshot/build_db_with_incr.go b/pkg/snapshot/build_db_with_incr.go index 6af452b0..3ef31844 100644 --- a/pkg/snapshot/build_db_with_incr.go +++ b/pkg/snapshot/build_db_with_incr.go @@ -17,7 +17,6 @@ import ( "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/snapshotdl" "github.com/cockroachdb/pebble" - "github.com/gagliardetto/solana-go" "github.com/panjf2000/ants/v2" ) @@ -79,7 +78,7 @@ func BuildAccountsDbAuto( // Create stake pubkey collector for building stake index during appendvec processing stakeCollector := &stakeIndexCollector{ - pubkeys: make([]solana.PublicKey, 0, 1000000), // Pre-allocate for ~1M stake accounts + entries: make([]accountsdb.StakeIndexEntry, 0, 1000000), // Pre-allocate for ~1M stake accounts } pools, err := initWorkerPools(wg, sl, manifest, incrementalManifest, accountsDbDir, &largestFileId, stakeCollector) @@ -239,9 +238,9 @@ func BuildAccountsDbAuto( pools.Release() - // Write stake pubkey index file + // Write stake pubkey index file (with appendvec location hints) stakeIndexPath := filepath.Join(accountsDbDir, "stake_pubkeys.idx") - if err := WriteStakePubkeyIndex(stakeIndexPath, stakeCollector.pubkeys); err != nil { + if err := accountsdb.WriteStakePubkeyIndex(stakeIndexPath, stakeCollector.entries); err != nil { return nil, nil, fmt.Errorf("writing stake pubkey index: %w", err) } diff --git a/pkg/snapshot/manifest_seed.go b/pkg/snapshot/manifest_seed.go new file mode 100644 index 00000000..f4609284 --- /dev/null +++ b/pkg/snapshot/manifest_seed.go @@ -0,0 +1,133 @@ +package snapshot + +import ( + "encoding/base64" + "encoding/json" + "sort" + + "github.com/Overclock-Validator/mithril/pkg/base58" + "github.com/Overclock-Validator/mithril/pkg/epochstakes" + "github.com/Overclock-Validator/mithril/pkg/state" +) + +// PopulateManifestSeed copies manifest data to state file for replay context. +// Called ONCE after AccountsDB build completes, before writing state file. +// This eliminates the need to read the manifest at runtime. 
+func PopulateManifestSeed(s *state.MithrilState, m *SnapshotManifest) { + // Block config + s.ManifestParentSlot = m.Bank.Slot + s.ManifestParentBankhash = base58.Encode(m.Bank.Hash[:]) + + // LtHash: use Hash() method, encode as base64 + if m.LtHash != nil { + s.ManifestAcctsLtHash = base64.StdEncoding.EncodeToString(m.LtHash.Hash()) + } + + // Fee rate governor (static fields only) + s.ManifestFeeRateGovernor = &state.ManifestFeeRateGovernorSeed{ + TargetLamportsPerSignature: m.Bank.FeeRateGovernor.TargetLamportsPerSignature, + TargetSignaturesPerSlot: m.Bank.FeeRateGovernor.TargetSignaturesPerSlot, + MinLamportsPerSignature: m.Bank.FeeRateGovernor.MinLamportsPerSignature, + MaxLamportsPerSignature: m.Bank.FeeRateGovernor.MaxLamportsPerSignature, + BurnPercent: m.Bank.FeeRateGovernor.BurnPercent, + } + + // Signature/fee state + s.ManifestSignatureCount = m.Bank.SignatureCount + s.ManifestLamportsPerSignature = m.LamportsPerSignature + + // Blockhash context (sort by hash_index descending) + ages := make([]HashAgePair, len(m.Bank.BlockhashQueue.HashAndAge)) + copy(ages, m.Bank.BlockhashQueue.HashAndAge) + sort.Slice(ages, func(i, j int) bool { + return ages[i].Val.HashIndex > ages[j].Val.HashIndex + }) + + // Store top 150 blockhashes + numBlockhashes := min(150, len(ages)) + s.ManifestRecentBlockhashes = make([]state.BlockhashEntry, numBlockhashes) + for i := 0; i < numBlockhashes; i++ { + s.ManifestRecentBlockhashes[i] = state.BlockhashEntry{ + Blockhash: base58.Encode(ages[i].Key[:]), + LamportsPerSignature: ages[i].Val.FeeCalculator.LamportsPerSignature, + } + } + + // Guard: only access ages[150] if we have at least 151 entries + if len(ages) > 150 { + s.ManifestEvictedBlockhash = base58.Encode(ages[150].Key[:]) + } + + // ReplayCtx seed + s.ManifestCapitalization = m.Bank.Capitalization + s.ManifestSlotsPerYear = m.Bank.SlotsPerYear + s.ManifestInflationInitial = m.Bank.Inflation.Initial + s.ManifestInflationTerminal = m.Bank.Inflation.Terminal + s.ManifestInflationTaper = m.Bank.Inflation.Taper + s.ManifestInflationFoundation = m.Bank.Inflation.FoundationVal + s.ManifestInflationFoundationTerm = m.Bank.Inflation.FoundationTerm + + // Epoch account hash (base64 for consistency with LtHash) + if m.EpochAccountHash != [32]byte{} { + s.ManifestEpochAcctsHash = base64.StdEncoding.EncodeToString(m.EpochAccountHash[:]) + } + + // Transaction count at snapshot + s.ManifestTransactionCount = m.Bank.TransactionCount + + // Epoch authorized voters (for snapshot epoch only) + // Supports multiple authorized voters per vote account (matches original manifest behavior) + s.ManifestEpochAuthorizedVoters = make(map[string][]string) + for _, epochStake := range m.VersionedEpochStakes { + if epochStake.Epoch == m.Bank.Epoch { + for _, entry := range epochStake.Val.EpochAuthorizedVoters { + voteAcctStr := base58.Encode(entry.Key[:]) + authorizedVoterStr := base58.Encode(entry.Val[:]) + s.ManifestEpochAuthorizedVoters[voteAcctStr] = append(s.ManifestEpochAuthorizedVoters[voteAcctStr], authorizedVoterStr) + } + } + } + + // Epoch stakes: convert VersionedEpochStakes to PersistedEpochStakes format + // This stores ONLY vote-account aggregates, NOT full stake account data + s.ManifestEpochStakes = convertVersionedEpochStakesToPersisted(m.VersionedEpochStakes) +} + +// convertVersionedEpochStakesToPersisted converts manifest epoch stakes to +// the same PersistedEpochStakes JSON format used by ComputedEpochStakes. +// Only stores vote-account stakes (aggregated), NOT full stake account data. 
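On the replay side the seeded hashes are strings and need decoding before use. A sketch, assuming a base58 decoder such as github.com/mr-tron/base58 is available (the project's own pkg/base58 helper may differ) and using the standard encoding/base64 package:

func decodeManifestSeedHashes(s *state.MithrilState) (parentBankhash, acctsLtHash []byte, err error) {
    parentBankhash, err = base58.Decode(s.ManifestParentBankhash)
    if err != nil {
        return nil, nil, fmt.Errorf("decoding manifest parent bankhash: %w", err)
    }
    if s.ManifestAcctsLtHash != "" {
        acctsLtHash, err = base64.StdEncoding.DecodeString(s.ManifestAcctsLtHash)
        if err != nil {
            return nil, nil, fmt.Errorf("decoding manifest accounts lt hash: %w", err)
        }
    }
    return parentBankhash, acctsLtHash, nil
}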
+func convertVersionedEpochStakesToPersisted(stakes []VersionedEpochStakesPair) map[uint64]string { + result := make(map[uint64]string, len(stakes)) + + for _, epochStake := range stakes { + // Build PersistedEpochStakes from manifest data + persisted := epochstakes.PersistedEpochStakes{ + Epoch: epochStake.Epoch, + TotalStake: epochStake.Val.TotalStake, + Stakes: make(map[string]uint64), + VoteAccts: make(map[string]*epochstakes.VoteAccountJSON), + } + + // Extract vote accounts from Stakes.VoteAccounts (aggregated data) + for _, va := range epochStake.Val.Stakes.VoteAccounts { + pkStr := base58.Encode(va.Key[:]) + persisted.Stakes[pkStr] = va.Stake + persisted.VoteAccts[pkStr] = &epochstakes.VoteAccountJSON{ + Lamports: va.Value.Lamports, + NodePubkey: base58.Encode(va.Value.NodePubkey[:]), + LastTimestampTs: va.Value.LastTimestampTs, + LastTimestampSlot: va.Value.LastTimestampSlot, + Owner: base58.Encode(va.Value.Owner[:]), + Executable: va.Value.Executable, + RentEpoch: va.Value.RentEpoch, + } + } + + data, err := json.Marshal(persisted) + if err != nil { + continue + } + result[epochStake.Epoch] = string(data) + } + return result +} diff --git a/pkg/state/state.go b/pkg/state/state.go index df678cc9..83531581 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -16,7 +16,7 @@ const HistoryFileName = "mithril_state.history.jsonl" // CurrentStateSchemaVersion is the current version of the state file format. // Increment this when making breaking changes to the state file structure. -const CurrentStateSchemaVersion uint32 = 1 +const CurrentStateSchemaVersion uint32 = 2 // MithrilState tracks the current state of the mithril node. // The state file serves as an atomic marker of validity - AccountsDB is valid @@ -58,6 +58,52 @@ type MithrilState struct { CorruptionReason string `json:"corruption_reason,omitempty"` CorruptionDetectedAt time.Time `json:"corruption_detected_at,omitempty"` + // ========================================================================= + // Manifest Seed Data (copied from manifest at snapshot build time) + // Used ONLY for fresh-start replay. Resume uses Last* fields instead. 
+ // ========================================================================= + + // Block configuration seed + ManifestParentSlot uint64 `json:"manifest_parent_slot,omitempty"` + ManifestParentBankhash string `json:"manifest_parent_bankhash,omitempty"` // base58 + ManifestAcctsLtHash string `json:"manifest_accts_lt_hash,omitempty"` // base64 + + // Fee rate governor seed (static fields only) + ManifestFeeRateGovernor *ManifestFeeRateGovernorSeed `json:"manifest_fee_rate_governor,omitempty"` + + // Signature/fee state at snapshot + ManifestSignatureCount uint64 `json:"manifest_signature_count,omitempty"` + ManifestLamportsPerSignature uint64 `json:"manifest_lamports_per_sig,omitempty"` + + // Blockhash context (150 recent + 1 evicted) + ManifestRecentBlockhashes []BlockhashEntry `json:"manifest_recent_blockhashes,omitempty"` + ManifestEvictedBlockhash string `json:"manifest_evicted_blockhash,omitempty"` // base58 + + // ReplayCtx seed (inflation/capitalization at snapshot) + ManifestCapitalization uint64 `json:"manifest_capitalization,omitempty"` + ManifestSlotsPerYear float64 `json:"manifest_slots_per_year,omitempty"` + ManifestInflationInitial float64 `json:"manifest_inflation_initial,omitempty"` + ManifestInflationTerminal float64 `json:"manifest_inflation_terminal,omitempty"` + ManifestInflationTaper float64 `json:"manifest_inflation_taper,omitempty"` + ManifestInflationFoundation float64 `json:"manifest_inflation_foundation,omitempty"` + ManifestInflationFoundationTerm float64 `json:"manifest_inflation_foundation_term,omitempty"` + + // Epoch account hash (base64 for consistency with LtHash) + ManifestEpochAcctsHash string `json:"manifest_epoch_accts_hash,omitempty"` // base64 + + // Transaction count at snapshot slot + ManifestTransactionCount uint64 `json:"manifest_transaction_count,omitempty"` + + // Epoch authorized voters (for current epoch only) + // Maps vote account pubkey (base58) -> list of authorized voter pubkeys (base58) + // Multiple authorized voters per vote account are supported (matches original manifest behavior) + ManifestEpochAuthorizedVoters map[string][]string `json:"manifest_epoch_authorized_voters,omitempty"` + + // Epoch stakes seed - AGGREGATED vote-account stakes only (NOT full VersionedEpochStakes) + // Same format as ComputedEpochStakes (PersistedEpochStakes JSON) + // Cleared after first replayed slot to save space. + ManifestEpochStakes map[uint64]string `json:"manifest_epoch_stakes,omitempty"` + // ========================================================================= // Current Position (where we left off) // ========================================================================= @@ -123,6 +169,17 @@ type BlockhashEntry struct { LamportsPerSignature uint64 `json:"lamports_per_sig"` } +// ManifestFeeRateGovernorSeed contains the static fields from FeeRateGovernor +// that do not change during replay. Dynamic fields (LamportsPerSignature, +// PrevLamportsPerSignature) are stored separately and updated on resume. 
+type ManifestFeeRateGovernorSeed struct { + TargetLamportsPerSignature uint64 `json:"target_lamports_per_sig"` + TargetSignaturesPerSlot uint64 `json:"target_sigs_per_slot"` + MinLamportsPerSignature uint64 `json:"min_lamports_per_sig"` + MaxLamportsPerSignature uint64 `json:"max_lamports_per_sig"` + BurnPercent byte `json:"burn_percent"` +} + // SlotHashEntry represents a single entry in the SlotHashes sysvar type SlotHashEntry struct { Slot uint64 `json:"slot"` @@ -155,15 +212,9 @@ func LoadState(accountsDbDir string) (*MithrilState, error) { return nil, fmt.Errorf("failed to parse state file: %w", err) } - // Migrate from older schema versions - if state.StateSchemaVersion == 0 { - // Version 0 → 1 migration: - // - Migrate LastCommit to LastWriterCommit - if state.LastCommit != "" && state.LastWriterCommit == "" { - state.LastWriterCommit = state.LastCommit - } - state.StateSchemaVersion = 1 - // Note: We don't save here - the state will be saved on next update + // Require schema version 2 - no migration from older versions + if state.StateSchemaVersion != CurrentStateSchemaVersion { + return nil, fmt.Errorf("state file schema version %d is not supported (requires v%d). Delete AccountsDB and rebuild from snapshot", state.StateSchemaVersion, CurrentStateSchemaVersion) } return &state, nil @@ -348,6 +399,12 @@ func (s *MithrilState) HasResumeData() bool { return s != nil && s.LastSlot > 0 && s.LastAcctsLtHash != "" } +// ClearManifestEpochStakes removes the manifest epoch stakes after they're no longer needed. +// This should be called after the first slot is replayed past the snapshot slot. +func (s *MithrilState) ClearManifestEpochStakes() { + s.ManifestEpochStakes = nil +} + // getWriterCommit returns the writer commit, preferring the new field but falling back to legacy. func (s *MithrilState) getWriterCommit() string { if s.LastWriterCommit != "" {
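With migration removed, callers treat an unsupported schema as a rebuild signal and drop the epoch-stakes seed once replay has moved past the snapshot slot. A hypothetical wiring sketch (maybeClearManifestSeed and replayedSlot are illustrative names; Save is assumed to persist the state file as elsewhere in the package):

func maybeClearManifestSeed(accountsDbDir string, replayedSlot uint64) error {
    st, err := state.LoadState(accountsDbDir)
    if err != nil {
        // An unsupported schema version now surfaces as an error; the operator
        // deletes the AccountsDB and rebuilds from a snapshot.
        return fmt.Errorf("state file unusable, rebuild required: %w", err)
    }
    if replayedSlot > st.ManifestParentSlot {
        // Past the snapshot slot the manifest seed copy of the epoch stakes is
        // dead weight; dropping it keeps the state file small.
        st.ClearManifestEpochStakes()
        if err := st.Save(accountsDbDir); err != nil {
            return fmt.Errorf("saving state after clearing manifest epoch stakes: %w", err)
        }
    }
    return nil
}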