Skip to content

Commit

Permalink
[Bugfix]修复删除Kafka集群后,Connect集群任务出现NPE问题 (#1129)
Browse files Browse the repository at this point in the history
原因:

首先,删除Kafka集群后,没有将DB中的Connect集群进行删除。随后,进行Connect集群指标采集时,由于所在的Kafka集群已经不存在了。最终,导致NPE;

解决:
发布一个Kafka集群删除事件,触发MetaDataService子类,将其在DB中的数据进行删除。

遗留:

当前MetaDataService仅在部分元信息同步类中实现,导致当前DB中的脏数据清理不彻底,后续等MetaDataService在所有元信息同步类中实现后,便可彻底清理数据。

PS:当前修复已保证NPE问题不会再出现。
  • Loading branch information
ZQKC authored Aug 16, 2023
1 parent a730961 commit d1417be
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect;

import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyBaseEvent;
import lombok.Getter;

/**
 * Event published when a Kafka physical cluster has been deleted,
 * so that listeners can clean up DB data bound to that cluster.
 *
 * @author zengqiao
 * @date 23/08/15
 */
@Getter
public class ClusterPhyDeletedEvent extends ClusterPhyBaseEvent {
    /**
     * @param source       the object on which the event initially occurred
     * @param clusterPhyId ID of the Kafka physical cluster that was deleted
     */
    public ClusterPhyDeletedEvent(Object source, Long clusterPhyId) {
        super(source, clusterPhyId);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
Expand Down Expand Up @@ -146,6 +147,9 @@ public Result<Void> removeClusterPhyById(Long clusterPhyId, String operator) {
String.format("删除集群:%s",clusterPhy.toString()));
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 发布删除集群事件
SpringTool.publish(new ClusterPhyDeletedEvent(this, clusterPhyId));

return Result.buildSuc();
} catch (Exception e) {
log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;

import java.util.List;

/**
* Connect-Cluster
*/
public interface ConnectClusterService {
public interface ConnectClusterService extends MetaDataService<KSGroupDescription> {
Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata);

List<ConnectCluster> listByKafkaCluster(Long kafkaClusterPhyId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
@Autowired
private OpLogWrapService opLogWrapService;

@Override
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
    /*
     * Delete every Connect-cluster row whose kafkaClusterPhyId matches the
     * deleted Kafka cluster.
     *
     * @param clusterPhyId ID of the deleted Kafka physical cluster
     * @return number of rows removed from the DB
     */
    LambdaQueryWrapper<ConnectClusterPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
    lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId);

    // BUGFIX: use delete(wrapper) for a conditional delete. BaseMapper.deleteById(...)
    // treats its argument as a primary key — passing a wrapper compiles (Wrapper is
    // Serializable) but silently ignores the eq-condition built above.
    return connectClusterDAO.delete(lambdaQueryWrapper);
}

@Override
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.xiaojukeji.know.streaming.km.task.service.listener;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

/**
 * Listens for {@link ClusterPhyDeletedEvent} and asynchronously purges, via every
 * {@link MetaDataService} bean, the DB data bound to the deleted Kafka cluster.
 */
@Service
public class TaskClusterDeletedListener implements ApplicationListener<ClusterPhyDeletedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterDeletedListener.class);

    /** Delay before cleanup so in-flight collection tasks can finish writing to the DB. */
    private static final long CLEANUP_DELAY_MS = 60000L;

    /**
     * Triggers the asynchronous cleanup for the deleted cluster.
     *
     * @param event carries the ID of the Kafka physical cluster that was deleted
     */
    @Override
    public void onApplicationEvent(ClusterPhyDeletedEvent event) {
        Long clusterPhyId = event.getClusterPhyId();
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened delete cluster", clusterPhyId);

        // Submit to KS's own thread pool so the event publisher is not blocked.
        FutureUtil.quickStartupFutureUtil.submitTask(
            () -> {
                // Wait so tasks already running do not re-insert data after the purge.
                BackoffUtils.backoff(CLEANUP_DELAY_MS);

                // Wildcard type instead of the raw MetaDataService avoids unchecked raw-type usage.
                for (MetaDataService<?> metaDataService : SpringTool.getBeansOfType(MetaDataService.class).values()) {
                    String serviceName = metaDataService.getClass().getSimpleName();

                    LOGGER.info(
                        "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db starting",
                        clusterPhyId, serviceName
                    );

                    try {
                        // Purge this service's rows for the deleted cluster.
                        metaDataService.deleteInDBByKafkaClusterId(clusterPhyId);

                        LOGGER.info(
                            "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db finished",
                            clusterPhyId, serviceName
                        );
                    } catch (Exception e) {
                        // Keep purging the remaining services even if one of them fails.
                        LOGGER.error(
                            "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db failed||errMsg=exception",
                            clusterPhyId, serviceName, e
                        );
                    }
                }
            }
        );
    }
}

0 comments on commit d1417be

Please sign in to comment.