
Merge remote-tracking branch 'upstream/release-7.1' into cherry-pick-10227-to-release-7.1

# Conflicts:
#	cdc/api/v2/model.go
#	cdc/redo/writer/memory/file_worker.go
#	pkg/config/consistent.go
sdojjy committed Dec 7, 2023
2 parents 46224ff + de95a8e commit ae2fc6c
Showing 50 changed files with 2,903 additions and 661 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -402,6 +402,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 		FileIndexWidth:           c.Sink.FileIndexWidth,
 		EnableKafkaSinkV2:        c.Sink.EnableKafkaSinkV2,
 		OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
+		ContentCompatible:        c.Sink.ContentCompatible,
 		KafkaConfig:              kafkaConfig,
 		MySQLConfig:              mysqlConfig,
 		CloudStorageConfig:       cloudStorageConfig,
@@ -621,6 +622,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 		FileIndexWidth:           cloned.Sink.FileIndexWidth,
 		EnableKafkaSinkV2:        cloned.Sink.EnableKafkaSinkV2,
 		OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
+		ContentCompatible:        cloned.Sink.ContentCompatible,
 		KafkaConfig:              kafkaConfig,
 		MySQLConfig:              mysqlConfig,
 		CloudStorageConfig:       cloudStorageConfig,
@@ -783,6 +785,7 @@ type SinkConfig struct {
 	EnableKafkaSinkV2        bool                `json:"enable_kafka_sink_v2"`
 	OnlyOutputUpdatedColumns *bool               `json:"only_output_updated_columns"`
 	SafeMode                 *bool               `json:"safe_mode,omitempty"`
+	ContentCompatible        *bool               `json:"content_compatible"`
 	KafkaConfig              *KafkaConfig        `json:"kafka_config,omitempty"`
 	MySQLConfig              *MySQLConfig        `json:"mysql_config,omitempty"`
 	CloudStorageConfig       *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
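Note: the new ContentCompatible field is threaded through both conversion directions (API to internal config and back) and surfaced in the v2 API JSON. A minimal sketch of the resulting wire shape, using a hypothetical stand-in struct rather than the real SinkConfig:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the touched SinkConfig fields (hypothetical struct; the
// real one is cdc/api/v2/model.go's SinkConfig).
type sinkConfig struct {
	OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns"`
	ContentCompatible        *bool `json:"content_compatible"`
}

func main() {
	t := true
	out, _ := json.Marshal(sinkConfig{ContentCompatible: &t})
	// content_compatible carries no omitempty, so it is always emitted
	// (null when the pointer is unset):
	fmt.Println(string(out)) // {"only_output_updated_columns":null,"content_compatible":true}
}
```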
2 changes: 1 addition & 1 deletion cdc/model/sink.go
@@ -498,7 +498,7 @@ type Column struct {
 	Default interface{} `json:"default" msg:"-"`

 	// ApproximateBytes is approximate bytes consumed by the column.
-	ApproximateBytes int `json:"-"`
+	ApproximateBytes int `json:"-" msg:"-"`
 }

 // RedoColumn stores Column change
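Note: the one-line change above adds a `msg:"-"` tag so the msgp code generator also skips ApproximateBytes, which is why the generated cdc/model/sink_gen.go below shrinks by 30 lines. A sketch of how the two tag namespaces divide responsibilities, assuming tinylib/msgp tag semantics:

```go
package model

// Hypothetical mirror of Column's tags; the real struct lives in
// cdc/model/sink.go, and sink_gen.go is produced by the msgp generator.
type column struct {
	// `json:"-"` only hides the field from encoding/json. The msgp
	// generator reads its own `msg` tag namespace, so without `msg:"-"`
	// it would still emit marshal/unmarshal code for this field.
	ApproximateBytes int `json:"-" msg:"-"`
}
```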
35 changes: 5 additions & 30 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default.

27 changes: 18 additions & 9 deletions cdc/owner/ddl_manager.go
@@ -220,15 +220,6 @@ func (m *ddlManager) tick(
 	}

 	for _, event := range events {
-		// If changefeed is in BDRMode, skip ddl.
-		if m.BDRMode {
-			log.Info("changefeed is in BDRMode, skip a ddl event",
-				zap.String("namespace", m.changfeedID.Namespace),
-				zap.String("ID", m.changfeedID.ID),
-				zap.Any("ddlEvent", event))
-			continue
-		}
-
 		// TODO: find a better place to do this check
 		// check if the ddl event belongs to an ineligible table.
 		// If so, we should ignore it.
@@ -348,6 +339,24 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
 	if m.executingDDL == nil {
 		return nil
 	}
+
+	// If changefeed is in BDRMode, skip ddl.
+	if m.BDRMode {
+		log.Info("changefeed is in BDRMode, skip a ddl event",
+			zap.String("namespace", m.changfeedID.Namespace),
+			zap.String("ID", m.changfeedID.ID),
+			zap.Any("ddlEvent", m.executingDDL))
+		tableName := m.executingDDL.TableInfo.TableName
+		// Set it to nil first to accelerate GC.
+		m.pendingDDLs[tableName][0] = nil
+		m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
+		m.schema.DoGC(m.executingDDL.CommitTs - 1)
+		m.justSentDDL = m.executingDDL
+		m.executingDDL = nil
+		m.cleanCache()
+		return nil
+	}
+
 	failpoint.Inject("ExecuteNotDone", func() {
 		// This ddl will never finish executing.
 		// It is used to test the logic that a ddl only blocks the related table
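Note: moving the BDR-mode check from tick into executeDDL means a skipped DDL must now also be dequeued by hand. The nil-then-reslice pattern above is the usual Go idiom for popping from a pointer slice without pinning the popped element; a standalone sketch:

```go
package main

import "fmt"

type ddlEvent struct{ query string }

// popHead mirrors the dequeue in executeDDL: nil out the head before
// reslicing so the backing array stops referencing the event and the
// GC can reclaim it even while the slice tail stays alive.
func popHead(q []*ddlEvent) []*ddlEvent {
	q[0] = nil
	return q[1:]
}

func main() {
	pending := []*ddlEvent{
		{query: "ALTER TABLE t ADD COLUMN c INT"},
		{query: "DROP TABLE t2"},
	}
	pending = popHead(pending)
	fmt.Println(len(pending), pending[0].query) // 1 DROP TABLE t2
}
```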
16 changes: 13 additions & 3 deletions cdc/puller/puller.go
@@ -76,6 +76,8 @@ type pullerImpl struct {
 	cfg                   *config.ServerConfig
 	lastForwardTime       time.Time
 	lastForwardResolvedTs uint64
+	// startResolvedTs is the resolvedTs when the puller is initialized
+	startResolvedTs uint64
 }

 // New creates a new Puller that fetches events starting from checkpointTs and puts them into buf.
@@ -117,6 +119,8 @@ func New(ctx context.Context,
 		tableID:   tableID,
 		tableName: tableName,
 		cfg:       cfg,
+
+		startResolvedTs: checkpointTs,
 	}
 	return p
 }
@@ -184,7 +188,7 @@ func (p *pullerImpl) Run(ctx context.Context) error {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case <-stuckDetectorTicker.C:
-			if err := p.detectResolvedTsStuck(initialized); err != nil {
+			if err := p.detectResolvedTsStuck(); err != nil {
 				return errors.Trace(err)
 			}
 			continue
@@ -247,9 +251,15 @@ func (p *pullerImpl) Run(ctx context.Context) error {
 	return g.Wait()
 }

-func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error {
-	if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized {
+func (p *pullerImpl) detectResolvedTsStuck() error {
+	if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection {
 		resolvedTs := p.tsTracker.Frontier()
+		// Check whether the resolvedTs is advancing. If the resolvedTs in the
+		// frontier is not greater than startResolvedTs, the incremental scan
+		// has not completed yet, so no verdict can be drawn here.
+		if resolvedTs <= p.startResolvedTs {
+			return nil
+		}
 		if resolvedTs == p.lastForwardResolvedTs {
 			log.Warn("ResolvedTs stuck detected in puller",
 				zap.String("namespace", p.changefeed.Namespace),
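Note: the detector no longer takes an `initialized` flag; instead it compares the frontier against the resolved ts the puller started from, so the incremental-scan phase can never be misread as a stall. A simplified model of the guard (field names mirror pullerImpl, but the logic is condensed for illustration):

```go
package main

import (
	"errors"
	"fmt"
)

type detector struct {
	startResolvedTs       uint64
	lastForwardResolvedTs uint64
}

func (d *detector) check(frontierTs uint64) error {
	// Incremental scan not finished: the frontier has not moved past the
	// resolved ts the puller started from, so no verdict is possible.
	if frontierTs <= d.startResolvedTs {
		return nil
	}
	if frontierTs == d.lastForwardResolvedTs {
		return errors.New("resolved ts stuck")
	}
	d.lastForwardResolvedTs = frontierTs
	return nil
}

func main() {
	d := &detector{startResolvedTs: 100}
	fmt.Println(d.check(100)) // <nil>: still in incremental scan
	fmt.Println(d.check(150)) // <nil>: advanced
	fmt.Println(d.check(150)) // resolved ts stuck
}
```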
10 changes: 10 additions & 0 deletions cdc/redo/manager.go
@@ -279,8 +279,18 @@ func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {

 func (m *logManager) getFlushDuration() time.Duration {
 	flushIntervalInMs := m.cfg.FlushIntervalInMs
+	defaultFlushIntervalInMs := redo.DefaultFlushIntervalInMs
 	if m.cfg.LogType == redo.RedoDDLLogFileType {
 		flushIntervalInMs = m.cfg.MetaFlushIntervalInMs
+		defaultFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
 	}
+	if flushIntervalInMs < redo.MinFlushIntervalInMs {
+		log.Warn("redo flush interval is too small, use default value",
+			zap.Stringer("namespace", m.cfg.ChangeFeedID),
+			zap.Int("default", defaultFlushIntervalInMs),
+			zap.String("logType", m.cfg.LogType),
+			zap.Int64("interval", flushIntervalInMs))
+		flushIntervalInMs = int64(defaultFlushIntervalInMs)
+	}
 	return time.Duration(flushIntervalInMs) * time.Millisecond
 }
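Note: DDL/meta redo logs flush on their own interval, and both interval kinds are now clamped to a minimum with a type-specific default as the fallback. A condensed sketch of the clamping; the constant values here are assumptions for illustration only, the real ones live in pkg/redo:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values; see pkg/redo for the actual constants.
const (
	minFlushIntervalInMs         = 50
	defaultFlushIntervalInMs     = 2000
	defaultMetaFlushIntervalInMs = 200
)

func flushDuration(intervalMs int64, isDDLLog bool) time.Duration {
	def := int64(defaultFlushIntervalInMs)
	if isDDLLog {
		def = defaultMetaFlushIntervalInMs
	}
	// Too small to be safe; fall back to the type-specific default.
	if intervalMs < minFlushIntervalInMs {
		intervalMs = def
	}
	return time.Duration(intervalMs) * time.Millisecond
}

func main() {
	fmt.Println(flushDuration(10, true))   // 200ms
	fmt.Println(flushDuration(10, false))  // 2s
	fmt.Println(flushDuration(100, false)) // 100ms
}
```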
4 changes: 4 additions & 0 deletions cdc/redo/meta_manager.go
@@ -131,6 +131,10 @@ func (m *metaManager) preStart(ctx context.Context) error {
 	}
 	// "nfs" and "local" schemes are converted to the "file" scheme
 	redo.FixLocalScheme(uri)
+	// The "blackhole" scheme is converted to the "noop" scheme here, so we can use blackhole for testing
+	if redo.IsBlackholeStorage(uri.Scheme) {
+		uri, _ = storage.ParseRawURL("noop://")
+	}
 	extStorage, err := redo.InitExternalStorage(ctx, *uri)
 	if err != nil {
 		return err
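Note: the rewrite above lets test changefeeds use a blackhole:// redo URI while the meta manager still gets a usable external-storage handle. A minimal sketch of the scheme normalization using only the standard library (redo.IsBlackholeStorage is approximated here by a plain scheme check, and the real code parses with storage.ParseRawURL):

```go
package main

import (
	"fmt"
	"net/url"
)

func normalizeRedoURI(raw string) (*url.URL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	// Approximation of redo.IsBlackholeStorage: match the scheme only.
	if u.Scheme == "blackhole" {
		// Same substitution as preStart: route test traffic to the
		// do-nothing "noop" external-storage backend.
		return url.Parse("noop://")
	}
	return u, nil
}

func main() {
	u, _ := normalizeRedoURI("blackhole://")
	fmt.Println(u.Scheme) // noop
}
```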
17 changes: 17 additions & 0 deletions cdc/redo/reader/file.go
@@ -36,6 +36,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model/codec"
 	"github.com/pingcap/tiflow/cdc/redo/writer"
 	"github.com/pingcap/tiflow/cdc/redo/writer/file"
+	"github.com/pingcap/tiflow/pkg/compression"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"go.uber.org/zap"
@@ -52,6 +53,9 @@ const (
 	defaultWorkerNum = 16
 )

+// lz4MagicNumber is the magic number of lz4 compressed data
+var lz4MagicNumber = []byte{0x04, 0x22, 0x4D, 0x18}
+
 type fileReader interface {
 	io.Closer
 	// Read returns the log from the log file
@@ -203,6 +207,13 @@ func selectDownLoadFile(
 	return files, nil
 }

+func isLZ4Compressed(data []byte) bool {
+	if len(data) < 4 {
+		return false
+	}
+	return bytes.Equal(data[:4], lz4MagicNumber)
+}
+
 func readAllFromBuffer(buf []byte) (logHeap, error) {
 	r := &reader{
 		br: bytes.NewReader(buf),
@@ -251,6 +262,12 @@ func sortAndWriteFile(
 		log.Warn("download file is empty", zap.String("file", fileName))
 		return nil
 	}
+	// it's lz4 compressed, decompress it
+	if isLZ4Compressed(fileContent) {
+		if fileContent, err = compression.Decode(compression.LZ4, fileContent); err != nil {
+			return err
+		}
+	}

 	// sort data
 	h, err := readAllFromBuffer(fileContent)
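Note: redo log files may now be LZ4-compressed, so the reader sniffs the four-byte LZ4 frame magic (0x184D2204, stored little-endian) before handing the buffer to compression.Decode. A self-contained sketch of the sniffing step:

```go
package main

import (
	"bytes"
	"fmt"
)

// Little-endian encoding of the LZ4 frame magic 0x184D2204, identical to
// lz4MagicNumber in the diff above.
var lz4FrameMagic = []byte{0x04, 0x22, 0x4D, 0x18}

func isLZ4Compressed(data []byte) bool {
	return len(data) >= 4 && bytes.Equal(data[:4], lz4FrameMagic)
}

func main() {
	fmt.Println(isLZ4Compressed([]byte{0x04, 0x22, 0x4D, 0x18, 0x00})) // true
	fmt.Println(isLZ4Compressed([]byte("plain redo log")))             // false
}
```

Because sortAndWriteFile decompresses only when the sniff succeeds, uncompressed files written by older versions keep working unchanged.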
(Diff truncated; the remaining changed files are not shown.)
