Skip to content

Commit

Permalink
feat(client/v2): broadcast logic (#22282)
Browse files Browse the repository at this point in the history
  • Loading branch information
JulianToledano authored Oct 29, 2024
1 parent 31f97e9 commit 78cfc68
Show file tree
Hide file tree
Showing 14 changed files with 688 additions and 20 deletions.
1 change: 1 addition & 0 deletions client/v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#18461](https://github.com/cosmos/cosmos-sdk/pull/18461) Support governance proposals.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Introduce client/v2 tx factory.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Extend client/v2 keyring interface with `KeyType` and `KeyInfo`.
* [#22282](https://github.com/cosmos/cosmos-sdk/pull/22282) Added custom broadcast logic.

### Improvements

Expand Down
15 changes: 15 additions & 0 deletions client/v2/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package broadcast

import "context"

// Broadcaster defines an interface for broadcasting transactions to the consensus engine.
type Broadcaster interface {
// Broadcast sends a transaction to the network and returns the result.
//
// It returns a byte slice containing the formatted result that will be
// passed to the output writer, and an error if the broadcast failed.
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error)

// Consensus returns the consensus engine identifier for this Broadcaster.
Consensus() string
}
197 changes: 197 additions & 0 deletions client/v2/broadcast/comet/comet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package comet

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/cometbft/cometbft/mempool"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
"cosmossdk.io/client/v2/broadcast"

"github.com/cosmos/cosmos-sdk/codec"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

const (
// BroadcastSync defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BroadcastSync = "sync"
// BroadcastAsync defines a tx broadcasting mode where the client returns
// immediately.
BroadcastAsync = "async"

// cometBftConsensus is the identifier for the CometBFT consensus engine.
cometBFTConsensus = "comet"
)

// CometRPC defines the interface of a CometBFT RPC client needed for
// queries and transaction handling.
type CometRPC interface {
rpcclient.ABCIClient

Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error)
Status(context.Context) (*coretypes.ResultStatus, error)
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error)
BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error)
Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error)
Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error)
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error)
}

var _ broadcast.Broadcaster = &CometBFTBroadcaster{}

// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBFTBroadcaster struct {
rpcClient CometRPC
mode string
cdc codec.JSONCodec
}

// NewCometBFTBroadcaster creates a new CometBFTBroadcaster.
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) {
if cdc == nil {
return nil, errors.New("codec can't be nil")
}

if mode == "" {
mode = BroadcastSync
}

rpcClient, err := rpchttp.New(rpcURL)
if err != nil {
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err)
}

return &CometBFTBroadcaster{
rpcClient: rpcClient,
mode: mode,
cdc: cdc,
}, nil
}

// Consensus returns the consensus engine name used by the broadcaster.
// It always returns "comet" for CometBFTBroadcaster.
func (c *CometBFTBroadcaster) Consensus() string {
return cometBFTConsensus
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c *CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
if c.cdc == nil {
return []byte{}, fmt.Errorf("JSON codec is not initialized")
}

var broadcastFunc func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
broadcastFunc = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
broadcastFunc = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}

res, err := c.broadcast(ctx, txBytes, broadcastFunc)
if err != nil {
return []byte{}, err
}

return c.cdc.MarshalJSON(res)
}

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); errRes != nil {
return errRes, nil
}

return newResponseFormatBroadcastTx(bResult), err
}

// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse.
// It extracts error information and constructs a TxResponse with the error details.
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse {
if err == nil {
return nil
}

errStr := strings.ToLower(err.Error())
txHash := fmt.Sprintf("%X", tx.Hash())

switch {
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(),
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "mempool is full"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrMempoolIsFull.ABCICode(),
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "tx too large"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxTooLarge.ABCICode(),
Codespace: sdkerrors.ErrTxTooLarge.Codespace(),
Txhash: txHash,
}

default:
return nil
}
}

// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse {
if res == nil {
return nil
}

parsedLogs, _ := parseABCILogs(res.Log)

return &apiacbci.TxResponse{
Code: res.Code,
Codespace: res.Codespace,
Data: res.Data.String(),
RawLog: res.Log,
Logs: parsedLogs,
Txhash: res.Hash.String(),
}
}

// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of
// ABCIMessageLog types. It returns an error upon JSON decoding failure.
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) {
err = json.Unmarshal([]byte(logs), &res)
return res, err
}
149 changes: 149 additions & 0 deletions client/v2/broadcast/comet/comet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package comet

import (
"context"
"errors"
"testing"

"github.com/cometbft/cometbft/mempool"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
mockrpc "cosmossdk.io/client/v2/broadcast/comet/testutil"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/testutil"
)

var cdc = testutil.CodecOptions{}.NewCodec()

func TestNewCometBftBroadcaster(t *testing.T) {
tests := []struct {
name string
cdc codec.JSONCodec
mode string
want *CometBFTBroadcaster
wantErr bool
}{
{
name: "constructor",
mode: BroadcastSync,
cdc: cdc,
want: &CometBFTBroadcaster{
mode: BroadcastSync,
cdc: cdc,
},
},
{
name: "nil codec",
mode: BroadcastSync,
cdc: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewCometBFTBroadcaster("localhost:26657", tt.mode, tt.cdc)
if tt.wantErr {
require.Error(t, err)
require.Nil(t, got)
} else {
require.Equal(t, got.mode, tt.want.mode)
require.Equal(t, got.cdc, tt.want.cdc)
}
})
}
}

func TestCometBftBroadcaster_Broadcast(t *testing.T) {
ctrl := gomock.NewController(t)
cometMock := mockrpc.NewMockCometRPC(ctrl)
c := CometBFTBroadcaster{
rpcClient: cometMock,
mode: BroadcastSync,
cdc: cdc,
}
tests := []struct {
name string
mode string
setupMock func(*mockrpc.MockCometRPC)
wantErr bool
}{
{
name: "sync",
mode: BroadcastSync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
{
name: "async",
mode: BroadcastAsync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxAsync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.mode = tt.mode
tt.setupMock(cometMock)
got, err := c.Broadcast(context.Background(), []byte{})
if tt.wantErr {
require.Error(t, err)
} else {
require.NotNil(t, got)
}
})
}
}

func Test_checkCometError(t *testing.T) {
tests := []struct {
name string
err error
want *apiacbci.TxResponse
}{
{
name: "tx already in cache",
err: errors.New("tx already exists in cache"),
want: &apiacbci.TxResponse{
Code: 19,
},
},
{
name: "mempool is full",
err: mempool.ErrMempoolIsFull{},
want: &apiacbci.TxResponse{
Code: 20,
},
},
{
name: "tx too large",
err: mempool.ErrTxTooLarge{},
want: &apiacbci.TxResponse{
Code: 21,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := checkCometError(tt.err, []byte{})
require.Equal(t, got.Code, tt.want.Code)
})
}
}
Loading

0 comments on commit 78cfc68

Please sign in to comment.