Skip to content

Commit

Permalink
Remove AddSingularBatch from ChannelOut interface (prefer AddBlock) (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
mdehoog authored Oct 7, 2024
1 parent 8917511 commit d062c1c
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 81 deletions.
14 changes: 5 additions & 9 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,16 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return nil, c.FullErr()
}

batch, l1info, err := derive.BlockToSingularBatch(c.rollupCfg, block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}

if err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
l1info, err := c.co.AddBlock(c.rollupCfg, block)
if errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
return l1info, fmt.Errorf("adding block to channel out: %w", err)
}

c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch)
c.updateSwTimeout(l1info.Number)

if l1info.Number > c.latestL1Origin.Number {
c.latestL1Origin = eth.BlockID{
Expand Down Expand Up @@ -252,8 +248,8 @@ func (c *ChannelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *ChannelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
func (c *ChannelBuilder) updateSwTimeout(l1InfoNumber uint64) {
timeout := l1InfoNumber + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func FuzzSeqWindowClose(f *testing.F) {

// Check the timeout
cb.timeout = timeout
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
if timeout > calculatedTimeout && calculatedTimeout != 0 {
cb.CheckTimeout(calculatedTimeout)
Expand Down Expand Up @@ -278,7 +278,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {

// Check the timeout
cb.timeout = 0
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
cb.CheckTimeout(calculatedTimeout)
if cb.timeout != 0 {
Expand Down
26 changes: 13 additions & 13 deletions op-e2e/actions/helpers/garbage_channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Writer interface {
type ChannelOutIface interface {
ID() derive.ChannelID
Reset() error
AddBlock(rollupCfg *rollup.Config, block *types.Block) error
AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error)
ReadyBytes() int
Flush() error
Close() error
Expand Down Expand Up @@ -157,19 +157,19 @@ func (co *GarbageChannelOut) Reset() error {
// error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one
// should be made.
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error) {
if co.closed {
return errors.New("already closed")
return nil, errors.New("already closed")
}
batch, err := blockToBatch(rollupCfg, block)
batch, l1Info, err := blockToBatch(rollupCfg, block)
if err != nil {
return err
return nil, err
}
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
return err
return nil, err
}
if co.cfg.MalformRLP {
// Malform the RLP by incrementing the length prefix by 1.
Expand All @@ -182,13 +182,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo
chainSpec := rollup.NewChainSpec(rollupCfg)
maxRLPBytesPerChannel := chainSpec.MaxRLPBytesPerChannel(block.Time())
if co.rlpLength+buf.Len() > int(maxRLPBytesPerChannel) {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
return nil, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, maxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
}
co.rlpLength += buf.Len()

_, err = io.Copy(co.compress, &buf)
return err
return l1Info, err
}

// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
Expand Down Expand Up @@ -256,25 +256,25 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint1
}

// blockToBatch transforms a block into a batch object that can easily be RLP encoded.
func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, error) {
func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, *derive.L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
continue
}
otx, err := tx.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
return nil, nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
}
opaqueTxs = append(opaqueTxs, otx)
}
l1InfoTx := block.Transactions()[0]
if l1InfoTx.Type() != types.DepositTxType {
return nil, derive.ErrNotDepositTx
return nil, nil, derive.ErrNotDepositTx
}
l1Info, err := derive.L1BlockInfoFromBytes(rollupCfg, block.Time(), l1InfoTx.Data())
if err != nil {
return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
return nil, nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}

singularBatch := &derive.SingularBatch{
Expand All @@ -285,5 +285,5 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa
Transactions: opaqueTxs,
}

return derive.NewBatchData(singularBatch), nil
return derive.NewBatchData(singularBatch), l1Info, nil
}
2 changes: 1 addition & 1 deletion op-e2e/actions/helpers/l2_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *L2Batcher) Buffer(t Testing, opts ...BlockModifier) error {
require.NoError(t, err, "failed to create channel")
s.L2ChannelOut = ch
}
if err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
if _, err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
return err
}
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
Expand Down
10 changes: 5 additions & 5 deletions op-e2e/actions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestBackupUnsafe(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -414,7 +414,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -547,7 +547,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -924,7 +924,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -973,7 +973,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], tx}})
}
// Add B1, A2 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}
// Submit span batch(B1, A2, ... A12)
Expand Down
76 changes: 59 additions & 17 deletions op-node/benchmarks/batchbuilding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -101,6 +104,40 @@ func channelOutByType(b *testing.B, batchType uint, cd compressorDetails) (deriv
return nil, fmt.Errorf("unsupported batch type: %d", batchType)
}

