Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): simple support large message handle functionality #10251

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c3e319a
simple support encode large message handle .
3AceShowHand Dec 5, 2023
1e93072
adjust ddl.
3AceShowHand Dec 6, 2023
7133cac
add compression related unit test.
3AceShowHand Dec 6, 2023
e53f5c0
move canal test util to the utils, it should be shared among all prot…
3AceShowHand Dec 6, 2023
78f4b39
Merge branch 'master' into simple-support-large-message-handle
3AceShowHand Dec 8, 2023
0858e40
fix tests.
3AceShowHand Dec 8, 2023
b361329
add more unit test.
3AceShowHand Dec 8, 2023
58a4a5b
simple support encode large message handle .
3AceShowHand Dec 8, 2023
842b941
adjust log.
3AceShowHand Dec 11, 2023
031f597
fix integration test.
3AceShowHand Dec 11, 2023
e018113
fix tests.
3AceShowHand Dec 11, 2023
c6da186
use test.
3AceShowHand Dec 11, 2023
00340c7
Revert "ddlManager (ticdc): add bootstrap sending function (#10045)"
3AceShowHand Dec 12, 2023
d448af9
Merge branch 'master' into simple-support-large-message-handle
3AceShowHand Dec 12, 2023
658a398
remove useless sink type.
3AceShowHand Dec 12, 2023
02034aa
tiny adjust
3AceShowHand Dec 12, 2023
cf2db54
fix read data from tidb.
3AceShowHand Dec 12, 2023
d36b453
fix read data from tidb.
3AceShowHand Dec 12, 2023
f3fb23b
add more logs to debug the scheduler.
3AceShowHand Dec 13, 2023
335ab0a
debug read data from tidb.
3AceShowHand Dec 13, 2023
76a7adc
debug read data from tidb.
3AceShowHand Dec 13, 2023
8e78670
debug read data from tidb.
3AceShowHand Dec 13, 2023
ce12346
debug read data from tidb.
3AceShowHand Dec 13, 2023
fdea735
debug read data from tidb.
3AceShowHand Dec 13, 2023
4ad0774
fix read data from tidb.
3AceShowHand Dec 13, 2023
aad8b0a
Merge branch 'master' into simple-support-large-message-handle
3AceShowHand Dec 15, 2023
519f8b1
fix build
3AceShowHand Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 0 additions & 63 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,42 +571,6 @@ func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProt
info.Config.Sink.Protocol = util.AddressOf(newProtocol)
}

// DownstreamType returns the type of the downstream.
func (info *ChangeFeedInfo) DownstreamType() (DownstreamType, error) {
uri, err := url.Parse(info.SinkURI)
if err != nil {
return Unknown, errors.Trace(err)
}
if sink.IsMySQLCompatibleScheme(uri.Scheme) {
return DB, nil
}
if sink.IsMQScheme(uri.Scheme) {
return MQ, nil
}
if sink.IsStorageScheme(uri.Scheme) {
return Storage, nil
}
return Unknown, nil
}

// NeedSendBootstrapEvent returns true if the changefeed need to send bootstrap event.
func (info *ChangeFeedInfo) NeedSendBootstrapEvent() (bool, error) {
downStreamType, err := info.DownstreamType()
if err != nil {
return false, errors.Trace(err)
}
if downStreamType != MQ {
return false, nil
}
if info.Config.Sink.Protocol == nil {
return false, nil
}
if *info.Config.Sink.Protocol == config.ProtocolSimple.String() {
return true, nil
}
return false, nil
}

func (info *ChangeFeedInfo) fixMemoryQuota() {
info.Config.FixMemoryQuota()
}
Expand All @@ -615,33 +579,6 @@ func (info *ChangeFeedInfo) fixScheduler(inheritV66 bool) {
info.Config.FixScheduler(inheritV66)
}

// DownstreamType is the type of downstream.
type DownstreamType int

const (
// DB is the type of Database.
DB DownstreamType = iota
// MQ is the type of MQ or Cloud Storage.
MQ
// Storage is the type of Cloud Storage.
Storage
// Unknown is the type of Unknown.
Unknown
)

