From 850d43df6361375fcf7b72e54f1c89b575fcf97f Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 23 Jan 2021 13:19:29 +0800 Subject: [PATCH] add v2.2.0 feature & fix --- .../upgrade_manual/logi-km-v2.2.0.md | 27 ++++++++ docs/install_guide/create_mysql_table.sql | 36 ++++++---- docs/install_guide/install_guide_cn.md | 48 ++++++++------ .../entity/ao/cluster/LogicalCluster.java | 27 +++++--- .../common/entity/dto/rd/ClusterDTO.java | 14 +++- .../entity/dto/rd/LogicalClusterDTO.java | 13 ++++ .../manager/common/entity/pojo/ClusterDO.java | 59 ++++++++++------- .../common/entity/pojo/LogicalClusterDO.java | 11 ++++ .../vo/normal/cluster/LogicClusterVO.java | 57 ++++++++-------- .../entity/vo/rd/cluster/ClusterBaseVO.java | 16 ++++- .../vo/rd/cluster/LogicalClusterVO.java | 12 ++++ .../kafka/manager/common/utils/JsonUtils.java | 7 ++ .../manager/common/utils/jmx/JmxConfig.java | 65 +++++++++++++++++++ .../common/utils/jmx/JmxConnectorWrap.java | 35 ++++++++-- .../cache/LogicalClusterMetadataManager.java | 13 ++++ .../cache/PhysicalClusterMetadataManager.java | 15 ++++- .../service/impl/ClusterServiceImpl.java | 1 + .../impl/LogicalClusterServiceImpl.java | 1 + .../manager/service/utils/ConfigUtils.java | 11 ---- .../zookeeper/BrokerStateListener.java | 9 +-- .../src/main/resources/mapper/ClusterDao.xml | 6 +- .../resources/mapper/LogicalClusterDao.xml | 28 ++++---- .../monitor/component/n9e/N9eConverter.java | 19 +++++- .../n9e/entry/bizenum/CategoryEnum.java | 23 +++++++ .../SinkCommunityTopicMetrics2Monitor.java | 2 +- .../listener/SinkConsumerMetrics2Monitor.java | 2 +- .../SinkTopicThrottledMetrics2Monitor.java | 2 +- .../normal/NormalAccountController.java | 3 +- .../normal/NormalTopicController.java | 3 +- .../api/versionone/op/OpUtilsController.java | 2 +- .../web/converters/ClusterModelConverter.java | 6 +- .../LogicalClusterModelConverter.java | 2 + .../web/converters/TopicModelConverter.java | 4 +- .../src/main/resources/application.yml | 2 +- 34 files changed, 427 insertions(+), 154 deletions(-) create mode 100644 docs/dev_guide/upgrade_manual/logi-km-v2.2.0.md create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java create mode 100644 kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/bizenum/CategoryEnum.java diff --git a/docs/dev_guide/upgrade_manual/logi-km-v2.2.0.md b/docs/dev_guide/upgrade_manual/logi-km-v2.2.0.md new file mode 100644 index 000000000..96622080b --- /dev/null +++ b/docs/dev_guide/upgrade_manual/logi-km-v2.2.0.md @@ -0,0 +1,27 @@ + +--- + +![kafka-manager-logo](../../assets/images/common/logo_name.png) + +**一站式`Apache Kafka`集群指标监控与运维管控平台** + +--- + +# 升级至`2.2.0`版本 + +`2.2.0`版本在`cluster`表及`logical_cluster`各增加了一个字段,因此需要执行下面的sql进行字段的增加。 + +```sql +# 往cluster表中增加jmx_properties字段, 这个字段会用于存储jmx相关的认证以及配置信息 +ALTER TABLE `cluster` ADD COLUMN `jmx_properties` TEXT NULL COMMENT 'JMX配置' AFTER `security_properties`; + +# 往logical_cluster中增加identification字段, 同时数据和原先name数据相同, 最后增加一个唯一键. +# 此后, name字段还是表示集群名称, 而identification字段表示的是集群标识, 只能是字母数字及下划线组成, +# 数据上报到监控系统时, 集群这个标识采用的字段就是identification字段, 之前使用的是name字段. +ALTER TABLE `logical_cluster` ADD COLUMN `identification` VARCHAR(192) NOT NULL DEFAULT '' COMMENT '逻辑集群标识' AFTER `name`; + +UPDATE `logical_cluster` SET `identification`=`name` WHERE id>=0; + +ALTER TABLE `logical_cluster` ADD INDEX `uniq_identification` (`identification` ASC); +``` + diff --git a/docs/install_guide/create_mysql_table.sql b/docs/install_guide/create_mysql_table.sql index 528838ee8..2a015de1e 100644 --- a/docs/install_guide/create_mysql_table.sql +++ b/docs/install_guide/create_mysql_table.sql @@ -1,3 +1,8 @@ +-- create database +CREATE DATABASE logi_kafka_manager; + +USE logi_kafka_manager; + -- -- Table structure for table `account` -- @@ -104,7 +109,8 @@ CREATE TABLE `cluster` ( `zookeeper` varchar(512) NOT NULL DEFAULT '' COMMENT 'zk地址', `bootstrap_servers` varchar(512) NOT NULL DEFAULT '' COMMENT 'server地址', `kafka_version` varchar(32) NOT NULL DEFAULT '' COMMENT 'kafka版本', - `security_properties` text COMMENT '安全认证参数', + `security_properties` text COMMENT 'Kafka安全认证参数', + `jmx_properties` text COMMENT 'JMX配置', `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT ' 监控标记, 0表示未监控, 1表示监控中', `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', @@ -302,20 +308,22 @@ INSERT INTO kafka_user(app_id, password, user_type, operation) VALUES ('dkm_admi -- Table structure for table `logical_cluster` -- --- DROP TABLE IF EXISTS `logical_cluster`; CREATE TABLE `logical_cluster` ( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', - `name` varchar(192) NOT NULL DEFAULT '' COMMENT '逻辑集群名称', - `mode` int(16) NOT NULL DEFAULT '0' COMMENT '逻辑集群类型, 0:共享集群, 1:独享集群, 2:独立集群', - `app_id` varchar(64) NOT NULL DEFAULT '' COMMENT '所属应用', - `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群id', - `region_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'regionid列表', - `description` text COMMENT '备注说明', - `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - PRIMARY KEY (`id`), - UNIQUE KEY `uniq_name` (`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='逻辑集群信息表'; + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `name` varchar(192) NOT NULL DEFAULT '' COMMENT '逻辑集群名称', + `identification` varchar(192) NOT NULL DEFAULT '' COMMENT '逻辑集群标识', + `mode` int(16) NOT NULL DEFAULT '0' COMMENT '逻辑集群类型, 0:共享集群, 1:独享集群, 2:独立集群', + `app_id` varchar(64) NOT NULL DEFAULT '' COMMENT '所属应用', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群id', + `region_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'regionid列表', + `description` text COMMENT '备注说明', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_name` (`name`), + UNIQUE KEY `uniq_identification` (`identification`) +) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='逻辑集群信息表'; + -- -- Table structure for table `monitor_rule` diff --git a/docs/install_guide/install_guide_cn.md b/docs/install_guide/install_guide_cn.md index 9c700587a..9a4a415bb 100644 --- a/docs/install_guide/install_guide_cn.md +++ b/docs/install_guide/install_guide_cn.md @@ -9,50 +9,56 @@ # 安装手册 +## 1、环境依赖 -## 环境依赖 +如果是以Release包进行安装的,则仅安装`Java`及`MySQL`即可。如果是要先进行源码包进行打包,然后再使用,则需要安装`Maven`及`Node`环境。 -- `Maven 3.5+`(后端打包依赖) -- `node v12+`(前端打包依赖) - `Java 8+`(运行环境需要) - `MySQL 5.7`(数据存储) +- `Maven 3.5+`(后端打包依赖) +- `Node 10+`(前端打包依赖) --- -## 环境初始化 +## 2、获取安装包 -执行[create_mysql_table.sql](create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`kafka_manager`。 +**1、Release直接下载** -``` -# 示例: -mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql -``` +这里如果觉得麻烦,然后也不想进行二次开发,则可以直接下载Release包,下载地址:[Github Release包下载地址](https://github.com/didi/Logi-KafkaManager/releases) + +如果觉得Github的下载地址太慢了,也可以进入`Logi-KafkaManager`的用户群获取,群地址在README中。 ---- -## 打包 +**2、源代码进行打包** -```bash +下载好代码之后,进入`Logi-KafkaManager`的主目录,执行`sh build.sh`命令即可,执行完成之后会在`output/kafka-manager-xxx`目录下面生成一个jar包。 -# 一次性打包 -cd .. -mvn install +对于`windows`环境的用户,估计执行不了`sh build.sh`命令,因此可以直接执行`mvn install`,然后在`kafka-manager-web/target`目录下生成一个kafka-manager-web-xxx.jar的包。 +获取到jar包之后,我们继续下面的步骤。 + +--- + +## 3、MySQL-DB初始化 + +执行[create_mysql_table.sql](create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`logi_kafka_manager`。 + +``` +# 示例: +mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql ``` --- -## 启动 +## 4、启动 ``` -# application.yml 是配置文件 +# application.yml 是配置文件,最简单的是仅修改MySQL相关的配置即可启动 -cp kafka-manager-web/src/main/resources/application.yml kafka-manager-web/target/ -cd kafka-manager-web/target/ -nohup java -jar kafka-manager-web-2.1.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 & +nohup java -jar kafka-manager.jar --spring.config.location=./application.yml > /dev/null 2>&1 & ``` -## 使用 +### 5、使用 本地启动的话,访问`http://localhost:8080`,输入帐号及密码(默认`admin/admin`)进行登录。更多参考:[kafka-manager 用户使用手册](../user_guide/user_guide_cn.md) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/cluster/LogicalCluster.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/cluster/LogicalCluster.java index 86941d0e9..a7525374e 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/cluster/LogicalCluster.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/cluster/LogicalCluster.java @@ -9,6 +9,8 @@ public class LogicalCluster { private String logicalClusterName; + private String logicalClusterIdentification; + private Integer mode; private Integer topicNum; @@ -41,6 +43,14 @@ public void setLogicalClusterName(String logicalClusterName) { this.logicalClusterName = logicalClusterName; } + public String getLogicalClusterIdentification() { + return logicalClusterIdentification; + } + + public void setLogicalClusterIdentification(String logicalClusterIdentification) { + this.logicalClusterIdentification = logicalClusterIdentification; + } + public Integer getMode() { return mode; } @@ -81,6 +91,14 @@ public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + public Long getGmtCreate() { return gmtCreate; } @@ -97,19 +115,12 @@ public void setGmtModify(Long gmtModify) { this.gmtModify = gmtModify; } - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - @Override public String toString() { return "LogicalCluster{" + "logicalClusterId=" + logicalClusterId + ", logicalClusterName='" + logicalClusterName + '\'' + + ", logicalClusterIdentification='" + logicalClusterIdentification + '\'' + ", mode=" + mode + ", topicNum=" + topicNum + ", clusterVersion='" + clusterVersion + '\'' + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java index c28bc8b61..0b6fcebbf 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java @@ -27,9 +27,12 @@ public class ClusterDTO { @ApiModelProperty(value="数据中心") private String idc; - @ApiModelProperty(value="安全配置参数") + @ApiModelProperty(value="Kafka安全配置") private String securityProperties; + @ApiModelProperty(value="Jmx配置") + private String jmxProperties; + public Long getClusterId() { return clusterId; } @@ -78,6 +81,14 @@ public void setSecurityProperties(String securityProperties) { this.securityProperties = securityProperties; } + public String getJmxProperties() { + return jmxProperties; + } + + public void setJmxProperties(String jmxProperties) { + this.jmxProperties = jmxProperties; + } + @Override public String toString() { return "ClusterDTO{" + @@ -87,6 +98,7 @@ public String toString() { ", bootstrapServers='" + bootstrapServers + '\'' + ", idc='" + idc + '\'' + ", securityProperties='" + securityProperties + '\'' + + ", jmxProperties='" + jmxProperties + '\'' + '}'; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java index 790f9758f..def224798 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/LogicalClusterDTO.java @@ -21,6 +21,9 @@ public class LogicalClusterDTO { @ApiModelProperty(value = "名称") private String name; + @ApiModelProperty(value = "集群标识, 用于告警的上报") + private String identification; + @ApiModelProperty(value = "集群模式") private Integer mode; @@ -52,6 +55,14 @@ public void setName(String name) { this.name = name; } + public String getIdentification() { + return identification; + } + + public void setIdentification(String identification) { + this.identification = identification; + } + public Integer getMode() { return mode; } @@ -97,6 +108,7 @@ public String toString() { return "LogicalClusterDTO{" + "id=" + id + ", name='" + name + '\'' + + ", identification='" + identification + '\'' + ", mode=" + mode + ", clusterId=" + clusterId + ", regionIdList=" + regionIdList + @@ -117,6 +129,7 @@ public boolean legal() { } appId = ValidateUtils.isNull(appId)? "": appId; description = ValidateUtils.isNull(description)? "": description; + identification = ValidateUtils.isNull(identification)? name: identification; return true; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java index cefbc9f2d..04ee265d6 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java @@ -17,6 +17,8 @@ public class ClusterDO implements Comparable { private String securityProperties; + private String jmxProperties; + private Integer status; private Date gmtCreate; @@ -31,30 +33,6 @@ public void setId(Long id) { this.id = id; } - public Integer getStatus() { - return status; - } - - public void setStatus(Integer status) { - this.status = status; - } - - public Date getGmtCreate() { - return gmtCreate; - } - - public void setGmtCreate(Date gmtCreate) { - this.gmtCreate = gmtCreate; - } - - public Date getGmtModify() { - return gmtModify; - } - - public void setGmtModify(Date gmtModify) { - this.gmtModify = gmtModify; - } - public String getClusterName() { return clusterName; } @@ -87,6 +65,38 @@ public void setSecurityProperties(String securityProperties) { this.securityProperties = securityProperties; } + public String getJmxProperties() { + return jmxProperties; + } + + public void setJmxProperties(String jmxProperties) { + this.jmxProperties = jmxProperties; + } + + public Integer getStatus() { + return status; + } + + public void setStatus(Integer status) { + this.status = status; + } + + public Date getGmtCreate() { + return gmtCreate; + } + + public void setGmtCreate(Date gmtCreate) { + this.gmtCreate = gmtCreate; + } + + public Date getGmtModify() { + return gmtModify; + } + + public void setGmtModify(Date gmtModify) { + this.gmtModify = gmtModify; + } + @Override public String toString() { return "ClusterDO{" + @@ -95,6 +105,7 @@ public String toString() { ", zookeeper='" + zookeeper + '\'' + ", bootstrapServers='" + bootstrapServers + '\'' + ", securityProperties='" + securityProperties + '\'' + + ", jmxProperties='" + jmxProperties + '\'' + ", status=" + status + ", gmtCreate=" + gmtCreate + ", gmtModify=" + gmtModify + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/LogicalClusterDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/LogicalClusterDO.java index d5cb34974..db81c1c90 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/LogicalClusterDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/LogicalClusterDO.java @@ -11,6 +11,8 @@ public class LogicalClusterDO { private String name; + private String identification; + private Integer mode; private String appId; @@ -41,6 +43,14 @@ public void setName(String name) { this.name = name; } + public String getIdentification() { + return identification; + } + + public void setIdentification(String identification) { + this.identification = identification; + } + public Integer getMode() { return mode; } @@ -102,6 +112,7 @@ public String toString() { return "LogicalClusterDO{" + "id=" + id + ", name='" + name + '\'' + + ", identification='" + identification + '\'' + ", mode=" + mode + ", appId='" + appId + '\'' + ", clusterId=" + clusterId + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/cluster/LogicClusterVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/cluster/LogicClusterVO.java index c3c5f9c34..8fa5db9d2 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/cluster/LogicClusterVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/cluster/LogicClusterVO.java @@ -15,6 +15,9 @@ public class LogicClusterVO { @ApiModelProperty(value="逻辑集群名称") private String clusterName; + @ApiModelProperty(value="逻辑标识") + private String clusterIdentification; + @ApiModelProperty(value="逻辑集群类型, 0:共享集群, 1:独享集群, 2:独立集群") private Integer mode; @@ -24,9 +27,6 @@ public class LogicClusterVO { @ApiModelProperty(value="集群版本") private String clusterVersion; - @ApiModelProperty(value="物理集群ID") - private Long physicalClusterId; - @ApiModelProperty(value="集群服务地址") private String bootstrapServers; @@ -55,6 +55,22 @@ public void setClusterName(String clusterName) { this.clusterName = clusterName; } + public String getClusterIdentification() { + return clusterIdentification; + } + + public void setClusterIdentification(String clusterIdentification) { + this.clusterIdentification = clusterIdentification; + } + + public Integer getMode() { + return mode; + } + + public void setMode(Integer mode) { + this.mode = mode; + } + public Integer getTopicNum() { return topicNum; } @@ -71,14 +87,6 @@ public void setClusterVersion(String clusterVersion) { this.clusterVersion = clusterVersion; } - public Long getPhysicalClusterId() { - return physicalClusterId; - } - - public void setPhysicalClusterId(Long physicalClusterId) { - this.physicalClusterId = physicalClusterId; - } - public String getBootstrapServers() { return bootstrapServers; } @@ -87,6 +95,14 @@ public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + public Long getGmtCreate() { return gmtCreate; } @@ -103,32 +119,15 @@ public void setGmtModify(Long gmtModify) { this.gmtModify = gmtModify; } - public Integer getMode() { - return mode; - } - - public void setMode(Integer mode) { - this.mode = mode; - } - - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - @Override public String toString() { return "LogicClusterVO{" + "clusterId=" + clusterId + ", clusterName='" + clusterName + '\'' + + ", clusterIdentification='" + clusterIdentification + '\'' + ", mode=" + mode + ", topicNum=" + topicNum + ", clusterVersion='" + clusterVersion + '\'' + - ", physicalClusterId=" + physicalClusterId + ", bootstrapServers='" + bootstrapServers + '\'' + ", description='" + description + '\'' + ", gmtCreate=" + gmtCreate + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/ClusterBaseVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/ClusterBaseVO.java index ca2b73508..111304f1a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/ClusterBaseVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/ClusterBaseVO.java @@ -32,9 +32,12 @@ public class ClusterBaseVO { @ApiModelProperty(value="集群类型") private Integer mode; - @ApiModelProperty(value="安全配置参数") + @ApiModelProperty(value="Kafka安全配置") private String securityProperties; + @ApiModelProperty(value="Jmx配置") + private String jmxProperties; + @ApiModelProperty(value="1:监控中, 0:暂停监控") private Integer status; @@ -108,6 +111,14 @@ public void setSecurityProperties(String securityProperties) { this.securityProperties = securityProperties; } + public String getJmxProperties() { + return jmxProperties; + } + + public void setJmxProperties(String jmxProperties) { + this.jmxProperties = jmxProperties; + } + public Integer getStatus() { return status; } @@ -141,8 +152,9 @@ public String toString() { ", bootstrapServers='" + bootstrapServers + '\'' + ", kafkaVersion='" + kafkaVersion + '\'' + ", idc='" + idc + '\'' + - ", mode='" + mode + '\'' + + ", mode=" + mode + ", securityProperties='" + securityProperties + '\'' + + ", jmxProperties='" + jmxProperties + '\'' + ", status=" + status + ", gmtCreate=" + gmtCreate + ", gmtModify=" + gmtModify + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/LogicalClusterVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/LogicalClusterVO.java index 86ced10f6..61f9b90c7 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/LogicalClusterVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/cluster/LogicalClusterVO.java @@ -18,6 +18,9 @@ public class LogicalClusterVO { @ApiModelProperty(value = "逻辑集群名称") private String logicalClusterName; + @ApiModelProperty(value = "逻辑集群标识") + private String logicalClusterIdentification; + @ApiModelProperty(value = "物理集群ID") private Long physicalClusterId; @@ -55,6 +58,14 @@ public void setLogicalClusterName(String logicalClusterName) { this.logicalClusterName = logicalClusterName; } + public String getLogicalClusterIdentification() { + return logicalClusterIdentification; + } + + public void setLogicalClusterIdentification(String logicalClusterIdentification) { + this.logicalClusterIdentification = logicalClusterIdentification; + } + public Long getPhysicalClusterId() { return physicalClusterId; } @@ -116,6 +127,7 @@ public String toString() { return "LogicalClusterVO{" + "logicalClusterId=" + logicalClusterId + ", logicalClusterName='" + logicalClusterName + '\'' + + ", logicalClusterIdentification='" + logicalClusterIdentification + '\'' + ", physicalClusterId=" + physicalClusterId + ", regionIdList=" + regionIdList + ", mode=" + mode + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java index d9724065c..46d177adf 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java @@ -53,6 +53,13 @@ public static String toJSONString(Object obj) { return JSON.toJSONString(obj); } + public static T stringToObj(String src, Class clazz) { + if (ValidateUtils.isBlank(src)) { + return null; + } + return JSON.parseObject(src, clazz); + } + public static List parseTopicConnections(Long clusterId, JSONObject jsonObject, long postTime) { List connectionDOList = new ArrayList<>(); for (String clientType: jsonObject.keySet()) { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java new file mode 100644 index 000000000..bbc913c4c --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java @@ -0,0 +1,65 @@ +package com.xiaojukeji.kafka.manager.common.utils.jmx; + +public class JmxConfig { + /** + * 单台最大连接数 + */ + private Integer maxConn; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 开启SSL + */ + private Boolean openSSL; + + public Integer getMaxConn() { + return maxConn; + } + + public void setMaxConn(Integer maxConn) { + this.maxConn = maxConn; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public Boolean isOpenSSL() { + return openSSL; + } + + public void setOpenSSL(Boolean openSSL) { + this.openSSL = openSSL; + } + + @Override + public String toString() { + return "JmxConfig{" + + "maxConn=" + maxConn + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", openSSL=" + openSSL + + '}'; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index ed8a639e2..fc70c6b21 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.common.utils.jmx; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,8 +8,14 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.naming.Context; +import javax.rmi.ssl.SslRMIClientSocketFactory; import java.io.IOException; import java.net.MalformedURLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -28,13 +35,19 @@ public class JmxConnectorWrap { private AtomicInteger atomicInteger; - public JmxConnectorWrap(String host, int port, int maxConn) { + private JmxConfig jmxConfig; + + public JmxConnectorWrap(String host, int port, JmxConfig jmxConfig) { this.host = host; this.port = port; - if (maxConn <= 0) { - maxConn = 1; + this.jmxConfig = jmxConfig; + if (ValidateUtils.isNull(this.jmxConfig)) { + this.jmxConfig = new JmxConfig(); + } + if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) { + this.jmxConfig.setMaxConn(1); } - this.atomicInteger = new AtomicInteger(maxConn); + this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn()); } public boolean checkJmxConnectionAndInitIfNeed() { @@ -64,8 +77,18 @@ private synchronized boolean createJmxConnector() { } String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port); try { - JMXServiceURL url = new JMXServiceURL(jmxUrl); - jmxConnector = JMXConnectorFactory.connect(url, null); + Map environment = new HashMap(); + if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) { + environment.put(javax.management.remote.JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword())); + } + if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) { + environment.put(Context.SECURITY_PROTOCOL, "ssl"); + SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory(); + environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory); + environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory); + } + + jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); LOGGER.info("JMX connect success, host:{} port:{}.", host, port); return true; } catch (MalformedURLException e) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java index 72bdcb76c..5cd815815 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java @@ -69,6 +69,19 @@ public Set getBrokerIdSet(Long logicClusterId) { return LOGICAL_CLUSTER_ID_BROKER_ID_MAP.getOrDefault(logicClusterId, new HashSet<>()); } + public Long getTopicLogicalClusterId(Long physicalClusterId, String topicName) { + if (!LOADED.get()) { + flush(); + } + + Map logicalClusterIdMap = TOPIC_LOGICAL_MAP.get(physicalClusterId); + if (ValidateUtils.isNull(logicalClusterIdMap)) { + return null; + } + + return logicalClusterIdMap.get(topicName); + } + public LogicalClusterDO getTopicLogicalCluster(Long physicalClusterId, String topicName) { if (!LOADED.get()) { flush(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index 345f7b9c1..594539198 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -4,9 +4,11 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; @@ -118,8 +120,15 @@ public synchronized void addNew(ClusterDO clusterDO) { return; } + JmxConfig jmxConfig = null; + try { + jmxConfig = JsonUtils.stringToObj(clusterDO.getJmxProperties(), JmxConfig.class); + } catch (Exception e) { + LOGGER.error("class=PhysicalClusterMetadataManager||method=addNew||clusterDO={}||msg=parse jmx properties failed", JsonUtils.toJSONString(clusterDO)); + } + //增加Broker监控 - BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn()); + BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, jmxConfig); brokerListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); @@ -280,7 +289,7 @@ public static Long getTopicRetentionTime(Long clusterId, String topicName) { //---------------------------Broker元信息相关-------------- - public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, Integer jmxMaxConn) { + public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, JmxConfig jmxConfig) { Map metadataMap = BROKER_METADATA_MAP.get(clusterId); if (metadataMap == null) { return; @@ -288,7 +297,7 @@ public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMet metadataMap.put(brokerId, brokerMetadata); Map jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>()); - jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxMaxConn)); + jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig)); JMX_CONNECTOR_MAP.put(clusterId, jmxMap); Map versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>()); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index 9f9727e10..93b90d2f0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -203,6 +203,7 @@ private boolean isZookeeperLegal(String zookeeper) { zk.close(); } } catch (Throwable t) { + return false; } } return true; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java index 5b2fb7038..9a6f40be9 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/LogicalClusterServiceImpl.java @@ -113,6 +113,7 @@ private LogicalCluster convert2LogicalCluster(LogicalClusterDO logicalClusterDO) LogicalCluster logicalCluster = new LogicalCluster(); logicalCluster.setLogicalClusterId(logicalClusterDO.getId()); logicalCluster.setLogicalClusterName(logicalClusterDO.getName()); + logicalCluster.setLogicalClusterIdentification(logicalClusterDO.getIdentification()); logicalCluster.setClusterVersion( physicalClusterMetadataManager.getKafkaVersion( logicalClusterDO.getClusterId(), diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java index 53e9a2ba3..2c2cc253c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java @@ -13,9 +13,6 @@ public class ConfigUtils { @Value(value = "${custom.idc}") private String idc; - @Value("${custom.jmx.max-conn}") - private Integer jmxMaxConn; - @Value(value = "${spring.profiles.active}") private String kafkaManagerEnv; @@ -30,14 +27,6 @@ public void setIdc(String idc) { this.idc = idc; } - public Integer getJmxMaxConn() { - return jmxMaxConn; - } - - public void setJmxMaxConn(Integer jmxMaxConn) { - this.jmxMaxConn = jmxMaxConn; - } - public String getKafkaManagerEnv() { return kafkaManagerEnv; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index 16a185e04..a94ec9de7 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.zookeeper; +import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.StateChangeListener; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; @@ -22,12 +23,12 @@ public class BrokerStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - private Integer jmxMaxConn; + private JmxConfig jmxConfig; - public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, Integer jmxMaxConn) { + public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, JmxConfig jmxConfig) { this.clusterId = clusterId; this.zkConfig = zkConfig; - this.jmxMaxConn = jmxMaxConn; + this.jmxConfig = jmxConfig; } @Override @@ -84,7 +85,7 @@ private void processBrokerAdded(Integer brokerId) { } brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); - PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxMaxConn); + PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig); } catch (Exception e) { LOGGER.error("add broker failed, clusterId:{} brokerMetadata:{}.", clusterId, brokerMetadata, e); } diff --git a/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml b/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml index a03eb6e02..53b902939 100644 --- a/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml @@ -12,6 +12,7 @@ + INSERT INTO cluster ( - cluster_name, zookeeper, bootstrap_servers, security_properties + cluster_name, zookeeper, bootstrap_servers, security_properties, jmx_properties ) VALUES ( - #{clusterName}, #{zookeeper}, #{bootstrapServers}, #{securityProperties} + #{clusterName}, #{zookeeper}, #{bootstrapServers}, #{securityProperties}, #{jmxProperties} ) @@ -30,6 +31,7 @@ cluster_name=#{clusterName}, bootstrap_servers=#{bootstrapServers}, security_properties=#{securityProperties}, + jmx_properties=#{jmxProperties}, status=#{status} WHERE id = #{id} diff --git a/kafka-manager-dao/src/main/resources/mapper/LogicalClusterDao.xml b/kafka-manager-dao/src/main/resources/mapper/LogicalClusterDao.xml index b4478067c..eef0b79fb 100644 --- a/kafka-manager-dao/src/main/resources/mapper/LogicalClusterDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/LogicalClusterDao.xml @@ -1,24 +1,25 @@ - - - - + + + + - - - - - - + + + + + + + INSERT INTO logical_cluster - (name, app_id, cluster_id, region_list, mode, description) + (name, identification, app_id, cluster_id, region_list, mode, description) VALUES - (#{name}, #{appId}, #{clusterId}, #{regionList}, #{mode}, #{description}) + (#{name}, #{identification}, #{appId}, #{clusterId}, #{regionList}, #{mode}, #{description}) @@ -27,7 +28,8 @@ UPDATE logical_cluster SET - + name=#{name}, + cluster_id=#{clusterId}, region_list=#{regionList}, description=#{description}, diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java index 7735caf85..c69ae906b 100644 --- a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java @@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.monitor.common.entry.*; import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.*; +import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.bizenum.CategoryEnum; import java.util.*; @@ -44,7 +45,7 @@ public static N9eStrategy convert2N9eStrategy(Strategy strategy, if (!ValidateUtils.isNull(strategy.getId())) { n9eStrategy.setId(strategy.getId().intValue()); } - n9eStrategy.setCategory(1); + n9eStrategy.setCategory(CategoryEnum.DEVICE_INDEPENDENT.getCode()); n9eStrategy.setName(strategy.getName()); n9eStrategy.setNid(monitorN9eNid); n9eStrategy.setExcl_nid(new ArrayList<>()); @@ -77,7 +78,13 @@ public static N9eStrategy convert2N9eStrategy(Strategy strategy, n9eStrategy.setRecovery_notify(0); StrategyAction strategyAction = strategy.getStrategyActionList().get(0); - n9eStrategy.setConverge(ListUtils.string2IntList(strategyAction.getConverge())); + + // 单位转换, 夜莺的单位是秒, KM前端的单位是分钟 + List convergeList = ListUtils.string2IntList(strategyAction.getConverge()); + if (!ValidateUtils.isEmptyList(convergeList)) { + convergeList.set(0, convergeList.get(0) * 60); + } + n9eStrategy.setConverge(convergeList); List notifyGroups = new ArrayList<>(); for (String name: ListUtils.string2StrList(strategyAction.getNotifyGroup())) { @@ -167,7 +174,13 @@ public static Strategy convert2Strategy(N9eStrategy n9eStrategy, Map convergeList = n9eStrategy.getConverge(); + if (!ValidateUtils.isEmptyList(convergeList)) { + convergeList.set(0, convergeList.get(0) / 60); + } + strategyAction.setConverge(ListUtils.intList2String(convergeList)); + strategyAction.setCallback(n9eStrategy.getCallback()); strategy.setStrategyActionList(Arrays.asList(strategyAction)); diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/bizenum/CategoryEnum.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/bizenum/CategoryEnum.java new file mode 100644 index 000000000..9695c7571 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/bizenum/CategoryEnum.java @@ -0,0 +1,23 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.bizenum; + +public enum CategoryEnum { + DEVICE_RELATED(1, "设备相关"), + DEVICE_INDEPENDENT(2, "设备无关"), + ; + private int code; + + private String msg; + + CategoryEnum(int code, String msg) { + this.code = code; + this.msg = msg; + } + + public int getCode() { + return code; + } + + public String getMsg() { + return msg; + } +} diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java index e8df775bb..e2ac74a92 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java @@ -73,7 +73,7 @@ private void sink2Monitor(Long clusterId, Long now) throws Exception { continue; } - metricSinkPoints.addAll(recordTopics(now, logicalClusterDO.getName(), metrics)); + metricSinkPoints.addAll(recordTopics(now, logicalClusterDO.getIdentification(), metrics)); if (metricSinkPoints.size() > MonitorSinkConstant.MONITOR_SYSTEM_SINK_THRESHOLD) { abstractMonitor.sinkMetrics(metricSinkPoints); metricSinkPoints.clear(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java index 3b5f0ad4e..4ca276f9b 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java @@ -64,7 +64,7 @@ private void sinkConsumerGroup(List metricsList) { continue; } - metricSinkPoints.addAll(recordConsumer(elem.getTimestampUnitMs() / 1000, logicalClusterDO.getName(), elem)); + metricSinkPoints.addAll(recordConsumer(elem.getTimestampUnitMs() / 1000, logicalClusterDO.getIdentification(), elem)); if (metricSinkPoints.size() > MonitorSinkConstant.MONITOR_SYSTEM_SINK_THRESHOLD) { abstractMonitor.sinkMetrics(metricSinkPoints); metricSinkPoints.clear(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java index c48719057..fb95947c3 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java @@ -57,7 +57,7 @@ private void sink2MonitorSystem(Long clusterId, continue; } - MetricSinkPoint point = recordTopicThrottled(startTime, logicalClusterDO.getName(), elem); + MetricSinkPoint point = recordTopicThrottled(startTime, logicalClusterDO.getIdentification(), elem); if (ValidateUtils.isNull(point)) { continue; } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java index 91a0dbaf9..9b35ec87e 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalAccountController.java @@ -40,8 +40,7 @@ public class NormalAccountController { public Result> searchOnJobStaffByKeyWord(@RequestParam("keyWord") String keyWord) { List staffList = accountService.searchAccountByPrefix(keyWord); if (ValidateUtils.isEmptyList(staffList)) { - LOGGER.info("class=NormalAccountController||method=searchOnJobStaffByKeyWord||keyWord={}||msg=staffList is empty!" - ,keyWord); + LOGGER.info("class=NormalAccountController||method=searchOnJobStaffByKeyWord||keyWord={}||msg=staffList is empty!", keyWord); return new Result<>(); } List voList = new ArrayList<>(); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index efc0eec83..6e59816b5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -69,7 +69,8 @@ public Result getTopicBasic( } return new Result<>(TopicModelConverter.convert2TopicBasicVO( topicService.getTopicBasicDTO(physicalClusterId, topicName), - clusterService.getById(physicalClusterId) + clusterService.getById(physicalClusterId), + logicalClusterMetadataManager.getTopicLogicalClusterId(physicalClusterId, topicName) )); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java index c7b36cba4..6d9e7a741 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java @@ -166,7 +166,7 @@ public Result modifyTopic(@RequestBody TopicModificationDTO dto) { if (!ResultStatus.SUCCESS.equals(rs)) { return Result.buildFrom(rs); } - topicManagerService.modifyTopic(dto.getClusterId(), dto.getTopicName(), dto.getDescription(), operator); + topicManagerService.modifyTopicByOp(dto.getClusterId(), dto.getTopicName(), dto.getAppId(), dto.getDescription(), operator); return new Result(); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ClusterModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ClusterModelConverter.java index 9c76a8e5b..d92967dd5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ClusterModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ClusterModelConverter.java @@ -55,6 +55,7 @@ public static LogicClusterVO convert2LogicClusterVO(LogicalCluster logicalCluste CopyUtils.copyProperties(vo, logicalCluster); vo.setClusterId(logicalCluster.getLogicalClusterId()); vo.setClusterName(logicalCluster.getLogicalClusterName()); + vo.setClusterIdentification(logicalCluster.getLogicalClusterIdentification()); return vo; } @@ -78,9 +79,8 @@ public static ClusterDO convert2ClusterDO(ClusterDTO reqObj) { ClusterDO clusterDO = new ClusterDO(); CopyUtils.copyProperties(clusterDO, reqObj); clusterDO.setId(reqObj.getClusterId()); - clusterDO.setSecurityProperties( - ValidateUtils.isNull(clusterDO.getSecurityProperties())? "": clusterDO.getSecurityProperties() - ); + clusterDO.setSecurityProperties(ValidateUtils.isNull(reqObj.getSecurityProperties())? "": reqObj.getSecurityProperties()); + clusterDO.setJmxProperties(ValidateUtils.isNull(reqObj.getJmxProperties())? "": reqObj.getJmxProperties()); return clusterDO; } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/LogicalClusterModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/LogicalClusterModelConverter.java index 3067aa12c..afdf0f034 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/LogicalClusterModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/LogicalClusterModelConverter.java @@ -21,6 +21,7 @@ public static LogicalClusterVO convert2LogicalClusterVO(LogicalClusterDO logical LogicalClusterVO vo = new LogicalClusterVO(); vo.setLogicalClusterId(logicalClusterDO.getId()); vo.setLogicalClusterName(logicalClusterDO.getName()); + vo.setLogicalClusterIdentification(logicalClusterDO.getIdentification()); vo.setPhysicalClusterId(logicalClusterDO.getClusterId()); vo.setMode(logicalClusterDO.getMode()); vo.setRegionIdList(ListUtils.string2LongList(logicalClusterDO.getRegionList())); @@ -45,6 +46,7 @@ public static List convert2LogicalClusterVOList(List