Skip to content

Commit

Permalink
cmd (ticdc): add pulsar consumer (#9730)
Browse files Browse the repository at this point in the history
ref #9413
  • Loading branch information
asddongmen authored Sep 15, 2023
1 parent 87b5912 commit d0a9a4b
Show file tree
Hide file tree
Showing 7 changed files with 789 additions and 22 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ kafka_consumer:
storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go

pulsar_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_pulsar_consumer ./cmd/pulsar-consumer/main.go

kafka_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/kafka-consumer.Dockerfile . -t ticdc:kafka-consumer --platform linux/amd64
Expand All @@ -171,6 +174,10 @@ storage_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/storage-consumer.Dockerfile . -t ticdc:storage-consumer --platform linux/amd64

pulsar_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/pulsar-consumer.Dockerfile . -t ticdc:pulsar-consumer --platform linux/amd64

filter_helper:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_filter_helper ./cmd/filter-helper/main.go

Expand Down Expand Up @@ -215,7 +222,7 @@ check_third_party_binary:
@which bin/minio
@which bin/bin/schema-registry-start

integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer
integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/tiflow/... \
Expand Down
6 changes: 6 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string,
return err
}

if message.Type == model.MessageTypeDDL {
log.Info("pulsarProducers SyncSendMessage success",
zap.Any("mID", mID), zap.String("topic", topic),
zap.String("ddl", string(message.Value)))
}

log.Debug("pulsarProducers SyncSendMessage success",
zap.Any("mID", mID), zap.String("topic", topic))

Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/ddlsink/mq/mq_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

topic := k.eventRouter.GetTopicForDDL(ddl)
partitionRule := k.eventRouter.GetDLLDispatchRuleByProtocol(k.protocol)
log.Debug("Emit ddl event",
log.Info("Emit ddl event",
zap.String("topic", topic),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("query", ddl.Query),
zap.String("namespace", k.id.Namespace),
Expand Down
42 changes: 22 additions & 20 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/topic"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -267,29 +268,30 @@ func getTopicDispatcher(
if sink.IsPulsarScheme(schema) {
err := topicExpr.PulsarValidate()
if err != nil {
return nil, err
}
}

// validate the topic expression for kafka sink
var p config.Protocol
var err error
if protocol != "" {
p, err = config.ParseSinkProtocolFromString(protocol)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
}
if p == config.ProtocolAvro {
err = topicExpr.ValidateForAvro()
if err != nil {
return nil, err
return nil, errors.Trace(err)
}
} else {
err = topicExpr.Validate()
if err != nil {
return nil, err
// validate the topic expression for kafka sink
var p config.Protocol
var err error
if protocol != "" {
p, err = config.ParseSinkProtocolFromString(protocol)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
}
if p == config.ProtocolAvro {
err = topicExpr.ValidateForAvro()
if err != nil {
return nil, err
}
} else {
err = topicExpr.Validate()
if err != nil {
return nil, err
}
}
}

return topic.NewDynamicTopicDispatcher(topicExpr), nil
}
5 changes: 5 additions & 0 deletions cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func NewPulsarDMLSink(
if err != nil {
return nil, errors.Trace(err)
}
if !util.IsPulsarSupportedProtocols(protocol) {
return nil, cerror.ErrSinkURIInvalid.
GenWithStackByArgs("unsupported protocol, " +
"pulsar sink currently only support these protocols: [canal-json, canal, maxwell]")
}

pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,8 @@ func GetTopicManagerAndTryCreateTopic(

return topicManager, nil
}

// IsPulsarSupportedProtocols returns whether the protocol is supported by pulsar.
func IsPulsarSupportedProtocols(p config.Protocol) bool {
return p == config.ProtocolCanalJSON
}
Loading

0 comments on commit d0a9a4b

Please sign in to comment.