Skip to content

Commit

Permalink
[INLONG-9209][Manager] Support configuring predefined fields and issu…
Browse files Browse the repository at this point in the history
…ing agents (#9210)

* [INLONG-9209][Manager] Support configuring predefined fields and issuing agents

* [INLONG-9209][Manager] Fix UT
  • Loading branch information
fuweng11 authored Nov 3, 2023
1 parent 7cbe641 commit ff41d6a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DataConfig {
private Integer syncSend;
private String syncPartitionKey;
private Integer state;
private String predefinedFields;
private String extParams;
/**
* The task version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* The base parameter class of InlongStream, support user extend their own business params.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base info of inlong stream")
public class BaseInlongStream {

// you can add extend parameters in this class
private String predefinedFields;

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;

@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;

/**
* Pack extended attributes into ExtParams
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;

Expand Down Expand Up @@ -101,6 +102,7 @@
import java.util.stream.Stream;

import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;

/**
* Agent service layer implementation
Expand Down Expand Up @@ -161,7 +163,7 @@ private void startHeartbeatTask() {
// because the eviction handler needs to query cluster info cache
long expireTime = 10 * 5;
taskCache = Caffeine.newBuilder()
.expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
.expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);

if (updateTaskTimeoutEnabled) {
Expand Down Expand Up @@ -601,6 +603,11 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
extParams = (null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams);
}

InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
// Processing extParams
unpackExtParams(streamEntity.getExtParams(), streamInfo);
dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());

int dataReportType = groupEntity.getDataReportType();
dataConfig.setDataReportType(dataReportType);
if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
Expand Down

0 comments on commit ff41d6a

Please sign in to comment.