Skip to content

Commit

Permalink
[Bugfix]修复Balance功能,ES密码未生效的问题(#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZQKC committed Apr 2, 2023
1 parent 77b87f1 commit d3cc0cb
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ public void run(OptionSet options) {
Properties kafkaConfig = new Properties();
kafkaConfig.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("bootstrap-servers").toString());
balanceParameter.setKafkaConfig(kafkaConfig);
balanceParameter.setEsRestURL(options.valueOf("es-rest-url").toString());
balanceParameter.setEsIndexPrefix(options.valueOf("es-index-prefix").toString());
if (options.has("es-password")) {
balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), options.valueOf("es-password").toString(), options.valueOf("es-index-prefix").toString());
} else {
balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), "", options.valueOf("es-index-prefix").toString());
}
balanceParameter.setBeforeSeconds((Integer) options.valueOf("before-seconds"));
String envFile = options.valueOf("hardware-env-file").toString();
String envJson = FileUtils.readFileToString(new File(envFile), "UTF-8");
Expand All @@ -89,6 +92,7 @@ public static void main(String[] args) {
OptionParser parser = new OptionParser();
parser.accepts("bootstrap-servers", "Kafka cluster boot server").withRequiredArg().ofType(String.class);
parser.accepts("es-rest-url", "The url of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("es-password", "The password of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("es-index-prefix", "The Index Prefix of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("goals", "Balanced goals include TopicLeadersDistributionGoal,TopicReplicaDistributionGoal,DiskDistributionGoal,NetworkInboundDistributionGoal,NetworkOutboundDistributionGoal").withRequiredArg().ofType(String.class);
parser.accepts("cluster", "Balanced cluster name").withRequiredArg().ofType(String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public class BalanceParameter {
private Properties kafkaConfig;
//ES访问地址
private String esRestURL;

//ES访问密码
private String esPassword;

//ES存储索引前缀
private String esIndexPrefix;
//均衡目标
Expand Down Expand Up @@ -51,8 +55,14 @@ public String getEsRestURL() {
return esRestURL;
}

public void setEsRestURL(String esRestURL) {
public void setEsInfo(String esRestURL, String esPassword, String esIndexPrefix) {
this.esRestURL = esRestURL;
this.esPassword = esPassword;
this.esIndexPrefix = esIndexPrefix;
}

public String getEsPassword() {
return esPassword;
}

public List<String> getGoals() {
Expand Down Expand Up @@ -147,10 +157,6 @@ public String getEsIndexPrefix() {
return esIndexPrefix;
}

public void setEsIndexPrefix(String esIndexPrefix) {
this.esIndexPrefix = esIndexPrefix;
}

public String getOfflineBrokers() {
return offlineBrokers;
}
Expand Down Expand Up @@ -181,9 +187,11 @@ public String toString() {
"cluster='" + cluster + '\'' +
", kafkaConfig=" + kafkaConfig +
", esRestURL='" + esRestURL + '\'' +
", esPassword='" + esPassword + '\'' +
", esIndexPrefix='" + esIndexPrefix + '\'' +
", goals=" + goals +
", excludedTopics='" + excludedTopics + '\'' +
", ignoredTopics='" + ignoredTopics + '\'' +
", offlineBrokers='" + offlineBrokers + '\'' +
", balanceBrokers='" + balanceBrokers + '\'' +
", topicReplicaThreshold=" + topicReplicaThreshold +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.MetricStore;
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metrics;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand All @@ -17,9 +20,7 @@
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;

/**
* @author leewei
Expand All @@ -30,15 +31,19 @@ public class ElasticsearchMetricStore implements MetricStore {
private final ObjectMapper objectMapper = new ObjectMapper();

private final String hosts;

private final String password;

private final String indexPrefix;
private final String format;

public ElasticsearchMetricStore(String hosts, String indexPrefix) {
this(hosts, indexPrefix, "yyyy-MM-dd");
public ElasticsearchMetricStore(String hosts, String password, String indexPrefix) {
this(hosts, password, indexPrefix, "yyyy-MM-dd");
}

public ElasticsearchMetricStore(String hosts, String indexPrefix, String format) {
public ElasticsearchMetricStore(String hosts, String password, String indexPrefix, String format) {
this.hosts = hosts;
this.password = password;
this.indexPrefix = indexPrefix;
this.format = format;
}
Expand All @@ -50,7 +55,17 @@ public Metrics getMetrics(String clusterName, int beforeSeconds) {
String metricsQueryJson = IOUtils.resourceToString("/MetricsQuery.json", StandardCharsets.UTF_8);
metricsQueryJson = metricsQueryJson.replaceAll("<var_before_time>", Integer.toString(beforeSeconds))
.replaceAll("<var_cluster_name>", clusterName);
try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).build()) {

List<Header> defaultHeaders = new ArrayList<>();
if (StringUtils.isNotBlank(password)) {
String encode = Base64.getEncoder().encodeToString(String.format("%s", this.password).getBytes(StandardCharsets.UTF_8));
Header header = new BasicHeader("Authorization", "Basic " + encode);
defaultHeaders.add(header);
}

Header[] headers = new Header[defaultHeaders.size()];
defaultHeaders.toArray(headers);
try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).setDefaultHeaders(headers).build()) {
Request request = new Request(
"GET",
"/" + indices(beforeSeconds) + "/_search");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public static Map<String, String> subConfig(Map<String, String> config, String p
Map.Entry::getValue));
}

public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esPassword, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esIndexPrefix, capacitiesById, ignoredTopics);
return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esPassword, esIndexPrefix, capacitiesById, ignoredTopics);
}

public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
MetricStore store = new ElasticsearchMetricStore(esUrls, esIndexPrefix);
public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esPassword, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
MetricStore store = new ElasticsearchMetricStore(esUrls, esPassword, esIndexPrefix);
Metrics metrics = store.getMetrics(clusterName, beforeSeconds);
return load(kafkaProperties, capacitiesById, metrics, ignoredTopics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,16 @@ public static ClusterModel getInitClusterModel(BalanceParameter parameter) {
capacity.setCapacity(Resource.NW_OUT, env.getNetwork());
capacities.put(env.getId(), capacity);
}
return Supplier.load(parameter.getCluster(), parameter.getBeforeSeconds(), parameter.getKafkaConfig(),
parameter.getEsRestURL(), parameter.getEsIndexPrefix(), capacities, AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics()));
return Supplier.load(
parameter.getCluster(),
parameter.getBeforeSeconds(),
parameter.getKafkaConfig(),
parameter.getEsRestURL(),
parameter.getEsPassword(),
parameter.getEsIndexPrefix(),
capacities,
AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics())
);
}

public static Map<String, BalanceThreshold> getBalanceThreshold(BalanceParameter parameter, double[] clusterAvgResource) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.rebalance.common.converter;

import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceIntervalDTO;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalancePreviewDTO;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceStrategyDTO;
Expand Down Expand Up @@ -34,13 +35,16 @@

@EnterpriseLoadReBalance
public class ClusterBalanceConverter {

public final static String PARTITION_INDEX = "ks_kafka_partition_metric";

private ClusterBalanceConverter() {
}

public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(configPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class);

Expand All @@ -63,8 +67,7 @@ public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigP
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(configPO.getTopicBlackList());
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(configPO.getMetricCalculationPeriod());
Expand All @@ -78,7 +81,13 @@ public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigP

}

public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class);

Expand All @@ -101,8 +110,7 @@ public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clus
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(clusterBalanceJobPO.getTopicBlackList());
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(clusterBalanceJobPO.getBrokers());
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(clusterBalanceJobPO.getMetricCalculationPeriod());
Expand All @@ -116,7 +124,13 @@ public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clus

}

public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto, List<Broker> brokers, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto,
List<Broker> brokers,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream()
.sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList());
Expand All @@ -141,8 +155,7 @@ public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList()));
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod());
Expand All @@ -156,7 +169,13 @@ public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent

}

public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream()
.sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList());
Expand All @@ -179,8 +198,7 @@ public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList()));
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intList2String(dto.getBrokers()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class ClusterBalanceJobHandler implements JobHandler {
@Value("${es.client.address:}")
private String esAddress;

@Value("${es.client.pass:}")
private String esPassword;

@Autowired
private ClusterBalanceJobService clusterBalanceJobService;

Expand Down Expand Up @@ -116,7 +119,7 @@ public Result<Void> submit(Job job, String operator) {

//获取任务计划
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(dto.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames);
try {
ExecutionRebalance executionRebalance = new ExecutionRebalance();
OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
Expand Down Expand Up @@ -202,7 +205,7 @@ public Result<Void> modify(Job job, String operator) {

List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(job.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames);
ExecutionRebalance executionRebalance = new ExecutionRebalance();
try {
OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
Expand Down
Loading

0 comments on commit d3cc0cb

Please sign in to comment.