Skip to content

Commit

Permalink
merge LakeSoulMultiTableSinkGlobalCommittable with sorted createtime
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Jul 31, 2024
1 parent 5fbdf95 commit 096bac8
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,24 @@ public LakeSoulMultiTableSinkGlobalCommittable(
}

public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkGlobalCommittable(
List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables, boolean isBounded) {
Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
new HashMap<>();
globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach(
(key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value)));
return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded);
List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittableList, boolean isBounded) {
List<LakeSoulMultiTableSinkCommittable> committableList = new ArrayList<>();
for (LakeSoulMultiTableSinkGlobalCommittable globalCommittable : globalCommittableList) {
globalCommittable.getGroupedCommittable().values().forEach(committableList::addAll);
}
return fromLakeSoulMultiTableSinkCommittable(committableList, isBounded);
// Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
// new HashMap<>();
// globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach(
// (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value)));
// return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded);
}

public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkCommittable(
List<LakeSoulMultiTableSinkCommittable> committables, boolean isBounded) {
Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
new HashMap<>();
committables.sort(LakeSoulMultiTableSinkCommittable::compareTo);
committables.forEach(committable -> groupedCommitables.computeIfAbsent(
Tuple2.of(committable.getIdentity(), committable.getBucketId()), tuple2 -> new ArrayList<>())
.add(committable));
Expand Down

0 comments on commit 096bac8

Please sign in to comment.