From 4b8f6f4ffcd5adebf710f51ae245737c774c72df Mon Sep 17 00:00:00 2001 From: cody-wang-cb Date: Mon, 13 May 2024 15:15:48 -0700 Subject: [PATCH] Fjord: Add Brotli channel compression support (#10358) * wip * wip * fix * fix * fix * fix * address some of the bots comments * use version bit of 1 * fix lint * adding compression type * update batch reader * abstract span channel compressor * test and singular batch compressor * fix * lint * move channel compressor as interface * add base class * fix go mod * test fixes * address comments * fix * fix * revert channel builder test * revert ratio compressor test * add checks to accept brotli only post fjord * revemo unnecessary in test * fix forge-std * gofmt * address comments * remove methods in compressor * fix error msg * add compression algo flag to optional flags * add Clone() function --------- Co-authored-by: Roberto Bayardo --- go.mod | 3 +- go.sum | 2 + op-batcher/batcher/channel_builder.go | 2 +- op-batcher/batcher/channel_builder_test.go | 38 +++-- op-batcher/batcher/channel_config.go | 13 +- op-batcher/batcher/channel_config_test.go | 2 +- op-batcher/batcher/channel_manager_test.go | 4 +- op-batcher/batcher/channel_test.go | 21 ++- op-batcher/batcher/config.go | 8 + op-batcher/batcher/config_test.go | 4 +- op-batcher/batcher/service.go | 7 +- op-batcher/compressor/config.go | 3 + op-batcher/compressor/ratio_compressor.go | 23 ++- .../compressor/ratio_compressor_test.go | 2 + op-batcher/compressor/shadow_compressor.go | 36 ++--- .../compressor/shadow_compressor_test.go | 2 + op-batcher/flags/flags.go | 11 ++ op-e2e/actions/l2_batcher.go | 3 +- op-e2e/actions/sync_test.go | 2 +- op-e2e/sequencer_failover_setup.go | 1 + op-e2e/setup.go | 1 + op-node/benchmarks/batchbuilding_test.go | 153 +++++++++--------- .../batch_decoder/reassemble/reassemble.go | 2 +- op-node/rollup/derive/channel.go | 42 ++++- op-node/rollup/derive/channel_compressor.go | 94 +++++++++++ .../rollup/derive/channel_compressor_test.go | 67 ++++++++ op-node/rollup/derive/channel_in_reader.go | 2 +- op-node/rollup/derive/channel_out_test.go | 58 +++++-- op-node/rollup/derive/channel_test.go | 120 ++++++++++++++ op-node/rollup/derive/span_channel_out.go | 39 +++-- op-node/rollup/derive/types.go | 68 ++++++++ op-node/rollup/derive/types_test.go | 58 +++++++ 32 files changed, 714 insertions(+), 177 deletions(-) create mode 100644 op-node/rollup/derive/channel_compressor.go create mode 100644 op-node/rollup/derive/channel_compressor_test.go create mode 100644 op-node/rollup/derive/types.go create mode 100644 op-node/rollup/derive/types_test.go diff --git a/go.mod b/go.mod index 324324320184..7a57a01b640b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.21 require ( github.com/BurntSushi/toml v1.3.2 + github.com/DataDog/zstd v1.5.2 + github.com/andybalholm/brotli v1.1.0 github.com/btcsuite/btcd v0.24.0 github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4 @@ -55,7 +57,6 @@ require ( ) require ( - github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect github.com/allegro/bigcache v1.2.1 // indirect diff --git a/go.sum b/go.sum index 72df4da527b1..fa75105188a8 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc= github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index e364570d48b0..d63e1d45b5dc 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi } var co derive.ChannelOut if cfg.BatchType == derive.SpanBatchType { - co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize) + co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo) } else { co, err = derive.NewSingularChannelOut(c) } diff --git a/op-batcher/batcher/channel_builder_test.go b/op-batcher/batcher/channel_builder_test.go index 79df1868330f..cb4c3ee9b7bd 100644 --- a/op-batcher/batcher/channel_builder_test.go +++ b/op-batcher/batcher/channel_builder_test.go @@ -297,7 +297,6 @@ func TestChannelBuilderBatchType(t *testing.T) { {"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames}, {"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes}, {"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes}, - {"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic}, } for _, test := range tests { test := test @@ -413,7 +412,8 @@ func TestChannelBuilder_OutputFrames(t *testing.T) { // Check how many ready bytes require.Greater(t, uint64(cb.co.ReadyBytes()+derive.FrameV0OverHeadSize), channelConfig.MaxFrameSize) - require.Equal(t, 0, cb.PendingFrames()) + + require.Equal(t, 0, cb.PendingFrames()) // always 0 because non compressor // The channel should not be full // but we want to output the frames for testing anyways @@ -430,11 +430,27 @@ func TestChannelBuilder_OutputFrames(t *testing.T) { } func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) { + for _, algo := range derive.CompressionAlgoTypes { + t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) { + if algo.IsBrotli() { + ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli + } else { + ChannelBuilder_OutputFrames_SpanBatch(t, algo) + } + }) + } +} + +func ChannelBuilder_OutputFrames_SpanBatch(t *testing.T, algo derive.CompressionAlgo) { channelConfig := defaultTestChannelConfig() channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize - channelConfig.TargetNumFrames = 5 + if algo.IsBrotli() { + channelConfig.TargetNumFrames = 3 + } else { + channelConfig.TargetNumFrames = 5 + } channelConfig.BatchType = derive.SpanBatchType - channelConfig.InitRatioCompressor(1) + channelConfig.InitRatioCompressor(1, algo) // Construct the channel builder cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin) @@ -453,6 +469,10 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) { for { err = addMiniBlock(cb) if err == nil { + if cb.IsFull() { + // this happens when the data exactly fills the channel + break + } require.False(t, cb.IsFull()) // There should be no ready bytes until the channel is full require.Equal(t, cb.co.ReadyBytes(), 0) @@ -504,7 +524,7 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) { channelConfig := defaultTestChannelConfig() channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1 channelConfig.TargetNumFrames = math.MaxUint16 + 1 - channelConfig.InitRatioCompressor(.1) + channelConfig.InitRatioCompressor(.1, derive.Zlib) channelConfig.BatchType = batchType rng := rand.New(rand.NewSource(123)) @@ -546,8 +566,8 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) { TargetNumFrames: 1, BatchType: derive.SpanBatchType, } - cfg.InitShadowCompressor() + cfg.InitShadowCompressor(derive.Zlib) cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin) require.NoError(err) @@ -577,7 +597,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize channelConfig.TargetNumFrames = 2 // Configure the Input Threshold params so we observe a full channel - channelConfig.InitRatioCompressor(1) + channelConfig.InitRatioCompressor(1, derive.Zlib) // Construct the channel builder cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin) @@ -700,7 +720,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) { cfg.MaxFrameSize = 1000 cfg.TargetNumFrames = tnf cfg.BatchType = batchType - cfg.InitShadowCompressor() + cfg.InitShadowCompressor(derive.Zlib) cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin) require.NoError(err) @@ -782,7 +802,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) { cfg.MaxFrameSize = 1000 cfg.TargetNumFrames = 16 cfg.BatchType = batchType - cfg.InitRatioCompressor(1.0) + cfg.InitRatioCompressor(1.0, derive.Zlib) cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin) require.NoError(err, "NewChannelBuilder") diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go index c6a0eec4c04c..b504d2480f3d 100644 --- a/op-batcher/batcher/channel_config.go +++ b/op-batcher/batcher/channel_config.go @@ -53,25 +53,26 @@ type ChannelConfig struct { // value consistent with cc.TargetNumFrames and cc.MaxFrameSize. // comprKind can be the empty string, in which case the default compressor will // be used. -func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) { +func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo) { cc.CompressorConfig = compressor.Config{ // Compressor output size needs to account for frame encoding overhead TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize), ApproxComprRatio: approxComprRatio, Kind: comprKind, + CompressionAlgo: compressionAlgo, } } -func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) { - cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind) +func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo) { + cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo) } -func (cc *ChannelConfig) InitShadowCompressor() { - cc.InitCompressorConfig(0, compressor.ShadowKind) +func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo) { + cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo) } func (cc *ChannelConfig) InitNoneCompressor() { - cc.InitCompressorConfig(0, compressor.NoneKind) + cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib) } func (cc *ChannelConfig) MaxFramesPerTx() int { diff --git a/op-batcher/batcher/channel_config_test.go b/op-batcher/batcher/channel_config_test.go index a89d780ab558..d7f3c2cc5ea5 100644 --- a/op-batcher/batcher/channel_config_test.go +++ b/op-batcher/batcher/channel_config_test.go @@ -20,7 +20,7 @@ func defaultTestChannelConfig() ChannelConfig { TargetNumFrames: 1, BatchType: derive.SingularBatchType, } - c.InitRatioCompressor(0.4) + c.InitRatioCompressor(0.4, derive.Zlib) return c } diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index e029e69e658c..eafcc49d1143 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -25,7 +25,7 @@ func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig TargetNumFrames: 1, BatchType: batchType, } - cfg.InitRatioCompressor(1) + cfg.InitRatioCompressor(1, derive.Zlib) return cfg } @@ -123,7 +123,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { // channels on confirmation. This would result in [TxConfirmed] // clearing confirmed transactions, and resetting the pendingChannels map cfg.ChannelTimeout = 10 - cfg.InitRatioCompressor(1) + cfg.InitRatioCompressor(1, derive.Zlib) m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig) // Channel Manager state should be empty by default diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index 71903eb370ca..3d3e813d0ae8 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -4,6 +4,7 @@ import ( "io" "testing" + "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -29,6 +30,9 @@ func TestChannelTimeout(t *testing.T) { log := testlog.Logger(t, log.LevelCrit) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ ChannelTimeout: 100, + CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }, }, &rollup.Config{}) m.Clear(eth.BlockID{}) @@ -71,7 +75,9 @@ func TestChannelTimeout(t *testing.T) { // TestChannelManager_NextTxData tests the nextTxData function. func TestChannelManager_NextTxData(t *testing.T) { log := testlog.Logger(t, log.LevelCrit) - m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }}, &rollup.Config{}) m.Clear(eth.BlockID{}) // Nil pending channel should return EOF @@ -118,6 +124,9 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{ MultiFrameTxs: false, TargetNumFrames: n, + CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }, }, &rollup.Config{}, latestL1BlockOrigin) require.NoError(err) chID := ch.ID() @@ -156,6 +165,9 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{ MultiFrameTxs: true, TargetNumFrames: n, + CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }, }, &rollup.Config{}, latestL1BlockOrigin) require.NoError(err) chID := ch.ID() @@ -202,6 +214,9 @@ func TestChannelTxConfirmed(t *testing.T) { // channels on confirmation. This would result in [TxConfirmed] // clearing confirmed transactions, and resetting the pendingChannels map ChannelTimeout: 10, + CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }, }, &rollup.Config{}) m.Clear(eth.BlockID{}) @@ -251,7 +266,9 @@ func TestChannelTxConfirmed(t *testing.T) { func TestChannelTxFailed(t *testing.T) { // Create a channel manager log := testlog.Logger(t, log.LevelCrit) - m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{ + CompressionAlgo: derive.Zlib, + }}, &rollup.Config{}) m.Clear(eth.BlockID{}) // Let's add a valid pending transaction to the channel diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 6f67e3850328..cc37961a056a 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/flags" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" plasma "github.com/ethereum-optimism/optimism/op-plasma" oplog "github.com/ethereum-optimism/optimism/op-service/log" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -67,6 +68,9 @@ type CLIConfig struct { // Type of compressor to use. Must be one of [compressor.KindKeys]. Compressor string + // Type of compression algorithm to use. Must be one of [zlib, brotli, brotli[9-11]] + CompressionAlgo derive.CompressionAlgo + // If Stopped is true, the batcher starts stopped and won't start batching right away. // Batching needs to be started via an admin RPC. Stopped bool @@ -124,6 +128,9 @@ func (c *CLIConfig) Check() error { if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) { return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio) } + if !derive.ValidCompressionAlgoType(c.CompressionAlgo) { + return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo) + } if c.BatchType > 1 { return fmt.Errorf("unknown batch type: %v", c.BatchType) } @@ -168,6 +175,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig { TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name), ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name), Compressor: ctx.String(flags.CompressorFlag.Name), + CompressionAlgo: derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)), Stopped: ctx.Bool(flags.StoppedFlag.Name), WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name), CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name), diff --git a/op-batcher/batcher/config_test.go b/op-batcher/batcher/config_test.go index 18a6227e0011..f8fb08a703da 100644 --- a/op-batcher/batcher/config_test.go +++ b/op-batcher/batcher/config_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/flags" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/oppprof" @@ -35,7 +36,8 @@ func validBatcherConfig() batcher.CLIConfig { MetricsConfig: metrics.DefaultCLIConfig(), PprofConfig: oppprof.DefaultCLIConfig(), // The compressor config is not checked in config.Check() - RPC: rpc.DefaultCLIConfig(), + RPC: rpc.DefaultCLIConfig(), + CompressionAlgo: derive.Zlib, } } diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index f9bf8bd8e04f..4203f442c78e 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -219,7 +219,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize) } - cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor) + cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo) if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running. @@ -228,6 +228,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!") } + // Checking for brotli compression only post Fjord + if bs.ChannelConfig.CompressorConfig.CompressionAlgo.IsBrotli() && !bs.RollupConfig.IsFjord(uint64(time.Now().Unix())) { + return fmt.Errorf("cannot use brotli compression before Fjord") + } + if err := cc.Check(); err != nil { return fmt.Errorf("invalid channel configuration: %w", err) } diff --git a/op-batcher/compressor/config.go b/op-batcher/compressor/config.go index 8befc43812ae..e4078c8373a6 100644 --- a/op-batcher/compressor/config.go +++ b/op-batcher/compressor/config.go @@ -16,6 +16,9 @@ type Config struct { // Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor // will default to RatioKind. Kind string + + // Type of compression algorithm to use. Must be one of [zlib, brotli-(9|10|11)] + CompressionAlgo derive.CompressionAlgo } func (c Config) NewCompressor() (derive.Compressor, error) { diff --git a/op-batcher/compressor/ratio_compressor.go b/op-batcher/compressor/ratio_compressor.go index 6844062e44e8..516cd05f6a2c 100644 --- a/op-batcher/compressor/ratio_compressor.go +++ b/op-batcher/compressor/ratio_compressor.go @@ -1,9 +1,6 @@ package compressor import ( - "bytes" - "compress/zlib" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" ) @@ -11,8 +8,7 @@ type RatioCompressor struct { config Config inputBytes int - buf bytes.Buffer - compress *zlib.Writer + compressor derive.ChannelCompressor } // NewRatioCompressor creates a new derive.Compressor implementation that uses the target @@ -25,11 +21,11 @@ func NewRatioCompressor(config Config) (derive.Compressor, error) { config: config, } - compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression) + compressor, err := derive.NewChannelCompressor(config.CompressionAlgo) if err != nil { return nil, err } - c.compress = compress + c.compressor = compressor return c, nil } @@ -39,29 +35,28 @@ func (t *RatioCompressor) Write(p []byte) (int, error) { return 0, err } t.inputBytes += len(p) - return t.compress.Write(p) + return t.compressor.Write(p) } func (t *RatioCompressor) Close() error { - return t.compress.Close() + return t.compressor.Close() } func (t *RatioCompressor) Read(p []byte) (int, error) { - return t.buf.Read(p) + return t.compressor.Read(p) } func (t *RatioCompressor) Reset() { - t.buf.Reset() - t.compress.Reset(&t.buf) + t.compressor.Reset() t.inputBytes = 0 } func (t *RatioCompressor) Len() int { - return t.buf.Len() + return t.compressor.Len() } func (t *RatioCompressor) Flush() error { - return t.compress.Flush() + return t.compressor.Flush() } func (t *RatioCompressor) FullErr() error { diff --git a/op-batcher/compressor/ratio_compressor_test.go b/op-batcher/compressor/ratio_compressor_test.go index 27e377a234d6..d1b0d4b936c7 100644 --- a/op-batcher/compressor/ratio_compressor_test.go +++ b/op-batcher/compressor/ratio_compressor_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-batcher/compressor" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/stretchr/testify/require" ) @@ -62,6 +63,7 @@ func TestChannelConfig_InputThreshold(t *testing.T) { comp, err := compressor.NewRatioCompressor(compressor.Config{ TargetOutputSize: tt.targetOutputSize, ApproxComprRatio: tt.approxComprRatio, + CompressionAlgo: derive.Zlib, }) require.NoError(t, err) got := comp.(*compressor.RatioCompressor).InputThreshold() diff --git a/op-batcher/compressor/shadow_compressor.go b/op-batcher/compressor/shadow_compressor.go index 7e6172460392..b5cd9fe6f199 100644 --- a/op-batcher/compressor/shadow_compressor.go +++ b/op-batcher/compressor/shadow_compressor.go @@ -1,9 +1,6 @@ package compressor import ( - "bytes" - "compress/zlib" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" ) @@ -21,11 +18,8 @@ const ( type ShadowCompressor struct { config Config - buf bytes.Buffer - compress *zlib.Writer - - shadowBuf bytes.Buffer - shadowCompress *zlib.Writer + compressor derive.ChannelCompressor + shadowCompressor derive.ChannelCompressor fullErr error @@ -45,11 +39,11 @@ func NewShadowCompressor(config Config) (derive.Compressor, error) { } var err error - c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression) + c.compressor, err = derive.NewChannelCompressor(config.CompressionAlgo) if err != nil { return nil, err } - c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression) + c.shadowCompressor, err = derive.NewChannelCompressor(config.CompressionAlgo) if err != nil { return nil, err } @@ -62,7 +56,7 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) { if t.fullErr != nil { return 0, t.fullErr } - _, err := t.shadowCompress.Write(p) + _, err := t.shadowCompressor.Write(p) if err != nil { return 0, err } @@ -71,10 +65,10 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) { // Do not flush the buffer unless there's some chance we will be over the size limit. // This reduces CPU but more importantly it makes the shadow compression ratio more // closely reflect the ultimate compression ratio. - if err = t.shadowCompress.Flush(); err != nil { + if err = t.shadowCompressor.Flush(); err != nil { return 0, err } - newBound = uint64(t.shadowBuf.Len()) + CloseOverheadZlib + newBound = uint64(t.shadowCompressor.Len()) + CloseOverheadZlib if newBound > t.config.TargetOutputSize { t.fullErr = derive.ErrCompressorFull if t.Len() > 0 { @@ -85,32 +79,30 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) { } } t.bound = newBound - return t.compress.Write(p) + return t.compressor.Write(p) } func (t *ShadowCompressor) Close() error { - return t.compress.Close() + return t.compressor.Close() } func (t *ShadowCompressor) Read(p []byte) (int, error) { - return t.buf.Read(p) + return t.compressor.Read(p) } func (t *ShadowCompressor) Reset() { - t.buf.Reset() - t.compress.Reset(&t.buf) - t.shadowBuf.Reset() - t.shadowCompress.Reset(&t.shadowBuf) + t.compressor.Reset() + t.shadowCompressor.Reset() t.fullErr = nil t.bound = safeCompressionOverhead } func (t *ShadowCompressor) Len() int { - return t.buf.Len() + return t.compressor.Len() } func (t *ShadowCompressor) Flush() error { - return t.compress.Flush() + return t.compressor.Flush() } func (t *ShadowCompressor) FullErr() error { diff --git a/op-batcher/compressor/shadow_compressor_test.go b/op-batcher/compressor/shadow_compressor_test.go index 1b300bcc3321..c29daeaad322 100644 --- a/op-batcher/compressor/shadow_compressor_test.go +++ b/op-batcher/compressor/shadow_compressor_test.go @@ -63,6 +63,7 @@ func TestShadowCompressor(t *testing.T) { sc, err := NewShadowCompressor(Config{ TargetOutputSize: test.targetOutputSize, + CompressionAlgo: derive.Zlib, }) require.NoError(t, err) @@ -115,6 +116,7 @@ func TestBoundInaccurateForLargeRandomData(t *testing.T) { sc, err := NewShadowCompressor(Config{ TargetOutputSize: sizeLimit + 100, + CompressionAlgo: derive.Zlib, }) require.NoError(t, err) diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 98d2121da0a5..c91234b96a29 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -9,6 +9,7 @@ import ( "github.com/urfave/cli/v2" "github.com/ethereum-optimism/optimism/op-batcher/compressor" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" plasma "github.com/ethereum-optimism/optimism/op-plasma" opservice "github.com/ethereum-optimism/optimism/op-service" openum "github.com/ethereum-optimism/optimism/op-service/enum" @@ -99,6 +100,15 @@ var ( return nil }, } + CompressionAlgoFlag = &cli.GenericFlag{ + Name: "compression-algo", + Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes), + EnvVars: prefixEnvVars("COMPRESSION_ALGO"), + Value: func() *derive.CompressionAlgo { + out := derive.Zlib + return &out + }(), + } StoppedFlag = &cli.BoolFlag{ Name: "stopped", Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC", @@ -167,6 +177,7 @@ var optionalFlags = []cli.Flag{ BatchTypeFlag, DataAvailabilityTypeFlag, ActiveSequencerCheckDurationFlag, + CompressionAlgoFlag, } func init() { diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index 5eb96ee110ac..310a6cade951 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -192,6 +192,7 @@ func (s *L2Batcher) Buffer(t Testing) error { target := batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize) c, e := compressor.NewShadowCompressor(compressor.Config{ TargetOutputSize: target, + CompressionAlgo: derive.Zlib, }) require.NoError(t, e, "failed to create compressor") @@ -200,7 +201,7 @@ func (s *L2Batcher) Buffer(t Testing) error { } else { // use span batch if we're forcing it or if we're at/beyond delta if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) { - ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target) + ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target, derive.Zlib) // use singular batches in all other cases } else { ch, err = derive.NewSingularChannelOut(c) diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index 759f51613e0e..e7521bdd8c94 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -26,7 +26,7 @@ import ( ) func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut { - channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000) + channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000, derive.Zlib) require.NoError(t, err) return channelOut } diff --git a/op-e2e/sequencer_failover_setup.go b/op-e2e/sequencer_failover_setup.go index 1e1617c06a87..5ef5635f467d 100644 --- a/op-e2e/sequencer_failover_setup.go +++ b/op-e2e/sequencer_failover_setup.go @@ -296,6 +296,7 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) { BatchType: derive.SpanBatchType, DataAvailabilityType: batcherFlags.CalldataType, ActiveSequencerCheckDuration: 0, + CompressionAlgo: derive.Zlib, } batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"]) diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 2840097ce442..3aa833f58c46 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -845,6 +845,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste Stopped: sys.Cfg.DisableBatcher, // Batch submitter may be enabled later BatchType: batchType, DataAvailabilityType: sys.Cfg.DataAvailabilityType, + CompressionAlgo: derive.Zlib, } // Batch Submitter batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"]) diff --git a/op-node/benchmarks/batchbuilding_test.go b/op-node/benchmarks/batchbuilding_test.go index 5a3ae2008b2b..7b87d72e09e3 100644 --- a/op-node/benchmarks/batchbuilding_test.go +++ b/op-node/benchmarks/batchbuilding_test.go @@ -58,13 +58,13 @@ type compressorAndTarget struct { } // channelOutByType returns a channel out of the given type as a helper for the benchmarks -func channelOutByType(batchType uint, compKey string) (derive.ChannelOut, error) { +func channelOutByType(batchType uint, compKey string, algo derive.CompressionAlgo) (derive.ChannelOut, error) { chainID := big.NewInt(333) if batchType == derive.SingularBatchType { return derive.NewSingularChannelOut(compressors[compKey].compressor) } if batchType == derive.SpanBatchType { - return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput) + return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput, algo) } return nil, fmt.Errorf("unsupported batch type: %d", batchType) } @@ -129,25 +129,28 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { // to leverage optimizations in the Batch Linked List batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) } - b.Run(tc.String(), func(b *testing.B) { - // reset the compressor used in the test case - for bn := 0; bn < b.N; bn++ { - // don't measure the setup time - b.StopTimer() - compressors[tc.compKey].compressor.Reset() - cout, _ := channelOutByType(tc.BatchType, tc.compKey) - // add all but the final batch to the channel out - for i := 0; i < tc.BatchCount-1; i++ { - err := cout.AddSingularBatch(batches[i], 0) + for _, algo := range derive.CompressionAlgoTypes { + b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) { + // reset the compressor used in the test case + for bn := 0; bn < b.N; bn++ { + // don't measure the setup time + b.StopTimer() + compressors[tc.compKey].compressor.Reset() + cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo) + // add all but the final batch to the channel out + for i := 0; i < tc.BatchCount-1; i++ { + err := cout.AddSingularBatch(batches[i], 0) + require.NoError(b, err) + } + // measure the time to add the final batch + b.StartTimer() + // add the final batch to the channel out + err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) require.NoError(b, err) } - // measure the time to add the final batch - b.StartTimer() - // add the final batch to the channel out - err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) - require.NoError(b, err) - } - }) + }) + } + } } @@ -165,35 +168,37 @@ func BenchmarkIncremental(b *testing.B) { {derive.SpanBatchType, 5, 1, "RealBlindCompressor"}, //{derive.SingularBatchType, 100, 1, "RealShadowCompressor"}, } - for _, tc := range tcs { - cout, err := channelOutByType(tc.BatchType, tc.compKey) - if err != nil { - b.Fatal(err) - } - done := false - for base := 0; !done; base += tc.BatchCount { - rangeName := fmt.Sprintf("Incremental %s: %d-%d", tc.String(), base, base+tc.BatchCount) - b.Run(rangeName, func(b *testing.B) { - b.StopTimer() - // prepare the batches - t := time.Now() - batches := make([]*derive.SingularBatch, tc.BatchCount) - for i := 0; i < tc.BatchCount; i++ { - t := t.Add(time.Second) - batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID) - // set the timestamp to increase with each batch - // to leverage optimizations in the Batch Linked List - batches[i].Timestamp = uint64(t.Unix()) - } - b.StartTimer() - for i := 0; i < tc.BatchCount; i++ { - err := cout.AddSingularBatch(batches[i], 0) - if err != nil { - done = true - return + for _, algo := range derive.CompressionAlgoTypes { + for _, tc := range tcs { + cout, err := channelOutByType(tc.BatchType, tc.compKey, algo) + if err != nil { + b.Fatal(err) + } + done := false + for base := 0; !done; base += tc.BatchCount { + rangeName := fmt.Sprintf("Incremental %s-%s: %d-%d", algo, tc.String(), base, base+tc.BatchCount) + b.Run(rangeName+"_"+algo.String(), func(b *testing.B) { + b.StopTimer() + // prepare the batches + t := time.Now() + batches := make([]*derive.SingularBatch, tc.BatchCount) + for i := 0; i < tc.BatchCount; i++ { + t := t.Add(time.Second) + batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID) + // set the timestamp to increase with each batch + // to leverage optimizations in the Batch Linked List + batches[i].Timestamp = uint64(t.Unix()) } - } - }) + b.StartTimer() + for i := 0; i < tc.BatchCount; i++ { + err := cout.AddSingularBatch(batches[i], 0) + if err != nil { + done = true + return + } + } + }) + } } } } @@ -226,33 +231,35 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { } } - for _, tc := range tests { - chainID := big.NewInt(333) - rng := rand.New(rand.NewSource(0x543331)) - // pre-generate batches to keep the benchmark from including the random generation - batches := make([]*derive.SingularBatch, tc.BatchCount) - t := time.Now() - for i := 0; i < tc.BatchCount; i++ { - batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID) - // set the timestamp to increase with each batch - // to leverage optimizations in the Batch Linked List - batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) - } - b.Run(tc.String(), func(b *testing.B) { - // reset the compressor used in the test case - for bn := 0; bn < b.N; bn++ { - // don't measure the setup time - b.StopTimer() - compressors[tc.compKey].compressor.Reset() - cout, _ := channelOutByType(tc.BatchType, tc.compKey) - b.StartTimer() - // add all batches to the channel out - for i := 0; i < tc.BatchCount; i++ { - err := cout.AddSingularBatch(batches[i], 0) - require.NoError(b, err) - } + for _, algo := range derive.CompressionAlgoTypes { + for _, tc := range tests { + chainID := big.NewInt(333) + rng := rand.New(rand.NewSource(0x543331)) + // pre-generate batches to keep the benchmark from including the random generation + batches := make([]*derive.SingularBatch, tc.BatchCount) + t := time.Now() + for i := 0; i < tc.BatchCount; i++ { + batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID) + // set the timestamp to increase with each batch + // to leverage optimizations in the Batch Linked List + batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) } - }) + b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) { + // reset the compressor used in the test case + for bn := 0; bn < b.N; bn++ { + // don't measure the setup time + b.StopTimer() + compressors[tc.compKey].compressor.Reset() + cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo) + b.StartTimer() + // add all batches to the channel out + for i := 0; i < tc.BatchCount; i++ { + err := cout.AddSingularBatch(batches[i], 0) + require.NoError(b, err) + } + } + }) + } } } diff --git a/op-node/cmd/batch_decoder/reassemble/reassemble.go b/op-node/cmd/batch_decoder/reassemble/reassemble.go index c4a76493944c..23b4323b3e0d 100644 --- a/op-node/cmd/batch_decoder/reassemble/reassemble.go +++ b/op-node/cmd/batch_decoder/reassemble/reassemble.go @@ -111,7 +111,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr var batchTypes []int invalidBatches := false if ch.IsReady() { - br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time)) + br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time)) if err == nil { for batchData, err := br(); err != io.EOF; batchData, err = br() { if err != nil { diff --git a/op-node/rollup/derive/channel.go b/op-node/rollup/derive/channel.go index f26afa681d28..58238133081e 100644 --- a/op-node/rollup/derive/channel.go +++ b/op-node/rollup/derive/channel.go @@ -1,15 +1,22 @@ package derive import ( + "bufio" "bytes" "compress/zlib" "fmt" "io" + "github.com/andybalholm/brotli" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/rlp" ) +const ( + ZlibCM8 = 8 + ZlibCM15 = 15 +) + // A Channel is a set of batches that are split into at least one, but possibly multiple frames. // Frames are allowed to be ingested out of order. // Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the @@ -151,17 +158,44 @@ func (ch *Channel) Reader() io.Reader { // The L1Inclusion block is also provided at creation time. // Warning: the batch reader can read every batch-type. // The caller of the batch-reader should filter the results. -func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64) (func() (*BatchData, error), error) { - // Setup decompressor stage + RLP reader - zr, err := zlib.NewReader(r) +func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func() (*BatchData, error), error) { + // use buffered reader so can peek the first byte + bufReader := bufio.NewReader(r) + compressionType, err := bufReader.Peek(1) if err != nil { return nil, err } + + var zr io.Reader + // For zlib, the last 4 bits must be either 8 or 15 (both are reserved value) + if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 { + var err error + zr, err = zlib.NewReader(bufReader) + if err != nil { + return nil, err + } + // If the bits equal to 1, then it is a brotli reader + } else if compressionType[0] == ChannelVersionBrotli { + // If before Fjord, we cannot accept brotli compressed batch + if !isFjord { + return nil, fmt.Errorf("cannot accept brotli compressed batch before Fjord") + } + // discard the first byte + _, err := bufReader.Discard(1) + if err != nil { + return nil, err + } + zr = brotli.NewReader(bufReader) + } else { + return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0]) + } + + // Setup decompressor stage + RLP reader rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel) // Read each batch iteratively return func() (*BatchData, error) { var batchData BatchData - if err = rlpReader.Decode(&batchData); err != nil { + if err := rlpReader.Decode(&batchData); err != nil { return nil, err } return &batchData, nil diff --git a/op-node/rollup/derive/channel_compressor.go b/op-node/rollup/derive/channel_compressor.go new file mode 100644 index 000000000000..341dd13d825e --- /dev/null +++ b/op-node/rollup/derive/channel_compressor.go @@ -0,0 +1,94 @@ +package derive + +import ( + "bytes" + "compress/zlib" + "fmt" + "io" + + "github.com/andybalholm/brotli" +) + +const ( + ChannelVersionBrotli byte = 0x01 +) + +type ChannelCompressor interface { + Write([]byte) (int, error) + Flush() error + Close() error + Reset() + Len() int + Read([]byte) (int, error) + GetCompressed() *bytes.Buffer +} + +type CompressorWriter interface { + Write([]byte) (int, error) + Flush() error + Close() error + Reset(io.Writer) +} + +type BaseChannelCompressor struct { + compressed *bytes.Buffer + CompressorWriter +} + +func (bcc *BaseChannelCompressor) Len() int { + return bcc.compressed.Len() +} + +func (bcc *BaseChannelCompressor) Read(p []byte) (int, error) { + return bcc.compressed.Read(p) +} + +func (bcc *BaseChannelCompressor) GetCompressed() *bytes.Buffer { + return bcc.compressed +} + +type ZlibCompressor struct { + BaseChannelCompressor +} + +func (zc *ZlibCompressor) Reset() { + zc.compressed.Reset() + zc.CompressorWriter.Reset(zc.compressed) +} + +type BrotliCompressor struct { + BaseChannelCompressor +} + +func (bc *BrotliCompressor) Reset() { + bc.compressed.Reset() + bc.compressed.WriteByte(ChannelVersionBrotli) + bc.CompressorWriter.Reset(bc.compressed) +} + +func NewChannelCompressor(algo CompressionAlgo) (ChannelCompressor, error) { + compressed := &bytes.Buffer{} + if algo == Zlib { + writer, err := zlib.NewWriterLevel(compressed, zlib.BestCompression) + if err != nil { + return nil, err + } + return &ZlibCompressor{ + BaseChannelCompressor{ + CompressorWriter: writer, + compressed: compressed, + }, + }, nil + } else if algo.IsBrotli() { + compressed.WriteByte(ChannelVersionBrotli) + writer := brotli.NewWriterLevel(compressed, GetBrotliLevel(algo)) + return &BrotliCompressor{ + BaseChannelCompressor{ + CompressorWriter: writer, + compressed: compressed, + }, + }, nil + } else { + return nil, fmt.Errorf("unsupported compression algorithm: %s", algo) + } +} diff --git a/op-node/rollup/derive/channel_compressor_test.go b/op-node/rollup/derive/channel_compressor_test.go new file mode 100644 index 000000000000..3224b89d9808 --- /dev/null +++ b/op-node/rollup/derive/channel_compressor_test.go @@ -0,0 +1,67 @@ +package derive + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +var r = rand.New(rand.NewSource(99)) + +func randomBytes(length int) []byte { + b := make([]byte, length) + _, err := r.Read(b) + // Rand.Read always returns nil error + if err != nil { + panic(err) + } + return b +} + +func TestChannelCompressor_NewReset(t *testing.T) { + testCases := []struct { + name string + algo CompressionAlgo + expectedResetSize int + expectErr bool + }{ + { + name: "zlib", + algo: Zlib, + expectedResetSize: 0, + }, + { + name: "brotli10", + algo: Brotli10, + expectedResetSize: 1, + }, + { + name: "zstd", + algo: CompressionAlgo("zstd"), + expectedResetSize: 0, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + scc, err := NewChannelCompressor(tc.algo) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedResetSize, scc.Len()) + + _, err = scc.Write(randomBytes(10)) + require.NoError(t, err) + err = scc.Flush() + require.NoError(t, err) + require.Greater(t, scc.Len(), tc.expectedResetSize) + + scc.Reset() + require.Equal(t, tc.expectedResetSize, scc.Len()) + }) + } +} diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 31a5746bb737..cd739e95fb34 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -44,7 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef { // TODO: Take full channel for better logging func (cr *ChannelInReader) WriteChannel(data []byte) error { - if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time)); err == nil { + if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time), cr.cfg.IsFjord(cr.prev.Origin().Time)); err == nil { cr.nextBatchFn = f cr.metrics.RecordChannelInputBytes(len(data)) return nil diff --git a/op-node/rollup/derive/channel_out_test.go b/op-node/rollup/derive/channel_out_test.go index 7c4ae92204d0..00e0a8b14487 100644 --- a/op-node/rollup/derive/channel_out_test.go +++ b/op-node/rollup/derive/channel_out_test.go @@ -52,7 +52,7 @@ var channelTypes = []struct { { Name: "Span", ChannelOut: func(t *testing.T) ChannelOut { - cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000) + cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000, Zlib) require.NoError(t, err) return cout }, @@ -113,7 +113,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) { // depending on the channel type, determine the size of the written data if span, ok := cout.(*SpanChannelOut); ok { - written = uint64(span.compressed.Len()) + written = uint64(span.compressor.Len()) } else if singular, ok := cout.(*SingularChannelOut); ok { written = uint64(singular.compress.Len()) } @@ -220,12 +220,12 @@ func TestBlockToBatchValidity(t *testing.T) { require.ErrorContains(t, err, "has no transactions") } -func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOut, []*SingularBatch) { +func SpanChannelAndBatches(t *testing.T, target uint64, len int, algo CompressionAlgo) (*SpanChannelOut, []*SingularBatch) { // target is larger than one batch, but smaller than two batches rng := rand.New(rand.NewSource(0x543331)) chainID := big.NewInt(rng.Int63n(1000)) txCount := 1 - cout, err := NewSpanChannelOut(0, chainID, target) + cout, err := NewSpanChannelOut(0, chainID, target, algo) require.NoError(t, err) batches := make([]*SingularBatch, len) // adding the first batch should not cause an error @@ -237,14 +237,33 @@ func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOu return cout, batches } +func TestSpanChannelOut(t *testing.T) { + tests := []struct { + name string + f func(t *testing.T, algo CompressionAlgo) + }{ + {"SpanChannelOutCompressionOnlyOneBatch", SpanChannelOutCompressionOnlyOneBatch}, + {"SpanChannelOutCompressionUndo", SpanChannelOutCompressionUndo}, + {"SpanChannelOutClose", SpanChannelOutClose}, + } + for _, test := range tests { + test := test + for _, algo := range CompressionAlgoTypes { + t.Run(test.name+"_"+algo.String(), func(t *testing.T) { + test.f(t, algo) + }) + } + } +} + // TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch // and it is larger than the target size. The single batch should be compressed, and the channel should now be full -func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) { - cout, singularBatches := SpanChannelAndBatches(t, 300, 2) +func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) { + cout, singularBatches := SpanChannelAndBatches(t, 300, 2, algo) err := cout.AddSingularBatch(singularBatches[0], 0) // confirm compression was not skipped - require.Greater(t, cout.compressed.Len(), 0) + require.Greater(t, cout.compressor.Len(), 0) require.NoError(t, err) // confirm the channel is full @@ -256,21 +275,25 @@ func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) { } // TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull -func TestSpanChannelOutCompressionUndo(t *testing.T) { +func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) { // target is larger than one batch, but smaller than two batches - cout, singularBatches := SpanChannelAndBatches(t, 750, 2) + cout, singularBatches := SpanChannelAndBatches(t, 750, 2, algo) err := cout.AddSingularBatch(singularBatches[0], 0) require.NoError(t, err) // confirm that the first compression was skipped - require.Equal(t, 0, cout.compressed.Len()) + if algo == Zlib { + require.Equal(t, 0, cout.compressor.Len()) + } else { + require.Equal(t, 1, cout.compressor.Len()) // 1 because of brotli channel version + } // record the RLP length to confirm it doesn't change when adding a rejected batch rlp1 := cout.activeRLP().Len() err = cout.AddSingularBatch(singularBatches[1], 0) require.ErrorIs(t, err, ErrCompressorFull) // confirm that the second compression was not skipped - require.Greater(t, cout.compressed.Len(), 0) + require.Greater(t, cout.compressor.Len(), 0) // confirm that the second rlp is tht same size as the first (because the second batch was not added) require.Equal(t, rlp1, cout.activeRLP().Len()) @@ -278,14 +301,19 @@ func TestSpanChannelOutCompressionUndo(t *testing.T) { // TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed. // it should compress the batch even if it is smaller than the target size because the channel is closing -func TestSpanChannelOutClose(t *testing.T) { +func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) { target := uint64(600) - cout, singularBatches := SpanChannelAndBatches(t, target, 1) + cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo) err := cout.AddSingularBatch(singularBatches[0], 0) require.NoError(t, err) // confirm no compression has happened yet - require.Equal(t, 0, cout.compressed.Len()) + + if algo == Zlib { + require.Equal(t, 0, cout.compressor.Len()) + } else { + require.Equal(t, 1, cout.compressor.Len()) // 1 because of brotli channel version + } // confirm the RLP length is less than the target rlpLen := cout.activeRLP().Len() @@ -295,6 +323,6 @@ func TestSpanChannelOutClose(t *testing.T) { require.NoError(t, cout.Close()) // confirm that the only batch was compressed, and that the RLP did not change - require.Greater(t, cout.compressed.Len(), 0) + require.Greater(t, cout.compressor.Len(), 0) require.Equal(t, rlpLen, cout.activeRLP().Len()) } diff --git a/op-node/rollup/derive/channel_test.go b/op-node/rollup/derive/channel_test.go index fdd6e4065e25..d52fa84e6ca2 100644 --- a/op-node/rollup/derive/channel_test.go +++ b/op-node/rollup/derive/channel_test.go @@ -1,8 +1,14 @@ package derive import ( + "bytes" + "compress/zlib" + "math/big" + "math/rand" "testing" + "github.com/DataDog/zstd" + "github.com/andybalholm/brotli" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/stretchr/testify/require" ) @@ -99,3 +105,117 @@ func TestFrameValidity(t *testing.T) { t.Run(tc.name, tc.Run) } } + +func TestBatchReader(t *testing.T) { + // Get batch data + rng := rand.New(rand.NewSource(0x543331)) + singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333)) + batchDataInput := NewBatchData(singularBatch) + + encodedBatch := &bytes.Buffer{} + err := batchDataInput.EncodeRLP(encodedBatch) + require.NoError(t, err) + + var testCases = []struct { + name string + algo func(buf *bytes.Buffer, t *testing.T) + isFjord bool + expectErr bool + }{ + { + name: "zlib-post-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + writer := zlib.NewWriter(buf) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: true, + }, + { + name: "zlib-pre-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + writer := zlib.NewWriter(buf) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: false, + }, + { + name: "brotli9-post-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + buf.WriteByte(ChannelVersionBrotli) + writer := brotli.NewWriterLevel(buf, 9) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: true, + }, + { + name: "brotli9-pre-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + buf.WriteByte(ChannelVersionBrotli) + writer := brotli.NewWriterLevel(buf, 9) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: false, + expectErr: true, // expect an error because brotli is not supported before Fjord + }, + { + name: "brotli10-post-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + buf.WriteByte(ChannelVersionBrotli) + writer := brotli.NewWriterLevel(buf, 10) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: true, + }, + { + name: "brotli11-post-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + buf.WriteByte(ChannelVersionBrotli) + writer := brotli.NewWriterLevel(buf, 11) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + isFjord: true, + }, + { + name: "zstd-post-fjord", + algo: func(buf *bytes.Buffer, t *testing.T) { + writer := zstd.NewWriter(buf) + _, err := writer.Write(encodedBatch.Bytes()) + require.NoError(t, err) + writer.Close() + }, + expectErr: true, + isFjord: true, + }} + + for _, tc := range testCases { + compressed := new(bytes.Buffer) + tc := tc + t.Run(tc.name, func(t *testing.T) { + tc.algo(compressed, t) + reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + // read the batch data + batchData, err := reader() + require.NoError(t, err) + require.NotNil(t, batchData) + require.Equal(t, batchDataInput, batchData) + }) + } +} diff --git a/op-node/rollup/derive/span_channel_out.go b/op-node/rollup/derive/span_channel_out.go index a93f71db48d2..8e02b553781a 100644 --- a/op-node/rollup/derive/span_channel_out.go +++ b/op-node/rollup/derive/span_channel_out.go @@ -2,7 +2,7 @@ package derive import ( "bytes" - "compress/zlib" + "crypto/rand" "fmt" "io" @@ -26,10 +26,8 @@ type SpanChannelOut struct { // lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed // it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression lastCompressedRLPSize int - // compressed contains compressed data for making output frames - compressed *bytes.Buffer - // compress is the zlib writer for the channel - compressor *zlib.Writer + // the compressor for the channel + compressor ChannelCompressor // target is the target size of the compressed data target uint64 // closed indicates if the channel is closed @@ -49,22 +47,23 @@ func (co *SpanChannelOut) setRandomID() error { return err } -func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) { +func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo) (*SpanChannelOut, error) { c := &SpanChannelOut{ - id: ChannelID{}, - frame: 0, - spanBatch: NewSpanBatch(genesisTimestamp, chainID), - rlp: [2]*bytes.Buffer{{}, {}}, - compressed: &bytes.Buffer{}, - target: targetOutputSize, + id: ChannelID{}, + frame: 0, + spanBatch: NewSpanBatch(genesisTimestamp, chainID), + rlp: [2]*bytes.Buffer{{}, {}}, + target: targetOutputSize, } var err error if err = c.setRandomID(); err != nil { return nil, err } - if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil { + + if c.compressor, err = NewChannelCompressor(compressionAlgo); err != nil { return nil, err } + return c, nil } @@ -75,8 +74,7 @@ func (co *SpanChannelOut) Reset() error { co.rlp[0].Reset() co.rlp[1].Reset() co.lastCompressedRLPSize = 0 - co.compressed.Reset() - co.compressor.Reset(co.compressed) + co.compressor.Reset() co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) // setting the new randomID is the only part of the reset that can fail return co.setRandomID() @@ -153,7 +151,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) // if the compressed data *plus* the new rlp data is under the target size, return early // this optimizes out cases where the compressor will obviously come in under the target size rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize - if uint64(co.compressed.Len()+rlpGrowth) < co.target { + if uint64(co.compressor.Len()+rlpGrowth) < co.target { return nil } @@ -186,8 +184,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) // compress compresses the active RLP buffer and checks if the compressed data is over the target size. // it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally. func (co *SpanChannelOut) compress() error { - co.compressed.Reset() - co.compressor.Reset(co.compressed) + co.compressor.Reset() if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil { return err } @@ -207,7 +204,7 @@ func (co *SpanChannelOut) InputBytes() int { // Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full func (co *SpanChannelOut) ReadyBytes() int { if co.closed || co.FullErr() != nil { - return co.compressed.Len() + return co.compressor.Len() } return 0 } @@ -225,7 +222,7 @@ func (co *SpanChannelOut) checkFull() { if co.full != nil { return } - if uint64(co.compressed.Len()) >= co.target { + if uint64(co.compressor.Len()) >= co.target { co.full = ErrCompressorFull } } @@ -264,7 +261,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize) - if _, err := io.ReadFull(co.compressed, f.Data); err != nil { + if _, err := io.ReadFull(co.compressor.GetCompressed(), f.Data); err != nil { return 0, err } diff --git a/op-node/rollup/derive/types.go b/op-node/rollup/derive/types.go new file mode 100644 index 000000000000..a17c1c9a9a6a --- /dev/null +++ b/op-node/rollup/derive/types.go @@ -0,0 +1,68 @@ +package derive + +import ( + "fmt" + "regexp" +) + +type CompressionAlgo string + +const ( + // compression algo types + Zlib CompressionAlgo = "zlib" + Brotli9 CompressionAlgo = "brotli-9" + Brotli10 CompressionAlgo = "brotli-10" + Brotli11 CompressionAlgo = "brotli-11" +) + +var CompressionAlgoTypes = []CompressionAlgo{ + Zlib, + Brotli9, + Brotli10, + Brotli11, +} + +var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`) + +func (algo CompressionAlgo) String() string { + return string(algo) +} + +func (algo *CompressionAlgo) Set(value string) error { + if !ValidCompressionAlgoType(CompressionAlgo(value)) { + return fmt.Errorf("unknown compression algo type: %q", value) + } + *algo = CompressionAlgo(value) + return nil +} + +func (algo *CompressionAlgo) Clone() any { + cpy := *algo + return &cpy +} + +func (algo *CompressionAlgo) IsBrotli() bool { + return brotliRegexp.MatchString(algo.String()) +} + +func GetBrotliLevel(algo CompressionAlgo) int { + switch algo { + case Brotli9: + return 9 + case Brotli10: + return 10 + case Brotli11: + return 11 + default: + panic("Unsupported brotli level") + } +} + +func ValidCompressionAlgoType(value CompressionAlgo) bool { + for _, k := range CompressionAlgoTypes { + if k == value { + return true + } + } + return false +} diff --git a/op-node/rollup/derive/types_test.go b/op-node/rollup/derive/types_test.go new file mode 100644 index 000000000000..5b9c1d94ed50 --- /dev/null +++ b/op-node/rollup/derive/types_test.go @@ -0,0 +1,58 @@ +package derive + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCompressionAlgo(t *testing.T) { + testCases := []struct { + name string + algo CompressionAlgo + isBrotli bool + isValidCompressionAlgoType bool + }{ + { + name: "zlib", + algo: Zlib, + isBrotli: false, + isValidCompressionAlgoType: true, + }, + { + name: "brotli-9", + algo: Brotli9, + isBrotli: true, + isValidCompressionAlgoType: true, + }, + { + name: "brotli-10", + algo: Brotli10, + isBrotli: true, + isValidCompressionAlgoType: true, + }, + { + name: "brotli-11", + algo: Brotli11, + isBrotli: true, + isValidCompressionAlgoType: true, + }, + { + name: "invalid", + algo: CompressionAlgo("invalid"), + isBrotli: false, + isValidCompressionAlgoType: false, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.isBrotli, tc.algo.IsBrotli()) + if tc.isBrotli { + require.NotPanics(t, func() { GetBrotliLevel((tc.algo)) }) + } else { + require.Panics(t, func() { GetBrotliLevel(tc.algo) }) + } + require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo)) + }) + } +}