Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Oct 9, 2024
1 parent 14463d1 commit b573aad
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.base.metric.sub;

import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.SinkExactlyMetric;

import java.util.Map;

Expand All @@ -31,5 +31,5 @@ public interface SinkSubMetricData {
*
* @return The sub sink metric map
*/
Map<String, SinkMetricData> getSubSinkMetricMap();
Map<String, SinkExactlyMetric> getSubSinkMetricMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.SinkExactlyMetric;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -44,14 +44,14 @@
/**
* A collection class for handling sub metrics of table schema type
*/
public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData {
public class SinkTableMetricData extends SinkExactlyMetric implements SinkSubMetricData {

public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class);

/**
* The sub sink metric data container of sink metric data
*/
private final Map<String, SinkMetricData> subSinkMetricMap = Maps.newHashMap();
private final Map<String, SinkExactlyMetric> subSinkMetricMap = Maps.newHashMap();

public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) {
super(option, metricGroup);
Expand All @@ -75,7 +75,7 @@ public void registerSubMetricsGroup(MetricState metricState) {
for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) {
String[] schemaInfoArray = parseSchemaIdentify(subMetricStateEntry.getKey());
final MetricState subMetricState = subMetricStateEntry.getValue();
SinkMetricData subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this);
SinkExactlyMetric subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this);
subSinkMetricMap.put(subMetricStateEntry.getKey(), subSinkMetricData);
}
LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size());
Expand All @@ -85,28 +85,28 @@ public void registerSubMetricsGroup(MetricState metricState) {
* build sub sink metric data
*
* @param schemaInfoArray sink record schema info
* @param sinkMetricData sink metric data
* @param sinkExactlyMetric sink metric data
* @return sub sink metric data
*/
private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) {
return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData);
private SinkExactlyMetric buildSubSinkMetricData(String[] schemaInfoArray, SinkExactlyMetric sinkExactlyMetric) {
return buildSubSinkMetricData(schemaInfoArray, null, sinkExactlyMetric);
}

/**
* build sub sink metric data
*
* @param schemaInfoArray the schema info array of record
* @param subMetricState sub metric state
* @param sinkMetricData sink metric data
* @param sinkExactlyMetric sink metric data
* @return sub sink metric data
*/
private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState,
SinkMetricData sinkMetricData) {
if (sinkMetricData == null || schemaInfoArray == null) {
private SinkExactlyMetric buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState,
SinkExactlyMetric sinkExactlyMetric) {
if (sinkExactlyMetric == null || schemaInfoArray == null) {
return null;
}
// build sub sink metric data
Map<String, String> labels = sinkMetricData.getLabels();
Map<String, String> labels = sinkExactlyMetric.getLabels();
String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(DELIMITER));
StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
Expand All @@ -125,7 +125,7 @@ private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricSt
.withInitDirtyRecords(subMetricState != null ? subMetricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
.withInitDirtyBytes(subMetricState != null ? subMetricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withInlongLabels(labelBuilder.toString()).withRegisterMetric(RegisteredMetric.ALL).build();
return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup());
return new SinkTableMetricData(metricOption, sinkExactlyMetric.getMetricGroup());
}

/**
Expand Down Expand Up @@ -182,20 +182,20 @@ public void outputMetricsWithEstimate(String database, String schema, String tab
*/
public void outputMetrics(String database, String schema, String table, long rowCount, long rowSize) {
if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
invoke(rowCount, rowSize);
invoke(rowCount, rowSize, System.currentTimeMillis());
return;
}
String identify = buildSchemaIdentify(database, schema, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
subSinkMetricData = buildSubSinkMetricData(new String[]{database, schema, table}, this);
subSinkMetricMap.put(identify, subSinkMetricData);
}
// sink metric and sub sink metric output metrics
this.invoke(rowCount, rowSize);
subSinkMetricData.invoke(rowCount, rowSize);
this.invoke(rowCount, rowSize, System.currentTimeMillis());
subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis());
}

