Skip to content

Commit

Permalink
Fjord: Add Brotli channel compression support (ethereum-optimism#10358)
Browse files Browse the repository at this point in the history
* wip

* wip

* fix

* fix

* fix

* fix

* address some of the bots comments

* use version bit of 1

* fix lint

* adding compression type

* update batch reader

* abstract span channel compressor

* test and singular batch compressor

* fix

* lint

* move channel compressor as interface

* add base class

* fix go mod

* test fixes

* address comments

* fix

* fix

* revert channel builder test

* revert ratio compressor test

* add checks to accept brotli only post fjord

* revemo unnecessary in test

* fix forge-std

* gofmt

* address comments

* remove methods in compressor

* fix error msg

* add compression algo flag to optional flags

* add Clone() function

---------

Co-authored-by: Roberto Bayardo <[email protected]>
  • Loading branch information
cody-wang-cb and Roberto Bayardo authored May 13, 2024
1 parent ea52388 commit 4b8f6f4
Show file tree
Hide file tree
Showing 32 changed files with 714 additions and 177 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.21

require (
github.com/BurntSushi/toml v1.3.2
github.com/DataDog/zstd v1.5.2
github.com/andybalholm/brotli v1.1.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
Expand Down Expand Up @@ -55,7 +57,6 @@ require (
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
}
var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo)
} else {
co, err = derive.NewSingularChannelOut(c)
}
Expand Down
38 changes: 29 additions & 9 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ func TestChannelBuilderBatchType(t *testing.T) {
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
}
for _, test := range tests {
test := test
Expand Down Expand Up @@ -413,7 +412,8 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {

// Check how many ready bytes
require.Greater(t, uint64(cb.co.ReadyBytes()+derive.FrameV0OverHeadSize), channelConfig.MaxFrameSize)
require.Equal(t, 0, cb.PendingFrames())

require.Equal(t, 0, cb.PendingFrames()) // always 0 because non compressor

// The channel should not be full
// but we want to output the frames for testing anyways
Expand All @@ -430,11 +430,27 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
if algo.IsBrotli() {
ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
} else {
ChannelBuilder_OutputFrames_SpanBatch(t, algo)
}
})
}
}

func ChannelBuilder_OutputFrames_SpanBatch(t *testing.T, algo derive.CompressionAlgo) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 5
if algo.IsBrotli() {
channelConfig.TargetNumFrames = 3
} else {
channelConfig.TargetNumFrames = 5
}
channelConfig.BatchType = derive.SpanBatchType
channelConfig.InitRatioCompressor(1)
channelConfig.InitRatioCompressor(1, algo)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
Expand All @@ -453,6 +469,10 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for {
err = addMiniBlock(cb)
if err == nil {
if cb.IsFull() {
// this happens when the data exactly fills the channel
break
}
require.False(t, cb.IsFull())
// There should be no ready bytes until the channel is full
require.Equal(t, cb.co.ReadyBytes(), 0)
Expand Down Expand Up @@ -504,7 +524,7 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1
channelConfig.TargetNumFrames = math.MaxUint16 + 1
channelConfig.InitRatioCompressor(.1)
channelConfig.InitRatioCompressor(.1, derive.Zlib)
channelConfig.BatchType = batchType

rng := rand.New(rand.NewSource(123))
Expand Down Expand Up @@ -546,8 +566,8 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {
TargetNumFrames: 1,
BatchType: derive.SpanBatchType,
}
cfg.InitShadowCompressor()

cfg.InitShadowCompressor(derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

Expand Down Expand Up @@ -577,7 +597,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 2
// Configure the Input Threshold params so we observe a full channel
channelConfig.InitRatioCompressor(1)
channelConfig.InitRatioCompressor(1, derive.Zlib)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
Expand Down Expand Up @@ -700,7 +720,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = tnf
cfg.BatchType = batchType
cfg.InitShadowCompressor()
cfg.InitShadowCompressor(derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

Expand Down Expand Up @@ -782,7 +802,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16
cfg.BatchType = batchType
cfg.InitRatioCompressor(1.0)
cfg.InitRatioCompressor(1.0, derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err, "NewChannelBuilder")

Expand Down
13 changes: 7 additions & 6 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,26 @@ type ChannelConfig struct {
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
// comprKind can be the empty string, in which case the default compressor will
// be used.
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo) {
cc.CompressorConfig = compressor.Config{
// Compressor output size needs to account for frame encoding overhead
TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
ApproxComprRatio: approxComprRatio,
Kind: comprKind,
CompressionAlgo: compressionAlgo,
}
}

func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo)
}

func (cc *ChannelConfig) InitShadowCompressor() {
cc.InitCompressorConfig(0, compressor.ShadowKind)
func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo)
}

func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind)
cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib)
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func defaultTestChannelConfig() ChannelConfig {
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
}
c.InitRatioCompressor(0.4)
c.InitRatioCompressor(0.4, derive.Zlib)
return c
}

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig
TargetNumFrames: 1,
BatchType: batchType,
}
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, derive.Zlib)
return cfg
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
cfg.ChannelTimeout = 10
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, derive.Zlib)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)

