From 632a477ee73b0a98910b14310b42b7b19938a8b0 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Mon, 6 Nov 2023 17:18:09 +0800 Subject: [PATCH] [INLONG-9192][Manager] Flat Sort Cluster types (#9193) --- .../manager/common/consts/SinkType.java | 2 +- .../manager/common/enums/ClusterType.java | 11 +- .../dao/mapper/InlongClusterEntityMapper.java | 2 - .../mappers/InlongClusterEntityMapper.xml | 10 -- .../cluster/sort/cls/SortClsClusterInfo.java | 39 +++++++ .../cls/SortClsClusterRequest.java} | 21 ++-- .../es/SortEsClusterInfo.java} | 21 +--- .../es/SortEsClusterRequest.java} | 14 +-- .../sort/pulsar/SortPulsarClusterInfo.java | 40 +++++++ .../sort/pulsar/SortPulsarClusterRequest.java | 39 +++++++ .../inlong/manager/pojo/sink/cls/ClsSink.java | 4 - .../pojo/sink/es/ElasticsearchSink.java | 4 +- .../manager/pojo/sink/pulsar/PulsarSink.java | 4 - .../BaseSortClusterDTO.java} | 31 ++--- .../BaseSortClusterInfo.java} | 27 ++--- .../pojo/sort/BaseSortClusterRequest.java | 36 ++++++ .../cluster/ElasticsearchClusterOperator.java | 93 --------------- .../cluster/InlongClusterOperator.java | 4 +- .../service/cluster/SortClusterOperator.java | 106 ++++++++++++++++++ .../SortStandaloneClusterOperator.java | 81 ------------- ...bstractStandaloneSinkResourceOperator.java | 5 +- .../cluster/InlongClusterServiceTest.java | 29 ++--- .../sink/StandaloneAutoAssignTest.java | 12 +- .../sort-connectors/tubemq/pom.xml | 5 + 24 files changed, 330 insertions(+), 310 deletions(-) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{sortstandalone/SortStandaloneClusterRequest.java => sort/cls/SortClsClusterRequest.java} (66%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{es/ElasticsearchClusterInfo.java => sort/es/SortEsClusterInfo.java} (65%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{es/ElasticsearchClusterRequest.java => sort/es/SortEsClusterRequest.java} (75%) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/{cluster/es/ElasticsearchClusterDTO.java => sort/BaseSortClusterDTO.java} (54%) rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/{cluster/sortstandalone/SortStandaloneClusterInfo.java => sort/BaseSortClusterInfo.java} (58%) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java delete mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java delete mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 965e809f519..f0b215ac300 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -25,7 +25,7 @@ public class SinkType extends StreamType { public static final String HIVE = "HIVE"; public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String HBASE = "HBASE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String ELASTICSEARCH = "ES"; public static final String HDFS = "HDFS"; public static final String GREENPLUM = "GREENPLUM"; public static final String MYSQL = "MYSQL"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index f6a33ff71ed..6d010d25cc4 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -32,8 +32,10 @@ public class ClusterType { public static final String PULSAR = "PULSAR"; public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String SORTSTANDALONE = "SORTSTANDALONE"; + + public static final String SORT_ES = "SORT_ES"; + public static final String SORT_CLS = "SORT_CLS"; + public static final String SORT_PULSAR = "SORT_PULSAR"; private static final Set TYPE_SET = new HashSet() { @@ -43,8 +45,9 @@ public class ClusterType { add(ClusterType.PULSAR); add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); - add(ClusterType.ELASTICSEARCH); - add(ClusterType.SORTSTANDALONE); + add(ClusterType.SORT_ES); + add(ClusterType.SORT_CLS); + add(ClusterType.SORT_PULSAR); } }; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java index a0d71abc5ed..ce2644691d1 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java @@ -64,6 +64,4 @@ List selectByKey(@Param("clusterTag") String clusterTag, @P int deleteByPrimaryKey(Integer id); - List selectStandaloneClusterByType(@Param("sinkType") String sinkType); - } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml index 77f0eb37698..f554faf64ae 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml @@ -189,16 +189,6 @@ from inlong_cluster where is_deleted = 0 - update inlong_cluster diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java new file mode 100644 index 00000000000..96eedc4667a --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.cls; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_CLS) +@ApiModel("Inlong cluster info for SortCls") +public class SortClsClusterInfo extends BaseSortClusterInfo { + + public SortClsClusterInfo() { + this.setType(ClusterType.SORT_CLS); + } +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java similarity index 66% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java index fd8c10e0d09..5cf261c9329 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/cls/SortClsClusterRequest.java @@ -15,32 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.cluster.sort.cls; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.util.Set; - @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster request for SortStandalone") -public class SortStandaloneClusterRequest extends ClusterRequest { - - @ApiModelProperty(value = "Supported sink types") - private Set supportedSinkTypes; +@JsonTypeDefine(value = ClusterType.SORT_CLS) +@ApiModel("Inlong cluster request for SortCls") +public class SortClsClusterRequest extends BaseSortClusterRequest { - public SortStandaloneClusterRequest() { - this.setType(ClusterType.SORTSTANDALONE); + public SortClsClusterRequest() { + this.setType(ClusterType.SORT_CLS); } - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java similarity index 65% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java index fa56998169c..15eb6ef4f0c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterInfo.java @@ -15,37 +15,28 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Elasticsearch cluster info */ @Data -@SuperBuilder @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) +@JsonTypeDefine(value = ClusterType.SORT_ES) @ApiModel("Inlong cluster info for Elasticsearch") -public class ElasticsearchClusterInfo extends ClusterInfo { +public class SortEsClusterInfo extends BaseSortClusterInfo { - public ElasticsearchClusterInfo() { - this.setType(ClusterType.ELASTICSEARCH); - } - - @Override - public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new); + public SortEsClusterInfo() { + this.setType(ClusterType.SORT_ES); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java similarity index 75% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java index 51e9536bb35..b51e0cd844e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/es/SortEsClusterRequest.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.cluster.sort.es; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; import lombok.Data; @@ -32,11 +32,11 @@ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.ELASTICSEARCH) -@ApiModel("Inlong cluster request for Elasticsearch") -public class ElasticsearchClusterRequest extends ClusterRequest { +@JsonTypeDefine(value = ClusterType.SORT_ES) +@ApiModel("Inlong cluster request for SortEs") +public class SortEsClusterRequest extends BaseSortClusterRequest { - public ElasticsearchClusterRequest() { - this.setType(ClusterType.ELASTICSEARCH); + public SortEsClusterRequest() { + this.setType(ClusterType.SORT_ES); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java new file mode 100644 index 00000000000..0eaffe866a8 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_PULSAR) +@ApiModel("Inlong cluster info for SortPulsar") +public class SortPulsarClusterInfo extends BaseSortClusterInfo { + + public SortPulsarClusterInfo() { + this.setType(ClusterType.SORT_PULSAR); + } + +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java new file mode 100644 index 00000000000..f8be4ad6147 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/pulsar/SortPulsarClusterRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.pulsar; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_PULSAR) +@ApiModel("Inlong cluster request for SortPulsar") +public class SortPulsarClusterRequest extends BaseSortClusterRequest { + + public SortPulsarClusterRequest() { + this.setType(ClusterType.SORT_PULSAR); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java index 1668da82b88..275a2428383 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java @@ -25,18 +25,14 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Cloud log service sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Cloud log service sink info") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java index 27a10b3cf50..5f92dddbebf 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java @@ -25,7 +25,6 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -35,11 +34,10 @@ * Elasticsearch sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Elasticsearch sink info") +@SuperBuilder @JsonTypeDefine(value = SinkType.ELASTICSEARCH) public class ElasticsearchSink extends StreamSink { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java index a29cd334609..cebb5f80661 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java @@ -25,18 +25,14 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import lombok.experimental.SuperBuilder; /** * Pulsar sink info */ @Data -@SuperBuilder -@AllArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @ApiModel(value = "Pulsar sink info") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java similarity index 54% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java index 64c201e75f0..6d70512e7de 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/es/ElasticsearchClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterDTO.java @@ -15,49 +15,38 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.es; +package org.apache.inlong.manager.pojo.sort; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import io.swagger.annotations.ApiModel; -import lombok.Builder; import lombok.Data; -import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; /** - * Elasticsearch cluster info + * Base sort cluster DTO */ @Data -@Builder -@NoArgsConstructor -@ApiModel("Elasticsearch cluster info") -public class ElasticsearchClusterDTO { +@ApiModel("Base sort cluster info") +public class BaseSortClusterDTO { /** * Get the dto instance from the request */ - public static ElasticsearchClusterDTO getFromRequest(ElasticsearchClusterRequest request, String extParams) { - ElasticsearchClusterDTO dto = StringUtils.isNotBlank(extParams) - ? ElasticsearchClusterDTO.getFromJson(extParams) - : new ElasticsearchClusterDTO(); + public static BaseSortClusterDTO getFromRequest(BaseSortClusterRequest request, String extParams) throws Exception { + BaseSortClusterDTO dto = StringUtils.isNotBlank(extParams) + ? BaseSortClusterDTO.getFromJson(extParams) + : new BaseSortClusterDTO(); return CommonBeanUtils.copyProperties(request, dto, true); } /** * Get the dto instance from the JSON string. */ - public static ElasticsearchClusterDTO getFromJson(@NotNull String extParams) { - try { - return JsonUtils.parseObject(extParams, ElasticsearchClusterDTO.class); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("parse extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } + public static BaseSortClusterDTO getFromJson(@NotNull String extParams) { + return JsonUtils.parseObject(extParams, BaseSortClusterDTO.class); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java similarity index 58% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java index ea14d2a56b8..c33ec2d1d09 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterInfo.java @@ -15,38 +15,27 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.cluster.sortstandalone; +package org.apache.inlong.manager.pojo.sort; -import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.util.Set; - +/** + * Inlong base sort cluster info + */ @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) -@ApiModel("Inlong cluster info for SortStandalone") -public class SortStandaloneClusterInfo extends ClusterInfo { - - @ApiModelProperty(value = "Supported sink types") - private Set supportedSinkTypes; - - public SortStandaloneClusterInfo() { - this.setType(ClusterType.SORTSTANDALONE); - } +@ApiModel("Inlong base sort cluster info") +public class BaseSortClusterInfo extends ClusterInfo { @Override - public ClusterRequest genRequest() { - return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new); + public BaseSortClusterRequest genRequest() { + return CommonBeanUtils.copyProperties(this, BaseSortClusterRequest::new); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java new file mode 100644 index 00000000000..1edec5dc5ca --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/BaseSortClusterRequest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sort; + +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Inlong base sort cluster request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel("Inlong base sort cluster request") +public class BaseSortClusterRequest extends ClusterRequest { + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java deleted file mode 100644 index 9d282080982..00000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/ElasticsearchClusterOperator.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.cluster; - -import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterDTO; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterInfo; -import org.apache.inlong.manager.pojo.cluster.es.ElasticsearchClusterRequest; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.HashMap; -import java.util.Map; - -/** - * Elasticsearch cluster operator. - */ -@Service -public class ElasticsearchClusterOperator extends AbstractClusterOperator { - - @Autowired - private ObjectMapper mapper; - - @Override - public Boolean accept(String clusterType) { - return getClusterType().equals(clusterType); - } - - @Override - public String getClusterType() { - return ClusterType.ELASTICSEARCH; - } - - @Override - protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - ElasticsearchClusterRequest esRequest = (ElasticsearchClusterRequest) request; - CommonBeanUtils.copyProperties(esRequest, targetEntity, true); - try { - ElasticsearchClusterDTO dto = - ElasticsearchClusterDTO.getFromRequest(esRequest, targetEntity.getExtParams()); - targetEntity.setExtParams(mapper.writeValueAsString(dto)); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, - String.format("serialize extParams of Elasticsearch Cluster failure: %s", e.getMessage())); - } - } - - @Override - public ClusterInfo getFromEntity(InlongClusterEntity entity) { - if (entity == null) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); - } - ElasticsearchClusterInfo info = new ElasticsearchClusterInfo(); - CommonBeanUtils.copyProperties(entity, info); - if (StringUtils.isNotBlank(entity.getExtParams())) { - ElasticsearchClusterDTO dto = ElasticsearchClusterDTO.getFromJson(entity.getExtParams()); - CommonBeanUtils.copyProperties(dto, info); - } - return info; - } - - @Override - public Object getClusterInfo(InlongClusterEntity entity) { - ElasticsearchClusterInfo elasticsearchClusterInfo = (ElasticsearchClusterInfo) this.getFromEntity(entity); - Map map = new HashMap<>(); - map.put("url", elasticsearchClusterInfo.getUrl()); - return map; - } -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java index 516f8943288..db9fb0b336c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java @@ -36,7 +36,9 @@ public interface InlongClusterOperator { * * @return cluster type string */ - String getClusterType(); + default String getClusterType() { + return null; + } /** * Save the inlong cluster info. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java new file mode 100644 index 00000000000..dbb889565ef --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Set; + +@Service +public class SortClusterOperator extends AbstractClusterOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterOperator.class); + + private static final Set SORT_CLUSTER_SET = new HashSet() { + + { + add(ClusterType.SORT_CLS); + add(ClusterType.SORT_PULSAR); + add(ClusterType.SORT_ES); + } + }; + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String clusterType) { + return SORT_CLUSTER_SET.contains(clusterType); + } + + @Override + protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { + BaseSortClusterRequest clusterRequest = (BaseSortClusterRequest) request; + CommonBeanUtils.copyProperties(clusterRequest, targetEntity, true); + try { + BaseSortClusterDTO dto = BaseSortClusterDTO.getFromRequest(clusterRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.info("success to set entity for sort cluster"); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + + @Override + public ClusterInfo getFromEntity(InlongClusterEntity entity) { + Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND.getMessage()); + + ClusterInfo sortClusterInfo; + switch (entity.getType()) { + case ClusterType.SORT_CLS: + sortClusterInfo = new SortClsClusterInfo(); + break; + case ClusterType.SORT_PULSAR: + sortClusterInfo = new SortPulsarClusterInfo(); + break; + case ClusterType.SORT_ES: + sortClusterInfo = new SortEsClusterInfo(); + break; + default: + throw new BusinessException("unsupported cluster type " + entity.getType()); + } + + CommonBeanUtils.copyProperties(entity, sortClusterInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + BaseSortClusterDTO dto = BaseSortClusterDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, sortClusterInfo); + } + return sortClusterInfo; + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java deleted file mode 100644 index 3cf12837c2c..00000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.cluster; - -import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.InlongClusterEntity; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; -import org.apache.inlong.manager.pojo.cluster.ClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; - -import com.google.common.base.Joiner; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Service; - -import java.util.Set; - -@Slf4j -@Service -public class SortStandaloneClusterOperator extends AbstractClusterOperator { - - @Override - protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - SortStandaloneClusterRequest standaloneRequest = (SortStandaloneClusterRequest) request; - CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true); - Set supportedTypes = standaloneRequest.getSupportedSinkTypes(); - if (CollectionUtils.isNotEmpty(supportedTypes)) { - String extTag = Joiner.on(InlongConstants.COMMA).join(supportedTypes); - targetEntity.setExtTag(extTag); - } - } - - @Override - public Boolean accept(String clusterType) { - return getClusterType().equals(clusterType); - } - - @Override - public String getClusterType() { - return ClusterType.SORTSTANDALONE; - } - - @Override - public ClusterInfo getFromEntity(InlongClusterEntity entity) { - if (entity == null) { - throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); - } - - SortStandaloneClusterInfo clusterInfo = new SortStandaloneClusterInfo(); - CommonBeanUtils.copyProperties(entity, clusterInfo); - String extTag = entity.getExtTag(); - if (StringUtils.isNotBlank(extTag)) { - Set supportedTypes = Sets.newHashSet(extTag.split(InlongConstants.COMMA)); - clusterInfo.setSupportedSinkTypes(supportedTypes); - } - return clusterInfo; - } - -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index a7e3fb9e626..61187e6f8dd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -46,6 +46,8 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso @Autowired private InlongGroupEntityMapper groupEntityMapper; + private static final String SORT_PREFIX = "SORT_"; + private Random rand = new Random(); @VisibleForTesting @@ -77,8 +79,9 @@ private String assignFromExist(String dataNodeName) { private String assignFromRelated(String sinkType, String groupId) { InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId); + String sortClusterType = SORT_PREFIX.concat(sinkType); List clusters = clusterEntityMapper - .selectStandaloneClusterByType(sinkType).stream() + .selectByKey(null, null, sortClusterType).stream() .filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag())) .collect(Collectors.toList()); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 39106fccdac..353a244dfb3 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -32,8 +32,8 @@ import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -45,9 +45,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** * Inlong cluster service test for {@link InlongClusterService} @@ -59,11 +57,10 @@ public class InlongClusterServiceTest extends ServiceBaseTest { @Autowired private HeartbeatManager heartbeatManager; - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } @@ -329,24 +326,12 @@ public void testPulsarClusterByKey() { public void testStandaloneCluster() { String clusterTag = "standalone_cluster"; String clusterName = "test_standalone"; - String type1 = "type1"; - String type2 = "type2"; - String type3 = "type3"; - Set supportedType = new HashSet<>(); - supportedType.add(type1); - supportedType.add(type2); - supportedType.add(type3); - - Integer id = this.saveStandaloneCluster(clusterTag, clusterName, supportedType); + + Integer id = this.saveStandaloneCluster(clusterTag, clusterName); Assertions.assertNotNull(id); ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR); - Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info); - - Set types = ((SortStandaloneClusterInfo) info).getSupportedSinkTypes(); - Assertions.assertTrue(types.contains(type1)); - Assertions.assertTrue(types.contains(type2)); - Assertions.assertTrue(types.contains(type3)); + Assertions.assertInstanceOf(SortClsClusterInfo.class, info); } @Test diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java index 48a1f00399b..ea9a0683146 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/sink/StandaloneAutoAssignTest.java @@ -21,20 +21,18 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; -import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterRequest; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.ServiceBaseTest; import org.apache.inlong.manager.service.cluster.InlongClusterService; -import com.google.common.collect.Sets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; -import java.util.Set; public class StandaloneAutoAssignTest extends ServiceBaseTest { @@ -55,8 +53,7 @@ public void testAutoAssign() { Integer id = saveClsSink(groupInfo.getInlongGroupId(), streamInfo.getInlongStreamId()); String clusterName = "clsCluster"; - Set types = Sets.newHashSet(SinkType.CLS, SinkType.ELASTICSEARCH); - saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName, types); + saveStandaloneCluster(groupInfo.getInlongClusterTag(), clusterName); List sinkInfos = sinkEntityMapper.selectAllConfig(groupInfo.getInlongGroupId(), null); Assertions.assertEquals(1, sinkInfos.size()); @@ -81,11 +78,10 @@ public Integer saveClsSink(String groupId, String streamId) { return clsSinkEntity.getId(); } - public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set supportedSinkTypes) { - SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + public Integer saveStandaloneCluster(String clusterTag, String clusterName) { + SortClsClusterRequest request = new SortClsClusterRequest(); request.setClusterTags(clusterTag); request.setName(clusterName); - request.setSupportedSinkTypes(supportedSinkTypes); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index ba5cce64a9d..aec6e19919c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -47,6 +47,11 @@ com.google.guava guava + + org.apache.inlong + sort-connector-base + ${project.version} +