From 43aca84be579a041fe81f6de46075a3a8476c8a0 Mon Sep 17 00:00:00 2001 From: weidong_chang <51365967+chang-wd@users.noreply.github.com> Date: Wed, 20 Nov 2024 21:40:43 +0800 Subject: [PATCH 1/2] [Feature] #1121 support clearInactiveClusterPhyBrokers --- .../km/biz/cluster/ClusterBrokersManager.java | 10 +++++ .../impl/ClusterBrokersManagerImpl.java | 6 +++ .../layout-clusters-fe/src/api/index.ts | 2 + .../src/pages/BrokerList/index.tsx | 42 +++++++++++++++++++ .../km/core/service/broker/BrokerService.java | 2 + .../broker/impl/BrokerServiceImpl.java | 14 +++++++ .../v3/cluster/ClusterBrokersController.java | 8 ++++ 7 files changed, 84 insertions(+) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java index 8427c1ef0..68a5b7b95 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java @@ -17,6 +17,16 @@ public interface ClusterBrokersManager { */ PaginationResult getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto); + /** + * 删除status == 0 的所有broker -> 获取缓存查询结果 & broker 表查询结果并集 + * 获取缓存查询结果 & broker 表查询结果并集 + * @param clusterPhyId kafka 物理集群 id + * @param dto 封装分页查询参数对象 + * @return 返回获取到的缓存查询结果 & broker 表查询结果并集 + */ + PaginationResult deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto); + + /** * 根据物理集群id获取集群对应broker状态信息 * @param clusterPhyId 物理集群 id diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index c77724dd8..3254731d3 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -107,6 +107,12 @@ public PaginationResult getClusterPhyBrokersOverview(L ); } + @Override + public PaginationResult deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { + brokerService.deleteInactiveClusterPhyBrokers(clusterPhyId); + return this.getClusterPhyBrokersOverview(clusterPhyId, dto); + } + @Override public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) { ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO(); diff --git a/km-console/packages/layout-clusters-fe/src/api/index.ts b/km-console/packages/layout-clusters-fe/src/api/index.ts index d07599524..97db801a0 100755 --- a/km-console/packages/layout-clusters-fe/src/api/index.ts +++ b/km-console/packages/layout-clusters-fe/src/api/index.ts @@ -120,6 +120,8 @@ const api = { getTopicMetricPoints: (clusterPhyId: number, topicName: string) => getApi(`/clusters/${clusterPhyId}/topics/${topicName}/metric-points`), // Broker列表接口 getBrokersList: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-overview`), + // 删除失效Broker + clearInactiveBrokers: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-clear`), // Broker列表页健康检查指标 getBrokerMetricPoints: (clusterPhyId: number) => getApi(`/physical-clusters/${clusterPhyId}/latest-metrics`), // Controller列表接口 /api/v3/clusters/{clusterPhyId}/controller-history「controller-change-log」 diff --git a/km-console/packages/layout-clusters-fe/src/pages/BrokerList/index.tsx b/km-console/packages/layout-clusters-fe/src/pages/BrokerList/index.tsx index 9d5dd0537..7d6402117 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/BrokerList/index.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/BrokerList/index.tsx @@ -66,6 +66,42 @@ const BrokerList: React.FC = (props: any) => { }); }; + // 请求接口获取数据 + const clearInactiveBrokers = async ({ pageNo, pageSize, filters, sorter }: any) => { + if (urlParams?.clusterId === undefined) return; + // filters = filters || filteredInfo; + setLoading(true); + const params = { + searchKeywords: searchKeywords.slice(0, 128), + pageNo, + pageSize, + latestMetricNames: ['PartitionsSkew', 'Leaders', 'LeadersSkew', 'LogSize'], + sortField: sorter?.field || 'brokerId', + sortType: sorter?.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : 'asc', + }; + + request(API.clearInactiveBrokers(urlParams?.clusterId), { method: 'POST', data: params }) + .then((res: any) => { + setPagination({ + current: res.pagination?.pageNo, + pageSize: res.pagination?.pageSize, + total: res.pagination?.total, + }); + const newData = + res?.bizData.map((item: any) => { + return { + ...item, + ...item?.latestMetrics?.metrics, + }; + }) || []; + setData(newData); + setLoading(false); + }) + .catch((err) => { + setLoading(false); + }); + }; + const onTableChange = (pagination: any, filters: any, sorter: any) => { // setFilteredInfo(filters); genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter }); @@ -107,6 +143,12 @@ const BrokerList: React.FC = (props: any) => { > +
clearInactiveBrokers({ pageNo: pagination.current, pageSize: pagination.pageSize })} + > + +
listNotAliveBrokersFromDB(Long clusterPhyId) { return this.listAllBrokersAndUpdateCache(clusterPhyId).stream().filter( elem -> !elem.alive()).collect(Collectors.toList()); } + /** + * 清理对应集群中下线的broker记录 + * @param clusterPhyId + */ + @Override + public void clearInactiveClusterPhyBrokers(Long clusterPhyId) { + try { + this.getAllBrokerPOsFromDB(clusterPhyId).stream() + .filter(elem -> elem.getStatus().equals(Constant.DOWN)) + .forEach(elem -> brokerDAO.deleteById(elem.getId())); + } catch (Exception e) { + log.error("method=deleteInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); + } + } @Override public List listAllBrokersFromDB(Long clusterPhyId) { return this.listAllBrokersAndUpdateCache(clusterPhyId); diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java index db9c933a4..3bfe5c6ad 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java @@ -62,4 +62,12 @@ public PaginationResult getClusterPhyBrokersOverview(@ @RequestBody ClusterBrokersOverviewDTO dto) { return clusterBrokersManager.getClusterPhyBrokersOverview(clusterPhyId, dto); } + + @ApiOperation(value = "集群无效brokers清理") + @PostMapping(value = "clusters/{clusterPhyId}/brokers-clear") + @ResponseBody + public PaginationResult clearInactiveClusterPhyBrokers(@PathVariable Long clusterPhyId, + @RequestBody ClusterBrokersOverviewDTO dto) { + return clusterBrokersManager.clearInactiveClusterPhyBrokers(clusterPhyId, dto); + } } \ No newline at end of file From aaf10ae3cad85587da996f2ab29ddaeb424056d1 Mon Sep 17 00:00:00 2001 From: weidong_chang <51365967+chang-wd@users.noreply.github.com> Date: Wed, 20 Nov 2024 22:02:16 +0800 Subject: [PATCH 2/2] [Feature] #1121 support clearInactiveClusterPhyBrokers --- .../know/streaming/km/biz/cluster/ClusterBrokersManager.java | 2 +- .../km/biz/cluster/impl/ClusterBrokersManagerImpl.java | 4 ++-- .../km/core/service/broker/impl/BrokerServiceImpl.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java index 68a5b7b95..0eba136b1 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java @@ -24,7 +24,7 @@ public interface ClusterBrokersManager { * @param dto 封装分页查询参数对象 * @return 返回获取到的缓存查询结果 & broker 表查询结果并集 */ - PaginationResult deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto); + PaginationResult clearInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto); /** diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index 3254731d3..d8796dcdd 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -108,8 +108,8 @@ public PaginationResult getClusterPhyBrokersOverview(L } @Override - public PaginationResult deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { - brokerService.deleteInactiveClusterPhyBrokers(clusterPhyId); + public PaginationResult clearInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { + brokerService.clearInactiveClusterPhyBrokers(clusterPhyId); return this.getClusterPhyBrokersOverview(clusterPhyId, dto); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index aad774730..105019f74 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -187,7 +187,7 @@ public void clearInactiveClusterPhyBrokers(Long clusterPhyId) { .filter(elem -> elem.getStatus().equals(Constant.DOWN)) .forEach(elem -> brokerDAO.deleteById(elem.getId())); } catch (Exception e) { - log.error("method=deleteInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); + log.error("method=clearInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); } } @Override