Skip to content

Commit

Permalink
[INLONG-11516][Agent] Accelerate the process exit speed (#11517)
Browse files Browse the repository at this point in the history
* [INLONG-11516][Agent] Accelerate the process exit speed

* [INLONG-11516][Agent] Add modifications to InstacneManager
  • Loading branch information
justinwwhuang authored Nov 21, 2024
1 parent 22d9a33 commit 8ade12d
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
*/
public abstract void destroy();

/**
* notify destroy instance.
*/
public abstract void notifyDestroy();

/**
* get instance profile
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public boolean submitAction(InstanceAction action) {
if (action == null) {
return false;
}
if (isFull()) {
return false;
}
return actionQueue.offer(action);
}

Expand All @@ -163,7 +166,7 @@ private Runnable coreThread() {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue(actionQueue);
dealWithActionQueue();
keepPaceWithStore();
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
Expand Down Expand Up @@ -251,10 +254,10 @@ private void traverseMemoryTasksToStore() {
});
}

private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) {
private void dealWithActionQueue() {
while (isRunnable()) {
try {
InstanceAction action = queue.poll();
InstanceAction action = actionQueue.poll();
if (action == null) {
break;
}
Expand Down Expand Up @@ -375,6 +378,15 @@ private void deleteFromMemory(String instanceId) {
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}

private void notifyDestroyInstance(String instanceId) {
Instance instance = instanceMap.get(instanceId);
if (instance == null) {
LOGGER.error("try to notify destroy instance but not found: taskId {} instanceId {}", taskId, instanceId);
return;
}
instance.notifyDestroy();
}

private void addToStore(InstanceProfile profile, boolean addNew) {
LOGGER.info("add instance to instance store state {} instanceId {}", profile.getState(),
profile.getInstanceId());
Expand Down Expand Up @@ -433,6 +445,9 @@ private void addToMemory(InstanceProfile instanceProfile) {
}

private void stopAllInstances() {
instanceMap.values().forEach((instance) -> {
notifyDestroyInstance(instance.getInstanceId());
});
instanceMap.values().forEach((instance) -> {
deleteFromMemory(instance.getInstanceId());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,29 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {

@Override
public void destroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
Long start = AgentUtils.getCurrentTime();
notifyDestroy();
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
LOGGER.info("destroy instance wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.source.destroy();
LOGGER.info("destroy instance wait source elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.sink.destroy();
LOGGER.info("destroy instance wait sink elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

@Override
public void notifyDestroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class SenderManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
Expand Down Expand Up @@ -172,9 +173,12 @@ public void Stop() {
}

private void closeMessageSender() {
Long start = AgentUtils.getCurrentTime();
if (sender != null) {
sender.close();
}
LOGGER.info("close sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
Expand Down Expand Up @@ -286,7 +290,7 @@ private Runnable flushResendQueue() {
resendRunning = true;
while (!shutdown) {
try {
AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
AgentSenderCallback callback = resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
if (callback != null) {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected class SourceData {
protected final Integer BATCH_READ_LINE_COUNT = 10000;
protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
protected final Integer READ_WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
protected final Integer WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
protected BlockingQueue<SourceData> queue;

Expand Down Expand Up @@ -172,7 +172,7 @@ private void doRun() {
emptyCount = 0;
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
continue;
}
emptyCount = 0;
Expand Down Expand Up @@ -231,7 +231,7 @@ private boolean waitForPermit(String permitName, int permitLen) {
if (!isRunnable()) {
return false;
}
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
}
}
return true;
Expand All @@ -247,7 +247,7 @@ private void putIntoQueue(SourceData sourceData) {
try {
boolean offerSuc = false;
while (isRunnable() && !offerSuc) {
offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
Expand Down Expand Up @@ -338,7 +338,7 @@ private boolean filterSourceData(Message msg) {
private SourceData readFromQueue() {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId);
}
Expand Down Expand Up @@ -405,7 +405,7 @@ private void clearQueue(BlockingQueue<SourceData> queue) {
while (queue != null && !queue.isEmpty()) {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void destroy() {
destroyTime = index.getAndAdd(1);
}

@Override
public void notifyDestroy() {

}

@Override
public InstanceProfile getProfile() {
return profile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private LogFileSource getSource(int taskId, long offset) {
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
Expand Down

0 comments on commit 8ade12d

Please sign in to comment.