Skip to content

Commit

Permalink
fix unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Sep 12, 2023
1 parent 654084d commit cdca12b
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 15 deletions.
3 changes: 1 addition & 2 deletions cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package factory
import (
"context"
"net/url"
"strings"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -46,7 +45,7 @@ func New(
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := strings.ToLower(sinkURI.Scheme)
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package factory
import (
"context"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -63,7 +62,7 @@ func New(
}

s := &SinkFactory{}
schema := strings.ToLower(sinkURI.Scheme)
schema := sink.GetScheme(sinkURI)
switch schema {
case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh,
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
"github.com/pingcap/tiflow/pkg/sink/kafka"
Expand Down Expand Up @@ -90,7 +91,8 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, sinkURI.Scheme)
scheme := sink.GetScheme(sinkURI)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, scheme)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -116,7 +118,7 @@ func NewKafkaDMLSink(
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, encoderGroup, protocol, errCh)
eventRouter, encoderGroup, protocol, scheme, errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type dmlSink struct {

wg sync.WaitGroup
dead chan struct{}

scheme string
}

func newDMLSink(
Expand All @@ -74,6 +76,7 @@ func newDMLSink(
eventRouter *dispatcher.EventRouter,
encoderGroup codec.EncoderGroup,
protocol config.Protocol,
scheme string,
errCh chan error,
) *dmlSink {
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -87,6 +90,7 @@ func newDMLSink(
ctx: ctx,
cancel: cancel,
dead: make(chan struct{}),
scheme: scheme,
}
s.alive.eventRouter = eventRouter
s.alive.topicManager = topicManager
Expand Down Expand Up @@ -188,5 +192,5 @@ func (s *dmlSink) Dead() <-chan struct{} {

// Scheme returns the scheme of this sink.
func (s *dmlSink) Scheme() string {
return sink.KafkaScheme
return s.scheme
}
7 changes: 5 additions & 2 deletions cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
Expand Down Expand Up @@ -86,13 +87,14 @@ func NewPulsarDMLSink(
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}

scheme := sink.GetScheme(sinkURI)
// The topicManager is not actually used in pulsar , it is only used to create dmlSink.
// TODO: Find a way to remove it in newDMLSink.
topicManager, err := pulsarTopicManagerCreator(pConfig, client)
if err != nil {
return nil, errors.Trace(err)
}
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, scheme)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -111,7 +113,8 @@ func NewPulsarDMLSink(
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderGroup, protocol, errCh)
s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, encoderGroup, protocol, scheme, errCh)

return s, nil
}
14 changes: 9 additions & 5 deletions cdc/sink/dmlsink/txn/txn_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type dmlSink struct {
dead chan struct{}

statistics *metrics.Statistics

scheme string
}

// GetDBConnImpl is the implementation of pmysql.Factory.
Expand Down Expand Up @@ -84,11 +86,13 @@ func NewMySQLSink(
for _, impl := range backendImpls {
backends = append(backends, impl)
}
sink := newSink(ctx, changefeedID, backends, errCh, conflictDetectorSlots)
sink.statistics = statistics
sink.cancel = cancel

return sink, nil
s := newSink(ctx, changefeedID, backends, errCh, conflictDetectorSlots)
s.statistics = statistics
s.cancel = cancel
s.scheme = sink.GetScheme(sinkURI)

return s, nil
}

func newSink(ctx context.Context,
Expand Down Expand Up @@ -175,5 +179,5 @@ func (s *dmlSink) Dead() <-chan struct{} {
}

func (s *dmlSink) Scheme() string {
return sink.MySQLScheme
return s.scheme
}
1 change: 0 additions & 1 deletion cdc/sink/tablesink/table_sink_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func TestNewEventTableSink(t *testing.T) {
require.NotNil(t, tb.eventAppender, "eventAppender should be set")
require.Equal(t, 0, len(tb.eventBuffer), "eventBuffer should be empty")
require.Equal(t, state.TableSinkSinking, tb.state, "table sink should be sinking")
require.True(t, tb.splitSingleUpdate, "splitSingleUpdate should be true")
}

func TestAppendRowChangedEvents(t *testing.T) {
Expand Down

0 comments on commit cdca12b

Please sign in to comment.