From f9c28bad17278d193038ae77a88dba62b1c8a40d Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Thu, 9 Nov 2023 09:56:25 +0800 Subject: [PATCH] [INLONG-9244][Agent] Fix bug: miss file from next data time (#9245) --- .../plugin/task/filecollect/LogFileCollectTask.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 1629b895955..26c61efa7a5 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -325,7 +325,7 @@ private void dealWithEvenMap() { for (Map.Entry> entry : eventMap.entrySet()) { Map sameDataTimeEvents = entry.getValue(); if (sameDataTimeEvents.isEmpty()) { - return; + continue; } /* * Calculate whether the event needs to be processed at the current time based on its data time, business @@ -442,10 +442,10 @@ private Path resolvePathFromEvent(WatchEvent watchEvent, Path contextPath) { private void handleFilePath(Path filePath, WatchEntity entity) { String newFileName = filePath.toFile().getAbsolutePath(); - LOGGER.info("[New File] {} {}", newFileName, entity.getOriginPattern()); + LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern()); Matcher matcher = entity.getPattern().matcher(newFileName); if (matcher.matches() || matcher.lookingAt()) { - LOGGER.info("[Matched File] {} {}", newFileName, entity.getOriginPattern()); + LOGGER.info("matched file {} {}", newFileName, entity.getOriginPattern()); String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), entity.getDateExpression()); if (!checkFileNameForTime(newFileName, entity)) { @@ -458,10 +458,14 @@ private void handleFilePath(Path filePath, WatchEntity entity) { private void addToEvenMap(String fileName, String dataTime) { if (isInEventMap(fileName, dataTime)) { + LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {} dataTime {} fileName {}", + taskProfile.getTaskId(), dataTime, fileName); return; } Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName); if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) { + LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId {} dataTime {} fileName {}", + taskProfile.getTaskId(), dataTime, fileName); return; } Map sameDataTimeEvents = eventMap.computeIfAbsent(dataTime, @@ -474,6 +478,7 @@ private void addToEvenMap(String fileName, String dataTime) { InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE, fileName, dataTime, fileUpdateTime); sameDataTimeEvents.put(fileName, instanceProfile); + LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName); } private boolean checkFileNameForTime(String newFileName, WatchEntity entity) {