diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java new file mode 100644 index 00000000000..cd751ec03c4 --- /dev/null +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.utils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + + private final String baseName; + private final AtomicInteger counter = new AtomicInteger(0); + + public NamedThreadFactory(String baseName) { + this.baseName = baseName; + } + + @Override + public Thread newThread(Runnable runnable) { + return new Thread(runnable, baseName + "-Thread-" + counter.getAndIncrement()); + } +} diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index 7d3f1c5755d..bd2c5aae476 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -32,6 +32,7 @@ import org.apache.inlong.audit.util.Config; import org.apache.inlong.audit.util.RequestIdUtils; import org.apache.inlong.audit.util.StatInfo; +import org.apache.inlong.audit.utils.NamedThreadFactory; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -76,7 +77,8 @@ public class AuditReporterImpl implements Serializable { private final ConcurrentHashMap> expiredKeyList = new ConcurrentHashMap<>(); private final ConcurrentHashMap flushTime = new ConcurrentHashMap<>(); private final Config config = new Config(); - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService timerExecutor = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("inlong-audit-flush")); private int packageId = 1; private int dataId = 0; private volatile boolean initialized = false; @@ -86,6 +88,7 @@ public class AuditReporterImpl implements Serializable { private SocketAddressListLoader loader = null; private int flushStatThreshold = 100; private boolean autoFlush = true; + private boolean enableDebug = false; private AuditMetric auditMetric = new AuditMetric(); @@ -108,6 +111,14 @@ public void setAutoFlush(boolean autoFlush) { this.autoFlush = autoFlush; } + /** + * Debug mode supports printing audit details in the log + * @param enableDebug + */ + public void setEnableDebug(boolean enableDebug) { + this.enableDebug = enableDebug; + } + /** * Init */ @@ -116,7 +127,7 @@ private void init() { return; } config.init(); - timeoutExecutor.scheduleWithFixedDelay(new Runnable() { + timerExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { @@ -301,6 +312,22 @@ private void addByKey(long isolateKey, String statKey, long count, long size, lo stat.delay.addAndGet(delayTime); } + /** + * Asynchronously flush audit data + * @param isolateKey + */ + public synchronized void asyncFlush(long isolateKey) { + LOGGER.info("Async flush audit by isolate key: {} ", isolateKey); + Runnable task = () -> { + try { + flush(isolateKey); + } catch (Exception e) { + LOGGER.error("Async flush audit by isolate key: {}, has exception: ", isolateKey, e); + } + }; + timerExecutor.schedule(task, 0, TimeUnit.MILLISECONDS); + } + /** * Flush audit data by default audit version */ @@ -314,10 +341,12 @@ public synchronized void flush() { public synchronized void flush(long isolateKey) { if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) != null || flushStat.addAndGet(1) > flushStatThreshold) { + LOGGER.info("Skip audit flush isolate key: {}, last flush time: {}, count: {}", isolateKey, + flushTime.get(isolateKey), flushStat.get()); return; } long startTime = System.currentTimeMillis(); - LOGGER.info("Audit flush isolate key {} ", isolateKey); + LOGGER.info("Audit flush isolate key: {} ", isolateKey); try { manager.checkFailedData(); @@ -444,7 +473,7 @@ private void clearExpiredKey(long isolateKey) { while (iterator.hasNext()) { Map.Entry> entry = iterator.next(); if (entry.getValue().isEmpty()) { - LOGGER.info("Remove the key of expired key list: {},isolate key: {}", entry.getKey(), isolateKey); + LOGGER.info("Remove the key of expired key list: {}, isolate key: {}", entry.getKey(), isolateKey); iterator.remove(); continue; } @@ -528,17 +557,20 @@ private void send(long isolateKey) { if (dataId++ >= BATCH_NUM) { dataId = 0; packageId++; - sendData(requestBuild); + sendData(requestBuild, isolateKey); } } if (requestBuild.getMsgBodyCount() > 0) { - sendData(requestBuild); + sendData(requestBuild, isolateKey); } summaryStatMap.get(isolateKey).clear(); } - private void sendData(AuditApi.AuditRequest.Builder requestBuild) { + private void sendData(AuditApi.AuditRequest.Builder requestBuild, long isolateKey) { + if (enableDebug) { + LOGGER.info("Send audit data by isolate key: {}, data: {}", isolateKey, requestBuild); + } requestBuild.setRequestId(RequestIdUtils.nextRequestId()); sendByBaseCommand(requestBuild.build()); auditMetric.addTotalMsg(requestBuild.getMsgBodyCount()); @@ -554,6 +586,7 @@ private void checkFlushTime() { flushTime.forEach((key, value) -> { if ((currentTime - value) > PERIOD) { flushTime.remove(key); + LOGGER.info("Remove audit flush limitation. isolate key: {}, flush time: {}", key, value); } }); } @@ -649,4 +682,9 @@ public void setUpdateInterval(int updateInterval) { public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) { SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory); } + + public void shutdown() { + ProxyManager.getInstance().shutdown(); + timerExecutor.shutdown(); + } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java index 127609d2ab9..b87c563da30 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java @@ -21,6 +21,7 @@ import org.apache.inlong.audit.entity.AuditProxy; import org.apache.inlong.audit.entity.CommonResponse; import org.apache.inlong.audit.utils.HttpUtils; +import org.apache.inlong.audit.utils.NamedThreadFactory; import org.apache.inlong.audit.utils.ThreadUtils; import org.slf4j.Logger; @@ -41,7 +42,8 @@ public class ProxyManager { private static final Logger LOGGER = LoggerFactory.getLogger(ProxyManager.class); private static final ProxyManager instance = new ProxyManager(); private final List currentIpPorts = new CopyOnWriteArrayList<>(); - private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService timer = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("inlong-audit-proxy-manager")); private final static String GET_AUDIT_PROXY_API_PATH = "/inlong/manager/openapi/audit/getAuditProxy"; private int timeoutMs = 10000; private int updateInterval = 60000; @@ -167,4 +169,7 @@ public InetSocketAddress getInetSocketAddress() { } return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1])); } + public void shutdown() { + timer.shutdown(); + } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java index f941cbae6de..b795b7aa981 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java @@ -80,12 +80,12 @@ public boolean checkSocket() { try { InetSocketAddress inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress(); if (inetSocketAddress == null) { - LOGGER.error("Audit inet socket address is null!"); + LOGGER.error("Audit proxy address is null!"); return false; } reconnect(inetSocketAddress, auditConfig.getSocketTimeout()); } catch (IOException exception) { - LOGGER.error("Connect to {} has exception!", socket.getInetAddress(), exception); + LOGGER.error("Connect to audit proxy {} has exception!", socket.getInetAddress(), exception); return false; } }