From 851023e27f6214c2e202856fc6612cf6404c434d Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Wed, 18 Sep 2024 15:09:38 +0800 Subject: [PATCH] fix: added null checking --- .../sort/postgre/DebeziumSourceFunction.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 5ef6abacb98..5f9aaa2e300 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -342,8 +342,12 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } // record the start time of each checkpoint - long checkpointId = functionSnapshotContext.getCheckpointId(); - checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + Long checkpointId = functionSnapshotContext.getCheckpointId(); + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } sourceExactlyMetric.incNumSnapshotCreate(); } @@ -525,10 +529,13 @@ public void notifyCheckpointComplete(long checkpointId) { schema.updateLastCheckpointId(checkpointId); } // get the start time of the currently completed checkpoint - Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); - if (snapShotStartTimeById != null) { - sourceExactlyMetric.incNumCompletedSnapshots(); - sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null) { + sourceExactlyMetric.incNumCompletedSnapshots(); + sourceExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } } } catch (Exception e) { // ignore exception if we are no longer running