diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java index 35f77c9c4fb..e6039f97078 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.base.metric.sub; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import java.util.Map; @@ -31,5 +31,5 @@ public interface SinkSubMetricData { * * @return The sub sink metric map */ - Map getSubSinkMetricMap(); + Map getSubSinkMetricMap(); } \ No newline at end of file diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java index 7124d169769..94b62098c6b 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java @@ -21,7 +21,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; @@ -44,14 +44,14 @@ /** * A collection class for handling sub metrics of table schema type */ -public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData { +public class SinkTableMetricData extends SinkExactlyMetric implements SinkSubMetricData { public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class); /** * The sub sink metric data container of sink metric data */ - private final Map subSinkMetricMap = Maps.newHashMap(); + private final Map subSinkMetricMap = Maps.newHashMap(); public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) { super(option, metricGroup); @@ -75,7 +75,7 @@ public void registerSubMetricsGroup(MetricState metricState) { for (Entry subMetricStateEntry : subMetricStateMap.entrySet()) { String[] schemaInfoArray = parseSchemaIdentify(subMetricStateEntry.getKey()); final MetricState subMetricState = subMetricStateEntry.getValue(); - SinkMetricData subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this); + SinkExactlyMetric subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this); subSinkMetricMap.put(subMetricStateEntry.getKey(), subSinkMetricData); } LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size()); @@ -85,11 +85,11 @@ public void registerSubMetricsGroup(MetricState metricState) { * build sub sink metric data * * @param schemaInfoArray sink record schema info - * @param sinkMetricData sink metric data + * @param sinkExactlyMetric sink metric data * @return sub sink metric data */ - private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) { - return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData); + private SinkExactlyMetric buildSubSinkMetricData(String[] schemaInfoArray, SinkExactlyMetric sinkExactlyMetric) { + return buildSubSinkMetricData(schemaInfoArray, null, sinkExactlyMetric); } /** @@ -97,16 +97,16 @@ private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetr * * @param schemaInfoArray the schema info array of record * @param subMetricState sub metric state - * @param sinkMetricData sink metric data + * @param sinkExactlyMetric sink metric data * @return sub sink metric data */ - private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState, - SinkMetricData sinkMetricData) { - if (sinkMetricData == null || schemaInfoArray == null) { + private SinkExactlyMetric buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState, + SinkExactlyMetric sinkExactlyMetric) { + if (sinkExactlyMetric == null || schemaInfoArray == null) { return null; } // build sub sink metric data - Map labels = sinkMetricData.getLabels(); + Map labels = sinkExactlyMetric.getLabels(); String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining(DELIMITER)); StringBuilder labelBuilder = new StringBuilder(metricGroupLabels); @@ -125,7 +125,7 @@ private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricSt .withInitDirtyRecords(subMetricState != null ? subMetricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) .withInitDirtyBytes(subMetricState != null ? subMetricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withInlongLabels(labelBuilder.toString()).withRegisterMetric(RegisteredMetric.ALL).build(); - return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup()); + return new SinkTableMetricData(metricOption, sinkExactlyMetric.getMetricGroup()); } /** @@ -182,11 +182,11 @@ public void outputMetricsWithEstimate(String database, String schema, String tab */ public void outputMetrics(String database, String schema, String table, long rowCount, long rowSize) { if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { - invoke(rowCount, rowSize); + invoke(rowCount, rowSize, System.currentTimeMillis()); return; } String identify = buildSchemaIdentify(database, schema, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -194,8 +194,8 @@ public void outputMetrics(String database, String schema, String table, long row subSinkMetricMap.put(identify, subSinkMetricData); } // sink metric and sub sink metric output metrics - this.invoke(rowCount, rowSize); - subSinkMetricData.invoke(rowCount, rowSize); + this.invoke(rowCount, rowSize, System.currentTimeMillis()); + subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis()); } /** @@ -209,11 +209,11 @@ public void outputMetrics(String database, String schema, String table, long row public void outputMetrics(String database, String table, long rowCount, long rowSize) { if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { - invoke(rowCount, rowSize); + invoke(rowCount, rowSize, System.currentTimeMillis()); return; } String identify = buildSchemaIdentify(database, null, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -221,8 +221,8 @@ public void outputMetrics(String database, String table, long rowCount, subSinkMetricMap.put(identify, subSinkMetricData); } // sink metric and sub sink metric output metrics - this.invoke(rowCount, rowSize); - subSinkMetricData.invoke(rowCount, rowSize); + this.invoke(rowCount, rowSize, System.currentTimeMillis()); + subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis()); } /** @@ -234,10 +234,10 @@ public void outputMetrics(String database, String table, long rowCount, */ public void outputMetrics(String index, long rowCount, long rowSize) { if (StringUtils.isBlank(index)) { - invoke(rowCount, rowSize); + invoke(rowCount, rowSize, System.currentTimeMillis()); return; } - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(index)) { subSinkMetricData = subSinkMetricMap.get(index); } else { @@ -245,8 +245,8 @@ public void outputMetrics(String index, long rowCount, long rowSize) { subSinkMetricMap.put(index, subSinkMetricData); } // sink metric and sub sink metric output metrics - this.invoke(rowCount, rowSize); - subSinkMetricData.invoke(rowCount, rowSize); + this.invoke(rowCount, rowSize, System.currentTimeMillis()); + subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis()); } /** @@ -264,7 +264,7 @@ public void outputDirtyMetricsWithEstimate(String database, String table, long r return; } String identify = buildSchemaIdentify(database, null, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -292,7 +292,7 @@ public void outputDirtyMetricsWithEstimate(String database, String schema, Strin return; } String identify = buildSchemaIdentify(database, schema, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -306,7 +306,7 @@ public void outputDirtyMetricsWithEstimate(String database, String schema, Strin public void outputMetricsWithEstimate(Object data) { long size = data.toString().getBytes(StandardCharsets.UTF_8).length; - invoke(1, size); + invoke(1, size, System.currentTimeMillis()); } /** @@ -324,7 +324,7 @@ public void outputDirtyMetrics(String database, String schema, String table, lon return; } String identify = buildSchemaIdentify(database, schema, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -350,7 +350,7 @@ public void outputDirtyMetrics(String database, String table, long rowCount, lon return; } String identify = buildSchemaIdentify(database, null, table); - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(identify)) { subSinkMetricData = subSinkMetricMap.get(identify); } else { @@ -374,7 +374,7 @@ public void outputDirtyMetrics(String index, long rowCount, long rowSize) { invokeDirty(rowCount, rowSize); return; } - SinkMetricData subSinkMetricData; + SinkExactlyMetric subSinkMetricData; if (subSinkMetricMap.containsKey(index)) { subSinkMetricData = subSinkMetricMap.get(index); } else { @@ -415,7 +415,7 @@ public void outputDirtyMetricsWithEstimate(Object data) { } @Override - public Map getSubSinkMetricMap() { + public Map getSubSinkMetricMap() { return this.subSinkMetricMap; } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java index 620e3289db0..6a4fe1dab32 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java @@ -21,7 +21,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; @@ -39,14 +39,14 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; -public class SinkTopicMetricData extends SinkMetricData implements SinkSubMetricData { +public class SinkTopicMetricData extends SinkExactlyMetric implements SinkSubMetricData { public static final Logger LOGGER = LoggerFactory.getLogger(SinkTopicMetricData.class); /** * The sink metric data map */ - private final Map topicSinkMetricMap = Maps.newHashMap(); + private final Map topicSinkMetricMap = Maps.newHashMap(); public SinkTopicMetricData(MetricOption option, MetricGroup metricGroup) { super(option, metricGroup); @@ -70,7 +70,7 @@ public void registerSubMetricsGroup(MetricState metricState) { for (Entry subMetricStateEntry : subMetricStateMap.entrySet()) { String topic = subMetricStateEntry.getKey(); final MetricState subMetricState = subMetricStateEntry.getValue(); - SinkMetricData subSinkMetricData = buildSinkMetricData(topic, subMetricState, this); + SinkExactlyMetric subSinkMetricData = buildSinkExactlyMetric(topic, subMetricState, this); topicSinkMetricMap.put(topic, subSinkMetricData); } LOGGER.info("register topicMetricsGroup from metricState,topic level metric map size:{}", @@ -79,13 +79,13 @@ public void registerSubMetricsGroup(MetricState metricState) { public void sendOutMetrics(String topic, long rowCount, long rowSize) { if (StringUtils.isBlank(topic)) { - invoke(rowCount, rowSize); + invoke(rowCount, rowSize, System.currentTimeMillis()); return; } - SinkMetricData sinkMetricData = getSinkMetricData(topic); + SinkExactlyMetric sinkExactlyMetric = getSinkExactlyMetric(topic); - this.invoke(rowCount, rowSize); - sinkMetricData.invoke(rowCount, rowSize); + this.invoke(rowCount, rowSize, System.currentTimeMillis()); + sinkExactlyMetric.invoke(rowCount, rowSize, System.currentTimeMillis()); } public void sendDirtyMetrics(String topic, long rowCount, long rowSize) { @@ -93,25 +93,26 @@ public void sendDirtyMetrics(String topic, long rowCount, long rowSize) { invokeDirty(rowCount, rowSize); return; } - SinkMetricData sinkMetricData = getSinkMetricData(topic); + SinkExactlyMetric sinkExactlyMetric = getSinkExactlyMetric(topic); this.invokeDirty(rowCount, rowSize); - sinkMetricData.invokeDirty(rowCount, rowSize); + sinkExactlyMetric.invokeDirty(rowCount, rowSize); } - private SinkMetricData getSinkMetricData(String topic) { - SinkMetricData sinkMetricData; + private SinkExactlyMetric getSinkExactlyMetric(String topic) { + SinkExactlyMetric sinkExactlyMetric; if (topicSinkMetricMap.containsKey(topic)) { - sinkMetricData = topicSinkMetricMap.get(topic); + sinkExactlyMetric = topicSinkMetricMap.get(topic); } else { - sinkMetricData = buildSinkMetricData(topic, null, this); - topicSinkMetricMap.put(topic, sinkMetricData); + sinkExactlyMetric = buildSinkExactlyMetric(topic, null, this); + topicSinkMetricMap.put(topic, sinkExactlyMetric); } - return sinkMetricData; + return sinkExactlyMetric; } - private SinkMetricData buildSinkMetricData(String topic, MetricState metricState, SinkMetricData sinkMetricData) { - Map labels = sinkMetricData.getLabels(); + private SinkExactlyMetric buildSinkExactlyMetric(String topic, MetricState metricState, + SinkExactlyMetric sinkExactlyMetric) { + Map labels = sinkExactlyMetric.getLabels(); String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining(DELIMITER)); @@ -123,11 +124,11 @@ private SinkMetricData buildSinkMetricData(String topic, MetricState metricState .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); - return new SinkMetricData(metricOption, sinkMetricData.getMetricGroup()); + return new SinkExactlyMetric(metricOption, sinkExactlyMetric.getMetricGroup()); } @Override - public Map getSubSinkMetricMap() { + public Map getSubSinkMetricMap() { return this.topicSinkMetricMap; } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java index b77ef1adb2e..e69ce145f8a 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java @@ -19,7 +19,7 @@ import org.apache.inlong.sort.base.enums.ReadPhase; import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.base.metric.phase.ReadPhaseMetricData; import org.apache.inlong.sort.base.metric.sub.SinkSubMetricData; @@ -208,52 +208,52 @@ private static void snapshotMetricStateForSourceSubMetricData(SourceMetricData s } /** - * Snapshot metric state data for {@link SinkMetricData} + * Snapshot metric state data for {@link org.apache.inlong.sort.base.metric.SinkExactlyMetric} * @param metricStateListState state data list - * @param sinkMetricData {@link SinkMetricData} A collection class for handling metrics + * @param sinkExactlyMetric {@link SinkExactlyMetric} A collection class for handling metrics * @param subtaskIndex subtask index * @throws Exception throw exception when add metric state */ public static void snapshotMetricStateForSinkMetricData(ListState metricStateListState, - SinkMetricData sinkMetricData, Integer subtaskIndex) + SinkExactlyMetric sinkExactlyMetric, Integer subtaskIndex) throws Exception { LOGGER.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}", - metricStateListState, sinkMetricData, subtaskIndex); + metricStateListState, sinkExactlyMetric, subtaskIndex); metricStateListState.clear(); Map metricDataMap = new HashMap<>(); - metricDataMap.put(NUM_RECORDS_OUT, sinkMetricData.getNumRecordsOut().getCount()); - metricDataMap.put(NUM_BYTES_OUT, sinkMetricData.getNumBytesOut().getCount()); - if (sinkMetricData.getDirtyRecordsOut() != null) { - metricDataMap.put(DIRTY_RECORDS_OUT, sinkMetricData.getDirtyRecordsOut().getCount()); + metricDataMap.put(NUM_RECORDS_OUT, sinkExactlyMetric.getNumRecordsOut().getCount()); + metricDataMap.put(NUM_BYTES_OUT, sinkExactlyMetric.getNumBytesOut().getCount()); + if (sinkExactlyMetric.getDirtyRecordsOut() != null) { + metricDataMap.put(DIRTY_RECORDS_OUT, sinkExactlyMetric.getDirtyRecordsOut().getCount()); } - if (sinkMetricData.getDirtyBytesOut() != null) { - metricDataMap.put(DIRTY_BYTES_OUT, sinkMetricData.getDirtyBytesOut().getCount()); + if (sinkExactlyMetric.getDirtyBytesOut() != null) { + metricDataMap.put(DIRTY_BYTES_OUT, sinkExactlyMetric.getDirtyBytesOut().getCount()); } MetricState metricState = new MetricState(subtaskIndex, metricDataMap); // snapshot sub metric data state - snapshotMetricStateForSinkSubMetricData(sinkMetricData, subtaskIndex, metricState); + snapshotMetricStateForSinkSubMetricData(sinkExactlyMetric, subtaskIndex, metricState); metricStateListState.add(metricState); } /** - * Snapshot sub metric state data for {@link SinkSubMetricData} - * @param sinkMetricData {@link SinkMetricData} A collection class for handling metrics + * Snapshot sub metric state data for {@link SinkExactlyMetric} + * @param sinkExactlyMetric {@link SinkExactlyMetric} A collection class for handling metrics * @param subtaskIndex subtask index * @param metricState state of source metric data */ - private static void snapshotMetricStateForSinkSubMetricData(SinkMetricData sinkMetricData, + private static void snapshotMetricStateForSinkSubMetricData(SinkExactlyMetric sinkExactlyMetric, Integer subtaskIndex, MetricState metricState) { - if (!(sinkMetricData instanceof SinkSubMetricData)) { + if (!(sinkExactlyMetric instanceof SinkSubMetricData)) { return; } - SinkSubMetricData sinkSubMetricData = (SinkSubMetricData) sinkMetricData; + SinkSubMetricData sinkSubMetricData = (SinkSubMetricData) sinkExactlyMetric; - Map subSinkMetricMap = sinkSubMetricData.getSubSinkMetricMap(); + Map subSinkMetricMap = sinkSubMetricData.getSubSinkMetricMap(); if (subSinkMetricMap != null && !subSinkMetricMap.isEmpty()) { Map subMetricStateMap = new HashMap<>(); - Set> entries = subSinkMetricMap.entrySet(); - for (Entry entry : entries) { + Set> entries = subSinkMetricMap.entrySet(); + for (Entry entry : entries) { Map subMetricDataMap = new HashMap<>(); subMetricDataMap.put(NUM_RECORDS_OUT, entry.getValue().getNumRecordsOut().getCount()); subMetricDataMap.put(NUM_BYTES_OUT, entry.getValue().getNumBytesOut().getCount()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java index 5171d96da2b..7446293d5cc 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java @@ -485,7 +485,8 @@ private boolean asyncFlush() throws Exception { metricData.outputMetrics(flushData.getDatabase(), flushData.getTable(), flushData.getBatchCount(), flushData.getBatchSize()); } else { - metricData.invoke(flushData.getBatchCount(), flushData.getBatchSize()); + metricData.invoke(flushData.getBatchCount(), flushData.getBatchSize(), + System.currentTimeMillis()); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java index cb438e82310..3796b15d367 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java @@ -19,7 +19,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer; import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder; @@ -27,6 +27,7 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; @@ -45,11 +46,17 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.inlong.sort.base.Constants.*; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * The Flink Redis Producer. @@ -58,7 +65,8 @@ public abstract class AbstractRedisSinkFunction extends RichSinkFunction implements - CheckpointedFunction { + CheckpointedFunction, + CheckpointListener { private static final long serialVersionUID = 1L; @@ -115,7 +123,10 @@ public abstract class AbstractRedisSinkFunction private final String inLongMetric; private transient MetricState metricState; private transient ListState metricStateListState; - private SinkMetricData sinkMetricData; + private transient SinkExactlyMetric sinkExactlyMetric; + + /** The map to store the start time of each checkpoint. */ + private transient Map checkpointStartTimeMap; public AbstractRedisSinkFunction( TypeInformation outputType, @@ -178,8 +189,9 @@ public void open(Configuration parameters) { .withRegisterMetric(MetricOption.RegisteredMetric.ALL) .build(); if (metricOption != null) { - sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); } + checkpointStartTimeMap = new HashMap<>(); } @Override @@ -208,23 +220,51 @@ public void initializeState(FunctionInitializationContext context) throws Except @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + if (sinkExactlyMetric != null) { + // record the start time of each checkpoint + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); + } LOG.info("redis start snapshotState, id: {}", functionSnapshotContext.getCheckpointId()); synchronized (lock) { listState.clear(); listState.addAll(rows); } - if (sinkMetricData != null && metricStateListState != null) { - MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + if (sinkExactlyMetric != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkExactlyMetric, getRuntimeContext().getIndexOfThisSubtask()); } LOG.info("redis end snapshotState, id: {}", functionSnapshotContext.getCheckpointId()); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotCreate(); + } + } + + /** let flink report the completion of checkpoint */ + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotComplete(); + // get the start time of the currently completed checkpoint + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + sinkExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } } protected List serialize(RowData in) { try { - return stateEncoder.serialize(in, serializationSchema); + long serializeStartTime = System.currentTimeMillis(); + List serializedData = stateEncoder.serialize(in, serializationSchema); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.recordSerializeDelay(System.currentTimeMillis() - serializeStartTime); + sinkExactlyMetric.incNumSerializeSuccess(); + } + return serializedData; } catch (Exception e) { LOG.error("Error when serializing data: " + in); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSerializeError(); + } throw new RuntimeException(e); } } @@ -327,8 +367,8 @@ private void flush() { } protected void sendMetrics(byte[] document) { - if (sinkMetricData != null) { - sinkMetricData.invoke(1, document.length); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.invoke(1, document.length, System.currentTimeMillis()); } } }