/**
Expand All @@ -209,20 +209,20 @@ public void outputMetrics(String database, String schema, String table, long row
public void outputMetrics(String database, String table, long rowCount,
long rowSize) {
if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
invoke(rowCount, rowSize);
invoke(rowCount, rowSize, System.currentTimeMillis());
return;
}
String identify = buildSchemaIdentify(database, null, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
subSinkMetricData = buildSubSinkMetricData(new String[]{database, table}, this);
subSinkMetricMap.put(identify, subSinkMetricData);
}
// sink metric and sub sink metric output metrics
this.invoke(rowCount, rowSize);
subSinkMetricData.invoke(rowCount, rowSize);
this.invoke(rowCount, rowSize, System.currentTimeMillis());
subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis());
}

/**
Expand All @@ -234,19 +234,19 @@ public void outputMetrics(String database, String table, long rowCount,
*/
public void outputMetrics(String index, long rowCount, long rowSize) {
if (StringUtils.isBlank(index)) {
invoke(rowCount, rowSize);
invoke(rowCount, rowSize, System.currentTimeMillis());
return;
}
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(index)) {
subSinkMetricData = subSinkMetricMap.get(index);
} else {
subSinkMetricData = buildSubSinkMetricData(new String[]{index}, this);
subSinkMetricMap.put(index, subSinkMetricData);
}
// sink metric and sub sink metric output metrics
this.invoke(rowCount, rowSize);
subSinkMetricData.invoke(rowCount, rowSize);
this.invoke(rowCount, rowSize, System.currentTimeMillis());
subSinkMetricData.invoke(rowCount, rowSize, System.currentTimeMillis());
}

/**
Expand All @@ -264,7 +264,7 @@ public void outputDirtyMetricsWithEstimate(String database, String table, long r
return;
}
String identify = buildSchemaIdentify(database, null, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
Expand Down Expand Up @@ -292,7 +292,7 @@ public void outputDirtyMetricsWithEstimate(String database, String schema, Strin
return;
}
String identify = buildSchemaIdentify(database, schema, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
Expand All @@ -306,7 +306,7 @@ public void outputDirtyMetricsWithEstimate(String database, String schema, Strin

public void outputMetricsWithEstimate(Object data) {
long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
invoke(1, size);
invoke(1, size, System.currentTimeMillis());
}

/**
Expand All @@ -324,7 +324,7 @@ public void outputDirtyMetrics(String database, String schema, String table, lon
return;
}
String identify = buildSchemaIdentify(database, schema, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
Expand All @@ -350,7 +350,7 @@ public void outputDirtyMetrics(String database, String table, long rowCount, lon
return;
}
String identify = buildSchemaIdentify(database, null, table);
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(identify)) {
subSinkMetricData = subSinkMetricMap.get(identify);
} else {
Expand All @@ -374,7 +374,7 @@ public void outputDirtyMetrics(String index, long rowCount, long rowSize) {
invokeDirty(rowCount, rowSize);
return;
}
SinkMetricData subSinkMetricData;
SinkExactlyMetric subSinkMetricData;
if (subSinkMetricMap.containsKey(index)) {
subSinkMetricData = subSinkMetricMap.get(index);
} else {
Expand Down Expand Up @@ -415,7 +415,7 @@ public void outputDirtyMetricsWithEstimate(Object data) {
}

@Override
public Map<String, SinkMetricData> getSubSinkMetricMap() {
public Map<String, SinkExactlyMetric> getSubSinkMetricMap() {
return this.subSinkMetricMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.SinkExactlyMetric;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -39,14 +39,14 @@
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;

public class SinkTopicMetricData extends SinkMetricData implements SinkSubMetricData {
public class SinkTopicMetricData extends SinkExactlyMetric implements SinkSubMetricData {

public static final Logger LOGGER = LoggerFactory.getLogger(SinkTopicMetricData.class);

/**
* The sink metric data map
*/
private final Map<String, SinkMetricData> topicSinkMetricMap = Maps.newHashMap();
private final Map<String, SinkExactlyMetric> topicSinkMetricMap = Maps.newHashMap();

