diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java index f76f5bfb014..87508b5e1ce 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java @@ -55,6 +55,15 @@ public class ClsSinkDTO { @ApiModelProperty("Cloud log service index tokenizer") private String tokenizer; + @ApiModelProperty("contentOffset") + private Integer contentOffset = 0; + + @ApiModelProperty("fieldOffset") + private Integer fieldOffset; + + @ApiModelProperty("separator") + private String separator; + /** * Get the dto instance from the request */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java index 35565f2cd5d..4045231768a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java @@ -44,7 +44,7 @@ public class ElasticsearchSinkDTO { private String indexNamePattern; @ApiModelProperty("contentOffset") - private Integer contentOffset; + private Integer contentOffset = 0; @ApiModelProperty("fieldOffset") private Integer fieldOffset; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java index 182308fe0ae..0f8d756c5a7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java @@ -40,13 +40,4 @@ public class ElasticsearchSinkRequest extends SinkRequest { @ApiModelProperty("indexNamePattern") private String indexNamePattern; - @ApiModelProperty("contentOffset") - private Integer contentOffset; - - @ApiModelProperty("fieldOffset") - private Integer fieldOffset; - - @ApiModelProperty("separator") - private String separator; - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java index ad69b997c90..2690576aad1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java @@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable { @ApiModelProperty(value = "Predefined fields") private String predefinedFields; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + /** * Pack extended attributes into ExtParams * diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java index f1c305c4753..06a441336f7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java @@ -136,6 +136,9 @@ public class InlongStreamInfo extends BaseInlongStream { @ApiModelProperty(value = "If use extended fields") private Boolean useExtendedFields = false; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + @ApiModelProperty(value = "Whether to ignore the parse errors of field value") private Boolean ignoreParseError = true; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java index 4ad91f17af2..bb6a0d29643 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java @@ -124,6 +124,9 @@ public class InlongStreamRequest extends BaseInlongStream { @ApiModelProperty(value = "If use extended fields") private Boolean useExtendedFields = false; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + @ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc") private String wrapType; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java index 170b2952493..24ee49f7576 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java @@ -20,14 +20,12 @@ import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig; import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; -import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo; -import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.core.SortClusterService; import org.apache.inlong.manager.service.core.SortConfigLoader; import org.apache.inlong.manager.service.node.DataNodeOperator; @@ -37,7 +35,6 @@ import com.google.gson.Gson; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -281,7 +278,6 @@ private List> parseIdParams(List streams, StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType()); List fields = fieldMap.get(streamSink.getId()); Map params = operator.parse2IdParams(streamSink, fields, dataNodeInfo); - setFiledOffset(streamSink, params); return params; } catch (Exception e) { LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}", @@ -294,17 +290,6 @@ private List> parseIdParams(List streams, .collect(Collectors.toList()); } - private void setFiledOffset(StreamSinkEntity streamSink, Map params) { - - SortSourceStreamInfo sortSourceStreamInfo = allStreams.get(streamSink.getInlongGroupId()) - .get(streamSink.getInlongStreamId()); - InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject( - sortSourceStreamInfo.getExtParams(), InlongStreamExtParam.class); - if (ObjectUtils.anyNotNull(inlongStreamExtParam) && !inlongStreamExtParam.getUseExtendedFields()) { - params.put(FILED_OFFSET, String.valueOf(0)); - } - } - private Map parseSinkParams(DataNodeInfo nodeInfo) { DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType()); return operator.parse2SinkParams(nodeInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java index a2ec9e19583..cc0dafc91c7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java @@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; import org.apache.inlong.manager.pojo.node.DataNodeInfo; @@ -36,6 +37,7 @@ import org.apache.inlong.manager.pojo.sink.cls.ClsSink; import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO; import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -74,6 +76,15 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit ClsSinkRequest sinkRequest = (ClsSinkRequest) request; try { ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + + InlongStreamEntity stream = inlongStreamEntityMapper + .selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId()); + dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator())))); + + InlongStreamExtParam streamExt = + JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class); + dto.setFieldOffset(streamExt.getExtendedFieldSize()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, @@ -116,16 +127,16 @@ public Map parse2IdParams(StreamSinkEntity streamSink, List params = super.parse2IdParams(streamSink, fields, dataNodeInfo); ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class); - params.put(TOPIC_ID, clsSinkDTO.getTopicId()); + params.computeIfAbsent(TOPIC_ID, k -> clsSinkDTO.getTopicId()); ClsDataNodeInfo clsDataNodeInfo = (ClsDataNodeInfo) dataNodeInfo; - params.put(SECRET_ID, clsDataNodeInfo.getSendSecretId()); - params.put(SECRET_KEY, clsDataNodeInfo.getSendSecretKey()); - params.put(END_POINT, clsDataNodeInfo.getEndpoint()); + params.computeIfAbsent(SECRET_ID, k -> clsDataNodeInfo.getSendSecretId()); + params.computeIfAbsent(SECRET_KEY, k -> clsDataNodeInfo.getSendSecretKey()); + params.computeIfAbsent(END_POINT, k -> clsDataNodeInfo.getEndpoint()); StringBuilder fieldNames = new StringBuilder(); for (String field : fields) { fieldNames.append(field).append(InlongConstants.BLANK); } - params.put(KEY_FIELDS, fieldNames.toString()); + params.computeIfAbsent(KEY_FIELDS, k -> fieldNames.toString()); return params; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java index fba388029f2..7b2109c352d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java @@ -22,6 +22,8 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; @@ -32,6 +34,7 @@ import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -78,6 +81,14 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit try { ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + InlongStreamEntity stream = inlongStreamEntityMapper + .selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId()); + dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator())))); + + InlongStreamExtParam streamExt = + JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class); + dto.setFieldOffset(streamExt.getExtendedFieldSize()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, @@ -108,7 +119,7 @@ public Map parse2IdParams(StreamSinkEntity streamSink, List sb.toString()); return idParams; }