From 5cd6a18e5d6bf24c1074f25de1081b03ab7915eb Mon Sep 17 00:00:00 2001 From: huasiy Date: Sun, 8 Oct 2023 17:25:37 +0800 Subject: [PATCH] add hive connector for flink1.15 --- .../main/assemblies/sort-connectors-v1.15.xml | 8 + .../manager-plugins-flink-v1.15/pom.xml | 5 + inlong-manager/manager-plugins/pom.xml | 33 ++- inlong-sort/sort-core/pom.xml | 6 + .../sort-flink/sort-flink-v1.15/pom.xml | 18 +- .../sort-connectors/hive/pom.xml | 191 ++++++++++++++++++ .../sort/hive/HiveDynamicTableFactory.java | 100 +++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../sort-flink-v1.15/sort-connectors/pom.xml | 1 + .../sort-flink-dependencies/pom.xml | 20 +- 10 files changed, 378 insertions(+), 20 deletions(-) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml index a91ea91e24d..e3b9f956927 100644 --- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml +++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml @@ -67,5 +67,13 @@ 0644 + + ../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/target + inlong-sort/connectors + + sort-connector-hive-v1.15-${project.version}.jar + + 0644 + diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml index 0d7b97df0b5..86d55b120d8 100644 --- a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml @@ -64,6 +64,11 @@ flink-shaded-jackson 2.12.4-15.0 + + org.apache.flink + flink-clients + ${flink.version} + org.projectlombok diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml index 535473efe4b..e0d8a956777 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/pom.xml @@ -28,14 +28,35 @@ pom Apache InLong - Manager Plugins - - manager-plugins-flink-v1.13 - manager-plugins-flink-v1.15 - base - - ${project.parent.parent.basedir} + + + flink-all-version + + manager-plugins-flink-v1.13 + manager-plugins-flink-v1.15 + base + + + + v1.13 + + manager-plugins-flink-v1.13 + base + + + + v1.15 + + true + + + manager-plugins-flink-v1.15 + base + + + diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 8e78f08a1c1..af95611679b 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -269,6 +269,12 @@ ${project.version} test + + org.apache.inlong + sort-connector-hive-v1.15 + ${project.version} + test + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml index d1e459e088b..80253865b89 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml @@ -52,9 +52,11 @@ 2.7.6 7.2.2.jre8 0.12.3 + 3.1.3 0.9.3 2.7.1 2.3.0 + 2.12 @@ -143,8 +145,18 @@ mssql-jdbc ${sqlserver.jdbc.version} + + org.apache.hive + hive-exec + ${hive.version} + + + org.apache.flink + flink-table-api-scala-bridge_${flink.scala.binary.version} + ${flink.version} + org.apache.flink flink-table-api-java-bridge @@ -152,17 +164,17 @@ org.apache.flink - flink-table-runtime + flink-table-api-java ${flink.version} org.apache.flink - flink-clients + flink-table-api-scala_${flink.scala.binary.version} ${flink.version} org.apache.flink - flink-table-planner_${flink.scala.binary.version} + flink-table-runtime_${flink.scala.binary.version} ${flink.version} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml new file mode 100644 index 00000000000..7b6a7323a8d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/pom.xml @@ -0,0 +1,191 @@ + + + + 4.0.0 + + org.apache.inlong + sort-connectors-v1.15 + 1.10.0-SNAPSHOT + + + sort-connector-hive-v1.15 + jar + Apache InLong - Sort-connector-hive + + + ${project.parent.parent.parent.parent.parent.basedir} + 1.4.3 + 0.8 + + + + + org.apache.inlong + sort-connector-base + ${project.version} + + + + org.apache.inlong + audit-sdk + ${project.version} + compile + + + org.slf4j + slf4j-api + + + commons-lang + commons-lang + + + + + + + org.apache.flink + flink-table-api-scala-bridge_${flink.scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-api-java + ${flink.version} + + + org.apache.flink + flink-table-api-scala_${flink.scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-table-runtime + ${flink.version} + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-streaming-scala_${flink.scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-connector-hive_${flink.scala.binary.version} + ${flink.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.hadoop + hadoop-yarn-common + + + com.google.inject.extensions + guice-servlet + + + io.netty + netty + + + com.google.protobuf + protobuf-java + + + org.apache.hadoop + hadoop-annotations + + + org.slf4j + slf4j-api + + + commons-lang + commons-lang + + + + + + + + hive3 + + true + + + + org.apache.hive + hive-exec + ${hive3x.version} + provided + + + + + hive2 + + + org.apache.hive + hive-exec + ${hive2x.version} + + + + + org.apache.orc + orc-core + ${orc.core.version} + + + org.slf4j + slf4j-api + + + + + io.airlift + aircompressor + ${aircompressor.version} + + + + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java new file mode 100644 index 00000000000..759b6e2a547 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveDynamicTableFactory.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hive; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.connectors.hive.HiveTableSink; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HADOOP_CONF_DIR; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; + +public class HiveDynamicTableFactory implements DynamicTableSinkFactory { + + private static final HiveConf hiveConf = new HiveConf(); + + @Override + public String factoryIdentifier() { + return HiveCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(HIVE_VERSION); + options.add(DEFAULT_DATABASE); + options.add(HIVE_CONF_DIR); + options.add(HADOOP_CONF_DIR); + return options; + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); + Map options = context.getCatalogTable().getOptions(); + if (isHiveTable) { + updateHiveConf(options); + Integer configuredParallelism = helper.getOptions().get(SINK_PARALLELISM); + + return new HiveTableSink( + context.getConfiguration(), + new JobConf(hiveConf), + context.getObjectIdentifier(), + context.getCatalogTable(), + configuredParallelism); + } else { + return FactoryUtil.createTableSink( + null, // we already in the factory of catalog + context.getObjectIdentifier(), + context.getCatalogTable(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } + } + + private void updateHiveConf(Map properties) { + for (Map.Entry entry : properties.entrySet()) { + hiveConf.set(entry.getKey(), entry.getValue()); + } + } + + public static HiveConf getHiveConf() { + return hiveConf; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000000..a64e3c49fdc --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.hive.HiveDynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml index bf102894cbd..37842ea51f8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml @@ -37,6 +37,7 @@ sqlserver-cdc mysql-cdc iceberg + hive diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml index d64406acb12..e52cf4c192e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml @@ -31,49 +31,47 @@ ${project.parent.parent.parent.parent.basedir} + 3.1.3 org.apache.flink - flink-table-planner_${flink.scala.binary.version} - - - org.apache.flink - flink-table-api-java-bridge + flink-table-api-scala-bridge_${flink.scala.binary.version} ${flink.version} org.apache.flink - flink-table-runtime + flink-table-api-java-bridge ${flink.version} org.apache.flink - flink-streaming-java + flink-table-api-java ${flink.version} org.apache.flink - flink-streaming-scala_${flink.scala.binary.version} + flink-table-api-scala_${flink.scala.binary.version} ${flink.version} org.apache.flink - flink-clients + flink-table-runtime ${flink.version} org.apache.flink - flink-table-common + flink-streaming-java ${flink.version} org.apache.flink - flink-connector-base + flink-streaming-scala_${flink.scala.binary.version} ${flink.version} +