Skip to content

Commit

Permalink
[INLONG-11406][Audit] Provides an interface for asynchronously flushi…
Browse files Browse the repository at this point in the history
…ng Audit data (#11409)
  • Loading branch information
doleyzi authored Oct 25, 2024
1 parent 0ee2e58 commit 47f33c3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

private final String baseName;
private final AtomicInteger counter = new AtomicInteger(0);

public NamedThreadFactory(String baseName) {
this.baseName = baseName;
}

@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, baseName + "-Thread-" + counter.getAndIncrement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.RequestIdUtils;
import org.apache.inlong.audit.util.StatInfo;
import org.apache.inlong.audit.utils.NamedThreadFactory;

import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -76,7 +77,8 @@ public class AuditReporterImpl implements Serializable {
private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> flushTime = new ConcurrentHashMap<>();
private final Config config = new Config();
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService timerExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("inlong-audit-flush"));
private int packageId = 1;
private int dataId = 0;
private volatile boolean initialized = false;
Expand All @@ -86,6 +88,7 @@ public class AuditReporterImpl implements Serializable {
private SocketAddressListLoader loader = null;
private int flushStatThreshold = 100;
private boolean autoFlush = true;
private boolean enableDebug = false;

private AuditMetric auditMetric = new AuditMetric();

Expand All @@ -108,6 +111,14 @@ public void setAutoFlush(boolean autoFlush) {
this.autoFlush = autoFlush;
}

/**
* Debug mode supports printing audit details in the log
* @param enableDebug
*/
public void setEnableDebug(boolean enableDebug) {
this.enableDebug = enableDebug;
}

/**
* Init
*/
Expand All @@ -116,7 +127,7 @@ private void init() {
return;
}
config.init();
timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
timerExecutor.scheduleWithFixedDelay(new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -301,6 +312,22 @@ private void addByKey(long isolateKey, String statKey, long count, long size, lo
stat.delay.addAndGet(delayTime);
}

/**
* Asynchronously flush audit data
* @param isolateKey
*/
public synchronized void asyncFlush(long isolateKey) {
LOGGER.info("Async flush audit by isolate key: {} ", isolateKey);
Runnable task = () -> {
try {
flush(isolateKey);
} catch (Exception e) {
LOGGER.error("Async flush audit by isolate key: {}, has exception: ", isolateKey, e);
}
};
timerExecutor.schedule(task, 0, TimeUnit.MILLISECONDS);
}

/**
* Flush audit data by default audit version
*/
Expand All @@ -314,10 +341,12 @@ public synchronized void flush() {
public synchronized void flush(long isolateKey) {
if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) != null
|| flushStat.addAndGet(1) > flushStatThreshold) {
LOGGER.info("Skip audit flush isolate key: {}, last flush time: {}, count: {}", isolateKey,
flushTime.get(isolateKey), flushStat.get());
return;
}
long startTime = System.currentTimeMillis();
LOGGER.info("Audit flush isolate key {} ", isolateKey);
LOGGER.info("Audit flush isolate key: {} ", isolateKey);

try {
manager.checkFailedData();
Expand Down Expand Up @@ -444,7 +473,7 @@ private void clearExpiredKey(long isolateKey) {
while (iterator.hasNext()) {
Map.Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().isEmpty()) {
LOGGER.info("Remove the key of expired key list: {},isolate key: {}", entry.getKey(), isolateKey);
LOGGER.info("Remove the key of expired key list: {}, isolate key: {}", entry.getKey(), isolateKey);
iterator.remove();
continue;
}
Expand Down Expand Up @@ -528,17 +557,20 @@ private void send(long isolateKey) {
if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
sendData(requestBuild);
sendData(requestBuild, isolateKey);
}
}

if (requestBuild.getMsgBodyCount() > 0) {
sendData(requestBuild);
sendData(requestBuild, isolateKey);
}
summaryStatMap.get(isolateKey).clear();
}

private void sendData(AuditApi.AuditRequest.Builder requestBuild) {
private void sendData(AuditApi.AuditRequest.Builder requestBuild, long isolateKey) {
if (enableDebug) {
LOGGER.info("Send audit data by isolate key: {}, data: {}", isolateKey, requestBuild);
}
requestBuild.setRequestId(RequestIdUtils.nextRequestId());
sendByBaseCommand(requestBuild.build());
auditMetric.addTotalMsg(requestBuild.getMsgBodyCount());
Expand All @@ -554,6 +586,7 @@ private void checkFlushTime() {
flushTime.forEach((key, value) -> {
if ((currentTime - value) > PERIOD) {
flushTime.remove(key);
LOGGER.info("Remove audit flush limitation. isolate key: {}, flush time: {}", key, value);
}
});
}
Expand Down Expand Up @@ -649,4 +682,9 @@ public void setUpdateInterval(int updateInterval) {
public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
}

public void shutdown() {
ProxyManager.getInstance().shutdown();
timerExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.audit.entity.CommonResponse;
import org.apache.inlong.audit.utils.HttpUtils;
import org.apache.inlong.audit.utils.NamedThreadFactory;
import org.apache.inlong.audit.utils.ThreadUtils;

import org.slf4j.Logger;
Expand All @@ -41,7 +42,8 @@ public class ProxyManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyManager.class);
private static final ProxyManager instance = new ProxyManager();
private final List<String> currentIpPorts = new CopyOnWriteArrayList<>();
private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("inlong-audit-proxy-manager"));
private final static String GET_AUDIT_PROXY_API_PATH = "/inlong/manager/openapi/audit/getAuditProxy";
private int timeoutMs = 10000;
private int updateInterval = 60000;
Expand Down Expand Up @@ -167,4 +169,7 @@ public InetSocketAddress getInetSocketAddress() {
}
return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
}
public void shutdown() {
timer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public boolean checkSocket() {
try {
InetSocketAddress inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress();
if (inetSocketAddress == null) {
LOGGER.error("Audit inet socket address is null!");
LOGGER.error("Audit proxy address is null!");
return false;
}
reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
} catch (IOException exception) {
LOGGER.error("Connect to {} has exception!", socket.getInetAddress(), exception);
LOGGER.error("Connect to audit proxy {} has exception!", socket.getInetAddress(), exception);
return false;
}
}
Expand Down

0 comments on commit 47f33c3

Please sign in to comment.