Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prioritize protocol txs #189

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"github.com/cosmos/cosmos-sdk/types/mempool"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -335,6 +336,14 @@ func New(

app.App = appBuilder.Build(db, traceStore, baseAppOptions...)

// Set the priority proposal handler
// We use a no-op mempool which means we rely on the CometBFT default transaction ordering (FIFO)
noOpMempool := mempool.NoOpMempool{}
app.App.BaseApp.SetMempool(noOpMempool)
defaultProposalHandler := baseapp.NewDefaultProposalHandler(noOpMempool, app.App.BaseApp)
proposalHandler := NewPriorityProposalHandler(defaultProposalHandler.PrepareProposalHandler(), app.txConfig.TxDecoder())
app.App.BaseApp.SetPrepareProposal(proposalHandler.PrepareProposal())

// Register legacy modules
app.registerIBCModules()

Expand Down
72 changes: 72 additions & 0 deletions app/proposal_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package app

import (
bundlestypes "github.com/KYVENetwork/chain/x/bundles/types"
abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"reflect"
"slices"
)

type PriorityProposalHandler struct {
defaultHandler sdk.PrepareProposalHandler
txDecoder sdk.TxDecoder
}

func NewPriorityProposalHandler(defaultHandler sdk.PrepareProposalHandler, decoder sdk.TxDecoder) *PriorityProposalHandler {
return &PriorityProposalHandler{
defaultHandler: defaultHandler,
txDecoder: decoder,
}
}

var priorityTypes = []string{
reflect.TypeOf(bundlestypes.MsgSubmitBundleProposal{}).Name(),
reflect.TypeOf(bundlestypes.MsgVoteBundleProposal{}).Name(),
reflect.TypeOf(bundlestypes.MsgClaimUploaderRole{}).Name(),
reflect.TypeOf(bundlestypes.MsgSkipUploaderRole{}).Name(),
}
shifty11 marked this conversation as resolved.
Show resolved Hide resolved

// PrepareProposal returns a PrepareProposalHandler that separates transactions into different queues
// This function is only called by the block proposer and therefore does NOT need to be deterministic
func (h *PriorityProposalHandler) PrepareProposal() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
// Separate the transactions into different queues
// priorityQueue: transactions that should be executed before the default transactions
// defaultQueue: transactions that should be executed last

var priorityQueue [][]byte
var defaultQueue [][]byte

// Iterate through the transactions and separate them into different queues
for _, rawTx := range req.Txs {
tx, err := h.txDecoder(rawTx)
if err != nil {
return nil, err
}
msgs, err := tx.GetMsgsV2()
if err != nil {
return nil, err
}

// We only care about transactions with a single message
if len(msgs) == 1 {
msg := msgs[0]
msgType := string(msg.ProtoReflect().Type().Descriptor().Name())

if slices.Contains(priorityTypes, msgType) {
priorityQueue = append(priorityQueue, rawTx)
continue
}
}

// Otherwise, add the tx to the default queue
defaultQueue = append(defaultQueue, rawTx)
}

// Append the transactions in the correct order
req.Txs = append(priorityQueue, defaultQueue...)

return h.defaultHandler(ctx, req)
}
}
204 changes: 204 additions & 0 deletions interchaintest/proposal_handler/proposal_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package proposal_handler_test

import (
"context"
"cosmossdk.io/math"
bundlestypes "github.com/KYVENetwork/chain/x/bundles/types"
stakerstypes "github.com/KYVENetwork/chain/x/stakers/types"
sdkclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/tx"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/strangelove-ventures/interchaintest/v8"
"github.com/strangelove-ventures/interchaintest/v8/testutil"
"reflect"
"testing"

"github.com/strangelove-ventures/interchaintest/v8/chain/cosmos"
"go.uber.org/zap/zaptest"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

/*

TEST CASES - proposal_handler.go

* Execute multiple transactions and check their order
* Execute transactions that exceed max tx bytes

*/

func TestProposalHandler(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "interchaintest/ProposalHandler Test Suite")
}

