Skip to content

Commit

Permalink
[INLONG-9192][Manager] Flat Sort Cluster types (#9193)
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Nov 6, 2023
1 parent ad34b4a commit 632a477
Show file tree
Hide file tree
Showing 24 changed files with 330 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class SinkType extends StreamType {
public static final String HIVE = "HIVE";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String HBASE = "HBASE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String ELASTICSEARCH = "ES";
public static final String HDFS = "HDFS";
public static final String GREENPLUM = "GREENPLUM";
public static final String MYSQL = "MYSQL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public class ClusterType {
public static final String PULSAR = "PULSAR";
public static final String DATAPROXY = "DATAPROXY";
public static final String KAFKA = "KAFKA";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String SORTSTANDALONE = "SORTSTANDALONE";

public static final String SORT_ES = "SORT_ES";
public static final String SORT_CLS = "SORT_CLS";
public static final String SORT_PULSAR = "SORT_PULSAR";

private static final Set<String> TYPE_SET = new HashSet<String>() {

Expand All @@ -43,8 +45,9 @@ public class ClusterType {
add(ClusterType.PULSAR);
add(ClusterType.DATAPROXY);
add(ClusterType.KAFKA);
add(ClusterType.ELASTICSEARCH);
add(ClusterType.SORTSTANDALONE);
add(ClusterType.SORT_ES);
add(ClusterType.SORT_CLS);
add(ClusterType.SORT_PULSAR);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,4 @@ List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @P

int deleteByPrimaryKey(Integer id);

List<InlongClusterEntity> selectStandaloneClusterByType(@Param("sinkType") String sinkType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,6 @@
from inlong_cluster
where is_deleted = 0
</select>
<select id="selectStandaloneClusterByType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
select
<include refid="Base_Column_List"/>
from inlong_cluster
<where>
type = 'SORTSTANDALONE'
and find_in_set(#{sinkType, jdbcType=VARCHAR}, ext_tag)
and is_deleted = 0
</where>
</select>

<update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
update inlong_cluster
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.cls;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORT_CLS)
@ApiModel("Inlong cluster info for SortCls")
public class SortClsClusterInfo extends BaseSortClusterInfo {

public SortClsClusterInfo() {
this.setType(ClusterType.SORT_CLS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,25 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sortstandalone;
package org.apache.inlong.manager.pojo.cluster.sort.cls;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Set;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
@ApiModel("Inlong cluster request for SortStandalone")
public class SortStandaloneClusterRequest extends ClusterRequest {

@ApiModelProperty(value = "Supported sink types")
private Set<String> supportedSinkTypes;
@JsonTypeDefine(value = ClusterType.SORT_CLS)
@ApiModel("Inlong cluster request for SortCls")
public class SortClsClusterRequest extends BaseSortClusterRequest {

public SortStandaloneClusterRequest() {
this.setType(ClusterType.SORTSTANDALONE);
public SortClsClusterRequest() {
this.setType(ClusterType.SORT_CLS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,28 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.es;
package org.apache.inlong.manager.pojo.cluster.sort.es;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Elasticsearch cluster info
*/
@Data
@SuperBuilder
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
@JsonTypeDefine(value = ClusterType.SORT_ES)
@ApiModel("Inlong cluster info for Elasticsearch")
public class ElasticsearchClusterInfo extends ClusterInfo {
public class SortEsClusterInfo extends BaseSortClusterInfo {

public ElasticsearchClusterInfo() {
this.setType(ClusterType.ELASTICSEARCH);
}

@Override
public ClusterRequest genRequest() {
return CommonBeanUtils.copyProperties(this, ElasticsearchClusterRequest::new);
public SortEsClusterInfo() {
this.setType(ClusterType.SORT_ES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.es;
package org.apache.inlong.manager.pojo.cluster.sort.es;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
Expand All @@ -32,11 +32,11 @@
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.ELASTICSEARCH)
@ApiModel("Inlong cluster request for Elasticsearch")
public class ElasticsearchClusterRequest extends ClusterRequest {
@JsonTypeDefine(value = ClusterType.SORT_ES)
@ApiModel("Inlong cluster request for SortEs")
public class SortEsClusterRequest extends BaseSortClusterRequest {

public ElasticsearchClusterRequest() {
this.setType(ClusterType.ELASTICSEARCH);
public SortEsClusterRequest() {
this.setType(ClusterType.SORT_ES);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.pulsar;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORT_PULSAR)
@ApiModel("Inlong cluster info for SortPulsar")
public class SortPulsarClusterInfo extends BaseSortClusterInfo {

public SortPulsarClusterInfo() {
this.setType(ClusterType.SORT_PULSAR);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.sort.pulsar;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.SORT_PULSAR)
@ApiModel("Inlong cluster request for SortPulsar")
public class SortPulsarClusterRequest extends BaseSortClusterRequest {

public SortPulsarClusterRequest() {
this.setType(ClusterType.SORT_PULSAR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Cloud log service sink info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Cloud log service sink info")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -35,11 +34,10 @@
* Elasticsearch sink info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Elasticsearch sink info")
@SuperBuilder
@JsonTypeDefine(value = SinkType.ELASTICSEARCH)
public class ElasticsearchSink extends StreamSink {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* Pulsar sink info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Pulsar sink info")
Expand Down
Loading

0 comments on commit 632a477

Please sign in to comment.