Skip to content

Commit

Permalink
fix: added null checking
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Sep 18, 2024
1 parent e8f5381 commit 851023e
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,12 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
// record the start time of each checkpoint
long checkpointId = functionSnapshotContext.getCheckpointId();
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
Long checkpointId = functionSnapshotContext.getCheckpointId();
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
sourceExactlyMetric.incNumSnapshotCreate();
}

Expand Down Expand Up @@ -525,10 +529,13 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
}
} catch (Exception e) {
// ignore exception if we are no longer running
Expand Down

0 comments on commit 851023e

Please sign in to comment.