Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Dec 1, 2023
1 parent 915acce commit 133dd96
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.inlong.manager.common.consts;

import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.enums.ClusterType;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -74,10 +79,18 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String CLS = "CLS";

public static final Map<String, String> SINK_TO_CLUSTER = new HashMap<>();

public static final Set<String> SORT_FLINK_SINK = new HashSet<>();

public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();

static {
SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS);
SINK_TO_CLUSTER.put(ELASTICSEARCH, ClusterType.SORT_ES);
SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR);
}

static {
SinkType obj = new SinkType();
Class<? extends SinkType> clazz = obj.getClass();
Expand All @@ -98,4 +111,7 @@ public static boolean containSortFlinkSink(List<String> sinkTypes) {
return sinkTypes.stream().anyMatch(SORT_STANDALONE_SINK::contains);
}

public static String relatedSortClusterType(String sinkType) {
return SINK_TO_CLUSTER.get(sinkType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.inlong.manager.common.auth.Authentication.AuthType;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupMode;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
Expand Down Expand Up @@ -705,6 +707,13 @@ public Boolean startTagSwitch(String groupId, String clusterTag) {

InlongGroupInfo groupInfo = this.get(groupId);

// check if the group mode is data sync mode
if (groupInfo.getInlongGroupMode() == 1) {
String errMSg = String.format("no need to switch sync mode group = {}", groupId);
LOGGER.error(errMSg);
throw new BusinessException(errMSg);
}

// check if the group is under switching
List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
Set<String> keys = groupExt.stream()
Expand Down Expand Up @@ -736,7 +745,8 @@ public Boolean startTagSwitch(String groupId, String clusterTag) {
for (StreamSink sink : sinks) {
String clusterName = sink.getInlongClusterName();
InlongClusterEntity clusterEntity =
clusterEntityMapper.selectByNameAndType(clusterName, null);
clusterEntityMapper.selectByNameAndType(clusterName,
SinkType.relatedSortClusterType(sink.getSinkType()));
if (clusterEntity == null) {
String errMsg = String.format("find no cluster with name=[%s]", clusterName);
LOGGER.error(errMsg);
Expand Down Expand Up @@ -777,6 +787,14 @@ public Boolean finishTagSwitch(String groupId) {
InlongGroupInfo groupInfo = this.get(groupId);
UserInfo userInfo = LoginUserUtils.getLoginUser();

// check whether the current status supports modification
GroupStatus curStatus = GroupStatus.forCode(groupInfo.getStatus());
if (GroupStatus.notAllowedUpdate(curStatus)) {
String errMsg = String.format("Current status=%s is not allowed to update", curStatus);
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
}

// check if the group is under switching
List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.resource.sink;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
Expand Down Expand Up @@ -79,7 +80,7 @@ private String assignFromExist(String dataNodeName) {

private String assignFromRelated(String sinkType, String groupId) {
InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
String sortClusterType = SORT_PREFIX.concat(sinkType);
String sortClusterType = SinkType.relatedSortClusterType(sinkType);
List<InlongClusterEntity> clusters = clusterEntityMapper
.selectByKey(null, null, sortClusterType).stream()
.filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag()))
Expand Down

0 comments on commit 133dd96

Please sign in to comment.