Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] #1121 support clearInactiveClusterPhyBrokers #1248

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ public interface ClusterBrokersManager {
*/
PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto);

/**
* 删除status == 0 的所有broker -> 获取缓存查询结果 & broker 表查询结果并集
* 获取缓存查询结果 & broker 表查询结果并集
* @param clusterPhyId kafka 物理集群 id
* @param dto 封装分页查询参数对象
* @return 返回获取到的缓存查询结果 & broker 表查询结果并集
*/
PaginationResult<ClusterBrokersOverviewVO> clearInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto);


/**
* 根据物理集群id获取集群对应broker状态信息
* @param clusterPhyId 物理集群 id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
);
}

@Override
public PaginationResult<ClusterBrokersOverviewVO> clearInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
brokerService.clearInactiveClusterPhyBrokers(clusterPhyId);
return this.getClusterPhyBrokersOverview(clusterPhyId, dto);
}

@Override
public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO();
Expand Down
2 changes: 2 additions & 0 deletions km-console/packages/layout-clusters-fe/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const api = {
getTopicMetricPoints: (clusterPhyId: number, topicName: string) => getApi(`/clusters/${clusterPhyId}/topics/${topicName}/metric-points`),
// Broker列表接口
getBrokersList: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-overview`),
// 删除失效Broker
clearInactiveBrokers: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-clear`),
// Broker列表页健康检查指标
getBrokerMetricPoints: (clusterPhyId: number) => getApi(`/physical-clusters/${clusterPhyId}/latest-metrics`),
// Controller列表接口 /api/v3/clusters/{clusterPhyId}/controller-history「controller-change-log」
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ const BrokerList: React.FC = (props: any) => {
});
};

// 请求接口获取数据
const clearInactiveBrokers = async ({ pageNo, pageSize, filters, sorter }: any) => {
if (urlParams?.clusterId === undefined) return;
// filters = filters || filteredInfo;
setLoading(true);
const params = {
searchKeywords: searchKeywords.slice(0, 128),
pageNo,
pageSize,
latestMetricNames: ['PartitionsSkew', 'Leaders', 'LeadersSkew', 'LogSize'],
sortField: sorter?.field || 'brokerId',
sortType: sorter?.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : 'asc',
};

request(API.clearInactiveBrokers(urlParams?.clusterId), { method: 'POST', data: params })
.then((res: any) => {
setPagination({
current: res.pagination?.pageNo,
pageSize: res.pagination?.pageSize,
total: res.pagination?.total,
});
const newData =
res?.bizData.map((item: any) => {
return {
...item,
...item?.latestMetrics?.metrics,
};
}) || [];
setData(newData);
setLoading(false);
})
.catch((err) => {
setLoading(false);
});
};

const onTableChange = (pagination: any, filters: any, sorter: any) => {
// setFilteredInfo(filters);
genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter });
Expand Down Expand Up @@ -107,6 +143,12 @@ const BrokerList: React.FC = (props: any) => {
>
<IconFont className={`${tableHeaderPrefix}-left-refresh-icon`} type="icon-shuaxin1" />
</div>
<div
className={`${tableHeaderPrefix}-left-clear`}
onClick={() => clearInactiveBrokers({ pageNo: pagination.current, pageSize: pagination.pageSize })}
>
<IconFont className={`${tableHeaderPrefix}-left-clear-icon`} type="icon-Operation" />
</div>
</div>
<div className={`${tableHeaderPrefix}-right`}>
<SearchInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ public interface BrokerService {
boolean allServerDown(Long clusterPhyId);

boolean existServerDown(Long clusterPhyId);

void clearInactiveClusterPhyBrokers(Long clusterPhyId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ public List<Broker> listNotAliveBrokersFromDB(Long clusterPhyId) {
return this.listAllBrokersAndUpdateCache(clusterPhyId).stream().filter( elem -> !elem.alive()).collect(Collectors.toList());
}

/**
* 清理对应集群中下线的broker记录
* @param clusterPhyId
*/
@Override
public void clearInactiveClusterPhyBrokers(Long clusterPhyId) {
try {
this.getAllBrokerPOsFromDB(clusterPhyId).stream()
.filter(elem -> elem.getStatus().equals(Constant.DOWN))
.forEach(elem -> brokerDAO.deleteById(elem.getId()));
} catch (Exception e) {
log.error("method=clearInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
}
}
@Override
public List<Broker> listAllBrokersFromDB(Long clusterPhyId) {
return this.listAllBrokersAndUpdateCache(clusterPhyId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(@
@RequestBody ClusterBrokersOverviewDTO dto) {
return clusterBrokersManager.getClusterPhyBrokersOverview(clusterPhyId, dto);
}

@ApiOperation(value = "集群无效brokers清理")
@PostMapping(value = "clusters/{clusterPhyId}/brokers-clear")
@ResponseBody
public PaginationResult<ClusterBrokersOverviewVO> clearInactiveClusterPhyBrokers(@PathVariable Long clusterPhyId,
@RequestBody ClusterBrokersOverviewDTO dto) {
return clusterBrokersManager.clearInactiveClusterPhyBrokers(clusterPhyId, dto);
}
}
Loading