Skip to content

Commit

Permalink
[Flink] update fury version to 0.4 (#368)
Browse files Browse the repository at this point in the history
* update fury to 0.4.0

Signed-off-by: chenxu <[email protected]>

* fix test cases

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Dec 7, 2023
1 parent 0e8cdea commit 1f72a09
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-cdc-hdfs-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
cp ./lakesoul-spark/target/$SPARK_TEST_JAR_NAME ./script/benchmark/work-dir
- uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.2'
hdfs-version: '3.3.6'
- name: Modify HDFS Host
run: |
sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/core-site.xml
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/maven-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.2'
hdfs-version: '3.3.6'
- name: Modify HDFS User Group Mapping
run: |
sed -i '/^<\/configuration>/i <property><name>hadoop.user.group.static.mapping.overrides</name><value>admin1=domain1;user1=domain1;user2=domain1;admin2=domain2</value></property>' $HADOOP_HOME/etc/hadoop/core-site.xml
Expand Down Expand Up @@ -398,7 +398,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.2'
hdfs-version: '3.3.6'
- name: Modify HDFS User Group Mapping
run: |
sed -i '/^<\/configuration>/i <property><name>hadoop.user.group.static.mapping.overrides</name><value>admin1=domain1;user1=domain1;user2=domain1;admin2=domain2</value></property>' $HADOOP_HOME/etc/hadoop/core-site.xml
Expand Down
2 changes: 1 addition & 1 deletion lakesoul-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ SPDX-License-Identifier: Apache-2.0
<dependency>
<groupId>org.furyio</groupId>
<artifactId>fury-core</artifactId>
<version>0.1.0</version>
<version>0.4.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.fury.Fury;
import io.fury.Language;
import io.fury.ThreadLocalFury;
import io.fury.config.Language;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryFormat;
Expand Down Expand Up @@ -78,10 +78,13 @@ public BinarySourceRecordSerializer() {
}

@Override public void write(Kryo kryo, Output output, BinarySourceRecord object) {
fury.getCurrentFury().serializeJavaObject(output, object);
fury.execute(f -> {
f.serializeJavaObject(output, object);
return 0;
});
}

@Override public BinarySourceRecord read(Kryo kryo, Input input, Class<BinarySourceRecord> type) {
return fury.getCurrentFury().deserializeJavaObject(input, BinarySourceRecord.class);
return fury.execute(f -> f.deserializeJavaObject(input, BinarySourceRecord.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ public void testDeletePkSQL() throws ExecutionException, InterruptedException {
} catch (Throwable e) {
System.out.println("Unsupported DELETE SQL");
}
StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
String testSelect = "select * from user_info";
TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
TableImpl flinkTable = (TableImpl) tEnv.sqlQuery(testSelect);
List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]"});
}
Expand Down

0 comments on commit 1f72a09

Please sign in to comment.