Skip to content

Commit

Permalink
[INLONG-10535][Agent] Support minute level tasks (apache#10536)
Browse files Browse the repository at this point in the history
Co-authored-by: AloysZhang <[email protected]>
  • Loading branch information
justinwwhuang and aloyszhang authored Jun 29, 2024
1 parent d4db882 commit 31a6f8b
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

/**
 * String constants that name the cycle unit of an agent task.
 *
 * <p>NOTE(review): this listing is a diff hunk with its +/- markers stripped, so {@code DAY} and
 * {@code HOUR} each appear twice. Presumably the first pair ("D"/"h") is the removed version and the
 * second pair ("d"/"H") the added one -- confirm against the committed file before relying on
 * either spelling.
 */
public class CycleUnitType {

// Day cycle; the time-conversion code visible in this diff pairs it with the "yyyyMMdd" format
// and compares it case-insensitively.
public static final String DAY = "D";
// Hour cycle; paired with the "yyyyMMddHH" format and compared case-insensitively.
public static final String HOUR = "h";
public static final String DAY = "d";
public static final String HOUR = "H";
// Minute cycle; paired with the "yyyyMMddHHmm" format and compared with a case-sensitive equals().
public static final String MINUTE = "m";
// Presumably marks real-time tasks; no usage is visible in this diff -- TODO confirm.
public static final String REAL_TIME = "R";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.agent.utils;

import org.apache.inlong.agent.constant.CycleUnitType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,15 +47,11 @@ public static long timeStrConvertToMillSec(String time, String cycleUnit, TimeZo
throws ParseException {
long retTime = 0;
SimpleDateFormat df = null;
if (cycleUnit.equals("Y") && time.length() == 4) {
df = new SimpleDateFormat("yyyy");
} else if (cycleUnit.equals("M") && time.length() == 6) {
df = new SimpleDateFormat("yyyyMM");
} else if (cycleUnit.equals("D") && time.length() == 8) {
if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY) && time.length() == 8) {
df = new SimpleDateFormat("yyyyMMdd");
} else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
} else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR) && time.length() == 10) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m") && time.length() == 12) {
} else if (cycleUnit.equals(CycleUnitType.MINUTE) && time.length() == 12) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("time {}, cycleUnit {} can't parse!", time, cycleUnit);
Expand All @@ -77,37 +75,18 @@ public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZo

Date dateTime = calendarInstance.getTime();
SimpleDateFormat df = null;
if ("Y".equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyy");
} else if ("M".equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMM");
} else if ("D".equalsIgnoreCase(cycleUnit)) {
if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMdd");
} else if ("h".equalsIgnoreCase(cycleUnit)) {
} else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m")) {
} else if (CycleUnitType.MINUTE.equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("cycleUnit {} can't parse!", cycleUnit);
df = new SimpleDateFormat("yyyyMMddHH");
}
df.setTimeZone(tz);
retTime = df.format(dateTime);

if (cycleUnit.contains("m")) {
int cycleNum = Integer.parseInt(cycleUnit.substring(0,
cycleUnit.length() - 1));
int mmTime = Integer.parseInt(retTime.substring(
retTime.length() - 2, retTime.length()));
String realMMTime = "";
if (cycleNum * (mmTime / cycleNum) <= 0) {
realMMTime = "0" + cycleNum * (mmTime / cycleNum);
} else {
realMMTime = "" + cycleNum * (mmTime / cycleNum);
}
retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
}

return retTime;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.utils.AgentUtils;
Expand Down Expand Up @@ -217,7 +218,7 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask
String endStr = "2023-07-22 00:00:00";
Long start = 0L;
Long end = 0L;
String normalPattern = testDir + "YYYY/YYYYMMDD_2.log_[0-9]+";
String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr);
Expand All @@ -227,28 +228,31 @@ private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTask
} catch (ParseException e) {
e.printStackTrace();
}
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, state));
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}

/**
 * Builds a {@code DataConfig} for a file-collection test task and stores a {@code FileTaskConfig},
 * serialized to JSON with {@code GSON}, in its ext params.
 *
 * <p>NOTE(review): this span is a diff hunk with its +/- markers stripped, so the signature tail and
 * several setter calls appear in both an older and a newer form. Presumably the later occurrence of
 * each is the current one -- confirm against the committed file.
 *
 * @param taskId id assigned to the task
 * @param pattern file path pattern handed to the file task config
 * @param retry whether the task is a retry (backfill) task
 * @param startTime start time stored on the file task config
 * @param endTime end time stored on the file task config
 * @param cycleUnit task cycle unit (only present in the newer form of the signature)
 * @param state task state stored on the data config
 * @return the populated data config
 */
