diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
index e21a019c7..af95a24d2 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java
@@ -7,7 +7,6 @@
 import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
 import com.dmetasoul.lakesoul.meta.DataFileInfo;
 import com.dmetasoul.lakesoul.meta.DataOperation;
-import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
 import com.dmetasoul.lakesoul.meta.MetaVersion;
 import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
@@ -16,7 +15,6 @@
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.tool.FlinkUtil;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
@@ -43,22 +41,13 @@ public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerat
     private final Plan partitionFilters;
     private final List<String> partitionColumns;
     private final TableInfo tableInfo;
+    protected Schema partitionArrowSchema;
     String tableId;
     private long startTime;
     private long nextStartTime;
     private int hashBucketNum = -1;
-    protected Schema partitionArrowSchema;
-
-    public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context,
-                                                      LakeSoulDynSplitAssigner splitAssigner,
-                                                      RowType rowType,
-                                                      long discoveryInterval,
-                                                      long startTime,
-                                                      String tableId,
-                                                      String hashBucketNum,
-                                                      List<String> partitionColumns,
-                                                      Plan partitionFilters) {
+    public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List<String> partitionColumns, Plan partitionFilters) {
         this.context = context;
         this.splitAssigner = splitAssigner;
         this.discoveryInterval = discoveryInterval;
@@ -79,34 +68,35 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext
-            Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
-            if (nextSplit.isPresent()) {
-                context.assignSplit(nextSplit.get(), subtaskId);
-                taskIdsAwaitingSplit.remove(subtaskId);
-            } else {
-                taskIdsAwaitingSplit.add(subtaskId);
-            }
+        Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
+        if (nextSplit.isPresent()) {
+            context.assignSplit(nextSplit.get(), subtaskId);
+            taskIdsAwaitingSplit.remove(subtaskId);
+        } else {
+            taskIdsAwaitingSplit.add(subtaskId);
         }
     }
 
     @Override
-    public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
-        LOG.info("Add split back: {}", splits);
-        synchronized (this) {
-            splitAssigner.addSplits(splits);
-        }
+    public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
+        LOG.info("Add split back: {} for subTaskId {}, oid {}, tid {}",
+                splits, subtaskId,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
+        splitAssigner.addSplits(splits);
     }
 
     @Override
@@ -114,14 +104,15 @@ public void addReader(int subtaskId) {
     }
 
     @Override
-    public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
-        synchronized (this) {
-            LakeSoulPendingSplits pendingSplits =
-                    new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "",
-                            this.discoveryInterval, this.hashBucketNum);
-            LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits);
-            return pendingSplits;
-        }
+    public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
+        LakeSoulPendingSplits pendingSplits = new LakeSoulPendingSplits(
+                splitAssigner.remainingSplits(), this.nextStartTime, this.tableId,
+                "", this.discoveryInterval, this.hashBucketNum);
+        LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState chkId {}, splits {}, oid {}, tid {}",
+                checkpointId, pendingSplits,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
+        return pendingSplits;
     }
 
     @Override
@@ -129,31 +120,39 @@ public void close() throws IOException {
     }
 
