Skip to content

Commit

Permalink
redo(ticdc): redo lz4 compression (#10171) (#10216)
Browse files Browse the repository at this point in the history
close #10176
  • Loading branch information
ti-chi-bot authored Dec 1, 2023
1 parent 873a2ff commit da0e717
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 53 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
FlushWorkerNum: c.Consistent.FlushWorkerNum,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
Compression: c.Consistent.Compression,
}
}
if c.Sink != nil {
Expand Down Expand Up @@ -500,6 +501,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
FlushWorkerNum: c.Consistent.FlushWorkerNum,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
Compression: cloned.Consistent.Compression,
}
}
if cloned.Mounter != nil {
Expand Down Expand Up @@ -687,6 +689,7 @@ type ConsistentConfig struct {
FlushWorkerNum int `json:"flush_worker_num"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
Compression string `json:"compression,omitempty"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ type Column struct {
Default interface{} `json:"default" msg:"-"`

// ApproximateBytes is approximate bytes consumed by the column.
ApproximateBytes int `json:"-"`
ApproximateBytes int `json:"-" msg:"-"`
}

// RedoColumn stores Column change
Expand Down
35 changes: 5 additions & 30 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions cdc/redo/reader/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/redo/writer/file"
"github.com/pingcap/tiflow/pkg/compression"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"go.uber.org/zap"
Expand All @@ -51,6 +52,9 @@ const (
defaultWorkerNum = 16
)

// lz4MagicNumber is the magic number of lz4 compressed data (the 4-byte
// LZ4 frame header). Readers use it to detect whether a downloaded redo
// log file was written with lz4 compression and must be decoded first.
var lz4MagicNumber = []byte{0x04, 0x22, 0x4D, 0x18}

type fileReader interface {
io.Closer
// Read return the log from log file
Expand Down Expand Up @@ -202,6 +206,13 @@ func selectDownLoadFile(
return files, nil
}

// isLZ4Compressed reports whether data starts with the LZ4 frame magic
// number, i.e. whether it was produced by the lz4 compressor.
func isLZ4Compressed(data []byte) bool {
	// bytes.HasPrefix already returns false when data is shorter than
	// the 4-byte magic number, so no explicit length check is needed.
	return bytes.HasPrefix(data, lz4MagicNumber)
}

func readAllFromBuffer(buf []byte) (logHeap, error) {
r := &reader{
br: bytes.NewReader(buf),
Expand Down Expand Up @@ -250,6 +261,12 @@ func sortAndWriteFile(
log.Warn("download file is empty", zap.String("file", fileName))
return nil
}
// it's lz4 compressed, decompress it
if isLZ4Compressed(fileContent) {
if fileContent, err = compression.Decode(compression.LZ4, fileContent); err != nil {
return err
}
}

// sort data
h, err := readAllFromBuffer(fileContent)
Expand Down
87 changes: 67 additions & 20 deletions cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@
package memory

import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"

"github.com/pierrec/lz4/v4"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/compression"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/uuid"
Expand All @@ -34,24 +38,32 @@ import (

// fileCache buffers the encoded redo events destined for one log file
// until the file is handed off for flushing to external storage.
type fileCache struct {
	// data is the pooled backing buffer behind writer's bytes.Buffer.
	data []byte
	// fileSize is the number of uncompressed bytes appended so far
	// (appendData adds the raw event length even when the writer
	// compresses) — presumably compared against the max log size;
	// confirm with the caller.
	fileSize int64
	// maxCommitTs is the largest commit ts among the cached events.
	maxCommitTs model.Ts
	// After memoryWriter become stable, this field would be used to
	// avoid traversing log files.
	minCommitTs model.Ts

	// filename is assigned just before the cache is queued for flush.
	filename string
	// flushed signals waiters once the file has been persisted.
	flushed chan struct{}
	// writer accumulates event bytes into data's buffer, optionally
	// through an lz4 encoder.
	writer *dataWriter
}

func newFileCache(event *polymorphicRedoEvent, buf []byte) *fileCache {
buf = buf[:0]
buf = append(buf, event.data.Bytes()...)
return &fileCache{
data: buf,
maxCommitTs: event.commitTs,
minCommitTs: event.commitTs,
flushed: make(chan struct{}),
// dataWriter ties together the in-memory buffer that receives the final
// file content and the (optionally compressing) writer in front of it.
type dataWriter struct {
	// buf holds the bytes that will eventually be uploaded.
	buf *bytes.Buffer
	// writer is either buf itself or an lz4 encoder wrapping buf.
	writer io.Writer
	// closer is non-nil only when writer requires an explicit Close to
	// flush its internal state (the lz4 encoder does).
	closer io.Closer
}

// Write forwards p to the underlying writer, which is either the plain
// in-memory buffer or an lz4 encoder in front of it.
func (w *dataWriter) Write(p []byte) (int, error) {
	return w.writer.Write(p)
}

// Close finalizes the wrapped writer when it needs closing (the lz4
// encoder buffers internally until closed); with no closer it is a
// no-op.
func (w *dataWriter) Close() error {
	if w.closer == nil {
		return nil
	}
	return w.closer.Close()
}

func (f *fileCache) waitFlushed(ctx context.Context) error {
Expand All @@ -71,14 +83,19 @@ func (f *fileCache) markFlushed() {
}
}

func (f *fileCache) appendData(event *polymorphicRedoEvent) {
f.data = append(f.data, event.data.Bytes()...)
// appendData writes one more encoded redo event into the cached file
// and updates the bookkeeping (uncompressed size and commit-ts range).
func (f *fileCache) appendData(event *polymorphicRedoEvent) error {
	if _, err := f.writer.Write(event.data.Bytes()); err != nil {
		return err
	}
	// fileSize tracks the uncompressed payload length, regardless of
	// whether the writer compresses on the way to the buffer.
	f.fileSize += int64(event.data.Len())
	// The two cases are mutually exclusive: a ts above the max cannot
	// also be below the min.
	switch {
	case event.commitTs > f.maxCommitTs:
		f.maxCommitTs = event.commitTs
	case event.commitTs < f.minCommitTs:
		f.minCommitTs = event.commitTs
	}
	return nil
}

type fileWorkerGroup struct {
Expand Down Expand Up @@ -170,7 +187,10 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
return errors.Trace(egCtx.Err())
case file := <-f.flushCh:
start := time.Now()
err := f.extStorage.WriteFile(egCtx, file.filename, file.data)
if err := file.writer.Close(); err != nil {
return errors.Trace(err)
}
err := f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -213,10 +233,40 @@ func (f *fileWorkerGroup) bgWriteLogs(
}

// newFileCache write event to a new file cache.
func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) {
// newFileCache opens a new cached log file seeded with event. The
// backing buffer is taken from the worker pool; when lz4 compression is
// configured, event bytes are routed through an lz4 encoder before
// reaching the buffer.
func (f *fileWorkerGroup) newFileCache(event *polymorphicRedoEvent) error {
	pooled := f.pool.Get().(*[]byte)
	backing := (*pooled)[:0]
	buffer := bytes.NewBuffer(backing)

	var (
		sink   io.Writer = buffer
		closer io.Closer
	)
	if f.cfg.Compression == compression.LZ4 {
		encoder := lz4.NewWriter(buffer)
		sink, closer = encoder, encoder
	}

	if _, err := sink.Write(event.data.Bytes()); err != nil {
		return errors.Trace(err)
	}

	f.files = append(f.files, &fileCache{
		data: backing,
		// Record the uncompressed length; compression happens inside
		// sink, if at all.
		fileSize:    int64(len(event.data.Bytes())),
		maxCommitTs: event.commitTs,
		minCommitTs: event.commitTs,
		flushed:     make(chan struct{}),
		writer: &dataWriter{
			buf:    buffer,
			writer: sink,
			closer: closer,
		},
	})
	return nil
}

func (f *fileWorkerGroup) writeToCache(
Expand All @@ -230,25 +280,22 @@ func (f *fileWorkerGroup) writeToCache(
defer f.metricWriteBytes.Add(float64(writeLen))

if len(f.files) == 0 {
f.newFileCache(event)
return nil
return f.newFileCache(event)
}

file := f.files[len(f.files)-1]
if int64(len(file.data))+writeLen > f.cfg.MaxLogSizeInBytes {
if file.fileSize+writeLen > f.cfg.MaxLogSizeInBytes {
file.filename = f.getLogFileName(file.maxCommitTs)
select {
case <-egCtx.Done():
return errors.Trace(egCtx.Err())
case f.flushCh <- file:
}

f.newFileCache(event)
return nil
return f.newFileCache(event)
}

file.appendData(event)
return nil
return file.appendData(event)
}

func (f *fileWorkerGroup) flushAll(egCtx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ require (
github.com/jarcoal/httpmock v1.2.0
github.com/jmoiron/sqlx v1.3.3
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/klauspost/compress v1.15.14
github.com/labstack/gommon v0.3.0
github.com/linkedin/goavro/v2 v2.11.1
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-shellwords v1.0.12
github.com/modern-go/reflect2 v1.0.2
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pierrec/lz4/v4 v4.1.17
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
Expand Down Expand Up @@ -201,7 +203,6 @@ require (
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/kr/pretty v0.3.0 // indirect
Expand Down Expand Up @@ -229,7 +230,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 // indirect
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
Expand Down
Loading

0 comments on commit da0e717

Please sign in to comment.