private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
int state) {
String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId"); // legacy field: groupId
dataConfig.setInlongStreamId("testStreamId"); // legacy field: streamId
dataConfig.setDataReportType(1); // legacy field: reportType
dataConfig.setTaskType(3); // legacy field: task type; 3 means file collection
dataConfig.setTaskId(taskId); // legacy field: task id
dataConfig.setState(state); // new field: task state, 1 = normal, 2 = paused
dataConfig.setInlongGroupId("devcloud_group_id");
dataConfig.setInlongStreamId("devcloud_stream_id");
dataConfig.setDataReportType(0);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setState(state);
dataConfig.setTimeZone("GMT+8:00");
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);// regular expression
fileTaskConfig.setTimeOffset("0d"); // legacy field: time offset; "-1d" collects data from one day earlier, "-2h" from two hours earlier
fileTaskConfig.setMaxFileCount(100); // maximum number of files
fileTaskConfig.setCycleUnit("D"); // new field: task cycle, "D" = day, "h" = hour
fileTaskConfig.setRetry(retry); // new field: whether this is a retry (backfill) task; true if it is
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataContentStyle("CSV");
fileTaskConfig.setDataSeparator("|");
// The file-specific settings travel inside the data config as a JSON string.
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String
for (Long time : dateRegion) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String filename = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
String fileName = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectoryByWildcard(fileName);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, filename, 3,
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, fileName, 3,
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
Expand All @@ -111,17 +111,6 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String
return infos;
}

public static ArrayList<String> scanFile(int maxFileNum, String originPattern, long dataTime) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(dataTime);

String filename = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
return getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, maxFileNum);
}

private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String secondDir,
String fileName, long depth, int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
Expand Down Expand Up @@ -151,7 +140,7 @@ private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<String> directories = FilePathUtil
.getDirectoryLayers(logFileName);
.cutDirectoryByWildcardAndDateExpression(logFileName);
String parentDir = directories.get(0) + File.separator
+ directories.get(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void watchInit() {
}

public void addPathPattern(String originPattern) {
ArrayList<String> directories = FilePathUtil.getDirectoryLayers(originPattern);
ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
String basicStaticPath = directories.get(0);
LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, basicStaticPath});
/* Remember the failed watcher creations. */
Expand Down Expand Up @@ -530,7 +530,7 @@ private boolean checkFileNameForTime(String newFileName, WatchEntity entity) {
PathDateExpression dateExpression = entity.getDateExpression();
if (dateExpression.getLongestDatePattern().length() != 0) {
String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), dateExpression);
LOGGER.info("file {} ,fileTime {}", newFileName, dataTime);
LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
if (!NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(),
taskProfile.getTimeOffset())) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ public WatchEntity(WatchService watchService,
String cycleUnit) {
this.watchService = watchService;
this.originPattern = originPattern;
ArrayList<String> directoryLayers = FilePathUtil.getDirectoryLayers(originPattern);
ArrayList<String> directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
this.basicStaticPath = directoryLayers.get(0);
this.regexPattern = NewDateUtils.replaceDateExpressionWithRegex(originPattern);
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
ArrayList<String> directories = FilePathUtil.cutDirectory(originPattern);
ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcard(originPattern);
this.originPatternWithoutFileName = directories.get(0);
this.patternWithoutFileName = Pattern
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class FilePathUtil {
private static final String DAY = "DD";
private static final String HOUR = "hh";

public static ArrayList<String> cutDirectory(String directory) {
public static ArrayList<String> cutDirectoryByWildcard(String directory) {
String baseDirectory;
String regixDirecotry;
String fileName;
Expand Down Expand Up @@ -81,7 +81,7 @@ public static ArrayList<String> cutDirectory(String directory) {
return ret;
}

public static ArrayList<String> getDirectoryLayers(String directory) {
public static ArrayList<String> cutDirectoryByWildcardAndDateExpression(String directory) {
String baseDirectory;
String regixDirectory;
String fileName;
Expand Down Expand Up @@ -158,8 +158,8 @@ public static ArrayList<String> getDirectoryLayers(String directory) {
}

public static boolean isSameDir(String fileName1, String fileName2) {
ArrayList<String> ret1 = FilePathUtil.cutDirectory(fileName1);
ArrayList<String> ret2 = FilePathUtil.cutDirectory(fileName2);
ArrayList<String> ret1 = FilePathUtil.cutDirectoryByWildcard(fileName1);
ArrayList<String> ret2 = FilePathUtil.cutDirectoryByWildcard(fileName2);
return ret1.get(0).equals(ret2.get(0));
}

Expand Down
Loading

0 comments on commit 31a6f8b

Please sign in to comment.