// Channel Manager state should be empty by default
Expand Down
21 changes: 19 additions & 2 deletions op-batcher/batcher/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"testing"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand All @@ -29,6 +30,9 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{})
m.Clear(eth.BlockID{})

Expand Down Expand Up @@ -71,7 +75,9 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelManager_NextTxData tests the nextTxData function.
func TestChannelManager_NextTxData(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
}}, &rollup.Config{})
m.Clear(eth.BlockID{})

// Nil pending channel should return EOF
Expand Down Expand Up @@ -118,6 +124,9 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
MultiFrameTxs: false,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err)
chID := ch.ID()
Expand Down Expand Up @@ -156,6 +165,9 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
MultiFrameTxs: true,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err)
chID := ch.ID()
Expand Down Expand Up @@ -202,6 +214,9 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{})
m.Clear(eth.BlockID{})

Expand Down Expand Up @@ -251,7 +266,9 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
}}, &rollup.Config{})
m.Clear(eth.BlockID{})

// Let's add a valid pending transaction to the channel
Expand Down
8 changes: 8 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
Expand Down Expand Up @@ -67,6 +68,9 @@ type CLIConfig struct {
// Type of compressor to use. Must be one of [compressor.KindKeys].
Compressor string

// Type of compression algorithm to use. Must be one of [zlib, brotli, brotli[9-11]]
CompressionAlgo derive.CompressionAlgo

// If Stopped is true, the batcher starts stopped and won't start batching right away.
// Batching needs to be started via an admin RPC.
Stopped bool
Expand Down Expand Up @@ -124,6 +128,9 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
}
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
Expand Down Expand Up @@ -168,6 +175,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
CompressionAlgo: derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
Expand Down
4 changes: 3 additions & 1 deletion op-batcher/batcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
Expand Down Expand Up @@ -35,7 +36,8 @@ func validBatcherConfig() batcher.CLIConfig {
MetricsConfig: metrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
// The compressor config is not checked in config.Check()
RPC: rpc.DefaultCLIConfig(),
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
}
}

Expand Down
7 changes: 6 additions & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize)
}

cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor)
cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
Expand All @@ -228,6 +228,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}

// Checking for brotli compression only post Fjord
if bs.ChannelConfig.CompressorConfig.CompressionAlgo.IsBrotli() && !bs.RollupConfig.IsFjord(uint64(time.Now().Unix())) {
return fmt.Errorf("cannot use brotli compression before Fjord")
}

if err := cc.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions op-batcher/compressor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Config struct {
// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
// will default to RatioKind.
Kind string

// Type of compression algorithm to use. Must be one of [zlib, brotli-(9|10|11)]
CompressionAlgo derive.CompressionAlgo
}

func (c Config) NewCompressor() (derive.Compressor, error) {
Expand Down
Loading

0 comments on commit 4b8f6f4

Please sign in to comment.