From 20df501b6515526136e4367c7d585055936eb140 Mon Sep 17 00:00:00 2001 From: chenxu Date: Fri, 22 Nov 2024 19:58:02 +0800 Subject: [PATCH] fix source split when context reader missing Signed-off-by: chenxu --- ...oulAllPartitionDynamicSplitEnumerator.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java index 3bfbd7417..883f58c6a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java @@ -76,13 +76,15 @@ public void start() { @Override public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}", - System.identityHashCode(this), - subtaskId, Thread.currentThread().getId()); + subtaskId, System.identityHashCode(this), Thread.currentThread().getId()); if (!context.registeredReaders().containsKey(subtaskId)) { // reader failed between sending the request and now. skip this request. return; } int tasksSize = context.registeredReaders().size(); + if (tasksSize == 0) { + return; + } Optional nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); if (nextSplit.isPresent()) { context.assignSplit(nextSplit.get(), subtaskId); @@ -128,14 +130,21 @@ private synchronized void processDiscoveredSplits( LOG.error("Failed to enumerate files", error); return; } - LOG.info("Process discovered splits {}, oid {}, tid {}", splits, - System.identityHashCode(this), - Thread.currentThread().getId()); int tasksSize = context.registeredReaders().size(); + LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits, + tasksSize, System.identityHashCode(this), + Thread.currentThread().getId()); + if (tasksSize == 0) { + return; + } this.splitAssigner.addSplits(splits); Iterator iter = taskIdsAwaitingSplit.iterator(); while (iter.hasNext()) { int taskId = iter.next(); + if (!context.registeredReaders().containsKey(taskId)) { + iter.remove(); + continue; + } Optional al = this.splitAssigner.getNext(taskId, tasksSize); if (al.isPresent()) { context.assignSplit(al.get(), taskId);