Skip to content

Commit

Permalink
Follow up on fixing stuck reconciliation on failed request rebalance (s…
Browse files Browse the repository at this point in the history
…trimzi#10720)

Signed-off-by: Paolo Patierno <[email protected]>
  • Loading branch information
ppatierno authored Oct 16, 2024
1 parent bb133e2 commit 23e5305
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,12 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(

if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.refresh) {
LOGGER.infoCr(reconciliation, "Requesting a new proposal since refresh annotation is applied on the KafkaRebalance resource");
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder)
.onSuccess(p::complete)
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Requesting a new proposal with refresh annotation failed", e);
p.fail(e);
});
} else if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.stop) {
LOGGER.infoCr(reconciliation, "Stopping to request proposal or checking the status");
p.complete(buildRebalanceStatus(null, KafkaRebalanceState.Stopped, StatusUtils.validate(reconciliation, kafkaRebalance)));
Expand All @@ -744,7 +749,12 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
String sessionId = kafkaRebalance.getStatus().getSessionId();
if (sessionId == null) {
// sessionId can be null if the response to the previously issued request for a proposal was NotEnoughDataForProposal.
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder)
.onSuccess(p::complete)
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Requesting a new proposal failed", e);
p.fail(e);
});
} else {
apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder))
Expand Down Expand Up @@ -915,7 +925,12 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
LOGGER.infoCr(reconciliation, "Stopping current Cruise Control rebalance user task since refresh annotation is applied on the KafkaRebalance resource and requesting a new proposal");
apiClient.stopExecution(reconciliation, host, cruiseControlPort)
.onSuccess(r -> {
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder)
.onSuccess(p::complete)
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Requesting a new proposal with refresh annotation failed", e);
p.fail(e);
});
})
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control stopping execution failed", e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,25 +1133,127 @@ public void testNewToPendingProposalDeleteRemoveBroker(VertxTestContext context)
this.krNewToPendingProposalDelete(context, CruiseControlEndpoints.REMOVE_BROKER, kr);
}

private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(1, endpoint);
cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0);

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
crdCreateCruiseControlSecrets();

Checkpoint checkpoint = context.checkpoint();
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
// the resource moved from 'New' to 'PendingProposal' (due to the configured Mock server pending calls)
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal)))
.compose(v -> {
// trigger another reconcile to process the PendingProposal state
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal)))
.compose(v -> {
// trigger another reconcile to process the PendingProposal state
Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).delete();
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v -> context.verify(() -> {
// the resource should not exist anymore
KafkaRebalance currentKR = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get();
assertThat(currentKR, is(nullValue()));
checkpoint.flag();
})));
}

/**
* Tests the transition from 'Rebalancing' to 'NotReady' due to a user task not existing and an error
* on re-issuing the optimization proposal request
* on re-issuing the optimization proposal request (i.e. brokers don't exist during the "remove-broker" operation)
*
* 1. A new KafkaRebalance resource is created; it is in the Rebalancing state
* 2. The operator requests the status of the corresponding user task through the Cruise Control REST API
* 3. The user task doesn't exist anymore and the KafkaRebalance resource moves to the 'NotReady' state
* 3. The user task doesn't exist anymore and a new optimization proposal request is sent
* 4. The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state
*/
@Test
public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) {
cruiseControlServer.setupUserTasktoEmpty();
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);
this.krToNotReadyRemoveBrokers(context, "test-session-id", false, KafkaRebalanceState.Rebalancing);
}

/**
* See the {@link KafkaRebalanceAssemblyOperatorTest#testRebalancingToNotReadyRemoveBroker} for description
* but assuming the PendingProposal as initial state for the KafkaRebalance custom resource
*/
@Test
public void testPendingProposalToNotReadyRemoveBroker(VertxTestContext context) {
cruiseControlServer.setupUserTasktoEmpty();
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);
this.krToNotReadyRemoveBrokers(context, "test-session-id", false, KafkaRebalanceState.PendingProposal);
}

