Skip to content

Commit

Permalink
test dynamic_bucket failover recover
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Aug 2, 2024
1 parent 9b64b0c commit 0e63939
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
.setTableSchema(msgSchema.toJson())
.setPartitions(partition)
.setProperties(properties.toJSONString());
Tuple2<Long, TableInfo> last = finalTableInfoMap.get(tablePath);
if (last == null) {
Tuple2<Long, TableInfo> latest = finalTableInfoMap.get(tablePath);
if (latest == null) {
finalTableInfoMap.put(tablePath, Tuple2.of(latestSrcTsMs, builder.build()));
} else if (last.f0 < latestSrcTsMs) {
} else if (latest.f0 < latestSrcTsMs) {
finalTableInfoMap.put(tablePath, Tuple2.of(latestSrcTsMs, builder.build()));
}
} else {
Expand All @@ -187,11 +187,11 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
}

String tablePath = tableInfo.getTablePath();
Tuple2<Long, TableInfo> last = finalTableInfoMap.get(tablePath);
Tuple2<Long, TableInfo> latest = finalTableInfoMap.get(tablePath);
TableInfo.Builder builder = tableInfo.toBuilder().setTableSchema(msgSchema.toJson());
if (last == null) {
if (latest == null) {
finalTableInfoMap.put(tablePath, Tuple2.of(latestSrcTsMs, builder.build()));
} else if (last.f0 < latestSrcTsMs) {
} else if (latest.f0 < latestSrcTsMs) {
finalTableInfoMap.put(tablePath, Tuple2.of(latestSrcTsMs, builder.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testLakeSoulDataStreamSink() throws Exception {
TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, "default", tableName);
LakeSoulRowDataWrapper data = mockInsertLakeSoulRowDataWrapper(1, useCDC, tableId);
recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data, ""));

Thread.sleep(100);
// LakeSoulRowDataWrapper data1 = mockUpdateLakeSoulRowDataWrapper(2, useCDC, tableId);
LakeSoulRowDataWrapper data1 = mockInsertLakeSoulRowDataWrapper(4, useCDC, tableId);
recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data1, ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils {
}
}

// test("read") {
// sql("show create table TestBinarySourceRecordSink").show(false)
// sql("desc TestBinarySourceRecordSink").show()
// sql("select * from TestBinarySourceRecordSink").show()
// }
test("read") {
sql("show create table TestBinarySourceRecordSink").show(false)
sql("desc TestBinarySourceRecordSink").show()
sql("select * from TestBinarySourceRecordSink").show()
}
}

0 comments on commit 0e63939

Please sign in to comment.