Skip to content

Commit

Permalink
refactor: rename and change back to milliseconds
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Sep 18, 2024
1 parent a14f111 commit 8d7e076
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForCurrentFetchEventTimeLag();
registerMetricsForCurrentEmitEventTimeLag();
registerMetricsForDeserializeTimeLag();
registerMetricsForNumCompletedCheckpoints(new ThreadSafeCounter());
registerMetricsForNumCompletedSnapshots(new ThreadSafeCounter());
registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
registerMetricsForNumDeserializeError(new ThreadSafeCounter());
registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
Expand Down Expand Up @@ -216,7 +216,7 @@ public void registerMetricsForNumSnapshotError(Counter counter) {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
}

public void registerMetricsForNumCompletedCheckpoints(Counter counter) {
public void registerMetricsForNumCompletedSnapshots(Counter counter) {
numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.table.AppendMetadataCollector;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
Expand All @@ -46,6 +43,8 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
long deserializeStartTime = System.nanoTime();
long deserializeStartTime = System.currentTimeMillis();
try {
doDeserialize(record, out, deserializeStartTime);
} catch (Exception e) {
Expand Down Expand Up @@ -186,7 +185,7 @@ private void doDeserialize(SourceRecord record, Collector<RowData> out, long des
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.nanoTime() - deserializeStartTime);
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}
}

Expand Down

0 comments on commit 8d7e076

Please sign in to comment.