From e844d0f0cffc5bcb15300c6245a9dcd558e9f475 Mon Sep 17 00:00:00 2001 From: juan mantica Date: Thu, 18 Dec 2025 11:29:21 -0600 Subject: [PATCH] Streaming optimizations --- pkg/accountsdb/appendvec.go | 29 ++++++++++-- pkg/accountsdb/bench_test.go | 73 ++++++++++++++++++++++++++++++ pkg/accountsdb/index.go | 12 +++-- pkg/snapshot/build_db.go | 13 +++++- pkg/snapshot/build_db_with_incr.go | 13 +++++- pkg/snapshot/snapshot.go | 2 +- 6 files changed, 130 insertions(+), 12 deletions(-) create mode 100644 pkg/accountsdb/bench_test.go diff --git a/pkg/accountsdb/appendvec.go b/pkg/accountsdb/appendvec.go index 372759e6..1234992a 100644 --- a/pkg/accountsdb/appendvec.go +++ b/pkg/accountsdb/appendvec.go @@ -30,7 +30,7 @@ const ( ) type appendVecParser struct { - Buf []byte + Reader io.Reader FileSize uint64 Offset uint64 @@ -40,21 +40,42 @@ type appendVecParser struct { func (parser *appendVecParser) ParseNextAcct(pk *solana.PublicKey, a *AccountIndexEntry) error { if parser.Offset+hdrLen > parser.FileSize { - return fmt.Errorf("overflow") + return io.EOF + } + + var hdr [hdrLen]byte + if _, err := io.ReadFull(parser.Reader, hdr[:]); err != nil { + if err == io.EOF { + return err + } + return fmt.Errorf("reading header: %w", err) } - dataLen := binary.LittleEndian.Uint64(parser.Buf[parser.Offset+dataLenOffset : parser.Offset+dataLenOffset+8]) + dataLen := binary.LittleEndian.Uint64(hdr[dataLenOffset : dataLenOffset+8]) - *pk = solana.PublicKeyFromBytes(parser.Buf[parser.Offset+pubkeyOffset : parser.Offset+pubkeyOffset+32]) + *pk = solana.PublicKeyFromBytes(hdr[pubkeyOffset : pubkeyOffset+32]) a.Slot = parser.Slot a.FileId = parser.FileId a.Offset = parser.Offset parser.Offset += hdrLen + alignedLen := util.AlignUp(dataLen, 8) + if parser.Offset+dataLen > parser.FileSize { return fmt.Errorf("overflow") } + if alignedLen > 0 { + if seeker, ok := parser.Reader.(io.Seeker); ok { + if _, err := seeker.Seek(int64(alignedLen), io.SeekCurrent); err != nil { + return fmt.Errorf("seeking data: %w", err) + } + } else { + if _, err := io.CopyN(io.Discard, parser.Reader, int64(alignedLen)); err != nil { + return fmt.Errorf("discarding data: %w", err) + } + } + } parser.Offset += util.AlignUp(dataLen, 8) diff --git a/pkg/accountsdb/bench_test.go b/pkg/accountsdb/bench_test.go new file mode 100644 index 00000000..1722ee22 --- /dev/null +++ b/pkg/accountsdb/bench_test.go @@ -0,0 +1,73 @@ +package accountsdb + +import ( + "crypto/rand" + "os" + "testing" + + "github.com/gagliardetto/solana-go" +) + +// BenchmarkBuildIndexEntries writes a real file to disk and parses it. +func BenchmarkBuildIndexEntries(b *testing.B) { + numAccounts := 1000 + dataSize := 100 * 1024 // 100 KB per account + tmpFile := "bench_appendvec.dat" + + // Create or truncate the file + f, err := os.Create(tmpFile) + if err != nil { + b.Fatal(err) + } + defer func() { + f.Close() + os.Remove(tmpFile) // clean up + }() + + // Write dummy accounts to the file + for i := 0; i < numAccounts; i++ { + var pk, owner solana.PublicKey + rand.Read(pk[:]) + rand.Read(owner[:]) + + acct := AppendVecAccount{ + WriteVersion: 1, + DataLen: uint64(dataSize), + Pubkey: pk, + Lamports: 100, + RentEpoch: 0, + Owner: owner, + Executable: false, + Data: make([]byte, dataSize), + } + + if err := acct.Marshal(f); err != nil { + b.Fatal(err) + } + } + + fileInfo, _ := f.Stat() + fileSize := uint64(fileInfo.Size()) + b.Logf("Benchmarking with file size: %.2f MB", float64(fileSize)/1024/1024) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Open the file for reading + fRead, err := os.Open(tmpFile) + // Open the file in bytes for testing + //fileBytes, err := os.ReadFile(tmpFile) + if err != nil { + b.Fatal(err) + } + + _, _, err = BuildIndexEntriesFromAppendVecs(fRead, fileSize, 100, 100) + if err != nil { + // fRead.Close() + b.Fatal(err) + } + + //fRead.Close() + } +} diff --git a/pkg/accountsdb/index.go b/pkg/accountsdb/index.go index b7b80f61..fa55ac58 100644 --- a/pkg/accountsdb/index.go +++ b/pkg/accountsdb/index.go @@ -3,6 +3,7 @@ package accountsdb import ( "encoding/binary" "fmt" + "io" "github.com/gagliardetto/solana-go" ) @@ -34,12 +35,12 @@ func unmarshalAcctIdxEntry(data []byte) (*AccountIndexEntry, error) { return out, nil } -func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, error) { +func BuildIndexEntriesFromAppendVecs(reader io.Reader, fileSize uint64, slot uint64, fileId uint64) ([]solana.PublicKey, []AccountIndexEntry, error) { pubkeys := make([]solana.PublicKey, 0, 20000) acctIdxEntries := make([]AccountIndexEntry, 0, 20000) var err error - parser := &appendVecParser{Buf: data, FileSize: fileSize, FileId: fileId, Slot: slot} + parser := &appendVecParser{Reader: reader, FileSize: fileSize, FileId: fileId, Slot: slot} for { pubkeys = append(pubkeys, solana.PublicKey{}) @@ -50,5 +51,10 @@ func BuildIndexEntriesFromAppendVecs(data []byte, fileSize uint64, slot uint64, } } - return pubkeys, acctIdxEntries, nil + if err != io.EOF { + return nil, nil, err + } + + // Remove the last empty entry added before loop break + return pubkeys[:len(pubkeys)-1], acctIdxEntries[:len(acctIdxEntries)-1], nil } diff --git a/pkg/snapshot/build_db.go b/pkg/snapshot/build_db.go index 2a2c07a4..66fc4361 100644 --- a/pkg/snapshot/build_db.go +++ b/pkg/snapshot/build_db.go @@ -105,7 +105,16 @@ func BuildAccountsDb( start := time.Now() defer wg.Done() task := i.(indexEntryBuilderTask) - pubkeys, entries, err := accountsdb.BuildIndexEntriesFromAppendVecs(task.Data, task.FileSize, task.Slot, task.FileId) + + f, err := os.Open(task.FilePath) + if err != nil { + mlog.Log.Errorf("failed to open appendvec file %s: %s", task.FilePath, err) + indexEntryBuilderInProgress.Add(-1) + return + } + defer f.Close() + + pubkeys, entries, err := accountsdb.BuildIndexEntriesFromAppendVecs(f, task.FileSize, task.Slot, task.FileId) if err != nil { mlog.Log.Errorf("%s\n", err) return @@ -187,7 +196,7 @@ func BuildAccountsDb( } appendVecCopyingInProgress.Add(-1) - nextTask := indexEntryBuilderTask{Data: appendVecBytes, FileSize: fileSize, Slot: slot, FileId: fileId} + nextTask := indexEntryBuilderTask{FilePath: cleanPath, FileSize: fileSize, Slot: slot, FileId: fileId} wg.Add(1) statsd.Timing(statsd.TasksAppendVecCopyingLatency, uint64(time.Since(start)), nil) err = indexEntryBuilderPool.Invoke(nextTask) diff --git a/pkg/snapshot/build_db_with_incr.go b/pkg/snapshot/build_db_with_incr.go index a4512c41..b39a6a1f 100644 --- a/pkg/snapshot/build_db_with_incr.go +++ b/pkg/snapshot/build_db_with_incr.go @@ -92,7 +92,16 @@ func BuildAccountsDbWithIncr( start := time.Now() defer wg.Done() task := i.(indexEntryBuilderTask) - pubkeys, entries, err := accountsdb.BuildIndexEntriesFromAppendVecs(task.Data, task.FileSize, task.Slot, task.FileId) + + f, err := os.Open(task.FilePath) + if err != nil { + mlog.Log.Errorf("failed to open appendvec file %s: %s", task.FilePath, err) + indexEntryBuilderInProgress.Add(-1) + return + } + defer f.Close() + + pubkeys, entries, err := accountsdb.BuildIndexEntriesFromAppendVecs(f, task.FileSize, task.Slot, task.FileId) if err != nil { mlog.Log.Errorf("%s\n", err) return @@ -181,7 +190,7 @@ func BuildAccountsDbWithIncr( } appendVecCopyingInProgress.Add(-1) - nextTask := indexEntryBuilderTask{Data: appendVecBytes, FileSize: fileSize, Slot: slot, FileId: fileId} + nextTask := indexEntryBuilderTask{FilePath: cleanPath, FileSize: fileSize, Slot: slot, FileId: fileId} wg.Add(1) statsd.Timing(statsd.TasksAppendVecCopyingLatency, uint64(time.Since(start)), nil) err = indexEntryBuilderPool.Invoke(nextTask) diff --git a/pkg/snapshot/snapshot.go b/pkg/snapshot/snapshot.go index b0d731fe..7de5bc0b 100644 --- a/pkg/snapshot/snapshot.go +++ b/pkg/snapshot/snapshot.go @@ -73,7 +73,7 @@ type appendVecCopyingTask struct { } type indexEntryBuilderTask struct { - Data []byte + FilePath string FileSize uint64 Slot uint64 FileId uint64