diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 1a3f870b266e..0114d9335676 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" + addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/comet" corecontext "cosmossdk.io/core/context" @@ -36,9 +37,6 @@ import ( consensustypes "cosmossdk.io/x/consensus/types" "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" - sdk "github.com/cosmos/cosmos-sdk/types" - txtypes "github.com/cosmos/cosmos-sdk/types/tx" ) const ( @@ -86,8 +84,10 @@ type consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID - queryHandlersMap map[string]appmodulev2.Handler - getProtoRegistry func() (*protoregistry.Files, error) + queryHandlersMap map[string]appmodulev2.Handler + getProtoRegistry func() (*protoregistry.Files, error) + consensusAddressCodec addresscodec.Codec + cfgMap server.ConfigMap } // CheckTx implements types.Application. @@ -184,6 +184,16 @@ func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return resp, err } + // when a client did not provide a query height, manually inject the latest + // for modules queries, AppManager does it automatically + if req.Height == 0 { + latestVersion, err := c.store.GetLatestVersion() + if err != nil { + return nil, err + } + req.Height = int64(latestVersion) + } + // this error most probably means that we can't handle it with a proto message, so // it must be an app/p2p/store query path := splitABCIQueryPath(req.Path) @@ -238,48 +248,15 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq handlerFullName = string(md.Input().FullName()) } - // special case for simulation as it is an external gRPC registered on the grpc server component + // special case for non-module services as they are external gRPC registered on the grpc server component // and not on the app itself, so it won't pass the router afterwards. - if req.Path == "/cosmos.tx.v1beta1.Service/Simulate" { - simulateRequest := &txtypes.SimulateRequest{} - err = gogoproto.Unmarshal(req.Data, simulateRequest) - if err != nil { - return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) - } - tx, err := c.txCodec.Decode(simulateRequest.TxBytes) - if err != nil { - return nil, true, fmt.Errorf("failed to decode tx: %w", err) - } - - txResult, _, err := c.app.Simulate(ctx, tx) - if err != nil { - return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err) - } - - msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp)) - // pack the messages into Any - for _, msg := range txResult.Resp { - anyMsg, err := codectypes.NewAnyWithValue(msg) - if err != nil { - return nil, true, fmt.Errorf("failed to pack message response: %w", err) - } - - msgResponses = append(msgResponses, anyMsg) - } - - resp := &txtypes.SimulateResponse{ - GasInfo: &sdk.GasInfo{ - GasUsed: txResult.GasUsed, - GasWanted: txResult.GasWanted, - }, - Result: &sdk.Result{ - MsgResponses: msgResponses, - }, - } - - res, err := queryResponse(resp, req.Height) - return res, true, err + externalResp, err := c.maybeHandleExternalServices(ctx, req) + if err != nil { + return nil, true, err + } else if externalResp != nil { + resp, err = queryResponse(externalResp, req.Height) + return resp, true, err } handler, found := c.queryHandlersMap[handlerFullName] diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 170475246d35..cfc682e47f31 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -2,6 +2,7 @@ package cometbft import ( "context" + "errors" "fmt" "strings" @@ -17,13 +18,19 @@ import ( "cosmossdk.io/core/server" corestore "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" + "cosmossdk.io/log" storeserver "cosmossdk.io/server/v2/store" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" + "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/std" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" + "github.com/cosmos/cosmos-sdk/types/query" txtypes "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/x/auth/migrations/legacytx" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" @@ -46,7 +53,7 @@ func gRPCServiceRegistrar[T transaction.Tx]( ) func(srv *grpc.Server) error { return func(srv *grpc.Server) error { cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec)) - txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app}) + txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app, consensus}) nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus}) return nil @@ -57,6 +64,7 @@ type txServer[T transaction.Tx] struct { clientCtx client.Context txCodec transaction.Codec[T] app appSimulator[T] + consensus abci.Application } // BroadcastTx implements tx.ServiceServer. @@ -65,8 +73,84 @@ func (t txServer[T]) BroadcastTx(ctx context.Context, req *txtypes.BroadcastTxRe } // GetBlockWithTxs implements tx.ServiceServer. -func (t txServer[T]) GetBlockWithTxs(context.Context, *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) { + logger := log.NewNopLogger() + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request cannot be nil") + } + + resp, err := t.consensus.Info(ctx, &abci.InfoRequest{}) + if err != nil { + return nil, err + } + currentHeight := resp.LastBlockHeight + + if req.Height < 1 || req.Height > currentHeight { + return nil, sdkerrors.ErrInvalidHeight.Wrapf("requested height %d but height must not be less than 1 "+ + "or greater than the current height %d", req.Height, currentHeight) + } + + node, err := t.clientCtx.GetNode() + if err != nil { + return nil, err + } + + blockID, block, err := cmtservice.GetProtoBlock(ctx, node, &req.Height) + if err != nil { + return nil, err + } + + var offset, limit uint64 + if req.Pagination != nil { + offset = req.Pagination.Offset + limit = req.Pagination.Limit + } else { + offset = 0 + limit = query.DefaultLimit + } + + blockTxs := block.Data.Txs + blockTxsLn := uint64(len(blockTxs)) + txs := make([]*txtypes.Tx, 0, limit) + if offset >= blockTxsLn && blockTxsLn != 0 { + return nil, sdkerrors.ErrInvalidRequest.Wrapf("out of range: cannot paginate %d txs with offset %d and limit %d", blockTxsLn, offset, limit) + } + decodeTxAt := func(i uint64) error { + tx := blockTxs[i] + txb, err := t.clientCtx.TxConfig.TxDecoder()(tx) + fmt.Println("TxDecoder", txb, err) + if err != nil { + return err + } + p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() + if err != nil { + return err + } + txs = append(txs, p) + return nil + } + if req.Pagination != nil && req.Pagination.Reverse { + for i, count := offset, uint64(0); i > 0 && count != limit; i, count = i-1, count+1 { + if err = decodeTxAt(i); err != nil { + logger.Error("failed to decode tx", "error", err) + } + } + } else { + for i, count := offset, uint64(0); i < blockTxsLn && count != limit; i, count = i+1, count+1 { + if err = decodeTxAt(i); err != nil { + logger.Error("failed to decode tx", "error", err) + } + } + } + + return &txtypes.GetBlockWithTxsResponse{ + Txs: txs, + BlockId: &blockID, + Block: block, + Pagination: &query.PageResponse{ + Total: blockTxsLn, + }, + }, nil } // GetTx implements tx.ServiceServer. @@ -100,8 +184,33 @@ func (t txServer[T]) GetTx(ctx context.Context, req *txtypes.GetTxRequest) (*txt } // GetTxsEvent implements tx.ServiceServer. -func (t txServer[T]) GetTxsEvent(context.Context, *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) GetTxsEvent(ctx context.Context, req *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request cannot be nil") + } + + orderBy := parseOrderBy(req.OrderBy) + + result, err := authtx.QueryTxsByEvents(t.clientCtx, int(req.Page), int(req.Limit), req.Query, orderBy) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + txsList := make([]*txtypes.Tx, len(result.Txs)) + for i, tx := range result.Txs { + protoTx, ok := tx.Tx.GetCachedValue().(*txtypes.Tx) + if !ok { + return nil, status.Errorf(codes.Internal, "getting cached value failed expected %T, got %T", txtypes.Tx{}, tx.Tx.GetCachedValue()) + } + + txsList[i] = protoTx + } + + return &txtypes.GetTxsEventResponse{ + Txs: txsList, + TxResponses: result.Txs, + Total: result.TotalCount, + }, nil } // Simulate implements tx.ServiceServer. @@ -159,8 +268,23 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest) } // TxDecode implements tx.ServiceServer. -func (t txServer[T]) TxDecode(context.Context, *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) { + if req.TxBytes == nil { + return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes") + } + + txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes) + if err != nil { + return nil, err + } + + tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also + if err != nil { + return nil, err + } + return &txtypes.TxDecodeResponse{ + Tx: tx, + }, nil } // TxDecodeAmino implements tx.ServiceServer. @@ -325,3 +449,151 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{ }, }, } + +func parseOrderBy(orderBy txtypes.OrderBy) string { + switch orderBy { + case txtypes.OrderBy_ORDER_BY_ASC: + return "asc" + case txtypes.OrderBy_ORDER_BY_DESC: + return "desc" + default: + return "" // Defaults to CometBFT's default, which is `asc` now. + } +} + +func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abci.QueryRequest) (transaction.Msg, error) { + // Handle comet service + if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { + rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress) + + cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "GetNodeInfo": + resp, err = handleExternalService(ctx, req, cometQServer.GetNodeInfo) + case "GetSyncing": + resp, err = handleExternalService(ctx, req, cometQServer.GetSyncing) + case "GetLatestBlock": + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestBlock) + case "GetBlockByHeight": + resp, err = handleExternalService(ctx, req, cometQServer.GetBlockByHeight) + case "GetLatestValidatorSet": + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestValidatorSet) + case "GetValidatorSetByHeight": + resp, err = handleExternalService(ctx, req, cometQServer.GetValidatorSetByHeight) + case "ABCIQuery": + resp, err = handleExternalService(ctx, req, cometQServer.ABCIQuery) + } + + return resp, err + } + + // Handle node service + if strings.HasPrefix(req.Path, "/cosmos.base.node.v1beta1.Service") { + nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "Config": + resp, err = handleExternalService(ctx, req, nodeQService.Config) + case "Status": + resp, err = handleExternalService(ctx, req, nodeQService.Status) + } + + return resp, err + } + + // Handle tx service + if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") { + // init simple client context + amino := codec.NewLegacyAmino() + std.RegisterLegacyAminoCodec(amino) + txConfig := authtx.NewTxConfig( + c.appCodec, + c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(), + c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(), + authtx.DefaultSignModes, + ) + rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address) + + clientCtx := client.Context{}. + WithLegacyAmino(amino). + WithCodec(c.appCodec). + WithTxConfig(txConfig). + WithNodeURI(c.cfg.AppTomlConfig.Address). + WithClient(rpcClient) + + txService := txServer[T]{ + clientCtx: clientCtx, + txCodec: c.txCodec, + app: c.app, + consensus: c, + } + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "Simulate": + resp, err = handleExternalService(ctx, req, txService.Simulate) + case "GetTx": + resp, err = handleExternalService(ctx, req, txService.GetTx) + case "BroadcastTx": + return nil, errors.New("can't route a broadcast tx message") + case "GetTxsEvent": + resp, err = handleExternalService(ctx, req, txService.GetTxsEvent) + case "GetBlockWithTxs": + resp, err = handleExternalService(ctx, req, txService.GetBlockWithTxs) + case "TxDecode": + resp, err = handleExternalService(ctx, req, txService.TxDecode) + case "TxEncode": + resp, err = handleExternalService(ctx, req, txService.TxEncode) + case "TxEncodeAmino": + resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino) + case "TxDecodeAmino": + resp, err = handleExternalService(ctx, req, txService.TxDecodeAmino) + } + + return resp, err + } + + return nil, nil +} + +func handleExternalService[T any, PT interface { + *T + proto.Message +}, + U any, UT interface { + *U + proto.Message + }]( + ctx context.Context, + rawReq *abciproto.QueryRequest, + handler func(ctx context.Context, msg PT) (UT, error), +) (transaction.Msg, error) { + req := PT(new(T)) + err := proto.Unmarshal(rawReq.Data, req) + if err != nil { + return nil, err + } + typedResp, err := handler(ctx, req) + if err != nil { + return nil, err + } + return typedResp, nil +} diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 51c24285e440..8789b7830f94 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "google.golang.org/grpc" + addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" @@ -72,6 +73,7 @@ func New[T transaction.Tx]( app appmanager.AppManager[T], appCodec codec.Codec, txCodec transaction.Codec[T], + consensusAddressCodec addresscodec.Codec, queryHandlers map[string]appmodulev2.Handler, decoderResolver decoding.DecoderResolver, serverOptions ServerOptions[T], @@ -189,6 +191,8 @@ func New[T transaction.Tx]( getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), addrPeerFilter: srv.serverOptions.AddrPeerFilter, idPeerFilter: srv.serverOptions.IdPeerFilter, + cfgMap: cfg, + consensusAddressCodec: consensusAddressCodec, } c.optimisticExec = oe.NewOptimisticExecution( diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 28586e32c2de..4b8cb10ab0cd 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -117,6 +117,7 @@ func InitRootCmd[T transaction.Tx]( simApp.App.AppManager, simApp.AppCodec(), &client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig}, + deps.ClientContext.ConsensusAddressCodec, simApp.App.QueryHandlers(), simApp.App.SchemaDecoderResolver(), initCometOptions[T](),