Skip to content

Commit

Permalink
fix source split when context reader missing
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <[email protected]>
  • Loading branch information
dmetasoul01 committed Nov 22, 2024
1 parent e4b33d6 commit 20df501
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ public void start() {
@Override
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}",
System.identityHashCode(this),
subtaskId, Thread.currentThread().getId());
subtaskId, System.identityHashCode(this), Thread.currentThread().getId());
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
int tasksSize = context.registeredReaders().size();
if (tasksSize == 0) {
return;
}
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
Expand Down Expand Up @@ -128,14 +130,21 @@ private synchronized void processDiscoveredSplits(
LOG.error("Failed to enumerate files", error);
return;
}
LOG.info("Process discovered splits {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
int tasksSize = context.registeredReaders().size();
LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits,
tasksSize, System.identityHashCode(this),
Thread.currentThread().getId());
if (tasksSize == 0) {
return;
}
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
if (!context.registeredReaders().containsKey(taskId)) {
iter.remove();
continue;
}
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
Expand Down

0 comments on commit 20df501

Please sign in to comment.