diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1da0ea079..5fcdbc216 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,7 +13,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.19" + go-version: "1.20" - uses: actions/checkout@v3 - run: go mod download shell: bash @@ -27,7 +27,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: "1.19" + go-version: "1.20" - uses: actions/checkout@v3 - name: test VM run: go test -v ./vm/... diff --git a/vm/block.go b/vm/block.go index 771687cb2..500d20cc3 100644 --- a/vm/block.go +++ b/vm/block.go @@ -2,6 +2,7 @@ package vm import ( "context" + "fmt" "time" "github.com/ava-labs/avalanchego/ids" @@ -11,85 +12,103 @@ import ( ) var ( - _ snowman.Block = &Block{} + _ choices.Decidable = (*Block)(nil) + _ snowman.Block = (*Block)(nil) ) -// Block implements the snowman.Block interface type Block struct { - id ids.ID - tmBlock *types.Block - vm *VM - status choices.Status + *types.Block + st choices.Status + vm *VM } -// newBlock returns a new Block wrapping the Tendermint Block type and implementing the snowman.Block interface -func (vm *VM) newBlock(tmBlock *types.Block) (*Block, error) { - var id ids.ID - copy(id[:], tmBlock.Hash()) - - return &Block{ - id: id, - tmBlock: tmBlock, - vm: vm, - }, nil -} - -func (b *Block) ID() ids.ID { - return b.id +func NewBlock(vm *VM, block *types.Block, st choices.Status) *Block { + return &Block{Block: block, vm: vm, st: st} } -func (b *Block) Accept(ctx context.Context) error { - b.SetStatus(choices.Accepted) - return b.vm.applyBlock(b) +// ID returns a unique ID for this element. +// +// Typically, this is implemented by using a cryptographic hash of a +// binary representation of this element. An element should return the same +// IDs upon repeated calls. 
+func (block *Block) ID() ids.ID { + return ids.ID(block.Hash()) } -func (b *Block) Reject(ctx context.Context) error { - b.SetStatus(choices.Rejected) - - return nil +// Accept this element. +// +// This element will be accepted by every correct node in the network. +func (block *Block) Accept(context.Context) error { + block.vm.log.Debug("try to accept block", "block", block.ID()) + block.st = choices.Accepted + delete(block.vm.verifiedBlocks, block.ID()) + return block.vm.applyBlock(block) } -func (b *Block) SetStatus(status choices.Status) { - b.status = status +// Reject this element. +// +// This element will not be accepted by any correct node in the network. +func (block *Block) Reject(context.Context) error { + block.vm.log.Debug("try to reject block", "block", block.ID()) + block.st = choices.Rejected + panic("implement me") } -func (b *Block) Status() choices.Status { - return b.status +// Status returns this element's current status. +// +// If Accept has been called on an element with this ID, Accepted should be +// returned. Similarly, if Reject has been called on an element with this +// ID, Rejected should be returned. If the contents of this element are +// unknown, then Unknown should be returned. Otherwise, Processing should be +// returned. +// +// TODO: Consider allowing Status to return an error. +func (block *Block) Status() choices.Status { + return block.st } -func (b *Block) Parent() ids.ID { - var id ids.ID - parentHash := b.tmBlock.Header.LastBlockID.Hash - copy(id[:], parentHash) - - return id +// Parent returns the ID of this block's parent. +func (block *Block) Parent() ids.ID { + return ids.ID(block.LastBlockID.Hash) } -func (b *Block) Verify(context.Context) error { - if b == nil || b.tmBlock == nil { - return errInvalidBlock - } - - return b.tmBlock.ValidateBasic() +// Verify that the state transition this block would make if accepted is +// valid. If the state transition is invalid, a non-nil error should be +// returned. 
+// +// It is guaranteed that the Parent has been successfully verified. +// +// If nil is returned, it is guaranteed that either Accept or Reject will be +// called on this block, unless the VM is shut down. +func (block *Block) Verify(context.Context) error { + return block.ValidateBasic() } -func (b *Block) Bytes() []byte { - block, err := b.tmBlock.ToProto() +// Bytes returns the binary representation of this block. +// +// This is used for sending blocks to peers. The bytes should be able to be +// parsed into the same block on another node. +func (block *Block) Bytes() []byte { + b, err := block.ToProto() if err != nil { - panic(err) + panic(fmt.Sprintf("can't convert block to proto obj: %s", err)) } - data, err := block.Marshal() + data, err := b.Marshal() if err != nil { - panic(err) + panic(fmt.Sprintf("can't serialize block: %s", err)) } - - return data + return append([]byte{uint8(block.st)}, data...) } -func (b *Block) Height() uint64 { - return uint64(b.tmBlock.Height) +// Height returns the height of this block in the chain. +func (block *Block) Height() uint64 { + return uint64(block.Block.Height) } -func (b *Block) Timestamp() time.Time { - return b.tmBlock.Time +// Time this block was proposed at. This value should be consistent across +// all nodes. If this block hasn't been successfully verified, any value can +// be returned. If this block is the last accepted block, the timestamp must +// be returned correctly. Otherwise, accepted blocks can return any value. 
+func (block *Block) Timestamp() time.Time { + return block.Block.Time } diff --git a/vm/cmd/main.go b/vm/cmd/main.go index e0f8b9743..b511ea15f 100644 --- a/vm/cmd/main.go +++ b/vm/cmd/main.go @@ -3,23 +3,24 @@ package main import ( "context" "fmt" - "github.com/consideritdone/landslidecore/abci/example/counter" - landslideCoreVM "github.com/consideritdone/landslidecore/vm" "os" + "github.com/consideritdone/landslidecore/abci/example/counter" + "github.com/consideritdone/landslidecore/vm" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/ulimit" "github.com/ava-labs/avalanchego/vms/rpcchainvm" ) func main() { - if err := ulimit.Set(ulimit.DefaultFDLimit, logging.NoLog{}); err != nil { fmt.Printf("failed to set fd limit correctly due to: %s", err) os.Exit(1) } - vm := landslideCoreVM.NewVM(counter.NewApplication(true)) - - rpcchainvm.Serve(context.Background(), vm) + rpcchainvm.Serve( + context.Background(), + vm.New(vm.LocalAppCreator(counter.NewApplication(true))), + ) } diff --git a/vm/database.go b/vm/database.go index 07b62ff18..d136c1c8b 100644 --- a/vm/database.go +++ b/vm/database.go @@ -6,7 +6,7 @@ import ( ) var ( - _ dbm.DB = &Database{} + _ dbm.DB = (*Database)(nil) ) type ( @@ -24,6 +24,20 @@ type ( } ) +func NewDB(db database.Database) *Database { + return &Database{ + Database: db, + } +} + +func (db Database) Close() error { + return db.Database.Close() +} + +func (db Database) Has(key []byte) (bool, error) { + return db.Database.Has(key) +} + func (db Database) Get(key []byte) ([]byte, error) { res, err := db.Database.Get(key) if err != nil { @@ -43,6 +57,10 @@ func (db Database) SetSync(key []byte, value []byte) error { return db.Database.Put(key, value) } +func (db Database) Delete(key []byte) error { + return db.Database.Delete(key) +} + func (db Database) DeleteSync(key []byte) error { return db.Database.Delete(key) } diff --git a/vm/block_utils.go b/vm/funcs.go similarity index 64% rename from 
vm/block_utils.go rename to vm/funcs.go index 49723682a..b48a27057 100644 --- a/vm/block_utils.go +++ b/vm/funcs.go @@ -5,34 +5,51 @@ import ( "fmt" "time" - "github.com/consideritdone/landslidecore/crypto" - "github.com/consideritdone/landslidecore/state" - "github.com/consideritdone/landslidecore/types" - abci "github.com/consideritdone/landslidecore/abci/types" + "github.com/consideritdone/landslidecore/crypto" "github.com/consideritdone/landslidecore/libs/log" + tmmath "github.com/consideritdone/landslidecore/libs/math" mempl "github.com/consideritdone/landslidecore/mempool" + "github.com/consideritdone/landslidecore/node" tmstate "github.com/consideritdone/landslidecore/proto/tendermint/state" "github.com/consideritdone/landslidecore/proxy" + "github.com/consideritdone/landslidecore/rpc/client" + coretypes "github.com/consideritdone/landslidecore/rpc/core/types" + "github.com/consideritdone/landslidecore/state" + "github.com/consideritdone/landslidecore/store" + "github.com/consideritdone/landslidecore/types" ) -func makeCommitMock(height int64, timestamp time.Time) *types.Commit { - var commitSig []types.CommitSig = nil - if height != 1 { - commitSig = []types.CommitSig{{Timestamp: time.Now()}} - } - return types.NewCommit( - height, - 0, - types.BlockID{ - Hash: []byte(""), - PartSetHeader: types.PartSetHeader{ - Hash: []byte(""), - Total: 1, - }, +var ( + // see README + defaultPerPage = 30 + maxPerPage = 100 +) + +func NewLocalGenesisDocProvider(data []byte) node.GenesisDocProvider { + return func() (*types.GenesisDoc, error) { + return types.GenesisDocFromJSON(data) + } +} + +func makeCommit(height int64, timestamp time.Time) *types.Commit { + commitSig := []types.CommitSig(nil) + if height > 1 { + commitSig = []types.CommitSig{{ + BlockIDFlag: types.BlockIDFlagNil, + Timestamp: time.Now(), + ValidatorAddress: proposerAddress, + Signature: []byte{0x0}, + }} + } + blockID := types.BlockID{ + Hash: []byte(""), + PartSetHeader: types.PartSetHeader{ + Hash: 
[]byte(""), + Total: 1, }, - commitSig, - ) + } + return types.NewCommit(height, 0, blockID, commitSig) } func validateBlock(state state.State, block *types.Block) error { @@ -190,24 +207,6 @@ func ABCIResponsesResultsHash(ar *tmstate.ABCIResponses) []byte { return types.NewResults(ar.DeliverTxs).Hash() } -func updateState( - st state.State, - blockID types.BlockID, - header *types.Header, - abciResponses *tmstate.ABCIResponses, -) (state.State, error) { - return state.State{ - Version: st.Version, - ChainID: st.ChainID, - InitialHeight: st.InitialHeight, - LastBlockHeight: header.Height, - LastBlockID: blockID, - LastBlockTime: header.Time, - LastResultsHash: ABCIResponsesResultsHash(abciResponses), - AppHash: nil, - }, nil -} - // TxPreCheck returns a function to filter transactions before processing. // The function limits the size of a transaction to the block's maximum data size. func TxPreCheck(state state.State) mempl.PreCheckFunc { @@ -269,3 +268,118 @@ func fireEvents( } } } + +// bsHeight can be either latest committed or uncommitted (+1) height. 
+func getHeight(bs *store.BlockStore, heightPtr *int64) (int64, error) { + bsHeight := bs.Height() + bsBase := bs.Base() + if heightPtr != nil { + height := *heightPtr + if height <= 0 { + return 0, fmt.Errorf("height must be greater than 0, but got %d", height) + } + if height > bsHeight { + return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d", height, bsHeight) + } + if height < bsBase { + return 0, fmt.Errorf("height %d is not available, lowest height is %d", height, bsBase) + } + return height, nil + } + return bsHeight, nil +} + +func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { + if perPage < 1 { + panic(fmt.Sprintf("zero or negative perPage: %d", perPage)) + } + + if pagePtr == nil { // no page parameter + return 1, nil + } + + pages := ((totalCount - 1) / perPage) + 1 + if pages == 0 { + pages = 1 // one page (even if it's empty) + } + page := *pagePtr + if page <= 0 || page > pages { + return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page) + } + + return page, nil +} + +func validatePerPage(perPagePtr *int) int { + if perPagePtr == nil { // no per_page parameter + return defaultPerPage + } + + perPage := *perPagePtr + if perPage < 1 { + return defaultPerPage + } else if perPage > maxPerPage { + return maxPerPage + } + return perPage +} + +func validateSkipCount(page, perPage int) int { + skipCount := (page - 1) * perPage + if skipCount < 0 { + return 0 + } + return skipCount +} + +// filterMinMax returns error if either min or max are negative or min > max +// if 0, use blockstore base for min, latest block height for max +// enforce limit. 
+func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) { + // filter negatives + if min < 0 || max < 0 { + return min, max, fmt.Errorf("heights must be non-negative") + } + + // adjust for default values + if min == 0 { + min = 1 + } + if max == 0 { + max = height + } + + // limit max to the height + max = tmmath.MinInt64(height, max) + + // limit min to the base + min = tmmath.MaxInt64(base, min) + + // limit min to within `limit` of max + // so the total number of blocks returned will be `limit` + min = tmmath.MaxInt64(min, max-limit+1) + + if min > max { + return min, max, fmt.Errorf("min height %d can't be greater than max height %d", min, max) + } + return min, max, nil +} + +func WaitForHeight(c Service, h int64, waiter client.Waiter) error { + if waiter == nil { + waiter = client.DefaultWaitStrategy + } + delta := int64(1) + for delta > 0 { + r := new(coretypes.ResultStatus) + if err := c.Status(nil, nil, r); err != nil { + return err + } + delta = h - r.SyncInfo.LatestBlockHeight + // wait for the time, or abort early + if err := waiter(delta); err != nil { + return err + } + } + return nil +} diff --git a/vm/service.go b/vm/service.go index 7caa1bcbd..aca8dc3e6 100644 --- a/vm/service.go +++ b/vm/service.go @@ -85,8 +85,8 @@ type ( } TxArgs struct { - Hash []byte `json:"hash"` - Prove bool `json:"prove"` + Hash tmbytes.HexBytes `json:"hash"` + Prove bool `json:"prove"` } TxSearchArgs struct { @@ -170,7 +170,7 @@ func NewService(vm *VM) Service { } func (s *LocalService) ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error { - resInfo, err := s.vm.proxyApp.Query().InfoSync(proxy.RequestInfo) + resInfo, err := s.vm.app.Query().InfoSync(proxy.RequestInfo) if err != nil { return err } @@ -187,7 +187,7 @@ func (s *LocalService) ABCIQueryWithOptions( args *ABCIQueryWithOptionsArgs, reply *ctypes.ResultABCIQuery, ) error { - resQuery, err := s.vm.proxyApp.Query().QuerySync(abci.RequestQuery{ + resQuery, err := 
s.vm.app.Query().QuerySync(abci.RequestQuery{ Path: args.Path, Data: args.Data, Height: args.Opts.Height, @@ -215,13 +215,13 @@ func (s *LocalService) BroadcastTxCommit( deliverTxSub, err := s.vm.eventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) - s.vm.tmLogger.Error("Error on broadcast_tx_commit", "err", err) + s.vm.log.Error("Error on broadcast_tx_commit", "err", err) return err } defer func() { if err := s.vm.eventBus.Unsubscribe(context.Background(), subscriber, q); err != nil { - s.vm.tmLogger.Error("Error unsubscribing from eventBus", "err", err) + s.vm.log.Error("Error unsubscribing from eventBus", "err", err) } }() @@ -231,7 +231,7 @@ func (s *LocalService) BroadcastTxCommit( checkTxResCh <- res }, mempl.TxInfo{}) if err != nil { - s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) + s.vm.log.Error("Error on broadcastTxCommit", "err", err) return fmt.Errorf("error on broadcastTxCommit: %v", err) } checkTxResMsg := <-checkTxResCh @@ -264,12 +264,12 @@ func (s *LocalService) BroadcastTxCommit( reason = deliverTxSub.Err().Error() } err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) - s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) + s.vm.log.Error("Error on broadcastTxCommit", "err", err) return err // TODO: use config for timeout case <-time.After(10 * time.Second): err = errors.New("timed out waiting for tx to be included in a block") - s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) + s.vm.log.Error("Error on broadcastTxCommit", "err", err) return err } } @@ -290,7 +290,7 @@ func (s *LocalService) BroadcastTxAsync( func (s *LocalService) BroadcastTxSync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error { resCh := make(chan *abci.Response, 1) err := s.vm.mempool.CheckTx(args.Tx, func(res *abci.Response) { - s.vm.tmLogger.With("module", "service").Debug("handled response from checkTx") + s.vm.log.With("module", 
"service").Debug("handled response from checkTx") resCh <- res }, mempl.TxInfo{}) if err != nil { @@ -404,10 +404,12 @@ func (s *LocalService) Validators(_ *http.Request, args *ValidatorsArgs, reply * } func (s *LocalService) Tx(_ *http.Request, args *TxArgs, reply *ctypes.ResultTx) error { + s.vm.log.Debug("query tx", "hash", args.Hash) r, err := s.vm.txIndexer.Get(args.Hash) if err != nil { return err } + s.vm.log.Debug("query tx", "r", args.Hash) if r == nil { return fmt.Errorf("tx (%X) not found", args.Hash) @@ -584,7 +586,7 @@ func (s *LocalService) BlockchainInfo( if err != nil { return err } - s.vm.tmLogger.Debug("BlockchainInfoHandler", "maxHeight", args.MaxHeight, "minHeight", args.MinHeight) + s.vm.log.Debug("BlockchainInfoHandler", "maxHeight", args.MaxHeight, "minHeight", args.MinHeight) var blockMetas []*types.BlockMeta for height := args.MaxHeight; height >= args.MinHeight; height-- { @@ -598,32 +600,28 @@ func (s *LocalService) BlockchainInfo( } func (s *LocalService) Genesis(_ *http.Request, _ *struct{}, reply *ctypes.ResultGenesis) error { - if len(s.vm.genChunks) > 1 { - return errors.New("genesis response is large, please use the genesis_chunked API instead") - } - reply.Genesis = s.vm.genesis return nil } func (s *LocalService) GenesisChunked(_ *http.Request, args *GenesisChunkedArgs, reply *ctypes.ResultGenesisChunk) error { - if s.vm.genChunks == nil { - return fmt.Errorf("service configuration error, genesis chunks are not initialized") - } - - if len(s.vm.genChunks) == 0 { - return fmt.Errorf("service configuration error, there are no chunks") - } - - id := int(args.Chunk) - - if id > len(s.vm.genChunks)-1 { - return fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) - } - - reply.TotalChunks = len(s.vm.genChunks) - reply.ChunkNumber = id - reply.Data = s.vm.genChunks[id] + //if s.vm.genChunks == nil { + // return fmt.Errorf("service configuration error, genesis chunks are not initialized") + //} + // + //if 
len(s.vm.genChunks) == 0 { + // return fmt.Errorf("service configuration error, there are no chunks") + //} + // + //id := int(args.Chunk) + // + //if id > len(s.vm.genChunks)-1 { + // return fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) + //} + // + //reply.TotalChunks = len(s.vm.genChunks) + //reply.ChunkNumber = id + //reply.Data = s.vm.genChunks[id] return nil } @@ -659,8 +657,8 @@ func (s *LocalService) Status(_ *http.Request, _ *struct{}, reply *ctypes.Result } reply.NodeInfo = p2p.DefaultNodeInfo{ - DefaultNodeID: p2p.ID(s.vm.ctx.NodeID.String()), - Network: fmt.Sprintf("%d", s.vm.ctx.NetworkID), + DefaultNodeID: p2p.ID(s.vm.chainCtx.NodeID.String()), + Network: fmt.Sprintf("%d", s.vm.chainCtx.NetworkID), } reply.SyncInfo = ctypes.SyncInfo{ LatestBlockHash: latestBlockHash, @@ -718,7 +716,7 @@ func (s *LocalService) NumUnconfirmedTxs(_ *http.Request, _ *struct{}, reply *ct } func (s *LocalService) CheckTx(_ *http.Request, args *CheckTxArgs, reply *ctypes.ResultCheckTx) error { - res, err := s.vm.proxyApp.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: args.Tx}) + res, err := s.vm.app.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: args.Tx}) if err != nil { return err } diff --git a/vm/service_test.go b/vm/service_test.go index 1eb747ef4..6b1651c2d 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -2,6 +2,7 @@ package vm import ( "context" + "fmt" "testing" "time" @@ -19,8 +20,8 @@ func TestABCIService(t *testing.T) { reply := new(ctypes.ResultABCIInfo) assert.NoError(t, service.ABCIInfo(nil, nil, reply)) assert.Equal(t, uint64(1), reply.Response.AppVersion) - assert.Equal(t, int64(0), reply.Response.LastBlockHeight) - assert.Equal(t, []uint8([]byte(nil)), reply.Response.LastBlockAppHash) + assert.Equal(t, int64(1), reply.Response.LastBlockHeight) + assert.Equal(t, []uint8([]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}), reply.Response.LastBlockAppHash) t.Logf("%+v", reply) }) @@ -117,7 +118,7 @@ func TestHistoryService(t 
*testing.T) { t.Run("BlockchainInfo", func(t *testing.T) { reply := new(ctypes.ResultBlockchainInfo) assert.NoError(t, service.BlockchainInfo(nil, &BlockchainInfoArgs{1, 100}, reply)) - assert.Equal(t, int64(1), reply.LastHeight) + assert.Equal(t, int64(2), reply.LastHeight) }) t.Run("Genesis", func(t *testing.T) { @@ -148,7 +149,7 @@ func TestNetworkService(t *testing.T) { t.Run("ConsensusParams", func(t *testing.T) { reply := new(ctypes.ResultConsensusParams) assert.NoError(t, service.ConsensusParams(nil, nil, reply)) - assert.Equal(t, int64(0), reply.BlockHeight) + assert.Equal(t, int64(1), reply.BlockHeight) txReply := new(ctypes.ResultBroadcastTx) assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: []byte{0x00}}, txReply)) @@ -160,7 +161,7 @@ func TestNetworkService(t *testing.T) { assert.NoError(t, blk.Accept(context.Background())) assert.NoError(t, service.ConsensusParams(nil, nil, reply)) - assert.Equal(t, int64(1), reply.BlockHeight) + assert.Equal(t, int64(2), reply.BlockHeight) }) t.Run("Health", func(t *testing.T) { @@ -239,17 +240,17 @@ func TestSignService(t *testing.T) { assert.EqualValues(t, tx, reply.Tx) }) - //t.Run("TxSearch", func(t *testing.T) { - // reply := new(ctypes.ResultTxSearch) - // assert.NoError(t, service.TxSearch(nil, &TxSearchArgs{Query: "tx.height>0"}, reply)) - // assert.True(t, len(reply.Txs) > 0) - //}) - - //t.Run("BlockSearch", func(t *testing.T) { - // reply := new(ctypes.ResultBlockSearch) - // assert.NoError(t, service.BlockSearch(nil, &BlockSearchArgs{Query: "block.height>0"}, reply)) - // assert.True(t, len(reply.Blocks) > 0) - //}) + t.Run("TxSearch", func(t *testing.T) { + reply := new(ctypes.ResultTxSearch) + assert.NoError(t, service.TxSearch(nil, &TxSearchArgs{Query: fmt.Sprintf("tx.hash='%s'", txReply.Hash)}, reply)) + assert.True(t, len(reply.Txs) > 0) + }) + + t.Run("BlockSearch", func(t *testing.T) { + reply := new(ctypes.ResultBlockSearch) + assert.NoError(t, service.BlockSearch(nil, 
&BlockSearchArgs{Query: "block.height=2"}, reply)) + assert.True(t, len(reply.Blocks) > 0) + }) } func TestStatusService(t *testing.T) { @@ -270,7 +271,7 @@ func TestStatusService(t *testing.T) { t.Run("Status", func(t *testing.T) { reply1 := new(ctypes.ResultStatus) assert.NoError(t, service.Status(nil, nil, reply1)) - assert.Equal(t, int64(0), reply1.SyncInfo.LatestBlockHeight) + assert.Equal(t, int64(1), reply1.SyncInfo.LatestBlockHeight) blk, err := vm.BuildBlock(context.Background()) assert.NoError(t, err) @@ -279,7 +280,7 @@ func TestStatusService(t *testing.T) { reply2 := new(ctypes.ResultStatus) assert.NoError(t, service.Status(nil, nil, reply2)) - assert.Equal(t, int64(1), reply2.SyncInfo.LatestBlockHeight) + assert.Equal(t, int64(2), reply2.SyncInfo.LatestBlockHeight) }) } diff --git a/vm/service_utils.go b/vm/service_utils.go deleted file mode 100644 index 502c9e681..000000000 --- a/vm/service_utils.go +++ /dev/null @@ -1,131 +0,0 @@ -package vm - -import ( - "fmt" - - tmmath "github.com/consideritdone/landslidecore/libs/math" - "github.com/consideritdone/landslidecore/rpc/client" - coretypes "github.com/consideritdone/landslidecore/rpc/core/types" - "github.com/consideritdone/landslidecore/store" -) - -var ( - // see README - defaultPerPage = 30 - maxPerPage = 100 -) - -// bsHeight can be either latest committed or uncommitted (+1) height. 
-func getHeight(bs *store.BlockStore, heightPtr *int64) (int64, error) { - bsHeight := bs.Height() - bsBase := bs.Base() - if heightPtr != nil { - height := *heightPtr - if height <= 0 { - return 0, fmt.Errorf("height must be greater than 0, but got %d", height) - } - if height > bsHeight { - return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d", height, bsHeight) - } - if height < bsBase { - return 0, fmt.Errorf("height %d is not available, lowest height is %d", height, bsBase) - } - return height, nil - } - return bsHeight, nil -} - -func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { - if perPage < 1 { - panic(fmt.Sprintf("zero or negative perPage: %d", perPage)) - } - - if pagePtr == nil { // no page parameter - return 1, nil - } - - pages := ((totalCount - 1) / perPage) + 1 - if pages == 0 { - pages = 1 // one page (even if it's empty) - } - page := *pagePtr - if page <= 0 || page > pages { - return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page) - } - - return page, nil -} - -func validatePerPage(perPagePtr *int) int { - if perPagePtr == nil { // no per_page parameter - return defaultPerPage - } - - perPage := *perPagePtr - if perPage < 1 { - return defaultPerPage - } else if perPage > maxPerPage { - return maxPerPage - } - return perPage -} - -func validateSkipCount(page, perPage int) int { - skipCount := (page - 1) * perPage - if skipCount < 0 { - return 0 - } - return skipCount -} - -// filterMinMax returns error if either min or max are negative or min > max -// if 0, use blockstore base for min, latest block height for max -// enforce limit. 
-func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) { - // filter negatives - if min < 0 || max < 0 { - return min, max, fmt.Errorf("heights must be non-negative") - } - - // adjust for default values - if min == 0 { - min = 1 - } - if max == 0 { - max = height - } - - // limit max to the height - max = tmmath.MinInt64(height, max) - - // limit min to the base - min = tmmath.MaxInt64(base, min) - - // limit min to within `limit` of max - // so the total number of blocks returned will be `limit` - min = tmmath.MaxInt64(min, max-limit+1) - - if min > max { - return min, max, fmt.Errorf("min height %d can't be greater than max height %d", min, max) - } - return min, max, nil -} - -func WaitForHeight(c Service, h int64, waiter client.Waiter) error { - if waiter == nil { - waiter = client.DefaultWaitStrategy - } - delta := int64(1) - for delta > 0 { - r := new(coretypes.ResultStatus) - if err := c.Status(nil, nil, r); err != nil { - return err - } - delta = h - r.SyncInfo.LatestBlockHeight - // wait for the time, or abort early - if err := waiter(delta); err != nil { - return err - } - } - return nil -} diff --git a/vm/vm.go b/vm/vm.go index a07226755..3e5b9140b 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -2,14 +2,15 @@ package vm import ( "context" - "encoding/base64" "errors" "fmt" "net/http" "time" + "github.com/gorilla/rpc/v2" + + "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/api/metrics" - "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/manager" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" @@ -18,18 +19,15 @@ import ( "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/json" - 
"github.com/ava-labs/avalanchego/utils/timer/mockable" "github.com/ava-labs/avalanchego/version" - "github.com/ava-labs/avalanchego/vms/components/chain" - "github.com/gorilla/rpc/v2" - "github.com/prometheus/client_golang/prometheus" - dbm "github.com/tendermint/tm-db" abciTypes "github.com/consideritdone/landslidecore/abci/types" "github.com/consideritdone/landslidecore/config" - cs "github.com/consideritdone/landslidecore/consensus" - tmjson "github.com/consideritdone/landslidecore/libs/json" + "github.com/consideritdone/landslidecore/consensus" + "github.com/consideritdone/landslidecore/crypto/tmhash" "github.com/consideritdone/landslidecore/libs/log" mempl "github.com/consideritdone/landslidecore/mempool" "github.com/consideritdone/landslidecore/node" @@ -37,7 +35,7 @@ import ( "github.com/consideritdone/landslidecore/proxy" rpccore "github.com/consideritdone/landslidecore/rpc/core" rpcserver "github.com/consideritdone/landslidecore/rpc/jsonrpc/server" - sm "github.com/consideritdone/landslidecore/state" + "github.com/consideritdone/landslidecore/state" "github.com/consideritdone/landslidecore/state/indexer" blockidxkv "github.com/consideritdone/landslidecore/state/indexer/block/kv" "github.com/consideritdone/landslidecore/state/txindex" @@ -46,16 +44,6 @@ import ( "github.com/consideritdone/landslidecore/types" ) -var ( - _ block.ChainVM = &VM{} - - Version = &version.Semantic{ - Major: 0, - Minor: 1, - Patch: 1, - } -) - const ( Name = "landslide" @@ -63,84 +51,250 @@ const ( missingCacheSize = 50 unverifiedCacheSize = 50 - // genesisChunkSize is the maximum size, in bytes, of each - // chunk in the genesis structure for the chunked API genesisChunkSize = 16 * 1024 * 1024 // 16 ) var ( - chainStateMetricsPrefix = "chain_state" - - lastAcceptedKey = []byte("last_accepted_key") - blockStoreDBPrefix = []byte("blockstore") - stateDBPrefix = []byte("state") - txIndexerDBPrefix = []byte("tx_index") - blockIndexerDBPrefix = []byte("block_events") + Version = 
version.Semantic{ + Major: 0, + Minor: 1, + Patch: 2, + } + _ common.NetworkAppHandler = (*VM)(nil) + _ common.CrossChainAppHandler = (*VM)(nil) + _ common.AppHandler = (*VM)(nil) + _ health.Checker = (*VM)(nil) + _ validators.Connector = (*VM)(nil) + _ common.VM = (*VM)(nil) + _ block.Getter = (*VM)(nil) + _ block.Parser = (*VM)(nil) + _ block.ChainVM = (*VM)(nil) + + dbPrefixBlockStore = []byte("block-store") + dbPrefixStateStore = []byte("state-store") + dbPrefixTxIndexer = []byte("tx-indexer") + dbPrefixBlockIndexer = []byte("block-indexer") proposerAddress = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} -) -var ( errInvalidBlock = errors.New("invalid block") errNoPendingTxs = errors.New("there is no txs to include to block") ) -type VM struct { - ctx *snow.Context - dbManager manager.Manager +type ( + AppCreator func(ids.ID) (abciTypes.Application, error) - toEngine chan<- common.Message + VM struct { + appCreator AppCreator + app proxy.AppConns - // *chain.State helps to implement the VM interface by wrapping blocks - // with an efficient caching layer. 
- *chain.State + log log.Logger + chainCtx *snow.Context + toEngine chan<- common.Message - tmLogger log.Logger + blockStore *store.BlockStore + stateStore state.Store + state state.State + genesis *types.GenesisDoc - blockStoreDB dbm.DB - blockStore *store.BlockStore + mempool *mempl.CListMempool + eventBus *types.EventBus - stateDB dbm.DB - stateStore sm.Store - tmState *sm.State + txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer + indexerService *txindex.IndexerService + multiGatherer metrics.MultiGatherer - mempool mempl.Mempool + bootstrapped utils.Atomic[bool] + verifiedBlocks map[ids.ID]*Block + preferred ids.ID + } +) - // Tendermint Application - app abciTypes.Application +func LocalAppCreator(app abciTypes.Application) AppCreator { + return func(ids.ID) (abciTypes.Application, error) { + return app, nil + } +} - // Tendermint proxy app - proxyApp proxy.AppConns +func New(appCreator AppCreator) *VM { + return &VM{ + appCreator: appCreator, + app: nil, + } +} - // EventBus is a common bus for all events going through the system. - eventBus *types.EventBus +// Notify this engine of a request for data from [nodeID]. +// +// The meaning of [request], and what should be sent in response to it, is +// application (VM) specific. +// +// It is not guaranteed that: +// * [request] is well-formed/valid. +// +// This node should typically send an AppResponse to [nodeID] in response to +// a valid message using the same request ID before the deadline. However, +// the VM may arbitrarily choose to not send a response to this request. +func (vm *VM) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error { + panic("implement me") +} - // [acceptedBlockDB] is the database to store the last accepted - // block. - acceptedBlockDB database.Database +// Notify this engine that an AppRequest message it sent to [nodeID] with +// request ID [requestID] failed. 
+// +// This may be because the request timed out or because the message couldn't +// be sent to [nodeID]. +// +// It is guaranteed that: +// * This engine sent a request to [nodeID] with ID [requestID]. +// * AppRequestFailed([nodeID], [requestID]) has not already been called. +// * AppResponse([nodeID], [requestID]) has not already been called. +func (vm *VM) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + panic("implement me") +} - genesis *types.GenesisDoc - // cache of chunked genesis data. - genChunks []string +// Notify this engine of a response to the AppRequest message it sent to +// [nodeID] with request ID [requestID]. +// +// The meaning of [response] is application (VM) specific. +// +// It is guaranteed that: +// * This engine sent a request to [nodeID] with ID [requestID]. +// * AppRequestFailed([nodeID], [requestID]) has not already been called. +// * AppResponse([nodeID], [requestID]) has not already been called. +// +// It is not guaranteed that: +// * [response] contains the expected response +// * [response] is well-formed/valid. +// +// If [response] is invalid or not the expected response, the VM chooses how +// to react. For example, the VM may send another AppRequest, or it may give +// up trying to get the requested information. +func (vm *VM) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { + panic("implement me") +} - // Metrics - multiGatherer metrics.MultiGatherer +// Notify this engine of a gossip message from [nodeID]. +// +// The meaning of [msg] is application (VM) specific, and the VM defines how +// to react to this message. +// +// This message is not expected in response to any event, and it does not +// need to be responded to. +// +// A node may gossip the same message multiple times. That is, +// AppGossip([nodeID], [msg]) may be called multiple times. 
+func (vm *VM) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error { + panic("implement me") +} + +// CrossChainAppRequest Notify this engine of a request for data from +// [chainID]. +// +// The meaning of [request], and what should be sent in response to it, is +// application (VM) specific. +// +// Guarantees surrounding the request are specific to the implementation of +// the requesting VM. For example, the request may or may not be guaranteed +// to be well-formed/valid depending on the implementation of the requesting +// VM. +// +// This node should typically send a CrossChainAppResponse to [chainID] in +// response to a valid message using the same request ID before the +// deadline. However, the VM may arbitrarily choose to not send a response +// to this request. +func (vm *VM) CrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, deadline time.Time, request []byte) error { + panic("implement me") +} + +// CrossChainAppRequestFailed notifies this engine that a +// CrossChainAppRequest message it sent to [chainID] with request ID +// [requestID] failed. +// +// This may be because the request timed out or because the message couldn't +// be sent to [chainID]. +// +// It is guaranteed that: +// * This engine sent a request to [chainID] with ID [requestID]. +// * CrossChainAppRequestFailed([chainID], [requestID]) has not already been +// called. +// * CrossChainAppResponse([chainID], [requestID]) has not already been +// called. +func (vm *VM) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, requestID uint32) error { + panic("implement me") +} + +// CrossChainAppResponse notifies this engine of a response to the +// CrossChainAppRequest message it sent to [chainID] with request ID +// [requestID]. +// +// The meaning of [response] is application (VM) specific. +// +// It is guaranteed that: +// * This engine sent a request to [chainID] with ID [requestID]. 
+// * CrossChainAppRequestFailed([chainID], [requestID]) has not already been +// called. +// * CrossChainAppResponse([chainID], [requestID]) has not already been +// called. +// +// Guarantees surrounding the response are specific to the implementation of +// the responding VM. For example, the response may or may not be guaranteed +// to be well-formed/valid depending on the implementation of the requesting +// VM. +// +// If [response] is invalid or not the expected response, the VM chooses how +// to react. For example, the VM may send another CrossChainAppRequest, or +// it may give up trying to get the requested information. +func (vm *VM) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requestID uint32, response []byte) error { + panic("implement me") +} - txIndexer txindex.TxIndexer - txIndexerDB dbm.DB - blockIndexer indexer.BlockIndexer - blockIndexerDB dbm.DB - indexerService *txindex.IndexerService +// HealthCheck returns health check results and, if not healthy, a non-nil +// error +// +// It is expected that the results are json marshallable. +func (vm *VM) HealthCheck(context.Context) (interface{}, error) { + return nil, nil +} - clock mockable.Clock +// Connector represents a handler that is called when a connection is marked as connected +func (vm *VM) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { + vm.log.Info("connected", "nodeID", nodeID.String(), "nodeVersion", nodeVersion.String()) + return nil } -func NewVM(app abciTypes.Application) *VM { - return &VM{app: app} +// Connector represents a handler that is called when a connection is marked as disconnected +func (vm *VM) Disconnected(ctx context.Context, nodeID ids.NodeID) error { + vm.log.Info("disconnected", "nodeID", nodeID.String()) + return nil } +// Initialize this VM. +// [chainCtx]: Metadata about this VM. +// +// [chainCtx.networkID]: The ID of the network this VM's chain is +// running on. 
+// [chainCtx.chainID]: The unique ID of the chain this VM is running on. +// [chainCtx.Log]: Used to log messages +// [chainCtx.NodeID]: The unique staker ID of this node. +// [chainCtx.Lock]: A Read/Write lock shared by this VM and the +// consensus engine that manages this VM. The write +// lock is held whenever code in the consensus engine +// calls the VM. +// +// [dbManager]: The manager of the database this VM will persist data to. +// [genesisBytes]: The byte-encoding of the genesis information of this +// +// VM. The VM uses it to initialize its state. For +// example, if this VM were an account-based payments +// system, `genesisBytes` would probably contain a genesis +// transaction that gives coins to some accounts, and this +// transaction would be in the genesis block. +// +// [toEngine]: The channel used to send messages to the consensus engine. +// [fxs]: Feature extensions that attach to this VM. func (vm *VM) Initialize( - _ context.Context, + ctx context.Context, chainCtx *snow.Context, dbManager manager.Manager, genesisBytes []byte, @@ -150,373 +304,400 @@ func (vm *VM) Initialize( fxs []*common.Fx, appSender common.AppSender, ) error { - vm.ctx = chainCtx - vm.tmLogger = log.NewTMLogger(vm.ctx.Log) - vm.dbManager = dbManager - + vm.chainCtx = chainCtx vm.toEngine = toEngine + vm.log = log.NewTMLogger(vm.chainCtx.Log).With("module", "vm") + vm.verifiedBlocks = make(map[ids.ID]*Block) - baseDB := dbManager.Current().Database - - vm.blockStoreDB = Database{prefixdb.NewNested(blockStoreDBPrefix, baseDB)} - vm.blockStore = store.NewBlockStore(vm.blockStoreDB) + db := dbManager.Current().Database - vm.stateDB = Database{prefixdb.NewNested(stateDBPrefix, baseDB)} - vm.stateStore = sm.NewStore(vm.stateDB) + dbBlockStore := NewDB(prefixdb.NewNested(dbPrefixBlockStore, db)) + vm.blockStore = store.NewBlockStore(dbBlockStore) - if err := vm.initGenesis(genesisBytes); err != nil { - return err - } + dbStateStore := 
NewDB(prefixdb.NewNested(dbPrefixStateStore, db)) + vm.stateStore = state.NewStore(dbStateStore) - if err := vm.initGenesis(genesisBytes); err != nil { + app, err := vm.appCreator(chainCtx.ChainID) + if err != nil { return err } - state, err := vm.stateStore.LoadFromDBOrGenesisDoc(vm.genesis) + vm.state, vm.genesis, err = node.LoadStateFromDBOrGenesisDocProvider( + dbStateStore, + NewLocalGenesisDocProvider(genesisBytes), + ) if err != nil { - return fmt.Errorf("failed to load tmState from genesis: %w ", err) - } - vm.tmState = &state - - // genesis only - if vm.tmState.LastBlockHeight == 0 { - // TODO use decoded/encoded genesis bytes - block, partSet := vm.tmState.MakeBlock(1, []types.Tx{genesisBytes}, nil, nil, nil) - vm.tmLogger.Info("init block", "b", block, "part set", partSet) + return err } - //vm.genesisHash = vm.ethConfig.Genesis.ToBlock(nil).Hash() // must create genesis hash before [vm.readLastAccepted] - - // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). 
- proxyApp, err := node.CreateAndStartProxyAppConns(proxy.NewLocalClientCreator(vm.app), vm.tmLogger) + vm.app, err = node.CreateAndStartProxyAppConns(proxy.NewLocalClientCreator(app), vm.log) if err != nil { - return fmt.Errorf("failed to create and start proxy app: %w ", err) + return err } - vm.proxyApp = proxyApp - // Create EventBus - eventBus, err := node.CreateAndStartEventBus(vm.tmLogger) + vm.eventBus, err = node.CreateAndStartEventBus(vm.log) if err != nil { - return fmt.Errorf("failed to create and start event bus: %w ", err) + return err } - vm.eventBus = eventBus - vm.txIndexerDB = Database{prefixdb.NewNested(txIndexerDBPrefix, baseDB)} - vm.txIndexer = txidxkv.NewTxIndex(vm.txIndexerDB) - vm.blockIndexerDB = Database{prefixdb.NewNested(blockIndexerDBPrefix, baseDB)} - vm.blockIndexer = blockidxkv.New(vm.blockIndexerDB) - vm.indexerService = txindex.NewIndexerService(vm.txIndexer, vm.blockIndexer, eventBus) - vm.indexerService.SetLogger(vm.tmLogger.With("module", "txindex")) + dbTxIndexer := NewDB(prefixdb.NewNested(dbPrefixTxIndexer, db)) + vm.txIndexer = txidxkv.NewTxIndex(dbTxIndexer) - if err := vm.indexerService.Start(); err != nil { - return err - } + dbBlockIndexer := NewDB(prefixdb.NewNested(dbPrefixBlockIndexer, db)) + vm.blockIndexer = blockidxkv.New(dbBlockIndexer) - if err := vm.doHandshake(vm.genesis, vm.tmLogger.With("module", "consensus")); err != nil { + vm.indexerService = txindex.NewIndexerService(vm.txIndexer, vm.blockIndexer, vm.eventBus) + vm.indexerService.SetLogger(vm.log.With("module", "indexer")) + if err := vm.indexerService.Start(); err != nil { return err } - state, err = vm.stateStore.Load() - if err != nil { - return fmt.Errorf("failed to load tmState: %w ", err) + handshaker := consensus.NewHandshaker( + vm.stateStore, + vm.state, + vm.blockStore, + vm.genesis, + ) + handshaker.SetLogger(vm.log.With("module", "consensus")) + handshaker.SetEventBus(vm.eventBus) + if err := handshaker.Handshake(vm.app); err != nil { + 
return fmt.Errorf("error during handshake: %w", err) } - vm.tmState = &state - genesisBlock, err := vm.buildGenesisBlock(genesisBytes) + vm.state, err = vm.stateStore.Load() if err != nil { - return fmt.Errorf("failed to build genesis block: %w ", err) + return err } - vm.mempool = vm.createMempool() + vm.mempool = mempl.NewCListMempool( + config.DefaultMempoolConfig(), + vm.app.Mempool(), + vm.state.LastBlockHeight, + vm, + mempl.WithMetrics(mempl.NopMetrics()), + mempl.WithPreCheck(state.TxPreCheck(vm.state)), + mempl.WithPostCheck(state.TxPostCheck(vm.state)), + ) + vm.mempool.SetLogger(vm.log.With("module", "mempool")) + vm.mempool.EnableTxsAvailable() - if err := vm.initializeMetrics(); err != nil { + vm.multiGatherer = metrics.NewMultiGatherer() + if err := vm.chainCtx.Metrics.Register(vm.multiGatherer); err != nil { return err } - if err := vm.initChainState(genesisBlock); err != nil { - return err + if vm.state.LastBlockHeight == 0 { + block, _ := vm.state.MakeBlock(1, types.Txs{}, makeCommit(1, time.Now()), nil, proposerAddress) + block.LastBlockID = types.BlockID{ + Hash: tmhash.Sum([]byte{}), + PartSetHeader: types.PartSetHeader{ + Total: 0, + Hash: tmhash.Sum([]byte{}), + }, + } + if err := NewBlock(vm, block, choices.Processing).Accept(ctx); err != nil { + return err + } } + vm.log.Info("vm initialization completed") return nil } -// builds genesis block if required -func (vm *VM) buildGenesisBlock(genesisData []byte) (*types.Block, error) { - if vm.tmState.LastBlockHeight != 0 { - return nil, nil - } - txs := types.Txs{types.Tx(genesisData)} - if len(txs) == 0 { - return nil, errNoPendingTxs +func (vm *VM) NotifyBlockReady() { + select { + case vm.toEngine <- common.PendingTxs: + vm.log.Debug("notify consensus engine") + default: + vm.log.Error("failed to push PendingTxs notification to the consensus engine.") } - height := vm.tmState.LastBlockHeight + 1 - - commit := makeCommitMock(height, time.Now()) - genesisBlock, _ := vm.tmState.MakeBlock(height, 
txs, commit, nil, proposerAddress) - return genesisBlock, nil } -// Initializes Genesis if required -func (vm *VM) initGenesis(genesisData []byte) error { - // load genesis from database - genesis, err := node.LoadGenesisDoc(vm.stateDB) - // genesis not found in database - if err != nil { - if err == node.ErrNoGenesisDoc { - // get it from json - genesis, err = types.GenesisDocFromJSON(genesisData) - if err != nil { - return fmt.Errorf("failed to decode genesis bytes: %w ", err) - } - // save to database - err = node.SaveGenesisDoc(vm.stateDB, genesis) - if err != nil { - return fmt.Errorf("failed to save genesis data: %w ", err) - } - } else { - return err - } +// SetState communicates to VM its next state it starts +func (vm *VM) SetState(ctx context.Context, state snow.State) error { + vm.log.Debug("set state", "state", state.String()) + switch state { + case snow.Bootstrapping: + vm.bootstrapped.Set(false) + case snow.NormalOp: + vm.bootstrapped.Set(true) + default: + return snow.ErrUnknownState } - - vm.genesis = genesis return nil } -// InitGenesisChunks configures the environment -// and should be called on service startup. -func (vm *VM) initGenesisChunks() error { - if vm.genesis == nil { - return fmt.Errorf("empty genesis") +// Shutdown is called when the node is shutting down. 
+func (vm *VM) Shutdown(context.Context) error { + vm.log.Debug("shutdown start") + + if err := vm.indexerService.Stop(); err != nil { + return fmt.Errorf("error closing indexerService: %w ", err) } - data, err := tmjson.Marshal(vm.genesis) - if err != nil { - return err + if err := vm.eventBus.Stop(); err != nil { + return fmt.Errorf("error closing eventBus: %w ", err) } - for i := 0; i < len(data); i += genesisChunkSize { - end := i + genesisChunkSize + if err := vm.app.Stop(); err != nil { + return fmt.Errorf("error closing app: %w ", err) + } - if end > len(data) { - end = len(data) - } + if err := vm.stateStore.Close(); err != nil { + return fmt.Errorf("error closing stateStore: %w ", err) + } - vm.genChunks = append(vm.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) + if err := vm.blockStore.Close(); err != nil { + return fmt.Errorf("error closing blockStore: %w ", err) } + vm.log.Debug("shutdown completed") return nil } -func (vm *VM) createMempool() *mempl.CListMempool { - cfg := config.DefaultMempoolConfig() - mempool := mempl.NewCListMempool( - cfg, - vm.proxyApp.Mempool(), - vm.tmState.LastBlockHeight, - vm, - mempl.WithMetrics(mempl.NopMetrics()), // TODO: use prometheus metrics based on config - mempl.WithPreCheck(sm.TxPreCheck(*vm.tmState)), - mempl.WithPostCheck(sm.TxPostCheck(*vm.tmState)), - ) - mempoolLogger := vm.tmLogger.With("module", "mempool") - mempool.SetLogger(mempoolLogger) +// Version returns the version of the VM. +func (vm *VM) Version(context.Context) (string, error) { + return Version.String(), nil +} - return mempool +// Creates the HTTP handlers for custom VM network calls. +// +// This exposes handlers that the outside world can use to communicate with +// a static reference to the VM. Each handler has the path: +// [Address of node]/ext/VM/[VM ID]/[extension] +// +// Returns a mapping from [extension]s to HTTP handlers. +// +// Each extension can specify how locking is managed for convenience. 
+// +// For example, it might make sense to have an extension for creating +// genesis bytes this VM can interpret. +// +// Note: If this method is called, no other method will be called on this VM. +// Each registered VM will have a single instance created to handle static +// APIs. This instance will be handled separately from instances created to +// service an instance of a chain. +func (vm *VM) CreateStaticHandlers(context.Context) (map[string]*common.HTTPHandler, error) { + // ToDo: need to add implementation + return nil, nil } -// NotifyBlockReady tells the consensus engine that a new block -// is ready to be created -func (vm *VM) NotifyBlockReady() { - select { - case vm.toEngine <- common.PendingTxs: - vm.tmLogger.Debug("Notify consensys engine") - default: - vm.tmLogger.Error("Failed to push PendingTxs notification to the consensus engine.") +// Creates the HTTP handlers for custom chain network calls. +// +// This exposes handlers that the outside world can use to communicate with +// the chain. Each handler has the path: +// [Address of node]/ext/bc/[chain ID]/[extension] +// +// Returns a mapping from [extension]s to HTTP handlers. +// +// Each extension can specify how locking is managed for convenience. +// +// For example, if this VM implements an account-based payments system, +// it have an extension called `accounts`, where clients could get +// information about their accounts. 
+func (vm *VM) CreateHandlers(context.Context) (map[string]*common.HTTPHandler, error) { + mux := http.NewServeMux() + rpcLogger := vm.log.With("module", "rpc-server") + rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) + + server := rpc.NewServer() + server.RegisterCodec(json.NewCodec(), "application/json") + server.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") + if err := server.RegisterService(NewService(vm), Name); err != nil { + return nil, err } + + return map[string]*common.HTTPHandler{ + "/rpc": { + LockOptions: common.WriteLock, + Handler: server, + }, + }, nil } -func (vm *VM) doHandshake(genesis *types.GenesisDoc, consensusLogger log.Logger) error { - handshaker := cs.NewHandshaker(vm.stateStore, *vm.tmState, vm.blockStore, genesis) - handshaker.SetLogger(consensusLogger) - handshaker.SetEventBus(vm.eventBus) - if err := handshaker.Handshake(vm.proxyApp); err != nil { - return fmt.Errorf("error during handshake: %v", err) - } - return nil +// Attempt to load a block. +// +// If the block does not exist, database.ErrNotFound should be returned. +// +// It is expected that blocks that have been successfully verified should be +// returned correctly. It is also expected that blocks that have been +// accepted by the consensus engine should be able to be fetched. It is not +// required for blocks that have been rejected by the consensus engine to be +// able to be fetched. 
+func (vm *VM) GetBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) { + vm.log.Debug("get block", "blkID", blkID.String()) + if b, ok := vm.verifiedBlocks[blkID]; ok { + vm.log.Debug("get block", "status", b.Status()) + return b, nil + } + b := vm.blockStore.LoadBlockByHash(blkID[:]) + if b == nil { + return nil, errInvalidBlock + } + vm.log.Debug("get block", "status", choices.Accepted) + return NewBlock(vm, b, choices.Accepted), nil } -// readLastAccepted reads the last accepted hash from [acceptedBlockDB] and returns the -// last accepted block hash and height by reading directly from [vm.chaindb] instead of relying -// on [chain]. -// Note: assumes chaindb, ethConfig, and genesisHash have been initialized. -//func (vm *VM) readLastAccepted() (tmbytes.HexBytes, uint64, error) { -// // Attempt to load last accepted block to determine if it is necessary to -// // initialize state with the genesis block. -// lastAcceptedBytes, lastAcceptedErr := vm.acceptedBlockDB.Get(lastAcceptedKey) -// switch { -// case lastAcceptedErr == database.ErrNotFound: -// // If there is nothing in the database, return the genesis block hash and height -// return vm.genesisHash, 0, nil -// case lastAcceptedErr != nil: -// return common.Hash{}, 0, fmt.Errorf("failed to get last accepted block ID due to: %w", lastAcceptedErr) -// case len(lastAcceptedBytes) != common.HashLength: -// return common.Hash{}, 0, fmt.Errorf("last accepted bytes should have been length %d, but found %d", common.HashLength, len(lastAcceptedBytes)) -// default: -// lastAcceptedHash := common.BytesToHash(lastAcceptedBytes) -// height := rawdb.ReadHeaderNumber(vm.chaindb, lastAcceptedHash) -// if height == nil { -// return common.Hash{}, 0, fmt.Errorf("failed to retrieve header number of last accepted block: %s", lastAcceptedHash) -// } -// return lastAcceptedHash, *height, nil -// } -//} - -func (vm *VM) initChainState(lastAcceptedBlock *types.Block) error { - block, err := 
vm.newBlock(lastAcceptedBlock) - if err != nil { - return fmt.Errorf("failed to create block wrapper for the last accepted block: %w", err) - } - block.status = choices.Accepted +// Attempt to create a block from a stream of bytes. +// +// The block should be represented by the full byte array, without extra +// bytes. +// +// It is expected for all historical blocks to be parseable. +func (vm *VM) ParseBlock(ctx context.Context, blockBytes []byte) (snowman.Block, error) { + vm.log.Debug("parse block") - config := &chain.Config{ - DecidedCacheSize: decidedCacheSize, - MissingCacheSize: missingCacheSize, - UnverifiedCacheSize: unverifiedCacheSize, - //GetBlockIDAtHeight: vm.GetBlockIDAtHeight, - GetBlock: vm.getBlock, - UnmarshalBlock: vm.parseBlock, - BuildBlock: vm.buildBlock, - LastAcceptedBlock: block, + protoBlock := new(tmproto.Block) + if err := protoBlock.Unmarshal(blockBytes[1:]); err != nil { + vm.log.Error("can't parse block", "err", err) + return nil, err } - // Register chain state metrics - chainStateRegisterer := prometheus.NewRegistry() - state, err := chain.NewMeteredState(chainStateRegisterer, config) + block, err := types.BlockFromProto(protoBlock) if err != nil { - return fmt.Errorf("could not create metered state: %w", err) + vm.log.Error("can't create block from proto", "err", err) + return nil, err } - vm.State = state - return vm.multiGatherer.Register(chainStateMetricsPrefix, chainStateRegisterer) + blk := NewBlock(vm, block, choices.Status(uint32(blockBytes[0]))) + vm.log.Debug("parsed block", "id", blk.ID(), "status", blk.Status().String()) + if _, ok := vm.verifiedBlocks[blk.ID()]; !ok { + vm.verifiedBlocks[blk.ID()] = blk + } + + return blk, nil } -func (vm *VM) initializeMetrics() error { - vm.multiGatherer = metrics.NewMultiGatherer() +// Attempt to create a new block from data contained in the VM. +// +// If the VM doesn't want to issue a new block, an error should be +// returned. 
+func (vm *VM) BuildBlock(ctx context.Context) (snowman.Block, error) { + vm.log.Debug("build block") - if err := vm.ctx.Metrics.Register(vm.multiGatherer); err != nil { - return err + txs := vm.mempool.ReapMaxBytesMaxGas(-1, -1) + if len(txs) == 0 { + return nil, errNoPendingTxs } - return nil -} + state := vm.state.Copy() -// parseBlock parses [b] into a block to be wrapped by ChainState. -func (vm *VM) parseBlock(_ context.Context, b []byte) (snowman.Block, error) { - protoBlock := new(tmproto.Block) - err := protoBlock.Unmarshal(b) - if err != nil { - return nil, err + var preferredBlock *types.Block + if vm.preferred != ids.Empty { + if b, ok := vm.verifiedBlocks[vm.preferred]; ok { + vm.log.Debug("load preferred block from cache", "id", vm.preferred.String()) + preferredBlock = b.Block + } else { + vm.log.Debug("load preferred block from blockStore", "id", vm.preferred.String()) + preferredBlock = vm.blockStore.LoadBlockByHash(vm.preferred[:]) + if preferredBlock == nil { + return nil, errInvalidBlock + } + } + } else { + preferredBlock = vm.blockStore.LoadBlockByHash(state.LastBlockID.Hash) } + preferredHeight := preferredBlock.Header.Height - tmBlock, err := types.BlockFromProto(protoBlock) - if err != nil { - return nil, err + commit := makeCommit(preferredHeight+1, time.Now()) + block, _ := state.MakeBlock(preferredHeight+1, txs, commit, nil, proposerAddress) + block.LastBlockID = types.BlockID{ + Hash: preferredBlock.Hash(), + PartSetHeader: preferredBlock.MakePartSet(types.BlockPartSizeBytes).Header(), } - // Note: the status of block is set by ChainState - block, err := vm.newBlock(tmBlock) - if err != nil { - return nil, err - } + blk := NewBlock(vm, block, choices.Processing) + vm.verifiedBlocks[blk.ID()] = blk - return block, nil + vm.log.Debug("build block", "id", blk.ID(), "height", blk.Height(), "txs", len(block.Txs)) + return blk, nil } -// getBlock attempts to retrieve block [id] from the VM to be wrapped -// by ChainState. 
-func (vm *VM) getBlock(_ context.Context, id ids.ID) (snowman.Block, error) { - var hash []byte - copy(hash, id[:]) - tmBlock := vm.blockStore.LoadBlockByHash(hash) - // If [tmBlock] is nil, return [database.ErrNotFound] here - // so that the miss is considered cacheable. - if tmBlock == nil { - return nil, database.ErrNotFound - } - // Note: the status of block is set by ChainState - return vm.newBlock(tmBlock) +// Notify the VM of the currently preferred block. +// +// This should always be a block that has no children known to consensus. +func (vm *VM) SetPreference(ctx context.Context, blkID ids.ID) error { + vm.log.Debug("set preference", "blkID", blkID.String()) + vm.preferred = blkID + return nil +} + +// LastAccepted returns the ID of the last accepted block. +// +// If no blocks have been accepted by consensus yet, it is assumed there is +// a definitionally accepted block, the Genesis block, that will be +// returned. +func (vm *VM) LastAccepted(context.Context) (ids.ID, error) { + if vm.preferred == ids.Empty { + return ids.ID(vm.state.LastBlockID.Hash), nil + } + return vm.preferred, nil } func (vm *VM) applyBlock(block *Block) error { vm.mempool.Lock() defer vm.mempool.Unlock() - state, err := vm.stateStore.Load() - if err != nil { - return err - } + state := vm.state.Copy() - if err := validateBlock(state, block.tmBlock); err != nil { + err := validateBlock(state, block.Block) + if err != nil { return err } - abciResponses, err := execBlockOnProxyApp( - vm.tmLogger, - vm.proxyApp.Consensus(), - block.tmBlock, vm.stateStore, - state.InitialHeight, - ) + abciResponses, err := execBlockOnProxyApp(vm.log, vm.app.Consensus(), block.Block, vm.stateStore, state.InitialHeight) if err != nil { return err } // Save the results before we commit. 
- if err := vm.stateStore.SaveABCIResponses(block.tmBlock.Height, abciResponses); err != nil { + if err := vm.stateStore.SaveABCIResponses(block.Block.Height, abciResponses); err != nil { return err } blockID := types.BlockID{ - Hash: block.tmBlock.Hash(), - PartSetHeader: block.tmBlock.MakePartSet(types.BlockPartSizeBytes).Header(), - } - - // Update the state with the block and responses. - state, err = updateState(state, blockID, &block.tmBlock.Header, abciResponses) - if err != nil { - return err + Hash: block.Block.Hash(), + PartSetHeader: block.Block.MakePartSet(types.BlockPartSizeBytes).Header(), } // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. if err := vm.mempool.FlushAppConn(); err != nil { - vm.tmLogger.Error("client error during mempool.FlushAppConn", "err", err) + vm.log.Error("client error during mempool.FlushAppConn", "err", err) return err } // Commit block, get hash back - res, err := vm.proxyApp.Consensus().CommitSync() + res, err := vm.app.Consensus().CommitSync() if err != nil { - vm.tmLogger.Error("client error during proxyAppConn.CommitSync", "err", err) + vm.log.Error("client error during proxyAppConn.CommitSync", "err", err) return err } + // Update the state with the block and responses. 
+ state.LastBlockHeight = block.Block.Height + state.LastBlockID = blockID + state.LastBlockTime = block.Time + state.LastResultsHash = types.NewResults(abciResponses.DeliverTxs).Hash() + state.AppHash = res.Data + // ResponseCommit has no error code - just data - vm.tmLogger.Info( + vm.log.Info( "committed state", "height", block.Height, - "num_txs", len(block.tmBlock.Txs), + "num_txs", len(block.Block.Txs), "app_hash", fmt.Sprintf("%X", res.Data), ) - deliverTxResponses := make([]*abciTypes.ResponseDeliverTx, len(block.tmBlock.Txs)) - for i := range block.tmBlock.Txs { + deliverTxResponses := make([]*abciTypes.ResponseDeliverTx, len(block.Block.Txs)) + for i := range block.Block.Txs { deliverTxResponses[i] = &abciTypes.ResponseDeliverTx{Code: abciTypes.CodeTypeOK} } // Update mempool. if err := vm.mempool.Update( - block.tmBlock.Height, - block.tmBlock.Txs, + block.Block.Height, + block.Block.Txs, deliverTxResponses, TxPreCheck(state), TxPostCheck(state), @@ -524,163 +705,12 @@ func (vm *VM) applyBlock(block *Block) error { return err } - vm.tmState.LastBlockHeight = block.tmBlock.Height if err := vm.stateStore.Save(state); err != nil { return err } - vm.blockStore.SaveBlock(block.tmBlock, block.tmBlock.MakePartSet(types.BlockPartSizeBytes), block.tmBlock.LastCommit) - - fireEvents(vm.tmLogger, vm.eventBus, block.tmBlock, abciResponses) - return nil -} - -// buildBlock builds a block to be wrapped by ChainState -func (vm *VM) buildBlock(_ context.Context) (snowman.Block, error) { - txs := vm.mempool.ReapMaxBytesMaxGas(-1, -1) - if len(txs) == 0 { - return nil, errNoPendingTxs - } - height := vm.tmState.LastBlockHeight + 1 - - commit := makeCommitMock(height, time.Now()) - block, _ := vm.tmState.MakeBlock(height, txs, commit, nil, proposerAddress) - - // Note: the status of block is set by ChainState - blk, err := vm.newBlock(block) - blk.SetStatus(choices.Processing) - if err != nil { - return nil, err - } - vm.tmLogger.Debug(fmt.Sprintf("Built block %s", 
blk.ID())) - - return blk, nil -} - -func (vm *VM) AppGossip(_ context.Context, nodeID ids.NodeID, msg []byte) error { - return nil -} - -func (vm *VM) SetState(ctx context.Context, state snow.State) error { - return nil -} - -func (vm *VM) Shutdown(ctx context.Context) error { - // first stop the non-reactor services - if err := vm.eventBus.Stop(); err != nil { - return fmt.Errorf("Error closing eventBus: %w ", err) - } - if err := vm.indexerService.Stop(); err != nil { - return fmt.Errorf("Error closing indexerService: %w ", err) - } - //TODO: investigate wal configuration - // stop mempool WAL - //if vm.config.Mempool.WalEnabled() { - // n.mempool.CloseWAL() - //} - //if n.prometheusSrv != nil { - // if err := n.prometheusSrv.Shutdown(context.Background()); err != nil { - // // Error from closing listeners, or context timeout: - // n.Logger.Error("Prometheus HTTP server Shutdown", "err", err) - // } - //} - if err := vm.blockStore.Close(); err != nil { - return fmt.Errorf("Error closing blockStore: %w ", err) - } - if err := vm.stateStore.Close(); err != nil { - return fmt.Errorf("Error closing stateStore: %w ", err) - } - return nil - //timestampVM and deprecated landslide - //if vm.state == nil { - // return nil - //} - // - //return vm.state.Close() // close versionDB - - //coreth - //if vm.ctx == nil { - // return nil - //} - //vm.Network.Shutdown() - //if err := vm.StateSyncClient.Shutdown(); err != nil { - // log.Error("error stopping state syncer", "err", err) - //} - //close(vm.shutdownChan) - //vm.eth.Stop() - //vm.shutdownWg.Wait() - //return nil -} - -func (vm *VM) Version(ctx context.Context) (string, error) { - return Version.String(), nil -} + vm.state = state -func (vm *VM) CreateStaticHandlers(ctx context.Context) (map[string]*common.HTTPHandler, error) { - //TODO implement me - return nil, nil -} - -func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { - mux := http.NewServeMux() - rpcLogger := 
vm.tmLogger.With("module", "rpc-server") - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) - - server := rpc.NewServer() - server.RegisterCodec(json.NewCodec(), "application/json") - server.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") - if err := server.RegisterService(NewService(vm), Name); err != nil { - return nil, err - } - - return map[string]*common.HTTPHandler{ - "/rpc": { - LockOptions: common.WriteLock, - Handler: server, - }, - }, nil -} - -func (vm *VM) ProxyApp() proxy.AppConns { - return vm.proxyApp -} - -func (vm *VM) SetPreference(ctx context.Context, blkID ids.ID) error { - //TODO implement me - return nil -} - -func (vm *VM) AppRequest(_ context.Context, nodeID ids.NodeID, requestID uint32, time time.Time, request []byte) error { - return nil -} - -// This VM doesn't (currently) have any app-specific messages -func (vm *VM) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { - return nil -} - -// This VM doesn't (currently) have any app-specific messages -func (vm *VM) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error { - return nil -} - -func (vm *VM) CrossChainAppRequest(_ context.Context, _ ids.ID, _ uint32, deadline time.Time, request []byte) error { + vm.blockStore.SaveBlock(block.Block, block.Block.MakePartSet(types.BlockPartSizeBytes), block.Block.LastCommit) + fireEvents(vm.log, vm.eventBus, block.Block, abciResponses) return nil } - -func (vm *VM) CrossChainAppRequestFailed(_ context.Context, _ ids.ID, _ uint32) error { - return nil -} - -func (vm *VM) CrossChainAppResponse(_ context.Context, _ ids.ID, _ uint32, response []byte) error { - return nil -} - -func (vm *VM) Connected(_ context.Context, id ids.NodeID, nodeVersion *version.Application) error { - return nil // noop -} - -func (vm *VM) Disconnected(_ context.Context, id ids.NodeID) error { - return nil // noop -} - -func (vm *VM) HealthCheck(ctx context.Context) (interface{}, error) 
{ return nil, nil } diff --git a/vm/vm_test.go b/vm/vm_test.go index 02801de19..212eb7513 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -15,7 +15,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/version" - "github.com/ava-labs/avalanchego/vms/components/chain" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,7 +50,7 @@ func newTestVM(app atypes.Application) (*VM, *snow.Context, chan common.Message, Patch: 0, }) msgChan := make(chan common.Message, 1) - vm := NewVM(app) + vm := New(LocalAppCreator(app)) snowCtx := snow.DefaultContextTest() snowCtx.Log = logging.NewLogger( fmt.Sprintf("<%s Chain>", blockchainID), @@ -123,7 +122,7 @@ func TestInitVm(t *testing.T) { err = blk1.Accept(context.Background()) assert.NoError(t, err) - tmBlk1 := blk1.(*chain.BlockWrapper).Block.(*Block).tmBlock + tmBlk1 := blk1.(*Block).Block t.Logf("Block: %d", blk1.Height()) t.Logf("TM Block Tx count: %d", len(tmBlk1.Data.Txs)) @@ -161,7 +160,7 @@ func TestInitVm(t *testing.T) { err = blk2.Accept(context.Background()) assert.NoError(t, err) - tmBlk2 := blk2.(*chain.BlockWrapper).Block.(*Block).tmBlock + tmBlk2 := blk2.(*Block).Block t.Logf("Block: %d", blk2.Height()) t.Logf("TM Block Tx count: %d", len(tmBlk2.Data.Txs))