From 7cb5f1517ee55d2bf1483cca800f81af6444f0f2 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Fri, 18 Oct 2024 14:15:22 +0800 Subject: [PATCH] [INLONG-11368][Manager] Determine whether to issue a streamSource based on the stream status (#11371) --- .../service/group/GroupCheckService.java | 20 +++++++++++++++++-- .../source/AbstractSourceOperator.java | 16 +++++++-------- .../service/source/StreamSourceOperator.java | 4 ++-- .../source/StreamSourceServiceImpl.java | 10 +++++++--- 4 files changed, 35 insertions(+), 15 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java index fe1c7a4451c..6824cba63b9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java @@ -19,10 +19,12 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; -import org.apache.inlong.manager.dao.mapper.UserEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -36,7 +38,7 @@ public class GroupCheckService { @Autowired private InlongGroupEntityMapper groupMapper; @Autowired - private UserEntityMapper userMapper; + private InlongStreamEntityMapper streamMapper; /** * Check whether the inlong group status is temporary @@ -58,4 +60,18 @@ public InlongGroupEntity checkGroupStatus(String groupId, String operator) { return inlongGroupEntity; } + public InlongStreamEntity checkStreamStatus(String groupId, String streamId, String operator) { + InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId); + if (inlongStreamEntity == null) { + throw new BusinessException( + String.format("InlongStream does not exist with groupId=%s, streamId=%s", groupId, streamId)); + } + + StreamStatus status = StreamStatus.forCode(inlongStreamEntity.getStatus()); + if (StreamStatus.notAllowedUpdate(status)) { + throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status)); + } + + return inlongStreamEntity; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 885fd2a99e0..cdf6290281f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -32,8 +32,8 @@ import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.SourceStatus; +import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; @@ -146,12 +146,12 @@ public String getExtParams(StreamSourceEntity sourceEntity) { @Override @Transactional(rollbackFor = Throwable.class) - public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) { + public Integer saveOpt(SourceRequest request, Integer streamStatus, String operator) { StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new); if (SourceType.AUTO_PUSH.equals(request.getSourceType())) { // auto push task needs not be issued to agent entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode()); - } else if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + } else if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) { entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode()); } else { entity.setStatus(SourceStatus.SOURCE_NEW.getCode()); @@ -166,7 +166,7 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat if (request.getEnableSyncSchema()) { syncSourceFieldInfo(request, operator); } - if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) { updateAgentTaskConfig(request, operator); } return entity.getId(); @@ -188,7 +188,7 @@ public PageResult getPageInfo(Page e @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) - public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator) { + public void updateOpt(SourceRequest request, Integer streamStatus, Integer groupMode, String operator) { StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId()); if (entity == null) { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, @@ -242,7 +242,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM if (InlongConstants.STANDARD_MODE.equals(groupMode)) { SourceStatus sourceStatus = SourceStatus.forCode(entity.getStatus()); Integer nextStatus = entity.getStatus(); - if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) { nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode(); } else { switch (SourceStatus.forCode(entity.getStatus())) { @@ -267,7 +267,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM } updateFieldOpt(entity, request.getFieldList()); LOGGER.debug("success to update source of type={}", request.getSourceType()); - if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) { updateAgentTaskConfig(request, operator); } } @@ -479,7 +479,7 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) { return cmdConfig; }).collect(Collectors.toList()); if (CollectionUtils.isEmpty(taskLists)) { - agentTaskConfigEntity.setTaskParams(null); + agentTaskConfigEntity.setTaskParams(""); agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity); return; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index e820140ae67..07b0879330c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -50,11 +50,11 @@ public interface StreamSourceOperator { * Save the source info. * * @param request request of source - * @param groupStatus the belongs group status + * @param streamStatus the belongs stream status * @param operator name of operator * @return source id after saving */ - Integer saveOpt(SourceRequest request, Integer groupStatus, String operator); + Integer saveOpt(SourceRequest request, Integer streamStatus, String operator); /** * Get source info by the given entity. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index de96386b253..e82d9d2aeba 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; @@ -103,8 +104,8 @@ public Integer save(SourceRequest request, String operator) { // Check if it can be added String groupId = request.getInlongGroupId(); - InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator); String streamId = request.getInlongStreamId(); + InlongStreamEntity streamEntity = groupCheckService.checkStreamStatus(groupId, streamId, operator); String sourceName = request.getSourceName(); List existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName); if (CollectionUtils.isNotEmpty(existList)) { @@ -119,7 +120,7 @@ public Integer save(SourceRequest request, String operator) { if (CollectionUtils.isNotEmpty(streamFields)) { streamFields.forEach(streamField -> streamField.setId(null)); } - int id = sourceOperator.saveOpt(request, groupEntity.getStatus(), operator); + int id = sourceOperator.saveOpt(request, streamEntity.getStatus(), operator); LOGGER.info("success to save source info: {}", request); return id; @@ -300,6 +301,9 @@ public Boolean update(SourceRequest request, String operator) { throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", groupId)); } + String streamId = request.getInlongStreamId(); + InlongStreamEntity streamEntity = groupCheckService.checkStreamStatus(groupId, streamId, operator); + userService.checkUser(groupEntity.getInCharges(), operator, "Current user does not have permission to update source info"); StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType()); @@ -308,7 +312,7 @@ public Boolean update(SourceRequest request, String operator) { if (CollectionUtils.isNotEmpty(streamFields)) { streamFields.forEach(streamField -> streamField.setId(null)); } - sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getInlongGroupMode(), operator); + sourceOperator.updateOpt(request, streamEntity.getStatus(), groupEntity.getInlongGroupMode(), operator); LOGGER.info("success to update source info: {}", request); return true;