diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index a18454ce..239ffb0f 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -5,7 +5,9 @@ name: Go on: push: - branches: [ "main" ] + branches: + - "main" + - "dev" pull_request: jobs: diff --git a/example/kvstore/kvstore.go b/example/kvstore/kvstore.go index aced4de9..8b6898bf 100644 --- a/example/kvstore/kvstore.go +++ b/example/kvstore/kvstore.go @@ -7,11 +7,18 @@ import ( "github.com/cometbft/cometbft/abci/example/kvstore" "github.com/consideritdone/landslidevm" + "github.com/consideritdone/landslidevm/vm" ) func main() { - appCreator := landslidevm.NewLocalAppCreator(kvstore.NewInMemoryApplication()) + appCreator := KvStoreCreator() if err := landslidevm.Serve(context.Background(), appCreator); err != nil { panic(fmt.Sprintf("can't serve application: %s", err)) } } + +func KvStoreCreator() vm.AppCreator { + return func(config *vm.AppCreatorOpts) (vm.Application, error) { + return kvstore.NewPersistentApplication(config.ChainDataDir), nil + } +} diff --git a/example/wasm/main.go b/example/wasm/main.go index b892087d..581bcd71 100644 --- a/example/wasm/main.go +++ b/example/wasm/main.go @@ -2,39 +2,211 @@ package main import ( "context" + "encoding/json" "fmt" "os" + "os/signal" + "syscall" "cosmossdk.io/log" "github.com/CosmWasm/wasmd/app" "github.com/CosmWasm/wasmd/x/wasm/keeper" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" dbm "github.com/cosmos/cosmos-db" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/cosmos/cosmos-sdk/server" + srvconfig "github.com/cosmos/cosmos-sdk/server/config" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/testutil/sims" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/auth/tx" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/consideritdone/landslidevm" + "github.com/consideritdone/landslidevm/utils/ids" + "github.com/consideritdone/landslidevm/vm" + vmtypes "github.com/consideritdone/landslidevm/vm/types" ) func main() { - db, err := dbm.NewDB("dbName", dbm.MemDBBackend, "") - if err != nil { - panic(err) + appCreator := WasmCreator() + if err := landslidevm.Serve(context.Background(), appCreator); err != nil { + panic(fmt.Sprintf("can't serve application: %s", err)) } - logger := log.NewNopLogger() +} - cfg := sdk.GetConfig() - cfg.SetBech32PrefixForAccount(app.Bech32PrefixAccAddr, app.Bech32PrefixAccPub) - cfg.SetBech32PrefixForValidator(app.Bech32PrefixValAddr, app.Bech32PrefixValPub) - cfg.SetBech32PrefixForConsensusNode(app.Bech32PrefixConsAddr, app.Bech32PrefixConsPub) - cfg.SetAddressVerifier(wasmtypes.VerifyAddressLen()) - cfg.Seal() - wasmApp := app.NewWasmApp(logger, db, nil, true, sims.NewAppOptionsWithFlagHome(os.TempDir()), []keeper.Option{}, baseapp.SetChainID("landslide-test")) +func WasmCreator() vm.AppCreator { + return func(config *vm.AppCreatorOpts) (vm.Application, error) { + db, err := dbm.NewDB("wasm", dbm.GoLevelDBBackend, config.ChainDataDir) + if err != nil { + panic(err) + } + logger := log.NewNopLogger() - appCreator := landslidevm.NewLocalAppCreator(server.NewCometABCIWrapper(wasmApp)) - if err := landslidevm.Serve(context.Background(), appCreator); err != nil { - panic(fmt.Sprintf("can't serve application: %s", 
err)) + cfg := sdk.GetConfig() + cfg.SetBech32PrefixForAccount(app.Bech32PrefixAccAddr, app.Bech32PrefixAccPub) + cfg.SetBech32PrefixForValidator(app.Bech32PrefixValAddr, app.Bech32PrefixValPub) + cfg.SetBech32PrefixForConsensusNode(app.Bech32PrefixConsAddr, app.Bech32PrefixConsPub) + cfg.SetAddressVerifier(wasmtypes.VerifyAddressLen()) + cfg.Seal() + + srvCfg := *srvconfig.DefaultConfig() + grpcCfg := srvCfg.GRPC + var vmCfg vmtypes.VmConfig + vmCfg.SetDefaults() + if len(config.ConfigBytes) > 0 { + if err := json.Unmarshal(config.ConfigBytes, &vmCfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal config %s: %w", string(config.ConfigBytes), err) + } + // set the grpc port, if it is set to 0, disable gRPC + if vmCfg.GRPCPort > 0 { + grpcCfg.Address = fmt.Sprintf("127.0.0.1:%d", vmCfg.GRPCPort) + } else { + grpcCfg.Enable = false + } + } + + if err := vmCfg.Validate(); err != nil { + return nil, err + } + chainID := vmCfg.NetworkName + + var wasmApp = app.NewWasmApp( + logger, + db, + nil, + true, + sims.NewAppOptionsWithFlagHome(os.TempDir()), + []keeper.Option{}, + baseapp.SetChainID(chainID), + ) + + // early return if gRPC is disabled + if !grpcCfg.Enable { + return server.NewCometABCIWrapper(wasmApp), nil + } + + interfaceRegistry := wasmApp.InterfaceRegistry() + marshaller := codec.NewProtoCodec(interfaceRegistry) + clientCtx := client.Context{}. + WithCodec(marshaller). + WithLegacyAmino(makeCodec()). + WithTxConfig(tx.NewTxConfig(marshaller, tx.DefaultSignModes)). + WithInterfaceRegistry(interfaceRegistry). + WithChainID(chainID) + + avaChainID, err := ids.ToID(config.ChainId) + if err != nil { + return nil, err + } + + rpcURI := fmt.Sprintf( + "http://127.0.0.1:%d/ext/bc/%s/rpc", + vmCfg.RPCPort, + avaChainID, + ) + + clientCtx = clientCtx.WithNodeURI(rpcURI) + rpcclient, err := rpchttp.New(rpcURI, "/websocket") + if err != nil { + return nil, err + } + clientCtx = clientCtx.WithClient(rpcclient) + + // use the provided clientCtx to register the services + wasmApp.RegisterTxService(clientCtx) + wasmApp.RegisterTendermintService(clientCtx) + wasmApp.RegisterNodeService(clientCtx, srvconfig.Config{}) + + maxSendMsgSize := grpcCfg.MaxSendMsgSize + if maxSendMsgSize == 0 { + maxSendMsgSize = srvconfig.DefaultGRPCMaxSendMsgSize + } + + maxRecvMsgSize := grpcCfg.MaxRecvMsgSize + if maxRecvMsgSize == 0 { + maxRecvMsgSize = srvconfig.DefaultGRPCMaxRecvMsgSize + } + + // if gRPC is enabled, configure gRPC client for gRPC gateway + grpcClient, err := grpc.Dial( + grpcCfg.Address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) + if err != nil { + return nil, err + } + + clientCtx = clientCtx.WithGRPCClient(grpcClient) + logger.Debug("gRPC client assigned to client context", "target", grpcCfg.Address) + + g, ctx := getCtx(logger, false) + + grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, wasmApp, grpcCfg) + if err != nil { + return nil, err + } + + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. 
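+	// If StartGRPCServer returns a non-nil error, the errgroup created in getCtx cancels ctx for every goroutine in the group.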
+ g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) + }) + + return server.NewCometABCIWrapper(wasmApp), nil + } +} + +// custom tx codec +func makeCodec() *codec.LegacyAmino { + cdc := codec.NewLegacyAmino() + sdk.RegisterLegacyAminoCodec(cdc) + cryptocodec.RegisterCrypto(cdc) + return cdc +} + +func getCtx(logger log.Logger, block bool) (*errgroup.Group, context.Context) { + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + // listen for quit signals so the calling parent process can gracefully exit + listenForQuitSignals(g, block, cancelFn, logger) + return g, ctx +} + +// listenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received, +// the cleanup function is called, indicating the caller can gracefully exit or +// return. +// +// Note, the blocking behavior of this depends on the block argument. +// The caller must ensure the corresponding context derived from the cancelFn is used correctly. +func listenForQuitSignals(g *errgroup.Group, block bool, cancelFn context.CancelFunc, logger log.Logger) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + f := func() { + sig := <-sigCh + cancelFn() + + logger.Info("caught signal", "signal", sig.String()) + } + + if block { + g.Go(func() error { + f() + return nil + }) + } else { + go f() } } diff --git a/go.mod b/go.mod index 544c9b9c..a19d095c 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cometbft/cometbft-db v0.8.0 github.com/cosmos/cosmos-db v1.0.2 github.com/cosmos/cosmos-sdk v0.50.1 + github.com/gotestyourself/gotestyourself v2.2.0+incompatible github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mr-tron/base58 v1.2.0 github.com/prometheus/client_golang v1.17.0 @@ -201,8 +202,12 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools v2.2.0+incompatible // indirect gotest.tools/v3 v3.5.1 // indirect nhooyr.io/websocket v1.8.6 // indirect pgregory.net/rapid v1.1.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +// pin version! 
126854af5e6d has issues with the store so that queries fail +replace github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 diff --git a/go.sum b/go.sum index cc9aabfd..2c0fc1d7 100644 --- a/go.sum +++ b/go.sum @@ -446,7 +446,6 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI= @@ -492,7 +491,6 @@ github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJ github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= @@ -553,8 +551,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -607,7 +603,6 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -648,6 +643,8 @@ github.com/gorilla/websocket 
v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI= +github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.2.2/go.mod h1:EaizFBKfUKtMIF5iaDEhniwNedqGo9FuLFzppDr3uwI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= @@ -853,15 +850,12 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= @@ -1022,8 +1016,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= -github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E= github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME= github.com/tidwall/btree v1.7.0 
h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= @@ -1186,6 +1180,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1195,7 +1190,6 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -1298,16 +1292,17 @@ golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1430,7 +1425,6 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -1717,6 +1711,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/jsonrpc/http_json_handler.go b/jsonrpc/http_json_handler.go index 51c8259e..f3dc1a16 100644 --- a/jsonrpc/http_json_handler.go +++ b/jsonrpc/http_json_handler.go @@ -9,7 +9,7 @@ import ( "reflect" "sort" - cmtjson "github.com/cometbft/cometbft/libs/json" + tmjson "github.com/cometbft/cometbft/libs/json" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/rpc/jsonrpc/server" types "github.com/cometbft/cometbft/rpc/jsonrpc/types" @@ -123,12 +123,11 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han cache = false } - logger.Info("calling func", "method", request.Method, "args", args) returns := rpcFunc.f.Call(args) result, err := unreflectResult(returns) - logger.Info("result of calling func for %s: err: %s", request.Method, err) if err != nil { + logger.Debug("unexpected result", "method", request.Method, "err", err) responses = append(responses, types.RPCInternalError(request.ID, err)) continue } @@ -160,7 +159,7 @@ func mapParamsToArgs( if p, ok := params[argName]; ok && p != nil && len(p) > 0 { val := reflect.New(argType) - err := cmtjson.Unmarshal(p, val.Interface()) + err := tmjson.Unmarshal(p, val.Interface()) if err != nil { return nil, err } @@ -187,7 +186,7 @@ func arrayParamsToArgs( for i, p := range params { argType := rpcFunc.args[i+argsOffset] val := reflect.New(argType) - err := cmtjson.Unmarshal(p, val.Interface()) + err := tmjson.Unmarshal(p, val.Interface()) if err != nil { return nil, err } diff --git a/landslidevm.go b/landslidevm.go index 48406f4e..dd0bb586 100644 --- a/landslidevm.go +++ b/landslidevm.go @@ -98,7 +98,7 @@ func Serve[T interface { go 
func(ctx context.Context) { defer func() { server.GracefulStop() - fmt.Println("vm server: graceful termination success") + fmt.Println("landslide vm server: graceful termination success") }() for { @@ -108,19 +108,19 @@ func Serve[T interface { // that we are shutting down. Once we are in the shutdown // workflow, we will gracefully exit upon receiving a SIGTERM. if !lvm.CanShutdown() { - fmt.Printf("runtime engine: ignoring signal: %s\n", s) + fmt.Printf("landslide runtime engine: ignoring signal: %s\n", s) continue } switch s { case syscall.SIGINT: - fmt.Printf("runtime engine: ignoring signal: %s\n", s) + fmt.Printf("landslide runtime engine: ignoring signal: %s\n", s) case syscall.SIGTERM: - fmt.Printf("runtime engine: received shutdown signal: %s\n", s) + fmt.Printf("landslide runtime engine: received shutdown signal: %s\n", s) return } case <-ctx.Done(): - fmt.Println("runtime engine: context has been cancelled") + fmt.Println("landslide runtime engine: context has been cancelled") return } } diff --git a/safestate/safestate.go b/safestate/safestate.go new file mode 100644 index 00000000..38c68b08 --- /dev/null +++ b/safestate/safestate.go @@ -0,0 +1,67 @@ +package safestate + +import ( + "github.com/cometbft/cometbft/state" + "github.com/cometbft/cometbft/types" + "sync" +) + +type SafeState struct { + state.State + mtx *sync.RWMutex +} + +func New(state state.State) SafeState { + return SafeState{ + State: state, + mtx: &sync.RWMutex{}, + } +} + +func (ss *SafeState) StateCopy() state.State { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State +} + +func (ss *SafeState) StateBytes() []byte { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.Bytes() +} + +func (ss *SafeState) UpdateState(cmtState state.State) { + ss.mtx.Lock() + defer ss.mtx.Unlock() + ss.State = cmtState +} + +func (ss *SafeState) LastBlockHeight() int64 { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.LastBlockHeight +} + +func (ss *SafeState) LastBlockID() types.BlockID { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.LastBlockID +} + +func (ss *SafeState) Validators() *types.ValidatorSet { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.Validators +} + +func (ss *SafeState) AppHash() []byte { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.AppHash +} + +func (ss *SafeState) ChainID() string { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.State.ChainID +} diff --git a/scripts/versions.sh b/scripts/versions.sh index b80a1a24..7d18e732 100644 --- a/scripts/versions.sh +++ b/scripts/versions.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash # Don't export them as they're used in the context of other calls -AVALANCHE_VERSION=${AVALANCHE_VERSION:-'v1.11.7'} +AVALANCHE_VERSION=${AVALANCHE_VERSION:-'v1.11.8'} diff --git a/vm/rpc.go b/vm/rpc.go index 8f7d07b6..4dc915c0 100644 --- a/vm/rpc.go +++ b/vm/rpc.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/cometbft/cometbft/config" + "github.com/cometbft/cometbft/libs/pubsub" + "github.com/cometbft/cometbft/rpc/jsonrpc/server" "sort" "time" @@ -24,6 +27,12 @@ import ( "github.com/consideritdone/landslidevm/jsonrpc" ) +const ( + // maxQueryLength is the maximum length of a query string that will be + // accepted. This is just a safety check to avoid outlandish queries. 
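+	// The 512-byte cap mirrors the query-length limit that CometBFT's own rpc/core package applies to event queries.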
+	maxQueryLength = 512 +) + type RPC struct { vm *LandslideVM } @@ -34,11 +43,6 @@ func NewRPC(vm *LandslideVM) *RPC { func (rpc *RPC) Routes() map[string]*jsonrpc.RPCFunc { return map[string]*jsonrpc.RPCFunc{ - // subscribe/unsubscribe are reserved for websocket events. - "subscribe": jsonrpc.NewWSRPCFunc(rpc.Subscribe, "query"), - "unsubscribe": jsonrpc.NewWSRPCFunc(rpc.Unsubscribe, "query"), - "unsubscribe_all": jsonrpc.NewWSRPCFunc(rpc.UnsubscribeAll, ""), - - // info AP "health": jsonrpc.NewRPCFunc(rpc.Health, ""), "status": jsonrpc.NewRPCFunc(rpc.Status, ""), @@ -52,9 +56,9 @@ func (rpc *RPC) Routes() map[string]*jsonrpc.RPCFunc { "commit": jsonrpc.NewRPCFunc(rpc.Commit, "height", jsonrpc.Cacheable("height")), // "header": jsonrpc.NewRPCFunc(rpc.Header, "height", jsonrpc.Cacheable("height")), // "header_by_hash": jsonrpc.NewRPCFunc(rpc.HeaderByHash, "hash", jsonrpc.Cacheable()), - "check_tx": jsonrpc.NewRPCFunc(rpc.CheckTx, "tx"), - "tx": jsonrpc.NewRPCFunc(rpc.Tx, "hash,prove", jsonrpc.Cacheable()), - // "consensus_state": jsonrpc.NewRPCFunc(rpc.GetConsensusState, ""), + "check_tx": jsonrpc.NewRPCFunc(rpc.CheckTx, "tx"), + "tx": jsonrpc.NewRPCFunc(rpc.Tx, "hash,prove", jsonrpc.Cacheable()), + "consensus_state": jsonrpc.NewRPCFunc(rpc.GetConsensusState, ""), "unconfirmed_txs": jsonrpc.NewRPCFunc(rpc.UnconfirmedTxs, "limit"), "num_unconfirmed_txs": jsonrpc.NewRPCFunc(rpc.NumUnconfirmedTxs, ""), "tx_search": jsonrpc.NewRPCFunc(rpc.TxSearch, "query,prove,page,per_page,order_by"), @@ -77,6 +81,51 @@ func (rpc *RPC) Routes() map[string]*jsonrpc.RPCFunc { } } +func (rpc *RPC) TMRoutes() map[string]*server.RPCFunc { + return map[string]*server.RPCFunc{ + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": server.NewWSRPCFunc(rpc.Subscribe, "query"), + "unsubscribe": server.NewWSRPCFunc(rpc.Unsubscribe, "query"), + "unsubscribe_all": server.NewWSRPCFunc(rpc.UnsubscribeAll, ""), + + // info API + "health": server.NewRPCFunc(rpc.Health, ""), + "status": server.NewRPCFunc(rpc.Status, ""), + "net_info": server.NewRPCFunc(rpc.NetInfo, ""), + "blockchain": server.NewRPCFunc(rpc.BlockchainInfo, "minHeight,maxHeight", server.Cacheable()), + "genesis": server.NewRPCFunc(rpc.Genesis, "", server.Cacheable()), + "genesis_chunked": server.NewRPCFunc(rpc.GenesisChunked, "chunk", server.Cacheable()), + "block": server.NewRPCFunc(rpc.Block, "height", server.Cacheable("height")), + "block_by_hash": server.NewRPCFunc(rpc.BlockByHash, "hash", server.Cacheable()), + "block_results": server.NewRPCFunc(rpc.BlockResults, "height", server.Cacheable("height")), + "commit": server.NewRPCFunc(rpc.Commit, "height", server.Cacheable("height")), + // "header": server.NewRPCFunc(rpc.Header, "height", server.Cacheable("height")), + // "header_by_hash": server.NewRPCFunc(rpc.HeaderByHash, "hash", server.Cacheable()), + "check_tx": server.NewRPCFunc(rpc.CheckTx, "tx"), + "tx": server.NewRPCFunc(rpc.Tx, "hash,prove", server.Cacheable()), + // "consensus_state": server.NewRPCFunc(rpc.GetConsensusState, ""), + "unconfirmed_txs": server.NewRPCFunc(rpc.UnconfirmedTxs, "limit"), + "num_unconfirmed_txs": server.NewRPCFunc(rpc.NumUnconfirmedTxs, ""), + "tx_search": server.NewRPCFunc(rpc.TxSearch, "query,prove,page,per_page,order_by"), + "block_search": server.NewRPCFunc(rpc.BlockSearch, "query,page,per_page,order_by"), + "validators": server.NewRPCFunc(rpc.Validators, "height,page,per_page", server.Cacheable("height")), + "dump_consensus_state": server.NewRPCFunc(rpc.DumpConsensusState, ""), +
"consensus_params": server.NewRPCFunc(rpc.ConsensusParams, "height", server.Cacheable("height")), + + // tx broadcast API + "broadcast_tx_commit": server.NewRPCFunc(rpc.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": server.NewRPCFunc(rpc.BroadcastTxSync, "tx"), + "broadcast_tx_async": server.NewRPCFunc(rpc.BroadcastTxAsync, "tx"), + + // abci API + "abci_query": server.NewRPCFunc(rpc.ABCIQuery, "path,data,height,prove"), + "abci_info": server.NewRPCFunc(rpc.ABCIInfo, "", server.Cacheable()), + + // evidence API + // "broadcast_evidence": server.NewRPCFunc(rpc.BroadcastEvidence, "evidence"), + } +} + // UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries) // including their number. func (rpc *RPC) UnconfirmedTxs(_ *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { @@ -139,6 +188,7 @@ func (rpc *RPC) ABCIQuery( } func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + rpc.vm.logger.Info("BroadcastTxCommit called") subscriber := ctx.RemoteAddr() // Subscribe to tx being committed in block. @@ -208,7 +258,7 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R Hash: tx.Hash(), }, err // TODO: use rpc.config.TimeoutBroadcastTxCommit for timeout - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): err = errors.New("timed out waiting for tx to be included in a block") rpc.vm.logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ @@ -221,22 +271,28 @@ func (rpc *RPC) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.R } func (rpc *RPC) BroadcastTxAsync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + rpc.vm.logger.Info("BroadcastTxAsync called") err := rpc.vm.mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { + rpc.vm.logger.Error("Error on broadcastTxAsync", "err", err) return nil, err } return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } func (rpc *RPC) BroadcastTxSync(_ *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + rpc.vm.logger.Info("BroadcastTxSync called") resCh := make(chan *abci.ResponseCheckTx, 1) err := rpc.vm.mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { resCh <- res }, mempl.TxInfo{}) if err != nil { + rpc.vm.logger.Error("Error on BroadcastTxSync", "err", err) return nil, err } res := <-resCh + + rpc.vm.logger.Info("BroadcastTxSync response", "Code", res.Code, "Log", res.Log, "Codespace", res.Codespace, "Hash", tx.Hash()) return &ctypes.ResultBroadcastTx{ Code: res.GetCode(), Data: res.GetData(), @@ -397,8 +453,11 @@ func (rpc *RPC) Block(_ *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBloc blockMeta := rpc.vm.blockStore.LoadBlockMeta(height) if blockMeta == nil { + rpc.vm.logger.Info("Block not found", "height", height) return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } + + rpc.vm.logger.Info("Block response", "height", height, "block", block, "blockMeta", blockMeta) return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } @@ -539,12 +598,15 @@ func (rpc *RPC) Validators( } func (rpc *RPC) Tx(_ *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + rpc.vm.logger.Info("Tx called", "hash", hash) r, err := rpc.vm.txIndexer.Get(hash) if err != nil { + rpc.vm.logger.Error("Error on Tx", "err", err) return nil, err } if r == nil { + rpc.vm.logger.Error("Error on Tx", "tx not found", hash) return nil, fmt.Errorf("tx (%X) not found", hash) } @@ -736,7 
+798,7 @@ func (rpc *RPC) Status(_ *rpctypes.Context) (*ctypes.ResultStatus, error) { ), DefaultNodeID: p2p.ID(rpc.vm.appOpts.NodeId), ListenAddr: "", - Network: fmt.Sprintf("%d", rpc.vm.appOpts.NetworkId), + Network: rpc.vm.networkName, Version: version.TMCoreSemVer, Channels: nil, Moniker: "", @@ -763,3 +825,117 @@ func (rpc *RPC) Status(_ *rpctypes.Context) (*ctypes.ResultStatus, error) { return result, nil } + +// Subscribe for events via WebSocket. +// More: https://docs.cometbft.com/v0.38.x/rpc/#/Websocket/subscribe +func (rpc *RPC) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + addr := ctx.RemoteAddr() + cfg := config.DefaultRPCConfig() + + if rpc.vm.eventBus.NumClients() >= cfg.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", cfg.MaxSubscriptionClients) + } else if rpc.vm.eventBus.NumClientSubscriptions(addr) >= cfg.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", cfg.MaxSubscriptionsPerClient) + } else if len(query) > maxQueryLength { + return nil, errors.New("maximum query length exceeded") + } + + rpc.vm.logger.Info("Subscribe to query", "remote", addr, "query", query) + + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) + } + + subCtx, cancel := context.WithTimeout(ctx.Context(), core.SubscribeTimeout) + defer cancel() + + sub, err := rpc.vm.eventBus.Subscribe(subCtx, addr, q, cfg.SubscriptionBufferSize) + if err != nil { + return nil, err + } + + closeIfSlow := cfg.CloseOnSlowClient + + // Capture the current ID, since it can change in the future. + subscriptionID := ctx.JSONReq.ID + go func() { + for { + select { + case msg := <-sub.Out(): + var ( + resultEvent = &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()} + resp = rpctypes.NewRPCSuccessResponse(subscriptionID, resultEvent) + ) + // Give each write its own deadline; a single writeCtx created outside + // the loop would expire after 10s and fail every later write. + writeCtx, cancelWrite := context.WithTimeout(context.Background(), 10*time.Second) + err := ctx.WSConn.WriteRPCResponse(writeCtx, resp) + cancelWrite() + if err != nil { + rpc.vm.logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + + if closeIfSlow { + var ( + err = errors.New("subscription was canceled (reason: slow client)") + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + rpc.vm.logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + return + } + } + case <-sub.Canceled(): + if sub.Err() != pubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "CometBFT exited" + } else { + reason = sub.Err().Error() + } + var ( + err = fmt.Errorf("subscription was canceled (reason: %s)", reason) + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + rpc.vm.logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + } + return + } + } + }() + + return &ctypes.ResultSubscribe{}, nil +} + +// Unsubscribe from events via WebSocket. 
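+// The query should match the one previously passed to Subscribe by this caller.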
+// More: https://docs.cometbft.com/v0.38.x/rpc/#/Websocket/unsubscribe +func (rpc *RPC) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + rpc.vm.logger.Info("Unsubscribe from query", "remote", addr, "query", query) + q, err := tmquery.New(query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) + } + err = rpc.vm.eventBus.Unsubscribe(context.Background(), addr, q) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil +} + +// UnsubscribeAll from all events via WebSocket. +// More: https://docs.cometbft.com/v0.38.x/rpc/#/Websocket/unsubscribe_all +func (rpc *RPC) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() + rpc.vm.logger.Info("Unsubscribe from all", "remote", addr) + err := rpc.vm.eventBus.UnsubscribeAll(context.Background(), addr) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil +} diff --git a/vm/rpc_test.go b/vm/rpc_test.go index 77456c07..71458bac 100644 --- a/vm/rpc_test.go +++ b/vm/rpc_test.go @@ -2,58 +2,986 @@ package vm import ( "context" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/cometbft/cometbft/abci/example/kvstore" + "github.com/cometbft/cometbft/config" + "github.com/cometbft/cometbft/libs/bytes" + "github.com/cometbft/cometbft/libs/pubsub" + "github.com/cometbft/cometbft/libs/rand" + tmsync "github.com/cometbft/cometbft/libs/sync" + "github.com/cometbft/cometbft/p2p" + "github.com/cometbft/cometbft/rpc/jsonrpc/client" + rpcserver "github.com/cometbft/cometbft/rpc/jsonrpc/server" + "github.com/cometbft/cometbft/types" + "github.com/cometbft/cometbft/version" + vmpb "github.com/consideritdone/landslidevm/proto/vm" "net/http" + "sort" + "strings" "testing" "time" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cometbft/cometbft/rpc/jsonrpc/client" + abcitypes "github.com/cometbft/cometbft/abci/types" + bftjson "github.com/cometbft/cometbft/libs/json" + rpcclient "github.com/cometbft/cometbft/rpc/client" + rpcclienthttp "github.com/cometbft/cometbft/rpc/client/http" + coretypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/stretchr/testify/require" "github.com/consideritdone/landslidevm/jsonrpc" ) -func setupRPC(t *testing.T) (*http.Server, *LandslideVM, *client.Client) { +type HandlerRPC func(vmLnd *LandslideVM) http.Handler + +type BlockBuilder func(*testing.T, context.Context, *LandslideVM) + +type setupServerAndTransport func(t *testing.T, blockBuilder BlockBuilder) (*http.Server, *LandslideVM, rpcclient.Client, context.CancelFunc) + +type txRuntimeEnv struct { + key, value, hash []byte + initHeight int64 +} + +func buildAccept(t *testing.T, ctx context.Context, vm *LandslideVM) { + end := false + for !end { + select { + case <-ctx.Done(): + end = true + default: + if vm.mempool.Size() > 0 { + block, err := vm.BuildBlock(ctx, &vmpb.BuildBlockRequest{}) + require.NoError(t, err) + _, err = vm.BlockAccept(ctx, &vmpb.BlockAcceptRequest{ + Id: block.Id, + }) + require.NoError(t, err) + } else { + time.Sleep(500 * time.Millisecond) + } + } + } +} + +func noAction(t *testing.T, ctx context.Context, vm *LandslideVM) { + +} + +func setupServer(t *testing.T, handler HandlerRPC, blockBuilder BlockBuilder) (*http.Server, *LandslideVM, string, context.CancelFunc) { vm := newFreshKvApp(t) vmLnd := vm.(*LandslideVM) - mux := http.NewServeMux() - jsonrpc.RegisterRPCFuncs(mux, NewRPC(vmLnd).Routes(), vmLnd.logger) + + mux := 
handler(vmLnd) address := "127.0.0.1:44444" server := &http.Server{Addr: address, Handler: mux} + ctx, cancel := context.WithCancel(context.Background()) + go blockBuilder(t, ctx, vmLnd) go func() { - server.ListenAndServe() - //panic(err) - //require.NoError(t, err) + err := server.ListenAndServe() + t.Log(err) }() // wait for servers to start time.Sleep(time.Second * 2) - client, err := client.New("tcp://" + address) + return server, vmLnd, address, cancel +} + +func setupRPCServer(t *testing.T, blockBuilder BlockBuilder) (*http.Server, *LandslideVM, rpcclient.Client, context.CancelFunc) { + server, vmLnd, address, cancel := setupServer(t, setupRPC, blockBuilder) + client, err := rpcclienthttp.New("tcp://"+address, "/websocket") require.NoError(t, err) + return server, vmLnd, client, cancel +} - return server, vmLnd, client +func setupWSRPCServer(t *testing.T, blockBuilder BlockBuilder) (*http.Server, *LandslideVM, rpcclient.Client, context.CancelFunc) { + server, vmLnd, address, cancel := setupServer(t, setupWSRPC, blockBuilder) + client, err := client.NewWS("tcp://"+address, "/websocket") + require.NoError(t, err) + wsc := &WSClient{ + WSClient: client, + mtx: tmsync.RWMutex{}, + subscriptions: make(map[string]chan coretypes.ResultEvent), + } + err = wsc.Start() + require.Nil(t, err) + return server, vmLnd, wsc, cancel } -func TestHealth(t *testing.T) { - server, _, client := setupRPC(t) - defer server.Close() +func setupRPC(vmLnd *LandslideVM) http.Handler { + mux := http.NewServeMux() + jsonrpc.RegisterRPCFuncs(mux, NewRPC(vmLnd).Routes(), vmLnd.logger) + return mux +} + +func setupWSRPC(vmLnd *LandslideVM) http.Handler { + mux := http.NewServeMux() + jsonrpc.RegisterRPCFuncs(mux, NewRPC(vmLnd).Routes(), vmLnd.logger) + tmRPC := NewRPC(vmLnd) + wm := rpcserver.NewWebsocketManager(tmRPC.TMRoutes(), + rpcserver.OnDisconnect(func(remoteAddr string) { + err := vmLnd.eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != pubsub.ErrSubscriptionNotFound { + vmLnd.logger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + }), + rpcserver.ReadLimit(config.DefaultRPCConfig().MaxBodyBytes), + rpcserver.WriteChanCapacity(config.DefaultRPCConfig().WebSocketWriteBufferSize), + ) + wm.SetLogger(vmLnd.logger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + return mux +} + +// MakeTxKV returns a text transaction, allong with expected key, value pair +func MakeTxKV() ([]byte, []byte, []byte) { + k := []byte(rand.Str(2)) + v := []byte(rand.Str(2)) + return k, v, append(k, append([]byte("="), v...)...) 
+} + +func testABCIInfo(t *testing.T, client rpcclient.Client, expected *coretypes.ResultABCIInfo) { + result, err := client.ABCIInfo(context.Background()) + require.NoError(t, err) + require.Equal(t, expected.Response.Version, result.Response.Version) + require.Equal(t, expected.Response.AppVersion, result.Response.AppVersion) + require.Equal(t, expected.Response.LastBlockHeight, result.Response.LastBlockHeight) + require.NotNil(t, result.Response.LastBlockAppHash) +} - result := new(ctypes.ResultHealth) - _, err := client.Call(context.Background(), "health", map[string]interface{}{}, result) +func testABCIQuery(t *testing.T, client rpcclient.Client, path string, data bytes.HexBytes, expected interface{}) { + result, err := client.ABCIQuery(context.Background(), path, data) require.NoError(t, err) + require.True(t, result.Response.IsOK()) + require.EqualValues(t, expected, result.Response.Value) +} - t.Logf("Health result %+v", result) +func testBroadcastTxCommit(t *testing.T, client rpcclient.Client, vm *LandslideVM, tx types.Tx) *coretypes.ResultBroadcastTxCommit { + initMempoolSize := vm.mempool.Size() + result, err := client.BroadcastTxCommit(context.Background(), tx) + waitForStateUpdate(result.Height, vm) + require.NoError(t, err) + require.True(t, result.CheckTx.IsOK()) + require.True(t, result.TxResult.IsOK()) + require.Equal(t, initMempoolSize, vm.mempool.Size()) + return result } -func TestStatus(t *testing.T) { - server, _, client := setupRPC(t) - defer server.Close() +func testBroadcastTxSync(t *testing.T, client rpcclient.Client, tx types.Tx) *coretypes.ResultBroadcastTx { + result, err := client.BroadcastTxSync(context.Background(), tx) + require.NoError(t, err) + require.Equal(t, result.Code, abcitypes.CodeTypeOK) + return result +} + +func testBroadcastTxAsync(t *testing.T, client rpcclient.Client, tx types.Tx) *coretypes.ResultBroadcastTx { + result, err := client.BroadcastTxAsync(context.Background(), tx) + require.NoError(t, err) + require.NotNil(t, result.Hash) + require.Equal(t, result.Code, abcitypes.CodeTypeOK) + return result +} + +func testStatus(t *testing.T, client rpcclient.Client, expected *coretypes.ResultStatus) { + result, err := client.Status(context.Background()) + require.NoError(t, err) + //TODO: test node info moniker + //require.Equal(t, expected.NodeInfo.Moniker, result.NodeInfo.Moniker) + require.Equal(t, expected.SyncInfo.LatestBlockHeight, result.SyncInfo.LatestBlockHeight) +} + +func testNetInfo(t *testing.T, client rpcclient.Client, expected *coretypes.ResultNetInfo) { + _, err := client.NetInfo(context.Background()) + require.NoError(t, err) + //TODO: check equality + //require.Equal(t, expected.Listening, result.Listening) + //require.Equal(t, expected.Peers, result.Peers) + //require.Equal(t, expected.Listeners, result.Listeners) + //require.Equal(t, expected.NPeers, result.NPeers) + //TODO: OR compare to desired values + //require.NoError(t, err, "%d: %+v", i, err) + //assert.True(t, netinfo.Listening) + //assert.Empty(t, netinfo.Peers) + +} + +func testConsensusState(t *testing.T, client rpcclient.Client, expected *coretypes.ResultConsensusState) { + _, err := client.ConsensusState(context.Background()) + require.NoError(t, err) + //TODO: check equality + //require.Equal(t, expected.RoundState, result.RoundState) + //TODO: OR compare to desired values + //assert.NotEmpty(t, cons.RoundState) +} + +func testDumpConsensusState(t *testing.T, client rpcclient.Client, expected *coretypes.ResultDumpConsensusState) { + _, err := 
client.DumpConsensusState(context.Background()) + require.NoError(t, err) + //TODO: check equality + //require.Equal(t, expected.RoundState, result.RoundState) + //require.EqualValues(t, expected.Peers, result.Peers) + //TODO: OR compare to desired values + //assert.NotEmpty(t, cons.RoundState) + //require.ElementsMatch(t, expected.Peers, result.Peers) +} + +func testConsensusParams(t *testing.T, client rpcclient.Client, height *int64, expected *coretypes.ResultConsensusParams) { + result, err := client.ConsensusParams(context.Background(), height) + require.NoError(t, err) + //TODO: check equality + require.Equal(t, expected.BlockHeight, result.BlockHeight) + //require.Equal(t, expected.ConsensusParams.Version.App, result.ConsensusParams.Version.App) + //require.Equal(t, expected.ConsensusParams.Hash(), result.ConsensusParams.Hash()) +} + +func testHealth(t *testing.T, client rpcclient.Client) { + _, err := client.Health(context.Background()) + require.NoError(t, err) +} + +func testBlockchainInfo(t *testing.T, client rpcclient.Client, minHeight int64, maxHeight int64, expected *coretypes.ResultBlockchainInfo) { + result, err := client.BlockchainInfo(context.Background(), minHeight, maxHeight) + require.NoError(t, err) + require.Equal(t, expected.LastHeight, result.LastHeight) + //TODO: implement same sorting method + //lastMeta := result.BlockMetas[len(result.BlockMetas)-1] + //expectedLastMeta := expected.BlockMetas[len(expected.BlockMetas)-1] + //require.Equal(t, expectedLastMeta.NumTxs, lastMeta.NumTxs) + //require.Equal(t, expectedLastMeta.Header.AppHash, lastMeta.Header.AppHash) + //require.Equal(t, expectedLastMeta.BlockID, lastMeta.BlockID) +} + +func testBlock(t *testing.T, client rpcclient.Client, height *int64, expected *coretypes.ResultBlock) *coretypes.ResultBlock { + result, err := client.Block(context.Background(), height) + require.NoError(t, err) + require.Equal(t, expected.Block.ChainID, result.Block.ChainID) + require.Equal(t, expected.Block.Height, result.Block.Height) + require.Equal(t, expected.Block.AppHash, result.Block.AppHash) + return result +} + +func testBlockByHash(t *testing.T, client rpcclient.Client, hash []byte, expected *coretypes.ResultBlock) *coretypes.ResultBlock { + result, err := client.BlockByHash(context.Background(), hash) + require.NoError(t, err) + require.Equal(t, expected.Block.ChainID, result.Block.ChainID) + require.Equal(t, expected.Block.Height, result.Block.Height) + require.Equal(t, expected.Block.AppHash, result.Block.AppHash) + return result +} + +func testBlockResults(t *testing.T, client rpcclient.Client, height *int64, expected *coretypes.ResultBlockResults) { + result, err := client.BlockResults(context.Background(), height) + require.NoError(t, err) + require.NotNil(t, result) + //require.Equal(t, expected.Height, result.Height) + //require.Equal(t, expected.AppHash, result.AppHash) + //require.Equal(t, expected.TxsResults, result.TxsResults) +} + +func testBlockSearch(t *testing.T, client rpcclient.Client, query string, page *int, perPage *int, orderBy string, expected *coretypes.ResultBlockSearch) { + result, err := client.BlockSearch(context.Background(), query, page, perPage, orderBy) + require.NoError(t, err) + require.Equal(t, expected.TotalCount, result.TotalCount) + sort.Slice(expected.Blocks, func(i, j int) bool { + return expected.Blocks[i].Block.Height < expected.Blocks[j].Block.Height + }) + sort.Slice(result.Blocks, func(i, j int) bool { + return result.Blocks[i].Block.Height < result.Blocks[j].Block.Height + }) + 
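// Both slices are now sorted by height, so the comparison below does not depend on the order the server returned them in. +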
require.Equal(t, expected.Blocks, result.Blocks) +} + +func testTx(t *testing.T, client rpcclient.Client, hash []byte, prove bool, expected *coretypes.ResultTx) { + result, err := client.Tx(context.Background(), hash, prove) + require.NoError(t, err) + require.EqualValues(t, expected.Hash, result.Hash) + require.EqualValues(t, expected.Tx, result.Tx) + require.EqualValues(t, expected.Height, result.Height) + require.EqualValues(t, expected.TxResult, result.TxResult) +} + +func testTxSearch(t *testing.T, client rpcclient.Client, query string, prove bool, page *int, perPage *int, orderBy string, expected *coretypes.ResultTxSearch) { + result, err := client.TxSearch(context.Background(), query, prove, page, perPage, orderBy) + require.NoError(t, err) + require.EqualValues(t, expected.TotalCount, result.TotalCount) + require.EqualValues(t, expected.Txs, result.Txs) +} + +func testCommit(t *testing.T, client rpcclient.Client, height *int64, expected *coretypes.ResultCommit) { + result, err := client.Commit(context.Background(), height) + require.NoError(t, err) + //TODO: implement tests for all fields of result + //require.Equal(t, expected.Version, result.Version) + require.Equal(t, expected.ChainID, result.ChainID) + require.Equal(t, expected.Height, result.Height) + //require.Equal(t, expected.Time, result.Time) + //require.Equal(t, expected.LastBlockID, result.LastBlockID) + require.Equal(t, expected.LastCommitHash, result.LastCommitHash) + require.Equal(t, expected.DataHash, result.DataHash) + require.Equal(t, expected.ValidatorsHash, result.ValidatorsHash) + require.Equal(t, expected.NextValidatorsHash, result.NextValidatorsHash) + require.Equal(t, expected.ConsensusHash, result.ConsensusHash) + require.Equal(t, expected.AppHash, result.AppHash) + require.Equal(t, expected.LastResultsHash, result.LastResultsHash) + require.Equal(t, expected.EvidenceHash, result.EvidenceHash) + require.Equal(t, expected.ProposerAddress, result.ProposerAddress) + //TODO: fix empty height for non-genesis blocks, or even simulate signatures + //require.Equal(t, expected.Commit.Height, result.Commit.Height) + //require.Equal(t, expected.Commit.Round, result.Commit.Round) + //require.Equal(t, expected.Commit.BlockID, result.Commit.BlockID) + //require.EqualValues(t, expected.Commit.Signatures, result.Commit.Signatures) +} + +func testUnconfirmedTxs(t *testing.T, client rpcclient.Client, limit *int, expected *coretypes.ResultUnconfirmedTxs) { + result, err := client.UnconfirmedTxs(context.Background(), limit) + require.NoError(t, err) + require.Equal(t, expected.Total, result.Total) + require.Equal(t, expected.Count, result.Count) + require.EqualValues(t, expected.Txs, result.Txs) +} - result := new(ctypes.ResultStatus) - _, err := client.Call(context.Background(), "status", map[string]interface{}{}, result) +func testNumUnconfirmedTxs(t *testing.T, client rpcclient.Client, expected *coretypes.ResultUnconfirmedTxs) { + result, err := client.NumUnconfirmedTxs(context.Background()) require.NoError(t, err) + require.Equal(t, expected.Total, result.Total) + require.Equal(t, expected.Count, result.Count) +} + +func testCheckTx(t *testing.T, client rpcclient.Client, tx types.Tx, expected *coretypes.ResultCheckTx) { + result, err := client.CheckTx(context.Background(), tx) + require.NoError(t, err) + require.Equal(t, result.Code, expected.Code) +} + +func waitForStateUpdate(expectedHeight int64, vm *LandslideVM) { + for { + if vm.state.LastBlockHeight() == expectedHeight { + return + } + time.Sleep(100 * 
time.Millisecond) + } +} - t.Logf("Status result %+v", result) +func checkTxResult(t *testing.T, client rpcclient.Client, vm *LandslideVM, env *txRuntimeEnv) { + ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Second) + for { + select { + case <-ctx.Done(): + cancelCtx() + t.Fatal("Broadcast tx timeout exceeded") + default: + if vm.state.LastBlockHeight() == env.initHeight+1 { + cancelCtx() + testABCIQuery(t, client, "/key", env.key, env.value) + //testABCIQuery(t, client, map[string]interface{}{"path": "/hash", "data": fmt.Sprintf("%x", env.hash)}, env.value) + return + } + time.Sleep(500 * time.Millisecond) + } + } } + +func checkCommittedTxResult(t *testing.T, client rpcclient.Client, env *txRuntimeEnv) { + testABCIQuery(t, client, "/key", env.key, env.value) + //testABCIQuery(t, client, map[string]interface{}{"path": "/hash", "data": fmt.Sprintf("%x", env.hash)}, env.value) +} + +func TestBlockProduction(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testBlockProduction(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testBlockProduction(t, setupWSRPCServer) + }) +} + +func testBlockProduction(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer vm.mempool.Flush() + defer client.Stop() + defer cancel() + + initialHeight := vm.state.LastBlockHeight() + + for i := 1; i < 10; i++ { + testStatus(t, client, &coretypes.ResultStatus{ + NodeInfo: p2p.DefaultNodeInfo{}, + SyncInfo: coretypes.SyncInfo{ + LatestBlockHeight: initialHeight + int64(i) - 1, + }, + ValidatorInfo: coretypes.ValidatorInfo{}, + }) + + // write something + _, _, tx := MakeTxKV() + previousAppHash := vm.state.AppHash() + bres := testBroadcastTxCommit(t, client, vm, tx) + + testBlock(t, client, &bres.Height, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: bres.Height, + AppHash: previousAppHash, + }, + }, + }) + } +} + +func TestABCIService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testABCIService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testABCIService(t, setupWSRPCServer) + }) +} + +func testABCIService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer vm.mempool.Flush() + defer cancel() + + t.Run("ABCIInfo", func(t *testing.T) { + for i := 0; i < 3; i++ { + initialHeight := vm.state.LastBlockHeight() + testABCIInfo(t, client, &coretypes.ResultABCIInfo{ + Response: abcitypes.ResponseInfo{ + Version: version.ABCIVersion, + AppVersion: kvstore.AppVersion, + LastBlockHeight: initialHeight, + }, + }) + _, _, tx := MakeTxKV() + testBroadcastTxCommit(t, client, vm, tx) + testABCIInfo(t, client, &coretypes.ResultABCIInfo{ + Response: abcitypes.ResponseInfo{ + Version: version.ABCIVersion, + AppVersion: kvstore.AppVersion, + LastBlockHeight: initialHeight + 1, + }, + }) + } + }) + + t.Run("ABCIQuery", func(t *testing.T) { + for i := 0; i < 3; i++ { + k, v, tx := MakeTxKV() + testBroadcastTxCommit(t, client, vm, tx) + path := "/key" + testABCIQuery(t, client, path, k, v) + } + }) + + t.Run("BroadcastTxCommit", func(t *testing.T) { + for i := 0; i < 3; i++ { + k, v, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + checkCommittedTxResult(t, client, &txRuntimeEnv{key: k, value: v, hash: result.Hash}) + } + }) + + t.Run("BroadcastTxAsync", func(t *testing.T) { + for i 
:= 0; i < 3; i++ { + k, v, tx := MakeTxKV() + initHeight := vm.state.LastBlockHeight() + result := testBroadcastTxAsync(t, client, tx) + checkTxResult(t, client, vm, &txRuntimeEnv{key: k, value: v, hash: result.Hash, initHeight: initHeight}) + } + }) + + t.Run("BroadcastTxSync", func(t *testing.T) { + for i := 0; i < 3; i++ { + k, v, tx := MakeTxKV() + initHeight := vm.state.LastBlockHeight() + result := testBroadcastTxSync(t, client, tx) + checkTxResult(t, client, vm, &txRuntimeEnv{key: k, value: v, hash: result.Hash, initHeight: initHeight}) + } + cancel() + _, _, tx := MakeTxKV() + initMempoolSize := vm.mempool.Size() + testBroadcastTxSync(t, client, tx) + //result := testBroadcastTxSync(t, client, vm, map[string]interface{}{"tx": tx}) + require.Equal(t, initMempoolSize+1, vm.mempool.Size()) + //TODO: kvstore returns an empty CheckTx result; use another app or implement the missing methods + //require.EqualValues(t, string(tx), result.Data.String()) + require.EqualValues(t, types.Tx(tx), vm.mempool.ReapMaxTxs(-1)[0]) + }) +} + +func TestStatusService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testStatusService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testStatusService(t, setupWSRPCServer) + }) +} + +func testStatusService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer vm.mempool.Flush() + defer cancel() + + t.Run("Status", func(t *testing.T) { + initialHeight := vm.state.LastBlockHeight() + for i := 0; i < 3; i++ { + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + require.EqualValues(t, result.Height, initialHeight+int64(1)+int64(i)) + testStatus(t, client, &coretypes.ResultStatus{ + NodeInfo: p2p.DefaultNodeInfo{}, + SyncInfo: coretypes.SyncInfo{ + LatestBlockHeight: initialHeight + int64(i) + 1, + }, + ValidatorInfo: coretypes.ValidatorInfo{}, + }) + } + }) +} + +func TestNetworkService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testNetworkService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testNetworkService(t, setupWSRPCServer) + }) +} + +func testNetworkService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer cancel() + + t.Run("NetInfo", func(t *testing.T) { + testNetInfo(t, client, &coretypes.ResultNetInfo{ + Listening: true, + Listeners: nil, + NPeers: 0, + Peers: nil, + }) + }) + + t.Run("DumpConsensusState", func(t *testing.T) { + testDumpConsensusState(t, client, &coretypes.ResultDumpConsensusState{ + RoundState: json.RawMessage{}, + Peers: []coretypes.PeerStateInfo{}, + }) + }) + + //TODO: implement the consensus_state rpc method, then uncomment this code block + t.Run("ConsensusState", func(t *testing.T) { + testConsensusState(t, client, &coretypes.ResultConsensusState{ + RoundState: json.RawMessage{}, + }) + }) + + t.Run("ConsensusParams", func(t *testing.T) { + initialHeight := vm.state.LastBlockHeight() + for i := 0; i < 3; i++ { + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + require.EqualValues(t, result.Height, initialHeight+int64(1)+int64(i)) + lastBlockHeight := vm.state.LastBlockHeight() + testConsensusParams(t, client, &lastBlockHeight, &coretypes.ResultConsensusParams{ + BlockHeight: result.Height, + //TODO: compare consensus params + //ConsensusParams: types.ConsensusParams{}, + }) + } + }) + + t.Run("Health", func(t *testing.T) { +
testHealth(t, client) + }) +} + +func TestHistoryService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testHistoryService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testHistoryService(t, setupWSRPCServer) + }) +} + +func testHistoryService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer cancel() + + t.Run("Genesis", func(t *testing.T) { + result, err := client.Genesis(context.Background()) + require.NoError(t, err) + require.Equal(t, vm.genesis, result.Genesis) + }) + + t.Run("GenesisChunked", func(t *testing.T) { + first, err := client.GenesisChunked(context.Background(), 0) + require.NoError(t, err) + + decoded := make([]string, 0, first.TotalChunks) + for i := 0; i < first.TotalChunks; i++ { + chunk, err := client.GenesisChunked(context.Background(), uint(i)) + require.NoError(t, err) + data, err := base64.StdEncoding.DecodeString(chunk.Data) + require.NoError(t, err) + decoded = append(decoded, string(data)) + } + doc := []byte(strings.Join(decoded, "")) + + var out types.GenesisDoc + require.NoError(t, bftjson.Unmarshal(doc, &out), "first: %+v, doc: %s", first, string(doc)) + }) + + t.Run("BlockchainInfo", func(t *testing.T) { + blkMetas := make([]*types.BlockMeta, 0) + for i := int64(1); i <= vm.state.LastBlockHeight(); i++ { + blk := testBlock(t, client, &i, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: i, + AppHash: vm.state.AppHash(), + }, + }, + }) + bps, err := blk.Block.MakePartSet(types.BlockPartSizeBytes) + require.NoError(t, err) + blkMetas = append(blkMetas, &types.BlockMeta{ + BlockID: types.BlockID{Hash: blk.Block.Hash(), PartSetHeader: bps.Header()}, + BlockSize: blk.Block.Size(), + Header: blk.Block.Header, + NumTxs: len(blk.Block.Data.Txs), + }) + } + initialHeight := vm.state.LastBlockHeight() + testBlockchainInfo(t, client, 0, 0, &coretypes.ResultBlockchainInfo{ + LastHeight: initialHeight, + BlockMetas: blkMetas, + }) + _, _, tx := MakeTxKV() + prevStateAppHash := vm.state.AppHash() + bres := testBroadcastTxCommit(t, client, vm, tx) + blk := testBlock(t, client, &bres.Height, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: bres.Height, + AppHash: prevStateAppHash, + }, + }, + }) + bps, err := blk.Block.MakePartSet(types.BlockPartSizeBytes) + require.NoError(t, err) + blkMetas = append(blkMetas, &types.BlockMeta{ + BlockID: types.BlockID{Hash: blk.Block.Hash(), PartSetHeader: bps.Header()}, + BlockSize: blk.Block.Size(), + Header: blk.Block.Header, + NumTxs: len(blk.Block.Data.Txs), + }) + //TODO: fix the blockchain info test (unexpected height), then uncomment this block of code + testBlockchainInfo(t, client, 0, 0, &coretypes.ResultBlockchainInfo{ + LastHeight: initialHeight + 1, + BlockMetas: blkMetas, + }) + }) +} + +func TestSignService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testSignService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testSignService(t, setupWSRPCServer) + }) +} + +func testSignService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, buildAccept) + defer server.Close() + defer cancel() + + t.Run("Block", func(t *testing.T) { + initialHeight := vm.state.LastBlockHeight() + for i := 0; i < 3; i++ { + _, _, tx := MakeTxKV() + prevAppHash := vm.state.AppHash() + result := testBroadcastTxCommit(t,
client, vm, tx) + require.EqualValues(t, result.Height, initialHeight+int64(1)+int64(i)) + lastBlockHeight := vm.state.LastBlockHeight() + testBlock(t, client, &lastBlockHeight, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: result.Height, + AppHash: prevAppHash, + }, + }, + }) + } + }) + + t.Run("BlockByHash", func(t *testing.T) { + prevAppHash := vm.state.AppHash() + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + blk := testBlock(t, client, &result.Height, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: result.Height, + AppHash: prevAppHash, + }, + }, + }) + + hash := blk.Block.Hash() + //TODO: fix block search by hash: calcBlockHash gives hashes of different lengths when storing vs. getting a block + reply := testBlockByHash(t, client, hash.Bytes(), &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: result.Height, + AppHash: prevAppHash, + }, + }, + }) + require.EqualValues(t, hash[:], reply.Block.Hash().Bytes()) + }) + + //TODO: implement the block_results rpc method, then uncomment this block of code + t.Run("BlockResults", func(t *testing.T) { + prevAppHash := vm.state.AppHash() + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + testBlockResults(t, client, nil, &coretypes.ResultBlockResults{ + Height: result.Height, + AppHash: prevAppHash, + TxsResults: []*abcitypes.ExecTxResult{&result.TxResult}, + }) + + testBlockResults(t, client, &result.Height, &coretypes.ResultBlockResults{ + Height: result.Height, + AppHash: prevAppHash, + TxsResults: []*abcitypes.ExecTxResult{&result.TxResult}, + }) + }) + + t.Run("Tx", func(t *testing.T) { + for i := 0; i < 3; i++ { + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + testTx(t, client, result.Hash.Bytes(), false, &coretypes.ResultTx{ + Hash: result.Hash, + Height: result.Height, + Index: 0, + TxResult: result.TxResult, + Tx: tx, + Proof: types.TxProof{}, + }) + } + }) + + t.Run("TxSearch", func(t *testing.T) { + _, _, tx := MakeTxKV() + txReply := testBroadcastTxCommit(t, client, vm, tx) + testTxSearch(t, client, fmt.Sprintf("tx.hash='%s'", txReply.Hash), false, nil, nil, "asc", &coretypes.ResultTxSearch{ + Txs: []*coretypes.ResultTx{{ + Hash: txReply.Hash, + Height: txReply.Height, + //TODO: check index + Index: 0, + TxResult: txReply.TxResult, + Tx: tx, + //TODO: check proof + Proof: types.TxProof{}, + }}, + TotalCount: 1, + }) + testTxSearch(t, client, fmt.Sprintf("tx.height=%d", txReply.Height), false, nil, nil, "asc", &coretypes.ResultTxSearch{ + Txs: []*coretypes.ResultTx{{ + Hash: txReply.Hash, + Height: txReply.Height, + //TODO: check index + Index: 0, + TxResult: txReply.TxResult, + Tx: tx, + //TODO: check proof + Proof: types.TxProof{}, + }}, + TotalCount: 1, + }) + }) + + t.Run("Commit", func(t *testing.T) { + prevAppHash := vm.state.AppHash() + _, _, tx := MakeTxKV() + txReply := testBroadcastTxCommit(t, client, vm, tx) + lastBlockHeight := vm.state.LastBlockHeight() + blk := testBlock(t, client, &lastBlockHeight, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: txReply.Height, + AppHash: prevAppHash, + }, + }, + }) + //TODO: implement checks for all ResultCommit fields + testCommit(t, client, &txReply.Height, &coretypes.ResultCommit{ + SignedHeader: types.SignedHeader{ + Header: &types.Header{ +
//Version: bftversion.Consensus{}, + ChainID: vm.state.ChainID(), + Height: txReply.Height, + //Time: time.Time{}, + LastBlockID: blk.BlockID, + LastCommitHash: blk.Block.LastCommitHash, + DataHash: blk.Block.DataHash, + ValidatorsHash: blk.Block.ValidatorsHash, + NextValidatorsHash: blk.Block.NextValidatorsHash, + ConsensusHash: blk.Block.ConsensusHash, + AppHash: prevAppHash, + LastResultsHash: blk.Block.LastResultsHash, + EvidenceHash: blk.Block.EvidenceHash, + ProposerAddress: blk.Block.ProposerAddress, + }, + Commit: &types.Commit{ + Height: txReply.Height, + Round: 0, + BlockID: types.BlockID{}, + Signatures: nil, + }, + }, + CanonicalCommit: false, + }) + }) + + t.Run("BlockSearch", func(t *testing.T) { + initialHeight := vm.state.LastBlockHeight() + prevAppHash := vm.state.AppHash() + _, _, tx := MakeTxKV() + result := testBroadcastTxCommit(t, client, vm, tx) + blk := testBlock(t, client, &result.Height, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: result.Height, + AppHash: prevAppHash, + }, + }, + }) + testBlockSearch(t, client, fmt.Sprintf("block.height=%d", initialHeight+1), nil, nil, "asc", &coretypes.ResultBlockSearch{ + Blocks: []*coretypes.ResultBlock{blk}, + TotalCount: 1, + }) + prevAppHash = vm.state.AppHash() + _, _, tx = MakeTxKV() + result = testBroadcastTxCommit(t, client, vm, tx) + blk2 := testBlock(t, client, &result.Height, &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + ChainID: vm.state.ChainID(), + Height: result.Height, + AppHash: prevAppHash, + }, + }, + }) + testBlockSearch(t, client, fmt.Sprintf("block.height>%d", initialHeight), nil, nil, "asc", &coretypes.ResultBlockSearch{ + Blocks: []*coretypes.ResultBlock{blk, blk2}, + TotalCount: 2, + }) + }) +} + +func TestMempoolService(t *testing.T) { + t.Run("JSONRPC", func(t *testing.T) { + testMempoolService(t, setupRPCServer) + }) + t.Run("WebSocket", func(t *testing.T) { + testMempoolService(t, setupWSRPCServer) + }) +} + +func testMempoolService(t *testing.T, serverBuilder setupServerAndTransport) { + server, vm, client, cancel := serverBuilder(t, noAction) + defer server.Close() + defer cancel() + + t.Run("UnconfirmedTxs", func(t *testing.T) { + limit := 10 + var count int + _, _, tx := MakeTxKV() + txs := []types.Tx{tx} + testBroadcastTxSync(t, client, tx) + if vm.mempool.Size() < limit { + count = vm.mempool.Size() + } else { + count = limit + } + testUnconfirmedTxs(t, client, &limit, &coretypes.ResultUnconfirmedTxs{ + Count: count, + Total: vm.mempool.Size(), + Txs: txs, + }) + for i := 0; i < 3; i++ { + _, _, tx = MakeTxKV() + txs = append(txs, tx) + testBroadcastTxSync(t, client, tx) + } + if vm.mempool.Size() < limit { + count = vm.mempool.Size() + } else { + count = limit + } + testUnconfirmedTxs(t, client, &limit, &coretypes.ResultUnconfirmedTxs{ + Count: count, + Total: vm.mempool.Size(), + Txs: txs, + }) + }) + + t.Run("NumUnconfirmedTxs", func(t *testing.T) { + _, _, tx := MakeTxKV() + txs := []types.Tx{tx} + testBroadcastTxSync(t, client, tx) + testNumUnconfirmedTxs(t, client, &coretypes.ResultUnconfirmedTxs{ + Count: vm.mempool.Size(), + Total: vm.mempool.Size(), + }) + for i := 0; i < 3; i++ { + _, _, tx = MakeTxKV() + txs = append(txs, tx) + testBroadcastTxSync(t, client, tx) + } + testNumUnconfirmedTxs(t, client, &coretypes.ResultUnconfirmedTxs{ + Count: vm.mempool.Size(), + Total: vm.mempool.Size(), + }) + }) + + t.Run("CheckTx", func(t *testing.T) { + _, _, tx := MakeTxKV() + testCheckTx(t, client, 
tx, &coretypes.ResultCheckTx{ + ResponseCheckTx: abcitypes.ResponseCheckTx{Code: kvstore.CodeTypeOK}, + }) + testCheckTx(t, client, []byte("inappropriate tx"), &coretypes.ResultCheckTx{ + ResponseCheckTx: abcitypes.ResponseCheckTx{Code: kvstore.CodeTypeInvalidTxFormat}, + }) + }) +} + +//TODO: implement complicated combinations +//TODO: implement the rpc methods below, then implement the corresponding unit tests +//{"Header", "header", map[string]interface{}{}, new(ctypes.ResultHeader)}, +//{"HeaderByHash", "header_by_hash", map[string]interface{}{}, new(ctypes.ResultHeader)}, +//{"Validators", "validators", map[string]interface{}{}, new(ctypes.ResultValidators)}, diff --git a/vm/types/config.go b/vm/types/config.go new file mode 100644 index 00000000..c126ac58 --- /dev/null +++ b/vm/types/config.go @@ -0,0 +1,48 @@ +package types + +import ( + "fmt" + "time" +) + +const ( + defaultRPCPort = 9752 + defaultGRPCPort = 9090 + defaultMaxOpenConnections = 0 // unlimited + defaultTimeoutBroadcastTxCommit time.Duration = 30 * time.Second +) + +// VmConfig holds the Landslide VM settings decoded from the chain's JSON config bytes. +type VmConfig struct { + RPCPort uint16 `json:"rpc_port"` + GRPCPort uint16 `json:"grpc_port"` + GRPCMaxOpenConnections int `json:"grpc_max_open_connections"` + TimeoutBroadcastTxCommit time.Duration `json:"broadcast_commit_timeout"` + NetworkName string `json:"network_name"` +} + +// SetDefaults sets the default values for the config. +func (c *VmConfig) SetDefaults() { + c.RPCPort = defaultRPCPort + c.GRPCPort = defaultGRPCPort + c.GRPCMaxOpenConnections = defaultMaxOpenConnections + c.TimeoutBroadcastTxCommit = defaultTimeoutBroadcastTxCommit + c.NetworkName = "landslide-test" +} + +// Validate returns an error if this is an invalid config. +func (c *VmConfig) Validate() error { + if c.GRPCMaxOpenConnections < 0 { + return fmt.Errorf("grpc_max_open_connections can't be negative") + } + + if c.TimeoutBroadcastTxCommit < 0 { + return fmt.Errorf("broadcast_commit_timeout can't be negative") + } + + if len(c.NetworkName) == 0 { + return fmt.Errorf("network_name can't be empty") + } + + return nil +}
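For reference, a minimal sketch (not part of this patch) of a chain config that VmConfig accepts. Field names follow the json tags above; note that broadcast_commit_timeout is a time.Duration, which encoding/json decodes from integer nanoseconds (30000000000 ns = 30 s). All values are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	vmtypes "github.com/consideritdone/landslidevm/vm/types"
)

func main() {
	// Illustrative chain config bytes, mirroring the SetDefaults -> Unmarshal ->
	// Validate sequence that vm.Initialize performs on req.ConfigBytes.
	raw := []byte(`{
		"rpc_port": 9752,
		"grpc_port": 9090,
		"grpc_max_open_connections": 0,
		"broadcast_commit_timeout": 30000000000,
		"network_name": "landslide-test"
	}`)

	var cfg vmtypes.VmConfig
	cfg.SetDefaults() // keep defaults for any fields omitted from the JSON
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}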
"github.com/cometbft/cometbft/libs/json" - cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" + tmproto "github.com/cometbft/cometbft/proto/tendermint/types" "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/types" @@ -55,7 +55,7 @@ func EncodeBlock(block *types.Block) ([]byte, error) { } func DecodeBlock(data []byte) (*types.Block, error) { - protoBlock := new(cmtproto.Block) + protoBlock := new(tmproto.Block) if err := protoBlock.Unmarshal(data); err != nil { return nil, err } diff --git a/vm/vm.go b/vm/vm.go index ba36139f..02191b09 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/consideritdone/landslidevm/safestate" http2 "net/http" "os" "slices" @@ -16,8 +17,10 @@ import ( abcitypes "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/consensus" + "github.com/cometbft/cometbft/crypto" "github.com/cometbft/cometbft/crypto/secp256k1" "github.com/cometbft/cometbft/libs/log" + "github.com/cometbft/cometbft/libs/pubsub" "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/node" "github.com/cometbft/cometbft/proxy" @@ -36,6 +39,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + rpcserver "github.com/cometbft/cometbft/rpc/jsonrpc/server" "github.com/consideritdone/landslidevm/database" "github.com/consideritdone/landslidevm/grpcutils" "github.com/consideritdone/landslidevm/http" @@ -89,11 +93,13 @@ type ( GenesisBytes []byte UpgradeBytes []byte ConfigBytes []byte + ChainDataDir string } AppCreator func(*AppCreatorOpts) (Application, error) LandslideVM struct { + networkName string allowShutdown *vmtypes.Atomic[bool] processMetrics prometheus.Gatherer @@ -112,7 +118,7 @@ type ( blockStore *store.BlockStore stateStore state.Store - state state.State + state safestate.SafeState genesis *types.GenesisDoc genChunks []string @@ -144,7 +150,7 @@ func NewViaDB(database dbm.DB, creator AppCreator, options ...func(*LandslideVM) vm := &LandslideVM{ appCreator: creator, database: database, - allowShutdown: vmtypes.NewAtomic(true), + allowShutdown: vmtypes.NewAtomic(false), vmenabled: vmtypes.NewAtomic(false), vmstate: vmtypes.NewAtomic(vmpb.State_STATE_UNSPECIFIED), vmconnected: vmtypes.NewAtomic(false), @@ -255,7 +261,7 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest vm.appOpts = &AppCreatorOpts{ NetworkId: req.NetworkId, SubnetId: req.SubnetId, - ChainId: req.CChainId, + ChainId: req.ChainId, NodeId: req.NodeId, PublicKey: req.PublicKey, XChainId: req.XChainId, @@ -264,18 +270,34 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest GenesisBytes: req.GenesisBytes, UpgradeBytes: req.UpgradeBytes, ConfigBytes: req.ConfigBytes, + ChainDataDir: req.ChainDataDir, } app, err := vm.appCreator(vm.appOpts) if err != nil { return nil, err } - vm.state, vm.genesis, err = node.LoadStateFromDBOrGenesisDocProvider( + // Set the default configuration + var vmCfg vmtypes.VmConfig + vmCfg.SetDefaults() + if len(vm.appOpts.ConfigBytes) > 0 { + if err := json.Unmarshal(vm.appOpts.ConfigBytes, &vmCfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal config %s: %w", string(vm.appOpts.ConfigBytes), err) + } + } + if err := vmCfg.Validate(); err != nil { + return nil, err + } + vm.networkName = vmCfg.NetworkName + + cmtState, genesis, err := node.LoadStateFromDBOrGenesisDocProvider( dbStateStore, func() (*types.GenesisDoc, 
error) { return types.GenesisDocFromJSON(req.GenesisBytes) }, ) + vm.state = safestate.New(cmtState) + vm.genesis = genesis if err != nil { return nil, err } @@ -313,7 +335,7 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest handshaker := consensus.NewHandshaker( vm.stateStore, - vm.state, + vm.state.StateCopy(), vm.blockStore, vm.genesis, ) @@ -323,18 +345,19 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest return nil, fmt.Errorf("error during handshake: %v", err) } - vm.state, err = vm.stateStore.Load() + cmtState, err = vm.stateStore.Load() if err != nil { return nil, err } + vm.state.UpdateState(cmtState) vm.mempool = mempool.NewCListMempool( config.DefaultMempoolConfig(), vm.app.Mempool(), - vm.state.LastBlockHeight, + vm.state.LastBlockHeight(), mempool.WithMetrics(mempool.NopMetrics()), - mempool.WithPreCheck(state.TxPreCheck(vm.state)), - mempool.WithPostCheck(state.TxPostCheck(vm.state)), + mempool.WithPreCheck(state.TxPreCheck(vm.state.StateCopy())), + mempool.WithPostCheck(state.TxPostCheck(vm.state.StateCopy())), ) vm.mempool.SetLogger(vm.logger.With("module", "mempool")) vm.mempool.EnableTxsAvailable() @@ -347,15 +370,15 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest }() var blk *types.Block - if vm.state.LastBlockHeight > 0 { - vm.logger.Debug("loading last block", "height", vm.state.LastBlockHeight) - blk = vm.blockStore.LoadBlock(vm.state.LastBlockHeight) + if vm.state.LastBlockHeight() > 0 { + vm.logger.Debug("loading last block", "height", vm.state.LastBlockHeight()) + blk = vm.blockStore.LoadBlock(vm.state.LastBlockHeight()) } else { vm.logger.Debug("creating genesis block") executor := vmstate.NewBlockExecutor(vm.stateStore, vm.logger, vm.app.Consensus(), vm.mempool, vm.blockStore) executor.SetEventBus(vm.eventBus) - blk, err = executor.CreateProposalBlock(context.Background(), vm.state.LastBlockHeight+1, vm.state, &types.ExtendedCommit{}, proposerAddress) + blk, err = executor.CreateProposalBlock(context.Background(), vm.state.LastBlockHeight()+1, vm.state.StateCopy(), &types.ExtendedCommit{}, proposerAddress) if err != nil { return nil, err } @@ -370,25 +393,25 @@ func (vm *LandslideVM) Initialize(_ context.Context, req *vmpb.InitializeRequest PartSetHeader: bps.Header(), } - newstate, err := executor.ApplyBlock(vm.state, blockID, blk) + newstate, err := executor.ApplyBlock(vm.state.StateCopy(), blockID, blk) if err != nil { return nil, err } - vm.blockStore.SaveBlock(blk, bps, commit.MakeCommit(blk.Height, blk.Time, vm.state.Validators, blockID)) + vm.blockStore.SaveBlock(blk, bps, commit.MakeCommit(blk.Height, blk.Time, vm.state.Validators(), blockID)) err = vm.stateStore.Save(newstate) if err != nil { vm.logger.Error("failed to save state", "err", err) return nil, err } - vm.state = newstate + vm.state.UpdateState(newstate) } blockBytes, err := vmstate.EncodeBlockWithStatus(blk, vmpb.Status_STATUS_ACCEPTED) if err != nil { return nil, err } - vm.logger.Debug("initialize block", "bytes ", blockBytes) + //vm.logger.Debug("initialize block", "bytes ", blockBytes) vm.logger.Info("vm initialization completed") parentHash := block.BlockParentHash(blk) @@ -414,18 +437,19 @@ func (vm *LandslideVM) SetState(_ context.Context, req *vmpb.SetStateRequest) (* vm.logger.Error("SetState", "state", req.State) return nil, ErrUnknownState } - blk := vm.blockStore.LoadBlock(vm.state.LastBlockHeight) + blk := vm.blockStore.LoadBlock(vm.state.LastBlockHeight()) if blk == nil { 
return nil, ErrNotFound } - vm.logger.Debug("SetState", "LastAcceptedId", vm.state.LastBlockID.Hash, "block", blk.Hash()) + blkID := vm.state.LastBlockID() + vm.logger.Debug("SetState", "LastAcceptedId", blkID.Hash, "block", blk.Hash()) parentHash := block.BlockParentHash(blk) res := vmpb.SetStateResponse{ LastAcceptedId: blk.Hash(), LastAcceptedParentId: parentHash[:], Height: uint64(blk.Height), - Bytes: vm.state.Bytes(), + Bytes: vm.state.StateBytes(), Timestamp: timestamppb.New(blk.Time), } vm.vmstate.Set(req.State) @@ -439,7 +463,7 @@ func (vm *LandslideVM) CanShutdown() bool { // Shutdown is called when the node is shutting down. func (vm *LandslideVM) Shutdown(context.Context, *emptypb.Empty) (*emptypb.Empty, error) { - vm.logger.Info("Shutdown") + fmt.Println("Shutdown") vm.allowShutdown.Set(true) if vm.closed != nil { close(vm.closed) @@ -471,7 +495,20 @@ func (vm *LandslideVM) CreateHandlers(context.Context, *emptypb.Empty) (*vmpb.Cr vm.serverCloser.Add(server) mux := http2.NewServeMux() - jsonrpc.RegisterRPCFuncs(mux, NewRPC(vm).Routes(), vm.logger) + tmRPC := NewRPC(vm) + wm := rpcserver.NewWebsocketManager(tmRPC.TMRoutes(), + rpcserver.OnDisconnect(func(remoteAddr string) { + err := vm.eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != pubsub.ErrSubscriptionNotFound { + vm.logger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + }), + rpcserver.ReadLimit(config.DefaultRPCConfig().MaxBodyBytes), + rpcserver.WriteChanCapacity(config.DefaultRPCConfig().WebSocketWriteBufferSize), + ) + wm.SetLogger(vm.logger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + jsonrpc.RegisterRPCFuncs(mux, tmRPC.Routes(), vm.logger) httppb.RegisterHTTPServer(server, http.NewServer(mux)) @@ -510,26 +547,27 @@ func (vm *LandslideVM) BuildBlock(context.Context, *vmpb.BuildBlockRequest) (*vm executor := vmstate.NewBlockExecutor(vm.stateStore, vm.logger, vm.app.Consensus(), vm.mempool, vm.blockStore) executor.SetEventBus(vm.eventBus) - signatures := make([]types.ExtendedCommitSig, len(vm.state.Validators.Validators)) + validators := vm.state.Validators() + signatures := make([]types.ExtendedCommitSig, len(validators.Validators)) for i := range signatures { signatures[i] = types.ExtendedCommitSig{ CommitSig: types.CommitSig{ BlockIDFlag: types.BlockIDFlagNil, Timestamp: time.Now(), - ValidatorAddress: vm.state.Validators.Validators[i].Address, - Signature: []byte{0x0}, + ValidatorAddress: validators.Validators[i].Address, + Signature: crypto.CRandBytes(types.MaxSignatureSize), // todo: sign the block }, } } lastComm := types.ExtendedCommit{ - Height: vm.state.LastBlockHeight, + Height: vm.state.LastBlockHeight(), Round: 0, - BlockID: vm.state.LastBlockID, + BlockID: vm.state.LastBlockID(), ExtendedSignatures: signatures, } - blk, err := executor.CreateProposalBlock(context.Background(), vm.state.LastBlockHeight+1, vm.state, &lastComm, proposerAddress) + blk, err := executor.CreateProposalBlock(context.Background(), vm.state.LastBlockHeight()+1, vm.state.StateCopy(), &lastComm, proposerAddress) if err != nil { vm.logger.Error("failed to create proposal block", "err", err) return nil, err @@ -565,7 +603,8 @@ func (vm *LandslideVM) BuildBlock(context.Context, *vmpb.BuildBlockRequest) (*vm // ParseBlock attempt to create a block from a stream of bytes. 
func (vm *LandslideVM) ParseBlock(_ context.Context, req *vmpb.ParseBlockRequest) (*vmpb.ParseBlockResponse, error) { - vm.logger.Debug("ParseBlock", "bytes", req.Bytes) + vm.logger.Info("ParseBlock") + //vm.logger.Debug("ParseBlock", "bytes", req.Bytes) var ( blk *types.Block blkStatus vmpb.Status @@ -821,7 +860,7 @@ func (vm *LandslideVM) GetStateSummary(context.Context, *vmpb.GetStateSummaryReq func (vm *LandslideVM) BlockVerify(_ context.Context, req *vmpb.BlockVerifyRequest) (*vmpb.BlockVerifyResponse, error) { vm.logger.Info("BlockVerify") - vm.logger.Debug("block verify", "bytes", req.Bytes) + //vm.logger.Debug("block verify", "bytes", req.Bytes) blk, blkStatus, err := vmstate.DecodeBlockWithStatus(req.Bytes) if err != nil { @@ -830,7 +869,7 @@ func (vm *LandslideVM) BlockVerify(_ context.Context, req *vmpb.BlockVerifyReque } vm.logger.Info("ValidateBlock") - err = vmstate.ValidateBlock(vm.state, blk) + err = vmstate.ValidateBlock(vm.state.StateCopy(), blk) if err != nil { vm.logger.Error("failed to validate block", "err", err) return nil, err @@ -879,12 +918,13 @@ PartSetHeader: bps.Header(), } - newstate, err := executor.ApplyBlock(vm.state, blockID, blk) + prevState := vm.state.StateCopy() + newstate, err := executor.ApplyBlock(prevState, blockID, blk) if err != nil { vm.logger.Error("failed to apply block", "err", err) return nil, err } - vm.blockStore.SaveBlock(blk, bps, commit.MakeCommit(blk.Height, blk.Time, vm.state.Validators, blockID)) + vm.blockStore.SaveBlock(blk, bps, commit.MakeCommit(blk.Height, blk.Time, vm.state.Validators(), blockID)) err = vm.stateStore.Save(newstate) if err != nil { @@ -892,7 +932,7 @@ func (vm *LandslideVM) BlockAccept(_ context.Context, req *vmpb.BlockAcceptReque return nil, err } - vm.state = newstate + vm.state.UpdateState(newstate) delete(vm.wrappedBlocks.VerifiedBlocks, blkID) vm.wrappedBlocks.MissingBlocks.Evict(blkID) diff --git a/vm/vm_test.go b/vm/vm_test.go index dc4e31c2..f95ebf30 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -9,6 +9,7 @@ import ( "github.com/cometbft/cometbft/abci/example/kvstore" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" vmpb "github.com/consideritdone/landslidevm/proto/vm" ) @@ -120,3 +121,30 @@ func TestAcceptBlock(t *testing.T) { }) require.NoError(t, err) } + +// TestShutdownWithoutInit tests the VM Shutdown function, which is called without Initialize by the AvalancheGo Factory: +// https://github.com/ava-labs/avalanchego/blob/0c4efd743e1d737f4e8970d0e0ebf229ea44406c/vms/manager.go#L129 +func TestShutdownWithoutInit(t *testing.T) { + vmdb := dbm.NewMemDB() + appdb := dbm.NewMemDB() + mockConn := &mockClientConn{} + vm := NewViaDB(vmdb, func(*AppCreatorOpts) (Application, error) { + return kvstore.NewApplication(appdb), nil + }, WithClientConn(mockConn)) + require.NotNil(t, vm) + _, err := vm.Shutdown(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) +} + +// allowShutdown should be false by default https://github.com/ava-labs/avalanchego/blob/c8a5d0b11bcfe8b8a74983a9b0ef04fc68e78cf3/vms/rpcchainvm/vm.go#L40 +func TestAllowShutdown(t *testing.T) { + vm := newFreshKvApp(t) + vmLnd := vm.(*LandslideVM) + + require.False(t, vmLnd.CanShutdown()) + + _, err := vm.Shutdown(context.Background(), &emptypb.Empty{}) + require.NoError(t, err) + + require.True(t, vmLnd.CanShutdown()) +} diff --git a/vm/ws_client.go b/vm/ws_client.go new file mode 100644 index 00000000..a4b82b4d --- /dev/null +++ b/vm/ws_client.go @@ -0,0 +1,840 @@ +package vm + +import ( + "context" + "errors" + "github.com/cometbft/cometbft/libs/bytes" + tmjson "github.com/cometbft/cometbft/libs/json" + "github.com/cometbft/cometbft/libs/pubsub" + tmsync "github.com/cometbft/cometbft/libs/sync" + rpcclient "github.com/cometbft/cometbft/rpc/client" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/cometbft/cometbft/rpc/jsonrpc/client" + "github.com/cometbft/cometbft/types" + "strings" + "time" +) + +type WSClient struct { + *client.WSClient + + mtx tmsync.RWMutex + subscriptions map[string]chan ctypes.ResultEvent // query -> chan +} + +func NewWSClient(wsClient *client.WSClient) *WSClient { + return &WSClient{ + WSClient: wsClient, + mtx: tmsync.RWMutex{}, + subscriptions: make(map[string]chan ctypes.ResultEvent), + } +}
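+ +// NOTE: every RPC helper below sends a single request and then reads exactly one +// message from ResponsesCh, so these helpers must not be called concurrently, +// nor while the event listener started by OnStart is running; otherwise a +// response may be delivered to the wrong reader. +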
+func (ws *WSClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "status", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultStatus) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "abci_info", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultABCIInfo) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) ABCIQuery( + ctx context.Context, + path string, + data bytes.HexBytes, +) (*ctypes.ResultABCIQuery, error) { + return ws.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions) +} + +func (ws *WSClient) ABCIQueryWithOptions( + ctx context.Context, + path string, + data bytes.HexBytes, + opts rpcclient.ABCIQueryOptions, +) (*ctypes.ResultABCIQuery, error) { + params := map[string]interface{}{"path": path, "data": data, "height": opts.Height, "prove": opts.Prove} + err := ws.Call(ctx, "abci_query", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultABCIQuery) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) BroadcastTxCommit( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTxCommit, error) { + params := map[string]interface{}{"tx": tx} + err := ws.Call(ctx, "broadcast_tx_commit", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBroadcastTxCommit) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) BroadcastTxAsync( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + return ws.broadcastTX(ctx, "broadcast_tx_async", tx) +} + +func (ws *WSClient) BroadcastTxSync( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + return ws.broadcastTX(ctx, "broadcast_tx_sync", tx) +} + +func (ws *WSClient) broadcastTX( + ctx context.Context, + route string, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + params := map[string]interface{}{"tx": tx} + err := ws.Call(ctx, route, params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBroadcastTx) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) UnconfirmedTxs( + ctx context.Context, + limit *int, +) (*ctypes.ResultUnconfirmedTxs, error) { + params := make(map[string]interface{}) + if limit != nil { + params["limit"] = limit + } + err := ws.Call(ctx, "unconfirmed_txs", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultUnconfirmedTxs) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "num_unconfirmed_txs", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultUnconfirmedTxs) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + params := map[string]interface{}{"tx": tx} + err := ws.Call(ctx, "check_tx", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultCheckTx) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "net_info", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultNetInfo) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "dump_consensus_state", params) + if err != nil { + return nil, err + } + + msg :=
<-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultDumpConsensusState) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "consensus_state", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultConsensusState) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) ConsensusParams( + ctx context.Context, + height *int64, +) (*ctypes.ResultConsensusParams, error) { + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "consensus_params", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultConsensusParams) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { + params := make(map[string]interface{}) + + err := ws.Call(ctx, "health", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultHealth) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) BlockchainInfo( + ctx context.Context, + minHeight, + maxHeight int64, +) (*ctypes.ResultBlockchainInfo, error) { + params := map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight} + err := ws.Call(ctx, "blockchain", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBlockchainInfo) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { + params := make(map[string]interface{}) + err := ws.Call(ctx, "genesis", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultGenesis) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + params := map[string]interface{}{"chunk": id} + err := ws.Call(ctx, "genesis_chunked", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultGenesisChunk) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "block", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBlock) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil
+} + +func (ws *WSClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { + params := map[string]interface{}{ + "hash": hash, + } + err := ws.Call(ctx, "block_by_hash", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBlock) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) BlockResults( + ctx context.Context, + height *int64, +) (*ctypes.ResultBlockResults, error) { + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "block_results", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBlockResults) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) { + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "header", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultHeader) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) HeaderByHash(ctx context.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) { + params := map[string]interface{}{ + "hash": hash, + } + err := ws.Call(ctx, "header_by_hash", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultHeader) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "commit", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultCommit) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + params := map[string]interface{}{ + "hash": hash, + "prove": prove, + } + err := ws.Call(ctx, "tx", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultTx) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) TxSearch( + ctx context.Context, + query string, + prove bool, + page, + perPage *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) { + params := map[string]interface{}{ + "query": query, + "prove": prove, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + err := ws.Call(ctx, "tx_search", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultTxSearch) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return
nil, err + } + return result, nil +} + +func (ws *WSClient) BlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + params := map[string]interface{}{ + "query": query, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + err := ws.Call(ctx, "block_search", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBlockSearch) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) Validators( + ctx context.Context, + height *int64, + page, + perPage *int, +) (*ctypes.ResultValidators, error) { + params := make(map[string]interface{}) + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + if height != nil { + params["height"] = height + } + err := ws.Call(ctx, "validators", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultValidators) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (ws *WSClient) BroadcastEvidence( + ctx context.Context, + ev types.Evidence, +) (*ctypes.ResultBroadcastEvidence, error) { + params := map[string]interface{}{"evidence": ev} + err := ws.Call(ctx, "broadcast_evidence", params) + if err != nil { + return nil, err + } + + msg := <-ws.ResponsesCh + if msg.Error != nil { + return nil, msg.Error + } + result := new(ctypes.ResultBroadcastEvidence) + err = tmjson.Unmarshal(msg.Result, result) + if err != nil { + return nil, err + } + return result, nil +} + +//----------------------------------------------------------------------------- +// WSEvents + +var errNotRunning = errors.New("client is not running. Use .Start() method to start") + +// OnStart implements service.Service by starting WSClient and event loop. +func (ws *WSClient) OnStart() error { + if err := ws.WSClient.Start(); err != nil { + return err + } + + go ws.eventListener() + + return nil +} + +// OnStop implements service.Service by stopping WSClient. +func (ws *WSClient) OnStop() { + if err := ws.WSClient.Stop(); err != nil { + ws.Logger.Error("Can't stop ws client", "err", err) + } +} + +func (ws *WSClient) eventListener() { + for { + select { + case resp, ok := <-ws.ResponsesCh: + if !ok { + return + } + + if resp.Error != nil { + ws.Logger.Error("WS error", "err", resp.Error.Error()) + // Error can be ErrAlreadySubscribed or max client (subscriptions per + // client) reached or CometBFT exited. + // We can ignore ErrAlreadySubscribed, but need to retry in other + // cases. + if !isErrAlreadySubscribed(resp.Error) { + // Resubscribe after 1 second to give CometBFT time to restart (if + // crashed).
+ ws.redoSubscriptionsAfter(1 * time.Second) + } + continue + } + + result := new(ctypes.ResultEvent) + err := tmjson.Unmarshal(resp.Result, result) + if err != nil { + ws.Logger.Error("failed to unmarshal response", "err", err) + continue + } + + ws.mtx.RLock() + if out, ok := ws.subscriptions[result.Query]; ok { + if cap(out) == 0 { + out <- *result + } else { + select { + case out <- *result: + default: + ws.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) + } + } + } + ws.mtx.RUnlock() + case <-ws.Quit(): + return + } + } +} + +func isErrAlreadySubscribed(err error) bool { + return strings.Contains(err.Error(), pubsub.ErrAlreadySubscribed.Error()) +} + +// After reconnecting, it is necessary to re-subscribe to the server, +// otherwise no data will be received automatically. +func (ws *WSClient) redoSubscriptionsAfter(d time.Duration) { + time.Sleep(d) + + ws.mtx.RLock() + defer ws.mtx.RUnlock() + for q := range ws.subscriptions { + err := ws.WSClient.Subscribe(context.Background(), q) + if err != nil { + ws.Logger.Error("Failed to resubscribe", "err", err) + } + } +} + +// Subscribe implements EventsClient by using WSClient to subscribe the given +// subscriber to the query. By default, returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// +// Channel is never closed to prevent clients from seeing an erroneous event. +// +// It returns an error if the client is not running. +func (ws *WSClient) Subscribe(ctx context.Context, _, query string, + outCapacity ...int, +) (out <-chan ctypes.ResultEvent, err error) { + if !ws.IsRunning() { + return nil, errNotRunning + } + + if err := ws.WSClient.Subscribe(ctx, query); err != nil { + return nil, err + } + + outCap := 1 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) + ws.mtx.Lock() + // subscriber param is ignored because CometBFT will override it with + // remote IP anyway. + ws.subscriptions[query] = outc + ws.mtx.Unlock() + + return outc, nil +} + +// Unsubscribe implements EventsClient by using WSClient to unsubscribe the +// given subscriber from the query. +// +// It returns an error if the client is not running. +func (ws *WSClient) Unsubscribe(ctx context.Context, _, query string) error { + if !ws.IsRunning() { + return errNotRunning + } + + if err := ws.WSClient.Unsubscribe(ctx, query); err != nil { + return err + } + + ws.mtx.Lock() + _, ok := ws.subscriptions[query] + if ok { + delete(ws.subscriptions, query) + } + ws.mtx.Unlock() + + return nil +} + +// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe the +// given subscriber from all queries. +// +// It returns an error if the client is not running. +func (ws *WSClient) UnsubscribeAll(ctx context.Context, _ string) error { + if !ws.IsRunning() { + return errNotRunning + } + + if err := ws.WSClient.UnsubscribeAll(ctx); err != nil { + return err + } + + ws.mtx.Lock() + ws.subscriptions = make(map[string]chan ctypes.ResultEvent) + ws.mtx.Unlock() + + return nil +}
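For context, a hypothetical wiring sketch (not part of this patch) of how the new WSClient might be driven for plain request/response calls. The address, endpoint path, and port are illustrative; the wasm example in this PR serves the Tendermint RPC under /ext/bc/<chainID>/rpc, and the /websocket route is registered in CreateHandlers:

package main

import (
	"context"
	"fmt"

	wsrpc "github.com/cometbft/cometbft/rpc/jsonrpc/client"

	"github.com/consideritdone/landslidevm/vm"
)

func main() {
	// Illustrative endpoint; a real URI depends on the node's RPC port and chain ID.
	inner, err := wsrpc.NewWS("tcp://127.0.0.1:9752", "/ext/bc/<chainID>/rpc/websocket")
	if err != nil {
		panic(err)
	}
	// Start only the underlying read/write loops; calling ws.OnStart() instead
	// would also launch the event listener needed by Subscribe, which competes
	// with the request/response helpers for ResponsesCh.
	if err := inner.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = inner.Stop() }()

	ws := vm.NewWSClient(inner)
	status, err := ws.Status(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("latest height:", status.SyncInfo.LatestBlockHeight)
}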