// String implements fmt.Stringer interface.
func (t DownstreamType) String() string {
switch t {
case DB:
return "DB"
case MQ:
return "MQ"
case Storage:
return "Storage"
}
return "Unknown"
}

// ChangeFeedStatusForAPI uses to transfer the status of changefeed for API.
type ChangeFeedStatusForAPI struct {
ResolvedTs uint64 `json:"resolved-ts"`
Expand Down
15 changes: 1 addition & 14 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,16 +634,6 @@ LOOP2:
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))

downstreamType, err := c.latestInfo.DownstreamType()
if err != nil {
return errors.Trace(err)
}

needSendBootstrapEvent, err := c.latestInfo.NeedSendBootstrapEvent()
if err != nil {
return errors.Trace(err)
}

c.ddlManager = newDDLManager(
c.id,
ddlStartTs,
Expand All @@ -653,10 +643,7 @@ LOOP2:
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
downstreamType,
util.GetOrZero(c.latestInfo.Config.BDRMode),
needSendBootstrapEvent,
)
util.GetOrZero(c.latestInfo.Config.BDRMode))

// create scheduler
cfg := *c.cfg
Expand Down
4 changes: 0 additions & 4 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ func (m *mockDDLSink) emitCheckpointTs(ts uint64, tables []*model.TableInfo) {
m.mu.currentTables = tables
}

func (m *mockDDLSink) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
101 changes: 12 additions & 89 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -94,14 +93,6 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionRemovePartitioning: {},
}

type bootstrapState int32

const (
bootstrapStateNone bootstrapState = iota
bootstrapStateRunning
bootstrapStateCompleted
)

// ddlManager holds the pending DDL events of all tables and responsible for
// executing them to downstream.
// It also provides the ability to calculate the barrier of a changefeed.
Expand Down Expand Up @@ -136,12 +127,6 @@ type ddlManager struct {

BDRMode bool
ddlResolvedTs model.Ts

// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
needSendBootstrapEvent bool
errCh chan error
bootstrapState int32
}

func newDDLManager(
Expand All @@ -153,32 +138,27 @@ func newDDLManager(
schema *schemaWrap4Owner,
redoManager redo.DDLManager,
redoMetaManager redo.MetaManager,
sinkType model.DownstreamType,
bdrMode bool,
needSendBootstrapEvent bool,
) *ddlManager {
log.Info("owner create ddl manager",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeed", changefeedID.ID),
zap.Uint64("startTs", startTs),
zap.Uint64("checkpointTs", checkpointTs),
zap.Bool("bdrMode", bdrMode),
zap.Stringer("sinkType", sinkType))
zap.Bool("bdrMode", bdrMode))

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
needSendBootstrapEvent: needSendBootstrapEvent,
changfeedID: changefeedID,
ddlSink: ddlSink,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
}
}

Expand All @@ -195,16 +175,6 @@ func (m *ddlManager) tick(
ctx context.Context,
checkpointTs model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
if m.needSendBootstrapEvent {
finished, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
}
if !finished {
return nil, schedulepb.NewBarrierWithMinTs(checkpointTs), nil
}
}

m.justSentDDL = nil
m.checkpointTs = checkpointTs

Expand Down Expand Up @@ -594,53 +564,6 @@ func (m *ddlManager) cleanCache() {
m.physicalTablesCache = nil
}

func (m *ddlManager) checkAndBootstrap(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateCompleted) {
return true, nil
}

select {
case err := <-m.errCh:
return false, err
default:
}

if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateRunning) {
return false, nil
}
// begin bootstrap
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateRunning))
tables, err := m.allTables(ctx)
if err != nil {
return false, err
}
bootstrapEvents := make([]*model.DDLEvent, 0, len(tables))
for _, table := range tables {
ddlEvent := &model.DDLEvent{
StartTs: m.startTs,
CommitTs: m.startTs,
TableInfo: table,
IsBootstrap: true,
}
bootstrapEvents = append(bootstrapEvents, ddlEvent)
}
// send bootstrap events
go func() {
for _, event := range bootstrapEvents {
err := m.ddlSink.emitBootstrapEvent(ctx, event)
if err != nil {
log.Error("emit bootstrap event failed",
zap.Any("bootstrapEvent", event), zap.Error(err))
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateNone))
m.errCh <- err
return
}
}
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateCompleted))
}()
return false, nil
}

