Skip to content

Commit

Permalink
[Flink] upgrade to sink v2 api
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <[email protected]>
  • Loading branch information
dmetasoul01 committed Nov 25, 2024
1 parent dd54ddb commit b57edb1
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package org.apache.flink.lakesoul.sink;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand All @@ -21,21 +21,23 @@
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.flink.table.data.RowData;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

public class LakeSoulMultiTablesSink<IN, OUT> implements
Sink<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState,
LakeSoulMultiTableSinkGlobalCommittable> {
StatefulSink<IN, LakeSoulWriterBucketState>,
TwoPhaseCommittingSink<IN, LakeSoulMultiTableSinkCommittable>,
WithPostCommitTopology<IN, LakeSoulMultiTableSinkCommittable> {

private final BucketsBuilder<IN, OUT, ? extends BucketsBuilder<IN, OUT, ?>> bucketsBuilder;

Expand All @@ -60,18 +62,24 @@ public static DefaultMultiTablesArrowFormatBuilder forMultiTablesArrowFormat(Con
}

@Override
public SinkWriter<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState> createWriter(
InitContext context, List<LakeSoulWriterBucketState> states) throws IOException {
public AbstractLakeSoulMultiTableSinkWriter<IN, OUT> createWriter(InitContext context) throws IOException {
int subTaskId = context.getSubtaskId();
AbstractLakeSoulMultiTableSinkWriter<IN, OUT> writer = bucketsBuilder.createWriter(context, subTaskId);
writer.initializeState(states);
return writer;
}

@Override
public Optional<SimpleVersionedSerializer<LakeSoulWriterBucketState>> getWriterStateSerializer() {
// Restores a writer from checkpointed bucket states (Sink V2 StatefulSink contract):
// a fresh writer is created for this subtask, then re-initialized with the recovered state.
public StatefulSinkWriter<IN, LakeSoulWriterBucketState> restoreWriter(InitContext context, Collection<LakeSoulWriterBucketState> recoveredState) throws IOException {
    int subTaskId = context.getSubtaskId();
    AbstractLakeSoulMultiTableSinkWriter<IN, OUT> writer = bucketsBuilder.createWriter(context, subTaskId);
    // initializeState takes a List; copy the recovered Collection into one.
    writer.initializeState(new ArrayList<>(recoveredState));
    return writer;
}

@Override
public SimpleVersionedSerializer<LakeSoulWriterBucketState> getWriterStateSerializer() {
try {
return Optional.of(bucketsBuilder.getWriterStateSerializer());
return bucketsBuilder.getWriterStateSerializer();
} catch (IOException e) {
// it's not optimal that we have to do this but creating the serializers for the
// LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
Expand All @@ -83,25 +91,21 @@ public Optional<SimpleVersionedSerializer<LakeSoulWriterBucketState>> getWriterS
// committer must not be null since flink requires it to enable
// StatefulGlobalTwoPhaseCommittingSinkAdapter
@Override
public Optional<Committer<LakeSoulMultiTableSinkCommittable>> createCommitter() throws IOException {
return Optional.of(new Committer<LakeSoulMultiTableSinkCommittable>() {
public Committer<LakeSoulMultiTableSinkCommittable> createCommitter() throws IOException {
return new Committer<LakeSoulMultiTableSinkCommittable>() {
@Override
public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
throws IOException, InterruptedException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + " org.apache.flink.api.connector.sink.Committer.commit: " + committables);
return Collections.emptyList();
public void close() throws Exception {
}

@Override
public void close() throws Exception {
public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> committables) throws IOException, InterruptedException {
}
});
};
}

@Override
public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable>> getCommittableSerializer() {
public SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittableSerializer() {
try {
return Optional.of(bucketsBuilder.getCommittableSerializer());
return bucketsBuilder.getCommittableSerializer();
} catch (IOException e) {
// it's not optimal that we have to do this but creating the serializers for the
// LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
Expand All @@ -110,30 +114,65 @@ public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable>> ge
}
}

@Override
public Optional<GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable>> createGlobalCommitter()
throws IOException {
return Optional.ofNullable(bucketsBuilder.createGlobalCommitter());
// Exposes the builder used to create writers and committers.
// NOTE(review): the raw BucketsBuilder return type loses the <IN, OUT, ?> parameters of
// the field — presumably kept raw for existing callers; confirm before parameterizing.
public BucketsBuilder getBucketsBuilder(){
    return this.bucketsBuilder;
}

@Override
public Optional<SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable>> getGlobalCommittableSerializer() {
try {
return Optional.of(bucketsBuilder.getGlobalCommittableSerializer());
} catch (IOException e) {
// it's not optimal that we have to do this but creating the serializers for the
// LakeSoulMultiTablesSink requires (among other things) a call to FileSystem.get() which declares
// IOException.
throw new FlinkRuntimeException("Could not create global committable serializer.", e);
}
// WithPostCommitTopology hook: appends a global-committer stage after the per-subtask
// committer, replacing the Sink V1 GlobalCommitter integration. The adapter wraps the
// legacy global committer; the committable serializer is reused for the global stage.
public void addPostCommitTopology(DataStream<CommittableMessage<LakeSoulMultiTableSinkCommittable>> committables) {
    StandardSinkTopologies.addGlobalCommitter(
            committables,
            GlobalCommitterAdapter::new,
            this::getCommittableSerializer);
}

@Override
public Collection<String> getCompatibleStateNames() {
    // StreamingFileSink-era state name, kept so jobs restored from the legacy
    // writer state can migrate to this Sink V2 implementation.
    return Collections.singleton("lakesoul-cdc-multitable-bucket-states");
}
public BucketsBuilder getBucketsBuilder(){
return this.bucketsBuilder;
/**
 * Adapts the legacy (Sink V1 style) GlobalCommitter to the Sink V2 {@code Committer}
 * interface so it can run in the post-commit topology added by
 * {@code StandardSinkTopologies.addGlobalCommitter}.
 *
 * <p>NOTE(review): this is a non-static inner class — it captures the enclosing
 * LakeSoulMultiTablesSink instance to reach {@code bucketsBuilder}.
 */
public class GlobalCommitterAdapter implements Committer<LakeSoulMultiTableSinkCommittable> {
    final GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable> globalCommitter;
    final SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> globalCommittableSerializer;

    GlobalCommitterAdapter() {
        try {
            globalCommitter = LakeSoulMultiTablesSink.this.bucketsBuilder.createGlobalCommitter();
            globalCommittableSerializer = LakeSoulMultiTablesSink.this.bucketsBuilder.getGlobalCommittableSerializer();
        } catch (IOException e) {
            // getGlobalCommittableSerializer() declares IOException; the no-arg factory
            // used by StandardSinkTopologies cannot throw checked exceptions, so wrap it.
            throw new UncheckedIOException("Cannot create global committer", e);
        }
    }

    @Override
    public void close() throws Exception {
        // Delegate lifecycle to the wrapped legacy committer.
        globalCommitter.close();
    }

    /**
     * Combines the batch of committables into a single global committable and commits it
     * via the wrapped legacy GlobalCommitter. On failure the whole batch is marked for retry.
     */
    @Override
    public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> committables)
            throws IOException, InterruptedException {
        if (committables.isEmpty()) {
            return;
        }

        // Unwrap the Sink V2 CommitRequests to the raw committables the legacy API expects.
        List<LakeSoulMultiTableSinkCommittable> rawCommittables =
                committables.stream()
                        .map(CommitRequest::getCommittable)
                        .collect(Collectors.toList());
        List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables =
                Collections.singletonList(globalCommitter.combine(rawCommittables));
        List<LakeSoulMultiTableSinkGlobalCommittable> failures = globalCommitter.commit(globalCommittables);
        // Only committables are retriable so the complete batch of committables is retried
        // because we cannot trace back the committable to which global committable it belongs.
        // This might lead to committing the same global committable twice, but we assume that
        // the GlobalCommitter commit call is idempotent.
        if (!failures.isEmpty()) {
            committables.forEach(CommitRequest::retryLater);
        }
    }

    // Accessor for the wrapped legacy global committer (e.g. for tests).
    public GlobalCommitter<LakeSoulMultiTableSinkCommittable, LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommitter() {
        return globalCommitter;
    }

    // Accessor for the global-committable serializer created alongside the committer.
    public SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommittableSerializer() {
        return globalCommittableSerializer;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package org.apache.flink.lakesoul.sink.bucket;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkCommitter;
import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkGlobalCommitter;
Expand Down Expand Up @@ -41,7 +41,7 @@ public abstract SimpleVersionedSerializer<LakeSoulWriterBucketState> getWriterSt
public abstract SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittableSerializer()
throws IOException;

public abstract LakeSoulSinkGlobalCommitter createGlobalCommitter() throws IOException;
public abstract LakeSoulSinkGlobalCommitter createGlobalCommitter();

public abstract SimpleVersionedSerializer<LakeSoulMultiTableSinkGlobalCommittable> getGlobalCommittableSerializer()
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package org.apache.flink.lakesoul.sink.bucket;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand Down Expand Up @@ -110,7 +110,7 @@ public SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittab
}

@Override
public LakeSoulSinkGlobalCommitter createGlobalCommitter() throws IOException {
// Creates the legacy global committer wrapped by GlobalCommitterAdapter in the
// post-commit topology. Construction only captures the configuration, so the
// former `throws IOException` clause was dropped.
public LakeSoulSinkGlobalCommitter createGlobalCommitter() {
    return new LakeSoulSinkGlobalCommitter(conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package org.apache.flink.lakesoul.sink.bucket;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package org.apache.flink.lakesoul.sink.bucket;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package org.apache.flink.lakesoul.sink.bucket;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.*;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -44,13 +44,11 @@ public class LakeSoulSinkCommitter implements Committer<LakeSoulMultiTableSinkCo
public LakeSoulSinkCommitter() {
}

@Override
public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
throws IOException {
LOG.info("Found {} committable for LakeSoul to commit", committables.size());
// commit by file creation time in ascending order
committables.sort(LakeSoulMultiTableSinkCommittable::compareTo);

public void commit(List<LakeSoulMultiTableSinkCommittable> committables, boolean sort)
throws IOException, InterruptedException {
if (sort) {
committables.sort(LakeSoulMultiTableSinkCommittable::compareTo);
}
DBManager lakeSoulDBManager = new DBManager();
for (LakeSoulMultiTableSinkCommittable committable : committables) {
LOG.info("Committing {}", committable);
Expand Down Expand Up @@ -140,8 +138,16 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
}
LOG.info("Committing done, committable={} ", committable);
}
}

return Collections.emptyList();
// Sink V2 Committer entry point: unwraps the CommitRequests, sorts the committables
// by their natural order (file creation time, ascending — see compareTo), and
// delegates to the list-based commit with sort=false since ordering is done here.
@Override
public void commit(Collection<CommitRequest<LakeSoulMultiTableSinkCommittable>> commits)
        throws IOException, InterruptedException {
    LOG.info("Found {} committable for LakeSoul to commit", commits.size());
    // commit by file creation time in ascending order
    List<LakeSoulMultiTableSinkCommittable> committables =
            commits.stream().map(CommitRequest::getCommittable).sorted(LakeSoulMultiTableSinkCommittable::compareTo).collect(Collectors.toList());
    this.commit(committables, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
}
}

committer.commit(lakeSoulMultiTableSinkCommittable);
committer.commit(lakeSoulMultiTableSinkCommittable, true);
}
return Collections.emptyList();
}
Expand Down
Loading

0 comments on commit b57edb1

Please sign in to comment.