From 4552466dfefed046a2188571a3213570244c7d9e Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Thu, 9 Nov 2023 09:57:24 +0800 Subject: [PATCH] [INLONG-9241][Agent] Print task and instance detail every ten seconds (#9243) --- .../agent/core/instance/InstanceManager.java | 28 ++++++++++++++++--- .../agent/core/task/file/TaskManager.java | 17 +++++++++++ .../core/instance/TestInstanceManager.java | 1 + 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index de96b93bc1e..d4a278a974b 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -47,7 +47,9 @@ public class InstanceManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class); private static final int ACTION_QUEUE_CAPACITY = 100; - public static final int CORE_THREAD_SLEEP_TIME = 100; + public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; + public static final int CORE_THREAD_PRINT_TIME = 10000; + private long lastPrintTime = 0; // task in db private final InstanceDb instanceDb; // task in memory @@ -109,7 +111,8 @@ private Runnable coreThread() { running = true; while (isRunnable()) { try { - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); + printInstanceDetail(); dealWithActionQueue(actionQueue); keepPaceWithDb(); } catch (Throwable ex) { @@ -122,6 +125,22 @@ private Runnable coreThread() { }; } + private void printInstanceDetail() { + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { + LOGGER.info("instanceManager coreThread running! taskId {} action count {}", taskId, + actionQueue.size()); + List instances = instanceDb.getInstances(taskId); + for (int i = 0; i < instances.size(); i++) { + InstanceProfile instance = instances.get(i); + LOGGER.info( + "instanceManager coreThread instance taskId {} index {} total {} instanceId {} state {}", + taskId, i, + instances.size(), instance.getInstanceId(), instance.getState()); + } + lastPrintTime = AgentUtils.getCurrentTime(); + } + } + private void keepPaceWithDb() { traverseDbTasksToMemory(); traverseMemoryTasksToDb(); @@ -215,7 +234,7 @@ public void stop() { public void waitForTerminate() { super.waitForTerminate(); while (running) { - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); } } @@ -241,7 +260,8 @@ private void addInstance(InstanceProfile profile) { } LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId()); if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) { - LOGGER.info("shouldAddAgain returns false skip taskId {} instanceId {}", taskId, profile.getInstanceId()); + LOGGER.info("addInstance shouldAddAgain returns false skip taskId {} instanceId {}", taskId, + profile.getInstanceId()); return; } addToDb(profile); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 9aa71a96abb..b4cb79a48cb 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -56,7 +56,9 @@ public class TaskManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class); public static final int CONFIG_QUEUE_CAPACITY = 1; public static final int CORE_THREAD_SLEEP_TIME = 1000; + public static final int CORE_THREAD_PRINT_TIME = 10000; private static final int ACTION_QUEUE_CAPACITY = 100000; + private long lastPrintTime = 0; // task basic db private final Db taskBasicDb; // instance basic db @@ -143,6 +145,7 @@ private Runnable coreThread() { while (isRunnable()) { try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + printTaskDetail(); dealWithConfigQueue(configQueue); dealWithActionQueue(actionQueue); } catch (Throwable ex) { @@ -153,6 +156,20 @@ private Runnable coreThread() { }; } + private void printTaskDetail() { + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { + LOGGER.info("taskManager coreThread running!"); + List tasks = taskDb.getTasks(); + for (int i = 0; i < tasks.size(); i++) { + TaskProfile task = tasks.get(i); + LOGGER.info("taskManager coreThread task index {} total {} taskId {} state {}", + i, tasks.size(), task.getTaskId(), task.getState()); + } + lastPrintTime = AgentUtils.getCurrentTime(); + } + + } + private void dealWithConfigQueue(BlockingQueue> queue) { List dataConfigs = queue.poll(); if (dataConfigs == null) { diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index b1107bb2ab0..cff9a7c2436 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -51,6 +51,7 @@ public static void setup() { Db basicDb = TaskManager.initDb("/localdb"); taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); manager = new InstanceManager("1", 2, basicDb); + manager.CORE_THREAD_SLEEP_TIME_MS = 100; manager.start(); }