From c8e07656947593c4d6ed06d05761f8d13088499b Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 5 Dec 2024 15:38:23 +0800 Subject: [PATCH 1/2] [INLONG-11574][Agent] Add COS source unit test --- .../agent/core/AgentBaseTestsHelper.java | 33 --- .../agent/plugin/task/cos/FileScanner.java | 20 +- .../agent/plugin/AgentBaseTestsHelper.java | 53 +++- .../plugin/instance/TestInstanceManager.java | 5 +- .../agent/plugin/sinks/KafkaSinkTest.java | 5 +- .../agent/plugin/sinks/PulsarSinkTest.java | 5 +- .../sinks/filecollect/TestSenderManager.java | 5 +- .../plugin/sources/TestLogFileSource.java | 2 +- .../agent/plugin/sources/TestRedisSource.java | 2 +- .../plugin/sources/TestSQLServerSource.java | 2 +- .../inlong/agent/plugin/task/TestCOSTask.java | 233 ++++++++++++++++++ .../agent/plugin/task/TestLogFileTask.java | 8 +- .../agent/plugin/task/TestTaskManager.java | 9 +- 13 files changed, 317 insertions(+), 65 deletions(-) create mode 100644 inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index a8dfbdf91ac..af0a54f1295 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -18,12 +18,8 @@ package org.apache.inlong.agent.core; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.FetcherConstants; -import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; -import org.apache.inlong.common.enums.TaskStateEnum; -import org.apache.inlong.common.pojo.agent.DataConfig; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -74,33 +70,4 @@ public void teardownAgentHome() { } } } - - public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, String startTime, String endTime, - TaskStateEnum state, String timeZone) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, timeZone); - TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); - return profile; - } - - private DataConfig getDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime, - TaskStateEnum state, String timeZone) { - DataConfig dataConfig = new DataConfig(); - dataConfig.setInlongGroupId("testGroupId"); - dataConfig.setInlongStreamId("testStreamId"); - dataConfig.setDataReportType(1); - dataConfig.setTaskType(3); - dataConfig.setTaskId(taskId); - dataConfig.setTimeZone(timeZone); - dataConfig.setState(state.ordinal()); - FileTaskConfig fileTaskConfig = new FileTaskConfig(); - fileTaskConfig.setPattern(pattern); - fileTaskConfig.setTimeOffset("0h"); - fileTaskConfig.setMaxFileCount(100); - fileTaskConfig.setCycleUnit("h"); - fileTaskConfig.setRetry(retry); - fileTaskConfig.setDataTimeFrom(startTime); - fileTaskConfig.setDataTimeTo(endTime); - dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); - return dataConfig; - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java index 4eac0eeff2a..1019d341352 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java @@ -97,16 +97,20 @@ public static List scanTaskInOneCycle(COSClient cosClient, String } List commonPrefixes = objectListing.getCommonPrefixes(); int depth; - Pattern patternByDepth = null; + Pattern patternByDepth; if (!commonPrefixes.isEmpty()) { depth = countCharacterOccurrences(commonPrefixes.get(0), PATH_SEP); - String temp = findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth); - patternByDepth = Pattern.compile(temp, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); - } - for (String commonPrefix : commonPrefixes) { - Matcher matcher = patternByDepth.matcher(commonPrefix); - if (matcher.matches()) { - infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, commonPrefix, dataTime, cycleUnit)); + String nthOccurrenceSubstring = findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth); + if (nthOccurrenceSubstring != null) { + patternByDepth = Pattern.compile(nthOccurrenceSubstring, + Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + for (String commonPrefix : commonPrefixes) { + Matcher matcher = patternByDepth.matcher(commonPrefix); + if (matcher.matches()) { + infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, commonPrefix, dataTime, + cycleUnit)); + } + } } } List cosObjectSummaries = objectListing.getObjectSummaries(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 3dc4f8ab152..214a29b24e7 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -21,8 +21,10 @@ import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.FetcherConstants; +import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.enums.TaskTypeEnum; import org.apache.inlong.common.pojo.agent.DataConfig; import com.google.gson.Gson; @@ -82,24 +84,24 @@ public void teardownAgentHome() { } } - public TaskProfile getTaskProfile(int taskId, String pattern, String dataContentStyle, boolean retry, + public TaskProfile getFileTaskProfile(int taskId, String pattern, String dataContentStyle, boolean retry, String startTime, String endTime, TaskStateEnum state, String cycleUnit, String timeZone, List filterStreams) { - DataConfig dataConfig = getDataConfig(taskId, pattern, dataContentStyle, retry, startTime, endTime, + DataConfig dataConfig = getFileDataConfig(taskId, pattern, dataContentStyle, retry, startTime, endTime, state, cycleUnit, timeZone, filterStreams); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } - private DataConfig getDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, + private DataConfig getFileDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, String startTime, String endTime, TaskStateEnum state, String cycleUnit, String timeZone, List filterStreams) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); dataConfig.setDataReportType(1); - dataConfig.setTaskType(3); + dataConfig.setTaskType(TaskTypeEnum.FILE.getType()); dataConfig.setTaskId(taskId); dataConfig.setTimeZone(timeZone); dataConfig.setState(state.ordinal()); @@ -119,4 +121,47 @@ private DataConfig getDataConfig(int taskId, String pattern, String dataContentS dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); return dataConfig; } + + public TaskProfile getCOSTaskProfile(int taskId, String pattern, String contentStyle, boolean retry, + String startTime, String endTime, + TaskStateEnum state, String cycleUnit, String timeZone, List filterStreams) { + DataConfig dataConfig = getCOSDataConfig(taskId, pattern, contentStyle, retry, startTime, endTime, + state, cycleUnit, timeZone, + filterStreams); + TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); + return profile; + } + + private DataConfig getCOSDataConfig(int taskId, String pattern, String contentStyle, boolean retry, + String startTime, String endTime, TaskStateEnum state, String cycleUnit, String timeZone, + List filterStreams) { + DataConfig dataConfig = new DataConfig(); + dataConfig.setInlongGroupId("testGroupId"); + dataConfig.setInlongStreamId("testStreamId"); + dataConfig.setDataReportType(1); + dataConfig.setTaskType(TaskTypeEnum.COS.getType()); + dataConfig.setTaskId(taskId); + dataConfig.setTimeZone(timeZone); + dataConfig.setState(state.ordinal()); + COSTaskConfig cosTaskConfig = new COSTaskConfig(); + cosTaskConfig.setBucketName("testBucket"); + cosTaskConfig.setCredentialsId("testSecretId"); + cosTaskConfig.setCredentialsKey("testSecretKey"); + cosTaskConfig.setRegion("testRegion"); + cosTaskConfig.setPattern(pattern); + cosTaskConfig.setTimeOffset("0d"); + // GMT-8:00 same with Asia/Shanghai + cosTaskConfig.setMaxFileCount(100); + cosTaskConfig.setCycleUnit(cycleUnit); + cosTaskConfig.setRetry(retry); + cosTaskConfig.setDataTimeFrom(startTime); + cosTaskConfig.setDataTimeTo(endTime); + // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf + cosTaskConfig.setContentStyle(contentStyle); + cosTaskConfig.setDataSeparator("|"); + cosTaskConfig.setFilterStreams(filterStreams); + dataConfig.setExtParams(GSON.toJson(cosTaskConfig)); + return dataConfig; + } + } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java index 61c94c4dd12..9901f290232 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java @@ -59,8 +59,9 @@ public static void setup() { helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt"; Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); - taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, CycleUnitType.HOUR, - "GMT+6:00", null); + taskProfile = + helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, CycleUnitType.HOUR, + "GMT+6:00", null); Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); TaskStore taskStore = new TaskStore(taskBasicStore); taskStore.storeTask(taskProfile); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index 066776d32fe..b8703c7056b 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -47,8 +47,9 @@ public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", - "GMT+8:00", null); + TaskProfile taskProfile = + helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); kafkaSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index d7733259abd..43e3115dcb6 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -47,8 +47,9 @@ public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", - "GMT+8:00", null); + TaskProfile taskProfile = + helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); pulsarSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 5a1168edef3..1c9e623b9bb 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -70,8 +70,9 @@ public static void setup() { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", - "GMT+8:00", null); + TaskProfile taskProfile = + helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 408b9f1b708..d7a93a0df88 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -84,7 +84,7 @@ private LogFileSource getSource(int taskId, long lineOffset, long byteOffset, St fileName = LOADER.getResource("test/20230928_1.txt").getPath(); pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; retry = false; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, dataContentStyle, retry, "", "", + TaskProfile taskProfile = helper.getFileTaskProfile(taskId, pattern, dataContentStyle, retry, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", Arrays.asList("ok")); InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java index 4f2e90870bc..14518078f28 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java @@ -121,7 +121,7 @@ private void initProfile() { final String command = "zscore"; final String subOperation = "set,del"; - TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java index 377e5ae9132..410acd8e772 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java @@ -136,7 +136,7 @@ private SQLServerSource getSource() { final String tableName = "test_source"; final String serverName = "server-01"; - TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); instanceProfile = taskProfile.createInstanceProfile("", "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java new file mode 100644 index 00000000000..f66e4845b1b --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.task.cos.COSTask; +import org.apache.inlong.agent.plugin.utils.cos.COSUtils; +import org.apache.inlong.common.enums.TaskStateEnum; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.COSObjectSummary; +import com.qcloud.cos.model.ListObjectsRequest; +import com.qcloud.cos.model.ObjectListing; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({COSUtils.class, COSTask.class, COSClient.class, ObjectListing.class}) +@PowerMockIgnore({"javax.management.*"}) +public class TestCOSTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestCOSTask.class); + private static final ClassLoader LOADER = TestCOSTask.class.getClassLoader(); + private static AgentBaseTestsHelper helper; + private static TaskManager manager; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("TestCOSTask")); + private static COSClient cosClient; + + @BeforeClass + public static void setup() throws Exception { + helper = new AgentBaseTestsHelper(TestCOSTask.class.getName()).setupAgentHome(); + manager = new TaskManager(); + cosClient = Mockito.mock(COSClient.class); + PowerMockito.mockStatic(COSUtils.class); + Mockito.when(COSUtils.createCli(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(cosClient); + } + + @AfterClass + public static void teardown() throws Exception { + helper.teardownAgentHome(); + } + + private void mockDay(COSClient cosClient) { + ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class); + when(objectListing1_1.getCommonPrefixes()).thenReturn( + Arrays.asList("some/20230928_0/", "some/20230928_1/", "some/20230928_aaa/")); + when(objectListing1_1.getObjectSummaries()).thenReturn(getSummaries(Arrays.asList("some/20230928_test_0.txt"))); + + ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class); + when(objectListing1_2.getCommonPrefixes()).thenReturn( + Arrays.asList("some/20230929_aaa/", "some/20230929_1/", "some/20230929_2/")); + when(objectListing1_2.getObjectSummaries()).thenReturn( + getSummaries(Arrays.asList("some/20230929_0_test_0.txt"))); + + ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class); + when(objectListing2_1.getCommonPrefixes()).thenReturn( + Arrays.asList("some/20230928_0/where/", "some/20230928_0/test_1/")); + when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries( + Arrays.asList("some/20230928_0/test_0.txt", "some/20230928_0/test_1.txt", + "some/20230928_0/test_o.txt"))); + + ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class); + when(objectListing2_2.getCommonPrefixes()).thenReturn( + Arrays.asList("some/20230929_1/where/", "some/20230929_1/test_1/")); + when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries( + Arrays.asList("some/20230929_1/test_0.txt", "some/20230929_1/test_1.txt", + "some/20230929_1/test_o.txt"))); + + when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock -> { + ListObjectsRequest req = mock.getArgument(0); + if (req.getPrefix().equals("some/20230928_")) { + return objectListing1_1; + } else if (req.getPrefix().equals("some/20230929_")) { + return objectListing1_2; + } else if (req.getPrefix().equals("some/20230928_0/")) { + return objectListing2_1; + } else if (req.getPrefix().equals("some/20230929_1/")) { + return objectListing2_2; + } else { + return new ObjectListing(); + } + }); + } + + private void mockHour(COSClient cosClient) { + ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class); + when(objectListing1_1.getCommonPrefixes()).thenReturn( + Arrays.asList("some/2023092800_0/", "some/2023092800_1/", "some/2023092800_aaa/")); + when(objectListing1_1.getObjectSummaries()).thenReturn( + getSummaries(Arrays.asList("some/2023092800_test_0.txt"))); + + ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class); + when(objectListing1_2.getCommonPrefixes()).thenReturn( + Arrays.asList("some/2023092901_aaa/", "some/2023092901_1/", "some/2023092901_2/")); + when(objectListing1_2.getObjectSummaries()).thenReturn( + getSummaries(Arrays.asList("some/2023092901_0_test_0.txt"))); + + ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class); + when(objectListing2_1.getCommonPrefixes()).thenReturn( + Arrays.asList("some/2023092800_0/where/", "some/2023092800_0/test_1/")); + when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries( + Arrays.asList("some/2023092800_0/test_0.txt", "some/2023092800_0/test_1.txt", + "some/2023092800_0/test_o.txt"))); + + ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class); + when(objectListing2_2.getCommonPrefixes()).thenReturn( + Arrays.asList("some/2023092901_1/where/", "some/2023092901_1/test_1/")); + when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries( + Arrays.asList("some/2023092901_1/test_0.txt", "some/2023092901_1/test_1.txt", + "some/2023092901_1/test_o.txt"))); + + when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock -> { + ListObjectsRequest req = mock.getArgument(0); + if (req.getPrefix().equals("some/2023092800_")) { + return objectListing1_1; + } else if (req.getPrefix().equals("some/2023092901_")) { + return objectListing1_2; + } else if (req.getPrefix().equals("some/2023092800_0/")) { + return objectListing2_1; + } else if (req.getPrefix().equals("some/2023092901_1/")) { + return objectListing2_2; + } else { + return new ObjectListing(); + } + }); + } + + private List getSummaries(List keys) { + List summaries = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + COSObjectSummary summary = new COSObjectSummary(); + summary.setKey(keys.get(i)); + summary.setSize(100); + summary.setStorageClass("what"); + summaries.add(summary); + } + return summaries; + } + + @Test + public void testScan() { + mockDay(cosClient); + doTest(1, "some/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY, + Arrays.asList("some/20230928_0/test_0.txt", "some/20230928_0/test_1.txt", "some/20230929_1/test_0.txt", + "some/20230929_1/test_1.txt"), + Arrays.asList("20230928", "20230928", "20230929", "20230929"), + "20230928", + "20230930"); + mockHour(cosClient); + doTest(2, "some/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR, + Arrays.asList("some/2023092800_0/test_0.txt", "some/2023092800_0/test_1.txt", + "some/2023092901_1/test_0.txt", + "some/2023092901_1/test_1.txt"), + Arrays.asList("2023092800", "2023092800", "2023092901", "2023092901"), "2023092800", + "2023093023"); + } + + private void doTest(int taskId, String pattern, String cycle, List srcKeys, List srcDataTimes, + String startTime, String endTime) { + TaskProfile taskProfile = helper.getCOSTaskProfile(taskId, pattern, "csv", true, startTime, endTime, + TaskStateEnum.RUNNING, + cycle, "GMT+8:00", null); + COSTask task = null; + final List fileName = new ArrayList(); + final List dataTime = new ArrayList(); + try { + task = PowerMockito.spy(new COSTask()); + PowerMockito.doAnswer(invocation -> { + fileName.add(invocation.getArgument(0)); + dataTime.add(invocation.getArgument(1)); + return null; + }).when(task, "addToEvenMap", Mockito.anyString(), Mockito.anyString()); + Assert.assertTrue(task.isProfileValid(taskProfile)); + manager.getTaskStore().storeTask(taskProfile); + task.init(manager, taskProfile, manager.getInstanceBasicStore()); + EXECUTOR_SERVICE.submit(task); + } catch (Exception e) { + LOGGER.error("source init error", e); + Assert.assertTrue("source init error", false); + } + await().atMost(10, TimeUnit.SECONDS) + .until(() -> fileName.size() == srcDataTimes.size() && dataTime.size() == srcDataTimes.size()); + for (int i = 0; i < fileName.size(); i++) { + Assert.assertEquals(0, fileName.get(i).compareTo(srcKeys.get(i))); + Assert.assertEquals(0, dataTime.get(i).compareTo(srcDataTimes.get(i))); + } + task.destroy(); + } +} \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java index 440c4a52080..7d1962c314b 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java @@ -20,7 +20,6 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.CycleUnitType; -import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.task.TaskManager; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.task.file.LogFileTask; @@ -101,14 +100,13 @@ private void doTest(int taskId, List resources, String pattern, String c for (int i = 0; i < resources.size(); i++) { resourceName.add(LOADER.getResource(resources.get(i)).getPath()); } - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", true, "", "", TaskStateEnum.RUNNING, - cycle, "GMT+8:00", null); + TaskProfile taskProfile = + helper.getFileTaskProfile(taskId, pattern, "csv", true, startTime, endTime, TaskStateEnum.RUNNING, + cycle, "GMT+8:00", null); LogFileTask dayTask = null; final List fileName = new ArrayList(); final List dataTime = new ArrayList(); try { - taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime); - taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime); dayTask = PowerMockito.spy(new LogFileTask()); PowerMockito.doAnswer(invocation -> { fileName.add(invocation.getArgument(0)); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java index 014c5ce4e20..4cab9fa8f0c 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java @@ -58,8 +58,9 @@ public void testTaskManager() { manager = new TaskManager(); TaskStore taskStore = manager.getTaskStore(); for (int i = 1; i <= 10; i++) { - TaskProfile taskProfile = helper.getTaskProfile(i, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, - "D", "GMT+8:00", null); + TaskProfile taskProfile = + helper.getFileTaskProfile(i, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, + "D", "GMT+8:00", null); taskProfile.setTaskClass(MockTask.class.getCanonicalName()); taskStore.storeTask(taskProfile); } @@ -74,7 +75,7 @@ public void testTaskManager() { Assert.assertTrue("manager start error", false); } - TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, + TaskProfile taskProfile1 = helper.getFileTaskProfile(100, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); String taskId1 = taskProfile1.getTaskId(); taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); @@ -99,7 +100,7 @@ public void testTaskManager() { Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); // test delete - TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, + TaskProfile taskProfile2 = helper.getFileTaskProfile(200, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); List taskProfiles2 = new ArrayList<>(); From c43401fca2e8453eba9b266a047ca1505bfe7978 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 5 Dec 2024 15:47:58 +0800 Subject: [PATCH 2/2] [INLONG-11574][Agent] Delete useless code --- .../java/org/apache/inlong/agent/plugin/task/cos/COSTask.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java index eceb38e8be4..de2beb7cc03 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java @@ -196,7 +196,6 @@ private void runForNormal() { } private void scanExistingFile() { - LOGGER.info("test123 qqqq"); List fileInfos = FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern, taskProfile.getCycleUnit(), timeOffset, startTime, endTime, retry); LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size()); @@ -314,7 +313,6 @@ private void addToEvenMap(String fileName, String dataTime) { taskProfile.getTaskId(), dataTime, fileName); return; } - LOGGER.info("test123 {}", cosClient); ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, fileName); Long fileUpdateTime = meta.getLastModified().getTime(); if (!shouldAddAgain(fileName, fileUpdateTime)) {