From 8b9cd3d9bde79b0ab2565aaa5dad4aff217887da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 12 Sep 2024 15:28:58 +0200 Subject: [PATCH] Add optional fallback for `ControlConnection#reconnect()` Adds an experimental option to allow `ControlConnection` to try reconnecting to the original contact points held by `MetadataManager`, in case of getting empty query plan from the load balancing policy. In order to separate this logic from query plans of other queries `LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced and is called during reconnection in place of `newQueryPlan()`. --- .../api/core/config/DefaultDriverOption.java | 9 ++++++++ .../driver/api/core/config/OptionsMap.java | 1 + .../api/core/config/TypedDriverOption.java | 4 ++++ .../core/control/ControlConnection.java | 2 +- .../metadata/LoadBalancingPolicyWrapper.java | 23 +++++++++++++++++++ .../core/metadata/SchemaAgreementChecker.java | 2 +- core/src/main/resources/reference.conf | 11 +++++++++ .../control/ControlConnectionTestBase.java | 14 +++++++++++ 8 files changed, 64 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 55e8d53dc66..b218c088a9c 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption { */ CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"), + /** + * Whether to forcibly add original contact points held by MetadataManager to the reconnection + * plan, in case there is no live nodes available according to LBP. Experimental. + * + *

Value-type: boolean + */ + CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS( + "advanced.control-connection.reconnection.fallback-to-original-contacts"), + /** * Whether `Session.prepare` calls should be sent to all nodes in the cluster. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 8906e1dd349..53e5f4caa6f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200)); map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10)); map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true); + map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false); map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true); map.put(TypedDriverOption.REPREPARE_ENABLED, true); map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 9be69d0424f..64f4bd5a224 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -566,6 +566,10 @@ public String toString() { public static final TypedDriverOption CONTROL_CONNECTION_AGREEMENT_WARN = new TypedDriverOption<>( DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN); + /** Whether to forcibly try original contacts if no live nodes are available */ + public static final TypedDriverOption CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS = + new TypedDriverOption<>( + DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN); /** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */ public static final TypedDriverOption PREPARE_ON_ALL_NODES = new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 7e9592c64d3..460564c69df 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -336,7 +336,7 @@ private void init( private CompletionStage reconnect() { assert adminExecutor.inEventLoop(); - Queue nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan(); + Queue nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan(); CompletableFuture result = new CompletableFuture<>(); connect( nodes, diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java index 20d045d4e72..cd7fbba2cfc 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; @@ -165,6 +166,28 @@ public Queue newQueryPlan() { return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null); } + @NonNull + public Queue newControlReconnectionQueryPlan() { + // First try the original way + Queue regularQueryPlan = newQueryPlan(); + if (!regularQueryPlan.isEmpty()) return regularQueryPlan; + + if (context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) { + Set originalNodes = context.getMetadataManager().getContactPoints(); + List nodes = new ArrayList<>(); + for (DefaultNode node : originalNodes) { + nodes.add(new DefaultNode(node.getEndPoint(), context)); + } + Collections.shuffle(nodes); + return new ConcurrentLinkedQueue<>(nodes); + } else { + return regularQueryPlan; + } + } + // when it comes in from the outside private void onNodeStateEvent(NodeStateEvent event) { eventFilter.accept(event); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java index c5935dba4bb..10970b99363 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java @@ -174,7 +174,7 @@ private void completeOrReschedule(Set uuids, Throwable error) { f -> { if (!f.isSuccess()) { LOG.debug( - "[{}] Error while rescheduling schema agreement, completing now (false)", + "[{}] listesner Error while rescheduling schema agreement, completing now (false)", logPrefix, f.cause()); } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 75bed97e498..c1482f55ad9 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -2113,6 +2113,17 @@ datastax-java-driver { # Overridable in a profile: no warn-on-failure = true } + + reconnection { + # Whether to forcibly add original contact points held by MetadataManager to the reconnection plan, + # in case there is no live nodes available according to LBP. + # Experimental. + # + # Required: yes + # Modifiable at runtime: yes, the new value will be used for checks issued after the change. + # Overridable in a profile: no + fallback-to-original-contacts = false + } } advanced.prepared-statements { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java index c52199465a8..9deaca25308 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java @@ -132,6 +132,11 @@ public void setup() { when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR)) .thenReturn(false); + when(context.getConfig()).thenReturn(config); + when(config.getDefaultProfile()).thenReturn(defaultProfile); + when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) + .thenReturn(false); + controlConnection = new ControlConnection(context); } @@ -145,6 +150,15 @@ protected void mockQueryPlan(Node... nodes) { } return queryPlan; }); + when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan()) + .thenAnswer( + i -> { + ConcurrentLinkedQueue queryPlan = new ConcurrentLinkedQueue<>(); + for (Node node : nodes) { + queryPlan.offer(node); + } + return queryPlan; + }); } @After