Skip to content

Commit

Permalink
[INLONG-10526][Sort] ClsSink support switch metadata acquire mode
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jun 30, 2024
1 parent 147c49a commit d8fb950
Show file tree
Hide file tree
Showing 20 changed files with 304 additions and 398 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SortClusterConfig implements Serializable {
public class ClusterTagConfig implements Serializable {

private String clusterTag;
private List<MqClusterConfig> mqClusterConfigs;
private List<DataFlowConfig> dataFlowConfigs;

public static SortClusterConfig checkDelete(SortClusterConfig last, SortClusterConfig current) {
public static ClusterTagConfig checkDelete(ClusterTagConfig last, ClusterTagConfig current) {
if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
return last;
}

return check(last, current, MqClusterConfig::batchCheckLast, DataFlowConfig::batchCheckDelete);
}

public static SortClusterConfig checkNew(SortClusterConfig last, SortClusterConfig current) {
public static ClusterTagConfig checkNew(ClusterTagConfig last, ClusterTagConfig current) {
return check(last, current, MqClusterConfig::batchCheckLatest, DataFlowConfig::batchCheckNew);
}

public static SortClusterConfig checkUpdate(SortClusterConfig last, SortClusterConfig current) {
public static ClusterTagConfig checkUpdate(ClusterTagConfig last, ClusterTagConfig current) {
List<MqClusterConfig> updateCluster =
MqClusterConfig.batchCheckUpdate(last.getMqClusterConfigs(), current.getMqClusterConfigs());

Expand All @@ -77,7 +77,7 @@ public static SortClusterConfig checkUpdate(SortClusterConfig last, SortClusterC
DataFlowConfig.batchCheckNoUpdate(last.getDataFlowConfigs(), current.getDataFlowConfigs());
noUpdateDataflows.addAll(updateDataflows);

return SortClusterConfig.builder()
return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
.mqClusterConfigs(latestCluster)
.dataFlowConfigs(noUpdateDataflows)
Expand All @@ -90,55 +90,55 @@ public static SortClusterConfig checkUpdate(SortClusterConfig last, SortClusterC
}

// if only dataflow update, use latest mq and update dataflow
return SortClusterConfig.builder()
return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
.mqClusterConfigs(latestCluster)
.dataFlowConfigs(updateDataflows)
.build();
}

public static SortClusterConfig checkLatest(SortClusterConfig last, SortClusterConfig current) {
public static ClusterTagConfig checkLatest(ClusterTagConfig last, ClusterTagConfig current) {
if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
return null;
}

return check(last, current, MqClusterConfig::batchCheckLatest, DataFlowConfig::batchCheckLatest);
}

public static List<SortClusterConfig> batchCheckDelete(
List<SortClusterConfig> last,
List<SortClusterConfig> current) {
public static List<ClusterTagConfig> batchCheckDelete(
List<ClusterTagConfig> last,
List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckDeleteRecursive(last, current,
SortClusterConfig::getClusterTag, SortClusterConfig::checkDelete);
ClusterTagConfig::getClusterTag, ClusterTagConfig::checkDelete);
}

public static List<SortClusterConfig> batchCheckNew(
List<SortClusterConfig> last,
List<SortClusterConfig> current) {
public static List<ClusterTagConfig> batchCheckNew(
List<ClusterTagConfig> last,
List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckNewRecursive(last, current,
SortClusterConfig::getClusterTag, SortClusterConfig::checkNew);
ClusterTagConfig::getClusterTag, ClusterTagConfig::checkNew);
}

public static List<SortClusterConfig> batchCheckUpdate(
List<SortClusterConfig> last,
List<SortClusterConfig> current) {
public static List<ClusterTagConfig> batchCheckUpdate(
List<ClusterTagConfig> last,
List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckUpdateRecursive(last, current,
SortClusterConfig::getClusterTag, SortClusterConfig::checkUpdate);
ClusterTagConfig::getClusterTag, ClusterTagConfig::checkUpdate);
}

public static List<SortClusterConfig> batchCheckLatest(
List<SortClusterConfig> last,
List<SortClusterConfig> current) {
public static List<ClusterTagConfig> batchCheckLatest(
List<ClusterTagConfig> last,
List<ClusterTagConfig> current) {
return SortConfigUtil.batchCheckLatestRecursive(last, current,
SortClusterConfig::getClusterTag, SortClusterConfig::checkLatest);
ClusterTagConfig::getClusterTag, ClusterTagConfig::checkLatest);
}

public static SortClusterConfig check(
SortClusterConfig last, SortClusterConfig current,
public static ClusterTagConfig check(
ClusterTagConfig last, ClusterTagConfig current,
BiFunction<List<MqClusterConfig>, List<MqClusterConfig>, List<MqClusterConfig>> mqCheckFunction,
BiFunction<List<DataFlowConfig>, List<DataFlowConfig>, List<DataFlowConfig>> flowCheckFunction) {

List<MqClusterConfig> checkCluster = mqCheckFunction
List<MqClusterConfig> checkMqCluster = mqCheckFunction
.apply(last.getMqClusterConfigs(), current.getMqClusterConfigs());
List<DataFlowConfig> checkDataflows = flowCheckFunction
.apply(last.getDataFlowConfigs(), current.getDataFlowConfigs());
Expand All @@ -147,9 +147,9 @@ public static SortClusterConfig check(
return null;
}

return SortClusterConfig.builder()
return ClusterTagConfig.builder()
.clusterTag(last.getClusterTag())
.mqClusterConfigs(checkCluster)
.mqClusterConfigs(checkMqCluster)
.dataFlowConfigs(checkDataflows)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class SortConfig implements Serializable {

private String sortClusterName;
private List<SortTaskConfig> tasks;
private List<TaskConfig> tasks;

public static SortConfig checkLatest(SortConfig last, SortConfig current) {
if (last == null) {
Expand All @@ -45,7 +45,7 @@ public static SortConfig checkLatest(SortConfig last, SortConfig current) {
}
return SortConfig.builder()
.sortClusterName(current.getSortClusterName())
.tasks(SortTaskConfig.batchCheckLatest(last.getTasks(), current.getTasks()))
.tasks(TaskConfig.batchCheckLatest(last.getTasks(), current.getTasks()))
.build();
}

Expand All @@ -56,14 +56,14 @@ public static SortConfig checkDelete(SortConfig last, SortConfig current) {
if (current == null) {
return last;
}
return check(last, current, SortTaskConfig::batchCheckDelete);
return check(last, current, TaskConfig::batchCheckDelete);
}

public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
if (last == null || current == null) {
return null;
}
return check(last, current, SortTaskConfig::batchCheckUpdate);
return check(last, current, TaskConfig::batchCheckUpdate);
}

public static SortConfig checkNew(SortConfig last, SortConfig current) {
Expand All @@ -73,17 +73,17 @@ public static SortConfig checkNew(SortConfig last, SortConfig current) {
if (current == null) {
return null;
}
return check(last, current, SortTaskConfig::batchCheckNew);
return check(last, current, TaskConfig::batchCheckNew);
}

public static SortConfig check(
SortConfig last, SortConfig current,
BiFunction<List<SortTaskConfig>, List<SortTaskConfig>, List<SortTaskConfig>> taskCheckFunction) {
BiFunction<List<TaskConfig>, List<TaskConfig>, List<TaskConfig>> taskCheckFunction) {
if (!last.getSortClusterName().equals(current.getSortClusterName())) {
return null;
}

List<SortTaskConfig> checkTasks = taskCheckFunction.apply(last.getTasks(), current.getTasks());
List<TaskConfig> checkTasks = taskCheckFunction.apply(last.getTasks(), current.getTasks());
if (CollectionUtils.isEmpty(checkTasks)) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,39 @@
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SortTaskConfig implements Serializable {
public class TaskConfig implements Serializable {

private String sortTaskName;
private List<SortClusterConfig> clusters;
private List<ClusterTagConfig> clusterTagConfigs;
private NodeConfig nodeConfig;

public static List<SortTaskConfig> batchCheckDelete(List<SortTaskConfig> last, List<SortTaskConfig> current) {
public static List<TaskConfig> batchCheckDelete(List<TaskConfig> last, List<TaskConfig> current) {
return SortConfigUtil.batchCheckDeleteRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkDelete);
TaskConfig::getSortTaskName, TaskConfig::checkDelete);
}

public static List<SortTaskConfig> batchCheckUpdate(List<SortTaskConfig> last, List<SortTaskConfig> current) {
public static List<TaskConfig> batchCheckUpdate(List<TaskConfig> last, List<TaskConfig> current) {
return SortConfigUtil.batchCheckUpdateRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkUpdate);
TaskConfig::getSortTaskName, TaskConfig::checkUpdate);
}

public static List<SortTaskConfig> batchCheckNew(List<SortTaskConfig> last, List<SortTaskConfig> current) {
public static List<TaskConfig> batchCheckNew(List<TaskConfig> last, List<TaskConfig> current) {
return SortConfigUtil.batchCheckNewRecursive(last, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkNew);
TaskConfig::getSortTaskName, TaskConfig::checkNew);
}

public static List<SortTaskConfig> batchCheckLatest(List<SortTaskConfig> latest, List<SortTaskConfig> current) {
public static List<TaskConfig> batchCheckLatest(List<TaskConfig> latest, List<TaskConfig> current) {
return SortConfigUtil.batchCheckLatestRecursive(latest, current,
SortTaskConfig::getSortTaskName, SortTaskConfig::checkLatest);
TaskConfig::getSortTaskName, TaskConfig::checkLatest);
}

public static SortTaskConfig checkDelete(SortTaskConfig last, SortTaskConfig current) {
return check(last, current, SortClusterConfig::batchCheckDelete,
public static TaskConfig checkDelete(TaskConfig last, TaskConfig current) {
return check(last, current, ClusterTagConfig::batchCheckDelete,
(lastNode, currentNode) -> lastNode);
}

public static SortTaskConfig checkUpdate(SortTaskConfig last, SortTaskConfig current) {
return check(last, current, SortClusterConfig::batchCheckUpdate,
public static TaskConfig checkUpdate(TaskConfig last, TaskConfig current) {
return check(last, current, ClusterTagConfig::batchCheckUpdate,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
Expand All @@ -75,8 +75,8 @@ public static SortTaskConfig checkUpdate(SortTaskConfig last, SortTaskConfig cur
});
}

public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig current) {
return check(last, current, SortClusterConfig::batchCheckNew,
public static TaskConfig checkNew(TaskConfig last, TaskConfig current) {
return check(last, current, ClusterTagConfig::batchCheckNew,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
Expand All @@ -85,8 +85,8 @@ public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig curren
});
}

public static SortTaskConfig checkLatest(SortTaskConfig last, SortTaskConfig current) {
return check(last, current, SortClusterConfig::batchCheckLatest,
public static TaskConfig checkLatest(TaskConfig last, TaskConfig current) {
return check(last, current, ClusterTagConfig::batchCheckLatest,
(lastNode, currentNode) -> {
if (lastNode == null || currentNode == null) {
return null;
Expand All @@ -95,23 +95,24 @@ public static SortTaskConfig checkLatest(SortTaskConfig last, SortTaskConfig cur
});
}

public static SortTaskConfig check(
SortTaskConfig last, SortTaskConfig current,
BiFunction<List<SortClusterConfig>, List<SortClusterConfig>, List<SortClusterConfig>> clusterCheckFunction,
public static TaskConfig check(
TaskConfig last, TaskConfig current,
BiFunction<List<ClusterTagConfig>, List<ClusterTagConfig>, List<ClusterTagConfig>> clusterCheckFunction,
BiFunction<NodeConfig, NodeConfig, NodeConfig> nodeCheckFunction) {

List<SortClusterConfig> checkCluster = clusterCheckFunction.apply(last.getClusters(), current.getClusters());
List<ClusterTagConfig> checkClusterTags =
clusterCheckFunction.apply(last.getClusterTagConfigs(), current.getClusterTagConfigs());

NodeConfig checkNode = nodeCheckFunction.apply(last.getNodeConfig(), current.getNodeConfig());

if (CollectionUtils.isEmpty(checkCluster) && checkNode == null) {
if (CollectionUtils.isEmpty(checkClusterTags) && checkNode == null) {
return null;
}

return SortTaskConfig
return TaskConfig
.builder()
.sortTaskName(last.getSortTaskName())
.clusters(checkCluster)
.clusterTagConfigs(checkClusterTags)
.nodeConfig(checkNode)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.inlong.manager.service.core.impl;

import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sort.SortClusterConfig;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
Expand Down Expand Up @@ -272,7 +272,7 @@ private void reloadDataFlowConfig() {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, byte[]> sortConfigs = new HashMap<>();
Map<String, String> sortConfigMd5s = new HashMap<>();
Map<String, List<SortTaskConfig>> temp = new HashMap<>();
Map<String, List<TaskConfig>> temp = new HashMap<>();
List<SortConfigEntity> sinkConfigEntityList = configLoader.loadAllSortConfigEntity();
for (SortConfigEntity sortConfigEntity : sinkConfigEntityList) {
if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
Expand All @@ -284,16 +284,16 @@ private void reloadDataFlowConfig() {
Collectors.groupingBy(SortConfigEntity::getSortTaskName,
Collectors.groupingBy(SortConfigEntity::getInlongClusterTag))));
for (String sortClusterName : cluster2SinkMap.keySet()) {
List<SortTaskConfig> map = temp.computeIfAbsent(sortClusterName,
List<TaskConfig> map = temp.computeIfAbsent(sortClusterName,
v -> new ArrayList<>());
SortConfig sortConfig = new SortConfig();
sortConfig.setSortClusterName(sortClusterName);
Map<String, Map<String, List<SortConfigEntity>>> sortTaskNameMap = cluster2SinkMap.get(sortClusterName);
for (String sortTaskName : sortTaskNameMap.keySet()) {
Map<String, List<SortConfigEntity>> clusterTagMap = sortTaskNameMap.get(sortTaskName);
SortTaskConfig sortTaskConfig = SortTaskConfig.builder()
TaskConfig sortTaskConfig = TaskConfig.builder()
.sortTaskName(sortTaskName)
.clusters(new ArrayList<>())
.clusterTagConfigs(new ArrayList<>())
.nodeConfig(nodeInfoMap.get(sortTaskName))
.build();
for (String clusterTag : clusterTagMap.keySet()) {
Expand All @@ -308,12 +308,12 @@ private void reloadDataFlowConfig() {
}).filter(Objects::nonNull)
.sorted(Comparator.comparingInt(x -> Integer.parseInt(x.getDataflowId())))
.collect(Collectors.toList());
SortClusterConfig sortClusterConfig = SortClusterConfig.builder()
ClusterTagConfig sortClusterConfig = ClusterTagConfig.builder()
.mqClusterConfigs(mqClusterConfigMap.getOrDefault(clusterTag, new ArrayList<>()))
.clusterTag(clusterTag)
.dataFlowConfigs(dataFlowConfigs)
.build();
sortTaskConfig.getClusters().add(sortClusterConfig);
sortTaskConfig.getClusterTagConfigs().add(sortClusterConfig);
}
map.add(sortTaskConfig);
}
Expand Down
Loading

0 comments on commit d8fb950

Please sign in to comment.