diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java deleted file mode 100644 index d67a15fb5ab..00000000000 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.core.task; - -import org.apache.inlong.agent.conf.AgentConfiguration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; - -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; - -/** - * used to limit global memory to avoid oom - */ -public class MemoryManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class); - private static volatile MemoryManager memoryManager = null; - private final AgentConfiguration conf; - private ConcurrentHashMap semaphoreMap = new ConcurrentHashMap<>(); - - private MemoryManager() { - this.conf = AgentConfiguration.getAgentConf(); - Semaphore semaphore = null; - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore); - - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore); - - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore); - - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore); - } - - /** - * manager singleton - */ - public static MemoryManager getInstance() { - if (memoryManager == null) { - synchronized (MemoryManager.class) { - if (memoryManager == null) { - memoryManager = new MemoryManager(); - } - } - } - return memoryManager; - } - - public boolean tryAcquire(String semaphoreName, int permit) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("tryAcquire {} not exist"); - return false; - } - return semaphore.tryAcquire(permit); - } - - public void release(String semaphoreName, int permit) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("release {} not exist"); - return; - } - semaphore.release(permit); - } - - public void printDetail(String semaphoreName) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("printDetail {} not exist"); - return; - } - LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(), - semaphoreName); - } - - public void printAll() { - printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); - printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); - printDetail(AGENT_GLOBAL_CHANNEL_PERMIT); - printDetail(AGENT_GLOBAL_WRITER_PERMIT); - } -} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java deleted file mode 100755 index c3a5bbfc079..00000000000 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.sinks; - -import org.apache.inlong.agent.common.AgentThreadFactory; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.constant.CommonConstants; -import org.apache.inlong.agent.core.task.MemoryManager; -import org.apache.inlong.agent.message.BatchProxyMessage; -import org.apache.inlong.agent.message.EndMessage; -import org.apache.inlong.agent.message.PackProxyMessage; -import org.apache.inlong.agent.message.ProxyMessage; -import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.MessageFilter; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; - -/** - * sink message data to inlong-dataproxy - */ -public class ProxySink extends AbstractSink { - - private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class); - private static AtomicLong index = new AtomicLong(0); - private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new AgentThreadFactory("ProxySink")); - private MessageFilter messageFilter; - private SenderManager senderManager; - private byte[] fieldSplitter; - private volatile boolean shutdown = false; - private int maxPackSize; - - public ProxySink() { - } - - @Override - public void write(Message message) { - if (message == null) { - return; - } - boolean suc = false; - while (!suc) { - suc = putInCache(message); - if (!suc) { - AgentUtils.silenceSleepInMs(batchFlushInterval); - } - } - } - - private boolean putInCache(Message message) { - try { - if (message == null) { - return true; - } - message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId); - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); - extractStreamFromMessage(message, fieldSplitter); - if (message instanceof EndMessage) { - // increment the count of failed sinks - sinkMetric.sinkFailCount.incrementAndGet(); - return true; - } - AtomicBoolean suc = new AtomicBoolean(false); - ProxyMessage proxyMessage = new ProxyMessage(message); - boolean writerPermitSuc = MemoryManager.getInstance() - .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); - if (!writerPermitSuc) { - LOGGER.warn("writer tryAcquire failed"); - MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT); - return false; - } - // add proxy message to cache. - cache.compute(proxyMessage.getBatchKey(), - (s, packProxyMessage) -> { - if (packProxyMessage == null) { - packProxyMessage = new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, - proxyMessage.getInlongStreamId()); - packProxyMessage.generateExtraMap(proxyMessage.getDataKey()); - } - // add message to package proxy - suc.set(packProxyMessage.addProxyMessage(proxyMessage)); - return packProxyMessage; - }); - if (suc.get()) { - MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, message.getBody().length); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } else { - MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); - // increment the count of failed sinks - sinkMetric.sinkFailCount.incrementAndGet(); - } - return suc.get(); - } catch (Exception e) { - LOGGER.error("write message to Proxy sink error", e); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } - return false; - } - - /** - * extract stream id from message if message filter is presented - */ - private void extractStreamFromMessage(Message message, byte[] fieldSplitter) { - if (messageFilter != null) { - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, - messageFilter.filterStreamId(message, fieldSplitter)); - } else { - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); - } - } - - /** - * flush cache by batch - * - * @return thread runner - */ - private Runnable flushCache() { - return () -> { - LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName); - while (!shutdown) { - try { - cache.forEach((batchKey, packProxyMessage) -> { - BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch(); - if (batchProxyMessage != null) { - senderManager.sendBatch(batchProxyMessage); - LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, " - + "read source is {} sendTime is {}", inlongGroupId, batchKey, - batchProxyMessage.getDataList().size(), jobInstanceId, sourceName, - batchProxyMessage.getDataTime()); - } - }); - } catch (Exception ex) { - LOGGER.error("error caught", ex); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } finally { - AgentUtils.silenceSleepInMs(batchFlushInterval); - } - } - LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName); - }; - } - - @Override - public void init(JobProfile jobConf) { - super.init(jobConf); - this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); - messageFilter = initMessageFilter(jobConf); - fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes( - StandardCharsets.UTF_8); - senderManager = new SenderManager(jobConf, inlongGroupId, sourceName); - try { - senderManager.Start(); - executorService.execute(flushCache()); - } catch (Throwable ex) { - LOGGER.error("error while init sender for group id {}", inlongGroupId); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - throw new IllegalStateException(ex); - } - } - - @Override - public void destroy() { - LOGGER.info("destroy sink source name {}", sourceName); - while (!sinkFinish()) { - LOGGER.info("sourceName {} wait until cache all flushed to proxy", sourceName); - AgentUtils.silenceSleepInMs(batchFlushInterval); - } - shutdown = true; - executorService.shutdown(); - senderManager.Stop(); - LOGGER.info("destroy sink source name {} end", sourceName); - } - - /** - * check whether all stream id messages finished - */ - private boolean sinkFinish() { - return cache.values().stream().allMatch(PackProxyMessage::isEmpty); - } -} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 554d6f59166..6b05f23fb76 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -21,12 +21,10 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; -import org.apache.inlong.agent.core.task.MemoryManager; import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.message.BatchProxyMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; -import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.message.SequentialID; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; @@ -53,7 +51,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; @@ -175,10 +172,6 @@ private void cleanResendQueue() { while (!resendQueue.isEmpty()) { try { AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); - if (callback != null) { - MemoryManager.getInstance() - .release(AGENT_GLOBAL_WRITER_PERMIT, (int) callback.batchMessage.getTotalSize()); - } } catch (InterruptedException e) { LOGGER.error("clean resend queue error{}", e.getMessage()); } @@ -324,9 +317,6 @@ public void onMessageAck(SendResult result) { String jobId = batchMessage.getJobId(); long dataTime = batchMessage.getDataTime(); if (result != null && result.equals(SendResult.OK)) { - MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) batchMessage.getTotalSize()); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, msgCnt, - batchMessage.getTotalSize()); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); PositionManager.getInstance() .updateSinkPosition(batchMessage.getJobId(), sourcePath, msgCnt, false);