From 0b55520a5a771f76ef1095e223d566d37ab1efe4 Mon Sep 17 00:00:00 2001 From: Qi Liu Date: Tue, 5 Mar 2024 17:18:51 -0500 Subject: [PATCH] [LI-HOTFIX] Add log de-duplicate for ConfigException() in parseAndValidateAddresses() (#504) * add log dedup for ConfigException() in parseAndValidateAddresses * dedup warn --- .../org/apache/kafka/clients/ClientUtils.java | 31 ++++++++++++++++--- .../apache/kafka/clients/ClientUtilsTest.java | 19 ++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index ef44d33268270..2265a66a7fc36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; @@ -40,6 +42,8 @@ public final class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); + private static final long DUPLICATE_WINDOW_MS = 1000; // 1 second + private static final Map ERROR_DEDUPLICATION_CACHE = new ConcurrentHashMap<>(); private ClientUtils() { } @@ -72,7 +76,9 @@ public static List parseAndValidateAddresses(List url String resolvedCanonicalName = inetAddress.getCanonicalHostName(); InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + String message = String.format("Couldn't resolve server %s from %s as DNS resolution of the canonical hostname %s failed for %s", + url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + dedupeAndHandleMessage(message, false); } else { addresses.add(address); } @@ -80,7 +86,8 @@ public static List parseAndValidateAddresses(List url } else { InetSocketAddress address = new InetSocketAddress(host, port); if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + String message = String.format("Couldn't resolve server %s from %s as DNS resolution failed for %s", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + dedupeAndHandleMessage(message, false); } else { addresses.add(address); } @@ -94,11 +101,27 @@ public static List parseAndValidateAddresses(List url } } if (addresses.isEmpty()) - throw new ConfigException("No resolvable bootstrap server in provided urls: " + - String.join(",", urls)); + dedupeAndHandleMessage("No resolvable bootstrap server in provided urls: " + String.join(",", urls), true); return addresses; } + public static void dedupeAndHandleMessage(String message, Boolean isError) { + long currentTime = System.currentTimeMillis(); + if (!isDuplicateError(message, currentTime)) { + ERROR_DEDUPLICATION_CACHE.put(message, currentTime); + if (isError) { + throw new ConfigException(message); + } else { + log.warn(message); + } + } + } + + private static boolean isDuplicateError(String message, long currentTime) { + Long previousTime = ERROR_DEDUPLICATION_CACHE.get(message); + return previousTime != null && (currentTime - previousTime) < DUPLICATE_WINDOW_MS; + } + /** * Create a new channel builder from the provided configuration. * diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index 7ef55eb98538b..d9d0f5c147514 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -115,6 +115,25 @@ public void testResolveDnsLookup() throws UnknownHostException { assertEquals(asList(addresses), ClientUtils.resolve("kafka.apache.org", hostResolver)); } + @Test + public void testParseAndValidateAddressesDedupesErrors() { + int expectedNumberOfErrors = 1; + int actualNumberOfErrors = 0; + String expectedErrorMessage = "No resolvable bootstrap server in provided urls: "; + + for (int i = 0; i < 10; i++) { + try { + ClientUtils.parseAndValidateAddresses(Collections.emptyList()); + } catch (ConfigException e) { + assertEquals(expectedErrorMessage, e.getMessage()); + actualNumberOfErrors++; + } + } + + // Verify that only one error was thrown during the loop + assertEquals(expectedNumberOfErrors, actualNumberOfErrors); + } + private List checkWithoutLookup(String... url) { return ClientUtils.parseAndValidateAddresses(asList(url), ClientDnsLookup.USE_ALL_DNS_IPS); }