diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java index 0e824dfd6..ee6905928 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java @@ -8,8 +8,9 @@ import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.HashMap; @@ -37,20 +38,27 @@ public synchronized static void tryLoad() { String finalPath = null; - if (JnrLoader.class.getClassLoader().getResource(libName) != null) { - try { - File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir"))); - temp.deleteOnExit(); - try (final InputStream is = JnrLoader.class.getClassLoader().getResourceAsStream(libName)) { + try { + URL url = JnrLoader.class.getClassLoader().getResource(libName); + if (url == null) { + throw new FileNotFoundException(libName); + } + URLConnection connection = url.openConnection(); + if (connection != null) { + connection.setUseCaches(false); + try (final InputStream is = connection.getInputStream()) { if (is == null) { throw new FileNotFoundException(libName); } + File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir"))); + temp.deleteOnExit(); Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); finalPath = temp.getAbsolutePath(); } - } catch (IOException e) { - throw new IllegalStateException("error loading native libraries: " + e); } + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException("error loading native libraries: " + e); } if (finalPath != null) { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java index 3bfbd7417..883f58c6a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java @@ -76,13 +76,15 @@ public void start() { @Override public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}", - System.identityHashCode(this), - subtaskId, Thread.currentThread().getId()); + subtaskId, System.identityHashCode(this), Thread.currentThread().getId()); if (!context.registeredReaders().containsKey(subtaskId)) { // reader failed between sending the request and now. skip this request. return; } int tasksSize = context.registeredReaders().size(); + if (tasksSize == 0) { + return; + } Optional nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); if (nextSplit.isPresent()) { context.assignSplit(nextSplit.get(), subtaskId); @@ -128,14 +130,21 @@ private synchronized void processDiscoveredSplits( LOG.error("Failed to enumerate files", error); return; } - LOG.info("Process discovered splits {}, oid {}, tid {}", splits, - System.identityHashCode(this), - Thread.currentThread().getId()); int tasksSize = context.registeredReaders().size(); + LOG.info("Process discovered splits {}, taskSize {}, oid {}, tid {}", splits, + tasksSize, System.identityHashCode(this), + Thread.currentThread().getId()); + if (tasksSize == 0) { + return; + } this.splitAssigner.addSplits(splits); Iterator iter = taskIdsAwaitingSplit.iterator(); while (iter.hasNext()) { int taskId = iter.next(); + if (!context.registeredReaders().containsKey(taskId)) { + iter.remove(); + continue; + } Optional al = this.splitAssigner.getNext(taskId, tasksSize); if (al.isPresent()) { context.assignSplit(al.get(), taskId); diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/JnrLoader.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/JnrLoader.java index b0ceec099..c8032f63a 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/JnrLoader.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/JnrLoader.java @@ -9,8 +9,9 @@ import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.HashMap; @@ -38,20 +39,27 @@ public synchronized static void tryLoad() { String finalPath = null; - if (JnrLoader.class.getClassLoader().getResource(libName) != null) { - try { - File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir"))); - temp.deleteOnExit(); - try (final InputStream is = JnrLoader.class.getClassLoader().getResourceAsStream(libName)) { + try { + URL url = JnrLoader.class.getClassLoader().getResource(libName); + if (url == null) { + throw new FileNotFoundException(libName); + } + URLConnection connection = url.openConnection(); + if (connection != null) { + connection.setUseCaches(false); + try (final InputStream is = connection.getInputStream()) { if (is == null) { throw new FileNotFoundException(libName); } + File temp = File.createTempFile(libName + "_", ".tmp", new File(System.getProperty("java.io.tmpdir"))); + temp.deleteOnExit(); Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); finalPath = temp.getAbsolutePath(); } - } catch (IOException e) { - throw new IllegalStateException("error loading native libraries: " + e); } + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException("error loading native libraries: " + e); } if (finalPath != null) {