VALUE_FORMAT =
+ ConfigOptions.key("value" + FORMAT_SUFFIX)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines the format identifier for decoding/encoding value data. "
+ + "The identifier is used to discover a suitable format factory.");
+
+ // --------------------------------------------------------------------------------------------
+ // Pulsar Options
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Exactly same as {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_ADMIN_URL}. Copied here
+ * because it is a required config option and should not be included in the {@link
+ * org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)} method.
+ *
+ * By default all {@link org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
+ * included in the validateExcept() method./p>
+ */
+ public static final ConfigOption ADMIN_URL =
+ ConfigOptions.key("admin-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
+ code("http://my-broker.example.com:8080"),
+ code("https://my-broker.example.com:8443"))
+ .build());
+
+ /**
+ * Exactly same as {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_SERVICE_URL}. Copied
+ * here because it is a required config option and should not be included in the {@link
+ * org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)} method.
+ *
+ * By default all {@link org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
+ * included in the validateExcept() method./p>
+ */
+ public static final ConfigOption SERVICE_URL =
+ ConfigOptions.key("service-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text("Service URL provider for Pulsar service.")
+ .linebreak()
+ .text(
+ "To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.")
+ .linebreak()
+ .text(
+ "You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.")
+ .linebreak()
+ .list(
+ text(
+ "This is an example of %s: %s.",
+ code("localhost"),
+ code("pulsar://localhost:6650")),
+ text(
+ "If you have multiple brokers, the URL is as: %s",
+ code(
+ "pulsar://localhost:6550,localhost:6651,localhost:6652")),
+ text(
+ "A URL for a production Pulsar cluster is as: %s",
+ code(
+ "pulsar://pulsar.us-west.example.com:6650")),
+ text(
+ "If you use TLS authentication, the URL is as %s",
+ code(
+ "pulsar+ssl://pulsar.us-west.example.com:6651")))
+ .build());
+
+ public static final ConfigOption EXPLICIT =
+ ConfigOptions.key("explicit")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Indicate if the table is an explicit Flink table.");
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
new file mode 100644
index 00000000000..62440a462bf
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.RowKind;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
+import static org.apache.pulsar.common.naming.TopicName.isValid;
+
+/** Util class for source and sink validation rules.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils}.
+*/
+public class PulsarTableValidationUtils {
+
+ private PulsarTableValidationUtils() {
+ }
+
+ public static void validatePrimaryKeyConstraints(
+ ObjectIdentifier tableName,
+ int[] primaryKeyIndexes,
+ FactoryUtil.TableFactoryHelper helper) {
+ final DecodingFormat> format =
+ getValueDecodingFormat(helper);
+ if (primaryKeyIndexes.length > 0
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "The Pulsar table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ + " on the table, because it can't guarantee the semantic of primary key.",
+ tableName.asSummaryString(), format));
+ }
+ }
+
+ public static void validateTableSourceOptions(ReadableConfig tableOptions) {
+ validateTopicsConfigs(tableOptions);
+ validateStartCursorConfigs(tableOptions);
+ validateStopCursorConfigs(tableOptions);
+ validateSubscriptionTypeConfigs(tableOptions);
+ validateKeyFormatConfigs(tableOptions);
+ }
+
+ protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
+ if (tableOptions.get(TOPIC).isEmpty()) {
+ throw new ValidationException("The topics list should not be empty.");
+ }
+
+ for (String topic : tableOptions.get(TOPIC)) {
+ if (!isValid(topic)) {
+ throw new ValidationException(
+ String.format("The topics name %s is not a valid topic name.", topic));
+ }
+ }
+ }
+
+ protected static void validateStartCursorConfigs(ReadableConfig tableOptions) {
+ if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+ && tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Only one of %s and %s can be specified. Detected both of them",
+ SOURCE_START_FROM_MESSAGE_ID, SOURCE_START_FROM_PUBLISH_TIME));
+ }
+ }
+
+ protected static void validateStopCursorConfigs(ReadableConfig tableOptions) {
+ Set> conflictConfigOptions =
+ Sets.newHashSet(
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME);
+
+ long configsNums =
+ conflictConfigOptions.stream()
+ .map(tableOptions::getOptional)
+ .filter(Optional::isPresent)
+ .count();
+
+ if (configsNums > 1) {
+ throw new ValidationException(
+ String.format(
+ "Only one of %s, %s and %s can be specified. Detected more than 1 of them",
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME));
+ }
+ }
+
+ protected static void validateSubscriptionTypeConfigs(ReadableConfig tableOptions) {
+ SubscriptionType subscriptionType = tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+
+ if (subscriptionType == SubscriptionType.Failover) {
+ throw new ValidationException(
+ String.format(
+ "%s SubscriptionType is not supported. ", SubscriptionType.Failover));
+ }
+ }
+
+ protected static void validateKeyFormatConfigs(ReadableConfig tableOptions) {
+ final Optional optionalKeyFormat = tableOptions.getOptional(KEY_FORMAT);
+ final Optional> optionalKeyFields = tableOptions.getOptional(KEY_FIELDS);
+ if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "The option '%s' can only be declared if a key format is defined using '%s'.",
+ KEY_FIELDS.key(), KEY_FORMAT.key()));
+ } else if (optionalKeyFormat.isPresent()
+ && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+ throw new ValidationException(
+ String.format(
+ "A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+ KEY_FORMAT.key(), KEY_FIELDS.key()));
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
new file mode 100644
index 00000000000..2e0c1ab1dd2
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.Message;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Class for reading metadata fields from a Pulsar message and put in corresponding Flink row
+ * fields.
+ *
+ * Contains list of readable metadata and provide util methods for metadata manipulation.
+ * modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata}
+ */
+public class PulsarReadableMetadata implements Serializable {
+
+ private static final long serialVersionUID = -4409932324481235973L;
+
+ private final List connectorMetadataKeys;
+
+ private final List metadataConverters;
+
+ public PulsarReadableMetadata(List connectorMetadataKeys) {
+ this.connectorMetadataKeys = connectorMetadataKeys;
+ this.metadataConverters = initializeMetadataConverters();
+ }
+
+ private List initializeMetadataConverters() {
+ return connectorMetadataKeys.stream()
+ .map(
+ k -> Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .collect(Collectors.toList());
+ }
+
+ public void appendProducedRowWithMetadata(
+ GenericRowData producedRowData, int physicalArity, Message> message) {
+ for (int metadataPos = 0; metadataPos < metadataConverters.size(); metadataPos++) {
+ producedRowData.setField(
+ physicalArity + metadataPos, metadataConverters.get(metadataPos).read(message));
+ }
+ }
+
+ public int getConnectorMetadataArity() {
+ return metadataConverters.size();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+ interface MetadataConverter extends Serializable {
+
+ Object read(Message> message);
+ }
+
+ /** Lists the metadata that is readable from a Pulsar message. Used in SQL source connector. */
+ public enum ReadableMetadata {
+
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ message -> StringData.fromString(message.getTopicName())),
+
+ MESSAGE_SIZE("message_size", DataTypes.INT().notNull(), Message::size),
+
+ PRODUCER_NAME(
+ "producer_name",
+ DataTypes.STRING().notNull(),
+ message -> StringData.fromString(message.getProducerName())),
+
+ MESSAGE_ID(
+ "message_id",
+ DataTypes.BYTES().notNull(),
+ message -> message.getMessageId().toByteArray()),
+
+ SEQUENCE_ID("sequenceId", DataTypes.BIGINT().notNull(), Message::getSequenceId),
+
+ PUBLISH_TIME(
+ "publish_time",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ message -> TimestampData.fromEpochMillis(message.getPublishTime())),
+
+ EVENT_TIME(
+ "event_time",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ message -> TimestampData.fromEpochMillis(message.getEventTime())),
+
+ PROPERTIES(
+ "properties",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+ .notNull(),
+ message -> {
+ final Map map = new HashMap<>();
+ for (Map.Entry e : message.getProperties().entrySet()) {
+ map.put(
+ StringData.fromString(e.getKey()),
+ StringData.fromString(e.getValue()));
+ }
+ return new GenericMapData(map);
+ });
+
+ public final String key;
+
+ public final DataType dataType;
+
+ public final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
new file mode 100644
index 00000000000..2f87ca15643
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Contains the projection information needed to map a Pulsar message to proper key fields, value
+ * fields and metadata fields.
+ * modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter}
+ */
+public class PulsarRowDataConverter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int physicalArity;
+
+ private final int[] keyProjection;
+
+ private final int[] valueProjection;
+
+ private final PulsarReadableMetadata readableMetadata;
+
+ private final boolean upsertMode;
+
+ public PulsarRowDataConverter(
+ int physicalArity,
+ int[] keyProjection,
+ int[] valueProjection,
+ PulsarReadableMetadata readableMetadata,
+ boolean upsertMode) {
+ this.physicalArity = physicalArity;
+ this.keyProjection = keyProjection;
+ this.valueProjection = valueProjection;
+ this.readableMetadata = readableMetadata;
+ this.upsertMode = upsertMode;
+ }
+
+ public void projectToProducedRowAndCollect(
+ Message> message,
+ List keyRowDataList,
+ List valueRowDataList,
+ Collector collector) {
+ // no key defined
+ if (hasNoKeyProjection()) {
+ valueRowDataList.forEach(
+ valueRow -> emitRow(null, (GenericRowData) valueRow, collector, message));
+ } else {
+ // otherwise emit a value for each key
+ valueRowDataList.forEach(
+ valueRow -> keyRowDataList.forEach(
+ keyRow -> emitRow(
+ (GenericRowData) keyRow,
+ (GenericRowData) valueRow,
+ collector,
+ message)));
+ }
+ }
+
+ public void projectToRowWithNullValueRow(
+ Message> message, List keyRowDataList, Collector collector) {
+ for (RowData keyRow : keyRowDataList) {
+ emitRow((GenericRowData) keyRow, null, collector, message);
+ }
+ }
+
+ private void emitRow(
+ @Nullable GenericRowData physicalKeyRow,
+ @Nullable GenericRowData physicalValueRow,
+ Collector collector,
+ Message> message) {
+
+ final RowKind rowKind;
+ if (physicalValueRow == null) {
+ if (upsertMode) {
+ rowKind = RowKind.DELETE;
+ } else {
+ throw new DeserializationException(
+ "Invalid null value received in non-upsert mode. Could not to set row kind for output record."
+ + "upsert mode is not supported yet.");
+ }
+
+ } else {
+ rowKind = physicalValueRow.getRowKind();
+ }
+
+ final GenericRowData producedRow =
+ new GenericRowData(
+ rowKind, physicalArity + readableMetadata.getConnectorMetadataArity());
+
+ for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+ producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos));
+ }
+
+ for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+ assert physicalKeyRow != null;
+ producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+ }
+
+ readableMetadata.appendProducedRowWithMetadata(producedRow, physicalArity, message);
+ collector.collect(producedRow);
+ }
+
+ private boolean hasNoKeyProjection() {
+ return keyProjection.length == 0;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
new file mode 100644
index 00000000000..8377fce389a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link PulsarTableSource}.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema}
+ */
+public class PulsarTableDeserializationSchema implements PulsarDeserializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeInformation producedTypeInfo;
+
+ @Nullable
+ private final DeserializationSchema keyDeserialization;
+
+ private final DeserializationSchema valueDeserialization;
+
+ private final PulsarRowDataConverter rowDataConverter;
+
+ private final boolean upsertMode;
+
+ public PulsarTableDeserializationSchema(
+ @Nullable DeserializationSchema keyDeserialization,
+ DeserializationSchema valueDeserialization,
+ TypeInformation producedTypeInfo,
+ PulsarRowDataConverter rowDataConverter,
+ boolean upsertMode) {
+ if (upsertMode) {
+ checkNotNull(keyDeserialization, "upsert mode must specify a key format");
+ }
+ this.keyDeserialization = keyDeserialization;
+ this.valueDeserialization = checkNotNull(valueDeserialization);
+ this.rowDataConverter = checkNotNull(rowDataConverter);
+ this.producedTypeInfo = checkNotNull(producedTypeInfo);
+ this.upsertMode = upsertMode;
+ }
+
+ @Override
+ public void open(DeserializationSchema.InitializationContext context, SourceConfiguration configuration)
+ throws Exception {
+ if (keyDeserialization != null) {
+ keyDeserialization.open(context);
+ }
+ valueDeserialization.open(context);
+ }
+
+ @Override
+ public void deserialize(Message message, Collector collector)
+ throws IOException {
+
+ // Get the key row data
+ List keyRowData = new ArrayList<>();
+ if (keyDeserialization != null) {
+ keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
+ }
+
+ // Get the value row data
+ List valueRowData = new ArrayList<>();
+
+ if (upsertMode && message.getData().length == 0) {
+ rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
+ return;
+ }
+
+ valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));
+
+ rowDataConverter.projectToProducedRowAndCollect(
+ message, keyRowData, valueRowData, collector);
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return producedTypeInfo;
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
new file mode 100644
index 00000000000..671e82e0d3a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Contains key, value projection and format information, and use such information to create a
+ * {@link PulsarTableDeserializationSchema} instance used by runtime {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSource} instance.
+ *
+ * A Flink row fields has a strict order: Physical Fields (Key + value) + Format Metadata Fields
+ * Connector Metadata Fields. Physical Fields are fields come directly from Pulsar message body;
+ * Format Metadata Fields are from the extra information from the decoding format. Connector
+ * metadata fields are the ones most Pulsar messages have, such as publish time, message size and
+ * producer name.
+ *
+ *
In general, Physical fields + Format Metadata fields are contained in the RowData decoded
+ * using valueDecodingFormat. Only Connector Metadata fields needs to be appended to the decoded
+ * RowData. The tricky part is to put format metadata and connector metadata in the right location.
+ * This requires an explicit adjustment process.
+ *
+ *
For example, suppose Physical Fields (Key + value) + Format Metadata Fields + Connector
+ * Metadata Fields. has arity of 11, key projection is [0, 6], and physical value projection is [1,
+ * 2, 3, 4, 5], Then after the adjustment, key projection should be [0, 6], physical value
+ * projection should be [1, 2, 3, 4, 5] and format metadata projection should be [7], connector
+ * metadata projection should be [8, 9, 10].
+ * Modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory}
+ */
+public class PulsarTableDeserializationSchemaFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DataType physicalDataType;
+
+ @Nullable
+ private final DecodingFormat> keyDecodingFormat;
+
+ private final int[] keyProjection;
+
+ private final DecodingFormat> valueDecodingFormat;
+
+ private final int[] valueProjection;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes. Will be updated after the applyReadableMetadata()
+ // --------------------------------------------------------------------------------------------
+ private DataType producedDataType;
+
+ private List connectorMetadataKeys;
+
+ private final boolean upsertMode;
+
+ public PulsarTableDeserializationSchemaFactory(
+ DataType physicalDataType,
+ @Nullable DecodingFormat> keyDecodingFormat,
+ int[] keyProjection,
+ DecodingFormat> valueDecodingFormat,
+ int[] valueProjection,
+ boolean upsertMode) {
+ this.physicalDataType =
+ checkNotNull(physicalDataType, "field physicalDataType must not be null.");
+ this.keyDecodingFormat = keyDecodingFormat;
+ this.keyProjection = checkNotNull(keyProjection);
+ this.valueDecodingFormat =
+ checkNotNull(valueDecodingFormat, "field valueDecodingFormat must not be null.");
+ this.valueProjection =
+ checkNotNull(valueProjection, "field valueProjection must not be null.");
+
+ this.producedDataType = physicalDataType;
+ this.connectorMetadataKeys = Collections.emptyList();
+ this.upsertMode = upsertMode;
+ }
+
+ private @Nullable DeserializationSchema createDeserialization(
+ DynamicTableSource.Context context,
+ @Nullable DecodingFormat> format,
+ int[] projection,
+ @Nullable String prefix) {
+ if (format == null) {
+ return null;
+ }
+
+ DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
+ if (prefix != null) {
+ physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+ }
+ return format.createRuntimeDecoder(context, physicalFormatDataType);
+ }
+
+ public PulsarDeserializationSchema createPulsarDeserialization(
+ ScanTableSource.ScanContext context) {
+ final DeserializationSchema keyDeserialization =
+ createDeserialization(context, keyDecodingFormat, keyProjection, "");
+ final DeserializationSchema valueDeserialization =
+ createDeserialization(context, valueDecodingFormat, valueProjection, "");
+
+ final TypeInformation producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ final PulsarReadableMetadata readableMetadata =
+ new PulsarReadableMetadata(connectorMetadataKeys);
+
+ // Get Physical Fields (key + value) + Format Metadata arity
+ final int physicalPlusFormatMetadataArity =
+ DataType.getFieldDataTypes(producedDataType).size()
+ - readableMetadata.getConnectorMetadataArity();
+ final int[] physicalValuePlusFormatMetadataProjection =
+ adjustValueProjectionByAppendConnectorMetadata(physicalPlusFormatMetadataArity);
+
+ final PulsarRowDataConverter rowDataConverter =
+ new PulsarRowDataConverter(
+ physicalPlusFormatMetadataArity,
+ keyProjection,
+ physicalValuePlusFormatMetadataProjection,
+ readableMetadata,
+ upsertMode);
+
+ return new PulsarTableDeserializationSchema(
+ keyDeserialization,
+ valueDeserialization,
+ producedTypeInfo,
+ rowDataConverter,
+ upsertMode);
+ }
+
+ public void setProducedDataType(DataType producedDataType) {
+ this.producedDataType = producedDataType;
+ }
+
+ public void setConnectorMetadataKeys(List metadataKeys) {
+ this.connectorMetadataKeys = metadataKeys;
+ }
+
+ private int[] adjustValueProjectionByAppendConnectorMetadata(
+ int physicalValuePlusFormatMetadataArity) {
+ // Concat the Physical Fields (value only) with Format metadata projection.
+ final int[] physicalValuePlusFormatMetadataProjection =
+ IntStream.concat(
+ IntStream.of(valueProjection),
+ IntStream.range(
+ keyProjection.length + valueProjection.length,
+ physicalValuePlusFormatMetadataArity))
+ .toArray();
+ return physicalValuePlusFormatMetadataProjection;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PulsarTableDeserializationSchemaFactory that = (PulsarTableDeserializationSchemaFactory) o;
+ return Objects.equals(physicalDataType, that.physicalDataType)
+ && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+ && Arrays.equals(keyProjection, that.keyProjection)
+ && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+ && Arrays.equals(valueProjection, that.valueProjection)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(connectorMetadataKeys, that.connectorMetadataKeys)
+ && Objects.equals(upsertMode, that.upsertMode);
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ producedDataType,
+ connectorMetadataKeys,
+ upsertMode);
+ result = 31 * result + Arrays.hashCode(keyProjection);
+ result = 31 * result + Arrays.hashCode(valueProjection);
+ return result;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
new file mode 100644
index 00000000000..9b3bf703823
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ScanTableSource} implementation for Pulsar SQL Connector. It uses a {@link
+ * SourceProvider} so it doesn't need to support {@link
+ * org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown} interface.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableSource}
+ * {@link PulsarTableSource}
+ */
+public class PulsarTableSource implements ScanTableSource, SupportsReadingMetadata {
+ // --------------------------------------------------------------------------------------------
+ // Format attributes
+ // --------------------------------------------------------------------------------------------
+
+ private static final String FORMAT_METADATA_PREFIX = "value.";
+
+ private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory;
+
+ /**
+ * Usually it is the same as the valueDecodingFormat, but use a different naming to show that it
+ * is used to list all the format metadata keys.
+ */
+ private final DecodingFormat> decodingFormatForReadingMetadata;
+
+ private final ChangelogMode changelogMode;
+
+ // --------------------------------------------------------------------------------------------
+ // PulsarSource needed attributes
+ // --------------------------------------------------------------------------------------------
+
+ private final List topics;
+
+ private final Properties properties;
+
+ private final StartCursor startCursor;
+
+ private final StopCursor stopCursor;
+
+ private final SubscriptionType subscriptionType;
+
+ public PulsarTableSource(
+ PulsarTableDeserializationSchemaFactory deserializationSchemaFactory,
+ DecodingFormat> decodingFormatForReadingMetadata,
+ ChangelogMode changelogMode,
+ List topics,
+ Properties properties,
+ StartCursor startCursor,
+ StopCursor stopCursor,
+ SubscriptionType subscriptionType) {
+ // Format attributes
+ this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory);
+ this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata);
+ this.changelogMode = changelogMode;
+ // DataStream connector attributes
+ this.topics = topics;
+ this.properties = checkNotNull(properties);
+ this.startCursor = checkNotNull(startCursor);
+ this.stopCursor = checkNotNull(stopCursor);
+ this.subscriptionType = checkNotNull(subscriptionType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return changelogMode;
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+ PulsarDeserializationSchema deserializationSchema =
+ deserializationSchemaFactory.createPulsarDeserialization(context);
+ PulsarSource source =
+ PulsarSource.builder()
+ .setTopics(topics)
+ .setStartCursor(startCursor)
+ .setUnboundedStopCursor(stopCursor)
+ .setDeserializationSchema(deserializationSchema)
+ .setProperties(properties)
+ .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
+ .build();
+ return SourceProvider.of(source);
+ }
+
+ /**
+ * According to convention, the order of the final row must be PHYSICAL + FORMAT METADATA +
+ * CONNECTOR METADATA where the format metadata has the highest precedence.
+ *
+ * @return
+ */
+ @Override
+ public Map listReadableMetadata() {
+ final Map allMetadataMap = new LinkedHashMap<>();
+
+ // add value format metadata with prefix
+ decodingFormatForReadingMetadata
+ .listReadableMetadata()
+ .forEach((key, value) -> allMetadataMap.put(FORMAT_METADATA_PREFIX + key, value));
+ // add connector metadata
+ Stream.of(PulsarReadableMetadata.ReadableMetadata.values())
+ .forEachOrdered(m -> allMetadataMap.putIfAbsent(m.key, m.dataType));
+
+ return allMetadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List allMetadataKeys, DataType producedDataType) {
+ // separate connector and format metadata
+ final List formatMetadataKeys =
+ allMetadataKeys.stream()
+ .filter(k -> k.startsWith(FORMAT_METADATA_PREFIX))
+ .collect(Collectors.toList());
+
+ final List connectorMetadataKeys = new ArrayList<>(allMetadataKeys);
+ connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+ // push down format metadata
+ final Map formatMetadata =
+ decodingFormatForReadingMetadata.listReadableMetadata();
+ if (formatMetadata.size() > 0) {
+ final List requestedFormatMetadataKeys =
+ formatMetadataKeys.stream()
+ .map(k -> k.substring(FORMAT_METADATA_PREFIX.length()))
+ .collect(Collectors.toList());
+ decodingFormatForReadingMetadata.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+
+ // update the factory attributes.
+ deserializationSchemaFactory.setConnectorMetadataKeys(connectorMetadataKeys);
+ deserializationSchemaFactory.setProducedDataType(producedDataType);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Pulsar table source";
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new PulsarTableSource(
+ deserializationSchemaFactory,
+ decodingFormatForReadingMetadata,
+ changelogMode,
+ topics,
+ properties,
+ startCursor,
+ stopCursor,
+ subscriptionType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PulsarTableSource that = (PulsarTableSource) o;
+ return Objects.equals(deserializationSchemaFactory, that.deserializationSchemaFactory)
+ && Objects.equals(
+ decodingFormatForReadingMetadata, that.decodingFormatForReadingMetadata)
+ && Objects.equals(changelogMode, that.changelogMode)
+ && Objects.equals(topics, that.topics)
+ && Objects.equals(properties, that.properties)
+ && Objects.equals(startCursor, that.startCursor)
+ && Objects.equals(stopCursor, that.stopCursor)
+ && subscriptionType == that.subscriptionType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ deserializationSchemaFactory,
+ decodingFormatForReadingMetadata,
+ changelogMode,
+ topics,
+ properties,
+ startCursor,
+ stopCursor,
+ subscriptionType);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..601ec9e923f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.pulsar.PulsarTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index dbc52018558..bc88cd22b7f 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -782,6 +782,19 @@
Source : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the software have been modified.)
License : https://github.com/apache/iceberg/LICENSE
+1.3.18 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
+
+ Source : flink-connector-pulsar 4.0-SNAPSHOT (Please note that the software have been modified.)
+ License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
=======================================================================
Apache InLong Subcomponents: