Skip to content

Commit

Permalink
[LI-HOTFIX] Add log de-duplicate for ConfigException() in parseAndVal…
Browse files Browse the repository at this point in the history
…idateAddresses() (#504)

* add log dedup for ConfigException() in parseAndValidateAddresses

* dedup warn
  • Loading branch information
Q1Liu authored Mar 5, 2024
1 parent bca7532 commit 0b55520
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
31 changes: 27 additions & 4 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
Expand All @@ -40,6 +42,8 @@

public final class ClientUtils {
private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
private static final long DUPLICATE_WINDOW_MS = 1000; // 1 second
private static final Map<String, Long> ERROR_DEDUPLICATION_CACHE = new ConcurrentHashMap<>();

private ClientUtils() {
}
Expand Down Expand Up @@ -72,15 +76,18 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
String message = String.format("Couldn't resolve server %s from %s as DNS resolution of the canonical hostname %s failed for %s",
url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
dedupeAndHandleMessage(message, false);
} else {
addresses.add(address);
}
}
} else {
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
String message = String.format("Couldn't resolve server %s from %s as DNS resolution failed for %s", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
dedupeAndHandleMessage(message, false);
} else {
addresses.add(address);
}
Expand All @@ -94,11 +101,27 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
}
}
if (addresses.isEmpty())
throw new ConfigException("No resolvable bootstrap server in provided urls: " +
String.join(",", urls));
dedupeAndHandleMessage("No resolvable bootstrap server in provided urls: " + String.join(",", urls), true);
return addresses;
}

public static void dedupeAndHandleMessage(String message, Boolean isError) {
long currentTime = System.currentTimeMillis();
if (!isDuplicateError(message, currentTime)) {
ERROR_DEDUPLICATION_CACHE.put(message, currentTime);
if (isError) {
throw new ConfigException(message);
} else {
log.warn(message);
}
}
}

private static boolean isDuplicateError(String message, long currentTime) {
Long previousTime = ERROR_DEDUPLICATION_CACHE.get(message);
return previousTime != null && (currentTime - previousTime) < DUPLICATE_WINDOW_MS;
}

/**
* Create a new channel builder from the provided configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ public void testResolveDnsLookup() throws UnknownHostException {
assertEquals(asList(addresses), ClientUtils.resolve("kafka.apache.org", hostResolver));
}

@Test
public void testParseAndValidateAddressesDedupesErrors() {
int expectedNumberOfErrors = 1;
int actualNumberOfErrors = 0;
String expectedErrorMessage = "No resolvable bootstrap server in provided urls: ";

for (int i = 0; i < 10; i++) {
try {
ClientUtils.parseAndValidateAddresses(Collections.emptyList());
} catch (ConfigException e) {
assertEquals(expectedErrorMessage, e.getMessage());
actualNumberOfErrors++;
}
}

// Verify that only one error was thrown during the loop
assertEquals(expectedNumberOfErrors, actualNumberOfErrors);
}

private List<InetSocketAddress> checkWithoutLookup(String... url) {
return ClientUtils.parseAndValidateAddresses(asList(url), ClientDnsLookup.USE_ALL_DNS_IPS);
}
Expand Down

0 comments on commit 0b55520

Please sign in to comment.