Skip to content

Commit

Permalink
update log info
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Aug 9, 2024
1 parent 651c21c commit 3fda471
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,18 @@ public Optional<SimpleVersionedSerializer<LakeSoulWriterBucketState>> getWriterS
// StatefulGlobalTwoPhaseCommittingSinkAdapter
@Override
public Optional<Committer<LakeSoulMultiTableSinkCommittable>> createCommitter() throws IOException {
return Optional.empty();
// return Optional.of(new Committer<LakeSoulMultiTableSinkCommittable>() {
// @Override
// public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
// throws IOException, InterruptedException {
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + " org.apache.flink.api.connector.sink.Committer.commit: " + committables);
// return Collections.emptyList();
// }
//
// @Override
// public void close() throws Exception {
// }
// });
return Optional.of(new Committer<LakeSoulMultiTableSinkCommittable>() {
@Override
public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
throws IOException, InterruptedException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + " org.apache.flink.api.connector.sink.Committer.commit: " + committables);
return Collections.emptyList();
}

@Override
public void close() throws Exception {
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin

DBManager lakeSoulDBManager = new DBManager();
for (LakeSoulMultiTableSinkCommittable committable : committables) {
// LOG.info("Committing {}", committable);
LOG.info("Committing {}", committable);
for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : committable.getPendingFilesMap().entrySet()) {
List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = entry.getValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
StructType sparkSchema = ArrowUtils.fromArrowSchema(msgSchema);

TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName, tableNamespace);
// LOG.info("Committing: {}, {}, {}, {} {}", tableNamespace, tableName, isCdc, msgSchema, tableInfo);
if (tableInfo == null) {
String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
String partition = DBUtil.formatTableInfoPartitionsField(identity.primaryKeys,
Expand All @@ -162,6 +161,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(),
properties, partition);
} else {
LOG.info("Try to update table: {}, {}, {}, {} {}", tableNamespace, tableName, isCdc, msgSchema, tableInfo);
DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
if (partitionKeys.primaryKeys.size() != identity.primaryKeys.size() ||
!new HashSet<>(partitionKeys.primaryKeys).containsAll(identity.primaryKeys)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,7 @@ public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) thro
}
}
LOG.info("LakeSoulArrowMultiTableSinkWriter.prepareCommit, subTaskId={}, flush={}, {}", getSubTaskId(), flush, committables);

// if (flush) {
// LOG.info("printStackTrace");
// new Exception().printStackTrace(System.out);
// }

return committables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private Path assembleNewPartPath() {
}

private void closePartFile() throws IOException {
// LOG.info("ClosePartFile {}", inProgressPartWriter);
LOG.info("ClosePartFile {}", inProgressPartWriter);
if (inProgressPartWriter != null) {
if (inProgressPartWriter instanceof NativeLakeSoulArrowWrapperWriter) {
Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void initNativeWriter() throws IOException {

FlinkUtil.setFSConfigs(conf, nativeWriter);
nativeWriter.initializeWriter();
// LOG.info("Initialized NativeLakeSoulArrowWrapperWriter: {}", this);
LOG.info("Initialized NativeLakeSoulArrowWrapperWriter: {}", this);
}

@Override
Expand Down

0 comments on commit 3fda471

Please sign in to comment.