From a9d56c3e4209173b1a893d808d60fb2ef58f2ae9 Mon Sep 17 00:00:00 2001 From: zenghua Date: Thu, 14 Mar 2024 14:45:47 +0800 Subject: [PATCH] refractor lakesoul sink state Signed-off-by: zenghua --- .../sink/writer/LakeSoulWriterBucket.java | 79 ++++++++++++------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java index 8e53dbd8c..667e44f75 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java @@ -16,10 +16,7 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.UUID; +import java.util.*; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -52,8 +49,11 @@ public class LakeSoulWriterBucket { private long tsMs; - private final List pendingFiles = - new ArrayList<>(); +// private final List pendingFiles = +// new ArrayList<>(); + + private final Map> pendingFilesMap = + new HashMap<>(); private long partCounter; @@ -107,11 +107,16 @@ private LakeSoulWriterBucket( } private void restoreState(LakeSoulWriterBucketState state) throws IOException { - pendingFiles.addAll(state.getPendingFileRecoverableList()); + for (Map.Entry> entry : state.getPendingFileRecoverableMap().entrySet()) { + pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue()); + } +// pendingFiles.addAll(state.getPendingFileRecoverableList()); } public String getBucketId() { return bucketId; +// System.out.println(pendingFilesMap.keySet()); +// return pendingFilesMap.keySet().toString(); } public Path getBucketPath() { @@ -123,7 +128,7 @@ public long getPartCounter() { } public boolean isActive() { - return inProgressPartWriter != null || !pendingFiles.isEmpty(); + return inProgressPartWriter != null || !pendingFilesMap.isEmpty(); } void merge(final LakeSoulWriterBucket bucket) throws IOException { @@ -131,16 +136,19 @@ void merge(final LakeSoulWriterBucket bucket) throws IOException { checkState(Objects.equals(bucket.bucketPath, bucketPath)); bucket.closePartFile(); - pendingFiles.addAll(bucket.pendingFiles); + for (Map.Entry> entry : bucket.pendingFilesMap.entrySet()) { + pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue()); + } +// pendingFiles.addAll(bucket.pendingFiles); - LOG.info("Merging buckets for bucket id={}", bucketId); + LOG.info("Merging buckets for bucket id={}", getBucketId()); } void write(RowData element, long currentTime, long tsMs) throws IOException { if (inProgressPartWriter == null || rollingPolicy.shouldRollOnEvent(inProgressPartWriter, element)) { LOG.info( "Opening new part file for bucket id={} at {}.", - bucketId, + getBucketId(), tsMs); inProgressPartWriter = rollPartFile(currentTime); this.tsMs = tsMs; @@ -149,26 +157,35 @@ void write(RowData element, long currentTime, long tsMs) throws IOException { inProgressPartWriter.write(element, currentTime); } - List prepareCommit(boolean flush,String dmlType) throws IOException { + List prepareCommit(boolean flush, String dmlType) throws IOException { // we always close part file and do not keep in-progress file // since the native parquet writer doesn't support resume if (inProgressPartWriter != null) { LOG.info( - "Closing in-progress part file for bucket id={} on checkpoint.", bucketId); + "Closing in-progress part file for bucket id={} on checkpoint.", getBucketId()); closePartFile(); } List committables = new ArrayList<>(); - long time = pendingFiles.isEmpty() ? Long.MIN_VALUE : - ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFiles.get(0)).creationTime; + long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE : + ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime; // this.pendingFiles would be cleared later, we need to make a copy - List tmpPending = new ArrayList<>(pendingFiles); +// List tmpPending = new ArrayList<>(pendingFiles); +// committables.add(new LakeSoulMultiTableSinkCommittable( +// getBucketId(), +// tmpPending, +// time, tableId, tsMs, dmlType)); committables.add(new LakeSoulMultiTableSinkCommittable( - bucketId, - tmpPending, - time, tableId, tsMs, dmlType)); - pendingFiles.clear(); +// getBucketId(), + tableId, + new HashMap<>(pendingFilesMap), + time, + UUID.randomUUID().toString(), + tsMs, + dmlType + )); + pendingFilesMap.clear(); return committables; } @@ -179,12 +196,13 @@ LakeSoulWriterBucketState snapshotState() throws IOException { } // this.pendingFiles would be cleared later, we need to make a copy - List tmpPending = new ArrayList<>(pendingFiles); - return new LakeSoulWriterBucketState( - tableId, - bucketId, - bucketPath, - tmpPending); +// List tmpPending = new ArrayList<>(pendingFiles); +// return new LakeSoulWriterBucketState( +// tableId, +// getBucketId(), +// bucketPath, +// tmpPending); + return new LakeSoulWriterBucketState(tableId, new HashMap<>(pendingFilesMap)); } void onProcessingTime(long timestamp) throws IOException { @@ -194,7 +212,7 @@ void onProcessingTime(long timestamp) throws IOException { "Bucket {} closing in-progress part file for part file id={} due to processing time rolling " + "policy " + "(in-progress file created @ {}, last updated @ {} and current time is {}).", - bucketId, + getBucketId(), uniqueId, inProgressPartWriter.getCreationTime(), inProgressPartWriter.getLastUpdateTime(), @@ -212,9 +230,9 @@ private InProgressFileWriter rollPartFile(long currentTime) thr LOG.info( "Opening new part file \"{}\" for bucket id={}.", partFilePath.getName(), - bucketId); + getBucketId()); - return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime); + return bucketWriter.openNewInProgressFile(getBucketId(), partFilePath, currentTime); } /** @@ -243,7 +261,8 @@ private void closePartFile() throws IOException { long start = System.currentTimeMillis(); InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = inProgressPartWriter.closeForCommit(); - pendingFiles.add(pendingFileRecoverable); +// pendingFiles.add(pendingFileRecoverable); + pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable); inProgressPartWriter = null; LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(), (System.currentTimeMillis() - start));