diff --git a/rpc/jsonrpc/server/http_json_handler.go b/rpc/jsonrpc/server/http_json_handler.go index b51f1f231..930e8c43d 100644 --- a/rpc/jsonrpc/server/http_json_handler.go +++ b/rpc/jsonrpc/server/http_json_handler.go @@ -17,7 +17,7 @@ import ( // HTTP + JSON handler // jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc { +func MakeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) if err != nil { @@ -67,13 +67,14 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han ) continue } - if len(r.URL.Path) > 1 { - responses = append( - responses, - types.RPCInvalidRequestError(request.ID, fmt.Errorf("path %s is invalid", r.URL.Path)), - ) - continue - } + // TODO: need to rever this change + //if len(r.URL.Path) > 1 { + // responses = append( + // responses, + // types.RPCInvalidRequestError(request.ID, fmt.Errorf("path %s is invalid", r.URL.Path)), + // ) + // continue + //} rpcFunc, ok := funcMap[request.Method] if !ok || rpcFunc.ws { responses = append(responses, types.RPCMethodNotFoundError(request.ID)) diff --git a/rpc/jsonrpc/server/rpc_func.go b/rpc/jsonrpc/server/rpc_func.go index 9f39c3664..03925c0c4 100644 --- a/rpc/jsonrpc/server/rpc_func.go +++ b/rpc/jsonrpc/server/rpc_func.go @@ -20,7 +20,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo } // JSONRPC endpoints - mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, logger))) + mux.HandleFunc("/", handleInvalidJSONRPCPaths(MakeJSONRPCHandler(funcMap, logger))) } // Function introspection diff --git a/vm/funcs.go b/vm/funcs.go index b48a27057..5f34d6dc5 100644 --- a/vm/funcs.go +++ b/vm/funcs.go @@ -14,7 +14,6 @@ import ( tmstate "github.com/consideritdone/landslidecore/proto/tendermint/state" "github.com/consideritdone/landslidecore/proxy" "github.com/consideritdone/landslidecore/rpc/client" - coretypes "github.com/consideritdone/landslidecore/rpc/core/types" "github.com/consideritdone/landslidecore/state" "github.com/consideritdone/landslidecore/store" "github.com/consideritdone/landslidecore/types" @@ -371,8 +370,8 @@ func WaitForHeight(c Service, h int64, waiter client.Waiter) error { } delta := int64(1) for delta > 0 { - r := new(coretypes.ResultStatus) - if err := c.Status(nil, nil, r); err != nil { + r, err := c.Status(nil) + if err != nil { return err } delta = h - r.SyncInfo.LatestBlockHeight diff --git a/vm/service.go b/vm/service.go index aca8dc3e6..6cdcb4944 100644 --- a/vm/service.go +++ b/vm/service.go @@ -4,22 +4,31 @@ import ( "context" "errors" "fmt" - "net/http" "sort" "time" abci "github.com/consideritdone/landslidecore/abci/types" tmbytes "github.com/consideritdone/landslidecore/libs/bytes" tmmath "github.com/consideritdone/landslidecore/libs/math" + tmpubsub "github.com/consideritdone/landslidecore/libs/pubsub" tmquery "github.com/consideritdone/landslidecore/libs/pubsub/query" mempl "github.com/consideritdone/landslidecore/mempool" "github.com/consideritdone/landslidecore/p2p" "github.com/consideritdone/landslidecore/proxy" "github.com/consideritdone/landslidecore/rpc/core" ctypes "github.com/consideritdone/landslidecore/rpc/core/types" + rpcserver "github.com/consideritdone/landslidecore/rpc/jsonrpc/server" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" + blockidxnull "github.com/consideritdone/landslidecore/state/indexer/block/null" + "github.com/consideritdone/landslidecore/state/txindex/null" "github.com/consideritdone/landslidecore/types" ) +var ( + SubscribeTimeout = 5 * time.Second + _ Service = (*LocalService)(nil) +) + type ( LocalService struct { vm *VM @@ -27,196 +36,261 @@ type ( Service interface { ABCIService - HistoryService - NetworkService - SignService - StatusService - MempoolService - } - - ABCIQueryArgs struct { - Path string `json:"path"` - Data tmbytes.HexBytes `json:"data"` - } - - ABCIQueryOptions struct { - Height int64 `json:"height"` - Prove bool `json:"prove"` - } - - ABCIQueryWithOptionsArgs struct { - Path string `json:"path"` - Data tmbytes.HexBytes `json:"data"` - Opts ABCIQueryOptions `json:"opts"` - } - - BroadcastTxArgs struct { - Tx types.Tx `json:"tx"` + EventsService + HistoryClient + NetworkClient + SignClient + StatusClient + MempoolClient } ABCIService interface { // Reading from abci app - ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error - ABCIQuery(_ *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error - ABCIQueryWithOptions(_ *http.Request, args *ABCIQueryWithOptionsArgs, reply *ctypes.ResultABCIQuery) error + ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) + ABCIQuery(ctx *rpctypes.Context, path string, data tmbytes.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) // Writing to abci app - BroadcastTxCommit(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTxCommit) error - BroadcastTxAsync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error - BroadcastTxSync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error + BroadcastTxCommit(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTxCommit, error) + BroadcastTxAsync(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTx, error) + BroadcastTxSync(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTx, error) } - BlockHeightArgs struct { - Height *int64 `json:"height"` + EventsService interface { + Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) + Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) + UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) } - BlockHashArgs struct { - Hash []byte `json:"hash"` + HistoryClient interface { + Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) + GenesisChunked(*rpctypes.Context, uint) (*ctypes.ResultGenesisChunk, error) + BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) } - CommitArgs struct { - Height *int64 `json:"height"` + MempoolClient interface { + UnconfirmedTxs(ctx *rpctypes.Context, limit *int) (*ctypes.ResultUnconfirmedTxs, error) + NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) + CheckTx(*rpctypes.Context, types.Tx) (*ctypes.ResultCheckTx, error) } - ValidatorsArgs struct { - Height *int64 `json:"height"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` + NetworkClient interface { + NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) + DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) + ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) + ConsensusParams(ctx *rpctypes.Context, height *int64) (*ctypes.ResultConsensusParams, error) + Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) } - TxArgs struct { - Hash tmbytes.HexBytes `json:"hash"` - Prove bool `json:"prove"` - } + SignClient interface { + Block(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlock, error) + BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) + BlockResults(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlockResults, error) + Commit(ctx *rpctypes.Context, height *int64) (*ctypes.ResultCommit, error) + Validators(ctx *rpctypes.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) + Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) - TxSearchArgs struct { - Query string `json:"query"` - Prove bool `json:"prove"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` - OrderBy string `json:"orderBy"` - } - - BlockSearchArgs struct { - Query string `json:"query"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` - OrderBy string `json:"orderBy"` - } + TxSearch(ctx *rpctypes.Context, query string, prove bool, + page, perPage *int, orderBy string) (*ctypes.ResultTxSearch, error) - SignService interface { - Block(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlock) error - BlockByHash(_ *http.Request, args *BlockHashArgs, reply *ctypes.ResultBlock) error - BlockResults(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlockResults) error - Commit(_ *http.Request, args *CommitArgs, reply *ctypes.ResultCommit) error - Validators(_ *http.Request, args *ValidatorsArgs, reply *ctypes.ResultValidators) error - Tx(_ *http.Request, args *TxArgs, reply *ctypes.ResultTx) error - TxSearch(_ *http.Request, args *TxSearchArgs, reply *ctypes.ResultTxSearch) error - BlockSearch(_ *http.Request, args *BlockSearchArgs, reply *ctypes.ResultBlockSearch) error + BlockSearch(ctx *rpctypes.Context, query string, + page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) } - BlockchainInfoArgs struct { - MinHeight int64 `json:"minHeight"` - MaxHeight int64 `json:"maxHeight"` + StatusClient interface { + Status(*rpctypes.Context) (*ctypes.ResultStatus, error) } +) - GenesisChunkedArgs struct { - Chunk uint `json:"chunk"` - } +func NewService(vm *VM) *LocalService { + return &LocalService{vm} +} - HistoryService interface { - BlockchainInfo(_ *http.Request, args *BlockchainInfoArgs, reply *ctypes.ResultBlockchainInfo) error - Genesis(_ *http.Request, _ *struct{}, reply *ctypes.ResultGenesis) error - GenesisChunked(_ *http.Request, args *GenesisChunkedArgs, reply *ctypes.ResultGenesisChunk) error +func NewServiceAsRPCRoutes(vm *VM) map[string]*rpcserver.RPCFunc { + s := NewService(vm) + return map[string]*rpcserver.RPCFunc{ + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": rpcserver.NewWSRPCFunc(s.Subscribe, "query"), + "unsubscribe": rpcserver.NewWSRPCFunc(s.Unsubscribe, "query"), + "unsubscribe_all": rpcserver.NewWSRPCFunc(s.UnsubscribeAll, ""), + + // info API + "health": rpcserver.NewRPCFunc(s.Health, ""), + "status": rpcserver.NewRPCFunc(s.Status, ""), + "net_info": rpcserver.NewRPCFunc(s.NetInfo, ""), + "blockchain": rpcserver.NewRPCFunc(s.BlockchainInfo, "minHeight,maxHeight"), + "genesis": rpcserver.NewRPCFunc(s.Genesis, ""), + "genesis_chunked": rpcserver.NewRPCFunc(s.GenesisChunked, "chunk"), + "block": rpcserver.NewRPCFunc(s.Block, "height"), + "block_by_hash": rpcserver.NewRPCFunc(s.BlockByHash, "hash"), + "block_results": rpcserver.NewRPCFunc(s.BlockResults, "height"), + "commit": rpcserver.NewRPCFunc(s.Commit, "height"), + "check_tx": rpcserver.NewRPCFunc(s.CheckTx, "tx"), + "tx": rpcserver.NewRPCFunc(s.Tx, "hash,prove"), + "tx_search": rpcserver.NewRPCFunc(s.TxSearch, "query,prove,page,per_page,order_by"), + "block_search": rpcserver.NewRPCFunc(s.BlockSearch, "query,page,per_page,order_by"), + "validators": rpcserver.NewRPCFunc(s.Validators, "height,page,per_page"), + "dump_consensus_state": rpcserver.NewRPCFunc(s.DumpConsensusState, ""), + "consensus_state": rpcserver.NewRPCFunc(s.ConsensusState, ""), + "consensus_params": rpcserver.NewRPCFunc(s.ConsensusParams, "height"), + "unconfirmed_txs": rpcserver.NewRPCFunc(s.UnconfirmedTxs, "limit"), + "num_unconfirmed_txs": rpcserver.NewRPCFunc(s.NumUnconfirmedTxs, ""), + + // tx broadcast API + "broadcast_tx_commit": rpcserver.NewRPCFunc(s.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpcserver.NewRPCFunc(s.BroadcastTxSync, "tx"), + "broadcast_tx_async": rpcserver.NewRPCFunc(s.BroadcastTxAsync, "tx"), + + // abci API + "abci_query": rpcserver.NewRPCFunc(s.ABCIQuery, "path,data,height,prove"), + "abci_info": rpcserver.NewRPCFunc(s.ABCIInfo, ""), } +} - StatusService interface { - Status(_ *http.Request, _ *struct{}, reply *ctypes.ResultStatus) error - } +func (s *LocalService) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + addr := ctx.RemoteAddr() - ConsensusParamsArgs struct { - Height *int64 `json:"height"` + if s.vm.eventBus.NumClients() >= s.vm.rpcConfig.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", s.vm.rpcConfig.MaxSubscriptionClients) + } else if s.vm.eventBus.NumClientSubscriptions(addr) >= s.vm.rpcConfig.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", s.vm.rpcConfig.MaxSubscriptionsPerClient) } - NetworkService interface { - NetInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultNetInfo) error - DumpConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultDumpConsensusState) error - ConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultConsensusState) error - ConsensusParams(_ *http.Request, args *ConsensusParamsArgs, reply *ctypes.ResultConsensusParams) error - Health(_ *http.Request, _ *struct{}, reply *ctypes.ResultHealth) error - } + s.vm.log.Info("Subscribe to query", "remote", addr, "query", query) - UnconfirmedTxsArgs struct { - Limit *int `json:"limit"` + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) } - CheckTxArgs struct { - Tx []byte `json:"tx"` - } + subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) + defer cancel() - MempoolService interface { - UnconfirmedTxs(_ *http.Request, args *UnconfirmedTxsArgs, reply *ctypes.ResultUnconfirmedTxs) error - NumUnconfirmedTxs(_ *http.Request, _ *struct{}, reply *ctypes.ResultUnconfirmedTxs) error - CheckTx(_ *http.Request, args *CheckTxArgs, reply *ctypes.ResultCheckTx) error - } -) + sub, err := s.vm.eventBus.Subscribe(subCtx, addr, q, s.vm.rpcConfig.SubscriptionBufferSize) + if err != nil { + return nil, err + } + + closeIfSlow := s.vm.rpcConfig.CloseOnSlowClient + + // TODO: inspired by Ilnur: usage of ctx.JSONReq.ID may cause situation when user or server try to create multiple subscriptions with the same id. + // Solution: return error code with the error sescription when this situation happens + // Capture the current ID, since it can change in the future. + subscriptionID := ctx.JSONReq.ID + go func() { + for { + select { + case msg := <-sub.Out(): + var ( + resultEvent = &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()} + resp = rpctypes.NewRPCSuccessResponse(subscriptionID, resultEvent) + ) + writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err = ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil { + s.vm.log.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + + if closeIfSlow { + var ( + err = errors.New("subscription was cancelled (reason: slow client)") + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + s.vm.log.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + return + } + } + case <-sub.Cancelled(): + if sub.Err() != tmpubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = sub.Err().Error() + } + resp := rpctypes.RPCServerError(subscriptionID, err) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + s.vm.log.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", + fmt.Errorf("subscription was cancelled (reason: %s)", reason)) + } + } + return + } + } + }() -var ( - DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false} -) + return &ctypes.ResultSubscribe{}, nil +} -func NewService(vm *VM) Service { - return &LocalService{vm} +func (s *LocalService) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + s.vm.log.Info("Unsubscribe from query", "remote", addr, "query", query) + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) + } + err = s.vm.eventBus.Unsubscribe(context.Background(), addr, q) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil } -func (s *LocalService) ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error { - resInfo, err := s.vm.app.Query().InfoSync(proxy.RequestInfo) +func (s *LocalService) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + s.vm.log.Info("Unsubscribe from all", "remote", addr) + err := s.vm.eventBus.UnsubscribeAll(context.Background(), addr) if err != nil { - return err + return nil, err } - reply.Response = *resInfo - return nil + return &ctypes.ResultUnsubscribe{}, nil } -func (s *LocalService) ABCIQuery(req *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error { - return s.ABCIQueryWithOptions(req, &ABCIQueryWithOptionsArgs{args.Path, args.Data, DefaultABCIQueryOptions}, reply) +func (s *LocalService) ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { + resInfo, err := s.vm.app.Query().InfoSync(proxy.RequestInfo) + if err != nil || resInfo == nil { + return nil, err + } + return &ctypes.ResultABCIInfo{Response: *resInfo}, nil } -func (s *LocalService) ABCIQueryWithOptions( - _ *http.Request, - args *ABCIQueryWithOptionsArgs, - reply *ctypes.ResultABCIQuery, -) error { +// TODO: attention! Different signatures in RPC interfaces +func (s *LocalService) ABCIQuery( + ctx *rpctypes.Context, + path string, + data tmbytes.HexBytes, + height int64, + prove bool, +) (*ctypes.ResultABCIQuery, error) { resQuery, err := s.vm.app.Query().QuerySync(abci.RequestQuery{ - Path: args.Path, - Data: args.Data, - Height: args.Opts.Height, - Prove: args.Opts.Prove, + Path: path, + Data: data, + Height: height, + Prove: prove, }) - if err != nil { - return err + if err != nil || resQuery == nil { + return nil, err } - reply.Response = *resQuery - return nil + + return &ctypes.ResultABCIQuery{Response: *resQuery}, nil } -func (s *LocalService) BroadcastTxCommit( - _ *http.Request, - args *BroadcastTxArgs, - reply *ctypes.ResultBroadcastTxCommit, -) error { +func (s *LocalService) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := "" // Subscribe to tx being committed in block. - subCtx, cancel := context.WithTimeout(context.Background(), core.SubscribeTimeout) + subCtx, cancel := context.WithTimeout(ctx.Context(), core.SubscribeTimeout) defer cancel() - q := types.EventQueryTxFor(args.Tx) + q := types.EventQueryTxFor(tx) deliverTxSub, err := s.vm.eventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) s.vm.log.Error("Error on broadcast_tx_commit", "err", err) - return err + return nil, err } defer func() { @@ -227,35 +301,33 @@ func (s *LocalService) BroadcastTxCommit( // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) - err = s.vm.mempool.CheckTx(args.Tx, func(res *abci.Response) { + err = s.vm.mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res }, mempl.TxInfo{}) if err != nil { s.vm.log.Error("Error on broadcastTxCommit", "err", err) - return fmt.Errorf("error on broadcastTxCommit: %v", err) + return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } checkTxResMsg := <-checkTxResCh checkTxRes := checkTxResMsg.GetCheckTx() if checkTxRes.Code != abci.CodeTypeOK { - *reply = ctypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, - Hash: args.Tx.Hash(), - } - return nil + Hash: tx.Hash(), + }, nil } // Wait for the tx to be included in a block or timeout. select { case msg := <-deliverTxSub.Out(): // The tx was included in a block. deliverTxRes := msg.Data().(types.EventDataTx) - *reply = ctypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: deliverTxRes.Result, - Hash: args.Tx.Hash(), + Hash: tx.Hash(), Height: deliverTxRes.Height, - } - return nil + }, nil case <-deliverTxSub.Cancelled(): var reason string if deliverTxSub.Err() == nil { @@ -265,194 +337,189 @@ func (s *LocalService) BroadcastTxCommit( } err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) s.vm.log.Error("Error on broadcastTxCommit", "err", err) - return err + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err // TODO: use config for timeout case <-time.After(10 * time.Second): err = errors.New("timed out waiting for tx to be included in a block") s.vm.log.Error("Error on broadcastTxCommit", "err", err) - return err + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err } } -func (s *LocalService) BroadcastTxAsync( - _ *http.Request, - args *BroadcastTxArgs, - reply *ctypes.ResultBroadcastTx, -) error { - err := s.vm.mempool.CheckTx(args.Tx, nil, mempl.TxInfo{}) +func (s *LocalService) BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + err := s.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { - return err + return nil, err } - reply.Hash = args.Tx.Hash() - return nil + return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } -func (s *LocalService) BroadcastTxSync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error { +func (s *LocalService) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) - err := s.vm.mempool.CheckTx(args.Tx, func(res *abci.Response) { + err := s.vm.mempool.CheckTx(tx, func(res *abci.Response) { s.vm.log.With("module", "service").Debug("handled response from checkTx") resCh <- res }, mempl.TxInfo{}) if err != nil { - return err + return nil, err } res := <-resCh r := res.GetCheckTx() - - reply.Code = r.Code - reply.Data = r.Data - reply.Log = r.Log - reply.Codespace = r.Codespace - reply.Hash = args.Tx.Hash() - - return nil + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + Hash: tx.Hash(), + }, nil } -func (s *LocalService) Block(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlock) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } + block := s.vm.blockStore.LoadBlock(height) blockMeta := s.vm.blockStore.LoadBlockMeta(height) - - if blockMeta != nil { - reply.BlockID = blockMeta.BlockID + if blockMeta == nil { + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } - reply.Block = block - return nil + return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } -func (s *LocalService) BlockByHash(_ *http.Request, args *BlockHashArgs, reply *ctypes.ResultBlock) error { - block := s.vm.blockStore.LoadBlockByHash(args.Hash) +func (s *LocalService) BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { + block := s.vm.blockStore.LoadBlockByHash(hash) if block == nil { - reply.BlockID = types.BlockID{} - reply.Block = nil - return nil + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } + // If block is not nil, then blockMeta can't be nil. blockMeta := s.vm.blockStore.LoadBlockMeta(block.Height) - reply.BlockID = blockMeta.BlockID - reply.Block = block - return nil + return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } -func (s *LocalService) BlockResults(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlockResults) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } results, err := s.vm.stateStore.LoadABCIResponses(height) if err != nil { - return err + return nil, err } - reply.Height = height - reply.TxsResults = results.DeliverTxs - reply.BeginBlockEvents = results.BeginBlock.Events - reply.EndBlockEvents = results.EndBlock.Events - reply.ValidatorUpdates = results.EndBlock.ValidatorUpdates - reply.ConsensusParamUpdates = results.EndBlock.ConsensusParamUpdates - return nil + return &ctypes.ResultBlockResults{ + Height: height, + TxsResults: results.DeliverTxs, + BeginBlockEvents: results.BeginBlock.Events, + EndBlockEvents: results.EndBlock.Events, + ValidatorUpdates: results.EndBlock.ValidatorUpdates, + ConsensusParamUpdates: results.EndBlock.ConsensusParamUpdates, + }, nil } -func (s *LocalService) Commit(_ *http.Request, args *CommitArgs, reply *ctypes.ResultCommit) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } blockMeta := s.vm.blockStore.LoadBlockMeta(height) if blockMeta == nil { - return nil + return nil, nil } - header := blockMeta.Header - commit := s.vm.blockStore.LoadBlockCommit(height) - res := ctypes.NewResultCommit(&header, commit, !(height == s.vm.blockStore.Height())) - reply.SignedHeader = res.SignedHeader - reply.CanonicalCommit = res.CanonicalCommit - return nil + // Return the canonical commit (comes from the block at height+1) + commit := s.vm.blockStore.LoadBlockCommit(height) + return ctypes.NewResultCommit(&header, commit, true), nil } -func (s *LocalService) Validators(_ *http.Request, args *ValidatorsArgs, reply *ctypes.ResultValidators) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } validators, err := s.vm.stateStore.LoadValidators(height) if err != nil { - return err + return nil, err } totalCount := len(validators.Validators) - perPage := validatePerPage(args.PerPage) - page, err := validatePage(args.Page, perPage, totalCount) + perPage := validatePerPage(perPagePtr) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) - reply.BlockHeight = height - reply.Validators = validators.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)] - reply.Count = len(reply.Validators) - reply.Total = totalCount - return nil + v := validators.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)] + + return &ctypes.ResultValidators{ + BlockHeight: height, + Validators: v, + Count: len(v), + Total: totalCount}, nil } -func (s *LocalService) Tx(_ *http.Request, args *TxArgs, reply *ctypes.ResultTx) error { - s.vm.log.Debug("query tx", "hash", args.Hash) - r, err := s.vm.txIndexer.Get(args.Hash) +func (s *LocalService) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + if _, ok := s.vm.txIndexer.(*null.TxIndex); ok { + return nil, fmt.Errorf("transaction indexing is disabled") + } + + r, err := s.vm.txIndexer.Get(hash) if err != nil { - return err + return nil, err } - s.vm.log.Debug("query tx", "r", args.Hash) if r == nil { - return fmt.Errorf("tx (%X) not found", args.Hash) + return nil, fmt.Errorf("tx (%X) not found", hash) } height := r.Height index := r.Index var proof types.TxProof - if args.Prove { + if prove { block := s.vm.blockStore.LoadBlock(height) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } - reply.Hash = args.Hash - reply.Height = height - reply.Index = index - reply.TxResult = r.Result - reply.Tx = r.Tx - reply.Proof = proof - return nil + return &ctypes.ResultTx{ + Hash: hash, + Height: height, + Index: index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + }, nil } -func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ctypes.ResultTxSearch) error { - q, err := tmquery.New(args.Query) +func (s *LocalService) TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { + // if index is disabled, return error + q, err := tmquery.New(query) if err != nil { - return err - } - - var ctx context.Context - if req != nil { - ctx = req.Context() - } else { - ctx = context.Background() + return nil, err } - results, err := s.vm.txIndexer.Search(ctx, q) + results, err := s.vm.txIndexer.Search(ctx.Context(), q) if err != nil { - return err + return nil, err } // sort results (must be done before pagination) - switch args.OrderBy { + switch orderBy { case "desc": sort.Slice(results, func(i, j int) bool { if results[i].Height == results[j].Height { @@ -468,16 +535,16 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct return results[i].Height < results[j].Height }) default: - return errors.New("expected order_by to be either `asc` or `desc` or empty") + return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") } // paginate results totalCount := len(results) - perPage := validatePerPage(args.PerPage) + perPage := validatePerPage(perPagePtr) - page, err := validatePage(args.Page, perPage, totalCount) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) @@ -488,7 +555,7 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct r := results[i] var proof types.TxProof - if args.Prove { + if prove { block := s.vm.blockStore.LoadBlock(r.Height) proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines } @@ -503,31 +570,35 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct }) } - reply.Txs = apiResults - reply.TotalCount = totalCount - return nil + return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil } -func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, reply *ctypes.ResultBlockSearch) error { - q, err := tmquery.New(args.Query) - if err != nil { - return err +// BlockSearch searches for a paginated set of blocks matching BeginBlock and +// EndBlock event search criteria. +func (s *LocalService) BlockSearch( + ctx *rpctypes.Context, + query string, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + + // skip if block indexing is disabled + if _, ok := s.vm.blockIndexer.(*blockidxnull.BlockerIndexer); ok { + return nil, errors.New("block indexing is disabled") } - var ctx context.Context - if req != nil { - ctx = req.Context() - } else { - ctx = context.Background() + q, err := tmquery.New(query) + if err != nil { + return nil, err } - results, err := s.vm.blockIndexer.Search(ctx, q) + results, err := s.vm.blockIndexer.Search(ctx.Context(), q) if err != nil { - return err + return nil, err } // sort results (must be done before pagination) - switch args.OrderBy { + switch orderBy { case "desc", "": sort.Slice(results, func(i, j int) bool { return results[i] > results[j] }) @@ -535,16 +606,16 @@ func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, rep sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) default: - return errors.New("expected order_by to be either `asc` or `desc` or empty") + return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") } // paginate results totalCount := len(results) - perPage := validatePerPage(args.PerPage) + perPage := validatePerPage(perPagePtr) - page, err := validatePage(args.Page, perPage, totalCount) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) @@ -564,68 +635,66 @@ func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, rep } } - reply.Blocks = apiResults - reply.TotalCount = totalCount - return nil + return &ctypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil } -func (s *LocalService) BlockchainInfo( - _ *http.Request, - args *BlockchainInfoArgs, - reply *ctypes.ResultBlockchainInfo, -) error { +func (s *LocalService) BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { // maximum 20 block metas const limit int64 = 20 var err error - args.MinHeight, args.MaxHeight, err = filterMinMax( + minHeight, maxHeight, err = filterMinMax( s.vm.blockStore.Base(), s.vm.blockStore.Height(), - args.MinHeight, - args.MaxHeight, + minHeight, + maxHeight, limit) if err != nil { - return err + return nil, err } - s.vm.log.Debug("BlockchainInfoHandler", "maxHeight", args.MaxHeight, "minHeight", args.MinHeight) + s.vm.log.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) var blockMetas []*types.BlockMeta - for height := args.MaxHeight; height >= args.MinHeight; height-- { + for height := maxHeight; height >= minHeight; height-- { blockMeta := s.vm.blockStore.LoadBlockMeta(height) blockMetas = append(blockMetas, blockMeta) } - reply.LastHeight = s.vm.blockStore.Height() - reply.BlockMetas = blockMetas - return nil + return &ctypes.ResultBlockchainInfo{ + LastHeight: s.vm.blockStore.Height(), + BlockMetas: blockMetas}, nil } -func (s *LocalService) Genesis(_ *http.Request, _ *struct{}, reply *ctypes.ResultGenesis) error { - reply.Genesis = s.vm.genesis - return nil +func (s *LocalService) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { + //if len(s.vm.genChunks) > 1 { + // return nil, errors.New("genesis response is large, please use the genesis_chunked API instead") + //} + + return &ctypes.ResultGenesis{Genesis: s.vm.genesis}, nil } -func (s *LocalService) GenesisChunked(_ *http.Request, args *GenesisChunkedArgs, reply *ctypes.ResultGenesisChunk) error { - //if s.vm.genChunks == nil { - // return fmt.Errorf("service configuration error, genesis chunks are not initialized") - //} - // - //if len(s.vm.genChunks) == 0 { - // return fmt.Errorf("service configuration error, there are no chunks") - //} - // - //id := int(args.Chunk) - // - //if id > len(s.vm.genChunks)-1 { - // return fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) - //} - // - //reply.TotalChunks = len(s.vm.genChunks) - //reply.ChunkNumber = id - //reply.Data = s.vm.genChunks[id] - return nil +func (s *LocalService) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { + if s.vm.genChunks == nil { + return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized") + } + + if len(s.vm.genChunks) == 0 { + return nil, fmt.Errorf("service configuration error, there are no chunks") + } + + id := int(chunk) + + if id > len(s.vm.genChunks)-1 { + return nil, fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) + } + + return &ctypes.ResultGenesisChunk{ + TotalChunks: len(s.vm.genChunks), + ChunkNumber: id, + Data: s.vm.genChunks[id], + }, nil } -func (s *LocalService) Status(_ *http.Request, _ *struct{}, reply *ctypes.ResultStatus) error { +func (s *LocalService) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { var ( earliestBlockHeight int64 earliestBlockHash tmbytes.HexBytes @@ -656,70 +725,84 @@ func (s *LocalService) Status(_ *http.Request, _ *struct{}, reply *ctypes.Result } } - reply.NodeInfo = p2p.DefaultNodeInfo{ - DefaultNodeID: p2p.ID(s.vm.chainCtx.NodeID.String()), - Network: fmt.Sprintf("%d", s.vm.chainCtx.NetworkID), - } - reply.SyncInfo = ctypes.SyncInfo{ - LatestBlockHash: latestBlockHash, - LatestAppHash: latestAppHash, - LatestBlockHeight: latestHeight, - LatestBlockTime: time.Unix(0, latestBlockTimeNano), - EarliestBlockHash: earliestBlockHash, - EarliestAppHash: earliestAppHash, - EarliestBlockHeight: earliestBlockHeight, - EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - } - return nil + result := &ctypes.ResultStatus{ + NodeInfo: p2p.DefaultNodeInfo{ + DefaultNodeID: p2p.ID(fmt.Sprintf("%x", s.vm.chainCtx.NodeID.Bytes())), + ListenAddr: fmt.Sprintf("/ext/bc/%s/rpc", s.vm.chainCtx.ChainID.String()), + Network: fmt.Sprintf("%d", s.vm.chainCtx.NetworkID), + // TODO: correct data + Channels: []byte("channels"), + // TODO: correct data + Moniker: "moniker", + }, + SyncInfo: ctypes.SyncInfo{ + LatestBlockHash: latestBlockHash, + LatestAppHash: latestAppHash, + LatestBlockHeight: latestHeight, + LatestBlockTime: time.Unix(0, latestBlockTimeNano), + EarliestBlockHash: earliestBlockHash, + EarliestAppHash: earliestAppHash, + EarliestBlockHeight: earliestBlockHeight, + EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), + CatchingUp: false, + }, + ValidatorInfo: ctypes.ValidatorInfo{ + Address: proposerPubKey.Address(), + PubKey: proposerPubKey, + VotingPower: 0, + }, + } + + return result, nil } -// ToDo: no peers, because it's vm -func (s *LocalService) NetInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultNetInfo) error { - return nil +// ToDo: no peers, no network from tendermint side +func (s *LocalService) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { + return &ctypes.ResultNetInfo{}, nil } // ToDo: we doesn't have consensusState -func (s *LocalService) DumpConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultDumpConsensusState) error { - return nil +func (s *LocalService) DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { + return &ctypes.ResultDumpConsensusState{}, nil } // ToDo: we doesn't have consensusState -func (s *LocalService) ConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultConsensusState) error { - return nil +func (s *LocalService) ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { + return &ctypes.ResultConsensusState{}, nil } -func (s *LocalService) ConsensusParams(_ *http.Request, args *ConsensusParamsArgs, reply *ctypes.ResultConsensusParams) error { - reply.BlockHeight = s.vm.blockStore.Height() - reply.ConsensusParams = *s.vm.genesis.ConsensusParams - return nil +func (s *LocalService) ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { + return &ctypes.ResultConsensusParams{ + BlockHeight: s.vm.blockStore.Height(), + ConsensusParams: *s.vm.genesis.ConsensusParams, + }, nil } -func (s *LocalService) Health(_ *http.Request, _ *struct{}, reply *ctypes.ResultHealth) error { - *reply = ctypes.ResultHealth{} - return nil +func (s *LocalService) Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { + return &ctypes.ResultHealth{}, nil } -func (s *LocalService) UnconfirmedTxs(_ *http.Request, args *UnconfirmedTxsArgs, reply *ctypes.ResultUnconfirmedTxs) error { - limit := validatePerPage(args.Limit) +func (s *LocalService) UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { + limit := validatePerPage(limitPtr) txs := s.vm.mempool.ReapMaxTxs(limit) - reply.Count = len(txs) - reply.Total = s.vm.mempool.Size() - reply.Txs = txs - return nil + return &ctypes.ResultUnconfirmedTxs{ + Count: len(txs), + Total: s.vm.mempool.Size(), + Txs: txs, + }, nil } -func (s *LocalService) NumUnconfirmedTxs(_ *http.Request, _ *struct{}, reply *ctypes.ResultUnconfirmedTxs) error { - reply.Count = s.vm.mempool.Size() - reply.Total = s.vm.mempool.Size() - reply.TotalBytes = s.vm.mempool.TxsBytes() - return nil +func (s *LocalService) NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { + return &ctypes.ResultUnconfirmedTxs{ + Count: s.vm.mempool.Size(), + Total: s.vm.mempool.Size(), + TotalBytes: s.vm.mempool.TxsBytes()}, nil } -func (s *LocalService) CheckTx(_ *http.Request, args *CheckTxArgs, reply *ctypes.ResultCheckTx) error { - res, err := s.vm.app.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: args.Tx}) +func (s *LocalService) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + res, err := s.vm.app.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { - return err + return nil, err } - reply.ResponseCheckTx = *res - return nil + return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil } diff --git a/vm/service_test.go b/vm/service_test.go index 6b1651c2d..50896bbfb 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -2,23 +2,59 @@ package vm import ( "context" + "encoding/base64" + "errors" "fmt" + "strings" "testing" "time" - atypes "github.com/consideritdone/landslidecore/abci/types" + "github.com/ava-labs/avalanchego/snow/engine/common" + tmjson "github.com/consideritdone/landslidecore/libs/json" ctypes "github.com/consideritdone/landslidecore/rpc/core/types" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" + "github.com/consideritdone/landslidecore/types" + "golang.org/x/sync/errgroup" + + atypes "github.com/consideritdone/landslidecore/abci/types" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func broadcastTx(t *testing.T, v *VM, msgs chan common.Message, tx []byte) (*ctypes.ResultBroadcastTxCommit, error) { + var result *ctypes.ResultBroadcastTxCommit + wg := new(errgroup.Group) + wg.Go(func() error { + select { + case <-msgs: + t.Logf("found new txs in engine") + block, err := v.BuildBlock(context.Background()) + if err != nil { + return err + } + return block.Accept(context.Background()) + case <-time.After(time.Minute): + return errors.New("timeout. no txs") + } + }) + wg.Go(func() error { + var err error + result, err = NewService(v).BroadcastTxCommit(&rpctypes.Context{}, tx) + return err + }) + if err := wg.Wait(); err != nil { + return nil, err + } + return result, nil +} + func TestABCIService(t *testing.T) { vm, service, _ := mustNewKVTestVm(t) t.Run("ABCIInfo", func(t *testing.T) { - reply := new(ctypes.ResultABCIInfo) - assert.NoError(t, service.ABCIInfo(nil, nil, reply)) + reply, err := service.ABCIInfo(&rpctypes.Context{}) + require.NoError(t, err) assert.Equal(t, uint64(1), reply.Response.AppVersion) assert.Equal(t, int64(1), reply.Response.LastBlockHeight) assert.Equal(t, []uint8([]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}), reply.Response.LastBlockAppHash) @@ -28,8 +64,8 @@ func TestABCIService(t *testing.T) { t.Run("ABCIQuery", func(t *testing.T) { k, v, tx := MakeTxKV() - replyBroadcast := new(ctypes.ResultBroadcastTx) - require.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{tx}, replyBroadcast)) + _, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) + require.NoError(t, err) blk, err := vm.BuildBlock(context.Background()) require.NoError(t, err) @@ -38,10 +74,9 @@ func TestABCIService(t *testing.T) { err = blk.Accept(context.Background()) require.NoError(t, err) - res := new(ctypes.ResultABCIQuery) - err = service.ABCIQuery(nil, &ABCIQueryArgs{Path: "/key", Data: k}, res) - if assert.Nil(t, err) && assert.True(t, res.Response.IsOK()) { - assert.EqualValues(t, v, res.Response.Value) + reply, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, false) + if assert.Nil(t, err) && assert.True(t, reply.Response.IsOK()) { + assert.EqualValues(t, v, reply.Response.Value) } spew.Dump(vm.mempool.Size()) }) @@ -69,8 +104,8 @@ func TestABCIService(t *testing.T) { }(ctx) _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTxCommit) - assert.NoError(t, service.BroadcastTxCommit(nil, &BroadcastTxArgs{tx}, reply)) + reply, err := service.BroadcastTxCommit(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.True(t, reply.CheckTx.IsOK()) assert.True(t, reply.DeliverTx.IsOK()) assert.Equal(t, 0, vm.mempool.Size()) @@ -82,8 +117,8 @@ func TestABCIService(t *testing.T) { initMempoolSize := vm.mempool.Size() _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxAsync(nil, &BroadcastTxArgs{tx}, reply)) + reply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.NotNil(t, reply.Hash) assert.Equal(t, initMempoolSize+1, vm.mempool.Size()) assert.EqualValues(t, tx, vm.mempool.ReapMaxTxs(-1)[0]) @@ -95,19 +130,80 @@ func TestABCIService(t *testing.T) { initMempoolSize := vm.mempool.Size() _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: tx}, reply)) + reply, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.Equal(t, reply.Code, atypes.CodeTypeOK) assert.Equal(t, initMempoolSize+1, vm.mempool.Size()) assert.EqualValues(t, tx, vm.mempool.ReapMaxTxs(-1)[0]) }) } +func TestEventService(t *testing.T) { + _, service, _ := mustNewCounterTestVm(t) + + // subscribe to new blocks and make sure height increments by 1 + t.Run("Subscribe", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + } + t.Cleanup(func() { + if _, err := service.UnsubscribeAll(&rpctypes.Context{}); err != nil { + t.Error(err) + } + }) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + _, err = service.Unsubscribe(&rpctypes.Context{}, event) + require.NoError(t, err) + } + //TODO: investigate the need to use Cleanup with UnsubscribeAll + //t.Cleanup(func() { + // if _, err := service.UnsubscribeAll(&rpctypes.Context{}); err != nil { + // t.Error(err) + // } + //}) + }) + + t.Run("UnsubscribeAll", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + } + _, err := service.UnsubscribeAll(&rpctypes.Context{}) + if err != nil { + t.Error(err) + } + }) +} + func TestHistoryService(t *testing.T) { vm, service, _ := mustNewCounterTestVm(t) - txReply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: []byte{0x00}}, txReply)) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x00}) + assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) blk, err := vm.BuildBlock(context.Background()) @@ -115,44 +211,63 @@ func TestHistoryService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - t.Run("BlockchainInfo", func(t *testing.T) { - reply := new(ctypes.ResultBlockchainInfo) - assert.NoError(t, service.BlockchainInfo(nil, &BlockchainInfoArgs{1, 100}, reply)) - assert.Equal(t, int64(2), reply.LastHeight) - }) - t.Run("Genesis", func(t *testing.T) { - reply := new(ctypes.ResultGenesis) - assert.NoError(t, service.Genesis(nil, nil, reply)) + reply, err := service.Genesis(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, vm.genesis, reply.Genesis) }) + + t.Run("GenesisChunked", func(t *testing.T) { + first, err := service.GenesisChunked(&rpctypes.Context{}, 0) + require.NoError(t, err) + + decoded := make([]string, 0, first.TotalChunks) + for i := 0; i < first.TotalChunks; i++ { + chunk, err := service.GenesisChunked(&rpctypes.Context{}, uint(i)) + require.NoError(t, err) + data, err := base64.StdEncoding.DecodeString(chunk.Data) + require.NoError(t, err) + decoded = append(decoded, string(data)) + + } + doc := []byte(strings.Join(decoded, "")) + + var out types.GenesisDoc + require.NoError(t, tmjson.Unmarshal(doc, &out), "first: %+v, doc: %s", first, string(doc)) + }) + + t.Run("BlockchainInfo", func(t *testing.T) { + reply, err := service.BlockchainInfo(&rpctypes.Context{}, 1, 100) + assert.NoError(t, err) + assert.Equal(t, int64(2), reply.LastHeight) + }) } func TestNetworkService(t *testing.T) { vm, service, _ := mustNewCounterTestVm(t) t.Run("NetInfo", func(t *testing.T) { - reply := new(ctypes.ResultNetInfo) - assert.NoError(t, service.NetInfo(nil, nil, reply)) + _, err := service.NetInfo(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("DumpConsensusState", func(t *testing.T) { - reply := new(ctypes.ResultDumpConsensusState) - assert.NoError(t, service.DumpConsensusState(nil, nil, reply)) + _, err := service.DumpConsensusState(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("ConsensusState", func(t *testing.T) { - reply := new(ctypes.ResultConsensusState) - assert.NoError(t, service.ConsensusState(nil, nil, reply)) + _, err := service.ConsensusState(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("ConsensusParams", func(t *testing.T) { - reply := new(ctypes.ResultConsensusParams) - assert.NoError(t, service.ConsensusParams(nil, nil, reply)) + reply, err := service.ConsensusParams(&rpctypes.Context{}, nil) + assert.NoError(t, err) assert.Equal(t, int64(1), reply.BlockHeight) - txReply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: []byte{0x00}}, txReply)) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x00}) + assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) blk, err := vm.BuildBlock(context.Background()) @@ -160,27 +275,28 @@ func TestNetworkService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - assert.NoError(t, service.ConsensusParams(nil, nil, reply)) - assert.Equal(t, int64(2), reply.BlockHeight) + reply2, err := service.ConsensusParams(&rpctypes.Context{}, nil) + assert.NoError(t, err) + assert.Equal(t, int64(2), reply2.BlockHeight) }) t.Run("Health", func(t *testing.T) { - reply := new(ctypes.ResultHealth) - assert.NoError(t, service.Health(nil, nil, reply)) + _, err := service.Health(&rpctypes.Context{}) + assert.NoError(t, err) }) } func TestSignService(t *testing.T) { _, _, tx := MakeTxKV() - vm, service, _ := mustNewKVTestVm(t) + tx2 := []byte{0x02} + tx3 := []byte{0x03} + vm, service, msgs := mustNewKVTestVm(t) blk0, err := vm.BuildBlock(context.Background()) assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{tx} - txReply := new(ctypes.ResultBroadcastTx) - err = service.BroadcastTxSync(nil, txArg, txReply) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) @@ -192,40 +308,39 @@ func TestSignService(t *testing.T) { height1 := int64(blk1.Height()) t.Run("Block", func(t *testing.T) { - replyWithoutHeight := new(ctypes.ResultBlock) - assert.NoError(t, service.Block(nil, &BlockHeightArgs{&height1}, replyWithoutHeight)) + replyWithoutHeight, err := service.Block(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, replyWithoutHeight.Block) { assert.EqualValues(t, height1, replyWithoutHeight.Block.Height) } - reply := new(ctypes.ResultBlock) - assert.NoError(t, service.Block(nil, &BlockHeightArgs{Height: &height1}, reply)) + reply, err := service.Block(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, reply.Block) { assert.EqualValues(t, height1, reply.Block.Height) } }) t.Run("BlockByHash", func(t *testing.T) { - replyWithoutHash := new(ctypes.ResultBlock) - assert.NoError(t, service.BlockByHash(nil, &BlockHashArgs{}, replyWithoutHash)) + replyWithoutHash, err := service.BlockByHash(&rpctypes.Context{}, []byte{}) + assert.NoError(t, err) assert.Nil(t, replyWithoutHash.Block) - reply := new(ctypes.ResultBlock) hash := blk1.ID() - - assert.NoError(t, service.BlockByHash(nil, &BlockHashArgs{Hash: hash[:]}, reply)) + reply, err := service.BlockByHash(&rpctypes.Context{}, hash[:]) + assert.NoError(t, err) if assert.NotNil(t, reply.Block) { assert.EqualValues(t, hash[:], reply.Block.Hash().Bytes()) } }) t.Run("BlockResults", func(t *testing.T) { - replyWithoutHeight := new(ctypes.ResultBlockResults) - assert.NoError(t, service.BlockResults(nil, &BlockHeightArgs{}, replyWithoutHeight)) + replyWithoutHeight, err := service.BlockResults(&rpctypes.Context{}, nil) + assert.NoError(t, err) assert.Equal(t, height1, replyWithoutHeight.Height) - reply := new(ctypes.ResultBlockResults) - assert.NoError(t, service.BlockResults(nil, &BlockHeightArgs{Height: &height1}, reply)) + reply, err := service.BlockResults(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, reply.TxsResults) { assert.Equal(t, height1, reply.Height) } @@ -234,21 +349,133 @@ func TestSignService(t *testing.T) { t.Run("Tx", func(t *testing.T) { time.Sleep(2 * time.Second) - reply := new(ctypes.ResultTx) - assert.NoError(t, service.Tx(nil, &TxArgs{Hash: txReply.Hash.Bytes()}, reply)) + reply, err := service.Tx(&rpctypes.Context{}, txReply.Hash.Bytes(), false) + assert.NoError(t, err) assert.EqualValues(t, txReply.Hash, reply.Hash) assert.EqualValues(t, tx, reply.Tx) }) t.Run("TxSearch", func(t *testing.T) { - reply := new(ctypes.ResultTxSearch) - assert.NoError(t, service.TxSearch(nil, &TxSearchArgs{Query: fmt.Sprintf("tx.hash='%s'", txReply.Hash)}, reply)) + txReply2, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx2) + assert.NoError(t, err) + assert.Equal(t, atypes.CodeTypeOK, txReply2.Code) + + blk2, err := vm.BuildBlock(context.Background()) + require.NoError(t, err) + assert.NotNil(t, blk2) + assert.NoError(t, blk2.Accept(context.Background())) + + time.Sleep(time.Second) + + reply, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.hash='%s'", txReply2.Hash), false, nil, nil, "asc") + assert.NoError(t, err) assert.True(t, len(reply.Txs) > 0) + + // TODO: need to fix + // reply2, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.height=%d", blk2.Height()), false, nil, nil, "desc") + // assert.NoError(t, err) + // assert.True(t, len(reply2.Txs) > 0) + }) + + //TODO: Check logic of test + t.Run("Commit", func(t *testing.T) { + txReply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx3) + require.NoError(t, err) + assert.Equal(t, atypes.CodeTypeOK, txReply.Code) + + assert, require := assert.New(t), require.New(t) + + // get an offset of height to avoid racing and guessing + s, err := service.Status(&rpctypes.Context{}) + require.NoError(err) + // sh is start height or status height + sh := s.SyncInfo.LatestBlockHeight + + // look for the future + h := sh + 20 + _, err = service.Block(&rpctypes.Context{}, &h) + require.Error(err) // no block yet + + // write something + k, v, tx := MakeTxKV() + bres, err := broadcastTx(t, vm, msgs, tx) + require.NoError(err) + require.True(bres.DeliverTx.IsOK()) + time.Sleep(2 * time.Second) + + txh := bres.Height + apph := txh + + // wait before querying + err = WaitForHeight(service, apph, nil) + require.NoError(err) + + qres, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, false) + require.NoError(err) + if assert.True(qres.Response.IsOK()) { + assert.Equal(k, qres.Response.Key) + assert.EqualValues(v, qres.Response.Value) + } + + // make sure we can lookup the tx with proof + ptx, err := service.Tx(&rpctypes.Context{}, bres.Hash, true) + require.NoError(err) + assert.EqualValues(txh, ptx.Height) + assert.EqualValues(tx, ptx.Tx) + + // and we can even check the block is added + block, err := service.Block(&rpctypes.Context{}, &apph) + require.NoError(err) + appHash := block.Block.Header.AppHash + assert.True(len(appHash) > 0) + assert.EqualValues(apph, block.Block.Header.Height) + + blockByHash, err := service.BlockByHash(&rpctypes.Context{}, block.BlockID.Hash) + require.NoError(err) + require.Equal(block, blockByHash) + + // now check the results + blockResults, err := service.BlockResults(&rpctypes.Context{}, &txh) + require.Nil(err, "%+v", err) + assert.Equal(txh, blockResults.Height) + if assert.Equal(2, len(blockResults.TxsResults)) { + // check success code + assert.EqualValues(0, blockResults.TxsResults[0].Code) + } + + // check blockchain info, now that we know there is info + info, err := service.BlockchainInfo(&rpctypes.Context{}, apph, apph) + require.NoError(err) + assert.True(info.LastHeight >= apph) + if assert.Equal(1, len(info.BlockMetas)) { + lastMeta := info.BlockMetas[0] + assert.EqualValues(apph, lastMeta.Header.Height) + blockData := block.Block + assert.Equal(blockData.Header.AppHash, lastMeta.Header.AppHash) + assert.Equal(block.BlockID, lastMeta.BlockID) + } + + // and get the corresponding commit with the same apphash + commit, err := service.Commit(&rpctypes.Context{}, &apph) + require.NoError(err) + assert.NotNil(commit) + assert.Equal(appHash, commit.Header.AppHash) + + // compare the commits (note Commit(2) has commit from Block(3)) + h = apph - 1 + commit2, err := service.Commit(&rpctypes.Context{}, &h) + require.NoError(err) + assert.Equal(block.Block.LastCommitHash, commit2.Commit.Hash()) + + // and we got a proof that works! + pres, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, true) + require.NoError(err) + assert.True(pres.Response.IsOK()) }) t.Run("BlockSearch", func(t *testing.T) { - reply := new(ctypes.ResultBlockSearch) - assert.NoError(t, service.BlockSearch(nil, &BlockSearchArgs{Query: "block.height=2"}, reply)) + reply, err := service.BlockSearch(&rpctypes.Context{}, "block.height=2", nil, nil, "desc") + assert.NoError(t, err) assert.True(t, len(reply.Blocks) > 0) }) } @@ -260,17 +487,13 @@ func TestStatusService(t *testing.T) { assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - txReply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, txArg, txReply) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) t.Run("Status", func(t *testing.T) { - reply1 := new(ctypes.ResultStatus) - assert.NoError(t, service.Status(nil, nil, reply1)) + reply1, err := service.Status(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, int64(1), reply1.SyncInfo.LatestBlockHeight) blk, err := vm.BuildBlock(context.Background()) @@ -278,8 +501,8 @@ func TestStatusService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - reply2 := new(ctypes.ResultStatus) - assert.NoError(t, service.Status(nil, nil, reply2)) + reply2, err := service.Status(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, int64(2), reply2.SyncInfo.LatestBlockHeight) }) } @@ -291,32 +514,31 @@ func TestMempoolService(t *testing.T) { assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - txReply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, txArg, txReply) + tx := []byte{0x01} + expectedTx := types.Tx(tx) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) t.Run("UnconfirmedTxs", func(t *testing.T) { limit := 100 - reply := new(ctypes.ResultUnconfirmedTxs) - assert.NoError(t, service.UnconfirmedTxs(nil, &UnconfirmedTxsArgs{Limit: &limit}, reply)) + reply, err := service.UnconfirmedTxs(&rpctypes.Context{}, &limit) + assert.NoError(t, err) assert.True(t, len(reply.Txs) == 1) - assert.Equal(t, reply.Txs[0], txArg.Tx) + assert.Equal(t, expectedTx, reply.Txs[0]) }) t.Run("NumUnconfirmedTxs", func(t *testing.T) { - reply := new(ctypes.ResultUnconfirmedTxs) - assert.NoError(t, service.NumUnconfirmedTxs(nil, nil, reply)) + reply, err := service.NumUnconfirmedTxs(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, reply.Count, 1) assert.Equal(t, reply.Total, 1) }) t.Run("CheckTx", func(t *testing.T) { - reply1 := new(ctypes.ResultCheckTx) - assert.NoError(t, service.CheckTx(nil, &CheckTxArgs{Tx: txArg.Tx}, reply1)) + reply1, err := service.CheckTx(&rpctypes.Context{}, tx) + assert.NoError(t, err) + t.Logf("%v\n", reply1) // ToDo: check reply1 blk, err := vm.BuildBlock(context.Background()) @@ -324,8 +546,9 @@ func TestMempoolService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - reply2 := new(ctypes.ResultCheckTx) - assert.NoError(t, service.CheckTx(nil, &CheckTxArgs{Tx: txArg.Tx}, reply2)) + reply2, err := service.CheckTx(&rpctypes.Context{}, tx) + assert.NoError(t, err) // ToDo: check reply2 + t.Logf("%v\n", reply2) }) } diff --git a/vm/vm.go b/vm/vm.go index 3e5b9140b..366117152 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -2,13 +2,11 @@ package vm import ( "context" + "encoding/base64" "errors" "fmt" - "net/http" "time" - "github.com/gorilla/rpc/v2" - "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database/manager" @@ -21,19 +19,18 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils" - "github.com/ava-labs/avalanchego/utils/json" "github.com/ava-labs/avalanchego/version" abciTypes "github.com/consideritdone/landslidecore/abci/types" "github.com/consideritdone/landslidecore/config" "github.com/consideritdone/landslidecore/consensus" + "github.com/consideritdone/landslidecore/crypto/secp256k1" "github.com/consideritdone/landslidecore/crypto/tmhash" "github.com/consideritdone/landslidecore/libs/log" mempl "github.com/consideritdone/landslidecore/mempool" "github.com/consideritdone/landslidecore/node" tmproto "github.com/consideritdone/landslidecore/proto/tendermint/types" "github.com/consideritdone/landslidecore/proxy" - rpccore "github.com/consideritdone/landslidecore/rpc/core" rpcserver "github.com/consideritdone/landslidecore/rpc/jsonrpc/server" "github.com/consideritdone/landslidecore/state" "github.com/consideritdone/landslidecore/state/indexer" @@ -76,6 +73,7 @@ var ( dbPrefixBlockIndexer = []byte("block-indexer") proposerAddress = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + proposerPubKey = secp256k1.PubKey{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} errInvalidBlock = errors.New("invalid block") errNoPendingTxs = errors.New("there is no txs to include to block") @@ -88,6 +86,8 @@ type ( appCreator AppCreator app proxy.AppConns + rpcConfig *config.RPCConfig + log log.Logger chainCtx *snow.Context toEngine chan<- common.Message @@ -96,6 +96,7 @@ type ( stateStore state.Store state state.State genesis *types.GenesisDoc + genChunks []string mempool *mempl.CListMempool eventBus *types.EventBus @@ -308,6 +309,7 @@ func (vm *VM) Initialize( vm.toEngine = toEngine vm.log = log.NewTMLogger(vm.chainCtx.Log).With("module", "vm") vm.verifiedBlocks = make(map[ids.ID]*Block) + vm.rpcConfig = config.DefaultRPCConfig() db := dbManager.Current().Database @@ -329,6 +331,13 @@ func (vm *VM) Initialize( if err != nil { return nil } + for i := 0; i < len(genesisBytes); i += genesisChunkSize { + end := i + genesisChunkSize + if end > len(genesisBytes) { + end = len(genesisBytes) + } + vm.genChunks = append(vm.genChunks, base64.StdEncoding.EncodeToString(genesisBytes[i:end])) + } vm.app, err = node.CreateAndStartProxyAppConns(proxy.NewLocalClientCreator(app), vm.log) if err != nil { @@ -496,21 +505,13 @@ func (vm *VM) CreateStaticHandlers(context.Context) (map[string]*common.HTTPHand // it have an extension called `accounts`, where clients could get // information about their accounts. func (vm *VM) CreateHandlers(context.Context) (map[string]*common.HTTPHandler, error) { - mux := http.NewServeMux() - rpcLogger := vm.log.With("module", "rpc-server") - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) - - server := rpc.NewServer() - server.RegisterCodec(json.NewCodec(), "application/json") - server.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") - if err := server.RegisterService(NewService(vm), Name); err != nil { - return nil, err - } - return map[string]*common.HTTPHandler{ "/rpc": { LockOptions: common.WriteLock, - Handler: server, + Handler: rpcserver.MakeJSONRPCHandler( + NewServiceAsRPCRoutes(vm), + vm.log.With("module", "rpc"), + ), }, }, nil } diff --git a/vm/vm_test.go b/vm/vm_test.go index 212eb7513..06b92ba91 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -21,7 +21,6 @@ import ( "github.com/consideritdone/landslidecore/abci/example/counter" atypes "github.com/consideritdone/landslidecore/abci/types" tmrand "github.com/consideritdone/landslidecore/libs/rand" - ctypes "github.com/consideritdone/landslidecore/rpc/core/types" ) var ( @@ -99,11 +98,7 @@ func TestInitVm(t *testing.T) { assert.Nil(t, blk0) // submit first tx (0x00) - args := &BroadcastTxArgs{ - Tx: []byte{0x00}, - } - reply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + reply, err := service.BroadcastTxSync(nil, []byte{0x00}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code) @@ -128,20 +123,12 @@ func TestInitVm(t *testing.T) { t.Logf("TM Block Tx count: %d", len(tmBlk1.Data.Txs)) // submit second tx (0x01) - args = &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - reply = &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + reply, err = service.BroadcastTxSync(nil, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code) // submit 3rd tx (0x02) - args = &BroadcastTxArgs{ - Tx: []byte{0x02}, - } - reply = &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + reply, err = service.BroadcastTxSync(nil, []byte{0x02}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code)