Skip to content

Commit

Permalink
[INLONG-11195][Manager] It is not allowed to modify group information…
Browse files Browse the repository at this point in the history
… when ordinary users are not responsible (apache#11196)
  • Loading branch information
fuweng11 authored Sep 25, 2024
1 parent 0122aa4 commit b670373
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.tenant.InlongTenantService;
import org.apache.inlong.manager.service.user.InlongRoleService;
import org.apache.inlong.manager.service.user.UserService;
import org.apache.inlong.manager.service.workflow.WorkflowService;

import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -166,6 +167,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
private InlongRoleService inlongRoleService;
@Autowired
private TenantUserRoleEntityMapper tenantUserRoleEntityMapper;
@Autowired
private UserService userService;

@Autowired
ScheduleOperator scheduleOperator;
Expand Down Expand Up @@ -501,6 +504,8 @@ public String update(InlongGroupRequest request, String operator) {
LOGGER.error("inlong group not found by groupId={}", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
userService.checkUser(entity.getInCharges(), operator,
"Current user does not have permission to update group info");
chkUnmodifiableParams(entity, request);
// check whether the current status can be modified
doUpdateCheck(entity, request, operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.apache.inlong.manager.service.user.UserService;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -138,6 +139,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private UserService userService;
// To avoid circular dependencies, you cannot use @Autowired, it will be injected by AutowireCapableBeanFactory
private InlongStreamProcessService streamProcessOperation;

Expand Down Expand Up @@ -449,7 +452,9 @@ public Boolean update(SinkRequest request, String operator) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
}
chkUnmodifiableParams(curEntity, request);
groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to update sink info");
// Check whether the stream exist or not
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
request.getInlongGroupId(), request.getInlongStreamId());
Expand Down Expand Up @@ -526,7 +531,9 @@ public Boolean delete(Integer id, Boolean startProcess, String operator) {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.expectNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());

groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to delete sink info");

StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
sinkOperator.deleteOpt(entity, operator);
Expand All @@ -553,7 +560,9 @@ public Boolean deleteByKey(String groupId, String streamId, String sinkName,
Preconditions.expectNotNull(entity, String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s",
groupId, streamId, sinkName));

groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to delete sink info");

StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
sinkOperator.deleteOpt(entity, operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.user.UserService;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
private StreamSourceFieldEntityMapper sourceFieldMapper;
@Autowired
private GroupCheckService groupCheckService;
@Autowired
private UserService userService;

@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
Expand Down Expand Up @@ -296,6 +299,8 @@ public Boolean update(SourceRequest request, String operator) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
}
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to update source info");
StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType());
// Remove id in sourceField when save
List<StreamField> streamFields = request.getFieldList();
Expand Down Expand Up @@ -334,7 +339,8 @@ public Boolean delete(Integer id, String operator) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
}

userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to delete source info");
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
// if source is frozen|failed|new, or if it is a template source or auto push source, delete directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.user.UserService;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -150,6 +151,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Autowired
@Lazy
private SinkOperatorFactory sinkOperatorFactory;
@Autowired
private UserService userService;

@Transactional(rollbackFor = Throwable.class)
@Override
Expand Down Expand Up @@ -451,6 +454,13 @@ public Boolean update(InlongStreamRequest request, String operator) {
Preconditions.expectNotNull(request, "inlong stream request is empty");
String groupId = request.getInlongGroupId();
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId);
if (groupEntity == null) {
throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
}
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to update stream info");

String streamId = request.getInlongStreamId();
Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

Expand Down Expand Up @@ -514,6 +524,12 @@ public Boolean delete(String groupId, String streamId, String operator) {

// Check if it can be deleted
this.checkGroupStatusIsTemp(groupId);
InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId);
if (groupEntity == null) {
throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
}
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to delete stream info");

InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, streamId);
if (entity == null) {
Expand Down Expand Up @@ -951,6 +967,12 @@ private InlongGroupEntity checkGroupStatusIsTemp(String groupId) {
@Override
public List<BriefMQMessage> listMessages(QueryMessageRequest request, String operator) {
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getGroupId());
if (groupEntity == null) {
throw new BusinessException(
String.format("InlongGroup does not exist with InlongGroupId=%s", request.getGroupId()));
}
userService.checkUser(groupEntity.getInCharges(), operator, String
.format("Current user does not have permission to query message for groupId=%s", request.getGroupId()));
InlongGroupOperator instance = groupOperatorFactory.getInstance(groupEntity.getMqType());
InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), request.getStreamId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,13 @@ public interface UserService {
*/
void login(UserLoginRequest req);

/**
* Check the given user is the admin or is one of the in charges.
*
* @param inCharges incharge list
* @param user current user name
* @param errMsg error message
*/
void checkUser(String inCharges, String user, String errMsg);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@
import org.apache.inlong.manager.pojo.user.UserLoginLockStatus;
import org.apache.inlong.manager.pojo.user.UserLoginRequest;
import org.apache.inlong.manager.pojo.user.UserRequest;
import org.apache.inlong.manager.pojo.user.UserRoleCode;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.SecurityUtils;
Expand Down Expand Up @@ -351,6 +353,17 @@ public void login(UserLoginRequest req) {
loginLockStatusMap.put(username, userLoginLockStatus);
}

@Override
public void checkUser(String inCharges, String user, String errMsg) {
Set<String> userRoles = LoginUserUtils.getLoginUser().getRoles();
boolean isAdmin = false;
if (CollectionUtils.isNotEmpty(userRoles)) {
isAdmin = userRoles.contains(UserRoleCode.INLONG_ADMIN) || userRoles.contains(UserRoleCode.TENANT_ADMIN);
}
boolean isInCharge = Preconditions.inSeparatedString(user, inCharges, InlongConstants.COMMA);
Preconditions.expectTrue(isInCharge || isAdmin, errMsg);
}

public void removeInChargeForGroup(String user, String operator) {
InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
pageRequest.setCurrentUser(user);
Expand Down

0 comments on commit b670373

Please sign in to comment.