Skip to content

Commit

Permalink
feat: 카프카 consumer id를 동적으로 생성하도록 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
kimday0326 committed May 7, 2024
1 parent d6f5654 commit 410956c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class Consumer {

private final SimpMessageSendingOperations messageSendingOperations;

@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}")
@KafkaListener(topics = "${spring.kafka.template.default-topic}")
public void consume(KafkaMessage message) {
log.info(">>>>>> Consume !!!!!!! Message {} ", message);
messageSendingOperations.convertAndSend(message.destination(), message.message());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.pgms.coreinfrakafka.kafka.config;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -22,16 +26,16 @@

import com.pgms.coreinfrakafka.kafka.KafkaMessage;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@EnableKafka
@Configuration
public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Bean
public ProducerFactory<String, KafkaMessage> producerFactory() {
Map<String, Object> config = new HashMap<>();
Expand All @@ -53,7 +57,7 @@ public KafkaTemplate<String, KafkaMessage> kafkaTemplate() {
public ConsumerFactory<String, KafkaMessage> consumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.GROUP_ID_CONFIG, getConsumerGroupId());
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(KafkaMessage.class));
}
Expand All @@ -64,4 +68,23 @@ public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListen
factory.setConsumerFactory(consumerFactory());
return factory;
}

private String getConsumerGroupId() {
// 현재 시간 가져오기 (UTC)
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
String currentTime = LocalDateTime.now().format(formatter);

// 호스트 이름 추출
String hostName = "unknown-host";
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to get host name", e);
}

// 컨슈머 그룹 ID 생성
String consumerGroupId = "consumer-" + hostName + "-" + currentTime;
log.info("Kafka Consumer Group ID: {}", consumerGroupId);
return consumerGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@ spring:
bootstrap-servers: ${KAFKA_HOST}:9092
consumer:
auto-offset-reset: earliest
group-id: foo
template:
default-topic: tikitaza

0 comments on commit 410956c

Please sign in to comment.