Skip to content

Commit

Permalink
[INLONG-11368][Manager] Determine whether to issue a streamSource bas…
Browse files Browse the repository at this point in the history
…ed on the stream status (#11371)
  • Loading branch information
fuweng11 authored Oct 18, 2024
1 parent 396ca13 commit 7cb5f15
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -36,7 +38,7 @@ public class GroupCheckService {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private UserEntityMapper userMapper;
private InlongStreamEntityMapper streamMapper;

/**
* Check whether the inlong group status is temporary
Expand All @@ -58,4 +60,18 @@ public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
return inlongGroupEntity;
}

public InlongStreamEntity checkStreamStatus(String groupId, String streamId, String operator) {
InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (inlongStreamEntity == null) {
throw new BusinessException(
String.format("InlongStream does not exist with groupId=%s, streamId=%s", groupId, streamId));
}

StreamStatus status = StreamStatus.forCode(inlongStreamEntity.getStatus());
if (StreamStatus.notAllowedUpdate(status)) {
throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}

return inlongStreamEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
Expand Down Expand Up @@ -146,12 +146,12 @@ public String getExtParams(StreamSourceEntity sourceEntity) {

@Override
@Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
public Integer saveOpt(SourceRequest request, Integer streamStatus, String operator) {
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
// auto push task needs not be issued to agent
entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
} else if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
} else if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
} else {
entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
Expand All @@ -166,7 +166,7 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat
if (request.getEnableSyncSchema()) {
syncSourceFieldInfo(request, operator);
}
if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
updateAgentTaskConfig(request, operator);
}
return entity.getId();
Expand All @@ -188,7 +188,7 @@ public PageResult<? extends StreamSource> getPageInfo(Page<StreamSourceEntity> e

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator) {
public void updateOpt(SourceRequest request, Integer streamStatus, Integer groupMode, String operator) {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
Expand Down Expand Up @@ -242,7 +242,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM
if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
SourceStatus sourceStatus = SourceStatus.forCode(entity.getStatus());
Integer nextStatus = entity.getStatus();
if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
} else {
switch (SourceStatus.forCode(entity.getStatus())) {
Expand All @@ -267,7 +267,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM
}
updateFieldOpt(entity, request.getFieldList());
LOGGER.debug("success to update source of type={}", request.getSourceType());
if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
if (StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
updateAgentTaskConfig(request, operator);
}
}
Expand Down Expand Up @@ -479,7 +479,7 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) {
return cmdConfig;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskLists)) {
agentTaskConfigEntity.setTaskParams(null);
agentTaskConfigEntity.setTaskParams("");
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ public interface StreamSourceOperator {
* Save the source info.
*
* @param request request of source
* @param groupStatus the belongs group status
* @param streamStatus the belongs stream status
* @param operator name of operator
* @return source id after saving
*/
Integer saveOpt(SourceRequest request, Integer groupStatus, String operator);
Integer saveOpt(SourceRequest request, Integer streamStatus, String operator);

/**
* Get source info by the given entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
Expand Down Expand Up @@ -103,8 +104,8 @@ public Integer save(SourceRequest request, String operator) {

// Check if it can be added
String groupId = request.getInlongGroupId();
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
String streamId = request.getInlongStreamId();
InlongStreamEntity streamEntity = groupCheckService.checkStreamStatus(groupId, streamId, operator);
String sourceName = request.getSourceName();
List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
if (CollectionUtils.isNotEmpty(existList)) {
Expand All @@ -119,7 +120,7 @@ public Integer save(SourceRequest request, String operator) {
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
int id = sourceOperator.saveOpt(request, groupEntity.getStatus(), operator);
int id = sourceOperator.saveOpt(request, streamEntity.getStatus(), operator);

LOGGER.info("success to save source info: {}", request);
return id;
Expand Down Expand Up @@ -300,6 +301,9 @@ public Boolean update(SourceRequest request, String operator) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
}
String streamId = request.getInlongStreamId();
InlongStreamEntity streamEntity = groupCheckService.checkStreamStatus(groupId, streamId, operator);

userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to update source info");
StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType());
Expand All @@ -308,7 +312,7 @@ public Boolean update(SourceRequest request, String operator) {
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getInlongGroupMode(), operator);
sourceOperator.updateOpt(request, streamEntity.getStatus(), groupEntity.getInlongGroupMode(), operator);

LOGGER.info("success to update source info: {}", request);
return true;
Expand Down

0 comments on commit 7cb5f15

Please sign in to comment.