Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink] upgrade flink version to 1.20 #567

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 94 additions & 84 deletions lakesoul-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,90 +117,6 @@ SPDX-License-Identifier: Apache-2.0
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
<include>**</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/versions/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>com.lakesoul.shaded.com.zaxxer.hikari</shadedPattern>
</relocation>
<relocation>
<pattern>org.postgresql</pattern>
<shadedPattern>com.lakesoul.shaded.org.postgresql</shadedPattern>
</relocation>
<relocation>
<pattern>com.alibaba.fastjson</pattern>
<shadedPattern>com.lakesoul.shaded.com.alibaba.fastjson</shadedPattern>
</relocation>
<relocation>
<pattern>org.ow2.asm</pattern>
<shadedPattern>com.lakesoul.shaded.org.ow2.asm</shadedPattern>
</relocation>
<relocation>
<pattern>org.objectweb.asm</pattern>
<shadedPattern>com.lakesoul.shaded.org.objectweb.asm</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.gson</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>com.lakesoul.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>com.lakesoul.shaded.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>dev.failsafe</pattern>
<shadedPattern>com.lakesoul.shaded.dev.failsafe</shadedPattern>
</relocation>
<relocation>
<pattern>org.aspectj</pattern>
<shadedPattern>com.lakesoul.shaded.org.aspectj</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>com.lakesoul.shaded.org.checkerframework</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
Expand Down Expand Up @@ -504,5 +420,99 @@ SPDX-License-Identifier: Apache-2.0
</resources>
</build>
</profile>
<profile>
<id>release-shaded</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
<include>**</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/versions/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>com.lakesoul.shaded.com.zaxxer.hikari</shadedPattern>
</relocation>
<relocation>
<pattern>org.postgresql</pattern>
<shadedPattern>com.lakesoul.shaded.org.postgresql</shadedPattern>
</relocation>
<relocation>
<pattern>com.alibaba.fastjson</pattern>
<shadedPattern>com.lakesoul.shaded.com.alibaba.fastjson</shadedPattern>
</relocation>
<relocation>
<pattern>org.ow2.asm</pattern>
<shadedPattern>com.lakesoul.shaded.org.ow2.asm</shadedPattern>
</relocation>
<relocation>
<pattern>org.objectweb.asm</pattern>
<shadedPattern>com.lakesoul.shaded.org.objectweb.asm</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.gson</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>com.lakesoul.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>com.lakesoul.shaded.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>dev.failsafe</pattern>
<shadedPattern>com.lakesoul.shaded.dev.failsafe</shadedPattern>
</relocation>
<relocation>
<pattern>org.aspectj</pattern>
<shadedPattern>com.lakesoul.shaded.org.aspectj</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>com.lakesoul.shaded.org.checkerframework</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
6 changes: 3 additions & 3 deletions lakesoul-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ SPDX-License-Identifier: Apache-2.0
<name>LakeSoul Flink</name>

<artifactId>lakesoul-flink</artifactId>
<version>1.17-${revision}</version>
<version>1.20-${revision}</version>
<properties>
<flink.version>1.17.1</flink.version>
<flink.version>1.20.0</flink.version>
<log4j.version>2.17.2</log4j.version>
<cdc.version>3.0.0</cdc.version>
</properties>
Expand Down Expand Up @@ -92,7 +92,7 @@ SPDX-License-Identifier: Apache-2.0
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>3.4.0-1.20</version>
<scope>${local.scope}</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.flink.connector.file.table.PartitionFetcher;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.lakesoul.types.TableId;
import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;
import org.apache.flink.shaded.guava31.com.google.common.base.Splitter;
import org.apache.flink.table.utils.PartitionPathUtils;

import java.util.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.shaded.guava31.com.google.common.collect.Maps;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.table.runtime.arrow.ArrowUtils;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class ArrowReader {
public ArrowReader(ColumnVector[] columnVectors) {
this.columnVectors = Preconditions.checkNotNull(columnVectors);
this.reuseRow = new ColumnarRowData();
reuseRow.setVectorizedColumnBatch(new VectorizedColumnBatch(columnVectors));
this.reuseRow.setVectorizedColumnBatch(new VectorizedColumnBatch(columnVectors));
}

/** Gets the column vectors. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,11 @@

package org.apache.flink.table.runtime.arrow;

import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageMetadataResult;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.shaded.guava30.com.google.common.collect.LinkedHashMultiset;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
Expand All @@ -61,6 +42,48 @@
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava31.com.google.common.collect.LinkedHashMultiset;

import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageMetadataResult;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -78,9 +101,7 @@

import static org.apache.arrow.vector.types.TimeUnit.*;

/**
* Utilities for Arrow.
*/
/** Utilities for Arrow. */
@Internal
public final class ArrowUtils {

Expand Down Expand Up @@ -184,9 +205,7 @@ public static Field toArrowField(String fieldName, LogicalType logicalType) {
return new Field(fieldName, fieldType, children);
}

/**
* Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}.
*/
/** Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. */
public static ArrowWriter<RowData> createRowDataArrowWriter(
VectorSchemaRoot root, RowType rowType) {
ArrowFieldWriter<RowData>[] fieldWriters =
Expand Down Expand Up @@ -354,9 +373,7 @@ private static ArrowFieldWriter<ArrayData> createArrowFieldWriterForArray(
}
}

/**
* Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}.
*/
/** Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}. */
public static ArrowReader createArrowReader(VectorSchemaRoot root, RowType rowType) {
List<ColumnVector> columnVectors = new ArrayList<>();
List<FieldVector> fieldVectors = root.getFieldVectors();
Expand Down Expand Up @@ -485,9 +502,7 @@ private static byte[] readNextBatch(ReadableByteChannel channel) throws IOExcept
}
}

/**
* Fills a buffer with data read from the channel.
*/
/** Fills a buffer with data read from the channel. */
private static void readFully(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
int expected = dst.remaining();
while (dst.hasRemaining()) {
Expand All @@ -498,9 +513,7 @@ private static void readFully(ReadableByteChannel channel, ByteBuffer dst) throw
}
}

/**
* Convert Flink table to Pandas DataFrame.
*/
/** Convert Flink table to Pandas DataFrame. */
public static CustomIterator<byte[]> collectAsPandasDataFrame(
Table table, int maxArrowBatchSize) throws Exception {
checkArrowUsable();
Expand Down
Loading
Loading