diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java index f20e0c251..a3c400230 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java @@ -49,18 +49,24 @@ public LakeSoulMultiTableSinkGlobalCommittable( } public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkGlobalCommittable( - List globalCommittables, boolean isBounded) { - Map, List> groupedCommitables = - new HashMap<>(); - globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach( - (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value))); - return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded); + List globalCommittableList, boolean isBounded) { + List committableList = new ArrayList<>(); + for (LakeSoulMultiTableSinkGlobalCommittable globalCommittable : globalCommittableList) { + globalCommittable.getGroupedCommittable().values().forEach(committableList::addAll); + } + return fromLakeSoulMultiTableSinkCommittable(committableList, isBounded); +// Map, List> groupedCommitables = +// new HashMap<>(); +// globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach( +// (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value))); +// return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded); } public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkCommittable( List committables, boolean isBounded) { Map, List> groupedCommitables = new HashMap<>(); + committables.sort(LakeSoulMultiTableSinkCommittable::compareTo); committables.forEach(committable -> groupedCommitables.computeIfAbsent( Tuple2.of(committable.getIdentity(), committable.getBucketId()), tuple2 -> new ArrayList<>()) .add(committable));