Skip to content

Commit

Permalink
[INLONG-9241][Agent] Print task and instance detail every ten seconds (
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 9, 2023
1 parent f9c28ba commit 4552466
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class InstanceManager extends AbstractDaemon {

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class);
private static final int ACTION_QUEUE_CAPACITY = 100;
public static final int CORE_THREAD_SLEEP_TIME = 100;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private long lastPrintTime = 0;
// task in db
private final InstanceDb instanceDb;
// task in memory
Expand Down Expand Up @@ -109,7 +111,8 @@ private Runnable coreThread() {
running = true;
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceDetail();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
} catch (Throwable ex) {
Expand All @@ -122,6 +125,22 @@ private Runnable coreThread() {
};
}

private void printInstanceDetail() {
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
LOGGER.info("instanceManager coreThread running! taskId {} action count {}", taskId,
actionQueue.size());
List<InstanceProfile> instances = instanceDb.getInstances(taskId);
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
LOGGER.info(
"instanceManager coreThread instance taskId {} index {} total {} instanceId {} state {}",
taskId, i,
instances.size(), instance.getInstanceId(), instance.getState());
}
lastPrintTime = AgentUtils.getCurrentTime();
}
}

private void keepPaceWithDb() {
traverseDbTasksToMemory();
traverseMemoryTasksToDb();
Expand Down Expand Up @@ -215,7 +234,7 @@ public void stop() {
public void waitForTerminate() {
super.waitForTerminate();
while (running) {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
}
}

Expand All @@ -241,7 +260,8 @@ private void addInstance(InstanceProfile profile) {
}
LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId());
if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) {
LOGGER.info("shouldAddAgain returns false skip taskId {} instanceId {}", taskId, profile.getInstanceId());
LOGGER.info("addInstance shouldAddAgain returns false skip taskId {} instanceId {}", taskId,
profile.getInstanceId());
return;
}
addToDb(profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ public class TaskManager extends AbstractDaemon {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
public static final int CONFIG_QUEUE_CAPACITY = 1;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 100000;
private long lastPrintTime = 0;
// task basic db
private final Db taskBasicDb;
// instance basic db
Expand Down Expand Up @@ -143,6 +145,7 @@ private Runnable coreThread() {
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
printTaskDetail();
dealWithConfigQueue(configQueue);
dealWithActionQueue(actionQueue);
} catch (Throwable ex) {
Expand All @@ -153,6 +156,20 @@ private Runnable coreThread() {
};
}

private void printTaskDetail() {
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
LOGGER.info("taskManager coreThread running!");
List<TaskProfile> tasks = taskDb.getTasks();
for (int i = 0; i < tasks.size(); i++) {
TaskProfile task = tasks.get(i);
LOGGER.info("taskManager coreThread task index {} total {} taskId {} state {}",
i, tasks.size(), task.getTaskId(), task.getState());
}
lastPrintTime = AgentUtils.getCurrentTime();
}

}

private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> queue) {
List<TaskProfile> dataConfigs = queue.poll();
if (dataConfigs == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static void setup() {
Db basicDb = TaskManager.initDb("/localdb");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
manager = new InstanceManager("1", 2, basicDb);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
manager.start();
}

Expand Down

0 comments on commit 4552466

Please sign in to comment.