Skip to content

Commit

Permalink
[INLONG-11364][Audit] Add a metric monitoring system for the Audit Se…
Browse files Browse the repository at this point in the history
…rvice itself (#11381)

* [INLONG-11364][Audit] Adjust Audit Service package position

* [INLONG-11364][Audit] Add a metric monitoring system for the Audit Service itself

* [INLONG-11364][Audit] Resolving code conflicts
  • Loading branch information
doleyzi authored Oct 21, 2024
1 parent cf1f787 commit 754ba80
Show file tree
Hide file tree
Showing 54 changed files with 671 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*/
public class ConfigConstants {

public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy";
public static final String KEY_PROMETHEUS_PORT = "audit.proxy.prometheus.port";
public static final int DEFAULT_PROMETHEUS_PORT = 10082;
public static final String KEY_PROXY_METRIC_CLASSNAME = "audit.proxy.metric.classname";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;

public class MetricsManager {
Expand All @@ -44,16 +42,15 @@ private static class Holder {

private AbstractMetric metric;

public void init(String metricName) {
public void init() {
try {
ConfigManager configManager = ConfigManager.getInstance();
String metricClassName = configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, DEFAULT_PROXY_METRIC_CLASSNAME);
LOGGER.info("Metric class name: {}", metricClassName);
Constructor<?> constructor = Class.forName(metricClassName)
.getDeclaredConstructor(String.class, MetricItem.class, int.class);
.getDeclaredConstructor(MetricItem.class);
constructor.setAccessible(true);
metric = (AbstractMetric) constructor.newInstance(metricName, metricItem,
configManager.getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT));
metric = (AbstractMetric) constructor.newInstance(metricItem);

timer.scheduleWithFixedDelay(() -> {
metric.report();
Expand Down Expand Up @@ -89,9 +86,11 @@ public void addReceiveSuccess(long count, long pack, long size) {
public void addSendSuccess(long count) {
metricItem.getSendCountSuccess().addAndGet(count);
}

public void addSendFailed(long count) {
metricItem.getSendCountFailed().addAndGet(count);
}

public void shutdown() {
timer.shutdown();
metric.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.audit.metric.prometheus;

import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.metric.AbstractMetric;
import org.apache.inlong.audit.metric.MetricDimension;
import org.apache.inlong.audit.metric.MetricItem;
Expand All @@ -31,23 +32,25 @@
import java.util.Collections;
import java.util.List;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;

/**
* PrometheusMetric
*/
public class ProxyPrometheusMetric extends Collector implements AbstractMetric {

private static final Logger LOGGER = LoggerFactory.getLogger(ProxyPrometheusMetric.class);
private static final String HELP_DESCRIPTION = "help";
private static final String HELP_DESCRIPTION = "Audit Proxy metrics help description";
private static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy";

private final MetricItem metricItem;
private final String metricName;
private HTTPServer server;

public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int prometheusPort) {
this.metricName = metricName;
public ProxyPrometheusMetric(MetricItem metricItem) {
this.metricItem = metricItem;
try {
server = new HTTPServer(prometheusPort);
server = new HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT));
this.register();
} catch (IOException e) {
LOGGER.error("Construct proxy prometheus metric has IOException", e);
Expand All @@ -66,23 +69,30 @@ public List<MetricFamilySamples> collect() {
createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()));

MetricFamilySamples metricFamilySamples =
new MetricFamilySamples(metricName, Type.GAUGE, HELP_DESCRIPTION, samples);
new MetricFamilySamples(AUDIT_PROXY_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples);

return Collections.singletonList(metricFamilySamples);
}

private MetricFamilySamples.Sample createSample(MetricDimension key, double value) {
return new MetricFamilySamples.Sample(metricName, Collections.singletonList(MetricItem.K_DIMENSION_KEY),
return new MetricFamilySamples.Sample(AUDIT_PROXY_SERVER_NAME,
Collections.singletonList(MetricItem.K_DIMENSION_KEY),
Collections.singletonList(key.getKey()), value);
}

@Override
public void report() {
LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString());
if (metricItem != null) {
LOGGER.info("Report proxy Prometheus metric: {}", metricItem);
} else {
LOGGER.warn("MetricItem is null, nothing to report.");
}
}

@Override
public void stop() {
server.close();
if (server != null) {
server.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.inlong.audit.config.ConfigConstants.AUDIT_PROXY_SERVER_NAME;

/**
* Application
*/
Expand Down Expand Up @@ -351,7 +349,7 @@ public void run() {
}
});

MetricsManager.getInstance().init(AUDIT_PROXY_SERVER_NAME);
MetricsManager.getInstance().init();

} catch (Exception e) {
logger.error("A fatal error occurred while running. Exception follows.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
* limitations under the License.
*/

package org.apache.inlong.audit.cache;
package org.apache.inlong.audit.service.cache;

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.CacheKeyEntity;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.utils.CacheUtils;
import org.apache.inlong.audit.service.config.ConfigConstants;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.AuditCycle;
import org.apache.inlong.audit.service.entities.CacheKeyEntity;
import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.metric.MetricsManager;
import org.apache.inlong.audit.service.utils.CacheUtils;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -36,7 +38,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
Expand All @@ -57,7 +58,7 @@ public class AbstractCache {
// According to the startTime and endTime of the request parameters, the maximum number of cache keys generated.
private static final int MAX_CACHE_KEY_SIZE = 1440;

private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(DATE_FORMAT);
private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT);

protected AbstractCache(AuditCycle auditCycle) {
cache = Caffeine.newBuilder()
Expand Down Expand Up @@ -108,10 +109,16 @@ public List<StatData> getData(String startTime, String endTime, String inlongGro
if (null != statData) {
result.add(statData);
} else {
long currentTimeMillis = System.currentTimeMillis();

statData = fetchDataFromAuditStorage(cacheKey.getStartTime(), cacheKey.getEndTime(), inlongGroupId,
inlongStreamId,
auditId, auditTag);
result.add(statData);

MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
System.currentTimeMillis() - currentTimeMillis);

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.audit.cache;
package org.apache.inlong.audit.service.cache;

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.config.ProxyConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,14 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_AGENT;
import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY;
import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_SORT;
import static org.apache.inlong.audit.config.ProxyConstants.IP_PORT_SEPARATOR;
import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_AGENT;
import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_DATAPROXY;
import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_SORT;
import static org.apache.inlong.audit.config.ProxyConstants.PROXY_SEPARATOR;
import static org.apache.inlong.audit.entity.AuditComponent.AGENT;
import static org.apache.inlong.audit.entity.AuditComponent.DATAPROXY;
import static org.apache.inlong.audit.entity.AuditComponent.SORT;
Expand Down Expand Up @@ -77,17 +70,20 @@ private Map<String, String> getProxyConfigs() {
Configuration config = Configuration.getInstance();
Map<String, String> proxyConfigs = new HashMap<>();
proxyConfigs.put(AGENT.getComponent(),
config.get(KEY_AUDIT_PROXY_ADDRESS_AGENT, DEFAULT_AUDIT_PROXY_ADDRESS_AGENT));
config.get(ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_AGENT,
ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_AGENT));
proxyConfigs.put(DATAPROXY.getComponent(),
config.get(KEY_AUDIT_PROXY_ADDRESS_DATAPROXY, DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY));
config.get(ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_DATAPROXY,
ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_DATAPROXY));
proxyConfigs.put(SORT.getComponent(),
config.get(KEY_AUDIT_PROXY_ADDRESS_SORT, DEFAULT_AUDIT_PROXY_ADDRESS_SORT));
config.get(ProxyConstants.KEY_AUDIT_PROXY_ADDRESS_SORT,
ProxyConstants.DEFAULT_AUDIT_PROXY_ADDRESS_SORT));
return proxyConfigs;
}

private List<AuditProxy> createAuditProxySet(String proxyList) {
return Arrays.stream(proxyList.split(PROXY_SEPARATOR))
.map(element -> element.split(IP_PORT_SEPARATOR))
return Arrays.stream(proxyList.split(ProxyConstants.PROXY_SEPARATOR))
.map(element -> element.split(ProxyConstants.IP_PORT_SEPARATOR))
.filter(ipPort -> ipPort.length == 2)
.map(ipPort -> new AuditProxy(ipPort[0], Integer.parseInt(ipPort[1])))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.inlong.audit.cache;
package org.apache.inlong.audit.service.cache;

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.utils.JdbcUtils;
import org.apache.inlong.audit.service.config.ConfigConstants;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.JdbcConfig;
import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.utils.JdbcUtils;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -36,21 +37,8 @@
import java.util.LinkedList;
import java.util.List;

import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;
import static org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
import static org.apache.inlong.audit.service.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;

/**
* Cache Of day ,for day openapi
Expand Down Expand Up @@ -139,17 +127,20 @@ private void createDataSource() {
config.setJdbcUrl(jdbcConfig.getJdbcUrl());
config.setUsername(jdbcConfig.getUserName());
config.setPassword(jdbcConfig.getPassword());
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT));
config.addDataSourceProperty(CACHE_PREP_STMTS,
Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS));
config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE));
config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
config.setConnectionTimeout(Configuration.getInstance().get(ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT,
ConfigConstants.DEFAULT_CONNECTION_TIMEOUT));
config.addDataSourceProperty(ConfigConstants.CACHE_PREP_STMTS,
Configuration.getInstance().get(ConfigConstants.KEY_CACHE_PREP_STMTS,
ConfigConstants.DEFAULT_CACHE_PREP_STMTS));
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SIZE,
Configuration.getInstance().get(ConfigConstants.KEY_PREP_STMT_CACHE_SIZE,
ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE));
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT,
Configuration.getInstance().get(ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT,
ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
config.setMaximumPoolSize(
Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
DEFAULT_DATASOURCE_POOL_SIZE));
Configuration.getInstance().get(ConfigConstants.KEY_DATASOURCE_POOL_SIZE,
ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE));
dataSource = new HikariDataSource(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.audit.cache;
package org.apache.inlong.audit.service.cache;

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.AuditCycle;

/**
* Cache Of minute 30 ,for minute 30 openapi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.audit.cache;
package org.apache.inlong.audit.service.cache;

import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.AuditCycle;

/**
* Cache Of hour ,for hour openapi
Expand Down
Loading

0 comments on commit 754ba80

Please sign in to comment.