Skip to content

Commit

Permalink
[INLONG-11412][Agent] Do not report Agent status and file metrics (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Oct 29, 2024
1 parent c2391b7 commit f78f45f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,24 @@ public String getFieldsString() {
private String systemStartupTime = ExcuteLinux.exeCmd("uptime -s").replaceAll("\r|\n", "");

private AgentStatusManager(AgentManager agentManager) {
this.agentManager = agentManager;
this.conf = AgentConfiguration.getAgentConf();
threadBean = ManagementFactory.getThreadMXBean();
this.agentManager = agentManager;
}

public static AgentStatusManager getInstance(AgentManager agentManager) {
if (manager == null) {
synchronized (AgentStatusManager.class) {
if (manager == null) {
manager = new AgentStatusManager(agentManager);
}
public static void init(AgentManager agentManager) {
synchronized (AgentStatusManager.class) {
if (manager == null) {
manager = new AgentStatusManager(agentManager);
}
}
return manager;
}

public static AgentStatusManager getInstance() {
if (manager == null) {
throw new RuntimeException("HeartbeatManager has not been initialized by agentManager");
}
private static AgentStatusManager getInstance() {
return manager;
}

public void sendStatusMsg(DefaultMessageSender sender) {
private void doSendStatusMsg(DefaultMessageSender sender) {
AgentStatus data = AgentStatusManager.getInstance().getStatus();
LOGGER.info("status detail: {}", data);
if (sender == null) {
Expand All @@ -180,6 +174,12 @@ public void sendStatusMsg(DefaultMessageSender sender) {
}
}

public static void sendStatusMsg(DefaultMessageSender sender) {
if (AgentStatusManager.getInstance() != null) {
AgentStatusManager.getInstance().doSendStatusMsg(sender);
}
}

private double getProcessCpu() {
double cpu = tryGetProcessCpu();
int tryTimes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,24 @@ public String getFieldsString() {
private final AgentConfiguration conf;
protected BlockingQueue<FileStatic> queue;

private FileStaticManager(AgentManager agentManager) {
private FileStaticManager() {
this.conf = AgentConfiguration.getAgentConf();
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
}

public static FileStaticManager getInstance(AgentManager agentManager) {
if (manager == null) {
synchronized (FileStaticManager.class) {
if (manager == null) {
manager = new FileStaticManager(agentManager);
}
public static void init() {
synchronized (FileStaticManager.class) {
if (manager == null) {
manager = new FileStaticManager();
}
}
return manager;
}

public static FileStaticManager getInstance() {
private static FileStaticManager getInstance() {
return manager;
}

public void putStaticMsg(FileStatic data) {
private void doPutStaticMsg(FileStatic data) {
data.setAgentIp(AgentUtils.fetchLocalIp());
data.setTag(conf.get(AGENT_CLUSTER_TAG));
data.setCluster(conf.get(AGENT_CLUSTER_NAME));
Expand All @@ -121,7 +118,13 @@ public void putStaticMsg(FileStatic data) {
}
}

public void sendStaticMsg(DefaultMessageSender sender) {
public static void putStaticMsg(FileStatic data) {
if (FileStaticManager.getInstance() != null) {
FileStaticManager.getInstance().doPutStaticMsg(data);
}
}

private void doSendStaticMsg(DefaultMessageSender sender) {
while (!queue.isEmpty()) {
FileStatic data = queue.poll();
LOGGER.info("file static detail: {}", data);
Expand All @@ -138,4 +141,10 @@ public void sendStaticMsg(DefaultMessageSender sender) {
}
}
}

public static void sendStaticMsg(DefaultMessageSender sender) {
if (FileStaticManager.getInstance() != null) {
FileStaticManager.getInstance().doSendStaticMsg(sender);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ private HeartbeatManager(AgentManager agentManager) {
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
createMessageSender();
AgentStatusManager.getInstance(agentManager);
FileStaticManager.getInstance(agentManager);
}

public static HeartbeatManager getInstance(AgentManager agentManager) {
Expand Down Expand Up @@ -126,8 +124,8 @@ private Runnable heartbeatReportThread() {
if (sender == null) {
createMessageSender();
}
AgentStatusManager.getInstance().sendStatusMsg(sender);
FileStaticManager.getInstance().sendStaticMsg(sender);
AgentStatusManager.sendStatusMsg(sender);
FileStaticManager.sendStaticMsg(sender);
} catch (Throwable e) {
LOGGER.error("interrupted while report heartbeat", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ public boolean sourceExist() {
protected void releaseSource() {
if (randomAccessFile != null) {
try {
if (FileStaticManager.getInstance() == null) {
return;
}
FileStatic data = new FileStatic();
data.setTaskId(taskId);
data.setRetry(String.valueOf(profile.isRetry()));
Expand All @@ -342,7 +339,7 @@ protected void releaseSource() {
return;
}
data.setSendLines(offsetProfile.getOffset());
FileStaticManager.getInstance().putStaticMsg(data);
FileStaticManager.putStaticMsg(data);
randomAccessFile.close();
} catch (IOException e) {
LOGGER.error("close randomAccessFile error", e);
Expand Down

0 comments on commit f78f45f

Please sign in to comment.