From bcc9a77f7ced4ab5f731ec0f128ba269bedc75f3 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Wed, 1 Nov 2023 16:43:56 +0800 Subject: [PATCH] [INLONG-9192][Manager] Flat Sort Cluster types --- .../manager/common/consts/SinkType.java | 2 +- .../manager/common/enums/ClusterType.java | 11 ++-- .../dao/mapper/InlongClusterEntityMapper.java | 2 +- .../mappers/InlongClusterEntityMapper.xml | 5 +- .../cluster/es/ElasticsearchClusterDTO.java | 63 ------------------- .../cls/SortClsClusterInfo.java} | 24 +++---- .../cls/SortClsClusterRequest.java} | 19 ++---- .../es/SortEsClusterInfo.java} | 12 ++-- .../es/SortEsClusterRequest.java} | 12 ++-- .../sort/pulsar/SortPulsarClusterInfo.java | 48 ++++++++++++++ .../sort/pulsar/SortPulsarClusterRequest.java | 39 ++++++++++++ ...rator.java => SortClsClusterOperator.java} | 47 ++++---------- ...erator.java => SortEsClusterOperator.java} | 42 ++----------- .../cluster/SortPulsarClusterOperator.java | 60 ++++++++++++++++++ ...bstractStandaloneSinkResourceOperator.java | 5 +- .../cluster/InlongClusterServiceTest.java | 29 +++------ .../sink/StandaloneAutoAssignTest.java | 12 ++-- .../sort-connectors/tubemq/pom.xml | 5 ++ 18 files changed, 225 insertions(+), 212 deletions(-) delete mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{sortstandalone/SortStandaloneClusterInfo.java => sort/cls/SortClsClusterInfo.java} (70%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{sortstandalone/SortStandaloneClusterRequest.java => sort/cls/SortClsClusterRequest.java} (70%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{es/ElasticsearchClusterInfo.java => sort/es/SortEsClusterInfo.java} (81%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{es/ElasticsearchClusterRequest.java => sort/es/SortEsClusterRequest.java} (79%) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java rename inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/{SortStandaloneClusterOperator.java => SortClsClusterOperator.java} (55%) rename inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/{ElasticsearchClusterOperator.java => SortEsClusterOperator.java} (53%) create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortPulsarClusterOperator.java diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 965e809f519..f0b215ac300 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -25,7 +25,7 @@ public class SinkType extends StreamType { public static final String HIVE = "HIVE"; public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String HBASE = "HBASE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String ELASTICSEARCH = "ES"; public static final String HDFS = "HDFS"; public static final String GREENPLUM = "GREENPLUM"; public static final String MYSQL = "MYSQL"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index f6a33ff71ed..b48ec2d98da 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -32,8 +32,10 @@ public class ClusterType { public static final String PULSAR = "PULSAR"; public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String SORTSTANDALONE = "SORTSTANDALONE"; + + public static final String SORTES = "SORT_ES"; + public static final String SORTCLS = "SORT_CLS"; + public static final String SORTPULSAR = "SORT_PULSAR"; private static final Set TYPE_SET = new HashSet() { @@ -43,8 +45,9 @@ public class ClusterType { add(ClusterType.PULSAR); add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); - add(ClusterType.ELASTICSEARCH); - add(ClusterType.SORTSTANDALONE); + add(ClusterType.SORTES); + add(ClusterType.SORTCLS); + add(ClusterType.SORTPULSAR); } }; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java index a0d71abc5ed..9b837005ef2 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java @@ -64,6 +64,6 @@ List selectByKey(@Param("clusterTag") String clusterTag, @P int deleteByPrimaryKey(Integer id); - List selectStandaloneClusterByType(@Param("sinkType") String sinkType); + List selectSortClusterByType(@Param("sinkType") String sinkType); } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml index 77f0eb37698..eef324e79ab 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml @@ -189,13 +189,12 @@ from inlong_cluster where is_deleted = 0 - select from inlong_cluster - type = 'SORTSTANDALONE' - and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag) + type = #{sinkType, jdbcType=VARCHAR} and is_deleted = 0 diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java deleted file mode 100644 index 64c201e75f0..00000000000 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.pojo.cluster.es; - -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.common.util.JsonUtils; - -import io.swagger.annotations.ApiModel; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.lang3.StringUtils; - -import javax.validation.constraints.NotNull; - -/** - * Elasticsearch cluster info - */ -@Data -@Builder -@NoArgsConstructor -@ApiModel("Elasticsearch cluster info") -public class ElasticsearchClusterDTO { - - /** - * Get the dto instance from the request - */ - public static ElasticsearchClusterDTO getFromRequest(ElasticsearchClusterRequest request, String extParams) { - ElasticsearchClusterDTO dto = StringUtils.isNotBlank(extParams) - ? ElasticsearchClusterDTO.getFromJson(extParams) - : new ElasticsearchClusterDTO(); - return CommonBeanUtils.copyProperties(request, dto, true); - } - - /** - * Get the dto instance from the JSON string. - */ - public static ElasticsearchClusterDTO getFromJson(@NotNull String extParams) { - try { - return JsonUtils.parseObject(extParams, ElasticsearchClusterDTO.class); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("parse extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } - } -} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java similarity index 70% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java index ea14d2a56b8..d5515c4c93a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.cluster.sort.cls; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -24,29 +24,25 @@ import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; - -import java.util.Set; +import lombok.experimental.SuperBuilder; @Data +@SuperBuilder @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster info for SortStandalone") -public class SortStandaloneClusterInfo extends ClusterInfo { - - @ApiModelProperty(value = "Supported sink types") - private Set supportedSinkTypes; +@JsonTypeDefine(value = ClusterType.SORTCLS) +@ApiModel("Inlong cluster info for SortCls") +public class SortClsClusterInfo extends ClusterInfo { - public SortStandaloneClusterInfo() { - this.setType(ClusterType.SORTSTANDALONE); + public SortClsClusterInfo() { + this.setType(ClusterType.SORTCLS); } @Override public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new); + return CommonBeanUtils.copyProperties(this, SortClsClusterRequest::new); } -} +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java similarity index 70% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java index fd8c10e0d09..d10a100821d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java @@ -15,32 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.cluster.sort.cls; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.util.Set; - @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster request for SortStandalone") -public class SortStandaloneClusterRequest extends ClusterRequest { - - @ApiModelProperty(value = "Supported sink types") - private Set supportedSinkTypes; +@JsonTypeDefine(value = ClusterType.SORTCLS) +@ApiModel("Inlong cluster request for SortCls") +public class SortClsClusterRequest extends ClusterRequest { - public SortStandaloneClusterRequest() { - this.setType(ClusterType.SORTSTANDALONE); + public SortClsClusterRequest() { + this.setType(ClusterType.SORTCLS); } - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java similarity index 81% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java index fa56998169c..9c1a77fcf15 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -36,16 +36,16 @@ @SuperBuilder @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) +@JsonTypeDefine(value = ClusterType.SORTES) @ApiModel("Inlong cluster info for Elasticsearch") -public class ElasticsearchClusterInfo extends ClusterInfo { +public class SortEsClusterInfo extends ClusterInfo { - public ElasticsearchClusterInfo() { - this.setType(ClusterType.ELASTICSEARCH); + public SortEsClusterInfo() { + this.setType(ClusterType.SORTES); } @Override public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new); + return CommonBeanUtils.copyProperties(this, SortEsClusterRequest::new); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java similarity index 79% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java index 51e9536bb35..d5a5b2ae59d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; @@ -32,11 +32,11 @@ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORTES) +@ApiModel("Inlong cluster request for SortEs") +public class SortEsClusterRequest extends ClusterRequest { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortEsClusterRequest() { + this.setType(ClusterType.SORTES); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java new file mode 100644 index 00000000000..63a6d310fbb --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORTPULSAR) +@ApiModel("Inlong cluster info for SortPulsar") +public class SortPulsarClusterInfo extends ClusterInfo { + + public SortPulsarClusterInfo() { + this.setType(ClusterType.SORTPULSAR); + } + + @Override + public ClusterRequest genRequest() { + return CommonBeanUtils.copyProperties(this, SortPulsarClusterRequest::new); + } +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java new file mode 100644 index 00000000000..a85bb861dd8 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORTPULSAR) +@ApiModel("Inlong cluster request for SortPulsar") +public class SortPulsarClusterRequest extends ClusterRequest { + + public SortPulsarClusterRequest() { + this.setType(ClusterType.SORTPULSAR); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClsClusterOperator.java similarity index 55% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClsClusterOperator.java index 3cf12837c2c..6454c5ba507 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClsClusterOperator.java @@ -17,7 +17,6 @@ package org.apache.inlong.manager.service.cluster; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -25,32 +24,13 @@ import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; -import com.google.common.base.Joiner; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; -import java.util.Set; - -@Slf4j @Service -public class SortStandaloneClusterOperator extends AbstractClusterOperator { - - @Override - protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - SortStandaloneClusterRequest standaloneRequest = (SortStandaloneClusterRequest) request; - CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true); - Set supportedTypes = standaloneRequest.getSupportedSinkTypes(); - if (CollectionUtils.isNotEmpty(supportedTypes)) { - String extTag = Joiner.on(InlongConstants.COMMA).join(supportedTypes); - targetEntity.setExtTag(extTag); - } - } +public class SortClsClusterOperator extends AbstractClusterOperator { @Override public Boolean accept(String clusterType) { @@ -59,7 +39,13 @@ public Boolean accept(String clusterType) { @Override public String getClusterType() { - return ClusterType.SORTSTANDALONE; + return ClusterType.SORTCLS; + } + + @Override + protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { + SortClsClusterRequest esRequest = (SortClsClusterRequest) request; + CommonBeanUtils.copyProperties(esRequest, targetEntity, true); } @Override @@ -67,15 +53,8 @@ public ClusterInfo getFromEntity(InlongClusterEntity entity) { if (entity == null) { throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); } - - SortStandaloneClusterInfo clusterInfo = new SortStandaloneClusterInfo(); - CommonBeanUtils.copyProperties(entity, clusterInfo); - String extTag = entity.getExtTag(); - if (StringUtils.isNotBlank(extTag)) { - Set supportedTypes = Sets.newHashSet(extTag.split(InlongConstants.COMMA)); - clusterInfo.setSupportedSinkTypes(supportedTypes); - } - return clusterInfo; + SortClsClusterInfo info = new SortClsClusterInfo(); + CommonBeanUtils.copyProperties(entity, info); + return info; } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortEsClusterOperator.java similarity index 53% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortEsClusterOperator.java index 9d282080982..11667197ba3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortEsClusterOperator.java @@ -24,26 +24,16 @@ import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterDTO; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterInfo; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterRequest; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.HashMap; -import java.util.Map; - /** * Elasticsearch cluster operator. */ @Service -public class ElasticsearchClusterOperator extends AbstractClusterOperator { - - @Autowired - private ObjectMapper mapper; +public class SortEsClusterOperator extends AbstractClusterOperator { @Override public Boolean accept(String clusterType) { @@ -52,21 +42,13 @@ public Boolean accept(String clusterType) { @Override public String getClusterType() { - return ClusterType.ELASTICSEARCH; + return ClusterType.SORTES; } @Override protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - ElasticsearchClusterRequest esRequest = (ElasticsearchClusterRequest) request; + SortEsClusterRequest esRequest = (SortEsClusterRequest) request; CommonBeanUtils.copyProperties(esRequest, targetEntity, true); - try { - ElasticsearchClusterDTO dto = - ElasticsearchClusterDTO.getFromRequest(esRequest, targetEntity.getExtParams()); - targetEntity.setExtParams(mapper.writeValueAsString(dto)); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("serialize extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } } @Override @@ -74,20 +56,8 @@ public ClusterInfo getFromEntity(InlongClusterEntity entity) { if (entity == null) { throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); } - ElasticsearchClusterInfo info = new ElasticsearchClusterInfo(); + SortEsClusterInfo info = new SortEsClusterInfo(); CommonBeanUtils.copyProperties(entity, info); - if (StringUtils.isNotBlank(entity.getExtParams())) { - ElasticsearchClusterDTO dto = ElasticsearchClusterDTO.getFromJson(entity.getExtParams()); - CommonBeanUtils.copyProperties(dto, info); - } return info; } - - @Override - public Object getClusterInfo(InlongClusterEntity entity) { - ElasticsearchClusterInfo elasticsearchClusterInfo = (ElasticsearchClusterInfo) this.getFromEntity(entity); - Map map = new HashMap<>(); - map.put("url", elasticsearchClusterInfo.getUrl()); - return map; - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortPulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortPulsarClusterOperator.java new file mode 100644 index 00000000000..94b863ff833 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortPulsarClusterOperator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterRequest; + +import org.springframework.stereotype.Service; + +@Service +public class SortPulsarClusterOperator extends AbstractClusterOperator { + + @Override + public Boolean accept(String clusterType) { + return getClusterType().equals(clusterType); + } + + @Override + public String getClusterType() { + return ClusterType.SORTPULSAR; + } + + @Override + protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { + SortPulsarClusterRequest esRequest = (SortPulsarClusterRequest) request; + CommonBeanUtils.copyProperties(esRequest, targetEntity, true); + } + + @Override + public ClusterInfo getFromEntity(InlongClusterEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); + } + SortPulsarClusterInfo info = new SortPulsarClusterInfo(); + CommonBeanUtils.copyProperties(entity, info); + return info; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index a7e3fb9e626..fb671ba987c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -46,6 +46,8 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso @Autowired private InlongGroupEntityMapper groupEntityMapper; + private static final String SORT_PREFIX = "SORT_"; + private Random rand = new Random(); @VisibleForTesting @@ -77,8 +79,9 @@ private String assignFromExist(String dataNodeName) { private String assignFromRelated(String sinkType, String groupId) { InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId); + String sortClusterType = SORT_PREFIX.concat(sinkType); List clusters = clusterEntityMapper - .selectStandaloneClusterByType(sinkType).stream() + .selectSortClusterByType(sortClusterType).stream() .filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag())) .collect(Collectors.toList()); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 39106fccdac..353a244dfb3 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -32,8 +32,8 @@ import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -45,9 +45,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** * Inlong cluster service test for {@link InlongClusterService} @@ -59,11 +57,10 @@ public class InlongClusterServiceTest extends ServiceBaseTest { @Autowired private HeartbeatManager heartbeatManager; - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } @@ -329,24 +326,12 @@ public void testPulsarClusterByKey() { public void testStandaloneCluster() { String clusterTag = "standalone_cluster"; String clusterName = "test_standalone"; - String type1 = "type1"; - String type2 = "type2"; - String type3 = "type3"; - Set supportedType = new HashSet<>(); - supportedType.add(type1); - supportedType.add(type2); - supportedType.add(type3); - - Integer id = this.saveStandaloneCluster(clusterTag, clusterName, supportedType); + + Integer id = this.saveStandaloneCluster(clusterTag, clusterName); Assertions.assertNotNull(id); ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR); - Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info); - - Set types = ((SortStandaloneClusterInfo) info).getSupportedSinkTypes(); - Assertions.assertTrue(types.contains(type1)); - Assertions.assertTrue(types.contains(type2)); - Assertions.assertTrue(types.contains(type3)); + Assertions.assertInstanceOf(SortClsClusterInfo.class, info); } @Test diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java index 48a1f00399b..ea9a0683146 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java @@ -21,20 +21,18 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.ServiceBaseTest; import org.apache.inlong.manager.service.cluster.InlongClusterService; -import com.google.common.collect.Sets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; -import java.util.Set; public class StandaloneAutoAssignTest extends ServiceBaseTest { @@ -55,8 +53,7 @@ public void testAutoAssign() { Integer id = saveClsSink(groupInfo.getInlongGroupId(), streamInfo.getInlongStreamId()); String clusterName = "clsCluster"; - Set types = Sets.newHashSet(SinkType.CLS, SinkType.ELASTICSEARCH); - saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName, types); + saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName); List sinkInfos = sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null); Assertions.assertEquals(1, sinkInfos.size()); @@ -81,11 +78,10 @@ public Integer saveClsSink(String groupId, String streamId) { return clsSinkEntity.getId(); } - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index ba5cce64a9d..aec6e19919c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -47,6 +47,11 @@ com.google.guava guava + + org.apache.inlong + sort-connector-base + ${project.version} +