func randomBlock(cfg *rollup.Config, rng *rand.Rand, txCount int, timestamp uint64) (*types.Block, error) {
batch := derive.RandomSingularBatch(rng, txCount, cfg.L2ChainID)
batch.Timestamp = timestamp
return singularBatchToBlock(cfg, batch)
}

// singularBatchToBlock converts a singular batch to a block for use in the benchmarks. This function
// should only be used for testing purposes, as the batch input doesn't contain the necessary information
// to build the full block (only non-deposit transactions and a subset of header fields are populated).
func singularBatchToBlock(rollupCfg *rollup.Config, batch *derive.SingularBatch) (*types.Block, error) {
l1InfoTx, err := derive.L1InfoDeposit(rollupCfg, eth.SystemConfig{}, 0, &testutils.MockBlockInfo{
InfoNum: uint64(batch.EpochNum),
InfoHash: batch.EpochHash,
}, batch.Timestamp)
if err != nil {
return nil, fmt.Errorf("could not build L1 Info transaction: %w", err)
}
txs := []*types.Transaction{types.NewTx(l1InfoTx)}
for i, opaqueTx := range batch.Transactions {
var tx types.Transaction
err = tx.UnmarshalBinary(opaqueTx)
if err != nil {
return nil, fmt.Errorf("could not decode tx %d: %w", i, err)
}
txs = append(txs, &tx)
}
return types.NewBlockWithHeader(&types.Header{
ParentHash: batch.ParentHash,
Time: batch.Timestamp,
}).WithBody(types.Body{
Transactions: txs,
}), nil
}

// a test case for the benchmark controls the number of batches and transactions per batch,
// as well as the batch type and compressor used
type BatchingBenchmarkTC struct {
Expand Down Expand Up @@ -155,16 +192,17 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
}

for _, tc := range tests {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
Expand All @@ -174,13 +212,13 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
cout, _ := channelOutByType(b, tc.BatchType, tc.cd)
// add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err)
}
// measure the time to add the final batch
b.StartTimer()
// add the final batch to the channel out
err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0)
_, err := cout.AddBlock(cfg, blocks[tc.BatchCount-1])
require.NoError(b, err)
}
})
Expand All @@ -193,7 +231,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// Hint: use -benchtime=1x to run the benchmarks for a single iteration
// it is not currently designed to use b.N
func BenchmarkIncremental(b *testing.B) {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// use the real compressor for this benchmark
// use batchCount as the number of batches to add in each benchmark iteration
Expand Down Expand Up @@ -226,17 +264,20 @@ func BenchmarkIncremental(b *testing.B) {
b.StopTimer()
// prepare the batches
t := time.Now()
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
for i := 0; i < tc.BatchCount; i++ {
t := t.Add(time.Second)
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Unix())
t = t.Add(time.Second)
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Unix()))
if err != nil {
done = true
return
}
}
b.StartTimer()
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
if err != nil {
done = true
return
Expand Down Expand Up @@ -280,16 +321,17 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}

for _, tc := range tests {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
Expand All @@ -300,7 +342,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
b.StartTimer()
// add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err)
}
}
Expand Down
23 changes: 9 additions & 14 deletions op-node/rollup/derive/channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ type Compressor interface {
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*rollup.Config, *types.Block) error
AddSingularBatch(*SingularBatch, uint64) error
AddBlock(*rollup.Config, *types.Block) (*L1BlockInfo, error)
InputBytes() int
ReadyBytes() int
Flush() error
Expand Down Expand Up @@ -107,31 +106,27 @@ func (co *SingularChannelOut) Reset() error {
return err
}

// AddBlock adds a block to the channel. It returns the RLP encoded byte size
// AddBlock adds a block to the channel. It returns the block's L1BlockInfo
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
if co.closed {
return ErrChannelOutAlreadyClosed
return nil, ErrChannelOutAlreadyClosed
}

batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil {
return err
return nil, fmt.Errorf("converting block to batch: %w", err)
}
return co.AddSingularBatch(batch, l1Info.SequenceNumber)
return l1Info, co.addSingularBatch(batch, l1Info.SequenceNumber)
}

// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// addSingularBatch adds a batch to the channel. It returns
// an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) error {
func (co *SingularChannelOut) addSingularBatch(batch *SingularBatch, _ uint64) error {
if co.closed {
return ErrChannelOutAlreadyClosed
}
Expand Down
Loading

0 comments on commit d062c1c

Please sign in to comment.