public SinkTopicMetricData(MetricOption option, MetricGroup metricGroup) {
super(option, metricGroup);
Expand All @@ -70,7 +70,7 @@ public void registerSubMetricsGroup(MetricState metricState) {
for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) {
String topic = subMetricStateEntry.getKey();
final MetricState subMetricState = subMetricStateEntry.getValue();
SinkMetricData subSinkMetricData = buildSinkMetricData(topic, subMetricState, this);
SinkExactlyMetric subSinkMetricData = buildSinkExactlyMetric(topic, subMetricState, this);
topicSinkMetricMap.put(topic, subSinkMetricData);
}
LOGGER.info("register topicMetricsGroup from metricState,topic level metric map size:{}",
Expand All @@ -79,39 +79,40 @@ public void registerSubMetricsGroup(MetricState metricState) {

public void sendOutMetrics(String topic, long rowCount, long rowSize) {
if (StringUtils.isBlank(topic)) {
invoke(rowCount, rowSize);
invoke(rowCount, rowSize, System.currentTimeMillis());
return;
}
SinkMetricData sinkMetricData = getSinkMetricData(topic);
SinkExactlyMetric sinkExactlyMetric = getSinkExactlyMetric(topic);

this.invoke(rowCount, rowSize);
sinkMetricData.invoke(rowCount, rowSize);
this.invoke(rowCount, rowSize, System.currentTimeMillis());
sinkExactlyMetric.invoke(rowCount, rowSize, System.currentTimeMillis());
}

public void sendDirtyMetrics(String topic, long rowCount, long rowSize) {
if (StringUtils.isBlank(topic)) {
invokeDirty(rowCount, rowSize);
return;
}
SinkMetricData sinkMetricData = getSinkMetricData(topic);
SinkExactlyMetric sinkExactlyMetric = getSinkExactlyMetric(topic);

this.invokeDirty(rowCount, rowSize);
sinkMetricData.invokeDirty(rowCount, rowSize);
sinkExactlyMetric.invokeDirty(rowCount, rowSize);
}

private SinkMetricData getSinkMetricData(String topic) {
SinkMetricData sinkMetricData;
private SinkExactlyMetric getSinkExactlyMetric(String topic) {
SinkExactlyMetric sinkExactlyMetric;
if (topicSinkMetricMap.containsKey(topic)) {
sinkMetricData = topicSinkMetricMap.get(topic);
sinkExactlyMetric = topicSinkMetricMap.get(topic);
} else {
sinkMetricData = buildSinkMetricData(topic, null, this);
topicSinkMetricMap.put(topic, sinkMetricData);
sinkExactlyMetric = buildSinkExactlyMetric(topic, null, this);
topicSinkMetricMap.put(topic, sinkExactlyMetric);
}
return sinkMetricData;
return sinkExactlyMetric;
}

private SinkMetricData buildSinkMetricData(String topic, MetricState metricState, SinkMetricData sinkMetricData) {
Map<String, String> labels = sinkMetricData.getLabels();
private SinkExactlyMetric buildSinkExactlyMetric(String topic, MetricState metricState,
SinkExactlyMetric sinkExactlyMetric) {
Map<String, String> labels = sinkExactlyMetric.getLabels();
String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(DELIMITER));

Expand All @@ -123,11 +124,11 @@ private SinkMetricData buildSinkMetricData(String topic, MetricState metricState
.withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
return new SinkMetricData(metricOption, sinkMetricData.getMetricGroup());
return new SinkExactlyMetric(metricOption, sinkExactlyMetric.getMetricGroup());
}

@Override
public Map<String, SinkMetricData> getSubSinkMetricMap() {
public Map<String, SinkExactlyMetric> getSubSinkMetricMap() {
return this.topicSinkMetricMap;
}

Expand Down
Loading

0 comments on commit b573aad

Please sign in to comment.