redo(ticdc): redo lz4 compression (#10171)

close #10176

sdojjy authored Nov 30, 2023
1 parent e13b048 commit 07e50f7

Showing 9 changed files with 105 additions and 51 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go

@@ -270,6 +270,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           c.Consistent.Storage,
 			UseFileBackend:    c.Consistent.UseFileBackend,
+			Compression:       c.Consistent.Compression,
 		}
 	}
 	if c.Sink != nil {
@@ -763,6 +764,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			FlushWorkerNum:    c.Consistent.FlushWorkerNum,
 			Storage:           cloned.Consistent.Storage,
 			UseFileBackend:    cloned.Consistent.UseFileBackend,
+			Compression:       cloned.Consistent.Compression,
 		}
 	}
 	if cloned.Mounter != nil {
@@ -959,6 +961,7 @@ type ConsistentConfig struct {
 	FlushWorkerNum    int    `json:"flush_worker_num"`
 	Storage           string `json:"storage,omitempty"`
 	UseFileBackend    bool   `json:"use_file_backend"`
+	Compression       string `json:"compression,omitempty"`
 }

 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
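
On the wire, the new field rides along with the existing consistent options. A hedged sketch that marshals the v2 struct above, setting only fields visible in this diff (other fields keep their zero values; the values themselves are illustrative):

	package main

	import (
		"encoding/json"
		"fmt"

		v2 "github.com/pingcap/tiflow/cdc/api/v2"
	)

	func main() {
		cfg := v2.ConsistentConfig{
			FlushWorkerNum: 8,
			Storage:        "s3://bucket/redo",
			UseFileBackend: false,
			Compression:    "lz4",
		}
		// The struct tags above yield snake_case keys, e.g. "compression":"lz4";
		// an empty Compression is dropped entirely thanks to omitempty.
		out, _ := json.Marshal(cfg)
		fmt.Println(string(out))
	}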
2 changes: 1 addition & 1 deletion cdc/model/sink.go

@@ -518,7 +518,7 @@ type Column struct {
 	Default interface{} `json:"default" msg:"-"`

 	// ApproximateBytes is approximate bytes consumed by the column.
-	ApproximateBytes int `json:"-"`
+	ApproximateBytes int `json:"-" msg:"-"`
 }

 // RedoColumn stores Column change
35 changes: 5 additions & 30 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default.
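
The `msg:"-"` tag added to `ApproximateBytes` in cdc/model/sink.go above is what shrinks this generated file: the tinylib/msgp generator (presumably what produces sink_gen.go) skips tagged-out fields when regenerating the MessagePack codec, so the field's marshal/unmarshal code disappears. A minimal sketch of the convention, using a hypothetical struct that is not from this repo:

	package model

	//go:generate msgp

	// Event is a hypothetical illustration: msgp generates MessagePack
	// codec methods for Name but skips Cache entirely.
	type Event struct {
		Name  string `msg:"name"`
		Cache int    `msg:"-"` // runtime-only, never serialized
	}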

17 changes: 17 additions & 0 deletions cdc/redo/reader/file.go

@@ -36,6 +36,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model/codec"
 	"github.com/pingcap/tiflow/cdc/redo/writer"
 	"github.com/pingcap/tiflow/cdc/redo/writer/file"
+	"github.com/pingcap/tiflow/pkg/compression"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"go.uber.org/zap"
@@ -52,6 +53,9 @@ const (
 	defaultWorkerNum = 16
 )

+// lz4MagicNumber is the magic number of lz4 compressed data
+var lz4MagicNumber = []byte{0x04, 0x22, 0x4D, 0x18}
+
 type fileReader interface {
 	io.Closer
 	// Read return the log from log file
@@ -203,6 +207,13 @@ func selectDownLoadFile(
 	return files, nil
 }

+func isLZ4Compressed(data []byte) bool {
+	if len(data) < 4 {
+		return false
+	}
+	return bytes.Equal(data[:4], lz4MagicNumber)
+}
+
 func readAllFromBuffer(buf []byte) (logHeap, error) {
 	r := &reader{
 		br: bytes.NewReader(buf),
@@ -251,6 +262,12 @@ func sortAndWriteFile(
 		log.Warn("download file is empty", zap.String("file", fileName))
 		return nil
 	}
+	// it's lz4 compressed, decompress it
+	if isLZ4Compressed(fileContent) {
+		if fileContent, err = compression.Decode(compression.LZ4, fileContent); err != nil {
+			return err
+		}
+	}

 	// sort data
 	h, err := readAllFromBuffer(fileContent)
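
The detection above keys off the LZ4 frame format, whose frames always begin with the magic number 0x184D2204 stored little-endian, which is exactly the byte sequence in lz4MagicNumber. A self-contained sketch of the same read-path idea, assuming compression.Decode amounts to a plain pierrec/lz4 frame read (names here are illustrative, not the PR's):

	package main

	import (
		"bytes"
		"fmt"
		"io"

		"github.com/pierrec/lz4/v4"
	)

	// lz4MagicNumber mirrors the value added in cdc/redo/reader/file.go.
	var lz4MagicNumber = []byte{0x04, 0x22, 0x4D, 0x18}

	// maybeDecompress transparently decodes LZ4-framed redo data and passes
	// legacy uncompressed files through untouched, which is what keeps old
	// redo logs readable after this change.
	func maybeDecompress(data []byte) ([]byte, error) {
		if len(data) < 4 || !bytes.Equal(data[:4], lz4MagicNumber) {
			return data, nil
		}
		return io.ReadAll(lz4.NewReader(bytes.NewReader(data)))
	}

	func main() {
		out, err := maybeDecompress([]byte("plain redo payload"))
		fmt.Println(string(out), err)
	}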
87 changes: 67 additions & 20 deletions cdc/redo/writer/memory/file_worker.go

@@ -14,16 +14,20 @@
 package memory

 import (
+	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"sync"
 	"time"

+	"github.com/pierrec/lz4/v4"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo/common"
 	"github.com/pingcap/tiflow/cdc/redo/writer"
+	"github.com/pingcap/tiflow/pkg/compression"
 	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/uuid"
@@ -34,24 +38,32 @@ import (

 type fileCache struct {
 	data        []byte
+	fileSize    int64
 	maxCommitTs model.Ts
 	// After memoryWriter become stable, this field would be used to
 	// avoid traversing log files.
 	minCommitTs model.Ts

 	filename string
 	flushed  chan struct{}
+	writer   *dataWriter
 }

-func newFileCache(event *polymorphicRedoEvent, buf []byte) *fileCache {
-	buf = buf[:0]
-	buf = append(buf, event.data.Bytes()...)
-	return &fileCache{
-		data:        buf,
-		maxCommitTs: event.commitTs,
-		minCommitTs: event.commitTs,
-		flushed:     make(chan struct{}),
+type dataWriter struct {
+	buf    *bytes.Buffer
+	writer io.Writer
+	closer io.Closer
+}
+
+func (w *dataWriter) Write(p []byte) (n int, err error) {
+	return w.writer.Write(p)
+}
+
+func (w *dataWriter) Close() error {
+	if w.closer != nil {
+		return w.closer.Close()
 	}
+	return nil
 }

 func (f *fileCache) waitFlushed(ctx context.Context) error {
@@ -71,14 +83,19 @@ func (f *fileCache) markFlushed() {
 	}
 }

-func (f *fileCache) appendData(event *polymorphicRedoEvent) {
-	f.data = append(f.data, event.data.Bytes()...)
+func (f *fileCache) appendData(event *polymorphicRedoEvent) error {
+	_, err := f.writer.Write(event.data.Bytes())
+	if err != nil {
+		return err
+	}
+	f.fileSize += int64(event.data.Len())
 	if event.commitTs > f.maxCommitTs {
 		f.maxCommitTs = event.commitTs
 	}
 	if event.commitTs < f.minCommitTs {
 		f.minCommitTs = event.commitTs
 	}
+	return nil
 }

 type fileWorkerGroup struct {
@@ -176,7 +193,10 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
 			return errors.Trace(egCtx.Err())
 		case file := <-f.flushCh:
 			start := time.Now()
-			err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
+			if err := file.writer.Close(); err != nil {
+				return errors.Trace(err)
+			}
+			err := f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
 			f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
 			if err != nil {
 				return errors.Trace(err)
@@ -219,10 +239,40 @@ func (f *fileWorkerGroup) bgWriteLogs(
 }

 // newFileCache write event to a new file cache.
-func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) {
+func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) error {
 	bufPtr := f.pool.Get().(*[]byte)
-	file := newFileCache(event, *bufPtr)
+	buf := *bufPtr
+	buf = buf[:0]
+	var (
+		wr     io.Writer
+		closer io.Closer
+	)
+	bufferWriter := bytes.NewBuffer(buf)
+	wr = bufferWriter
+	if f.cfg.Compression == compression.LZ4 {
+		wr = lz4.NewWriter(bufferWriter)
+		closer = wr.(io.Closer)
+	}
+	_, err := wr.Write(event.data.Bytes())
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	dw := &dataWriter{
+		buf:    bufferWriter,
+		writer: wr,
+		closer: closer,
+	}
+	file := &fileCache{
+		data:        buf,
+		fileSize:    int64(len(event.data.Bytes())),
+		maxCommitTs: event.commitTs,
+		minCommitTs: event.commitTs,
+		flushed:     make(chan struct{}),
+		writer:      dw,
+	}
 	f.files = append(f.files, file)
+	return nil
 }

 func (f *fileWorkerGroup) writeToCache(
@@ -236,25 +286,22 @@
 	defer f.metricWriteBytes.Add(float64(writeLen))

 	if len(f.files) == 0 {
-		f.newFileCache(event)
-		return nil
+		return f.newFileCache(event)
 	}

 	file := f.files[len(f.files)-1]
-	if int64(len(file.data))+writeLen > f.cfg.MaxLogSizeInBytes {
+	if file.fileSize+writeLen > f.cfg.MaxLogSizeInBytes {
 		file.filename = f.getLogFileName(file.maxCommitTs)
 		select {
 		case <-egCtx.Done():
 			return errors.Trace(egCtx.Err())
 		case f.flushCh <- file:
 		}

-		f.newFileCache(event)
-		return nil
+		return f.newFileCache(event)
 	}

-	file.appendData(event)
-	return nil
+	return file.appendData(event)
 }

 func (f *fileWorkerGroup) flushAll(egCtx context.Context) error {
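
The subtle point in this rewrite is ordering: an lz4.Writer buffers data internally, so bgFlushFileCache must call file.writer.Close() before reading file.writer.buf.Bytes(), or the final block and end-of-frame marker never reach the buffer. A standalone sketch of the same pattern, with illustrative names rather than the PR's types:

	package main

	import (
		"bytes"
		"fmt"
		"io"

		"github.com/pierrec/lz4/v4"
	)

	func main() {
		var buf bytes.Buffer

		// Mirror newFileCache: writes go through wr, and the (possibly
		// compressed) bytes accumulate in buf.
		var wr io.Writer = &buf
		var closer io.Closer
		lzw := lz4.NewWriter(&buf) // skipped when compression is "none"
		wr, closer = lzw, lzw

		if _, err := wr.Write([]byte("redo event payload")); err != nil {
			panic(err)
		}
		// Close flushes the last LZ4 block and the frame footer; reading
		// buf.Bytes() before this point would yield a truncated frame.
		if closer != nil {
			if err := closer.Close(); err != nil {
				panic(err)
			}
		}
		fmt.Printf("flushed %d compressed bytes\n", buf.Len())
	}

Note also that fileCache.fileSize accumulates uncompressed bytes (event.data.Len()), so rotation against MaxLogSizeInBytes is now decided on uncompressed size rather than len(file.data).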
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go

@@ -72,6 +72,7 @@ const (
 		"max-log-size": 64,
 		"flush-interval": 2000,
 		"meta-flush-interval": 200,
+		"compression": "",
 		"encoding-worker-num": 16,
 		"flush-worker-num": 8,
 		"storage": "",
@@ -310,6 +311,7 @@ const (
 		"max-log-size": 64,
 		"flush-interval": 2000,
 		"meta-flush-interval": 200,
+		"compression": "",
 		"encoding-worker-num": 16,
 		"flush-worker-num": 8,
 		"storage": "",
@@ -462,6 +464,7 @@ const (
 		"max-log-size": 64,
 		"flush-interval": 2000,
 		"meta-flush-interval": 200,
+		"compression": "",
 		"encoding-worker-num": 16,
 		"flush-worker-num": 8,
 		"storage": "",
7 changes: 7 additions & 0 deletions pkg/config/consistent.go

@@ -17,6 +17,7 @@ import (
 	"fmt"

 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tiflow/pkg/compression"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/util"
@@ -32,6 +33,7 @@ type ConsistentConfig struct {
 	FlushWorkerNum    int    `toml:"flush-worker-num" json:"flush-worker-num"`
 	Storage           string `toml:"storage" json:"storage"`
 	UseFileBackend    bool   `toml:"use-file-backend" json:"use-file-backend"`
+	Compression       string `toml:"compression" json:"compression"`
 }

 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
@@ -61,6 +63,11 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
 			fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
 				c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
 	}
+	if len(c.Compression) > 0 &&
+		c.Compression != compression.None && c.Compression != compression.LZ4 {
+		return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
+			fmt.Sprintf("The consistent.compression:%s must be 'none' or 'lz4'", c.Compression))
+	}

 	if c.EncodingWorkerNum == 0 {
 		c.EncodingWorkerNum = redo.DefaultEncodingWorkerNum
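
The two accepted values are compared against constants in pkg/compression. Judging from the validation message above they are plain strings; an assumed sketch of the relevant constants (the real package likely defines more, e.g. snappy for sink-side use, and the exact declarations are not shown in this diff):

	package compression

	const (
		// None means no compression.
		None = "none"
		// LZ4 means lz4 frame compression.
		LZ4 = "lz4"
	)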
1 change: 1 addition & 0 deletions pkg/config/replica_config.go

@@ -88,6 +88,7 @@ var defaultReplicaConfig = &ReplicaConfig{
 		FlushWorkerNum: redo.DefaultFlushWorkerNum,
 		Storage:        "",
 		UseFileBackend: false,
+		Compression:    "",
 	},
 	Scheduler: &ChangefeedSchedulerConfig{
 		EnableTableAcrossNodes: false,
1 change: 1 addition & 0 deletions (changefeed config TOML; filename not rendered)

@@ -1,3 +1,4 @@
 [consistent]
 level = "eventual"
 storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file/redo"
+compression = "lz4"
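
As the test config above shows, enabling redo compression for a changefeed is a one-key change in the `[consistent]` block. A hedged end-to-end sketch (server address and sink URI are placeholders, and flag spelling varies across TiCDC releases):

	cdc cli changefeed create \
	  --server=http://127.0.0.1:8300 \
	  --sink-uri="mysql://root@127.0.0.1:3306/" \
	  --config=changefeed.toml

Redo logs written from then on are LZ4-framed; readers stay backward compatible because the magic-number check in cdc/redo/reader/file.go falls through to the uncompressed path for old files.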
