Skip to content

Commit

Permalink
[INLONG-11569][Agent] Add COS Task (#11570)
Browse files Browse the repository at this point in the history
* [INLONG-11569][Agent] Add COS Task

* [INLONG-11569][Agent] Modify code based on comments
  • Loading branch information
justinwwhuang authored Dec 2, 2024
1 parent a843dc8 commit 1f8a7ff
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;

/**
* job profile which contains details describing properties of one job.
Expand Down Expand Up @@ -200,6 +200,6 @@ public int compareTo(InstanceProfile object) {
}

public boolean isRetry() {
return getBoolean(FILE_TASK_RETRY, false);
return getBoolean(TASK_RETRY, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

/**
Expand Down Expand Up @@ -65,10 +65,6 @@ public String getCycleUnit() {
return get(TaskConstants.TASK_CYCLE_UNIT);
}

public String getTimeOffset() {
return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
}

public String getTimeZone() {
return get(TaskConstants.TASK_TIME_ZONE);
}
Expand All @@ -82,7 +78,7 @@ public void setState(TaskStateEnum state) {
}

public boolean isRetry() {
return getBoolean(FILE_TASK_RETRY, false);
return getBoolean(TASK_RETRY, false);
}

public String getTaskClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType";
public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String FILE_TASK_RETRY = "task.fileTask.retry";
public static final String FILE_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String FILE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String FILE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.retry";
public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom";
public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
Expand All @@ -75,6 +75,22 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_KAFKA_OFFSET = "task.kafkaTask.partition.offset";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaTask.autoOffsetReset";

// COS task
public static final String COS_TASK_CYCLE_UNIT = "task.cosTask.cycleUnit";
public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
public static final String TASK_COS_TIME_OFFSET = "task.cosTask.timeOffset";
public static final String COS_TASK_RETRY = "task.cosTask.retry";
public static final String COS_TASK_TIME_FROM = "task.cosTask.dataTimeFrom";
public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
public static final String COS_TASK_BUCKET_NAME = "task.cosTask.bucketName";
public static final String COS_TASK_SECRET_ID = "task.cosTask.secretId";
public static final String COS_TASK_SECRET_KEY = "task.cosTask.secretKey";
public static final String COS_TASK_REGION = "task.cosTask.region";
public static final String COS_DATA_SEPARATOR = "task.cosTask.dataSeparator";
public static final String COS_FILTER_STREAMS = "task.cosTask.filterStreams";

/**
* delimiter to split offset for different task
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.pojo;

import lombok.Data;

import java.util.List;

@Data
public class COSTask {

private Integer id;
private String pattern;
private String cycleUnit;
private Boolean retry;
private String dataTimeFrom;
private String dataTimeTo;
private String timeOffset;
private Integer maxFileCount;
private String collectType;
private String contentStyle;
private String dataSeparator;
private String filterStreams;
private String bucketName;
private String secretId;
private String secretKey;
private String region;

@Data
public static class COSTaskConfig {

private String pattern;
private String cycleUnit;
private Boolean retry;
private String dataTimeFrom;
private String dataTimeTo;
// '1m' means one minute after, '-1m' means one minute before
// '1h' means one hour after, '-1h' means one hour before
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
private Integer maxFileCount;
// Collect type, for example: FULL, INCREMENT
private String collectType;
// Type of data result for column separator
// CSV format, set this parameter to a custom separator: , | :
// Json format, set this parameter to json
private String contentStyle;
// Column separator of data source
private String dataSeparator;
// The streamIds to be filtered out
private List<String> filterStreams;
private String bucketName;
private String credentialsId;
private String credentialsKey;
private String region;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.pojo.BinlogTask.BinlogTaskConfig;
import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.Line;
import org.apache.inlong.agent.pojo.KafkaTask.KafkaTaskConfig;
Expand All @@ -37,6 +38,8 @@

import com.google.gson.Gson;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.stream.Collectors;

Expand All @@ -48,6 +51,8 @@
@Data
public class TaskProfileDto {

private static final Logger logger = LoggerFactory.getLogger(TaskProfileDto.class);

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
Expand All @@ -62,7 +67,7 @@ public class TaskProfileDto {
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink";
public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink";

public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.cos.COSTask";
/**
* file source
*/
Expand Down Expand Up @@ -101,6 +106,10 @@ public class TaskProfileDto {
* sqlserver source
*/
public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource";
/**
* cos source
*/
public static final String COS_SOURCE = "org.apache.inlong.agent.plugin.sources.COSSource";

private static final Gson GSON = new Gson();

Expand Down Expand Up @@ -197,6 +206,35 @@ private static FileTask getFileTask(DataConfig dataConfig) {
return fileTask;
}

private static COSTask getCOSTask(DataConfig dataConfig) {
COSTask cosTask = new COSTask();
cosTask.setId(dataConfig.getTaskId());
COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
COSTaskConfig.class);
cosTask.setPattern(taskConfig.getPattern());
cosTask.setCollectType(taskConfig.getCollectType());
cosTask.setContentStyle(taskConfig.getContentStyle());
cosTask.setDataSeparator(taskConfig.getDataSeparator());
cosTask.setMaxFileCount(taskConfig.getMaxFileCount());
cosTask.setRetry(taskConfig.getRetry());
cosTask.setCycleUnit(taskConfig.getCycleUnit());
cosTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
cosTask.setDataTimeTo(taskConfig.getDataTimeTo());
cosTask.setBucketName(taskConfig.getBucketName());
cosTask.setSecretId(taskConfig.getCredentialsId());
cosTask.setSecretKey(taskConfig.getCredentialsKey());
cosTask.setRegion(taskConfig.getRegion());
if (taskConfig.getFilterStreams() != null) {
cosTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
}
if (taskConfig.getTimeOffset() != null) {
cosTask.setTimeOffset(taskConfig.getTimeOffset());
} else {
cosTask.setTimeOffset(deafult_time_offset + cosTask.getCycleUnit());
}
return cosTask;
}

private static KafkaTask getKafkaTask(DataConfig dataConfigs) {

KafkaTaskConfig kafkaTaskConfig = GSON.fromJson(dataConfigs.getExtParams(),
Expand Down Expand Up @@ -468,6 +506,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
throw new IllegalArgumentException("invalid mq type " + mqType + " please check");
}
}
task.setRetry(false);
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfig.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
Expand All @@ -483,6 +522,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
task.setSource(DEFAULT_SOURCE);
task.setRetry(fileTask.getRetry());
profileDto.setTask(task);
break;
case KAFKA:
Expand Down Expand Up @@ -544,7 +584,17 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
case MOCK:
profileDto.setTask(task);
break;
case COS:
task.setTaskClass(DEFAULT_COS_TASK);
COSTask cosTask = getCOSTask(dataConfig);
task.setCycleUnit(cosTask.getCycleUnit());
task.setCosTask(cosTask);
task.setSource(COS_SOURCE);
task.setRetry(cosTask.getRetry());
profileDto.setTask(task);
break;
default:
logger.error("invalid task type {}", taskType);
}
return TaskProfile.parseJsonStr(GSON.toJson(profileDto));
}
Expand Down Expand Up @@ -574,6 +624,7 @@ public static class Task {
private String cycleUnit;
private String timeZone;
private String auditVersion;
private boolean retry;

private FileTask fileTask;
private BinlogTask binlogTask;
Expand All @@ -585,6 +636,7 @@ public static class Task {
private RedisTask redisTask;
private MqttTask mqttTask;
private SqlServerTask sqlserverTask;
private COSTask cosTask;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_CONTENT_STYLE;

/**
* Read text files
Expand Down Expand Up @@ -355,7 +355,7 @@ protected void releaseSource() {
FileStatic data = new FileStatic();
data.setTaskId(taskId);
data.setRetry(String.valueOf(profile.isRetry()));
data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE));
data.setContentType(profile.get(FILE_CONTENT_STYLE));
data.setGroupId(profile.getInlongGroupId());
data.setStreamId(profile.getInlongStreamId());
data.setDataTime(format.format(profile.getSinkDataTime()));
Expand All @@ -364,10 +364,9 @@ protected void releaseSource() {
data.setReadBytes(String.valueOf(bytePosition));
data.setReadLines(String.valueOf(linePosition));
OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId);
if (offsetProfile == null) {
return;
if (offsetProfile != null) {
data.setSendLines(offsetProfile.getOffset());
}
data.setSendLines(offsetProfile.getOffset());
FileStaticManager.putStaticMsg(data);
randomAccessFile.close();
} catch (IOException e) {
Expand Down
Loading

0 comments on commit 1f8a7ff

Please sign in to comment.