Skip to content

Commit

Permalink
refractor lakesoul sink state
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Mar 14, 2024
1 parent 50344cd commit 308fead
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public LakeSoulWriterBucketState(

public LakeSoulWriterBucketState(
TableSchemaIdentity identity,
Path bucketPath,
HashMap<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap
) {
this.identity = identity;
Expand All @@ -55,7 +56,7 @@ public LakeSoulWriterBucketState(
} else {
this.bucketId = "";
}
this.bucketPath = null;
this.bucketPath = bucketPath;

this.pendingFileRecoverableMap = pendingFileRecoverableMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public LakeSoulWriterBucketState deserialize(int version, byte[] serialized) thr
private void serialize(LakeSoulWriterBucketState state, DataOutputView dataOutputView)
throws IOException {
// dataOutputView.writeUTF(state.getBucketId());
// dataOutputView.writeUTF(state.getBucketPath().toString());
dataOutputView.writeUTF(state.getBucketPath().toString());

SimpleVersionedSerialization.writeVersionAndSerialize(
tableSchemaIdentitySerializer, state.getIdentity(), dataOutputView);
Expand Down Expand Up @@ -102,7 +102,7 @@ private LakeSoulWriterBucketState internalDeserialize(
throws IOException {

// String bucketId = dataInputView.readUTF();
// String bucketPathStr = dataInputView.readUTF();
String bucketPathStr = dataInputView.readUTF();

TableSchemaIdentity identity = SimpleVersionedSerialization.readVersionAndDeSerialize(
tableSchemaIdentitySerializer, dataInputView);
Expand All @@ -119,9 +119,10 @@ private LakeSoulWriterBucketState internalDeserialize(
}
pendingFileRecoverableMap.put(bucketId, pendingFileRecoverableList);
}

return new LakeSoulWriterBucketState(
identity,
new Path(bucketPathStr),
pendingFileRecoverableMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ LakeSoulWriterBucketState snapshotState() throws IOException {
// getBucketId(),
// bucketPath,
// tmpPending);
return new LakeSoulWriterBucketState(tableId, new HashMap<>(pendingFilesMap));
return new LakeSoulWriterBucketState(tableId, bucketPath, new HashMap<>(pendingFilesMap));
}

void onProcessingTime(long timestamp) throws IOException {
Expand Down

0 comments on commit 308fead

Please sign in to comment.