Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/v2): broadcast logic (backport #22282) #22415

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#18461](https://github.com/cosmos/cosmos-sdk/pull/18461) Support governance proposals.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Introduce client/v2 tx factory.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Extend client/v2 keyring interface with `KeyType` and `KeyInfo`.
* [#22282](https://github.com/cosmos/cosmos-sdk/pull/22282) Added custom broadcast logic.

### Improvements

Expand Down
15 changes: 15 additions & 0 deletions client/v2/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package broadcast

import "context"

// Broadcaster defines an interface for broadcasting transactions to the consensus engine.
type Broadcaster interface {
// Broadcast sends a transaction to the network and returns the result.
//
// It returns a byte slice containing the formatted result that will be
// passed to the output writer, and an error if the broadcast failed.
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error)

// Consensus returns the consensus engine identifier for this Broadcaster.
Consensus() string
}
197 changes: 197 additions & 0 deletions client/v2/broadcast/comet/comet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package comet

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/cometbft/cometbft/mempool"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
"cosmossdk.io/client/v2/broadcast"

"github.com/cosmos/cosmos-sdk/codec"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

const (
// BroadcastSync defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BroadcastSync = "sync"
// BroadcastAsync defines a tx broadcasting mode where the client returns
// immediately.
BroadcastAsync = "async"

// cometBftConsensus is the identifier for the CometBFT consensus engine.
cometBFTConsensus = "comet"
)

// CometRPC defines the interface of a CometBFT RPC client needed for
// queries and transaction handling.
type CometRPC interface {
rpcclient.ABCIClient

Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error)
Status(context.Context) (*coretypes.ResultStatus, error)
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error)
BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error)
Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error)
Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error)
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error)
}

var _ broadcast.Broadcaster = &CometBFTBroadcaster{}

// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBFTBroadcaster struct {
rpcClient CometRPC
mode string
cdc codec.JSONCodec
}

// NewCometBFTBroadcaster creates a new CometBFTBroadcaster.
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) {
if cdc == nil {
return nil, errors.New("codec can't be nil")
}

if mode == "" {
mode = BroadcastSync
}

rpcClient, err := rpchttp.New(rpcURL)
if err != nil {
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err)
}

return &CometBFTBroadcaster{
rpcClient: rpcClient,
mode: mode,
cdc: cdc,
}, nil
}

// Consensus returns the consensus engine name used by the broadcaster.
// It always returns "comet" for CometBFTBroadcaster.
func (c *CometBFTBroadcaster) Consensus() string {
return cometBFTConsensus
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c *CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
if c.cdc == nil {
return []byte{}, fmt.Errorf("JSON codec is not initialized")
}

var broadcastFunc func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
broadcastFunc = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
broadcastFunc = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}

res, err := c.broadcast(ctx, txBytes, broadcastFunc)
if err != nil {
return []byte{}, err
}

return c.cdc.MarshalJSON(res)
}

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); errRes != nil {
return errRes, nil
}

return newResponseFormatBroadcastTx(bResult), err
}

// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse.
// It extracts error information and constructs a TxResponse with the error details.
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse {
if err == nil {
return nil
}

errStr := strings.ToLower(err.Error())
txHash := fmt.Sprintf("%X", tx.Hash())

switch {
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(),
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "mempool is full"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrMempoolIsFull.ABCICode(),
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "tx too large"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxTooLarge.ABCICode(),
Codespace: sdkerrors.ErrTxTooLarge.Codespace(),
Txhash: txHash,
}

default:
return nil
}
}

// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse {
if res == nil {
return nil
}

parsedLogs, _ := parseABCILogs(res.Log)

return &apiacbci.TxResponse{
Code: res.Code,
Codespace: res.Codespace,
Data: res.Data.String(),
RawLog: res.Log,
Logs: parsedLogs,
Txhash: res.Hash.String(),
}
}

// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of
// ABCIMessageLog types. It returns an error upon JSON decoding failure.
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) {
err = json.Unmarshal([]byte(logs), &res)
return res, err
}
149 changes: 149 additions & 0 deletions client/v2/broadcast/comet/comet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package comet

import (
"context"
"errors"
"testing"

"github.com/cometbft/cometbft/mempool"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
mockrpc "cosmossdk.io/client/v2/broadcast/comet/testutil"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/testutil"
)

var cdc = testutil.CodecOptions{}.NewCodec()

func TestNewCometBftBroadcaster(t *testing.T) {
tests := []struct {
name string
cdc codec.JSONCodec
mode string
want *CometBFTBroadcaster
wantErr bool
}{
{
name: "constructor",
mode: BroadcastSync,
cdc: cdc,
want: &CometBFTBroadcaster{
mode: BroadcastSync,
cdc: cdc,
},
},
{
name: "nil codec",
mode: BroadcastSync,
cdc: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewCometBFTBroadcaster("localhost:26657", tt.mode, tt.cdc)
if tt.wantErr {
require.Error(t, err)
require.Nil(t, got)
} else {
require.Equal(t, got.mode, tt.want.mode)
require.Equal(t, got.cdc, tt.want.cdc)
}
})
}
}

func TestCometBftBroadcaster_Broadcast(t *testing.T) {
ctrl := gomock.NewController(t)
cometMock := mockrpc.NewMockCometRPC(ctrl)
c := CometBFTBroadcaster{
rpcClient: cometMock,
mode: BroadcastSync,
cdc: cdc,
}
tests := []struct {
name string
mode string
setupMock func(*mockrpc.MockCometRPC)
wantErr bool
}{
{
name: "sync",
mode: BroadcastSync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
{
name: "async",
mode: BroadcastAsync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxAsync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.mode = tt.mode
tt.setupMock(cometMock)
got, err := c.Broadcast(context.Background(), []byte{})
if tt.wantErr {
require.Error(t, err)
} else {
require.NotNil(t, got)
}
})
}
}

func Test_checkCometError(t *testing.T) {
tests := []struct {
name string
err error
want *apiacbci.TxResponse
}{
{
name: "tx already in cache",
err: errors.New("tx already exists in cache"),
want: &apiacbci.TxResponse{
Code: 19,
},
},
{
name: "mempool is full",
err: mempool.ErrMempoolIsFull{},
want: &apiacbci.TxResponse{
Code: 20,
},
},
{
name: "tx too large",
err: mempool.ErrTxTooLarge{},
want: &apiacbci.TxResponse{
Code: 21,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := checkCometError(tt.err, []byte{})
require.Equal(t, got.Code, tt.want.Code)
})
}
}
Loading
Loading