diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index 3c1a6fa8f2f..4dafc58adc5 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -110,7 +110,7 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForCurrentFetchEventTimeLag(); registerMetricsForCurrentEmitEventTimeLag(); registerMetricsForDeserializeTimeLag(); - registerMetricsForNumCompletedCheckpoints(new ThreadSafeCounter()); + registerMetricsForNumCompletedSnapshots(new ThreadSafeCounter()); registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter()); registerMetricsForNumDeserializeError(new ThreadSafeCounter()); registerMetricsForNumSnapshotCreate(new ThreadSafeCounter()); @@ -216,7 +216,7 @@ public void registerMetricsForNumSnapshotError(Counter counter) { numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); } - public void registerMetricsForNumCompletedCheckpoints(Counter counter) { + public void registerMetricsForNumCompletedSnapshots(Counter counter) { numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 048b0d282eb..95c79a53f4f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -17,9 +17,6 @@ package org.apache.inlong.sort.postgre; -import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceExactlyMetric; - import com.ververica.cdc.debezium.table.AppendMetadataCollector; import com.ververica.cdc.debezium.table.DebeziumChangelogMode; import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; @@ -46,6 +43,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -136,7 +135,7 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - long deserializeStartTime = System.nanoTime(); + long deserializeStartTime = System.currentTimeMillis(); try { doDeserialize(record, out, deserializeStartTime); } catch (Exception e) { @@ -186,7 +185,7 @@ private void doDeserialize(SourceRecord record, Collector out, long des } if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeSuccess(); - sourceExactlyMetric.recordDeserializeDelay(System.nanoTime() - deserializeStartTime); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); } }