Skip to content

Commit

Permalink
[INLONG-9244][Agent] Fix bug: miss file from next data time (#9245)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 9, 2023
1 parent b1c0beb commit f9c28ba
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private void dealWithEvenMap() {
for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) {
Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
if (sameDataTimeEvents.isEmpty()) {
return;
continue;
}
/*
* Calculate whether the event needs to be processed at the current time based on its data time, business
Expand Down Expand Up @@ -442,10 +442,10 @@ private Path resolvePathFromEvent(WatchEvent<?> watchEvent, Path contextPath) {

private void handleFilePath(Path filePath, WatchEntity entity) {
String newFileName = filePath.toFile().getAbsolutePath();
LOGGER.info("[New File] {} {}", newFileName, entity.getOriginPattern());
LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern());
Matcher matcher = entity.getPattern().matcher(newFileName);
if (matcher.matches() || matcher.lookingAt()) {
LOGGER.info("[Matched File] {} {}", newFileName, entity.getOriginPattern());
LOGGER.info("matched file {} {}", newFileName, entity.getOriginPattern());
String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(),
entity.getDateExpression());
if (!checkFileNameForTime(newFileName, entity)) {
Expand All @@ -458,10 +458,14 @@ private void handleFilePath(Path filePath, WatchEntity entity) {

private void addToEvenMap(String fileName, String dataTime) {
if (isInEventMap(fileName, dataTime)) {
LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
return;
}
Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
return;
}
Map<String, InstanceProfile> sameDataTimeEvents = eventMap.computeIfAbsent(dataTime,
Expand All @@ -474,6 +478,7 @@ private void addToEvenMap(String fileName, String dataTime) {
InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
fileName, dataTime, fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName);
}

private boolean checkFileNameForTime(String newFileName, WatchEntity entity) {
Expand Down

0 comments on commit f9c28ba

Please sign in to comment.