Skip to content

Commit

Permalink
refractor lakesoul sink state
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Mar 14, 2024
1 parent 5d59447 commit a9d56c3
Showing 1 changed file with 49 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.*;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -52,8 +49,11 @@ public class LakeSoulWriterBucket {

private long tsMs;

private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles =
new ArrayList<>();
// private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles =
// new ArrayList<>();

private final Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesMap =
new HashMap<>();

private long partCounter;

Expand Down Expand Up @@ -107,11 +107,16 @@ private LakeSoulWriterBucket(
}

private void restoreState(LakeSoulWriterBucketState state) throws IOException {
pendingFiles.addAll(state.getPendingFileRecoverableList());
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : state.getPendingFileRecoverableMap().entrySet()) {
pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue());
}
// pendingFiles.addAll(state.getPendingFileRecoverableList());
}

public String getBucketId() {
return bucketId;
// System.out.println(pendingFilesMap.keySet());
// return pendingFilesMap.keySet().toString();
}

public Path getBucketPath() {
Expand All @@ -123,24 +128,27 @@ public long getPartCounter() {
}

public boolean isActive() {
return inProgressPartWriter != null || !pendingFiles.isEmpty();
return inProgressPartWriter != null || !pendingFilesMap.isEmpty();
}

void merge(final LakeSoulWriterBucket bucket) throws IOException {
checkNotNull(bucket);
checkState(Objects.equals(bucket.bucketPath, bucketPath));

bucket.closePartFile();
pendingFiles.addAll(bucket.pendingFiles);
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : bucket.pendingFilesMap.entrySet()) {
pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue());
}
// pendingFiles.addAll(bucket.pendingFiles);

LOG.info("Merging buckets for bucket id={}", bucketId);
LOG.info("Merging buckets for bucket id={}", getBucketId());
}

void write(RowData element, long currentTime, long tsMs) throws IOException {
if (inProgressPartWriter == null || rollingPolicy.shouldRollOnEvent(inProgressPartWriter, element)) {
LOG.info(
"Opening new part file for bucket id={} at {}.",
bucketId,
getBucketId(),
tsMs);
inProgressPartWriter = rollPartFile(currentTime);
this.tsMs = tsMs;
Expand All @@ -149,26 +157,35 @@ void write(RowData element, long currentTime, long tsMs) throws IOException {
inProgressPartWriter.write(element, currentTime);
}

List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush,String dmlType) throws IOException {
List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType) throws IOException {
// we always close part file and do not keep in-progress file
// since the native parquet writer doesn't support resume
if (inProgressPartWriter != null) {
LOG.info(
"Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
"Closing in-progress part file for bucket id={} on checkpoint.", getBucketId());
closePartFile();
}

List<LakeSoulMultiTableSinkCommittable> committables = new ArrayList<>();
long time = pendingFiles.isEmpty() ? Long.MIN_VALUE :
((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFiles.get(0)).creationTime;
long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE :
((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime;

// this.pendingFiles would be cleared later, we need to make a copy
List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
// List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
// committables.add(new LakeSoulMultiTableSinkCommittable(
// getBucketId(),
// tmpPending,
// time, tableId, tsMs, dmlType));
committables.add(new LakeSoulMultiTableSinkCommittable(
bucketId,
tmpPending,
time, tableId, tsMs, dmlType));
pendingFiles.clear();
// getBucketId(),
tableId,
new HashMap<>(pendingFilesMap),
time,
UUID.randomUUID().toString(),
tsMs,
dmlType
));
pendingFilesMap.clear();

return committables;
}
Expand All @@ -179,12 +196,13 @@ LakeSoulWriterBucketState snapshotState() throws IOException {
}

// this.pendingFiles would be cleared later, we need to make a copy
List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
return new LakeSoulWriterBucketState(
tableId,
bucketId,
bucketPath,
tmpPending);
// List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
// return new LakeSoulWriterBucketState(
// tableId,
// getBucketId(),
// bucketPath,
// tmpPending);
return new LakeSoulWriterBucketState(tableId, new HashMap<>(pendingFilesMap));
}

void onProcessingTime(long timestamp) throws IOException {
Expand All @@ -194,7 +212,7 @@ void onProcessingTime(long timestamp) throws IOException {
"Bucket {} closing in-progress part file for part file id={} due to processing time rolling " +
"policy "
+ "(in-progress file created @ {}, last updated @ {} and current time is {}).",
bucketId,
getBucketId(),
uniqueId,
inProgressPartWriter.getCreationTime(),
inProgressPartWriter.getLastUpdateTime(),
Expand All @@ -212,9 +230,9 @@ private InProgressFileWriter<RowData, String> rollPartFile(long currentTime) thr
LOG.info(
"Opening new part file \"{}\" for bucket id={}.",
partFilePath.getName(),
bucketId);
getBucketId());

return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
return bucketWriter.openNewInProgressFile(getBucketId(), partFilePath, currentTime);
}

/**
Expand Down Expand Up @@ -243,7 +261,8 @@ private void closePartFile() throws IOException {
long start = System.currentTimeMillis();
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable =
inProgressPartWriter.closeForCommit();
pendingFiles.add(pendingFileRecoverable);
// pendingFiles.add(pendingFileRecoverable);
pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable);
inProgressPartWriter = null;
LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(),
(System.currentTimeMillis() - start));
Expand Down

0 comments on commit a9d56c3

Please sign in to comment.