// getRelatedPhysicalTableIDs get all related physical table ids of a ddl event.
// It is a helper function to calculate tableBarrier.
func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
model.DB, false, false)
false)
return res
}

Expand Down
17 changes: 0 additions & 17 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type DDLSink interface {
// the DDL event will be sent to another goroutine and execute to downstream
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
Expand Down Expand Up @@ -385,22 +384,6 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
return false, nil
}

// emitBootstrapEvent sent bootstrap event to downstream.
// It is a synchronous operation.
func (s *ddlSinkImpl) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
if !ddl.IsBootstrap {
return nil
}
err := s.sink.WriteDDLEvent(ctx, ddl)
if err != nil {
return errors.Trace(err)
}
// TODO: change this log to debug level after testing complete.
log.Info("emit bootstrap event", zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID), zap.Any("bootstrapEvent", ddl))
return nil
}

func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (err error) {
if checkpointTs == s.lastSyncPoint {
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
decoder = avro.NewDecoder(c.codecConfig, schemaM, c.option.topic, c.tz)
case config.ProtocolSimple:
decoder = simple.NewDecoder()
decoder, err = simple.NewDecoder(ctx, c.codecConfig, c.upstreamTiDB)
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/large_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTi
}

switch protocol {
case ProtocolOpen:
case ProtocolOpen, ProtocolSimple:
case ProtocolCanalJSON:
if !enableTiDBExtension {
return cerror.ErrInvalidReplicaConfig.GenWithStack(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/builder/encoder_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewRowEventEncoderBuilder(
case config.ProtocolDebezium:
return debezium.NewBatchEncoderBuilder(cfg), nil
case config.ProtocolSimple:
return simple.NewBuilder(cfg), nil
return simple.NewBuilder(ctx, cfg)
default:
return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(cfg.Protocol)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestBuildCanalJSONRowEventEncoder(t *testing.T) {
}

func TestDMLE2E(t *testing.T) {
insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t)
_, insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t)

ctx := context.Background()

Expand Down Expand Up @@ -126,7 +126,7 @@ func TestDMLE2E(t *testing.T) {
}

func TestCanalJSONCompressionE2E(t *testing.T) {
insertEvent, _, _ := utils.NewLargeEvent4Test(t)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t)

codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = true
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) {
require.NoError(t, err)
encoder := builder.Build()

insertEvent, _, _ := utils.NewLargeEvent4Test(t)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t)
err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
require.NoError(t, err)

Expand Down Expand Up @@ -269,7 +269,7 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) {
require.NoError(t, err)
encoder := builder.Build()

insertEvent, _, _ := utils.NewLargeEvent4Test(t)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t)
err = encoder.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {})
require.NoError(t, err)

Expand Down Expand Up @@ -351,7 +351,7 @@ func TestBatching(t *testing.T) {
encoder := builder.Build()
require.NotNil(t, encoder)

_, updateEvent, _ := utils.NewLargeEvent4Test(t)
_, _, updateEvent, _ := utils.NewLargeEvent4Test(t)
updateCase := *updateEvent
for i := 1; i <= 1000; i++ {
ts := uint64(i)
Expand Down Expand Up @@ -629,7 +629,7 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {

encoder := builder.Build()

insertEvent, _, _ := utils.NewLargeEvent4Test(t)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t)
err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
require.NoError(t, err)

Expand Down Expand Up @@ -666,7 +666,7 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
}

func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
insertEvent, _, _ := utils.NewLargeEvent4Test(t)
_, insertEvent, _, _ := utils.NewLargeEvent4Test(t)
ctx := context.Background()

for _, encodeEnable := range []bool{false, true} {
Expand Down
Loading
Loading