KafkaRebalance kr = new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false))
/**
* Tests the transition from 'PendingProposal' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation)
* on re-issuing the optimization proposal request when applying the "strimzi.io/rebalance: refresh" annotation
*
* 1. A new KafkaRebalance resource is created; it is in the PendingProposal state
* 2. The "strimzi.io/rebalance: refresh" annotation is applied and an optimization proposal request is sent
* 3. The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state
*/
@Test
@Timeout(value = 60, timeUnit = TimeUnit.MINUTES)
public void testPendingProposalToNotReadyRemoveBrokerWithRefresh(VertxTestContext context) {
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);
this.krToNotReadyRemoveBrokers(context, "test-session-id", true, KafkaRebalanceState.PendingProposal);
}

/**
* Tests the transition from 'PendingProposal' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation)
* on re-issuing the optimization proposal request because the user task session ID is empty and status cannot be checked
*
* 1. A new KafkaRebalance resource is created; it is in the PendingProposal state
* 2. Because of missing user task session ID, an optimization proposal request is sent again
* 3. The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state
*/
@Test
@Timeout(value = 60, timeUnit = TimeUnit.MINUTES)
public void testPendingProposalToNotReadyRemoveBrokerWithNoSessionId(VertxTestContext context) {
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);
this.krToNotReadyRemoveBrokers(context, null, false, KafkaRebalanceState.PendingProposal);
}

/**
* Tests the transition from 'Rebalancing' to 'NotReady' due to an error (i.e. brokers don't exist during the "remove-broker" operation)
* on re-issuing the optimization proposal request by applying the "strimzi.io/rebalance: refresh" annotation
*
* 1. A new KafkaRebalance resource is created; it is in the Rebalancing state
* 2. The "strimzi.io/rebalance: refresh" annotation is applied and an optimization proposal request is sent
* 3. The response from Cruise Control has an error and the KafkaRebalance resource moves to the 'NotReady' state
*/
@Test
@Timeout(value = 60, timeUnit = TimeUnit.MINUTES)
public void testRebalancingToNotReadyRemoveBrokerWithRefresh(VertxTestContext context) {
cruiseControlServer.setupCCStopResponse();
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);
this.krToNotReadyRemoveBrokers(context, "test-session-id", true, KafkaRebalanceState.Rebalancing);
}

private void krToNotReadyRemoveBrokers(VertxTestContext context, String sessionId, boolean refresh, KafkaRebalanceState initialState) {
KafkaRebalanceBuilder builder =
new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false));

if (refresh) {
builder.editMetadata()
.addToAnnotations(Annotations.ANNO_STRIMZI_IO_REBALANCE, "refresh")
.endMetadata();
}
KafkaRebalance kr = builder
.withNewStatus()
.withObservedGeneration(1L)
.withSessionId("test-session-id")
.withSessionId(sessionId)
.withConditions(new ConditionBuilder()
.withType(KafkaRebalanceState.Rebalancing.name())
.withType(initialState.name())
.withStatus("True")
.build())
.endStatus()
Expand All @@ -1175,39 +1277,6 @@ public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) {
})));
}

private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(1, endpoint);
cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0);

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
crdCreateCruiseControlSecrets();

Checkpoint checkpoint = context.checkpoint();
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
// the resource moved from 'New' to 'PendingProposal' (due to the configured Mock server pending calls)
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal)))
.compose(v -> {
// trigger another reconcile to process the PendingProposal state
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.PendingProposal)))
.compose(v -> {
// trigger another reconcile to process the PendingProposal state
Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).delete();
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v -> context.verify(() -> {
// the resource should not exist anymore
KafkaRebalance currentKR = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get();
assertThat(currentKR, is(nullValue()));
checkpoint.flag();
})));
}

/**
* Tests the transition from 'New' to 'ProposalReady'
* The rebalance proposal is approved and the resource moves to 'Rebalancing' then to 'Stopped' (via annotation)
Expand Down

0 comments on commit 23e5305

Please sign in to comment.