Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cmd/mithril/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,8 @@ func runVerifyRange(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsDbDir); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1175,6 +1177,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1243,6 +1247,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1299,6 +1305,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1369,6 +1377,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1471,6 +1481,8 @@ func runLive(c *cobra.Command, args []string) {
WriterVersion: getVersion(),
WriterCommit: getCommit(),
})
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -1582,6 +1594,8 @@ postBootstrap:
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Populate manifest seed data so replay doesn't need manifest at runtime
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
Expand Down Expand Up @@ -2604,6 +2618,6 @@ func runReplayWithRecovery(
}
}()

result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, manifest, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
return result
}
44 changes: 26 additions & 18 deletions pkg/accountsdb/accountsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"runtime/trace"
"sync"
"time"
"sync/atomic"

"github.com/Overclock-Validator/mithril/pkg/accounts"
Expand Down Expand Up @@ -41,11 +42,6 @@ type AccountsDb struct {
storeRequestChan chan *list.Element
storeWorkerDone chan struct{}

// InRewardsWindow is set during partitioned epoch rewards distribution.
// When true, stake accounts are not cached in CommonAcctsCache since they're
// one-shot reads/writes that would evict genuinely hot accounts.
// Atomic for safe concurrent access from RPC goroutines.
InRewardsWindow atomic.Bool
}

type storeRequest struct {
Expand Down Expand Up @@ -123,6 +119,24 @@ func OpenDb(accountsDbDir string) (*AccountsDb, error) {
return accountsDb, nil
}

// DrainStoreQueue waits until all queued async store requests have completed.
// Unlike WaitForStoreWorker, this does NOT shut down the worker — it just
// spins until the in-progress list is empty.
func (accountsDb *AccountsDb) DrainStoreQueue() {
	if !StoreAsync {
		return
	}
	// pending reads the in-progress queue length under the mutex.
	pending := func() int {
		accountsDb.inProgressStoreRequestsMu.Lock()
		defer accountsDb.inProgressStoreRequestsMu.Unlock()
		return accountsDb.inProgressStoreRequests.Len()
	}
	// Poll until the worker has drained everything; 1ms keeps latency low
	// without burning a core.
	for pending() > 0 {
		time.Sleep(time.Millisecond)
	}
}

// Turns down the store worker. AccountsDb cannot accept writes after this if StoreAsync.
// Should not be called concurrently.
func (accountsDb *AccountsDb) WaitForStoreWorker() {
Expand Down Expand Up @@ -231,7 +245,7 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public
return nil, ErrNoAccount
}

acctIdxEntry, err := unmarshalAcctIdxEntry(acctIdxEntryBytes)
acctIdxEntry, err := UnmarshalAcctIdxEntry(acctIdxEntryBytes)
if err != nil {
panic("failed to unmarshal AccountIndexEntry from index kv database")
}
Expand Down Expand Up @@ -268,9 +282,6 @@ func (accountsDb *AccountsDb) getStoredAccount(slot uint64, pubkey solana.Public
owner := solana.PublicKeyFromBytes(acct.Owner[:])
if owner == addresses.VoteProgramAddr {
accountsDb.VoteAcctCache.Set(pubkey, acct)
} else if owner == addresses.StakeProgramAddr && accountsDb.InRewardsWindow.Load() {
// During reward distribution, stake accounts are one-shot reads that would
// evict genuinely hot accounts from the cache. Skip caching them.
} else {
accountsDb.CommonAcctsCache.Set(pubkey, acct)
}
Expand Down Expand Up @@ -343,17 +354,13 @@ func (accountsDb *AccountsDb) storeAccountsSync(accts []*accounts.Account, slot
accountsDb.parallelStoreAccounts(StoreAccountsWorkers, accts, slot)
}

inRewardsWindow := accountsDb.InRewardsWindow.Load()
for _, acct := range accts {
if acct == nil {
continue
}
owner := solana.PublicKeyFromBytes(acct.Owner[:])
if owner == addresses.VoteProgramAddr {
accountsDb.VoteAcctCache.Set(acct.Key, acct)
} else if owner == addresses.StakeProgramAddr && inRewardsWindow {
// During reward distribution, stake accounts are one-shot writes that would
// evict genuinely hot accounts from the cache. Skip caching them.
} else {
accountsDb.CommonAcctsCache.Set(acct.Key, acct)
}
Expand All @@ -365,12 +372,13 @@ func (accountsDb *AccountsDb) storeWorker() {
for elt := range accountsDb.storeRequestChan {
sr := elt.Value.(storeRequest)
accountsDb.storeAccountsSync(sr.accts, sr.slot)
accountsDb.inProgressStoreRequestsMu.Lock()
accountsDb.inProgressStoreRequests.Remove(elt)
accountsDb.inProgressStoreRequestsMu.Unlock()
if sr.cb != nil {
sr.cb()
}
// Remove after callback so DrainStoreQueue waits for callbacks (e.g. index flush) to complete
accountsDb.inProgressStoreRequestsMu.Lock()
accountsDb.inProgressStoreRequests.Remove(elt)
accountsDb.inProgressStoreRequestsMu.Unlock()
}
}

Expand Down Expand Up @@ -407,7 +415,7 @@ func (accountsDb *AccountsDb) storeAccountsInternal(accts []*accounts.Account, s
// if not, then we write out a new appendvec.
existingacctIdxEntryBuf, c, err := accountsDb.Index.Get(acct.Key[:])
if err == nil {
acctIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf)
acctIdxEntry, err := UnmarshalAcctIdxEntry(existingacctIdxEntryBuf)
if err != nil {
panic("failed to unmarshal AccountIndexEntry from index kv database")
}
Expand Down Expand Up @@ -509,7 +517,7 @@ func (accountsDb *AccountsDb) parallelStoreAccounts(n int, accts []*accounts.Acc
if err != nil {
return fmt.Errorf("reading from index: %w", err)
}
existingIdxEntry, err := unmarshalAcctIdxEntry(existingacctIdxEntryBuf)
existingIdxEntry, err := UnmarshalAcctIdxEntry(existingacctIdxEntryBuf)
c.Close()
if err != nil {
return fmt.Errorf("unmarshaling index entry: %w", err)
Expand Down
71 changes: 63 additions & 8 deletions pkg/accountsdb/index.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package accountsdb

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"os"

"github.com/Overclock-Validator/mithril/pkg/addresses"
"github.com/gagliardetto/solana-go"
Expand All @@ -27,23 +29,71 @@ func (entry *AccountIndexEntry) Unmarshal(in *[24]byte) {
entry.Offset = binary.LittleEndian.Uint64(in[16:24])
}

func unmarshalAcctIdxEntry(data []byte) (*AccountIndexEntry, error) {
func UnmarshalAcctIdxEntry(data []byte) (*AccountIndexEntry, error) {
if len(data) < 24 {
return nil, fmt.Errorf("unmarshalAcctIdxEntry: input had %d < 24 minimum bytes", len(data))
return nil, fmt.Errorf("UnmarshalAcctIdxEntry: input had %d < 24 minimum bytes", len(data))
}
out := &AccountIndexEntry{}
out.Unmarshal((*[24]byte)(data[:24]))
return out, nil
}

// StakeIndexEntry stores a stake account pubkey with its appendvec location hint.
// The location (FileId, Offset) is used for sorting to achieve sequential I/O.
// It may be stale; actual reads still go through Pebble for the canonical location.
type StakeIndexEntry struct {
	Pubkey solana.PublicKey // stake account address
	FileId uint64           // appendvec file id hint (may be stale)
	Offset uint64           // byte offset within that appendvec (may be stale)
}

// StakeIndexMagic is the magic header for stake pubkey index files.
var StakeIndexMagic = [4]byte{'S', 'T', 'K', 'I'}

// StakeIndexVersion is the on-disk format version written after the magic bytes.
const StakeIndexVersion = uint32(2)

// StakeIndexRecordSize is the fixed size in bytes of one on-disk record.
const StakeIndexRecordSize = 48 // 32-byte pubkey + 8-byte fileId + 8-byte offset

// WriteStakePubkeyIndex writes stake index entries to path, truncating any
// existing file.
// Format: 8-byte header ("STKI" + version uint32 LE) + N × 48-byte records.
//
// Returns an error if the file cannot be created or any write, flush, or
// close fails. The close error is checked deliberately: with a 1 MiB
// buffered writer, data may only reach the OS at Close, and ignoring that
// error could report success for a truncated index file.
func WriteStakePubkeyIndex(path string, entries []StakeIndexEntry) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Surface Close errors unless an earlier error is already being returned.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing stake index file: %w", cerr)
		}
	}()

	buf := bufio.NewWriterSize(f, 1<<20)

	// Write header
	var header [8]byte
	copy(header[0:4], StakeIndexMagic[:])
	binary.LittleEndian.PutUint32(header[4:8], StakeIndexVersion)
	if _, err := buf.Write(header[:]); err != nil {
		return fmt.Errorf("writing stake index header: %w", err)
	}

	// Write records
	var record [StakeIndexRecordSize]byte
	for _, e := range entries {
		copy(record[0:32], e.Pubkey[:])
		binary.LittleEndian.PutUint64(record[32:40], e.FileId)
		binary.LittleEndian.PutUint64(record[40:48], e.Offset)
		if _, err := buf.Write(record[:]); err != nil {
			return fmt.Errorf("writing stake index record: %w", err)
		}
	}

	return buf.Flush()
}

// BuildIndexEntriesFromAppendVecs parses an appendvec and returns:
// - pubkeys: all account pubkeys
// - acctIdxEntries: index entries for each account
// - stakePubkeys: pubkeys of accounts owned by the stake program
func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, []solana.PublicKey, error) {
// - stakeEntries: stake account pubkeys with their appendvec location hints
func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, []StakeIndexEntry, error) {
pubkeys := make([]solana.PublicKey, 0, 20000)
acctIdxEntries := make([]AccountIndexEntry, 0, 20000)
stakePubkeys := make([]solana.PublicKey, 0, 1000)
stakeEntries := make([]StakeIndexEntry, 0, 1000)
var err error

parser := &appendVecParser{Buf: data, FileSize: fileSize, FileId: fileId, Slot: slot}
Expand All @@ -58,11 +108,16 @@ func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64,
acctIdxEntries = acctIdxEntries[:len(acctIdxEntries)-1]
break
}
// Collect stake account pubkeys for building stake index
// Collect stake account entries with appendvec location hints
if bytes.Equal(owner[:], addresses.StakeProgramAddr[:]) {
stakePubkeys = append(stakePubkeys, pubkeys[len(pubkeys)-1])
idx := len(acctIdxEntries) - 1
stakeEntries = append(stakeEntries, StakeIndexEntry{
Pubkey: pubkeys[len(pubkeys)-1],
FileId: acctIdxEntries[idx].FileId,
Offset: acctIdxEntries[idx].Offset,
})
}
}

return pubkeys, acctIdxEntries, stakePubkeys, nil
return pubkeys, acctIdxEntries, stakeEntries, nil
}
Loading