diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java index fb85797af9e..68f74ae3825 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.shaded.guava.common.base.Charsets; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.net.InetSocketAddress; @@ -29,6 +30,7 @@ import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.List; import java.util.Objects; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -171,6 +173,12 @@ public SocketAddress resolve() { return new InetSocketAddress("127.0.0.1", 9042); } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java index 530f2ad38ac..907d2265c2e 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java @@ -20,6 +20,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.List; /** * Encapsulates the information needed to open connections to a node. @@ -40,6 +41,13 @@ public interface EndPoint { @NonNull SocketAddress resolve(); + /** + * Resolves this instance to a list of {@link EndPoint}. + * + *

This is called occasionally to resolve unresolved endpoints to their resolved counterparts. + */ + @NonNull + List resolveAll(); /** * Returns an alternate string representation for use in node-level metric names. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java index 1ed2a1cebf3..821b3ca2f79 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; import java.net.InetAddress; @@ -41,18 +42,17 @@ public static Set merge( Set result = Sets.newHashSet(programmaticContactPoints); for (String spec : configContactPoints) { - for (InetSocketAddress address : extract(spec, resolve)) { - DefaultEndPoint endPoint = new DefaultEndPoint(address); + for (EndPoint endPoint : extract(spec, resolve)) { boolean wasNew = result.add(endPoint); if (!wasNew) { - LOG.warn("Duplicate contact point {}", address); + LOG.warn("Duplicate contact point {}", endPoint); } } } return ImmutableSet.copyOf(result); } - private static Set extract(String spec, boolean resolve) { + private static Set extract(String spec, boolean resolve) { int separator = spec.lastIndexOf(':'); if (separator < 0) { LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec); @@ -69,7 +69,7 @@ private static Set extract(String spec, boolean resolve) { return Collections.emptySet(); } if (!resolve) { - return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port)); + return ImmutableSet.of(new UnresolvedEndPoint(host, port)); } else { try { InetAddress[] inetAddresses = InetAddress.getAllByName(host); @@ -79,9 +79,9 @@ private static Set extract(String spec, boolean resolve) { spec, Arrays.deepToString(inetAddresses)); } - Set result = new HashSet<>(); + Set result = new HashSet<>(); for (InetAddress inetAddress : inetAddresses) { - result.add(new InetSocketAddress(inetAddress, port)); + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); } return result; } catch (UnknownHostException e) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java index 7ffbee8e4bb..905c8c9f16b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.Serializable; import java.net.InetSocketAddress; +import java.util.List; import java.util.Objects; public class DefaultEndPoint implements EndPoint, Serializable { @@ -41,6 +43,12 @@ public InetSocketAddress resolve() { return address; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java index c21d5d8171e..1d17d334378 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java @@ -72,20 +72,26 @@ public Result compute( + "keeping only the first one", logPrefix, hostId); + continue; + } + EndPoint endPoint = nodeInfo.getEndPoint(); + DefaultNode node = findIn(contactPoints, endPoint); + if (node == null) { + node = new DefaultNode(endPoint, context); + LOG.debug("[{}] Adding new node {}", logPrefix, node); } else { - EndPoint endPoint = nodeInfo.getEndPoint(); - DefaultNode node = findIn(contactPoints, endPoint); - if (node == null) { - node = new DefaultNode(endPoint, context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); - } else { - LOG.debug("[{}] Copying contact point {}", logPrefix, node); - } - if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { + LOG.debug("[{}] Copying contact point {}", logPrefix, node); + } + copyInfos(nodeInfo, node, context); + newNodes.put(hostId, node); + } + + if (tokenMapEnabled) { + for (NodeInfo nodeInfo : nodeInfos) { + if (nodeInfo.getPartitioner() != null) { tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner()); + break; } - copyInfos(nodeInfo, node, context); - newNodes.put(hostId, node); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 7aa2fb13bcd..88b5c00cab0 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -55,6 +55,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArraySet; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,8 @@ public class MetadataManager implements AsyncAutoCloseable { private volatile KeyspaceFilter keyspaceFilter; private volatile Boolean schemaEnabledProgrammatically; private volatile boolean tokenMapEnabled; - private volatile Set contactPoints; + private volatile Set contactPoints; + private volatile Set resolvedContactPoints; private volatile boolean wasImplicitContactPoint; private volatile TypeCodec tabletPayloadCodec = null; @@ -102,7 +104,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList()); this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces); this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED); - + this.resolvedContactPoints = new CopyOnWriteArraySet<>(); context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged); } @@ -145,18 +147,19 @@ public void addContactPoints(Set providedContactPoints) { // Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we // don't know their host_id. So store them in a volatile field instead, they will get copied // during the first node refresh. - ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); + ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); if (providedContactPoints == null || providedContactPoints.isEmpty()) { LOG.info( "[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT); this.wasImplicitContactPoint = true; - contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context)); + contactPointsBuilder.add(DEFAULT_CONTACT_POINT); } else { for (EndPoint endPoint : providedContactPoints) { - contactPointsBuilder.add(new DefaultNode(endPoint, context)); + contactPointsBuilder.add(endPoint); } } this.contactPoints = contactPointsBuilder.build(); + this.resolveContactPoints(); LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints); } @@ -167,7 +170,30 @@ public void addContactPoints(Set providedContactPoints) { * @see #wasImplicitContactPoint() */ public Set getContactPoints() { - return contactPoints; + return resolvedContactPoints; + } + + public synchronized void resolveContactPoints() { + ImmutableSet.Builder resultBuilder = ImmutableSet.builder(); + for (EndPoint endPoint : contactPoints) { + List resolveEndpoints = endPoint.resolveAll(); + if (resolveEndpoints.isEmpty()) { + LOG.error("failed to resolve contact endpoint {}", endPoint); + } else { + resultBuilder.addAll(resolveEndpoints); + } + } + + Set result = resultBuilder.build(); + for (EndPoint endPoint : result) { + if (resolvedContactPoints.stream() + .anyMatch(resolved -> resolved.getEndPoint().equals(endPoint))) { + continue; + } + this.resolvedContactPoints.add(new DefaultNode(endPoint, context)); + } + + this.resolvedContactPoints.removeIf(endPoint -> !result.contains(endPoint.getEndPoint())); } /** Whether the default contact point was used (because none were provided explicitly). */ @@ -337,10 +363,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con } private Void refreshNodes(Iterable nodeInfos) { + if (!didFirstNodeListRefresh) { + resolveContactPoints(); + } MetadataRefresh refresh = didFirstNodeListRefresh ? new FullNodeListRefresh(nodeInfos) - : new InitialNodeListRefresh(nodeInfos, contactPoints); + : new InitialNodeListRefresh(nodeInfos, resolvedContactPoints); didFirstNodeListRefresh = true; return apply(refresh); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java index ace4e82617d..f5085df5ab8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetAddress; @@ -25,6 +26,7 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; @@ -72,6 +74,12 @@ public InetSocketAddress resolve() { } } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java new file mode 100644 index 00000000000..d55ae047951 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.metadata.EndPoint; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class UnresolvedEndPoint implements EndPoint, Serializable { + private final String metricPrefix; + String host; + int port; + + private final List EMPTY = new ArrayList<>(); + + public UnresolvedEndPoint(String host, int port) { + this.host = host; + this.port = port; + this.metricPrefix = buildMetricPrefix(host, port); + } + + @NonNull + @Override + public SocketAddress resolve() { + throw new RuntimeException( + String.format( + "This endpoint %s should never been resolved, but it happened, it somehow leaked to downstream code.", + this)); + } + + @NonNull + @Override + public List resolveAll() { + try { + InetAddress[] inetAddresses = InetAddress.getAllByName(host); + Set result = new HashSet<>(); + for (InetAddress inetAddress : inetAddresses) { + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); + } + return new ArrayList<>(result); + } catch (UnknownHostException e) { + return EMPTY; + } + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof UnresolvedEndPoint) { + UnresolvedEndPoint that = (UnresolvedEndPoint) other; + return this.host.equals(that.host) && this.port == that.port; + } + return false; + } + + @Override + public int hashCode() { + return host.toLowerCase().hashCode() + port; + } + + @Override + public String toString() { + return host + ":" + port; + } + + @NonNull + @Override + public String asMetricPrefix() { + return metricPrefix; + } + + private static String buildMetricPrefix(String host, int port) { + // Append the port since Cassandra 4 supports nodes with different ports + return host.replace('.', '_') + ':' + port; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java index 9e0d8737619..81093d35134 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java @@ -29,6 +29,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import java.net.InetAddress; @@ -94,9 +95,7 @@ public void should_parse_host_and_port_in_configuration_and_create_unresolved() Set endPoints = ContactPoints.merge(Collections.emptySet(), ImmutableList.of("localhost:9042"), false); - assertThat(endPoints) - .containsExactly( - new DefaultEndPoint(InetSocketAddress.createUnresolved("localhost", 9042))); + assertThat(endPoints).containsExactly(new UnresolvedEndPoint("localhost", 9042)); } @Test diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java index 5e463299a66..0d9895374d3 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java @@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use an embedded Netty channel. */ public class EmbeddedEndPoint implements EndPoint { @@ -30,6 +31,12 @@ public SocketAddress resolve() { throw new UnsupportedOperationException("This should not get called from unit tests"); } + @NonNull + @Override + public List resolveAll() { + throw new UnsupportedOperationException("This should not get called from unit tests"); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java index c90731eece9..8836ae607d7 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.channel; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.channel.local.LocalAddress; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use the local Netty transport. */ public class LocalEndPoint implements EndPoint { @@ -37,6 +39,12 @@ public SocketAddress resolve() { return localAddress; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 93ecbf1815c..18eb174b911 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.datastax.oss.driver.api.core.CqlSession; @@ -96,9 +95,6 @@ public void should_connect_with_mocked_hostname() { .filter(x -> x.toString().contains("test.cluster.fake")) .collect(Collectors.toSet()); assertThat(filteredNodes).hasSize(1); - InetSocketAddress address = - (InetSocketAddress) filteredNodes.iterator().next().getEndPoint().resolve(); - assertTrue(address.isUnresolved()); } } } @@ -172,7 +168,7 @@ public void replace_cluster_test() { nodes.stream() .filter(x -> x.toString().contains("test.cluster.fake")) .collect(Collectors.toSet()); - assertThat(filteredNodes).hasSize(1); + assertThat(filteredNodes).hasSize(3); } try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { @@ -413,4 +409,40 @@ public void cannot_reconnect_with_resolved_socket() { } session.close(); } + + @Test + public void should_connect_when_first_node_is_unavailable() { + // Reproduce case when dns first record points to the node that is unresponsive + // With RESOLVE_CONTACT_POINTS set to false + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader); + CqlSession session; + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) { + MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(11)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(2)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(3)); + ccmBridge.create(); + ccmBridge.start(); + session = builder.build(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.execute(statement); + ccmBridge.stop(2); + session.execute(statement); + } + } }