Merge branch 'master' into redo-block-owner
3AceShowHand committed Oct 31, 2023
2 parents 5a64656 + afe4331 commit e7fc85d
Showing 19 changed files with 401 additions and 156 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
@@ -103,6 +103,8 @@ linters-settings:
  excludes:
  - G404
  - G601
+ - G108
+ - G114

  issues:
  exclude-rules:
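For reference, gosec's G108 flags a blank import of net/http/pprof (which exposes /debug/pprof on the default mux) and G114 flags net/http serve calls that set no timeouts, such as http.ListenAndServe. With these two IDs excluded, code along the lines of the following hedged sketch (the address and handler are illustrative) no longer triggers a warning:

package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // G108: registers /debug/pprof handlers on the default mux
)

func main() {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	// G114: ListenAndServe is used without read/write timeouts.
	if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
		panic(err)
	}
}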
33 changes: 18 additions & 15 deletions cdc/api/v2/model.go
@@ -260,11 +260,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
  }
  if c.Consistent != nil {
  res.Consistent = &config.ConsistentConfig{
- Level: c.Consistent.Level,
- MaxLogSize: c.Consistent.MaxLogSize,
- FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
- Storage: c.Consistent.Storage,
- UseFileBackend: c.Consistent.UseFileBackend,
+ Level: c.Consistent.Level,
+ MaxLogSize: c.Consistent.MaxLogSize,
+ FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
+ MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
+ Storage: c.Consistent.Storage,
+ UseFileBackend: c.Consistent.UseFileBackend,
  }
  }
  if c.Sink != nil {
@@ -741,11 +742,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
  }
  if cloned.Consistent != nil {
  res.Consistent = &ConsistentConfig{
- Level: cloned.Consistent.Level,
- MaxLogSize: cloned.Consistent.MaxLogSize,
- FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
- Storage: cloned.Consistent.Storage,
- UseFileBackend: cloned.Consistent.UseFileBackend,
+ Level: cloned.Consistent.Level,
+ MaxLogSize: cloned.Consistent.MaxLogSize,
+ FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
+ MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
+ Storage: cloned.Consistent.Storage,
+ UseFileBackend: cloned.Consistent.UseFileBackend,
  }
  }
  if cloned.Mounter != nil {
@@ -933,11 +935,12 @@ type ColumnSelector struct {
  // ConsistentConfig represents replication consistency config for a changefeed
  // This is a duplicate of config.ConsistentConfig
  type ConsistentConfig struct {
- Level string `json:"level,omitempty"`
- MaxLogSize int64 `json:"max_log_size"`
- FlushIntervalInMs int64 `json:"flush_interval"`
- Storage string `json:"storage,omitempty"`
- UseFileBackend bool `json:"use_file_backend"`
+ Level string `json:"level,omitempty"`
+ MaxLogSize int64 `json:"max_log_size"`
+ FlushIntervalInMs int64 `json:"flush_interval"`
+ MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
+ Storage string `json:"storage,omitempty"`
+ UseFileBackend bool `json:"use_file_backend"`
  }

  // ChangefeedSchedulerConfig is per changefeed scheduler settings.
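Since ConsistentConfig is part of the open API payload, the new field surfaces as a meta_flush_interval key in changefeed configuration JSON. A minimal sketch, assuming only the struct tags shown above; the field values here are illustrative, not defaults taken from this change:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copy of the API struct above, reproduced only to show the JSON keys.
type ConsistentConfig struct {
	Level                 string `json:"level,omitempty"`
	MaxLogSize            int64  `json:"max_log_size"`
	FlushIntervalInMs     int64  `json:"flush_interval"`
	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
	Storage               string `json:"storage,omitempty"`
	UseFileBackend        bool   `json:"use_file_backend"`
}

func main() {
	cfg := ConsistentConfig{
		Level:                 "eventual",
		MaxLogSize:            64,
		FlushIntervalInMs:     2000,
		MetaFlushIntervalInMs: 200,
		Storage:               "s3://bucket/redo",
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out)) // includes "meta_flush_interval": 200
}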
11 changes: 6 additions & 5 deletions cdc/api/v2/model_test.go
@@ -60,11 +60,12 @@ var defaultAPIConfig = &ReplicaConfig{
  AdvanceTimeoutInSec: util.AddressOf(uint(150)),
  },
  Consistent: &ConsistentConfig{
- Level: "none",
- MaxLogSize: 64,
- FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
- Storage: "",
- UseFileBackend: false,
+ Level: "none",
+ MaxLogSize: 64,
+ FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
+ MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
+ Storage: "",
+ UseFileBackend: false,
  },
  Scheduler: &ChangefeedSchedulerConfig{
  EnableTableAcrossNodes: config.GetDefaultReplicaConfig().
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
@@ -537,9 +537,10 @@ func TestRemoveChangefeed(t *testing.T) {
  dir := t.TempDir()
  info.SinkURI = "mysql://"
  info.Config.Consistent = &config.ConsistentConfig{
- Level: "eventual",
- Storage: filepath.Join("nfs://", dir),
- FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
+ Level: "eventual",
+ Storage: filepath.Join("nfs://", dir),
+ FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
+ MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
  }
  ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
  ID: ctx.ChangefeedVars().ID,
11 changes: 6 additions & 5 deletions cdc/processor/processor_test.go
@@ -72,11 +72,12 @@ func newProcessor4Test(
  tmpDir := t.TempDir()
  redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
  dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
- Level: string(redoPkg.ConsistentLevelEventual),
- MaxLogSize: redoPkg.DefaultMaxLogSize,
- FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
- Storage: "file://" + redoDir,
- UseFileBackend: false,
+ Level: string(redoPkg.ConsistentLevelEventual),
+ MaxLogSize: redoPkg.DefaultMaxLogSize,
+ FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
+ MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs,
+ Storage: "file://" + redoDir,
+ UseFileBackend: false,
  })
  p.redo.r = dmlMgr
  }
23 changes: 16 additions & 7 deletions cdc/redo/meta_manager.go
@@ -74,6 +74,8 @@ type metaManager struct {
  lastFlushTime time.Time
  cfg *config.ConsistentConfig
  metricFlushLogDuration prometheus.Observer
+
+ flushIntervalInMs int64
  }

  // NewDisabledMetaManager creates a disabled Meta Manager.
@@ -92,14 +94,21 @@ func NewMetaManager(
  return &metaManager{enabled: false}
  }

- return &metaManager{
- captureID: config.GetGlobalServerConfig().AdvertiseAddr,
- changeFeedID: changefeedID,
- uuidGenerator: uuid.NewGenerator(),
- enabled: true,
- cfg: cfg,
- startTs: checkpoint,
+ m := &metaManager{
+ captureID: config.GetGlobalServerConfig().AdvertiseAddr,
+ changeFeedID: changefeedID,
+ uuidGenerator: uuid.NewGenerator(),
+ enabled: true,
+ cfg: cfg,
+ flushIntervalInMs: cfg.MetaFlushIntervalInMs,
  }
+
+ if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
+ log.Warn("redo flush interval is too small, use default value",
+ zap.Int64("interval", m.flushIntervalInMs))
+ m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
+ }
+ return m
  }

  // Enabled returns whether this log manager is enabled
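The constructor above copies cfg.MetaFlushIntervalInMs into flushIntervalInMs and clamps it to a default when it falls below redo.MinFlushIntervalInMs. A minimal sketch of how such a value typically drives a periodic meta flush; the flushLoop/flushMeta names and the constant values are illustrative assumptions, not the manager's actual internals:

package main

import (
	"context"
	"log"
	"time"
)

const (
	minFlushIntervalInMs         = 50  // stand-in for redo.MinFlushIntervalInMs
	defaultMetaFlushIntervalInMs = 200 // stand-in for redo.DefaultMetaFlushIntervalInMs
)

// flushLoop illustrates the intent of the change: the redo meta flush cadence
// comes from its own interval setting, clamped to a default when it is too small.
func flushLoop(ctx context.Context, metaFlushIntervalInMs int64, flushMeta func()) {
	if metaFlushIntervalInMs < minFlushIntervalInMs {
		log.Printf("meta flush interval %dms is too small, using default %dms",
			metaFlushIntervalInMs, defaultMetaFlushIntervalInMs)
		metaFlushIntervalInMs = defaultMetaFlushIntervalInMs
	}
	ticker := time.NewTicker(time.Duration(metaFlushIntervalInMs) * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			flushMeta()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	flushLoop(ctx, 10, func() { log.Println("flush redo meta") }) // 10ms is clamped to 200ms
}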
27 changes: 15 additions & 12 deletions cdc/redo/meta_manager_test.go
@@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) {

  startTs := uint64(10)
  cfg := &config.ConsistentConfig{
- Level: string(redo.ConsistentLevelEventual),
- MaxLogSize: redo.DefaultMaxLogSize,
- Storage: uri.String(),
- FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ Level: string(redo.ConsistentLevelEventual),
+ MaxLogSize: redo.DefaultMaxLogSize,
+ Storage: uri.String(),
+ FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
  }
  m := NewMetaManager(changefeedID, cfg, startTs)

@@ -150,10 +151,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {

  startTs := uint64(10)
  cfg := &config.ConsistentConfig{
- Level: string(redo.ConsistentLevelEventual),
- MaxLogSize: redo.DefaultMaxLogSize,
- Storage: uri.String(),
- FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ Level: string(redo.ConsistentLevelEventual),
+ MaxLogSize: redo.DefaultMaxLogSize,
+ Storage: uri.String(),
+ FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
  }
  m := NewMetaManager(changefeedID, cfg, startTs)

@@ -280,10 +282,11 @@ func TestGCAndCleanup(t *testing.T) {

  startTs := uint64(3)
  cfg := &config.ConsistentConfig{
- Level: string(redo.ConsistentLevelEventual),
- MaxLogSize: redo.DefaultMaxLogSize,
- Storage: uri.String(),
- FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ Level: string(redo.ConsistentLevelEventual),
+ MaxLogSize: redo.DefaultMaxLogSize,
+ Storage: uri.String(),
+ FlushIntervalInMs: redo.MinFlushIntervalInMs,
+ MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
  }
  m := NewMetaManager(changefeedID, cfg, startTs)

16 changes: 7 additions & 9 deletions cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
@@ -59,8 +59,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

+ // partition-num is 2, so DDL events are sent to only 2 partitions.
  uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
- "&max-message-bytes=1048576&partition-num=1" +
+ "&max-message-bytes=1048576&partition-num=2" +
  "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
  uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

@@ -89,10 +90,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
  err = s.WriteDDLEvent(ctx, ddl)
  require.NoError(t, err)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
- 3, "All partitions should be broadcast")
+ 2, "All partitions should be broadcast")
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
- require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
  }

  func TestWriteDDLEventToZeroPartition(t *testing.T) {
@@ -144,8 +144,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

+ // partition-num is set to 2, so the checkpoint is sent to 2 partitions.
  uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
- "&max-message-bytes=1048576&partition-num=1" +
+ "&max-message-bytes=1048576&partition-num=2" +
  "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
  "&protocol=canal-json&enable-tidb-extension=true"
  uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
@@ -169,10 +170,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
  require.Nil(t, err)

  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
- 3, "All partitions should be broadcast")
+ 2, "All partitions should be broadcast")
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
- require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
  }

  func TestWriteCheckpointTsToTableTopics(t *testing.T) {
@@ -233,10 +233,8 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
  require.NoError(t, err)

  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
- 6, "All topics and partitions should be broadcast")
+ 4, "All topics and partitions should be broadcast")
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
- require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
- require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
  require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
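The updated expectations are simple partition counting: one event per partition of every target topic. A hedged sketch of that arithmetic; the per-topic partition counts below are inferred from the assertions above rather than stated explicitly in the diff:

package main

import "fmt"

// expectedBroadcasts counts one event per partition per target topic,
// which is how the assertions arrive at 2 and 4.
func expectedBroadcasts(partitionsByTopic map[string]int) int {
	total := 0
	for _, n := range partitionsByTopic {
		total += n
	}
	return total
}

func main() {
	// TestWriteDDLEventToAllPartitions: default topic only, partition-num=2.
	fmt.Println(expectedBroadcasts(map[string]int{"mock_topic": 2})) // 2

	// TestWriteCheckpointTsToTableTopics: default topic with a single checked
	// partition plus three single-partition table topics.
	fmt.Println(expectedBroadcasts(map[string]int{
		"mock_topic": 1, "cdc_person": 1, "cdc_person1": 1, "cdc_person2": 1,
	})) // 4
}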
21 changes: 19 additions & 2 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -40,6 +40,8 @@ const (
  type kafkaTopicManager struct {
  changefeedID model.ChangeFeedID

+ defaultTopic string
+
  admin kafka.ClusterAdminClient

  cfg *kafka.AutoCreateTopicConfig
@@ -55,11 +57,13 @@ type kafkaTopicManager struct {
  // NewKafkaTopicManager creates a new topic manager.
  func NewKafkaTopicManager(
  ctx context.Context,
+ defaultTopic string,
  changefeedID model.ChangeFeedID,
  admin kafka.ClusterAdminClient,
  cfg *kafka.AutoCreateTopicConfig,
  ) *kafkaTopicManager {
  mgr := &kafkaTopicManager{
+ defaultTopic: defaultTopic,
  changefeedID: changefeedID,
  admin: admin,
  cfg: cfg,
@@ -165,6 +169,15 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
  return nil, err
  }

+ // The following case may happen:
+ // 1. the user manually created the default topic with 3 partitions;
+ // 2. partition-num is set to 2 in the sink-uri.
+ // In such a case, we should use 2 instead of 3 as the partition number.
+ _, ok := numPartitions[m.defaultTopic]
+ if ok {
+ numPartitions[m.defaultTopic] = m.cfg.PartitionNum
+ }
+
  log.Info(
  "Kafka admin client describe topics success",
  zap.String("namespace", m.changefeedID.Namespace),
@@ -271,8 +284,12 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
  return 0, errors.Trace(err)
  }
  if detail, ok := topicDetails[topicName]; ok {
- m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
- return detail.NumPartitions, nil
+ numPartition := detail.NumPartitions
+ if topicName == m.defaultTopic {
+ numPartition = m.cfg.PartitionNum
+ }
+ m.tryUpdatePartitionsAndLogging(topicName, numPartition)
+ return numPartition, nil
  }

  partitionNum, err := m.createTopic(ctx, topicName)
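A minimal hedged sketch of the rule both hunks above implement: for the changefeed's default topic, the partition-num carried in the sink-uri wins over whatever partition count the broker reports, while other topics keep the broker value. The helper below is illustrative, not the manager's actual API:

package main

import "fmt"

// resolvePartitionNum mirrors the kafkaTopicManager change: trust the sink-uri
// partition-num for the default topic, and the broker-reported count otherwise.
func resolvePartitionNum(topic, defaultTopic string, brokerPartitions, uriPartitionNum int32) int32 {
	if topic == defaultTopic {
		return uriPartitionNum
	}
	return brokerPartitions
}

func main() {
	// Default topic was pre-created with 3 partitions, but partition-num=2 in the sink-uri.
	fmt.Println(resolvePartitionNum("mock_topic", "mock_topic", 3, 2)) // 2
	// Other topics keep whatever the broker reports.
	fmt.Println(resolvePartitionNum("cdc_person", "mock_topic", 1, 2)) // 1
}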