-    private void processDiscoveredSplits(Collection<LakeSoulPartitionSplit> splits, Throwable error) {
+    private synchronized void processDiscoveredSplits(
+            Collection<LakeSoulPartitionSplit> splits, Throwable error) {
         if (error != null) {
             LOG.error("Failed to enumerate files", error);
             return;
         }
-        LOG.info("Process discovered splits {}", splits);
+        LOG.info("Process discovered splits {}, oid {}, tid {}", splits,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
         int tasksSize = context.registeredReaders().size();
-        synchronized (this) {
-            this.splitAssigner.addSplits(splits);
-            Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
-            while (iter.hasNext()) {
-                int taskId = iter.next();
-                Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
-                if (al.isPresent()) {
-                    context.assignSplit(al.get(), taskId);
-                    iter.remove();
-                }
+        this.splitAssigner.addSplits(splits);
+        Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
+        while (iter.hasNext()) {
+            int taskId = iter.next();
+            Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
+            if (al.isPresent()) {
+                context.assignSplit(al.get(), taskId);
+                iter.remove();
             }
         }
+        LOG.info("Process discovered splits done {}, oid {}, tid {}", splits,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
     }
 
-    public Collection<LakeSoulPartitionSplit> enumerateSplits() {
+    public synchronized Collection<LakeSoulPartitionSplit> enumerateSplits() {
+        LOG.info("enumerateSplits begin, oid {}, tid {}",
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
         List<PartitionInfo> allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId);
         LOG.info("allPartitionInfo={}", allPartitionInfo);
-        List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters);
+        List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(
+                allPartitionInfo, partitionArrowSchema, partitionFilters);
         LOG.info("filteredPartition={}, filter={}", filteredPartition, partitionFilters);
         ArrayList<LakeSoulPartitionSplit> splits = new ArrayList<>(16);
@@ -166,24 +165,28 @@ public Collection<LakeSoulPartitionSplit> enumerateSplits() {
             if (partitionLatestTimestamp.containsKey(partitionDesc)) {
                 Long lastTimestamp = partitionLatestTimestamp.get(partitionDesc);
                 LOG.info("getIncrementalPartitionDataInfo, startTime={}, endTime={}", lastTimestamp, latestTimestamp);
-                dataFileInfos =
-                        DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
+                dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
+                        tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
             } else {
-                dataFileInfos =
-                        DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, startTime, latestTimestamp, "incremental");
+                dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
+                        tableId, partitionDesc, startTime, latestTimestamp, "incremental");
             }
             if (dataFileInfos.length > 0) {
                 Map<String, Map<Integer, List<Path>>> splitByRangeAndHashPartition = FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, dataFileInfos);
                 for (Map.Entry<String, Map<Integer, List<Path>>> entry : splitByRangeAndHashPartition.entrySet()) {
                     for (Map.Entry<Integer, List<Path>> split : entry.getValue().entrySet()) {
-                        splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey(), partitionDesc));
+                        splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(),
+                                0, split.getKey(), partitionDesc));
                     }
                 }
             }
             partitionLatestTimestamp.put(partitionDesc, latestTimestamp);
         }
-        LOG.info("partitionLatestTimestamp={}", partitionLatestTimestamp);
+        LOG.info("dynamic enumerate done, partitionLatestTimestamp={}, oid {}, tid {}",
+                partitionLatestTimestamp,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
         return splits;
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java
index e76ba3335..ed781f4c8 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java
@@ -66,12 +66,17 @@ public LakeSoulSplitReader(Configuration conf,
     }
 
     @Override
-    public RecordsWithSplitIds<RowData> fetch() throws IOException {
+    public synchronized RecordsWithSplitIds<RowData> fetch() throws IOException {
         try {
             close();
+            LakeSoulPartitionSplit split = splits.poll();
+            LOG.info("Fetched split {}, oid {}, tid {}",
+                    split,
+                    System.identityHashCode(this),
+                    Thread.currentThread().getId());
             lastSplitReader =
                     new LakeSoulOneSplitRecordsReader(this.conf,
-                            Objects.requireNonNull(splits.poll()),
+                            Objects.requireNonNull(split),
                             this.tableRowType,
                             this.projectedRowType,
                             this.projectedRowTypeWithPk,
@@ -88,15 +93,17 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
     }
 
     @Override
-    public void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
+    public synchronized void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
         if (!(splitChange instanceof SplitsAddition)) {
             throw new UnsupportedOperationException(
                     String.format("The SplitChange type of %s is not supported.", splitChange.getClass()));
         }
 
-        LOG.info("Handling split change {}",
-                splitChange);
+        LOG.info("Handling split change {}, oid {}, tid {}",
+                splitChange,
+                System.identityHashCode(this),
+                Thread.currentThread().getId());
         splits.addAll(splitChange.splits());
     }
 
@@ -105,7 +112,7 @@ public void wakeUp() {
     }
 
     @Override
-    public void close() throws Exception {
+    public synchronized void close() throws Exception {
         if (lastSplitReader != null) {
             lastSplitReader.close();
             lastSplitReader = null;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java
index f5d18d019..6cb79b5e5 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java
@@ -33,7 +33,7 @@ public void start() {
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
         if (!context.registeredReaders().containsKey(subtaskId)) {
             // reader failed between sending the request and now. skip this request.
             return;
@@ -51,7 +51,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
+    public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
         LOG.info("Add split back: {}", splits);
         splitAssigner.addSplits(splits);
     }
@@ -61,7 +61,7 @@ public void addReader(int subtaskId) {
     }
 
     @Override
-    public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
+    public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
         LOG.info("LakeSoulStaticSplitEnumerator snapshotState");
         return null;
     }