Skip to content

Commit

Permalink
[Flink] Fix source split when context reader missing (#564)
Browse files Browse the repository at this point in the history
* fix source split when context reader missing

Signed-off-by: chenxu <[email protected]>

* fix resource file loading

Signed-off-by: chenxu <[email protected]>

* fix npe

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Nov 23, 2024
1 parent e4b33d6 commit cba53a2
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
Expand Down Expand Up @@ -37,20 +38,27 @@ public synchronized static void tryLoad() {

String finalPath = null;

if (JnrLoader.class.getClassLoader().getResource(libName) != null) {
try {
File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir")));
temp.deleteOnExit();
try (final InputStream is = JnrLoader.class.getClassLoader().getResourceAsStream(libName)) {
try {
URL url = JnrLoader.class.getClassLoader().getResource(libName);
if (url == null) {
throw new FileNotFoundException(libName);
}
URLConnection connection = url.openConnection();
if (connection != null) {
connection.setUseCaches(false);
try (final InputStream is = connection.getInputStream()) {
if (is == null) {
throw new FileNotFoundException(libName);
}
File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir")));
temp.deleteOnExit();
Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
finalPath = temp.getAbsolutePath();
}
} catch (IOException e) {
throw new IllegalStateException("error loading native libraries: " + e);
}
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException("error loading native libraries: " + e);
}

if (finalPath != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ public void start() {
@Override
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}",
System.identityHashCode(this),
subtaskId, Thread.currentThread().getId());
subtaskId, System.identityHashCode(this), Thread.currentThread().getId());
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
int tasksSize = context.registeredReaders().size();
if (tasksSize == 0) {
return;
}
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
Expand Down Expand Up @@ -128,14 +130,21 @@ private synchronized void processDiscoveredSplits(
LOG.error("Failed to enumerate files", error);
return;
}
LOG.info("Process discovered splits {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
int tasksSize = context.registeredReaders().size();
LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits,
tasksSize, System.identityHashCode(this),
Thread.currentThread().getId());
if (tasksSize == 0) {
return;
}
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
if (!context.registeredReaders().containsKey(taskId)) {
iter.remove();
continue;
}
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
Expand Down Expand Up @@ -38,20 +39,27 @@ public synchronized static void tryLoad() {

String finalPath = null;

if (JnrLoader.class.getClassLoader().getResource(libName) != null) {
try {
File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir")));
temp.deleteOnExit();
try (final InputStream is = JnrLoader.class.getClassLoader().getResourceAsStream(libName)) {
try {
URL url = JnrLoader.class.getClassLoader().getResource(libName);
if (url == null) {
throw new FileNotFoundException(libName);
}
URLConnection connection = url.openConnection();
if (connection != null) {
connection.setUseCaches(false);
try (final InputStream is = connection.getInputStream()) {
if (is == null) {
throw new FileNotFoundException(libName);
}
File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir")));
temp.deleteOnExit();
Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
finalPath = temp.getAbsolutePath();
}
} catch (IOException e) {
throw new IllegalStateException("error loading native libraries: " + e);
}
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException("error loading native libraries: " + e);
}

if (finalPath != null) {
Expand Down

0 comments on commit cba53a2

Please sign in to comment.