var _ = Describe("proposal_handler.go", Ordered, func() {
var chain *cosmos.CosmosChain

var ctx context.Context
var interchain *interchaintest.Interchain

var broadcaster *cosmos.Broadcaster

BeforeAll(func() {
numFullNodes := 0
numValidators := 2
factory := interchaintest.NewBuiltinChainFactory(
zaptest.NewLogger(GinkgoT()),
[]*interchaintest.ChainSpec{mainnetChainSpec(numValidators, numFullNodes)},
)

chains, err := factory.Chains(GinkgoT().Name())
Expect(err).To(BeNil())
chain = chains[0].(*cosmos.CosmosChain)

interchain = interchaintest.NewInterchain().
AddChain(chain)

broadcaster = cosmos.NewBroadcaster(GinkgoT(), chain)
broadcaster.ConfigureClientContextOptions(func(clientContext sdkclient.Context) sdkclient.Context {
return clientContext.
WithBroadcastMode(flags.BroadcastAsync)
})
broadcaster.ConfigureFactoryOptions(func(factory tx.Factory) tx.Factory {
return factory.
WithGas(flags.DefaultGasLimit * 10)
})

ctx = context.Background()
client, network := interchaintest.DockerSetup(GinkgoT())

err = interchain.Build(ctx, nil, interchaintest.InterchainBuildOptions{
TestName: GinkgoT().Name(),
Client: client,
NetworkID: network,
SkipPathCreation: true,
})
Expect(err).To(BeNil())
})

AfterAll(func() {
_ = chain.StopAllNodes(ctx)
_ = interchain.Close()
})

It("Execute multiple transactions and check their order", func() {
// ARRANGE
var wallets []*cosmos.CosmosWallet
for i := 0; i < 8; i++ {
wallets = append(wallets, interchaintest.GetAndFundTestUsers(
GinkgoT(), ctx, GinkgoT().Name(), math.NewInt(10_000_000_000), chain,
)[0].(*cosmos.CosmosWallet))
}

err := testutil.WaitForBlocks(ctx, 1, chain)
Expect(err).To(BeNil())

height, err := chain.Height(ctx)
Expect(err).To(BeNil())

// ACT

// Execute different transactions
// We don't care about the results, they only have to be included in a block
broadcastMsgs(ctx, broadcaster, wallets[0], &banktypes.MsgSend{FromAddress: wallets[0].FormattedAddress()})
broadcastMsgs(ctx, broadcaster, wallets[1], &stakerstypes.MsgCreateStaker{Creator: wallets[1].FormattedAddress()})
broadcastMsgs(ctx, broadcaster, wallets[2], &bundlestypes.MsgClaimUploaderRole{Creator: wallets[2].FormattedAddress()}) // priority msg
broadcastMsgs(ctx, broadcaster, wallets[3], &stakerstypes.MsgJoinPool{Creator: wallets[3].FormattedAddress(), Valaddress: wallets[0].FormattedAddress(), PoolId: 0})
broadcastMsgs(ctx, broadcaster, wallets[4], &banktypes.MsgSend{FromAddress: wallets[4].FormattedAddress()})
broadcastMsgs(ctx, broadcaster, wallets[5], &bundlestypes.MsgVoteBundleProposal{Creator: wallets[5].FormattedAddress()}) // priority msg
broadcastMsgs(ctx, broadcaster, wallets[6], &bundlestypes.MsgSkipUploaderRole{Creator: wallets[6].FormattedAddress()}) // priority msg
broadcastMsgs(ctx, broadcaster, wallets[7], &bundlestypes.MsgSubmitBundleProposal{Creator: wallets[7].FormattedAddress()}) // priority msg

expectedOrder := []string{
// priority msgs
reflect.TypeOf(bundlestypes.MsgClaimUploaderRole{}).Name(),
reflect.TypeOf(bundlestypes.MsgVoteBundleProposal{}).Name(),
reflect.TypeOf(bundlestypes.MsgSkipUploaderRole{}).Name(),
reflect.TypeOf(bundlestypes.MsgSubmitBundleProposal{}).Name(),
// default msgs
reflect.TypeOf(banktypes.MsgSend{}).Name(),
reflect.TypeOf(stakerstypes.MsgCreateStaker{}).Name(),
reflect.TypeOf(stakerstypes.MsgJoinPool{}).Name(),
reflect.TypeOf(banktypes.MsgSend{}).Name(),
}

afterHeight, err := chain.Height(ctx)
Expect(err).To(BeNil())
Expect(afterHeight).To(Equal(height))

// Wait for the transactions to be included in a block
err = testutil.WaitForBlocks(ctx, 2, chain)
Expect(err).To(BeNil())

// ASSERT

// Check the order of the transactions
checkTxsOrder(ctx, chain, height+1, expectedOrder)
})

It("Execute transactions that exceed max tx bytes", func() {
// ARRANGE
var wallets []*cosmos.CosmosWallet
for i := 0; i < 6; i++ {
wallets = append(wallets, interchaintest.GetAndFundTestUsers(
GinkgoT(), ctx, GinkgoT().Name(), math.NewInt(10_000_000_000), chain,
)[0].(*cosmos.CosmosWallet))
}
err := testutil.WaitForBlocks(ctx, 1, chain)
Expect(err).To(BeNil())

height, err := chain.Height(ctx)
Expect(err).To(BeNil())

// ACT
const duplications = 40
broadcastMsgs(ctx, broadcaster, wallets[0], &stakerstypes.MsgCreateStaker{Creator: wallets[0].FormattedAddress()})
broadcastMsgs(ctx, broadcaster, wallets[1], duplicateMsg(&banktypes.MsgSend{FromAddress: wallets[1].FormattedAddress()}, duplications)...)
broadcastMsgs(ctx, broadcaster, wallets[2], &bundlestypes.MsgSkipUploaderRole{Creator: wallets[2].FormattedAddress()}) // priority msg

// this will not make it into the actual block, so it goes into the next one with all following msgs
broadcastMsgs(ctx, broadcaster, wallets[3], duplicateMsg(&banktypes.MsgSend{FromAddress: wallets[3].FormattedAddress()}, duplications)...)
broadcastMsgs(ctx, broadcaster, wallets[4], &stakerstypes.MsgJoinPool{Creator: wallets[4].FormattedAddress(), Valaddress: wallets[0].FormattedAddress(), PoolId: 0})
broadcastMsgs(ctx, broadcaster, wallets[5], &bundlestypes.MsgVoteBundleProposal{Creator: wallets[5].FormattedAddress()}) // priority msg

afterHeight, err := chain.Height(ctx)
Expect(err).To(BeNil())
Expect(afterHeight).To(Equal(height))

// Wait for the transactions to be included in a block
err = testutil.WaitForBlocks(ctx, 2, chain)
Expect(err).To(BeNil())

// ASSERT
var msgTypes []string
for i := 0; i < duplications; i++ {
msgTypes = append(msgTypes, reflect.TypeOf(banktypes.MsgSend{}).Name())
}

expectedOrder1 := append(
[]string{
reflect.TypeOf(bundlestypes.MsgSkipUploaderRole{}).Name(), // priority msg
reflect.TypeOf(stakerstypes.MsgCreateStaker{}).Name(),
},
msgTypes...,
)
expectedOrder2 := append(
[]string{
reflect.TypeOf(bundlestypes.MsgVoteBundleProposal{}).Name(), // priority msg
},
msgTypes...,
)
expectedOrder2 = append(expectedOrder2,
reflect.TypeOf(stakerstypes.MsgJoinPool{}).Name(),
)

// Check that only the first block contains the first transactions
checkTxsOrder(ctx, chain, height+1, expectedOrder1)
// The second block should contain the rest of the transactions
checkTxsOrder(ctx, chain, height+2, expectedOrder2)
})
})
Loading
Loading