From 8a0d542498291db0b56b7e43fabbb88c9ee740c3 Mon Sep 17 00:00:00 2001 From: "DESKTOP-765JFGJ\\Admin" Date: Fri, 9 Jun 2023 10:51:25 +0200 Subject: [PATCH 01/10] preliminary version of rpc server --- vm/vm.go | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 151 insertions(+), 13 deletions(-) diff --git a/vm/vm.go b/vm/vm.go index a07226755..05d61d4ac 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "net/rpc" "time" "github.com/ava-labs/avalanchego/api/metrics" @@ -18,11 +19,9 @@ import ( "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" - "github.com/ava-labs/avalanchego/utils/json" "github.com/ava-labs/avalanchego/utils/timer/mockable" "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/components/chain" - "github.com/gorilla/rpc/v2" "github.com/prometheus/client_golang/prometheus" dbm "github.com/tendermint/tm-db" @@ -35,7 +34,6 @@ import ( "github.com/consideritdone/landslidecore/node" tmproto "github.com/consideritdone/landslidecore/proto/tendermint/types" "github.com/consideritdone/landslidecore/proxy" - rpccore "github.com/consideritdone/landslidecore/rpc/core" rpcserver "github.com/consideritdone/landslidecore/rpc/jsonrpc/server" sm "github.com/consideritdone/landslidecore/state" "github.com/consideritdone/landslidecore/state/indexer" @@ -620,26 +618,166 @@ func (vm *VM) CreateStaticHandlers(ctx context.Context) (map[string]*common.HTTP return nil, nil } +// Routes is a map of available routes. +var Routes = map[string]*rpcserver.RPCFunc{ + //// subscribe/unsubscribe are reserved for websocket events. + //"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), + //"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), + //"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), + // + //// info API + //"health": rpc.NewRPCFunc(Health, ""), + //"status": rpc.NewRPCFunc(Status, ""), + //"net_info": rpc.NewRPCFunc(NetInfo, ""), + //"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"), + //"genesis": rpc.NewRPCFunc(Genesis, ""), + //"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk"), + //"block": rpc.NewRPCFunc(Block, "height"), + //"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash"), + //"block_results": rpc.NewRPCFunc(BlockResults, "height"), + //"commit": rpc.NewRPCFunc(Commit, "height"), + //"check_tx": rpc.NewRPCFunc(CheckTx, "tx"), + //"tx": rpc.NewRPCFunc(Tx, "hash,prove"), + //"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), + //"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), + //"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), + //"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), + //"consensus_state": rpc.NewRPCFunc(ConsensusState, ""), + //"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"), + //"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), + //"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), + // + //// tx broadcast API + //"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), + //"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), + //"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), + // + //// abci API + //"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"), + "abci_info": rpcserver.NewRPCFunc(ABCIInfo, ""), + // + //// evidence API + //"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), +} + +//func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { +// server := rpc.NewServer() +// tmService := NewService(vm) +// err := rpc.Register(tmService) +// if err != nil { +// return nil, fmt.Errorf("Failed to create vm handlers: failed to register tendermint service: %w ", err) +// } +// rpc.HandleHTTP() +// +// return map[string]*common.HTTPHandler{ +// "/rpc": { +// LockOptions: common.WriteLock, +// Handler: func(w http.ResponseWriter, r *http.Request) { +// if r.Method != "POST" { +// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) +// return +// } +// +// err := r.ParseForm() +// if err != nil { +// http.Error(w, "Bad request", http.StatusBadRequest) +// return +// } +// +// method := r.FormValue("method") +// if method == "" { +// http.Error(w, "Method not specified", http.StatusBadRequest) +// return +// } +// +// switch method { +// case "math_add": +// //Math add handler +// +// case "math_subtract": +// //Math subc=stract handler +// +// default: +// http.Error(w, "Method not found", http.StatusNotFound) +// return +// } +// }, +// }, +// }, nil +//} + func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { - mux := http.NewServeMux() - rpcLogger := vm.tmLogger.With("module", "rpc-server") - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) + //server := rpc.NewServer() + //tmService := NewService(vm) + //err := rpc.Register(tmService) + //if err != nil { + // return nil, fmt.Errorf("Failed to create vm handlers: failed to register tendermint service: %w ", err) + //} + //rpc.HandleHTTP() - server := rpc.NewServer() - server.RegisterCodec(json.NewCodec(), "application/json") - server.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") - if err := server.RegisterService(NewService(vm), Name); err != nil { - return nil, err - } + rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) return map[string]*common.HTTPHandler{ "/rpc": { LockOptions: common.WriteLock, - Handler: server, + Handler: rpcserver. }, }, nil } +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) { + // Always return -1 as there's no ID here. + dummyID := types.JSONRPCIntID(-1) // URIClientRequestID + + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + res := types.RPCMethodNotFoundError(dummyID) + if wErr := WriteRPCResponseHTTPError(w, http.StatusNotFound, res); wErr != nil { + logger.Error("failed to write response", "res", res, "err", wErr) + } + } + } + + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + logger.Debug("HTTP HANDLER", "req", r) + + ctx := &types.Context{HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + + fnArgs, err := httpParamsToArgs(rpcFunc, r) + if err != nil { + res := types.RPCInvalidParamsError(dummyID, + fmt.Errorf("error converting http params to arguments: %w", err), + ) + if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil { + logger.Error("failed to write response", "res", res, "err", wErr) + } + return + } + args = append(args, fnArgs...) + + returns := rpcFunc.f.Call(args) + + logger.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + if err := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, + types.RPCInternalError(dummyID, err)); err != nil { + logger.Error("failed to write response", "res", result, "err", err) + return + } + return + } + if err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result)); err != nil { + logger.Error("failed to write response", "res", result, "err", err) + return + } + } +} + func (vm *VM) ProxyApp() proxy.AppConns { return vm.proxyApp } From 0e4ad5f705e6466ed8c8c855c6b6afbcf20649c3 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Thu, 15 Jun 2023 10:37:57 +0200 Subject: [PATCH 02/10] setup RPC server properly --- go.mod | 2 +- rpc/jsonrpc/server/http_json_handler.go | 17 +- rpc/jsonrpc/server/rpc_func.go | 2 +- testFile.txt | 1 + vm/service.go | 24 +-- vm/vm.go | 204 ++++++++++++------------ 6 files changed, 128 insertions(+), 122 deletions(-) create mode 100644 testFile.txt diff --git a/go.mod b/go.mod index 52b9c6ffd..530ebc9af 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 github.com/google/orderedcode v0.0.1 - github.com/gorilla/rpc v1.2.0 github.com/gorilla/websocket v1.5.0 github.com/gtank/merlin v0.1.1 github.com/lib/pq v1.10.4 @@ -67,6 +66,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect + github.com/gorilla/rpc v1.2.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.12.0 // indirect github.com/gtank/ristretto255 v0.1.2 // indirect diff --git a/rpc/jsonrpc/server/http_json_handler.go b/rpc/jsonrpc/server/http_json_handler.go index b51f1f231..35dd1db54 100644 --- a/rpc/jsonrpc/server/http_json_handler.go +++ b/rpc/jsonrpc/server/http_json_handler.go @@ -17,7 +17,7 @@ import ( // HTTP + JSON handler // jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc { +func MakeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) if err != nil { @@ -67,13 +67,14 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han ) continue } - if len(r.URL.Path) > 1 { - responses = append( - responses, - types.RPCInvalidRequestError(request.ID, fmt.Errorf("path %s is invalid", r.URL.Path)), - ) - continue - } + //TODO + //if len(r.URL.Path) > 1 { + // responses = append( + // responses, + // types.RPCInvalidRequestError(request.ID, fmt.Errorf("path %s is invalid", r.URL.Path)), + // ) + // continue + //} rpcFunc, ok := funcMap[request.Method] if !ok || rpcFunc.ws { responses = append(responses, types.RPCMethodNotFoundError(request.ID)) diff --git a/rpc/jsonrpc/server/rpc_func.go b/rpc/jsonrpc/server/rpc_func.go index 9f39c3664..03925c0c4 100644 --- a/rpc/jsonrpc/server/rpc_func.go +++ b/rpc/jsonrpc/server/rpc_func.go @@ -20,7 +20,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo } // JSONRPC endpoints - mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, logger))) + mux.HandleFunc("/", handleInvalidJSONRPCPaths(MakeJSONRPCHandler(funcMap, logger))) } // Function introspection diff --git a/testFile.txt b/testFile.txt new file mode 100644 index 000000000..519c053d6 --- /dev/null +++ b/testFile.txt @@ -0,0 +1 @@ +start backend grpc: Time: 2023-06-15 10:09:31.532974728 +0200 CEST m=+0.900550567 \ No newline at end of file diff --git a/vm/service.go b/vm/service.go index 7caa1bcbd..7a30c45ea 100644 --- a/vm/service.go +++ b/vm/service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" "net/http" "sort" "time" @@ -14,7 +15,6 @@ import ( tmquery "github.com/consideritdone/landslidecore/libs/pubsub/query" mempl "github.com/consideritdone/landslidecore/mempool" "github.com/consideritdone/landslidecore/p2p" - "github.com/consideritdone/landslidecore/proxy" "github.com/consideritdone/landslidecore/rpc/core" ctypes "github.com/consideritdone/landslidecore/rpc/core/types" "github.com/consideritdone/landslidecore/types" @@ -56,7 +56,8 @@ type ( ABCIService interface { // Reading from abci app - ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error + ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) + //ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error ABCIQuery(_ *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error ABCIQueryWithOptions(_ *http.Request, args *ABCIQueryWithOptionsArgs, reply *ctypes.ResultABCIQuery) error @@ -169,13 +170,18 @@ func NewService(vm *VM) Service { return &LocalService{vm} } -func (s *LocalService) ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error { - resInfo, err := s.vm.proxyApp.Query().InfoSync(proxy.RequestInfo) - if err != nil { - return err - } - reply.Response = *resInfo - return nil +//func (s *LocalService) ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error { +// resInfo, err := s.vm.proxyApp.Query().InfoSync(proxy.RequestInfo) +// if err != nil { +// return err +// } +// reply.Response = *resInfo +// return nil +//} + +func (s *LocalService) ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { + + return &ctypes.ResultABCIInfo{Response: abci.ResponseInfo{}}, nil } func (s *LocalService) ABCIQuery(req *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error { diff --git a/vm/vm.go b/vm/vm.go index 05d61d4ac..916f83604 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "net/http" - "net/rpc" "time" "github.com/ava-labs/avalanchego/api/metrics" @@ -619,45 +618,48 @@ func (vm *VM) CreateStaticHandlers(ctx context.Context) (map[string]*common.HTTP } // Routes is a map of available routes. -var Routes = map[string]*rpcserver.RPCFunc{ - //// subscribe/unsubscribe are reserved for websocket events. - //"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), - //"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), - //"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), - // - //// info API - //"health": rpc.NewRPCFunc(Health, ""), - //"status": rpc.NewRPCFunc(Status, ""), - //"net_info": rpc.NewRPCFunc(NetInfo, ""), - //"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"), - //"genesis": rpc.NewRPCFunc(Genesis, ""), - //"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk"), - //"block": rpc.NewRPCFunc(Block, "height"), - //"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash"), - //"block_results": rpc.NewRPCFunc(BlockResults, "height"), - //"commit": rpc.NewRPCFunc(Commit, "height"), - //"check_tx": rpc.NewRPCFunc(CheckTx, "tx"), - //"tx": rpc.NewRPCFunc(Tx, "hash,prove"), - //"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), - //"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), - //"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), - //"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), - //"consensus_state": rpc.NewRPCFunc(ConsensusState, ""), - //"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"), - //"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), - //"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), - // - //// tx broadcast API - //"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), - //"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), - //"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), - // - //// abci API - //"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"), - "abci_info": rpcserver.NewRPCFunc(ABCIInfo, ""), - // - //// evidence API - //"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), +func (vm *VM) RPCRoutes() map[string]*rpcserver.RPCFunc { + vmTMService := NewService(vm) + return map[string]*rpcserver.RPCFunc{ + //// subscribe/unsubscribe are reserved for websocket events. + //"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), + //"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), + //"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), + // + //// info API + //"health": rpc.NewRPCFunc(Health, ""), + //"status": rpc.NewRPCFunc(Status, ""), + //"net_info": rpc.NewRPCFunc(NetInfo, ""), + //"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"), + //"genesis": rpc.NewRPCFunc(Genesis, ""), + //"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk"), + //"block": rpc.NewRPCFunc(Block, "height"), + //"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash"), + //"block_results": rpc.NewRPCFunc(BlockResults, "height"), + //"commit": rpc.NewRPCFunc(Commit, "height"), + //"check_tx": rpc.NewRPCFunc(CheckTx, "tx"), + //"tx": rpc.NewRPCFunc(Tx, "hash,prove"), + //"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), + //"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), + //"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), + //"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), + //"consensus_state": rpc.NewRPCFunc(ConsensusState, ""), + //"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"), + //"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), + //"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), + // + //// tx broadcast API + //"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), + //"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), + //"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), + // + //// abci API + //"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"), + "abci_info": rpcserver.NewRPCFunc(vmTMService.ABCIInfo, ""), + // + //// evidence API + //"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), + } } //func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { @@ -672,7 +674,7 @@ var Routes = map[string]*rpcserver.RPCFunc{ // return map[string]*common.HTTPHandler{ // "/rpc": { // LockOptions: common.WriteLock, -// Handler: func(w http.ResponseWriter, r *http.Request) { +// Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // if r.Method != "POST" { // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) // return @@ -701,82 +703,78 @@ var Routes = map[string]*rpcserver.RPCFunc{ // http.Error(w, "Method not found", http.StatusNotFound) // return // } -// }, +// }), // }, // }, nil //} func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { - //server := rpc.NewServer() - //tmService := NewService(vm) - //err := rpc.Register(tmService) - //if err != nil { - // return nil, fmt.Errorf("Failed to create vm handlers: failed to register tendermint service: %w ", err) - //} - //rpc.HandleHTTP() + mux := http.NewServeMux() - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) + // 1) Register regular routes. + routes := vm.RPCRoutes() + mux.HandleFunc("/", rpcserver.MakeJSONRPCHandler(routes, vm.tmLogger)) return map[string]*common.HTTPHandler{ "/rpc": { LockOptions: common.WriteLock, - Handler: rpcserver. + Handler: mux, }, }, nil } -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) { - // Always return -1 as there's no ID here. - dummyID := types.JSONRPCIntID(-1) // URIClientRequestID - - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - res := types.RPCMethodNotFoundError(dummyID) - if wErr := WriteRPCResponseHTTPError(w, http.StatusNotFound, res); wErr != nil { - logger.Error("failed to write response", "res", res, "err", wErr) - } - } - } - - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - logger.Debug("HTTP HANDLER", "req", r) - - ctx := &types.Context{HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - - fnArgs, err := httpParamsToArgs(rpcFunc, r) - if err != nil { - res := types.RPCInvalidParamsError(dummyID, - fmt.Errorf("error converting http params to arguments: %w", err), - ) - if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil { - logger.Error("failed to write response", "res", res, "err", wErr) - } - return - } - args = append(args, fnArgs...) - - returns := rpcFunc.f.Call(args) - - logger.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - if err := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, - types.RPCInternalError(dummyID, err)); err != nil { - logger.Error("failed to write response", "res", result, "err", err) - return - } - return - } - if err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result)); err != nil { - logger.Error("failed to write response", "res", result, "err", err) - return - } - } -} +//// convert from a function name to the http handler +//func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) { +// // Always return -1 as there's no ID here. +// dummyID := types.JSONRPCIntID(-1) // URIClientRequestID +// +// // Exception for websocket endpoints +// if rpcFunc.ws { +// return func(w http.ResponseWriter, r *http.Request) { +// res := types.RPCMethodNotFoundError(dummyID) +// if wErr := WriteRPCResponseHTTPError(w, http.StatusNotFound, res); wErr != nil { +// logger.Error("failed to write response", "res", res, "err", wErr) +// } +// } +// } +// +// // All other endpoints +// return func(w http.ResponseWriter, r *http.Request) { +// logger.Debug("HTTP HANDLER", "req", r) +// +// ctx := &types.Context{HTTPReq: r} +// args := []reflect.Value{reflect.ValueOf(ctx)} +// +// fnArgs, err := httpParamsToArgs(rpcFunc, r) +// if err != nil { +// res := types.RPCInvalidParamsError(dummyID, +// fmt.Errorf("error converting http params to arguments: %w", err), +// ) +// if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil { +// logger.Error("failed to write response", "res", res, "err", wErr) +// } +// return +// } +// args = append(args, fnArgs...) +// +// returns := rpcFunc.f.Call(args) +// +// logger.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) +// result, err := unreflectResult(returns) +// if err != nil { +// if err := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, +// types.RPCInternalError(dummyID, err)); err != nil { +// logger.Error("failed to write response", "res", result, "err", err) +// return +// } +// return +// } +// if err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result)); err != nil { +// logger.Error("failed to write response", "res", result, "err", err) +// return +// } +// } +//} func (vm *VM) ProxyApp() proxy.AppConns { return vm.proxyApp From fdfaf55a6a36918f9a14573ae6b7119dd9a9628d Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Thu, 15 Jun 2023 10:38:51 +0200 Subject: [PATCH 03/10] setup RPC server properly --- testFile.txt | 1 - 1 file changed, 1 deletion(-) delete mode 100644 testFile.txt diff --git a/testFile.txt b/testFile.txt deleted file mode 100644 index 519c053d6..000000000 --- a/testFile.txt +++ /dev/null @@ -1 +0,0 @@ -start backend grpc: Time: 2023-06-15 10:09:31.532974728 +0200 CEST m=+0.900550567 \ No newline at end of file From 45b31e0c9292b6c6dfeef29c2c500ac5e57265a2 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Sat, 17 Jun 2023 21:48:53 +0200 Subject: [PATCH 04/10] implement preliminary version of RPC server --- vm/service.go | 720 +++++++++++++++++++++++--------------------- vm/service_utils.go | 6 +- vm/vm.go | 181 +++-------- 3 files changed, 419 insertions(+), 488 deletions(-) diff --git a/vm/service.go b/vm/service.go index 7a30c45ea..beb66e6dc 100644 --- a/vm/service.go +++ b/vm/service.go @@ -4,12 +4,16 @@ import ( "context" "errors" "fmt" + tmpubsub "github.com/consideritdone/landslidecore/libs/pubsub" + "github.com/consideritdone/landslidecore/proxy" rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" - "net/http" + blockidxnull "github.com/consideritdone/landslidecore/state/indexer/block/null" + "github.com/consideritdone/landslidecore/state/txindex/null" "sort" "time" abci "github.com/consideritdone/landslidecore/abci/types" + "github.com/consideritdone/landslidecore/libs/bytes" tmbytes "github.com/consideritdone/landslidecore/libs/bytes" tmmath "github.com/consideritdone/landslidecore/libs/math" tmquery "github.com/consideritdone/landslidecore/libs/pubsub/query" @@ -20,6 +24,8 @@ import ( "github.com/consideritdone/landslidecore/types" ) +const SubscribeTimeout = 5 * time.Second + type ( LocalService struct { vm *VM @@ -27,202 +33,218 @@ type ( Service interface { ABCIService - HistoryService - NetworkService - SignService - StatusService - MempoolService - } - - ABCIQueryArgs struct { - Path string `json:"path"` - Data tmbytes.HexBytes `json:"data"` - } - - ABCIQueryOptions struct { - Height int64 `json:"height"` - Prove bool `json:"prove"` - } - - ABCIQueryWithOptionsArgs struct { - Path string `json:"path"` - Data tmbytes.HexBytes `json:"data"` - Opts ABCIQueryOptions `json:"opts"` - } - - BroadcastTxArgs struct { - Tx types.Tx `json:"tx"` + EventsService + HistoryClient + NetworkClient + SignClient + StatusClient + MempoolClient } ABCIService interface { // Reading from abci app ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) - //ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error - ABCIQuery(_ *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error - ABCIQueryWithOptions(_ *http.Request, args *ABCIQueryWithOptionsArgs, reply *ctypes.ResultABCIQuery) error + ABCIQuery(ctx *rpctypes.Context, path string, data bytes.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) // Writing to abci app - BroadcastTxCommit(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTxCommit) error - BroadcastTxAsync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error - BroadcastTxSync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error + BroadcastTxCommit(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTxCommit, error) + BroadcastTxAsync(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTx, error) + BroadcastTxSync(*rpctypes.Context, types.Tx) (*ctypes.ResultBroadcastTx, error) } - BlockHeightArgs struct { - Height *int64 `json:"height"` + EventsService interface { + Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) + Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) + UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) } - BlockHashArgs struct { - Hash []byte `json:"hash"` + HistoryClient interface { + Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) + GenesisChunked(*rpctypes.Context, uint) (*ctypes.ResultGenesisChunk, error) + BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) } - CommitArgs struct { - Height *int64 `json:"height"` + MempoolClient interface { + UnconfirmedTxs(ctx *rpctypes.Context, limit *int) (*ctypes.ResultUnconfirmedTxs, error) + NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) + CheckTx(*rpctypes.Context, types.Tx) (*ctypes.ResultCheckTx, error) } - ValidatorsArgs struct { - Height *int64 `json:"height"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` + NetworkClient interface { + NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) + DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) + ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) + ConsensusParams(ctx *rpctypes.Context, height *int64) (*ctypes.ResultConsensusParams, error) + Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) } - TxArgs struct { - Hash []byte `json:"hash"` - Prove bool `json:"prove"` - } + SignClient interface { + Block(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlock, error) + BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) + BlockResults(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlockResults, error) + Commit(ctx *rpctypes.Context, height *int64) (*ctypes.ResultCommit, error) + Validators(ctx *rpctypes.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) + Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) - TxSearchArgs struct { - Query string `json:"query"` - Prove bool `json:"prove"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` - OrderBy string `json:"orderBy"` - } + TxSearch(ctx *rpctypes.Context, query string, prove bool, + page, perPage *int, orderBy string) (*ctypes.ResultTxSearch, error) - BlockSearchArgs struct { - Query string `json:"query"` - Page *int `json:"page"` - PerPage *int `json:"perPage"` - OrderBy string `json:"orderBy"` + BlockSearch(ctx *rpctypes.Context, query string, + page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) } - SignService interface { - Block(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlock) error - BlockByHash(_ *http.Request, args *BlockHashArgs, reply *ctypes.ResultBlock) error - BlockResults(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlockResults) error - Commit(_ *http.Request, args *CommitArgs, reply *ctypes.ResultCommit) error - Validators(_ *http.Request, args *ValidatorsArgs, reply *ctypes.ResultValidators) error - Tx(_ *http.Request, args *TxArgs, reply *ctypes.ResultTx) error - TxSearch(_ *http.Request, args *TxSearchArgs, reply *ctypes.ResultTxSearch) error - BlockSearch(_ *http.Request, args *BlockSearchArgs, reply *ctypes.ResultBlockSearch) error + StatusClient interface { + Status(*rpctypes.Context) (*ctypes.ResultStatus, error) } +) - BlockchainInfoArgs struct { - MinHeight int64 `json:"minHeight"` - MaxHeight int64 `json:"maxHeight"` - } +func (s *LocalService) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + addr := ctx.RemoteAddr() - GenesisChunkedArgs struct { - Chunk uint `json:"chunk"` + if s.vm.eventBus.NumClients() >= s.vm.rpcConfig.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", s.vm.rpcConfig.MaxSubscriptionClients) + } else if s.vm.eventBus.NumClientSubscriptions(addr) >= s.vm.rpcConfig.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", s.vm.rpcConfig.MaxSubscriptionsPerClient) } - HistoryService interface { - BlockchainInfo(_ *http.Request, args *BlockchainInfoArgs, reply *ctypes.ResultBlockchainInfo) error - Genesis(_ *http.Request, _ *struct{}, reply *ctypes.ResultGenesis) error - GenesisChunked(_ *http.Request, args *GenesisChunkedArgs, reply *ctypes.ResultGenesisChunk) error - } + s.vm.tmLogger.Info("Subscribe to query", "remote", addr, "query", query) - StatusService interface { - Status(_ *http.Request, _ *struct{}, reply *ctypes.ResultStatus) error + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) } - ConsensusParamsArgs struct { - Height *int64 `json:"height"` - } + subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) + defer cancel() - NetworkService interface { - NetInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultNetInfo) error - DumpConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultDumpConsensusState) error - ConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultConsensusState) error - ConsensusParams(_ *http.Request, args *ConsensusParamsArgs, reply *ctypes.ResultConsensusParams) error - Health(_ *http.Request, _ *struct{}, reply *ctypes.ResultHealth) error - } + sub, err := s.vm.eventBus.Subscribe(subCtx, addr, q, s.vm.rpcConfig.SubscriptionBufferSize) + if err != nil { + return nil, err + } + + closeIfSlow := s.vm.rpcConfig.CloseOnSlowClient + + // Capture the current ID, since it can change in the future. + subscriptionID := ctx.JSONReq.ID + go func() { + for { + select { + case msg := <-sub.Out(): + var ( + resultEvent = &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()} + resp = rpctypes.NewRPCSuccessResponse(subscriptionID, resultEvent) + ) + writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err = ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil { + s.vm.tmLogger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + + if closeIfSlow { + var ( + err = errors.New("subscription was cancelled (reason: slow client)") + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + s.vm.tmLogger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + return + } + } + case <-sub.Cancelled(): + if sub.Err() != tmpubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = sub.Err().Error() + } + resp := rpctypes.RPCServerError(subscriptionID, err) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + s.vm.tmLogger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", + fmt.Errorf("subscription was cancelled (reason: %s)", reason)) + } + } + return + } + } + }() - UnconfirmedTxsArgs struct { - Limit *int `json:"limit"` - } + return &ctypes.ResultSubscribe{}, nil +} - CheckTxArgs struct { - Tx []byte `json:"tx"` +func (s *LocalService) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + s.vm.tmLogger.Info("Unsubscribe from query", "remote", addr, "query", query) + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) } - - MempoolService interface { - UnconfirmedTxs(_ *http.Request, args *UnconfirmedTxsArgs, reply *ctypes.ResultUnconfirmedTxs) error - NumUnconfirmedTxs(_ *http.Request, _ *struct{}, reply *ctypes.ResultUnconfirmedTxs) error - CheckTx(_ *http.Request, args *CheckTxArgs, reply *ctypes.ResultCheckTx) error + err = s.vm.eventBus.Unsubscribe(context.Background(), addr, q) + if err != nil { + return nil, err } -) + return &ctypes.ResultUnsubscribe{}, nil +} -var ( - DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false} -) +func (s *LocalService) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + s.vm.tmLogger.Info("Unsubscribe from all", "remote", addr) + err := s.vm.eventBus.UnsubscribeAll(context.Background(), addr) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil +} func NewService(vm *VM) Service { return &LocalService{vm} } -//func (s *LocalService) ABCIInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultABCIInfo) error { -// resInfo, err := s.vm.proxyApp.Query().InfoSync(proxy.RequestInfo) -// if err != nil { -// return err -// } -// reply.Response = *resInfo -// return nil -//} - func (s *LocalService) ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { - - return &ctypes.ResultABCIInfo{Response: abci.ResponseInfo{}}, nil -} - -func (s *LocalService) ABCIQuery(req *http.Request, args *ABCIQueryArgs, reply *ctypes.ResultABCIQuery) error { - return s.ABCIQueryWithOptions(req, &ABCIQueryWithOptionsArgs{args.Path, args.Data, DefaultABCIQueryOptions}, reply) + resInfo, err := s.vm.proxyApp.Query().InfoSync(proxy.RequestInfo) + if err != nil || resInfo == nil { + return nil, err + } + return &ctypes.ResultABCIInfo{Response: *resInfo}, nil } -func (s *LocalService) ABCIQueryWithOptions( - _ *http.Request, - args *ABCIQueryWithOptionsArgs, - reply *ctypes.ResultABCIQuery, -) error { +// TODO: attention! Different signatures in RPC interfaces +func (s *LocalService) ABCIQuery( + ctx *rpctypes.Context, + path string, + data bytes.HexBytes, + height int64, + prove bool, +) (*ctypes.ResultABCIQuery, error) { resQuery, err := s.vm.proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: args.Path, - Data: args.Data, - Height: args.Opts.Height, - Prove: args.Opts.Prove, + Path: path, + Data: data, + Height: height, + Prove: prove, }) - if err != nil { - return err + if err != nil || resQuery == nil { + return nil, err } - reply.Response = *resQuery - return nil + + return &ctypes.ResultABCIQuery{Response: *resQuery}, nil } -func (s *LocalService) BroadcastTxCommit( - _ *http.Request, - args *BroadcastTxArgs, - reply *ctypes.ResultBroadcastTxCommit, -) error { +func (s *LocalService) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := "" // Subscribe to tx being committed in block. - subCtx, cancel := context.WithTimeout(context.Background(), core.SubscribeTimeout) + subCtx, cancel := context.WithTimeout(ctx.Context(), core.SubscribeTimeout) defer cancel() - q := types.EventQueryTxFor(args.Tx) + q := types.EventQueryTxFor(tx) deliverTxSub, err := s.vm.eventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) s.vm.tmLogger.Error("Error on broadcast_tx_commit", "err", err) - return err + return nil, err } defer func() { @@ -233,35 +255,33 @@ func (s *LocalService) BroadcastTxCommit( // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) - err = s.vm.mempool.CheckTx(args.Tx, func(res *abci.Response) { + err = s.vm.mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res }, mempl.TxInfo{}) if err != nil { s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) - return fmt.Errorf("error on broadcastTxCommit: %v", err) + return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } checkTxResMsg := <-checkTxResCh checkTxRes := checkTxResMsg.GetCheckTx() if checkTxRes.Code != abci.CodeTypeOK { - *reply = ctypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, - Hash: args.Tx.Hash(), - } - return nil + Hash: tx.Hash(), + }, nil } // Wait for the tx to be included in a block or timeout. select { case msg := <-deliverTxSub.Out(): // The tx was included in a block. deliverTxRes := msg.Data().(types.EventDataTx) - *reply = ctypes.ResultBroadcastTxCommit{ + return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: deliverTxRes.Result, - Hash: args.Tx.Hash(), + Hash: tx.Hash(), Height: deliverTxRes.Height, - } - return nil + }, nil case <-deliverTxSub.Cancelled(): var reason string if deliverTxSub.Err() == nil { @@ -271,192 +291,193 @@ func (s *LocalService) BroadcastTxCommit( } err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) - return err + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err // TODO: use config for timeout case <-time.After(10 * time.Second): err = errors.New("timed out waiting for tx to be included in a block") s.vm.tmLogger.Error("Error on broadcastTxCommit", "err", err) - return err + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err } } -func (s *LocalService) BroadcastTxAsync( - _ *http.Request, - args *BroadcastTxArgs, - reply *ctypes.ResultBroadcastTx, -) error { - err := s.vm.mempool.CheckTx(args.Tx, nil, mempl.TxInfo{}) +func (s *LocalService) BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + err := s.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { - return err + return nil, err } - reply.Hash = args.Tx.Hash() - return nil + return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } -func (s *LocalService) BroadcastTxSync(_ *http.Request, args *BroadcastTxArgs, reply *ctypes.ResultBroadcastTx) error { +func (s *LocalService) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) - err := s.vm.mempool.CheckTx(args.Tx, func(res *abci.Response) { + err := s.vm.mempool.CheckTx(tx, func(res *abci.Response) { s.vm.tmLogger.With("module", "service").Debug("handled response from checkTx") resCh <- res }, mempl.TxInfo{}) if err != nil { - return err + return nil, err } res := <-resCh r := res.GetCheckTx() - - reply.Code = r.Code - reply.Data = r.Data - reply.Log = r.Log - reply.Codespace = r.Codespace - reply.Hash = args.Tx.Hash() - - return nil + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + Hash: tx.Hash(), + }, nil } -func (s *LocalService) Block(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlock) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } + block := s.vm.blockStore.LoadBlock(height) blockMeta := s.vm.blockStore.LoadBlockMeta(height) - - if blockMeta != nil { - reply.BlockID = blockMeta.BlockID + if blockMeta == nil { + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } - reply.Block = block - return nil + return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } -func (s *LocalService) BlockByHash(_ *http.Request, args *BlockHashArgs, reply *ctypes.ResultBlock) error { - block := s.vm.blockStore.LoadBlockByHash(args.Hash) +func (s *LocalService) BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { + block := s.vm.blockStore.LoadBlockByHash(hash) if block == nil { - reply.BlockID = types.BlockID{} - reply.Block = nil - return nil + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } + // If block is not nil, then blockMeta can't be nil. blockMeta := s.vm.blockStore.LoadBlockMeta(block.Height) - reply.BlockID = blockMeta.BlockID - reply.Block = block - return nil + return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } -func (s *LocalService) BlockResults(_ *http.Request, args *BlockHeightArgs, reply *ctypes.ResultBlockResults) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } results, err := s.vm.stateStore.LoadABCIResponses(height) if err != nil { - return err + return nil, err } - reply.Height = height - reply.TxsResults = results.DeliverTxs - reply.BeginBlockEvents = results.BeginBlock.Events - reply.EndBlockEvents = results.EndBlock.Events - reply.ValidatorUpdates = results.EndBlock.ValidatorUpdates - reply.ConsensusParamUpdates = results.EndBlock.ConsensusParamUpdates - return nil + return &ctypes.ResultBlockResults{ + Height: height, + TxsResults: results.DeliverTxs, + BeginBlockEvents: results.BeginBlock.Events, + EndBlockEvents: results.EndBlock.Events, + ValidatorUpdates: results.EndBlock.ValidatorUpdates, + ConsensusParamUpdates: results.EndBlock.ConsensusParamUpdates, + }, nil } -func (s *LocalService) Commit(_ *http.Request, args *CommitArgs, reply *ctypes.ResultCommit) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } blockMeta := s.vm.blockStore.LoadBlockMeta(height) if blockMeta == nil { - return nil + return nil, nil } - header := blockMeta.Header - commit := s.vm.blockStore.LoadBlockCommit(height) - res := ctypes.NewResultCommit(&header, commit, !(height == s.vm.blockStore.Height())) - reply.SignedHeader = res.SignedHeader - reply.CanonicalCommit = res.CanonicalCommit - return nil + // Return the canonical commit (comes from the block at height+1) + commit := s.vm.blockStore.LoadBlockCommit(height) + return ctypes.NewResultCommit(&header, commit, true), nil } -func (s *LocalService) Validators(_ *http.Request, args *ValidatorsArgs, reply *ctypes.ResultValidators) error { - height, err := getHeight(s.vm.blockStore, args.Height) +func (s *LocalService) Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { + height, err := getHeight(s.vm.blockStore, heightPtr) if err != nil { - return err + return nil, err } validators, err := s.vm.stateStore.LoadValidators(height) if err != nil { - return err + return nil, err } totalCount := len(validators.Validators) - perPage := validatePerPage(args.PerPage) - page, err := validatePage(args.Page, perPage, totalCount) + perPage := validatePerPage(perPagePtr) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) - reply.BlockHeight = height - reply.Validators = validators.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)] - reply.Count = len(reply.Validators) - reply.Total = totalCount - return nil + v := validators.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)] + + return &ctypes.ResultValidators{ + BlockHeight: height, + Validators: v, + Count: len(v), + Total: totalCount}, nil } -func (s *LocalService) Tx(_ *http.Request, args *TxArgs, reply *ctypes.ResultTx) error { - r, err := s.vm.txIndexer.Get(args.Hash) +func (s *LocalService) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + if _, ok := s.vm.txIndexer.(*null.TxIndex); ok { + return nil, fmt.Errorf("transaction indexing is disabled") + } + + r, err := s.vm.txIndexer.Get(hash) if err != nil { - return err + return nil, err } if r == nil { - return fmt.Errorf("tx (%X) not found", args.Hash) + return nil, fmt.Errorf("tx (%X) not found", hash) } height := r.Height index := r.Index var proof types.TxProof - if args.Prove { + if prove { block := s.vm.blockStore.LoadBlock(height) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } - reply.Hash = args.Hash - reply.Height = height - reply.Index = index - reply.TxResult = r.Result - reply.Tx = r.Tx - reply.Proof = proof - return nil + return &ctypes.ResultTx{ + Hash: hash, + Height: height, + Index: index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + }, nil } -func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ctypes.ResultTxSearch) error { - q, err := tmquery.New(args.Query) - if err != nil { - return err +func (s *LocalService) TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPagePtr *int, + orderBy string) (*ctypes.ResultTxSearch, error) { + // if index is disabled, return error + if _, ok := s.vm.txIndexer.(*null.TxIndex); ok { + return nil, errors.New("transaction indexing is disabled") } - - var ctx context.Context - if req != nil { - ctx = req.Context() - } else { - ctx = context.Background() + q, err := tmquery.New(query) + if err != nil { + return nil, err } - results, err := s.vm.txIndexer.Search(ctx, q) + results, err := s.vm.txIndexer.Search(ctx.Context(), q) if err != nil { - return err + return nil, err } // sort results (must be done before pagination) - switch args.OrderBy { + switch orderBy { case "desc": sort.Slice(results, func(i, j int) bool { if results[i].Height == results[j].Height { @@ -472,16 +493,16 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct return results[i].Height < results[j].Height }) default: - return errors.New("expected order_by to be either `asc` or `desc` or empty") + return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") } // paginate results totalCount := len(results) - perPage := validatePerPage(args.PerPage) + perPage := validatePerPage(perPagePtr) - page, err := validatePage(args.Page, perPage, totalCount) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) @@ -492,7 +513,7 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct r := results[i] var proof types.TxProof - if args.Prove { + if prove { block := s.vm.blockStore.LoadBlock(r.Height) proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines } @@ -507,31 +528,35 @@ func (s *LocalService) TxSearch(req *http.Request, args *TxSearchArgs, reply *ct }) } - reply.Txs = apiResults - reply.TotalCount = totalCount - return nil + return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil } -func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, reply *ctypes.ResultBlockSearch) error { - q, err := tmquery.New(args.Query) - if err != nil { - return err +// BlockSearch searches for a paginated set of blocks matching BeginBlock and +// EndBlock event search criteria. +func (s *LocalService) BlockSearch( + ctx *rpctypes.Context, + query string, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + + // skip if block indexing is disabled + if _, ok := s.vm.blockIndexer.(*blockidxnull.BlockerIndexer); ok { + return nil, errors.New("block indexing is disabled") } - var ctx context.Context - if req != nil { - ctx = req.Context() - } else { - ctx = context.Background() + q, err := tmquery.New(query) + if err != nil { + return nil, err } - results, err := s.vm.blockIndexer.Search(ctx, q) + results, err := s.vm.blockIndexer.Search(ctx.Context(), q) if err != nil { - return err + return nil, err } // sort results (must be done before pagination) - switch args.OrderBy { + switch orderBy { case "desc", "": sort.Slice(results, func(i, j int) bool { return results[i] > results[j] }) @@ -539,16 +564,16 @@ func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, rep sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) default: - return errors.New("expected order_by to be either `asc` or `desc` or empty") + return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") } // paginate results totalCount := len(results) - perPage := validatePerPage(args.PerPage) + perPage := validatePerPage(perPagePtr) - page, err := validatePage(args.Page, perPage, totalCount) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { - return err + return nil, err } skipCount := validateSkipCount(page, perPage) @@ -568,72 +593,66 @@ func (s *LocalService) BlockSearch(req *http.Request, args *BlockSearchArgs, rep } } - reply.Blocks = apiResults - reply.TotalCount = totalCount - return nil + return &ctypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil } -func (s *LocalService) BlockchainInfo( - _ *http.Request, - args *BlockchainInfoArgs, - reply *ctypes.ResultBlockchainInfo, -) error { +func (s *LocalService) BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { // maximum 20 block metas const limit int64 = 20 var err error - args.MinHeight, args.MaxHeight, err = filterMinMax( + minHeight, maxHeight, err = filterMinMax( s.vm.blockStore.Base(), s.vm.blockStore.Height(), - args.MinHeight, - args.MaxHeight, + minHeight, + maxHeight, limit) if err != nil { - return err + return nil, err } - s.vm.tmLogger.Debug("BlockchainInfoHandler", "maxHeight", args.MaxHeight, "minHeight", args.MinHeight) + s.vm.tmLogger.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) var blockMetas []*types.BlockMeta - for height := args.MaxHeight; height >= args.MinHeight; height-- { + for height := maxHeight; height >= minHeight; height-- { blockMeta := s.vm.blockStore.LoadBlockMeta(height) blockMetas = append(blockMetas, blockMeta) } - reply.LastHeight = s.vm.blockStore.Height() - reply.BlockMetas = blockMetas - return nil + return &ctypes.ResultBlockchainInfo{ + LastHeight: s.vm.blockStore.Height(), + BlockMetas: blockMetas}, nil } -func (s *LocalService) Genesis(_ *http.Request, _ *struct{}, reply *ctypes.ResultGenesis) error { +func (s *LocalService) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { if len(s.vm.genChunks) > 1 { - return errors.New("genesis response is large, please use the genesis_chunked API instead") + return nil, errors.New("genesis response is large, please use the genesis_chunked API instead") } - reply.Genesis = s.vm.genesis - return nil + return &ctypes.ResultGenesis{Genesis: s.vm.genesis}, nil } -func (s *LocalService) GenesisChunked(_ *http.Request, args *GenesisChunkedArgs, reply *ctypes.ResultGenesisChunk) error { +func (s *LocalService) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { if s.vm.genChunks == nil { - return fmt.Errorf("service configuration error, genesis chunks are not initialized") + return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized") } if len(s.vm.genChunks) == 0 { - return fmt.Errorf("service configuration error, there are no chunks") + return nil, fmt.Errorf("service configuration error, there are no chunks") } - id := int(args.Chunk) + id := int(chunk) if id > len(s.vm.genChunks)-1 { - return fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) + return nil, fmt.Errorf("there are %d chunks, %d is invalid", len(s.vm.genChunks)-1, id) } - reply.TotalChunks = len(s.vm.genChunks) - reply.ChunkNumber = id - reply.Data = s.vm.genChunks[id] - return nil + return &ctypes.ResultGenesisChunk{ + TotalChunks: len(s.vm.genChunks), + ChunkNumber: id, + Data: s.vm.genChunks[id], + }, nil } -func (s *LocalService) Status(_ *http.Request, _ *struct{}, reply *ctypes.ResultStatus) error { +func (s *LocalService) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { var ( earliestBlockHeight int64 earliestBlockHash tmbytes.HexBytes @@ -664,70 +683,79 @@ func (s *LocalService) Status(_ *http.Request, _ *struct{}, reply *ctypes.Result } } - reply.NodeInfo = p2p.DefaultNodeInfo{ - DefaultNodeID: p2p.ID(s.vm.ctx.NodeID.String()), - Network: fmt.Sprintf("%d", s.vm.ctx.NetworkID), - } - reply.SyncInfo = ctypes.SyncInfo{ - LatestBlockHash: latestBlockHash, - LatestAppHash: latestAppHash, - LatestBlockHeight: latestHeight, - LatestBlockTime: time.Unix(0, latestBlockTimeNano), - EarliestBlockHash: earliestBlockHash, - EarliestAppHash: earliestAppHash, - EarliestBlockHeight: earliestBlockHeight, - EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - } - return nil + result := &ctypes.ResultStatus{ + NodeInfo: p2p.DefaultNodeInfo{ + DefaultNodeID: p2p.ID(s.vm.ctx.NodeID.String()), + Network: fmt.Sprintf("%d", s.vm.ctx.NetworkID), + }, + SyncInfo: ctypes.SyncInfo{ + LatestBlockHash: latestBlockHash, + LatestAppHash: latestAppHash, + LatestBlockHeight: latestHeight, + LatestBlockTime: time.Unix(0, latestBlockTimeNano), + EarliestBlockHash: earliestBlockHash, + EarliestAppHash: earliestAppHash, + EarliestBlockHeight: earliestBlockHeight, + EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), + CatchingUp: false, + }, + ValidatorInfo: ctypes.ValidatorInfo{ + Address: proposerPubKey.Address(), + PubKey: proposerPubKey, + VotingPower: 0, + }, + } + + return result, nil } -// ToDo: no peers, because it's vm -func (s *LocalService) NetInfo(_ *http.Request, _ *struct{}, reply *ctypes.ResultNetInfo) error { - return nil +// ToDo: no peers, no network from tendermint side +func (s *LocalService) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { + return &ctypes.ResultNetInfo{}, nil } // ToDo: we doesn't have consensusState -func (s *LocalService) DumpConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultDumpConsensusState) error { - return nil +func (s *LocalService) DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { + return &ctypes.ResultDumpConsensusState{}, nil } // ToDo: we doesn't have consensusState -func (s *LocalService) ConsensusState(_ *http.Request, _ *struct{}, reply *ctypes.ResultConsensusState) error { - return nil +func (s *LocalService) ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { + return &ctypes.ResultConsensusState{}, nil } -func (s *LocalService) ConsensusParams(_ *http.Request, args *ConsensusParamsArgs, reply *ctypes.ResultConsensusParams) error { - reply.BlockHeight = s.vm.blockStore.Height() - reply.ConsensusParams = *s.vm.genesis.ConsensusParams - return nil +func (s *LocalService) ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { + return &ctypes.ResultConsensusParams{ + BlockHeight: s.vm.blockStore.Height(), + ConsensusParams: *s.vm.genesis.ConsensusParams, + }, nil } -func (s *LocalService) Health(_ *http.Request, _ *struct{}, reply *ctypes.ResultHealth) error { - *reply = ctypes.ResultHealth{} - return nil +func (s *LocalService) Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { + return &ctypes.ResultHealth{}, nil } -func (s *LocalService) UnconfirmedTxs(_ *http.Request, args *UnconfirmedTxsArgs, reply *ctypes.ResultUnconfirmedTxs) error { - limit := validatePerPage(args.Limit) +func (s *LocalService) UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { + limit := validatePerPage(limitPtr) txs := s.vm.mempool.ReapMaxTxs(limit) - reply.Count = len(txs) - reply.Total = s.vm.mempool.Size() - reply.Txs = txs - return nil + return &ctypes.ResultUnconfirmedTxs{ + Count: len(txs), + Total: s.vm.mempool.Size(), + Txs: txs, + }, nil } -func (s *LocalService) NumUnconfirmedTxs(_ *http.Request, _ *struct{}, reply *ctypes.ResultUnconfirmedTxs) error { - reply.Count = s.vm.mempool.Size() - reply.Total = s.vm.mempool.Size() - reply.TotalBytes = s.vm.mempool.TxsBytes() - return nil +func (s *LocalService) NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { + return &ctypes.ResultUnconfirmedTxs{ + Count: s.vm.mempool.Size(), + Total: s.vm.mempool.Size(), + TotalBytes: s.vm.mempool.TxsBytes()}, nil } -func (s *LocalService) CheckTx(_ *http.Request, args *CheckTxArgs, reply *ctypes.ResultCheckTx) error { - res, err := s.vm.proxyApp.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: args.Tx}) +func (s *LocalService) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + res, err := s.vm.proxyApp.Mempool().CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { - return err + return nil, err } - reply.ResponseCheckTx = *res - return nil + return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil } diff --git a/vm/service_utils.go b/vm/service_utils.go index 502c9e681..63caaa808 100644 --- a/vm/service_utils.go +++ b/vm/service_utils.go @@ -2,10 +2,10 @@ package vm import ( "fmt" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" tmmath "github.com/consideritdone/landslidecore/libs/math" "github.com/consideritdone/landslidecore/rpc/client" - coretypes "github.com/consideritdone/landslidecore/rpc/core/types" "github.com/consideritdone/landslidecore/store" ) @@ -117,8 +117,8 @@ func WaitForHeight(c Service, h int64, waiter client.Waiter) error { } delta := int64(1) for delta > 0 { - r := new(coretypes.ResultStatus) - if err := c.Status(nil, nil, r); err != nil { + r, err := c.Status(&rpctypes.Context{}) + if err != nil { return err } delta = h - r.SyncInfo.LatestBlockHeight diff --git a/vm/vm.go b/vm/vm.go index 916f83604..7c0971689 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "errors" "fmt" + "github.com/consideritdone/landslidecore/crypto/secp256k1" "net/http" "time" @@ -26,6 +27,7 @@ import ( abciTypes "github.com/consideritdone/landslidecore/abci/types" "github.com/consideritdone/landslidecore/config" + cfg "github.com/consideritdone/landslidecore/config" cs "github.com/consideritdone/landslidecore/consensus" tmjson "github.com/consideritdone/landslidecore/libs/json" "github.com/consideritdone/landslidecore/libs/log" @@ -75,6 +77,7 @@ var ( blockIndexerDBPrefix = []byte("block_events") proposerAddress = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + proposerPubKey = secp256k1.PubKey{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} ) var ( @@ -129,6 +132,8 @@ type VM struct { blockIndexerDB dbm.DB indexerService *txindex.IndexerService + rpcConfig cfg.RPCConfig + clock mockable.Clock } @@ -621,92 +626,43 @@ func (vm *VM) CreateStaticHandlers(ctx context.Context) (map[string]*common.HTTP func (vm *VM) RPCRoutes() map[string]*rpcserver.RPCFunc { vmTMService := NewService(vm) return map[string]*rpcserver.RPCFunc{ - //// subscribe/unsubscribe are reserved for websocket events. - //"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), - //"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), - //"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), - // - //// info API - //"health": rpc.NewRPCFunc(Health, ""), - //"status": rpc.NewRPCFunc(Status, ""), - //"net_info": rpc.NewRPCFunc(NetInfo, ""), - //"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"), - //"genesis": rpc.NewRPCFunc(Genesis, ""), - //"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk"), - //"block": rpc.NewRPCFunc(Block, "height"), - //"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash"), - //"block_results": rpc.NewRPCFunc(BlockResults, "height"), - //"commit": rpc.NewRPCFunc(Commit, "height"), - //"check_tx": rpc.NewRPCFunc(CheckTx, "tx"), - //"tx": rpc.NewRPCFunc(Tx, "hash,prove"), - //"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), - //"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), - //"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), - //"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), - //"consensus_state": rpc.NewRPCFunc(ConsensusState, ""), - //"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"), - //"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), - //"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), - // - //// tx broadcast API - //"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), - //"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), - //"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), - // - //// abci API - //"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"), - "abci_info": rpcserver.NewRPCFunc(vmTMService.ABCIInfo, ""), - // - //// evidence API - //"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), - } -} - -//func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { -// server := rpc.NewServer() -// tmService := NewService(vm) -// err := rpc.Register(tmService) -// if err != nil { -// return nil, fmt.Errorf("Failed to create vm handlers: failed to register tendermint service: %w ", err) -// } -// rpc.HandleHTTP() -// -// return map[string]*common.HTTPHandler{ -// "/rpc": { -// LockOptions: common.WriteLock, -// Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -// if r.Method != "POST" { -// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) -// return -// } -// -// err := r.ParseForm() -// if err != nil { -// http.Error(w, "Bad request", http.StatusBadRequest) -// return -// } -// -// method := r.FormValue("method") -// if method == "" { -// http.Error(w, "Method not specified", http.StatusBadRequest) -// return -// } -// -// switch method { -// case "math_add": -// //Math add handler -// -// case "math_subtract": -// //Math subc=stract handler -// -// default: -// http.Error(w, "Method not found", http.StatusNotFound) -// return -// } -// }), -// }, -// }, nil -//} + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": rpcserver.NewWSRPCFunc(vmTMService.Subscribe, "query"), + "unsubscribe": rpcserver.NewWSRPCFunc(vmTMService.Unsubscribe, "query"), + "unsubscribe_all": rpcserver.NewWSRPCFunc(vmTMService.UnsubscribeAll, ""), + + // info API + "health": rpcserver.NewRPCFunc(vmTMService.Health, ""), + "status": rpcserver.NewRPCFunc(vmTMService.Status, ""), + "net_info": rpcserver.NewRPCFunc(vmTMService.NetInfo, ""), + "blockchain": rpcserver.NewRPCFunc(vmTMService.BlockchainInfo, "minHeight,maxHeight"), + "genesis": rpcserver.NewRPCFunc(vmTMService.Genesis, ""), + "genesis_chunked": rpcserver.NewRPCFunc(vmTMService.GenesisChunked, "chunk"), + "block": rpcserver.NewRPCFunc(vmTMService.Block, "height"), + "block_by_hash": rpcserver.NewRPCFunc(vmTMService.BlockByHash, "hash"), + "block_results": rpcserver.NewRPCFunc(vmTMService.BlockResults, "height"), + "commit": rpcserver.NewRPCFunc(vmTMService.Commit, "height"), + "check_tx": rpcserver.NewRPCFunc(vmTMService.CheckTx, "tx"), + "tx": rpcserver.NewRPCFunc(vmTMService.Tx, "hash,prove"), + "tx_search": rpcserver.NewRPCFunc(vmTMService.TxSearch, "query,prove,page,per_page,order_by"), + "block_search": rpcserver.NewRPCFunc(vmTMService.BlockSearch, "query,page,per_page,order_by"), + "validators": rpcserver.NewRPCFunc(vmTMService.Validators, "height,page,per_page"), + "dump_consensus_state": rpcserver.NewRPCFunc(vmTMService.DumpConsensusState, ""), + "consensus_state": rpcserver.NewRPCFunc(vmTMService.ConsensusState, ""), + "consensus_params": rpcserver.NewRPCFunc(vmTMService.ConsensusParams, "height"), + "unconfirmed_txs": rpcserver.NewRPCFunc(vmTMService.UnconfirmedTxs, "limit"), + "num_unconfirmed_txs": rpcserver.NewRPCFunc(vmTMService.NumUnconfirmedTxs, ""), + + // tx broadcast API + "broadcast_tx_commit": rpcserver.NewRPCFunc(vmTMService.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpcserver.NewRPCFunc(vmTMService.BroadcastTxSync, "tx"), + "broadcast_tx_async": rpcserver.NewRPCFunc(vmTMService.BroadcastTxAsync, "tx"), + + // abci API + "abci_query": rpcserver.NewRPCFunc(vmTMService.ABCIQuery, "path,data,height,prove"), + "abci_info": rpcserver.NewRPCFunc(vmTMService.ABCIInfo, ""), + } +} func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, error) { mux := http.NewServeMux() @@ -723,59 +679,6 @@ func (vm *VM) CreateHandlers(_ context.Context) (map[string]*common.HTTPHandler, }, nil } -//// convert from a function name to the http handler -//func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) { -// // Always return -1 as there's no ID here. -// dummyID := types.JSONRPCIntID(-1) // URIClientRequestID -// -// // Exception for websocket endpoints -// if rpcFunc.ws { -// return func(w http.ResponseWriter, r *http.Request) { -// res := types.RPCMethodNotFoundError(dummyID) -// if wErr := WriteRPCResponseHTTPError(w, http.StatusNotFound, res); wErr != nil { -// logger.Error("failed to write response", "res", res, "err", wErr) -// } -// } -// } -// -// // All other endpoints -// return func(w http.ResponseWriter, r *http.Request) { -// logger.Debug("HTTP HANDLER", "req", r) -// -// ctx := &types.Context{HTTPReq: r} -// args := []reflect.Value{reflect.ValueOf(ctx)} -// -// fnArgs, err := httpParamsToArgs(rpcFunc, r) -// if err != nil { -// res := types.RPCInvalidParamsError(dummyID, -// fmt.Errorf("error converting http params to arguments: %w", err), -// ) -// if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil { -// logger.Error("failed to write response", "res", res, "err", wErr) -// } -// return -// } -// args = append(args, fnArgs...) -// -// returns := rpcFunc.f.Call(args) -// -// logger.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) -// result, err := unreflectResult(returns) -// if err != nil { -// if err := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, -// types.RPCInternalError(dummyID, err)); err != nil { -// logger.Error("failed to write response", "res", result, "err", err) -// return -// } -// return -// } -// if err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result)); err != nil { -// logger.Error("failed to write response", "res", result, "err", err) -// return -// } -// } -//} - func (vm *VM) ProxyApp() proxy.AppConns { return vm.proxyApp } From 6eb97363d2b8c6584e306dd99524da5eedda5f77 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Fri, 23 Jun 2023 10:34:47 +0200 Subject: [PATCH 05/10] fix func signatures and all according infrastructure --- vm/service_test.go | 144 +++++++++++++++++++++------------------------ vm/vm_test.go | 23 +++----- 2 files changed, 75 insertions(+), 92 deletions(-) diff --git a/vm/service_test.go b/vm/service_test.go index 1eb747ef4..5795a3945 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -2,11 +2,11 @@ package vm import ( "context" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" "testing" "time" atypes "github.com/consideritdone/landslidecore/abci/types" - ctypes "github.com/consideritdone/landslidecore/rpc/core/types" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,8 +16,8 @@ func TestABCIService(t *testing.T) { vm, service, _ := mustNewKVTestVm(t) t.Run("ABCIInfo", func(t *testing.T) { - reply := new(ctypes.ResultABCIInfo) - assert.NoError(t, service.ABCIInfo(nil, nil, reply)) + reply, err := service.ABCIInfo(&rpctypes.Context{}) + require.NoError(t, err) assert.Equal(t, uint64(1), reply.Response.AppVersion) assert.Equal(t, int64(0), reply.Response.LastBlockHeight) assert.Equal(t, []uint8([]byte(nil)), reply.Response.LastBlockAppHash) @@ -27,8 +27,8 @@ func TestABCIService(t *testing.T) { t.Run("ABCIQuery", func(t *testing.T) { k, v, tx := MakeTxKV() - replyBroadcast := new(ctypes.ResultBroadcastTx) - require.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{tx}, replyBroadcast)) + _, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) + require.NoError(t, err) blk, err := vm.BuildBlock(context.Background()) require.NoError(t, err) @@ -37,10 +37,9 @@ func TestABCIService(t *testing.T) { err = blk.Accept(context.Background()) require.NoError(t, err) - res := new(ctypes.ResultABCIQuery) - err = service.ABCIQuery(nil, &ABCIQueryArgs{Path: "/key", Data: k}, res) - if assert.Nil(t, err) && assert.True(t, res.Response.IsOK()) { - assert.EqualValues(t, v, res.Response.Value) + reply, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, false) + if assert.Nil(t, err) && assert.True(t, reply.Response.IsOK()) { + assert.EqualValues(t, v, reply.Response.Value) } spew.Dump(vm.mempool.Size()) }) @@ -68,8 +67,8 @@ func TestABCIService(t *testing.T) { }(ctx) _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTxCommit) - assert.NoError(t, service.BroadcastTxCommit(nil, &BroadcastTxArgs{tx}, reply)) + reply, err := service.BroadcastTxCommit(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.True(t, reply.CheckTx.IsOK()) assert.True(t, reply.DeliverTx.IsOK()) assert.Equal(t, 0, vm.mempool.Size()) @@ -81,8 +80,8 @@ func TestABCIService(t *testing.T) { initMempoolSize := vm.mempool.Size() _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxAsync(nil, &BroadcastTxArgs{tx}, reply)) + reply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.NotNil(t, reply.Hash) assert.Equal(t, initMempoolSize+1, vm.mempool.Size()) assert.EqualValues(t, tx, vm.mempool.ReapMaxTxs(-1)[0]) @@ -94,8 +93,8 @@ func TestABCIService(t *testing.T) { initMempoolSize := vm.mempool.Size() _, _, tx := MakeTxKV() - reply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: tx}, reply)) + reply, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) + assert.NoError(t, err) assert.Equal(t, reply.Code, atypes.CodeTypeOK) assert.Equal(t, initMempoolSize+1, vm.mempool.Size()) assert.EqualValues(t, tx, vm.mempool.ReapMaxTxs(-1)[0]) @@ -105,8 +104,8 @@ func TestABCIService(t *testing.T) { func TestHistoryService(t *testing.T) { vm, service, _ := mustNewCounterTestVm(t) - txReply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: []byte{0x00}}, txReply)) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x00}) + assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) blk, err := vm.BuildBlock(context.Background()) @@ -115,14 +114,14 @@ func TestHistoryService(t *testing.T) { assert.NoError(t, blk.Accept(context.Background())) t.Run("BlockchainInfo", func(t *testing.T) { - reply := new(ctypes.ResultBlockchainInfo) - assert.NoError(t, service.BlockchainInfo(nil, &BlockchainInfoArgs{1, 100}, reply)) + reply, err := service.BlockchainInfo(&rpctypes.Context{}, 1, 100) + assert.NoError(t, err) assert.Equal(t, int64(1), reply.LastHeight) }) t.Run("Genesis", func(t *testing.T) { - reply := new(ctypes.ResultGenesis) - assert.NoError(t, service.Genesis(nil, nil, reply)) + reply, err := service.Genesis(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, vm.genesis, reply.Genesis) }) } @@ -131,27 +130,27 @@ func TestNetworkService(t *testing.T) { vm, service, _ := mustNewCounterTestVm(t) t.Run("NetInfo", func(t *testing.T) { - reply := new(ctypes.ResultNetInfo) - assert.NoError(t, service.NetInfo(nil, nil, reply)) + _, err := service.NetInfo(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("DumpConsensusState", func(t *testing.T) { - reply := new(ctypes.ResultDumpConsensusState) - assert.NoError(t, service.DumpConsensusState(nil, nil, reply)) + _, err := service.DumpConsensusState(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("ConsensusState", func(t *testing.T) { - reply := new(ctypes.ResultConsensusState) - assert.NoError(t, service.ConsensusState(nil, nil, reply)) + _, err := service.ConsensusState(&rpctypes.Context{}) + assert.NoError(t, err) }) t.Run("ConsensusParams", func(t *testing.T) { - reply := new(ctypes.ResultConsensusParams) - assert.NoError(t, service.ConsensusParams(nil, nil, reply)) + reply, err := service.ConsensusParams(&rpctypes.Context{}, nil) + assert.NoError(t, err) assert.Equal(t, int64(0), reply.BlockHeight) - txReply := new(ctypes.ResultBroadcastTx) - assert.NoError(t, service.BroadcastTxSync(nil, &BroadcastTxArgs{Tx: []byte{0x00}}, txReply)) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x00}) + assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) blk, err := vm.BuildBlock(context.Background()) @@ -159,13 +158,14 @@ func TestNetworkService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - assert.NoError(t, service.ConsensusParams(nil, nil, reply)) - assert.Equal(t, int64(1), reply.BlockHeight) + reply2, err := service.ConsensusParams(&rpctypes.Context{}, nil) + assert.NoError(t, err) + assert.Equal(t, int64(1), reply2.BlockHeight) }) t.Run("Health", func(t *testing.T) { - reply := new(ctypes.ResultHealth) - assert.NoError(t, service.Health(nil, nil, reply)) + _, err := service.Health(&rpctypes.Context{}) + assert.NoError(t, err) }) } @@ -177,9 +177,7 @@ func TestSignService(t *testing.T) { assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{tx} - txReply := new(ctypes.ResultBroadcastTx) - err = service.BroadcastTxSync(nil, txArg, txReply) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) @@ -191,40 +189,39 @@ func TestSignService(t *testing.T) { height1 := int64(blk1.Height()) t.Run("Block", func(t *testing.T) { - replyWithoutHeight := new(ctypes.ResultBlock) - assert.NoError(t, service.Block(nil, &BlockHeightArgs{&height1}, replyWithoutHeight)) + replyWithoutHeight, err := service.Block(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, replyWithoutHeight.Block) { assert.EqualValues(t, height1, replyWithoutHeight.Block.Height) } - reply := new(ctypes.ResultBlock) - assert.NoError(t, service.Block(nil, &BlockHeightArgs{Height: &height1}, reply)) + reply, err := service.Block(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, reply.Block) { assert.EqualValues(t, height1, reply.Block.Height) } }) t.Run("BlockByHash", func(t *testing.T) { - replyWithoutHash := new(ctypes.ResultBlock) - assert.NoError(t, service.BlockByHash(nil, &BlockHashArgs{}, replyWithoutHash)) + replyWithoutHash, err := service.BlockByHash(&rpctypes.Context{}, []byte{}) + assert.NoError(t, err) assert.Nil(t, replyWithoutHash.Block) - reply := new(ctypes.ResultBlock) hash := blk1.ID() - - assert.NoError(t, service.BlockByHash(nil, &BlockHashArgs{Hash: hash[:]}, reply)) + reply, err := service.BlockByHash(&rpctypes.Context{}, hash[:]) + assert.NoError(t, err) if assert.NotNil(t, reply.Block) { assert.EqualValues(t, hash[:], reply.Block.Hash().Bytes()) } }) t.Run("BlockResults", func(t *testing.T) { - replyWithoutHeight := new(ctypes.ResultBlockResults) - assert.NoError(t, service.BlockResults(nil, &BlockHeightArgs{}, replyWithoutHeight)) + replyWithoutHeight, err := service.BlockResults(&rpctypes.Context{}, nil) + assert.NoError(t, err) assert.Equal(t, height1, replyWithoutHeight.Height) - reply := new(ctypes.ResultBlockResults) - assert.NoError(t, service.BlockResults(nil, &BlockHeightArgs{Height: &height1}, reply)) + reply, err := service.BlockResults(&rpctypes.Context{}, &height1) + assert.NoError(t, err) if assert.NotNil(t, reply.TxsResults) { assert.Equal(t, height1, reply.Height) } @@ -233,8 +230,8 @@ func TestSignService(t *testing.T) { t.Run("Tx", func(t *testing.T) { time.Sleep(2 * time.Second) - reply := new(ctypes.ResultTx) - assert.NoError(t, service.Tx(nil, &TxArgs{Hash: txReply.Hash.Bytes()}, reply)) + reply, err := service.Tx(&rpctypes.Context{}, txReply.Hash.Bytes(), false) + assert.NoError(t, err) assert.EqualValues(t, txReply.Hash, reply.Hash) assert.EqualValues(t, tx, reply.Tx) }) @@ -259,17 +256,13 @@ func TestStatusService(t *testing.T) { assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - txReply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, txArg, txReply) + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) t.Run("Status", func(t *testing.T) { - reply1 := new(ctypes.ResultStatus) - assert.NoError(t, service.Status(nil, nil, reply1)) + reply1, err := service.Status(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, int64(0), reply1.SyncInfo.LatestBlockHeight) blk, err := vm.BuildBlock(context.Background()) @@ -277,8 +270,8 @@ func TestStatusService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - reply2 := new(ctypes.ResultStatus) - assert.NoError(t, service.Status(nil, nil, reply2)) + reply2, err := service.Status(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, int64(1), reply2.SyncInfo.LatestBlockHeight) }) } @@ -290,32 +283,30 @@ func TestMempoolService(t *testing.T) { assert.ErrorIs(t, err, errNoPendingTxs, "expecting error no txs") assert.Nil(t, blk0) - txArg := &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - txReply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, txArg, txReply) + tx := []byte{0x01} + txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) t.Run("UnconfirmedTxs", func(t *testing.T) { limit := 100 - reply := new(ctypes.ResultUnconfirmedTxs) - assert.NoError(t, service.UnconfirmedTxs(nil, &UnconfirmedTxsArgs{Limit: &limit}, reply)) + reply, err := service.UnconfirmedTxs(&rpctypes.Context{}, &limit) + assert.NoError(t, err) assert.True(t, len(reply.Txs) == 1) - assert.Equal(t, reply.Txs[0], txArg.Tx) + assert.Equal(t, reply.Txs[0], tx) }) t.Run("NumUnconfirmedTxs", func(t *testing.T) { - reply := new(ctypes.ResultUnconfirmedTxs) - assert.NoError(t, service.NumUnconfirmedTxs(nil, nil, reply)) + reply, err := service.NumUnconfirmedTxs(&rpctypes.Context{}) + assert.NoError(t, err) assert.Equal(t, reply.Count, 1) assert.Equal(t, reply.Total, 1) }) t.Run("CheckTx", func(t *testing.T) { - reply1 := new(ctypes.ResultCheckTx) - assert.NoError(t, service.CheckTx(nil, &CheckTxArgs{Tx: txArg.Tx}, reply1)) + reply1, err := service.CheckTx(&rpctypes.Context{}, tx) + assert.NoError(t, err) + t.Logf("%v\n", reply1) // ToDo: check reply1 blk, err := vm.BuildBlock(context.Background()) @@ -323,8 +314,9 @@ func TestMempoolService(t *testing.T) { assert.NotNil(t, blk) assert.NoError(t, blk.Accept(context.Background())) - reply2 := new(ctypes.ResultCheckTx) - assert.NoError(t, service.CheckTx(nil, &CheckTxArgs{Tx: txArg.Tx}, reply2)) + reply2, err := service.CheckTx(&rpctypes.Context{}, tx) + assert.NoError(t, err) // ToDo: check reply2 + t.Logf("%v\n", reply2) }) } diff --git a/vm/vm_test.go b/vm/vm_test.go index 02801de19..128bfb9e6 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" "fmt" + rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" "os" "testing" @@ -22,7 +23,6 @@ import ( "github.com/consideritdone/landslidecore/abci/example/counter" atypes "github.com/consideritdone/landslidecore/abci/types" tmrand "github.com/consideritdone/landslidecore/libs/rand" - ctypes "github.com/consideritdone/landslidecore/rpc/core/types" ) var ( @@ -100,11 +100,8 @@ func TestInitVm(t *testing.T) { assert.Nil(t, blk0) // submit first tx (0x00) - args := &BroadcastTxArgs{ - Tx: []byte{0x00}, - } - reply := &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + tx := []byte{0x00} + reply, err := service.BroadcastTxSync(&rpctypes.Context{}, tx) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code) @@ -129,20 +126,14 @@ func TestInitVm(t *testing.T) { t.Logf("TM Block Tx count: %d", len(tmBlk1.Data.Txs)) // submit second tx (0x01) - args = &BroadcastTxArgs{ - Tx: []byte{0x01}, - } - reply = &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + tx = []byte{0x01} + reply, err = service.BroadcastTxSync(&rpctypes.Context{}, tx) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code) // submit 3rd tx (0x02) - args = &BroadcastTxArgs{ - Tx: []byte{0x02}, - } - reply = &ctypes.ResultBroadcastTx{} - err = service.BroadcastTxSync(nil, args, reply) + tx = []byte{0x02} + reply, err = service.BroadcastTxSync(&rpctypes.Context{}, tx) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, reply.Code) From 0b81e912b7683b6e2d1db97dfce95640835285e6 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Fri, 23 Jun 2023 11:06:46 +0200 Subject: [PATCH 06/10] add note from Ilnur about subscriptionID --- vm/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vm/service.go b/vm/service.go index beb66e6dc..5cac2102b 100644 --- a/vm/service.go +++ b/vm/service.go @@ -124,6 +124,8 @@ func (s *LocalService) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.R closeIfSlow := s.vm.rpcConfig.CloseOnSlowClient + // TODO: inspired by Ilnur: usage of ctx.JSONReq.ID may cause situation when user or server try to create multiple subscriptions with the same id. + // Solution: return error code with the error sescription when this situation happens // Capture the current ID, since it can change in the future. subscriptionID := ctx.JSONReq.ID go func() { From 979b4e6ad16f587b0dd005b3427d0bfb45ff3104 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Mon, 26 Jun 2023 11:08:19 +0200 Subject: [PATCH 07/10] implement preliminary version of rpc methods' tests --- vm/service_test.go | 241 ++++++++++++++++++++++++++++++++++++++++++--- vm/vm.go | 4 +- 2 files changed, 232 insertions(+), 13 deletions(-) diff --git a/vm/service_test.go b/vm/service_test.go index 5795a3945..88095af13 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -2,7 +2,11 @@ package vm import ( "context" + "encoding/base64" + tmjson "github.com/consideritdone/landslidecore/libs/json" rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" + "github.com/consideritdone/landslidecore/types" + "strings" "testing" "time" @@ -101,6 +105,67 @@ func TestABCIService(t *testing.T) { }) } +func TestEventService(t *testing.T) { + _, service, _ := mustNewCounterTestVm(t) + + // subscribe to new blocks and make sure height increments by 1 + t.Run("Subscribe", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + } + t.Cleanup(func() { + if _, err := service.UnsubscribeAll(&rpctypes.Context{}); err != nil { + t.Error(err) + } + }) + }) + + t.Run("Unsubscribe", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + _, err = service.Unsubscribe(&rpctypes.Context{}, event) + require.NoError(t, err) + } + //TODO: investigate the need to use Cleanup with UnsubscribeAll + //t.Cleanup(func() { + // if _, err := service.UnsubscribeAll(&rpctypes.Context{}); err != nil { + // t.Error(err) + // } + //}) + }) + + t.Run("UnsubscribeAll", func(t *testing.T) { + events := []string{ + types.QueryForEvent(types.EventNewBlock).String(), + types.QueryForEvent(types.EventNewBlockHeader).String(), + types.QueryForEvent(types.EventValidBlock).String(), + } + + for i, event := range events { + _, err := service.Subscribe(&rpctypes.Context{JSONReq: &rpctypes.RPCRequest{ID: rpctypes.JSONRPCIntID(i)}}, event) + require.NoError(t, err) + } + _, err := service.UnsubscribeAll(&rpctypes.Context{}) + if err != nil { + t.Error(err) + } + }) +} + func TestHistoryService(t *testing.T) { vm, service, _ := mustNewCounterTestVm(t) @@ -124,6 +189,27 @@ func TestHistoryService(t *testing.T) { assert.NoError(t, err) assert.Equal(t, vm.genesis, reply.Genesis) }) + + t.Run("GenesisChunked", func(t *testing.T) { + + first, err := service.GenesisChunked(&rpctypes.Context{}, 0) + require.NoError(t, err) + + decoded := make([]string, 0, first.TotalChunks) + for i := 0; i < first.TotalChunks; i++ { + chunk, err := service.GenesisChunked(&rpctypes.Context{}, uint(i)) + require.NoError(t, err) + data, err := base64.StdEncoding.DecodeString(chunk.Data) + require.NoError(t, err) + decoded = append(decoded, string(data)) + + } + doc := []byte(strings.Join(decoded, "")) + + var out types.GenesisDoc + require.NoError(t, tmjson.Unmarshal(doc, &out), + "first: %+v, doc: %s", first, string(doc)) + }) } func TestNetworkService(t *testing.T) { @@ -171,6 +257,8 @@ func TestNetworkService(t *testing.T) { func TestSignService(t *testing.T) { _, _, tx := MakeTxKV() + tx2 := []byte{0x02} + tx3 := []byte{0x03} vm, service, _ := mustNewKVTestVm(t) blk0, err := vm.BuildBlock(context.Background()) @@ -236,17 +324,145 @@ func TestSignService(t *testing.T) { assert.EqualValues(t, tx, reply.Tx) }) - //t.Run("TxSearch", func(t *testing.T) { - // reply := new(ctypes.ResultTxSearch) - // assert.NoError(t, service.TxSearch(nil, &TxSearchArgs{Query: "tx.height>0"}, reply)) - // assert.True(t, len(reply.Txs) > 0) - //}) - - //t.Run("BlockSearch", func(t *testing.T) { - // reply := new(ctypes.ResultBlockSearch) - // assert.NoError(t, service.BlockSearch(nil, &BlockSearchArgs{Query: "block.height>0"}, reply)) - // assert.True(t, len(reply.Blocks) > 0) - //}) + t.Run("TxSearch", func(t *testing.T) { + txReply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx2) + require.NoError(t, err) + assert.Equal(t, atypes.CodeTypeOK, txReply.Code) + //TODO: why it is not able to find tx? + reply, err := service.TxSearch(&rpctypes.Context{}, "tx.height>=0", false, nil, nil, "desc") + assert.NoError(t, err) + assert.True(t, len(reply.Txs) > 0) + }) + + //TODO: Check logic of test + t.Run("Commit", func(t *testing.T) { + txReply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx3) + require.NoError(t, err) + assert.Equal(t, atypes.CodeTypeOK, txReply.Code) + + assert, require := assert.New(t), require.New(t) + + // get an offset of height to avoid racing and guessing + s, err := service.Status(&rpctypes.Context{}) + require.NoError(err) + // sh is start height or status height + sh := s.SyncInfo.LatestBlockHeight + + // look for the future + h := sh + 20 + _, err = service.Block(&rpctypes.Context{}, &h) + require.Error(err) // no block yet + + // write something + k, v, tx := MakeTxKV() + bres, err := service.BroadcastTxCommit(&rpctypes.Context{}, tx) + require.NoError(err) + require.True(bres.DeliverTx.IsOK()) + txh := bres.Height + apph := txh + 1 // this is where the tx will be applied to the state + + // wait before querying + err = WaitForHeight(service, apph, nil) + require.NoError(err) + + qres, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, false) + require.NoError(err) + if assert.True(qres.Response.IsOK()) { + assert.Equal(k, qres.Response.Key) + assert.EqualValues(v, qres.Response.Value) + } + + // make sure we can lookup the tx with proof + ptx, err := service.Tx(&rpctypes.Context{}, bres.Hash, true) + require.NoError(err) + assert.EqualValues(txh, ptx.Height) + assert.EqualValues(tx, ptx.Tx) + + // and we can even check the block is added + block, err := service.Block(&rpctypes.Context{}, &apph) + require.NoError(err) + appHash := block.Block.Header.AppHash + assert.True(len(appHash) > 0) + assert.EqualValues(apph, block.Block.Header.Height) + + blockByHash, err := service.BlockByHash(&rpctypes.Context{}, block.BlockID.Hash) + require.NoError(err) + require.Equal(block, blockByHash) + + // now check the results + blockResults, err := service.BlockResults(&rpctypes.Context{}, &txh) + require.Nil(err, "%+v", err) + assert.Equal(txh, blockResults.Height) + if assert.Equal(1, len(blockResults.TxsResults)) { + // check success code + assert.EqualValues(0, blockResults.TxsResults[0].Code) + } + + // check blockchain info, now that we know there is info + info, err := service.BlockchainInfo(&rpctypes.Context{}, apph, apph) + require.NoError(err) + assert.True(info.LastHeight >= apph) + if assert.Equal(1, len(info.BlockMetas)) { + lastMeta := info.BlockMetas[0] + assert.EqualValues(apph, lastMeta.Header.Height) + blockData := block.Block + assert.Equal(blockData.Header.AppHash, lastMeta.Header.AppHash) + assert.Equal(block.BlockID, lastMeta.BlockID) + } + + // and get the corresponding commit with the same apphash + commit, err := service.Commit(&rpctypes.Context{}, &apph) + require.NoError(err) + cappHash := commit.Header.AppHash + assert.Equal(appHash, cappHash) + assert.NotNil(commit.Commit) + + // compare the commits (note Commit(2) has commit from Block(3)) + h = apph - 1 + commit2, err := service.Commit(&rpctypes.Context{}, &h) + require.NoError(err) + assert.Equal(block.Block.LastCommitHash, commit2.Commit.Hash()) + + // and we got a proof that works! + pres, err := service.ABCIQuery(&rpctypes.Context{}, "/key", k, 0, true) + require.NoError(err) + assert.True(pres.Response.IsOK()) + + // XXX Test proof + }) + + //TODO: COMMIT + //TODO: VALIDATORS + + t.Run("Validators", func(t *testing.T) { + + // make sure this is the right genesis file + gen, err := service.Genesis(&rpctypes.Context{}) + require.Nil(t, err, "%+v", err) + // get the genesis validator + require.Equal(t, 1, len(gen.Genesis.Validators)) + gval := gen.Genesis.Validators[0] + + // get the current validators + h := int64(1) + vals, err := service.Validators(&rpctypes.Context{}, &h, nil, nil) + require.Nil(t, err, "%d: %+v", err) + require.Equal(t, 1, len(vals.Validators)) + require.Equal(t, 1, vals.Count) + require.Equal(t, 1, vals.Total) + val := vals.Validators[0] + + // make sure the current set is also the genesis set + assert.Equal(t, gval.Power, val.VotingPower) + assert.Equal(t, gval.PubKey, val.PubKey) + }) + + t.Run("BlockSearch", func(t *testing.T) { + //TODO: CREATE BLOCK? + reply, err := service.BlockSearch(&rpctypes.Context{}, "block.height>0", nil, nil, "desc") + assert.NoError(t, err) + assert.True(t, len(reply.Blocks) > 0) + }) } func TestStatusService(t *testing.T) { @@ -284,6 +500,7 @@ func TestMempoolService(t *testing.T) { assert.Nil(t, blk0) tx := []byte{0x01} + expectedTx := types.Tx(tx) txReply, err := service.BroadcastTxSync(&rpctypes.Context{}, []byte{0x01}) assert.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) @@ -293,7 +510,7 @@ func TestMempoolService(t *testing.T) { reply, err := service.UnconfirmedTxs(&rpctypes.Context{}, &limit) assert.NoError(t, err) assert.True(t, len(reply.Txs) == 1) - assert.Equal(t, reply.Txs[0], tx) + assert.Equal(t, expectedTx, reply.Txs[0]) }) t.Run("NumUnconfirmedTxs", func(t *testing.T) { diff --git a/vm/vm.go b/vm/vm.go index 7c0971689..c7d4f226a 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -132,7 +132,7 @@ type VM struct { blockIndexerDB dbm.DB indexerService *txindex.IndexerService - rpcConfig cfg.RPCConfig + rpcConfig *cfg.RPCConfig clock mockable.Clock } @@ -203,6 +203,8 @@ func (vm *VM) Initialize( } vm.eventBus = eventBus + vm.rpcConfig = config.DefaultRPCConfig() + vm.txIndexerDB = Database{prefixdb.NewNested(txIndexerDBPrefix, baseDB)} vm.txIndexer = txidxkv.NewTxIndex(vm.txIndexerDB) vm.blockIndexerDB = Database{prefixdb.NewNested(blockIndexerDBPrefix, baseDB)} From b9fa9eb76bfd05450fa95f1720bf3ea332f225b3 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Mon, 10 Jul 2023 14:37:59 +0200 Subject: [PATCH 08/10] add db tests --- vm/db_test.go | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 vm/db_test.go diff --git a/vm/db_test.go b/vm/db_test.go new file mode 100644 index 000000000..9ab1e9cd8 --- /dev/null +++ b/vm/db_test.go @@ -0,0 +1,209 @@ +package vm + +import ( + "bytes" + "fmt" + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/prefixdb" + dbm "github.com/tendermint/tm-db" + "math/rand" + "sync" + "testing" +) + +var ( + testDBPrefix = []byte("test") +) + +func TestMemDB(t *testing.T) { + vm, _, _ := mustNewKVTestVm(t) + baseDB := vm.dbManager.Current().Database + db := Database{prefixdb.NewNested(testDBPrefix, baseDB)} + t.Run("PrefixDB", func(t *testing.T) { Run(t, db) }) + t.Run("BaseDB(MemDB)", func(t *testing.T) { RunAvaDatabase(t, baseDB) }) +} + +// Run generates concurrent reads and writes to db so the race detector can +// verify concurrent operations are properly synchronized. +// The contents of db are garbage after Run returns. +func Run(t *testing.T, db dbm.DB) { + t.Helper() + + const numWorkers = 10 + const numKeys = 64 + + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + + // Insert a bunch of keys with random data. + for k := 1; k <= numKeys; k++ { + key := taskKey(i, k) // say, "task--key-" + value := randomValue() + if err := db.Set(key, value); err != nil { + t.Errorf("Task %d: db.Set(%q=%q) failed: %v", + i, string(key), string(value), err) + } + } + + // Iterate over the database to make sure our keys are there. + it, err := db.Iterator(nil, nil) + if err != nil { + t.Errorf("Iterator[%d]: %v", i, err) + return + } + found := make(map[string][]byte) + mine := []byte(fmt.Sprintf("task-%d-", i)) + for { + if key := it.Key(); bytes.HasPrefix(key, mine) { + found[string(key)] = it.Value() + } + it.Next() + if !it.Valid() { + break + } + } + if err := it.Error(); err != nil { + t.Errorf("Iterator[%d] reported error: %v", i, err) + } + if err := it.Close(); err != nil { + t.Errorf("Close iterator[%d]: %v", i, err) + } + if len(found) != numKeys { + t.Errorf("Task %d: found %d keys, wanted %d", i, len(found), numKeys) + } + + for key, value := range mine { + fmt.Println("--") + fmt.Println(key) + fmt.Println(value) + fmt.Println("--") + } + + // Delete all the keys we inserted. + for k := 1; k <= numKeys; k++ { + key := taskKey(i, k) // say, "task--key-" + if err := db.Delete(key); err != nil { + t.Errorf("Delete %q: %v", key, err) + } + } + // Iterate over the database to make sure our keys are there. + it, err = db.Iterator(nil, nil) + if err != nil { + t.Errorf("Iterator[%d]: %v", i, err) + return + } + foundAfterRemoval := make(map[string][]byte) + for { + if key := it.Key(); bytes.HasPrefix(key, mine) { + foundAfterRemoval[string(key)] = it.Value() + } + it.Next() + if !it.Valid() { + break + } + } + if len(foundAfterRemoval) != 0 { + t.Errorf("Values left after deletion: %v", foundAfterRemoval) + return + } + }() + } + wg.Wait() +} + +// Run generates concurrent reads and writes to db so the race detector can +// verify concurrent operations are properly synchronized. +// The contents of db are garbage after Run returns. +func RunAvaDatabase(t *testing.T, db database.Database) { + t.Helper() + + const numWorkers = 10 + const numKeys = 64 + + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + + // Insert a bunch of keys with random data. + for k := 1; k <= numKeys; k++ { + key := taskKey(i, k) // say, "task--key-" + value := randomValue() + if err := db.Put(key, value); err != nil { + t.Errorf("Task %d: db.Set(%q=%q) failed: %v", + i, string(key), string(value), err) + } + } + + // Iterate over the database to make sure our keys are there. + it := db.NewIterator() + found := make(map[string][]byte) + mine := []byte(fmt.Sprintf("task-%d-", i)) + for { + if key := it.Key(); bytes.HasPrefix(key, mine) { + found[string(key)] = it.Value() + } + it.Next() + if !it.Next() { + break + } + } + if err := it.Error(); err != nil { + t.Errorf("Iterator[%d] reported error: %v", i, err) + } + it.Release() + + if len(found) != numKeys { + t.Errorf("Task %d: found %d keys, wanted %d", i, len(found), numKeys) + } + + for key, value := range mine { + fmt.Println("--") + fmt.Println(key) + fmt.Println(value) + fmt.Println("--") + } + + // Delete all the keys we inserted. + for k := 1; k <= numKeys; k++ { + key := taskKey(i, k) // say, "task--key-" + if err := db.Delete(key); err != nil { + t.Errorf("Delete %q: %v", key, err) + } + } + // Iterate over the database to make sure our keys are there. + it = db.NewIterator() + foundAfterRemoval := make(map[string][]byte) + for { + if key := it.Key(); bytes.HasPrefix(key, mine) { + foundAfterRemoval[string(key)] = it.Value() + } + it.Next() + if !it.Next() { + break + } + } + if len(foundAfterRemoval) != 0 { + t.Errorf("Values left after deletion: %v", foundAfterRemoval) + return + } + }() + } + wg.Wait() +} + +func taskKey(i, k int) []byte { + return []byte(fmt.Sprintf("task-%d-key-%d", i, k)) +} + +func randomValue() []byte { + value := []byte("value-") + dec := rand.Uint32() + return []byte(fmt.Sprintf("%s%d", value, dec)) +} From 124d975a8a3610879caf256eb3b86cbd14465068 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Mon, 10 Jul 2023 15:33:21 +0200 Subject: [PATCH 09/10] search tx by hash in TxSearch test --- vm/service_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vm/service_test.go b/vm/service_test.go index 88095af13..a9004c7a6 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -3,6 +3,7 @@ package vm import ( "context" "encoding/base64" + "fmt" tmjson "github.com/consideritdone/landslidecore/libs/json" rpctypes "github.com/consideritdone/landslidecore/rpc/jsonrpc/types" "github.com/consideritdone/landslidecore/types" @@ -329,7 +330,7 @@ func TestSignService(t *testing.T) { require.NoError(t, err) assert.Equal(t, atypes.CodeTypeOK, txReply.Code) //TODO: why it is not able to find tx? - reply, err := service.TxSearch(&rpctypes.Context{}, "tx.height>=0", false, nil, nil, "desc") + reply, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.hash='%s'", txReply.Hash.String()), false, nil, nil, "desc") assert.NoError(t, err) assert.True(t, len(reply.Txs) > 0) }) From c7d28d0d70c2c6e12bb644bc3fc7fc34228ab6c3 Mon Sep 17 00:00:00 2001 From: Ivan Sukach Date: Thu, 13 Jul 2023 12:24:08 +0200 Subject: [PATCH 10/10] search tx by height --- vm/service_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/vm/service_test.go b/vm/service_test.go index a9004c7a6..f5f4ff84a 100644 --- a/vm/service_test.go +++ b/vm/service_test.go @@ -326,13 +326,20 @@ func TestSignService(t *testing.T) { }) t.Run("TxSearch", func(t *testing.T) { - txReply, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx2) + txReply2, err := service.BroadcastTxAsync(&rpctypes.Context{}, tx2) + blk2, err := vm.BuildBlock(context.Background()) require.NoError(t, err) - assert.Equal(t, atypes.CodeTypeOK, txReply.Code) + assert.NotNil(t, blk2) + assert.NoError(t, blk2.Accept(context.Background())) + assert.Equal(t, atypes.CodeTypeOK, txReply2.Code) //TODO: why it is not able to find tx? - reply, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.hash='%s'", txReply.Hash.String()), false, nil, nil, "desc") + reply, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.hash='%s'", txReply2.Hash.String()), false, nil, nil, "desc") assert.NoError(t, err) assert.True(t, len(reply.Txs) > 0) + // Search by height + reply2, err := service.TxSearch(&rpctypes.Context{}, fmt.Sprintf("tx.height=%d", blk2.Height()), false, nil, nil, "desc") + assert.NoError(t, err) + assert.True(t, len(reply2.Txs) > 0) }) //TODO: Check logic of test