diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index dab2cdc93ae..a278bc51c0c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -153,7 +153,12 @@ public Integer save(SinkRequest request, String operator) { // Check if it can be added String groupId = request.getInlongGroupId(); groupCheckService.checkGroupStatus(groupId, operator); - + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); + if (groupEntity == null) { + throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId)); + } + userService.checkUser(groupEntity.getInCharges(), operator, + "Current user does not have permission to create sink info"); // Make sure that there is no same sink name under the current groupId and streamId String streamId = request.getInlongStreamId(); String sinkName = request.getSinkName(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 324c9969103..04e39415000 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -105,6 +105,12 @@ public Integer save(SourceRequest request, String operator) { // Check if it can be added String groupId = request.getInlongGroupId(); String streamId = request.getInlongStreamId(); + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); + if (groupEntity == null) { + throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId)); + } + userService.checkUser(groupEntity.getInCharges(), operator, + "Current user does not have permission to create source info"); InlongStreamEntity streamEntity = groupCheckService.checkStreamStatus(groupId, streamId, operator); String sourceName = request.getSourceName(); List existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 101c39d24fb..46f8fe5e946 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -166,7 +166,12 @@ public Integer save(InlongStreamRequest request, String operator) { // Check if it can be added checkGroupStatusIsTemp(groupId); - + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); + if (groupEntity == null) { + throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId)); + } + userService.checkUser(groupEntity.getInCharges(), operator, + "Current user does not have permission to create stream info"); // The streamId under the same groupId cannot be repeated Integer count = streamMapper.selectExistByIdentifier(groupId, streamId); if (count >= 1) { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java index 5032057db0e..0bc5e9c54a3 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java @@ -297,7 +297,7 @@ private void prepareGroupId(String groupId) { request.setVersion(InlongConstants.INITIAL_VERSION); request.setName("test_group_name"); request.setMqType(ClusterType.PULSAR); - request.setInCharges(TEST_CREATOR); + request.setInCharges(GLOBAL_OPERATOR); List extList = new ArrayList<>(); InlongGroupExtInfo ext1 = InlongGroupExtInfo .builder() @@ -315,7 +315,7 @@ private void prepareGroupId(String groupId) { extList.add(ext1); extList.add(ext2); request.setExtList(extList); - groupService.save(request, "test operator"); + groupService.save(request, GLOBAL_OPERATOR); } private void prepareStreamId(String groupId, String streamId, String topic) { @@ -333,7 +333,7 @@ private void prepareStreamId(String groupId, String streamId, String topic) { ext.setKeyName(ClusterSwitch.BACKUP_MQ_RESOURCE); ext.setKeyValue("backup_" + topic); request.setExtList(extInfos); - streamService.save(request, "test_operator"); + streamService.save(request, GLOBAL_OPERATOR); } private void prepareCluster(String clusterName, String clusterTag) { @@ -382,7 +382,7 @@ private void prepareTask(String taskName, String groupId, String clusterName, St properties.put("delimiter", "|"); properties.put("dataType", "text"); request.setProperties(properties); - streamSinkService.save(request, TEST_CREATOR); + streamSinkService.save(request, GLOBAL_OPERATOR); } }