Skip to content

Commit

Permalink
[INLONG-9192][Manager] Flat Sort Cluster types
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 1, 2023
1 parent e136f2d commit bcc9a77
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class SinkType extends StreamType {
public static final String HIVE = "HIVE";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String HBASE = "HBASE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String ELASTICSEARCH = "ES";
public static final String HDFS = "HDFS";
public static final String GREENPLUM = "GREENPLUM";
public static final String MYSQL = "MYSQL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public class ClusterType {
public static final String PULSAR = "PULSAR";
public static final String DATAPROXY = "DATAPROXY";
public static final String KAFKA = "KAFKA";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String SORTSTANDALONE = "SORTSTANDALONE";

public static final String SORTES = "SORT_ES";
public static final String SORTCLS = "SORT_CLS";
public static final String SORTPULSAR = "SORT_PULSAR";

private static final Set<String> TYPE_SET = new HashSet<String>() {

Expand All @@ -43,8 +45,9 @@ public class ClusterType {
add(ClusterType.PULSAR);
add(ClusterType.DATAPROXY);
add(ClusterType.KAFKA);
add(ClusterType.ELASTICSEARCH);
add(ClusterType.SORTSTANDALONE);
add(ClusterType.SORTES);
add(ClusterType.SORTCLS);
add(ClusterType.SORTPULSAR);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @P

int deleteByPrimaryKey(Integer id);

List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType") String sinkType);
List<InlongClusterEntity> selectSortClusterByType(@Param("sinkType") String sinkType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,12 @@
from inlong_cluster
where is_deleted = 0
</select>
<select id="selectStandaloneClusterByType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
<select id="selectSortClusterByType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
select
<include refid="Base_Column_List"/>
from inlong_cluster
<where>
type = 'SORTSTANDALONE'
and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag)
type = #{sinkType, jdbcType=VARCHAR}
and is_deleted = 0
</where>
</select>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sortstandalone;
package org.apache.inlong.manager.pojo.cluster.sort.cls;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
Expand All @@ -24,29 +24,25 @@
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Set;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
@ApiModel("Inlong cluster info for SortStandalone")
public class SortStandaloneClusterInfo extends ClusterInfo {

@ApiModelProperty(value = "Supported sink types")
private Set<String> supportedSinkTypes;
@JsonTypeDefine(value = ClusterType.SORTCLS)
@ApiModel("Inlong cluster info for SortCls")
public class SortClsClusterInfo extends ClusterInfo {

public SortStandaloneClusterInfo() {
this.setType(ClusterType.SORTSTANDALONE);
public SortClsClusterInfo() {
this.setType(ClusterType.SORTCLS);
}

@Override
public ClusterRequest genRequest() {
return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new);
return CommonBeanUtils.copyProperties(this, SortClsClusterRequest::new);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,25 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sortstandalone;
package org.apache.inlong.manager.pojo.cluster.sort.cls;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Set;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
@ApiModel("Inlong cluster request for SortStandalone")
public class SortStandaloneClusterRequest extends ClusterRequest {

@ApiModelProperty(value = "Supported sink types")
private Set<String> supportedSinkTypes;
@JsonTypeDefine(value = ClusterType.SORTCLS)
@ApiModel("Inlong cluster request for SortCls")
public class SortClsClusterRequest extends ClusterRequest {

public SortStandaloneClusterRequest() {
this.setType(ClusterType.SORTSTANDALONE);
public SortClsClusterRequest() {
this.setType(ClusterType.SORTCLS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.es;
package org.apache.inlong.manager.pojo.cluster.sort.es;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
Expand All @@ -36,16 +36,16 @@
@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
@JsonTypeDefine(value = ClusterType.SORTES)
@ApiModel("Inlong cluster info for Elasticsearch")
public class ElasticsearchClusterInfo extends ClusterInfo {
public class SortEsClusterInfo extends ClusterInfo {

public ElasticsearchClusterInfo() {
this.setType(ClusterType.ELASTICSEARCH);
public SortEsClusterInfo() {
this.setType(ClusterType.SORTES);
}

@Override
public ClusterRequest genRequest() {
return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new);
return CommonBeanUtils.copyProperties(this, SortEsClusterRequest::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.es;
package org.apache.inlong.manager.pojo.cluster.sort.es;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
Expand All @@ -32,11 +32,11 @@
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
@ApiModel("Inlong cluster request for Elasticsearch")
public class ElasticsearchClusterRequest extends ClusterRequest {
@JsonTypeDefine(value = ClusterType.SORTES)
@ApiModel("Inlong cluster request for SortEs")
public class SortEsClusterRequest extends ClusterRequest {

public ElasticsearchClusterRequest() {
this.setType(ClusterType.ELASTICSEARCH);
public SortEsClusterRequest() {
this.setType(ClusterType.SORTES);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.pulsar;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORTPULSAR)
@ApiModel("Inlong cluster info for SortPulsar")
public class SortPulsarClusterInfo extends ClusterInfo {

public SortPulsarClusterInfo() {
this.setType(ClusterType.SORTPULSAR);
}

@Override
public ClusterRequest genRequest() {
return CommonBeanUtils.copyProperties(this, SortPulsarClusterRequest::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.pulsar;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORTPULSAR)
@ApiModel("Inlong cluster request for SortPulsar")
public class SortPulsarClusterRequest extends ClusterRequest {

public SortPulsarClusterRequest() {
this.setType(ClusterType.SORTPULSAR);
}
}
Loading

0 comments on commit bcc9a77

Please sign in to comment.