From 308fead6f5ca59f114aefa53f65b4330276ddbfa Mon Sep 17 00:00:00 2001 From: zenghua Date: Thu, 14 Mar 2024 16:22:46 +0800 Subject: [PATCH] refractor lakesoul sink state Signed-off-by: zenghua --- .../lakesoul/sink/state/LakeSoulWriterBucketState.java | 3 ++- .../sink/state/LakeSoulWriterBucketStateSerializer.java | 7 ++++--- .../flink/lakesoul/sink/writer/LakeSoulWriterBucket.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java index d72d9584d..2d3a128bf 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java @@ -46,6 +46,7 @@ public LakeSoulWriterBucketState( public LakeSoulWriterBucketState( TableSchemaIdentity identity, + Path bucketPath, HashMap> pendingFileRecoverableMap ) { this.identity = identity; @@ -55,7 +56,7 @@ public LakeSoulWriterBucketState( } else { this.bucketId = ""; } - this.bucketPath = null; + this.bucketPath = bucketPath; this.pendingFileRecoverableMap = pendingFileRecoverableMap; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java index 6090989c7..8fcbc440a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java @@ -67,7 +67,7 @@ public LakeSoulWriterBucketState deserialize(int version, byte[] serialized) thr private void serialize(LakeSoulWriterBucketState state, DataOutputView dataOutputView) throws IOException { // dataOutputView.writeUTF(state.getBucketId()); -// dataOutputView.writeUTF(state.getBucketPath().toString()); + dataOutputView.writeUTF(state.getBucketPath().toString()); SimpleVersionedSerialization.writeVersionAndSerialize( tableSchemaIdentitySerializer, state.getIdentity(), dataOutputView); @@ -102,7 +102,7 @@ private LakeSoulWriterBucketState internalDeserialize( throws IOException { // String bucketId = dataInputView.readUTF(); -// String bucketPathStr = dataInputView.readUTF(); + String bucketPathStr = dataInputView.readUTF(); TableSchemaIdentity identity = SimpleVersionedSerialization.readVersionAndDeSerialize( tableSchemaIdentitySerializer, dataInputView); @@ -119,9 +119,10 @@ private LakeSoulWriterBucketState internalDeserialize( } pendingFileRecoverableMap.put(bucketId, pendingFileRecoverableList); } - + return new LakeSoulWriterBucketState( identity, + new Path(bucketPathStr), pendingFileRecoverableMap); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java index e745e617c..7b0909028 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java @@ -206,7 +206,7 @@ LakeSoulWriterBucketState snapshotState() throws IOException { // getBucketId(), // bucketPath, // tmpPending); - return new LakeSoulWriterBucketState(tableId, new HashMap<>(pendingFilesMap)); + return new LakeSoulWriterBucketState(tableId, bucketPath, new HashMap<>(pendingFilesMap)); } void onProcessingTime(long timestamp) throws IOException {