diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index b7fead209a5..92b53d27349 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -67,6 +67,14 @@
0644
+
+ ../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/target
+ inlong-sort/connectors
+
+ sort-connector-hive-v1.15-${project.version}.jar
+
+ 0644
+
../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/target
inlong-sort/connectors
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
index 0d7b97df0b5..86d55b120d8 100644
--- a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
@@ -64,6 +64,11 @@
flink-shaded-jackson
2.12.4-15.0
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+
org.projectlombok
diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml
index 535473efe4b..e0d8a956777 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -28,14 +28,35 @@
pom
Apache InLong - Manager Plugins
-
- manager-plugins-flink-v1.13
- manager-plugins-flink-v1.15
- base
-
-
${project.parent.parent.basedir}
+
+
+ flink-all-version
+
+ manager-plugins-flink-v1.13
+ manager-plugins-flink-v1.15
+ base
+
+
+
+ v1.13
+
+ manager-plugins-flink-v1.13
+ base
+
+
+
+ v1.15
+
+ true
+
+
+ manager-plugins-flink-v1.15
+ base
+
+
+
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 8e78f08a1c1..af95611679b 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -269,6 +269,12 @@
${project.version}
test
+
+ org.apache.inlong
+ sort-connector-hive-v1.15
+ ${project.version}
+ test
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
index d1e459e088b..80253865b89 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
@@ -52,9 +52,11 @@
2.7.6
7.2.2.jre8
0.12.3
+ 3.1.3
0.9.3
2.7.1
2.3.0
+ 2.12
@@ -143,8 +145,18 @@
mssql-jdbc
${sqlserver.jdbc.version}
+
+ org.apache.hive
+ hive-exec
+ ${hive.version}
+
+
+ org.apache.flink
+ flink-table-api-scala-bridge_${flink.scala.binary.version}
+ ${flink.version}
+
org.apache.flink
flink-table-api-java-bridge
@@ -152,17 +164,17 @@
org.apache.flink
- flink-table-runtime
+ flink-table-api-java
${flink.version}
org.apache.flink
- flink-clients
+ flink-table-api-scala_${flink.scala.binary.version}
${flink.version}
org.apache.flink
- flink-table-planner_${flink.scala.binary.version}
+ flink-table-runtime_${flink.scala.binary.version}
${flink.version}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml
new file mode 100644
index 00000000000..7b6a7323a8d
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml
@@ -0,0 +1,191 @@
+
+
+
+ 4.0.0
+
+ org.apache.inlong
+ sort-connectors-v1.15
+ 1.10.0-SNAPSHOT
+
+
+ sort-connector-hive-v1.15
+ jar
+ Apache InLong - Sort-connector-hive
+
+
+ ${project.parent.parent.parent.parent.parent.basedir}
+ 1.4.3
+ 0.8
+
+
+
+
+ org.apache.inlong
+ sort-connector-base
+ ${project.version}
+
+
+
+ org.apache.inlong
+ audit-sdk
+ ${project.version}
+ compile
+
+
+ org.slf4j
+ slf4j-api
+
+
+ commons-lang
+ commons-lang
+
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-scala-bridge_${flink.scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-api-scala_${flink.scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-runtime
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-scala_${flink.scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-connector-hive_${flink.scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ com.google.inject.extensions
+ guice-servlet
+
+
+ io.netty
+ netty
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ org.apache.hadoop
+ hadoop-annotations
+
+
+ org.slf4j
+ slf4j-api
+
+
+ commons-lang
+ commons-lang
+
+
+
+
+
+
+
+ hive3
+
+ true
+
+
+
+ org.apache.hive
+ hive-exec
+ ${hive3x.version}
+ provided
+
+
+
+
+ hive2
+
+
+ org.apache.hive
+ hive-exec
+ ${hive2x.version}
+
+
+
+
+ org.apache.orc
+ orc-core
+ ${orc.core.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.airlift
+ aircompressor
+ ${aircompressor.version}
+
+
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java
new file mode 100644
index 00000000000..759b6e2a547
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.connectors.hive.HiveTableSink;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HADOOP_CONF_DIR;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+
+public class HiveDynamicTableFactory implements DynamicTableSinkFactory {
+
+ private static final HiveConf hiveConf = new HiveConf();
+
+ @Override
+ public String factoryIdentifier() {
+ return HiveCatalogFactoryOptions.IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ final Set> options = new HashSet<>();
+ options.add(HIVE_VERSION);
+ options.add(DEFAULT_DATABASE);
+ options.add(HIVE_CONF_DIR);
+ options.add(HADOOP_CONF_DIR);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
+ Map options = context.getCatalogTable().getOptions();
+ if (isHiveTable) {
+ updateHiveConf(options);
+ Integer configuredParallelism = helper.getOptions().get(SINK_PARALLELISM);
+
+ return new HiveTableSink(
+ context.getConfiguration(),
+ new JobConf(hiveConf),
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ configuredParallelism);
+ } else {
+ return FactoryUtil.createTableSink(
+ null, // we already in the factory of catalog
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+ }
+
+ private void updateHiveConf(Map properties) {
+ for (Map.Entry entry : properties.entrySet()) {
+ hiveConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static HiveConf getHiveConf() {
+ return hiveConf;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..a64e3c49fdc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.hive.HiveDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 74c17313ff0..80ae4f42d03 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -37,6 +37,7 @@
sqlserver-cdc
mysql-cdc
iceberg
+ hive
pulsar
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
index d64406acb12..e52cf4c192e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
@@ -31,49 +31,47 @@
${project.parent.parent.parent.parent.basedir}
+ 3.1.3
org.apache.flink
- flink-table-planner_${flink.scala.binary.version}
-
-
- org.apache.flink
- flink-table-api-java-bridge
+ flink-table-api-scala-bridge_${flink.scala.binary.version}
${flink.version}
org.apache.flink
- flink-table-runtime
+ flink-table-api-java-bridge
${flink.version}
org.apache.flink
- flink-streaming-java
+ flink-table-api-java
${flink.version}
org.apache.flink
- flink-streaming-scala_${flink.scala.binary.version}
+ flink-table-api-scala_${flink.scala.binary.version}
${flink.version}
org.apache.flink
- flink-clients
+ flink-table-runtime
${flink.version}
org.apache.flink
- flink-table-common
+ flink-streaming-java
${flink.version}
org.apache.flink
- flink-connector-base
+ flink-streaming-scala_${flink.scala.binary.version}
${flink.version}
+