From 23e5305253cdcbc5137a6af4e260bbbe5e807166 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Wed, 16 Oct 2024 17:07:01 +0200 Subject: [PATCH] Follow up on fixing stuck reconciliation on failed request rebalance (#10720) Signed-off-by: Paolo Patierno --- .../KafkaRebalanceAssemblyOperator.java | 21 ++- .../KafkaRebalanceAssemblyOperatorTest.java | 145 +++++++++++++----- 2 files changed, 125 insertions(+), 41 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 3ac85a09d75..7f6af890b38 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -733,7 +733,12 @@ private Future> onPendingProposal( if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.refresh) { LOGGER.infoCr(reconciliation, "Requesting a new proposal since refresh annotation is applied on the KafkaRebalance resource"); - requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete); + requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder) + .onSuccess(p::complete) + .onFailure(e -> { + LOGGER.errorCr(reconciliation, "Requesting a new proposal with refresh annotation failed", e); + p.fail(e); + }); } else if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.stop) { LOGGER.infoCr(reconciliation, "Stopping to request proposal or checking the status"); p.complete(buildRebalanceStatus(null, KafkaRebalanceState.Stopped, StatusUtils.validate(reconciliation, kafkaRebalance))); @@ -744,7 +749,12 @@ private Future> onPendingProposal( String sessionId = 
kafkaRebalance.getStatus().getSessionId(); if (sessionId == null) { // sessionId can be null if the response to the previously issued request for a proposal was NotEnoughDataForProposal. - requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete); + requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder) + .onSuccess(p::complete) + .onFailure(e -> { + LOGGER.errorCr(reconciliation, "Requesting a new proposal failed", e); + p.fail(e); + }); } else { apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId) .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder)) @@ -915,7 +925,12 @@ private Future> onRebalancing(Reco LOGGER.infoCr(reconciliation, "Stopping current Cruise Control rebalance user task since refresh annotation is applied on the KafkaRebalance resource and requesting a new proposal"); apiClient.stopExecution(reconciliation, host, cruiseControlPort) .onSuccess(r -> { - requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete); + requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder) + .onSuccess(p::complete) + .onFailure(e -> { + LOGGER.errorCr(reconciliation, "Requesting a new proposal with refresh annotation failed", e); + p.fail(e); + }); }) .onFailure(e -> { LOGGER.errorCr(reconciliation, "Cruise Control stopping execution failed", e.getCause()); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java index 9f89c3b52bc..df591c39652 100644 --- 
a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java @@ -1133,25 +1133,127 @@ public void testNewToPendingProposalDeleteRemoveBroker(VertxTestContext context) this.krNewToPendingProposalDelete(context, CruiseControlEndpoints.REMOVE_BROKER, kr); } + private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException { + // Set up the rebalance endpoint with the number of pending calls before a response is received. + cruiseControlServer.setupCCRebalanceResponse(1, endpoint); + cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0); + + Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create(); + crdCreateKafka(); + crdCreateCruiseControlSecrets(); + + Checkpoint checkpoint = context.checkpoint(); + krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())) + // the resource moved from 'New' to 'PendingProposal' (due to the configured Mock server pending calls) + .onComplete(context.succeeding(v -> + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal))) + .compose(v -> { + // trigger another reconcile to process the PendingProposal state + return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())); + }) + .onComplete(context.succeeding(v -> + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal))) + .compose(v -> { + // delete the resource and trigger another reconcile to process the deletion + Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).delete(); + return krao.reconcile(new 
Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())); + }) + .onComplete(context.succeeding(v -> context.verify(() -> { + // the resource should not exist anymore + KafkaRebalance currentKR = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get(); + assertThat(currentKR, is(nullValue())); + checkpoint.flag(); + }))); + } + /** * Tests the transition from 'Rebalancing' to 'NotReady' due to a user task not existing and an error - * on re-issuing the optimization proposal request + * on re-issuing the optimization proposal request (i.e. brokers don't exist during the "remove-broker" operation) * * 1. A new KafkaRebalance resource is created; it is in the Rebalancing state * 2. The operator requests the status of the corresponding user task through the Cruise Control REST API - * 3. The user task doesn't exist anymore and the KafkaRebalance resource moves to the 'NotReady' state + * 3. The user task doesn't exist anymore and a new optimization proposal request is sent + * 4. 
The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state */ @Test public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) { cruiseControlServer.setupUserTasktoEmpty(); cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + this.krToNotReadyRemoveBrokers(context, "test-session-id", false, KafkaRebalanceState.Rebalancing); + } + + /** + * See the {@link KafkaRebalanceAssemblyOperatorTest#testRebalancingToNotReadyRemoveBroker} for description + * but assuming the PendingProposal as initial state for the KafkaRebalance custom resource + */ + @Test + public void testPendingProposalToNotReadyRemoveBroker(VertxTestContext context) { + cruiseControlServer.setupUserTasktoEmpty(); + cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + this.krToNotReadyRemoveBrokers(context, "test-session-id", false, KafkaRebalanceState.PendingProposal); + } - KafkaRebalance kr = new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false)) + /** + * Tests the transition from 'PendingProposal' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation) + * on re-issuing the optimization proposal request when applying the "strimzi.io/rebalance: refresh" annotation + * + * 1. A new KafkaRebalance resource is created; it is in the PendingProposal state + * 2. The "strimzi.io/rebalance: refresh" annotation is applied and an optimization proposal request is sent + * 3. 
The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state + */ + @Test + @Timeout(value = 60, timeUnit = TimeUnit.MINUTES) + public void testPendingProposalToNotReadyRemoveBrokerWithRefresh(VertxTestContext context) { + cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + this.krToNotReadyRemoveBrokers(context, "test-session-id", true, KafkaRebalanceState.PendingProposal); + } + + /** + * Tests the transition from 'PendingProposal' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation) + * on re-issuing the optimization proposal request because the user task session ID is empty and status cannot be checked + * + * 1. A new KafkaRebalance resource is created; it is in the PendingProposal state + * 2. Because of missing user task session ID, an optimization proposal request is sent again + * 3. The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state + */ + @Test + @Timeout(value = 60, timeUnit = TimeUnit.MINUTES) + public void testPendingProposalToNotReadyRemoveBrokerWithNoSessionId(VertxTestContext context) { + cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + this.krToNotReadyRemoveBrokers(context, null, false, KafkaRebalanceState.PendingProposal); + } + + /** + * Tests the transition from 'Rebalancing' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation) + * on re-issuing the optimization proposal request by applying the "strimzi.io/rebalance: refresh" annotation + * + * 1. A new KafkaRebalance resource is created; it is in the Rebalancing state + * 2. The "strimzi.io/rebalance: refresh" annotation is applied and an optimization proposal request is sent + * 3. 
The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state + */ + @Test + @Timeout(value = 60, timeUnit = TimeUnit.MINUTES) + public void testRebalancingToNotReadyRemoveBrokerWithRefresh(VertxTestContext context) { + cruiseControlServer.setupCCStopResponse(); + cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + this.krToNotReadyRemoveBrokers(context, "test-session-id", true, KafkaRebalanceState.Rebalancing); + } + + private void krToNotReadyRemoveBrokers(VertxTestContext context, String sessionId, boolean refresh, KafkaRebalanceState initialState) { + KafkaRebalanceBuilder builder = + new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false)); + + if (refresh) { + builder.editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_REBALANCE, "refresh") + .endMetadata(); + } + KafkaRebalance kr = builder .withNewStatus() .withObservedGeneration(1L) - .withSessionId("test-session-id") + .withSessionId(sessionId) .withConditions(new ConditionBuilder() - .withType(KafkaRebalanceState.Rebalancing.name()) + .withType(initialState.name()) .withStatus("True") .build()) .endStatus() @@ -1175,39 +1277,6 @@ public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) { }))); } - private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException { - // Set up the rebalance endpoint with the number of pending calls before a response is received. 
- cruiseControlServer.setupCCRebalanceResponse(1, endpoint); - cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0); - - Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create(); - crdCreateKafka(); - crdCreateCruiseControlSecrets(); - - Checkpoint checkpoint = context.checkpoint(); - krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())) - // the resource moved from 'New' to 'PendingProposal' (due to the configured Mock server pending calls) - .onComplete(context.succeeding(v -> - assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal))) - .compose(v -> { - // trigger another reconcile to process the PendingProposal state - return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())); - }) - .onComplete(context.succeeding(v -> - assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal))) - .compose(v -> { - // trigger another reconcile to process the PendingProposal state - Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).delete(); - return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())); - }) - .onComplete(context.succeeding(v -> context.verify(() -> { - // the resource should not exist anymore - KafkaRebalance currentKR = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get(); - assertThat(currentKR, is(nullValue())); - checkpoint.flag(); - }))); - } - /** * Tests the transition from 'New' to 'ProposalReady' * The rebalance proposal is approved and the resource moves to 'Rebalancing' then to 'Stopped' (via annotation)