diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index 83dff1266d3..32ab8db8bf0 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -271,6 +271,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           c.Consistent.Storage,
 			UseFileBackend:    c.Consistent.UseFileBackend,
+			Compression:       c.Consistent.Compression,
 		}
 	}
 	if c.Sink != nil {
@@ -500,6 +501,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           cloned.Consistent.Storage,
 			UseFileBackend:    cloned.Consistent.UseFileBackend,
+			Compression:       cloned.Consistent.Compression,
 		}
 	}
 	if cloned.Mounter != nil {
@@ -687,6 +689,7 @@ type ConsistentConfig struct {
 	FlushWorkerNum    int    `json:"flush_worker_num"`
 	Storage           string `json:"storage,omitempty"`
 	UseFileBackend    bool   `json:"use_file_backend"`
+	Compression       string `json:"compression,omitempty"`
 }
 
 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
diff --git a/cdc/model/sink.go b/cdc/model/sink.go
index 3615a8a867d..6ea9234ee7d 100644
--- a/cdc/model/sink.go
+++ b/cdc/model/sink.go
@@ -537,7 +537,7 @@ type Column struct {
 	Default interface{} `json:"default" msg:"-"`
 
 	// ApproximateBytes is approximate bytes consumed by the column.
-	ApproximateBytes int `json:"-"`
+	ApproximateBytes int `json:"-" msg:"-"`
 }
 
 // RedoColumn stores Column change
diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go
index 330bae9a4f3..80b563424c9 100644
--- a/cdc/model/sink_gen.go
+++ b/cdc/model/sink_gen.go
@@ -54,12 +54,6 @@ func (z *Column) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "Value")
 				return
 			}
-		case "ApproximateBytes":
-			z.ApproximateBytes, err = dc.ReadInt()
-			if err != nil {
-				err = msgp.WrapError(err, "ApproximateBytes")
-				return
-			}
 		default:
 			err = dc.Skip()
 			if err != nil {
@@ -73,9 +67,9 @@ func (z *Column) DecodeMsg(dc *msgp.Reader) (err error) {
 
 // EncodeMsg implements msgp.Encodable
 func (z *Column) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 6
+	// map header, size 5
 	// write "name"
-	err = en.Append(0x86, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
+	err = en.Append(0x85, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
 	if err != nil {
 		return
 	}
@@ -124,25 +118,15 @@ func (z *Column) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Value")
 		return
 	}
-	// write "ApproximateBytes"
-	err = en.Append(0xb0, 0x41, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73)
-	if err != nil {
-		return
-	}
-	err = en.WriteInt(z.ApproximateBytes)
-	if err != nil {
-		err = msgp.WrapError(err, "ApproximateBytes")
-		return
-	}
 	return
 }
 
 // MarshalMsg implements msgp.Marshaler
 func (z *Column) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 6
+	// map header, size 5
 	// string "name"
-	o = append(o, 0x86, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
+	o = append(o, 0x85, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
 	o = msgp.AppendString(o, z.Name)
 	// string "type"
 	o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65)
@@ -160,9 +144,6 @@ func (z *Column) MarshalMsg(b []byte) (o []byte, err error) {
 		err = msgp.WrapError(err, "Value")
 		return
 	}
-	// string "ApproximateBytes"
-	o = append(o, 0xb0, 0x41, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73)
-	o = msgp.AppendInt(o, z.ApproximateBytes)
 	return
 }
 
@@ -214,12 +195,6 @@ func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				err = msgp.WrapError(err, "Value")
 				return
 			}
-		case "ApproximateBytes":
-			z.ApproximateBytes, bts, err = msgp.ReadIntBytes(bts)
-			if err != nil {
-				err = msgp.WrapError(err, "ApproximateBytes")
-				return
-			}
 		default:
 			bts, err = msgp.Skip(bts)
 			if err != nil {
@@ -234,7 +209,7 @@ func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *Column) Msgsize() (s int) {
-	s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 10 + msgp.StringPrefixSize + len(z.Collation) + 6 + msgp.GuessSize(z.Value) + 17 + msgp.IntSize
+	s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 10 + msgp.StringPrefixSize + len(z.Collation) + 6 + msgp.GuessSize(z.Value)
 	return
 }
 
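Note on the model changes above: tagging `ApproximateBytes` with `msg:"-"` removes it from tinylib/msgp code generation, which is why the regenerated sink_gen.go shrinks from a 6-entry map to a 5-entry map and redo entries no longer carry the field. A minimal sketch of how the tag drives codegen (illustration only, with the Column fields trimmed; not part of the patch):

```go
package model

//go:generate msgp

// Column is a trimmed illustration of cdc/model.Column.
type Column struct {
	// Serialized under the MessagePack key "name".
	Name string `json:"name" msg:"name"`
	// Fields tagged `msg:"-"` are skipped by the msgp generator, so
	// running `go generate` emits Encode/Decode methods without it.
	ApproximateBytes int `json:"-" msg:"-"`
}
```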
"ApproximateBytes": - z.ApproximateBytes, bts, err = msgp.ReadIntBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ApproximateBytes") - return - } default: bts, err = msgp.Skip(bts) if err != nil { @@ -234,7 +209,7 @@ func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *Column) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 10 + msgp.StringPrefixSize + len(z.Collation) + 6 + msgp.GuessSize(z.Value) + 17 + msgp.IntSize + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.ByteSize + 8 + msgp.StringPrefixSize + len(z.Charset) + 10 + msgp.StringPrefixSize + len(z.Collation) + 6 + msgp.GuessSize(z.Value) return } diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 71d723835ce..ae9976ef912 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/file" + "github.com/pingcap/tiflow/pkg/compression" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" "go.uber.org/zap" @@ -51,6 +52,9 @@ const ( defaultWorkerNum = 16 ) +// lz4MagicNumber is the magic number of lz4 compressed data +var lz4MagicNumber = []byte{0x04, 0x22, 0x4D, 0x18} + type fileReader interface { io.Closer // Read return the log from log file @@ -202,6 +206,13 @@ func selectDownLoadFile( return files, nil } +func isLZ4Compressed(data []byte) bool { + if len(data) < 4 { + return false + } + return bytes.Equal(data[:4], lz4MagicNumber) +} + func readAllFromBuffer(buf []byte) (logHeap, error) { r := &reader{ br: bytes.NewReader(buf), @@ -250,6 +261,12 @@ func sortAndWriteFile( log.Warn("download file is empty", zap.String("file", fileName)) return nil } + // it's lz4 compressed, decompress it + if isLZ4Compressed(fileContent) { + if fileContent, err = compression.Decode(compression.LZ4, fileContent); err != nil { + return err + } + } // sort data h, err := readAllFromBuffer(fileContent) diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 3258bf169c4..3571a102c9e 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -14,16 +14,20 @@ package memory import ( + "bytes" "context" "fmt" + "io" "sync" "time" + "github.com/pierrec/lz4/v4" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" + "github.com/pingcap/tiflow/pkg/compression" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/uuid" @@ -34,6 +38,7 @@ import ( type fileCache struct { data []byte + fileSize int64 maxCommitTs model.Ts // After memoryWriter become stable, this field would be used to // avoid traversing log files. @@ -41,17 +46,24 @@ type fileCache struct { filename string flushed chan struct{} + writer *dataWriter } -func newFileCache(event *polymorphicRedoEvent, buf []byte) *fileCache { - buf = buf[:0] - buf = append(buf, event.data.Bytes()...) 
diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go
index 3258bf169c4..3571a102c9e 100644
--- a/cdc/redo/writer/memory/file_worker.go
+++ b/cdc/redo/writer/memory/file_worker.go
@@ -14,16 +14,20 @@
 package memory
 
 import (
+	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"sync"
 	"time"
 
+	"github.com/pierrec/lz4/v4"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo/common"
 	"github.com/pingcap/tiflow/cdc/redo/writer"
+	"github.com/pingcap/tiflow/pkg/compression"
 	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/uuid"
@@ -34,6 +38,7 @@ import (
 
 type fileCache struct {
 	data        []byte
+	fileSize    int64
 	maxCommitTs model.Ts
 	// After memoryWriter become stable, this field would be used to
 	// avoid traversing log files.
@@ -41,17 +46,24 @@ type fileCache struct {
 	minCommitTs model.Ts
 	filename    string
 	flushed     chan struct{}
+	writer      *dataWriter
 }
 
-func newFileCache(event *polymorphicRedoEvent, buf []byte) *fileCache {
-	buf = buf[:0]
-	buf = append(buf, event.data.Bytes()...)
-	return &fileCache{
-		data:        buf,
-		maxCommitTs: event.commitTs,
-		minCommitTs: event.commitTs,
-		flushed:     make(chan struct{}),
+type dataWriter struct {
+	buf    *bytes.Buffer
+	writer io.Writer
+	closer io.Closer
+}
+
+func (w *dataWriter) Write(p []byte) (n int, err error) {
+	return w.writer.Write(p)
+}
+
+func (w *dataWriter) Close() error {
+	if w.closer != nil {
+		return w.closer.Close()
 	}
+	return nil
 }
 
 func (f *fileCache) waitFlushed(ctx context.Context) error {
@@ -71,14 +83,19 @@ func (f *fileCache) markFlushed() {
 	}
 }
 
-func (f *fileCache) appendData(event *polymorphicRedoEvent) {
-	f.data = append(f.data, event.data.Bytes()...)
+func (f *fileCache) appendData(event *polymorphicRedoEvent) error {
+	_, err := f.writer.Write(event.data.Bytes())
+	if err != nil {
+		return err
+	}
+	f.fileSize += int64(event.data.Len())
 	if event.commitTs > f.maxCommitTs {
 		f.maxCommitTs = event.commitTs
 	}
 	if event.commitTs < f.minCommitTs {
 		f.minCommitTs = event.commitTs
 	}
+	return nil
 }
 
 type fileWorkerGroup struct {
@@ -170,7 +187,10 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
 			return errors.Trace(egCtx.Err())
 		case file := <-f.flushCh:
 			start := time.Now()
-			err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
+			if err := file.writer.Close(); err != nil {
+				return errors.Trace(err)
+			}
+			err := f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
 			f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
 			if err != nil {
 				return errors.Trace(err)
@@ -213,10 +233,40 @@ func (f *fileWorkerGroup) bgWriteLogs(
 }
 
 // newFileCache write event to a new file cache.
-func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) {
+func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) error {
 	bufPtr := f.pool.Get().(*[]byte)
-	file := newFileCache(event, *bufPtr)
+	buf := *bufPtr
+	buf = buf[:0]
+	var (
+		wr     io.Writer
+		closer io.Closer
+	)
+	bufferWriter := bytes.NewBuffer(buf)
+	wr = bufferWriter
+	if f.cfg.Compression == compression.LZ4 {
+		wr = lz4.NewWriter(bufferWriter)
+		closer = wr.(io.Closer)
+	}
+	_, err := wr.Write(event.data.Bytes())
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	dw := &dataWriter{
+		buf:    bufferWriter,
+		writer: wr,
+		closer: closer,
+	}
+	file := &fileCache{
+		data:        buf,
+		fileSize:    int64(len(event.data.Bytes())),
+		maxCommitTs: event.commitTs,
+		minCommitTs: event.commitTs,
+		flushed:     make(chan struct{}),
+		writer:      dw,
+	}
 	f.files = append(f.files, file)
+	return nil
 }
 
 func (f *fileWorkerGroup) writeToCache(
@@ -230,12 +280,11 @@
 	defer f.metricWriteBytes.Add(float64(writeLen))
 
 	if len(f.files) == 0 {
-		f.newFileCache(event)
-		return nil
+		return f.newFileCache(event)
 	}
 
 	file := f.files[len(f.files)-1]
-	if int64(len(file.data))+writeLen > f.cfg.MaxLogSizeInBytes {
+	if file.fileSize+writeLen > f.cfg.MaxLogSizeInBytes {
 		file.filename = f.getLogFileName(file.maxCommitTs)
 		select {
 		case <-egCtx.Done():
@@ -243,12 +292,10 @@
 		case f.flushCh <- file:
 		}
 
-		f.newFileCache(event)
-		return nil
+		return f.newFileCache(event)
 	}
 
-	file.appendData(event)
-	return nil
+	return file.appendData(event)
 }
 
 func (f *fileWorkerGroup) flushAll(egCtx context.Context) error {
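Closing the writer before the upload in bgFlushFileCache is a correctness requirement, not cleanup: lz4.Writer buffers a partial block internally and only emits it, together with the end-of-frame marker, on Close. A standalone sketch of that behavior (not part of the patch):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	var buf bytes.Buffer
	w := lz4.NewWriter(&buf)

	if _, err := w.Write([]byte("some redo events")); err != nil {
		panic(err)
	}
	// The frame is still incomplete here; uploading buf now could
	// produce a file the reader cannot fully decode.
	fmt.Println("before Close:", buf.Len(), "bytes")

	// Close flushes the buffered block and writes the end-of-frame marker.
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Println("after Close:", buf.Len(), "bytes")
}
```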
diff --git a/go.mod b/go.mod
index 420fc16a958..242519f70ac 100644
--- a/go.mod
+++ b/go.mod
@@ -49,12 +49,14 @@ require (
 	github.com/jarcoal/httpmock v1.2.0
 	github.com/jmoiron/sqlx v1.3.3
 	github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
+	github.com/klauspost/compress v1.15.14
 	github.com/labstack/gommon v0.3.0
 	github.com/linkedin/goavro/v2 v2.11.1
 	github.com/mailru/easyjson v0.7.7
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/modern-go/reflect2 v1.0.2
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
+	github.com/pierrec/lz4/v4 v4.1.17
 	github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
 	github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
 	github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
@@ -201,7 +203,6 @@ require (
 	github.com/jonboulle/clockwork v0.3.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/compress v1.15.14 // indirect
 	github.com/klauspost/cpuid v1.3.1 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.4 // indirect
 	github.com/kr/pretty v0.3.0 // indirect
@@ -229,7 +230,6 @@ require (
 	github.com/opentracing/opentracing-go v1.2.0 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
 	github.com/philhofer/fwd v1.1.1 // indirect
-	github.com/pierrec/lz4/v4 v4.1.17 // indirect
 	github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 // indirect
 	github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
diff --git a/pkg/compression/compress.go b/pkg/compression/compress.go
new file mode 100644
index 00000000000..dab5fa3e570
--- /dev/null
+++ b/pkg/compression/compress.go
@@ -0,0 +1,112 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compression
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+
+	"github.com/klauspost/compress/snappy"
+	"github.com/pierrec/lz4/v4"
+	"github.com/pingcap/errors"
+)
+
+const (
+	// None no compression
+	None string = "none"
+
+	// Snappy compression
+	Snappy string = "snappy"
+
+	// LZ4 compression
+	LZ4 string = "lz4"
+)
+
+var (
+	lz4ReaderPool = sync.Pool{
+		New: func() interface{} {
+			return lz4.NewReader(nil)
+		},
+	}
+
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			return new(bytes.Buffer)
+		},
+	}
+)
+
+// Supported return true if the given compression is supported.
+func Supported(cc string) bool {
+	switch cc {
+	case None, Snappy, LZ4:
+		return true
+	}
+	return false
+}
+
+// Encode the given data by the given compression codec.
+func Encode(cc string, data []byte) ([]byte, error) {
+	switch cc {
+	case None:
+		return data, nil
+	case Snappy:
+		return snappy.Encode(nil, data), nil
+	case LZ4:
+		var buf bytes.Buffer
+		writer := lz4.NewWriter(&buf)
+		if _, err := writer.Write(data); err != nil {
+			return nil, errors.Trace(err)
+		}
+		if err := writer.Close(); err != nil {
+			return nil, errors.Trace(err)
+		}
+		return buf.Bytes(), nil
+	default:
+	}
+
+	return nil, errors.New(fmt.Sprintf("Unsupported compression %s", cc))
+}
+
+// Decode the given data by the given compression codec.
+func Decode(cc string, data []byte) ([]byte, error) {
+	switch cc {
+	case None:
+		return data, nil
+	case Snappy:
+		return snappy.Decode(nil, data)
+	case LZ4:
+		reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
+		if !ok {
+			reader = lz4.NewReader(bytes.NewReader(data))
+		} else {
+			reader.Reset(bytes.NewReader(data))
+		}
+		buffer := bufferPool.Get().(*bytes.Buffer)
+		_, err := buffer.ReadFrom(reader)
+		// copy the buffer to a new slice with the correct length
+		// reuse lz4Reader and buffer
+		lz4ReaderPool.Put(reader)
+		res := make([]byte, buffer.Len())
+		copy(res, buffer.Bytes())
+		buffer.Reset()
+		bufferPool.Put(buffer)
+
+		return res, err
+	default:
+	}
+
+	return nil, errors.New(fmt.Sprintf("Unsupported compression %s", cc))
+}
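pkg/compression accepts Snappy for encode/decode even though the redo config (below) only allows none and lz4; unknown codec names surface as errors rather than panics. A quick usage sketch (not part of the patch; "zstd" is just an arbitrary unsupported name):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/compression"
)

func main() {
	// Supported reports which codec names the package knows about.
	for _, cc := range []string{compression.None, compression.Snappy, compression.LZ4, "zstd"} {
		fmt.Println(cc, compression.Supported(cc))
	}

	// Encode with an unknown codec returns an error instead of panicking.
	if _, err := compression.Encode("zstd", []byte("x")); err != nil {
		fmt.Println(err) // Unsupported compression zstd
	}
}
```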
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index c8d606662c4..a4fc933eb3a 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -61,6 +61,7 @@ const (
 			"max-log-size": 64,
 			"flush-interval": 2000,
 			"meta-flush-interval": 200,
+			"compression": "",
 			"encoding-worker-num": 16,
 			"flush-worker-num": 8,
 			"storage": "",
@@ -218,6 +219,7 @@ const (
 			"max-log-size": 64,
 			"flush-interval": 2000,
 			"meta-flush-interval": 200,
+			"compression": "",
 			"encoding-worker-num": 16,
 			"flush-worker-num": 8,
 			"storage": "",
@@ -283,6 +285,7 @@ const (
 			"max-log-size": 64,
 			"flush-interval": 2000,
 			"meta-flush-interval": 200,
+			"compression": "",
 			"encoding-worker-num": 16,
 			"flush-worker-num": 8,
 			"storage": "",
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 14021e2074a..26f4950b4b3 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 
 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tiflow/pkg/compression"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/util"
@@ -32,6 +33,7 @@ type ConsistentConfig struct {
 	FlushWorkerNum        int    `toml:"flush-worker-num" json:"flush-worker-num"`
 	Storage               string `toml:"storage" json:"storage"`
 	UseFileBackend        bool   `toml:"use-file-backend" json:"use-file-backend"`
+	Compression           string `toml:"compression" json:"compression"`
 }
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
@@ -61,6 +63,11 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
 			fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
 				c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
 	}
+	if len(c.Compression) > 0 &&
+		c.Compression != compression.None && c.Compression != compression.LZ4 {
+		return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
+			fmt.Sprintf("The consistent.compression:%s must be 'none' or 'lz4'", c.Compression))
+	}
 
 	if c.EncodingWorkerNum == 0 {
 		c.EncodingWorkerNum = redo.DefaultEncodingWorkerNum
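So "snappy" passes compression.Supported but still fails redo validation, which only admits "", "none", and "lz4". A hypothetical check (a sketch, assuming the other consistent fields keep their defaults; not part of the patch):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	cfg := config.GetDefaultReplicaConfig().Consistent
	cfg.Level = "eventual" // enable redo so that validation runs
	cfg.Compression = "snappy"

	// Expected to print an ErrInvalidReplicaConfig error: the check added
	// above rejects every codec except "", "none" and "lz4".
	fmt.Println(cfg.ValidateAndAdjust())
}
```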
diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go
index 502dadff428..520a34f48db 100644
--- a/pkg/config/replica_config.go
+++ b/pkg/config/replica_config.go
@@ -81,6 +81,7 @@ var defaultReplicaConfig = &ReplicaConfig{
 		FlushWorkerNum:        redo.DefaultFlushWorkerNum,
 		Storage:               "",
 		UseFileBackend:        false,
+		Compression:           "",
 	},
 	ChangefeedErrorStuckDuration: time.Minute * 30,
 	SQLMode:                      defaultSQLMode,
diff --git a/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml
index dadf83cf9fd..12c6fb0d09b 100644
--- a/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml
+++ b/tests/integration_tests/consistent_replicate_storage_file/conf/changefeed.toml
@@ -1,3 +1,4 @@
 [consistent]
 level = "eventual"
 storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file/redo"
+compression = "lz4"