> offsetsState;
+ /**
+ * The current offsets of partitions which are stored in {@link #offsetsState}
+ * once a checkpoint is triggered.
+ *
+ * NOTE: The offsets are populated in the main thread and saved in the
+ * checkpoint thread. Its usage must be guarded by the checkpoint lock.
+ */
+ private transient Map currentOffsets;
+ /**
+ * The TubeMQ session factory.
+ */
+ private transient TubeSingleSessionFactory messageSessionFactory;
+ /**
+ * The TubeMQ pull consumer.
+ */
+ private transient PullMessageConsumer messagePullConsumer;
+
+ /**
+ * Build a TubeMQ source function
+ *
+ * @param masterAddress the master address of TubeMQ
+ * @param topic the topic name
+ * @param tidSet the topic's filter condition items
+ * @param consumerGroup the consumer group name
+ * @param deserializationSchema the deserialize schema
+ * @param configuration the configure
+ * @param sessionKey the tube session key
+ */
+ public FlinkTubeMQConsumer(
+ String masterAddress,
+ String topic,
+ TreeSet tidSet,
+ String consumerGroup,
+ DeserializationSchema deserializationSchema,
+ Configuration configuration,
+ String sessionKey,
+ Boolean innerFormat) {
+ checkNotNull(masterAddress, "The master address must not be null.");
+ checkNotNull(topic, "The topic must not be null.");
+ checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(consumerGroup, "The consumer group must not be null.");
+ checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
+ checkNotNull(configuration, "The configuration must not be null.");
+
+ this.masterAddress = masterAddress;
+ this.topic = topic;
+ this.tidSet = tidSet;
+ this.consumerGroup = consumerGroup;
+ this.deserializationSchema = deserializationSchema;
+ this.sessionKey = sessionKey;
+
+ // those param set default
+ this.consumeFromMax = configuration.getBoolean(TubeMQOptions.BOOTSTRAP_FROM_MAX);
+ this.messageNotFoundWaitPeriod = parseDuration(configuration.getString(
+ TubeMQOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
+ this.maxIdleTime = parseDuration(configuration.getString(
+ TubeMQOptions.SOURCE_MAX_IDLE_TIME));
+ this.innerFormat = innerFormat;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ TypeInformation> typeInformation =
+ new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
+ ListStateDescriptor> stateDescriptor =
+ new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
+
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ offsetsState = stateStore.getListState(stateDescriptor);
+ currentOffsets = new HashMap<>();
+ if (context.isRestored()) {
+ for (Tuple2 tubeOffset : offsetsState.get()) {
+ currentOffsets.put(tubeOffset.f0, tubeOffset.f1);
+ }
+ LOG.info("Successfully restore the offsets {}.", currentOffsets);
+ } else {
+ LOG.info("No restore offsets.");
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
+ consumerConfig.setConsumePosition(consumeFromMax
+ ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+ : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+
+ consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
+
+ final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
+ messagePullConsumer.subscribe(topic, tidSet);
+ messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
+
+ running = true;
+ }
+
+ @Override
+ public void run(SourceContext ctx) throws Exception {
+
+ Instant lastConsumeInstant = Instant.now();
+
+ while (running) {
+ ConsumerResult consumeResult = messagePullConsumer.getMessage();
+ if (!consumeResult.isSuccess()) {
+ if (!(consumeResult.getErrCode() == TErrCodeConstants.BAD_REQUEST
+ || consumeResult.getErrCode() == TErrCodeConstants.NOT_FOUND
+ || consumeResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_FROZEN
+ || consumeResult.getErrCode() == TErrCodeConstants.NO_PARTITION_ASSIGNED
+ || consumeResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_WAITING
+ || consumeResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_INUSE)) {
+ LOG.info("Could not consume messages from tubemq (errcode: {},errmsg: {}).",
+ consumeResult.getErrCode(),
+ consumeResult.getErrMsg());
+ }
+
+ Duration idleTime = Duration.between(lastConsumeInstant, Instant.now());
+ if (idleTime.compareTo(maxIdleTime) > 0) {
+ ctx.markAsTemporarilyIdle();
+ }
+
+ continue;
+ }
+
+ List messageList = consumeResult.getMessageList();
+ lastConsumeInstant = Instant.now();
+
+ List records = new ArrayList<>();
+ lastConsumeInstant = getRecords(lastConsumeInstant, messageList, records);
+
+ synchronized (ctx.getCheckpointLock()) {
+
+ for (T record : records) {
+ ctx.collect(record);
+ }
+ currentOffsets.put(
+ consumeResult.getPartitionKey(),
+ consumeResult.getCurrOffset());
+ }
+
+ ConsumerResult confirmResult = messagePullConsumer
+ .confirmConsume(consumeResult.getConfirmContext(), true);
+ if (!confirmResult.isSuccess()) {
+ if (!(confirmResult.getErrCode() == TErrCodeConstants.BAD_REQUEST
+ || confirmResult.getErrCode() == TErrCodeConstants.NOT_FOUND
+ || confirmResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_FROZEN
+ || confirmResult.getErrCode() == TErrCodeConstants.NO_PARTITION_ASSIGNED
+ || confirmResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_WAITING
+ || confirmResult.getErrCode() == TErrCodeConstants.ALL_PARTITION_INUSE)) {
+ LOG.warn("Could not confirm messages to tubemq (errcode: {},errmsg: {}).",
+ confirmResult.getErrCode(),
+ confirmResult.getErrMsg());
+ }
+ }
+ }
+ }
+
+ private Instant getRecords(Instant lastConsumeInstant, List messageList, List records)
+ throws Exception {
+ if (messageList != null) {
+ lastConsumeInstant = Instant.now();
+ if (!innerFormat) {
+ for (Message message : messageList) {
+ T record = deserializationSchema.deserialize(message.getData());
+ records.add(record);
+ }
+ } else {
+ List rowDataList = new ArrayList<>();
+ ListCollector out = new ListCollector<>(rowDataList);
+ for (Message message : messageList) {
+ deserializationSchema.deserialize(message.getData(), (Collector) out);
+ }
+ rowDataList.forEach(data -> records.add((T) data));
+ }
+ }
+
+ return lastConsumeInstant;
+
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+ offsetsState.clear();
+ for (Map.Entry entry : currentOffsets.entrySet()) {
+ offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+ }
+
+ LOG.info("Successfully save the offsets in checkpoint {}: {}.",
+ context.getCheckpointId(), currentOffsets);
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ cancel();
+
+ if (messagePullConsumer != null) {
+ try {
+ messagePullConsumer.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly shutdown the tubemq pull consumer.", t);
+ }
+ }
+
+ if (messageSessionFactory != null) {
+ try {
+ messageSessionFactory.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly shutdown the tubemq session factory.", t);
+ }
+ }
+
+ super.close();
+
+ LOG.info("Closed the tubemq source.");
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
new file mode 100644
index 00000000000..fb2f6249611
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class FlinkTubeMQProducer extends RichSinkFunction implements CheckpointedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQProducer.class);
+
+ private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
+
+ /**
+ * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
+ */
+ private final String masterAddress;
+
+ /**
+ * The topic name.
+ */
+ private final String topic;
+
+ /**
+ * The tubemq consumers use this tid set to filter records reading from server.
+ */
+ private final TreeSet tidSet;
+ /**
+ * The serializer for the records sent to tube.
+ */
+ private final SerializationSchema serializationSchema;
+
+ /**
+ * The tubemq producer.
+ */
+ private transient MessageProducer producer;
+
+ /**
+ * The tubemq session factory.
+ */
+ private transient MessageSessionFactory sessionFactory;
+
+ /**
+ * The maximum number of retries.
+ */
+ private final int maxRetries;
+
+ public FlinkTubeMQProducer(String topic,
+ String masterAddress,
+ SerializationSchema serializationSchema,
+ TreeSet tidSet,
+ Configuration configuration) {
+ checkNotNull(topic, "The topic must not be null.");
+ checkNotNull(masterAddress, "The master address must not be null.");
+ checkNotNull(serializationSchema, "The serialization schema must not be null.");
+ checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(configuration, "The configuration must not be null.");
+
+ int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
+ checkArgument(max_retries > 0);
+
+ this.topic = topic;
+ this.masterAddress = masterAddress;
+ this.serializationSchema = serializationSchema;
+ this.tidSet = tidSet;
+ this.maxRetries = max_retries;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress);
+ this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
+ this.producer = sessionFactory.createProducer();
+ HashSet hashSet = new HashSet<>();
+ hashSet.add(topic);
+ producer.publish(hashSet);
+ }
+
+ @Override
+ public void invoke(T in, SinkFunction.Context context) throws Exception {
+
+ int retries = 0;
+ Exception exception = null;
+
+ while (maxRetries <= 0 || retries < maxRetries) {
+
+ try {
+ byte[] body = serializationSchema.serialize(in);
+ Message message = new Message(topic, body);
+ MessageSentResult sendResult = producer.sendMessage(message);
+ if (sendResult.isSuccess()) {
+ return;
+ } else {
+ LOG.warn("Send msg fail, error code: {}, error message: {}",
+ sendResult.getErrCode(), sendResult.getErrMsg());
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly send the message to tube "
+ + "(retries: {}).", retries, e);
+
+ retries++;
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ throw new IOException("Could not properly send the message to tube.", exception);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ try {
+ if (producer != null) {
+ producer.shutdown();
+ }
+ if (sessionFactory != null) {
+ sessionFactory.shutdown();
+ }
+ } catch (Throwable e) {
+ LOG.error("Shutdown producer error", e);
+ } finally {
+ super.close();
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
new file mode 100644
index 00000000000..f5880f1a784
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.tubemq.corebase.Message;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class DynamicTubeMQDeserializationSchema implements DeserializationSchema {
+
+ /**
+ * data buffer message
+ */
+ private final DeserializationSchema deserializationSchema;
+
+ /**
+ * {@link MetadataConverter} of how to produce metadata from message.
+ */
+ private final MetadataConverter[] metadataConverters;
+
+ /**
+ * {@link TypeInformation} of the produced {@link RowData} (physical + meta data).
+ */
+ private final TypeInformation producedTypeInfo;
+
+ /**
+ * status of error
+ */
+ private final boolean ignoreErrors;
+
+ public DynamicTubeMQDeserializationSchema(
+ DeserializationSchema schema,
+ MetadataConverter[] metadataConverters,
+ TypeInformation producedTypeInfo,
+ boolean ignoreErrors) {
+ this.deserializationSchema = schema;
+ this.metadataConverters = metadataConverters;
+ this.producedTypeInfo = producedTypeInfo;
+ this.ignoreErrors = ignoreErrors;
+ }
+
+ @Override
+ public RowData deserialize(byte[] bytes) throws IOException {
+ return deserializationSchema.deserialize(bytes);
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector out) throws IOException {
+ deserializationSchema.deserialize(message, out);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData rowData) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
+ return false;
+ }
+ DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o;
+ return ignoreErrors == that.ignoreErrors
+ && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+ Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+ && Objects.equal(deserializationSchema, that.deserializationSchema)
+ && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(deserializationSchema, metadataConverters, producedTypeInfo, ignoreErrors);
+ }
+
+ /**
+ * add metadata column
+ */
+ private void emitRow(Message head, GenericRowData physicalRow, Collector out) {
+ if (metadataConverters.length == 0) {
+ out.collect(physicalRow);
+ return;
+ }
+ final int physicalArity = physicalRow.getArity();
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+ for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+ producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+ }
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos, metadataConverters[metadataPos].read(head));
+ }
+ out.collect(producedRow);
+ }
+
+ interface MetadataConverter extends Serializable {
+
+ Object read(Message head);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
new file mode 100644
index 00000000000..a1f95fcdd65
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+
+public class DynamicTubeMQSerializationSchema implements SerializationSchema {
+
+ private final SerializationSchema serializationSchema;
+
+ public DynamicTubeMQSerializationSchema(SerializationSchema serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
+
+ @Override
+ public byte[] serialize(RowData element) {
+ return serializationSchema.serialize(element);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
new file mode 100644
index 00000000000..17275d8d113
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
+
+/**
+ * A dynamic table factory implementation for TubeMQ.
+ */
+public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "tubemq";
+
+ public static final List INNERFORMATTYPE = Arrays.asList("inlong-msg");
+
+ public static boolean innerFormat = false;
+
+ private static DecodingFormat> getValueDecodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FORMAT)
+ .orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
+ }
+
+ private static EncodingFormat> getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+ .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+ }
+
+ private static void validatePKConstraints(
+ ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
+ if (catalogTable.getSchema().getPrimaryKey().isPresent()
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ Configuration options = Configuration.fromMap(catalogTable.getOptions());
+ String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
+ innerFormat = INNERFORMATTYPE.contains(formatName);
+ throw new ValidationException(String.format(
+ "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ + " on the table, because it can't guarantee the semantic of primary key.",
+ tableName.asSummaryString(), formatName));
+ }
+ }
+
+ private static Optional>> getKeyDecodingFormat(
+ TableFactoryHelper helper) {
+ final Optional>> keyDecodingFormat = helper
+ .discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+ keyDecodingFormat.ifPresent(format -> {
+ if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(String.format(
+ "A key format should only deal with INSERT-only records. "
+ + "But %s has a changelog mode of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyDecodingFormat;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final DecodingFormat> valueDecodingFormat = getValueDecodingFormat(helper);
+
+ // validate all options
+ helper.validate();
+
+ validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+
+ final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
+
+ final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ return createTubeMQTableSource(
+ physicalDataType,
+ valueDecodingFormat,
+ TubeMQOptions.getSourceTopics(tableOptions),
+ TubeMQOptions.getMasterRpcAddress(tableOptions),
+ TubeMQOptions.getTiSet(tableOptions),
+ TubeMQOptions.getConsumerGroup(tableOptions),
+ TubeMQOptions.getSessionKey(tableOptions),
+ properties);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final EncodingFormat> valueEncodingFormat = getValueEncodingFormat(helper);
+
+ // validate all options
+ helper.validate();
+
+ validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+
+ final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
+
+ final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ return createTubeMQTableSink(
+ physicalDataType,
+ valueEncodingFormat,
+ TubeMQOptions.getSinkTopics(tableOptions),
+ TubeMQOptions.getMasterRpcAddress(tableOptions),
+ TubeMQOptions.getTiSet(tableOptions),
+ properties);
+ }
+
+ protected TubeMQTableSource createTubeMQTableSource(
+ DataType physicalDataType,
+ DecodingFormat> valueDecodingFormat,
+ String topic,
+ String url,
+ TreeSet streamId,
+ String consumerGroup,
+ String sessionKey,
+ Configuration properties) {
+ return new TubeMQTableSource(
+ physicalDataType,
+ valueDecodingFormat,
+ url,
+ topic,
+ streamId,
+ consumerGroup,
+ sessionKey,
+ properties,
+ null,
+ null,
+ false,
+ innerFormat);
+ }
+
+ protected TubeMQTableSink createTubeMQTableSink(
+ DataType physicalDataType,
+ EncodingFormat> valueEncodingFormat,
+ String topic,
+ String masterAddress,
+ TreeSet streamId,
+ Configuration configuration) {
+ return new TubeMQTableSink(
+ physicalDataType,
+ valueEncodingFormat,
+ topic,
+ masterAddress,
+ streamId,
+ configuration);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ final Set> options = new HashSet<>();
+ options.add(MASTER_RPC);
+ return options;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ final Set> options = new HashSet<>();
+ options.add(FORMAT);
+ options.add(TOPIC);
+ options.add(GROUP_NAME);
+ options.add(STREAMID);
+ options.add(SESSION_KEY);
+ options.add(BOOTSTRAP_FROM_MAX);
+ options.add(TOPIC_PATTERN);
+ return options;
+ }
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
new file mode 100644
index 00000000000..0085100c870
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/**
+ * Option utils for tubeMQ table source and sink.
+ */
+public class TubeMQOptions {
+
+ // --------------------------------------------------------------------------------------------
+ // Option enumerations
+ // --------------------------------------------------------------------------------------------
+
+ public static final String PROPERTIES_PREFIX = "properties.";
+
+ // Start up offset.
+ // Always start from the max consume position.
+ public static final String CONSUMER_FROM_MAX_OFFSET_ALWAYS = "max";
+ // Start from the latest position for the first time. Otherwise start from last consume position.
+ public static final String CONSUMER_FROM_LATEST_OFFSET = "latest";
+ // Start from 0 for the first time. Otherwise start from last consume position.
+ public static final String CONSUMER_FROM_FIRST_OFFSET = "earliest";
+
+ // --------------------------------------------------------------------------------------------
+ // Format options
+ // --------------------------------------------------------------------------------------------
+
+ public static final ConfigOption KEY_FORMAT = ConfigOptions
+ .key("key." + FORMAT.key())
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Defines the format identifier for encoding key data. "
+ + "The identifier is used to discover a suitable format factory.");
+
+ public static final ConfigOption> KEY_FIELDS =
+ ConfigOptions.key("key.fields")
+ .stringType()
+ .asList()
+ .defaultValues()
+ .withDescription(
+ "Defines an explicit list of physical columns from the table schema "
+ + "that configure the data type for the key format. By default, this list is "
+ + "empty and thus a key is undefined.");
+
+ // --------------------------------------------------------------------------------------------
+ // TubeMQ specific options
+ // --------------------------------------------------------------------------------------------
+
+ public static final ConfigOption TOPIC =
+ ConfigOptions.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Topic names from which the table is read. Either 'topic' "
+ + "or 'topic-pattern' must be set for source.");
+
+ public static final ConfigOption TOPIC_PATTERN =
+ ConfigOptions.key("topic-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional topic pattern from which the table is read for source."
+ + " Either 'topic' or 'topic-pattern' must be set.");
+
+ public static final ConfigOption MASTER_RPC =
+ ConfigOptions.key("master.rpc")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Required TubeMQ master connection string");
+
+ public static final ConfigOption GROUP_NAME =
+ ConfigOptions.key("group.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Required consumer group in TubeMQ consumer");
+
+ public static final ConfigOption TUBE_MESSAGE_NOT_FOUND_WAIT_PERIOD =
+ ConfigOptions.key("tubemq.message.not.found.wait.period")
+ .stringType()
+ .defaultValue("350ms")
+ .withDescription("The time of waiting period if "
+ + "tubeMQ broker return message not found.");
+
+ public static final ConfigOption TUBE_SUBSCRIBE_RETRY_TIMEOUT =
+ ConfigOptions.key("tubemq.subscribe.retry.timeout")
+ .longType()
+ .defaultValue(300000L)
+ .withDescription("The time of subscribing tubeMQ timeout, in millisecond");
+
+ public static final ConfigOption SOURCE_EVENT_QUEUE_CAPACITY =
+ ConfigOptions.key("source.event.queue.capacity")
+ .intType()
+ .defaultValue(1024);
+
+ public static final ConfigOption SESSION_KEY =
+ ConfigOptions.key("session.key")
+ .stringType()
+ .defaultValue("default_session_key")
+ .withDescription("The session key for this consumer group at startup.");
+
+ public static final ConfigOption> STREAMID =
+ ConfigOptions.key("topic.streamId")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("The streamId owned this topic.");
+
+ public static final ConfigOption MAX_RETRIES =
+ ConfigOptions.key("max.retries")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The maximum number of retries when an "
+ + "exception is caught.");
+
+ public static final ConfigOption BOOTSTRAP_FROM_MAX =
+ ConfigOptions.key("bootstrap.from.max")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("True if consuming from the most recent "
+ + "position when the tubemq source starts.. It only takes "
+ + "effect when the tubemq source does not recover from "
+ + "checkpoints.");
+
+ public static final ConfigOption SOURCE_MAX_IDLE_TIME =
+ ConfigOptions.key("source.task.max.idle.time")
+ .stringType()
+ .defaultValue("5min")
+ .withDescription("The max time of the source marked as temporarily idle.");
+
+ public static final ConfigOption MESSAGE_NOT_FOUND_WAIT_PERIOD =
+ ConfigOptions.key("message.not.found.wait.period")
+ .stringType()
+ .defaultValue("500ms")
+ .withDescription("The time of waiting period if tubemq broker return message not found.");
+
+ public static final ConfigOption VALUE_FIELDS_INCLUDE =
+ ConfigOptions.key("value.fields-include")
+ .enumType(ValueFieldsStrategy.class)
+ .defaultValue(ValueFieldsStrategy.ALL)
+ .withDescription(
+ String.format(
+ "Defines a strategy how to deal with key columns in the data type "
+ + "of the value format. By default, '%s' physical columns "
+ + "of the table schema will be included in the value "
+ + "format which means that the key columns "
+ + "appear in the data type for both the key and value format.",
+ ValueFieldsStrategy.ALL));
+
+ public static final ConfigOption KEY_FIELDS_PREFIX =
+ ConfigOptions.key("key.fields-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Defines a custom prefix for all fields of the key format to avoid "
+ + "name clashes with fields of the value format. "
+ + "By default, the prefix is empty.")
+ .linebreak()
+ .text(
+ String.format(
+ "If a custom prefix is defined, both the table schema "
+ + "and '%s' will work with prefixed names.",
+ KEY_FIELDS.key()))
+ .linebreak()
+ .text(
+ "When constructing the data type of the key format, "
+ + "the prefix will be removed and the "
+ + "non-prefixed names will be used within the key format.")
+ .linebreak()
+ .text(
+ String.format(
+ "Please note that this option requires that '%s' must be '%s'.",
+ VALUE_FIELDS_INCLUDE.key(),
+ ValueFieldsStrategy.EXCEPT_KEY))
+ .build());
+
+ // --------------------------------------------------------------------------------------------
+ // Validation
+ // --------------------------------------------------------------------------------------------
+ private static final Set CONSUMER_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+ CONSUMER_FROM_MAX_OFFSET_ALWAYS,
+ CONSUMER_FROM_LATEST_OFFSET,
+ CONSUMER_FROM_FIRST_OFFSET));
+
+ public static Configuration getTubeMQProperties(Map tableOptions) {
+ final Configuration tubeMQProperties = new Configuration();
+
+ if (hasTubeMQClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((PROPERTIES_PREFIX).length());
+ tubeMQProperties.toMap().put(subKey, value);
+ });
+ }
+ return tubeMQProperties;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Scan specific options
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Decides if the table options contains TubeMQ client properties that start with prefix
+ * 'properties'.
+ */
+ private static boolean hasTubeMQClientProperties(Map tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+
+ public static String getSourceTopics(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(TOPIC).orElse(null);
+ }
+
+ public static String getSinkTopics(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(TOPIC).orElse(null);
+ }
+
+ public static String getMasterRpcAddress(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(MASTER_RPC).orElse(null);
+ }
+
+ public static TreeSet getTiSet(ReadableConfig tableOptions) {
+ TreeSet set = new TreeSet<>();
+ tableOptions.getOptional(STREAMID).ifPresent(new Consumer>() {
+
+ @Override
+ public void accept(List strings) {
+ set.addAll(strings);
+ }
+ });
+ return set;
+ }
+
+ public static String getConsumerGroup(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(GROUP_NAME).orElse(null);
+ }
+
+ public static String getSessionKey(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(SESSION_KEY).orElse(SESSION_KEY.defaultValue());
+ }
+
+ /**
+ * Strategies to derive the data type of a value format by considering a key format.
+ */
+ public enum ValueFieldsStrategy {
+
+ ALL,
+
+ EXCEPT_KEY
+
+ }
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
new file mode 100644
index 00000000000..5d2f8c2a4d2
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQProducer;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.TreeSet;
+
+public class TubeMQTableSink implements DynamicTableSink {
+
+ /**
+ * Format for encoding values from TubeMQ.
+ */
+ private final EncodingFormat> valueEncodingFormat;
+ /**
+ * Data type to configure the formats.
+ */
+ private final DataType physicalDataType;
+ /**
+ * The TubeMQ topic name.
+ */
+ private final String topic;
+ /**
+ * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+ */
+ private final String masterAddress;
+ /**
+ * The TubeMQ tid filter collection.
+ */
+ private final TreeSet tidSet;
+ /**
+ * The parameters collection for tubemq producer.
+ */
+ private final Configuration configuration;
+
+ public TubeMQTableSink(
+ DataType physicalDataType,
+ EncodingFormat> valueEncodingFormat,
+ String topic,
+ String masterAddress,
+ TreeSet tidSet,
+ Configuration configuration) {
+ Preconditions.checkNotNull(valueEncodingFormat, "The serialization schema must not be null.");
+ Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+ Preconditions.checkNotNull(topic, "Topic must not be null.");
+ Preconditions.checkNotNull(masterAddress, "Master address must not be null.");
+ Preconditions.checkNotNull(configuration, "The configuration must not be null.");
+ Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+
+ this.valueEncodingFormat = valueEncodingFormat;
+ this.physicalDataType = physicalDataType;
+ this.topic = topic;
+ this.masterAddress = masterAddress;
+ this.tidSet = tidSet;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return valueEncodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final SerializationSchema serialization = createSerialization(context,
+ valueEncodingFormat, physicalDataType);
+
+ final FlinkTubeMQProducer tubeMQProducer =
+ createTubeMQProducer(topic, masterAddress, serialization, configuration);
+
+ return SinkFunctionProvider.of(tubeMQProducer, 1);
+ }
+
+ private FlinkTubeMQProducer createTubeMQProducer(
+ String topic,
+ String masterAddress,
+ SerializationSchema serializationSchema,
+ Configuration configuration) {
+ final FlinkTubeMQProducer tubeMQProducer =
+ new FlinkTubeMQProducer(topic, masterAddress, serializationSchema, tidSet, configuration);
+ return tubeMQProducer;
+ }
+
+ private SerializationSchema createSerialization(
+ Context context,
+ EncodingFormat> format,
+ DataType physicalDataType) {
+ return format.createRuntimeEncoder(context, physicalDataType);
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new TubeMQTableSink(
+ physicalDataType,
+ valueEncodingFormat,
+ topic,
+ masterAddress,
+ tidSet,
+ configuration);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "TubeMQ table sink";
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
new file mode 100644
index 00000000000..2dc21ca2e87
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
+import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * .
+ */
+public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+ private static final String VALUE_METADATA_PREFIX = "value.";
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+ /**
+ * Data type to configure the formats.
+ */
+ private final DataType physicalDataType;
+ /**
+ * Format for decoding values from TubeMQ.
+ */
+ private final DecodingFormat> valueDecodingFormat;
+
+ // -------------------------------------------------------------------
+ /**
+ * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+ */
+ private final String masterAddress;
+ /**
+ * The TubeMQ topic name.
+ */
+ private final String topic;
+ /**
+ * The TubeMQ tid filter collection.
+ */
+ private final TreeSet tidSet;
+ /**
+ * The TubeMQ consumer group name.
+ */
+ private final String consumerGroup;
+ /**
+ * The parameters collection for TubeMQ consumer.
+ */
+ private final Configuration configuration;
+ /**
+ * The TubeMQ session key.
+ */
+ private final String sessionKey;
+ /**
+ * Field name of the processing time attribute, null if no processing time
+ * field is defined.
+ */
+ private final Optional proctimeAttribute;
+ /**
+ * status of error
+ */
+ private final boolean ignoreErrors;
+ /**
+ * The InLong inner format.
+ */
+ private final boolean innerFormat;
+ /**
+ * Data type that describes the final output of the source.
+ */
+ protected DataType producedDataType;
+ /**
+ * Metadata that is appended at the end of a physical source row.
+ */
+ protected List metadataKeys;
+ /**
+ * Watermark strategy that is used to generate per-partition watermark.
+ */
+ @Nullable
+ private WatermarkStrategy watermarkStrategy;
+
+ public TubeMQTableSource(DataType physicalDataType,
+ DecodingFormat> valueDecodingFormat,
+ String masterAddress, String topic,
+ TreeSet tidSet, String consumerGroup, String sessionKey,
+ Configuration configuration, @Nullable WatermarkStrategy watermarkStrategy,
+ Optional proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
+
+ Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+ Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
+ Preconditions.checkNotNull(masterAddress, "The master address must not be null.");
+ Preconditions.checkNotNull(topic, "The topic must not be null.");
+ Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+ Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null.");
+ Preconditions.checkNotNull(configuration, "The configuration must not be null.");
+
+ this.physicalDataType = physicalDataType;
+ this.producedDataType = physicalDataType;
+ this.metadataKeys = Collections.emptyList();
+ this.valueDecodingFormat = valueDecodingFormat;
+ this.masterAddress = masterAddress;
+ this.topic = topic;
+ this.tidSet = tidSet;
+ this.consumerGroup = consumerGroup;
+ this.sessionKey = sessionKey;
+ this.configuration = configuration;
+ this.watermarkStrategy = watermarkStrategy;
+ this.proctimeAttribute = proctimeAttribute;
+ this.ignoreErrors = ignoreErrors;
+ this.innerFormat = innerFormat;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return valueDecodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+ final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+ final DeserializationSchema deserialization = createDeserialization(context,
+ valueDecodingFormat, physicalFields.toArray(), null);
+
+ final TypeInformation producedTypeInfo = context.createTypeInformation(physicalDataType);
+
+ final FlinkTubeMQConsumer tubeMQConsumer = createTubeMQConsumer(deserialization, producedTypeInfo,
+ ignoreErrors);
+
+ return SourceFunctionProvider.of(tubeMQConsumer, false);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new TubeMQTableSource(
+ physicalDataType, valueDecodingFormat, masterAddress,
+ topic, tidSet, consumerGroup, sessionKey, configuration,
+ watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "TubeMQ table source";
+ }
+
+ @Override
+ public Map listReadableMetadata() {
+ final Map metadataMap = new LinkedHashMap<>();
+ valueDecodingFormat
+ .listReadableMetadata()
+ .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {
+ // separate connector and format metadata
+ final List formatMetadataKeys =
+ metadataKeys.stream()
+ .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+ .collect(Collectors.toList());
+ final List connectorMetadataKeys = new ArrayList<>(metadataKeys);
+ connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+ // push down format metadata
+ final Map formatMetadata = valueDecodingFormat.listReadableMetadata();
+ if (formatMetadata.size() > 0) {
+ final List requestedFormatMetadataKeys =
+ formatMetadataKeys.stream()
+ .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+ .collect(Collectors.toList());
+ valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TubeMQTableSource that = (TubeMQTableSource) o;
+ return Objects.equals(physicalDataType, that.physicalDataType)
+ && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+ && Objects.equals(masterAddress, that.masterAddress)
+ && Objects.equals(topic, that.topic)
+ && Objects.equals(String.valueOf(tidSet), String.valueOf(that.tidSet))
+ && Objects.equals(consumerGroup, that.consumerGroup)
+ && Objects.equals(proctimeAttribute, that.proctimeAttribute)
+ && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalDataType,
+ valueDecodingFormat,
+ masterAddress,
+ topic,
+ tidSet,
+ consumerGroup,
+ configuration,
+ watermarkStrategy,
+ proctimeAttribute);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ @Nullable
+ private DeserializationSchema createDeserialization(
+ Context context,
+ @Nullable DecodingFormat> format,
+ int[] projection,
+ @Nullable String prefix) {
+ if (format == null) {
+ return null;
+ }
+ DataType physicalFormatDataType = DataTypeUtils.projectRow(this.physicalDataType, projection);
+ if (prefix != null) {
+ physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+ }
+ return format.createRuntimeDecoder(context, physicalFormatDataType);
+ }
+
+ protected FlinkTubeMQConsumer createTubeMQConsumer(
+ DeserializationSchema deserialization,
+ TypeInformation producedTypeInfo,
+ boolean ignoreErrors) {
+ final MetadataConverter[] metadataConverters =
+ metadataKeys.stream()
+ .map(k -> Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+ final DeserializationSchema tubeMQDeserializer = new DynamicTubeMQDeserializationSchema(
+ deserialization, metadataConverters, producedTypeInfo, ignoreErrors);
+
+ final FlinkTubeMQConsumer tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+ consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);
+ return tubeMQConsumer;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ enum ReadableMetadata {
+
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(Message msg) {
+ return StringData.fromString(msg.getTopic());
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..ed092ea8e5b
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.tubemq.table.TubeMQDynamicTableFactory