diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java index 2b03eac4b7..4cfecd0859 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java @@ -114,30 +114,19 @@ public void updateSubscription(ClientInfo clientInfo, String consumerGroup, if (CollectionUtils.isEmpty(groupTopicClients)) { log.error("group {} topic {} clients is empty", consumerGroup, subscription); } + ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.computeIfAbsent(consumerGroup, (consumerGroupInner) -> + new ConsumerGroupConf(consumerGroup) + ); - ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup); - if (consumerGroupConf == null) { - // new subscription - ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup)); - if (prev == null) { - log.info("add new subscription, consumer group: {}", consumerGroup); - } - consumerGroupConf = localConsumerGroupMapping.get(consumerGroup); - } - - ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf() - .get(subscription.getTopic()); - if (consumerGroupTopicConf == null) { - consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> { + ConsumerGroupTopicConf consumerGroupTopicConf = + consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topicInner) -> { ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); newTopicConf.setConsumerGroup(consumerGroup); - newTopicConf.setTopic(topic); + newTopicConf.setTopic(topicInner); newTopicConf.setSubscriptionItem(subscription); log.info("add new {}", newTopicConf); return newTopicConf; }); - consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic()); - } consumerGroupTopicConf.getUrls().add(url); if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {