From 366beb85ada4436f88b78694233471d8335ea074 Mon Sep 17 00:00:00 2001
From: Xu Chen
Date: Wed, 20 Nov 2024 18:47:49 +0800
Subject: [PATCH] [Flink] Fix potential dead loop in source enumerator (#560)

* fix potential dead loop in source enumerator

Signed-off-by: chenxu

* fix spark/flink deps

Signed-off-by: chenxu

* add synchronized block

Signed-off-by: chenxu

* fix commons-io conflict

Signed-off-by: chenxu

* fix commons-csv version

Signed-off-by: chenxu

---------

Signed-off-by: chenxu
Co-authored-by: chenxu
---
 lakesoul-common/pom.xml                         |  9 ++++
 lakesoul-flink/pom.xml                          | 12 +++++
 ...oulAllPartitionDynamicSplitEnumerator.java   | 48 +++++++++++--------
 lakesoul-spark/pom.xml                          |  8 ++++
 native-io/lakesoul-io-java/pom.xml              |  8 ++++
 5 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/lakesoul-common/pom.xml b/lakesoul-common/pom.xml
index 93515dc8a..f2980a570 100644
--- a/lakesoul-common/pom.xml
+++ b/lakesoul-common/pom.xml
@@ -309,8 +309,17 @@ SPDX-License-Identifier: Apache-2.0
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.casbin</groupId>
+                    <artifactId>jcasbin</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.casbin</groupId>
+            <artifactId>jcasbin</artifactId>
+            <version>1.73.0</version>
+        </dependency>
         <dependency>
             <groupId>org.aspectj</groupId>
diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml
index bee08c8fe..2a4bcd780 100644
--- a/lakesoul-flink/pom.xml
+++ b/lakesoul-flink/pom.xml
@@ -41,6 +41,18 @@ SPDX-License-Identifier: Apache-2.0
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
index ce8d7ffbd..e21a019c7 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
@@ -90,20 +90,23 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
             return;
         }
         int tasksSize = context.registeredReaders().size();
-        Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
-        if (nextSplit.isPresent()) {
-            context.assignSplit(nextSplit.get(), subtaskId);
-            taskIdsAwaitingSplit.remove(subtaskId);
-        } else {
-            taskIdsAwaitingSplit.add(subtaskId);
+        synchronized (this) {
+            Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
+            if (nextSplit.isPresent()) {
+                context.assignSplit(nextSplit.get(), subtaskId);
+                taskIdsAwaitingSplit.remove(subtaskId);
+            } else {
+                taskIdsAwaitingSplit.add(subtaskId);
+            }
         }
-
     }
 
     @Override
     public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
         LOG.info("Add split back: {}", splits);
-        splitAssigner.addSplits(splits);
+        synchronized (this) {
+            splitAssigner.addSplits(splits);
+        }
     }
 
     @Override
@@ -112,11 +115,13 @@ public void addReader(int subtaskId) {
 
     @Override
     public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
-        LakeSoulPendingSplits pendingSplits =
-                new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "",
-                        this.discoveryInterval, this.hashBucketNum);
-        LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits);
-        return pendingSplits;
+        synchronized (this) {
+            LakeSoulPendingSplits pendingSplits =
+                    new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "",
+                            this.discoveryInterval, this.hashBucketNum);
+            LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits);
+            return pendingSplits;
+        }
     }
 
     @Override
@@ -131,12 +136,16 @@ private void processDiscoveredSplits(Collection<LakeSoulPartitionSplit> splits,
         }
         LOG.info("Process discovered splits {}", splits);
         int tasksSize = context.registeredReaders().size();
-        this.splitAssigner.addSplits(splits);
-        for (Integer item : taskIdsAwaitingSplit) {
-            Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(item, tasksSize);
-            if (al.isPresent()) {
-                context.assignSplit(al.get(), item);
-                taskIdsAwaitingSplit.remove(item);
+        synchronized (this) {
+            this.splitAssigner.addSplits(splits);
+            Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
+            while (iter.hasNext()) {
+                int taskId = iter.next();
+                Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
+                if (al.isPresent()) {
+                    context.assignSplit(al.get(), taskId);
+                    iter.remove();
+                }
             }
         }
     }
@@ -147,7 +156,6 @@ public Collection<LakeSoulPartitionSplit> enumerateSplits() {
         List<PartitionInfo> filteredPartition =
                 SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters);
         LOG.info("filteredPartition={}, filter={}", filteredPartition, partitionFilters);
-
         ArrayList<LakeSoulPartitionSplit> splits = new ArrayList<>(16);
         for (PartitionInfo partitionInfo : filteredPartition) {
             String partitionDesc = partitionInfo.getPartitionDesc();
diff --git a/lakesoul-spark/pom.xml b/lakesoul-spark/pom.xml
index ff20be2c8..241300a67 100644
--- a/lakesoul-spark/pom.xml
+++ b/lakesoul-spark/pom.xml
@@ -63,6 +63,14 @@ SPDX-License-Identifier: Apache-2.0
                 <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
diff --git a/native-io/lakesoul-io-java/pom.xml b/native-io/lakesoul-io-java/pom.xml
index 5ff95d8d6..a5c1376b3 100644
--- a/native-io/lakesoul-io-java/pom.xml
+++ b/native-io/lakesoul-io-java/pom.xml
@@ -144,6 +144,14 @@ SPDX-License-Identifier: Apache-2.0
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
             </exclusions>
             <version>${substrait.version}</version>
             <scope>compile</scope>