Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry create/update on workgroup conflict exception #50

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aws-redshiftserverless-workgroup/.rpdk-config
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
],
"codegen_template_path": "guided_aws",
"protocolVersion": "2.0.0"
}
},
"executableEntrypoint": "software.amazon.redshiftserverless.workgroup.HandlerWrapperExecutable"
}
6 changes: 6 additions & 0 deletions aws-redshiftserverless-workgroup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<version>5.5.0-M1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.5.0-M1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package software.amazon.redshiftserverless.workgroup;

import software.amazon.awssdk.services.redshiftserverless.RedshiftServerlessClient;
import software.amazon.awssdk.services.redshiftserverless.model.ConflictException;
import software.amazon.awssdk.services.redshiftserverless.model.DeleteWorkgroupRequest;
import software.amazon.awssdk.services.redshiftserverless.model.GetWorkgroupRequest;
import software.amazon.awssdk.services.redshiftserverless.model.RedshiftServerlessResponse;
Expand All @@ -19,6 +20,13 @@

public abstract class BaseHandlerStd extends BaseHandler<CallbackContext> {

public static final String BUSY_WORKGROUP_RETRY_EXCEPTION_MESSAGE =
"There is an operation running on the existing workgroup";

protected static boolean isRetriableWorkgroupException(ConflictException exception) {
return exception.getMessage().contains(BUSY_WORKGROUP_RETRY_EXCEPTION_MESSAGE);
}

protected static final Constant BACKOFF_STRATEGY = Constant.of()
.timeout(Duration.ofMinutes(30L))
.delay(Duration.ofSeconds(5L))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
@lombok.ToString
@lombok.EqualsAndHashCode(callSuper = true)
public class CallbackContext extends StdCallbackContext {
int retryOnResourceNotFound = 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,40 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
.handleError(this::createWorkgroupErrorHandler)
.progress()
)

.then(progress -> new ReadHandler().handleRequest(proxy, request, callbackContext, proxyClient, logger));
}

private CreateWorkgroupResponse createWorkgroup(final CreateWorkgroupRequest awsRequest,
final ProxyClient<RedshiftServerlessClient> proxyClient) {
CreateWorkgroupResponse awsResponse;
awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::createWorkgroup);
final int MAX_RETRIES = 4;
int retryCount = 0;

while (true) {
try {
CreateWorkgroupResponse awsResponse =
proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::createWorkgroup);

logger.log(String.format("%s has successfully been created.", ResourceModel.TYPE_NAME));

return awsResponse;

logger.log(String.format("%s successfully created.", ResourceModel.TYPE_NAME));
return awsResponse;
} catch (ConflictException ex) {
if (retryCount >= MAX_RETRIES || !isRetriableWorkgroupException(ex)) {
throw ex;
}

logger.log(String.format("Retrying CreateWorkgroup due to expected ConflictException: " +
"%s. Attempt %d/%d", ex.getMessage(), retryCount + 1, MAX_RETRIES));
retryCount++;
}

try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore the interrupted status
throw new RuntimeException("Interrupted during retry wait", ie);
}
}
}

private ProgressEvent<ResourceModel, CallbackContext> createWorkgroupErrorHandler(final CreateWorkgroupRequest awsRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@
import software.amazon.awssdk.services.redshiftserverless.model.InternalServerException;
import software.amazon.awssdk.services.redshiftserverless.model.ResourceNotFoundException;
import software.amazon.awssdk.services.redshiftserverless.model.ValidationException;
import software.amazon.awssdk.services.redshiftserverless.model.WorkgroupStatus;
import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
import software.amazon.cloudformation.proxy.HandlerErrorCode;
import software.amazon.cloudformation.proxy.Logger;
import software.amazon.cloudformation.proxy.OperationStatus;
import software.amazon.cloudformation.proxy.ProgressEvent;
import software.amazon.cloudformation.proxy.ProxyClient;
import software.amazon.cloudformation.proxy.ResourceHandlerRequest;

import java.util.Arrays;
import java.util.List;

import static software.amazon.cloudformation.proxy.ProgressEvent.progress;

public class ReadHandler extends BaseHandlerStd {
private Logger logger;

Expand All @@ -29,16 +36,45 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
.translateToServiceRequest(Translator::translateToReadRequest)
.makeServiceCall(this::readWorkgroup)
.handleError(this::readWorkgroupErrorHandler)
.done(awsResponse -> ProgressEvent.defaultSuccessHandler(Translator.translateFromReadResponse(awsResponse)));
.done(awsResponse -> getProgressEventFromReadWorkgroupResponse(awsResponse, callbackContext));
}

private GetWorkgroupResponse readWorkgroup(final GetWorkgroupRequest awsRequest,
final ProxyClient<RedshiftServerlessClient> proxyClient) {
GetWorkgroupResponse awsResponse;
awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::getWorkgroup);
awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::getWorkgroup);

logger.log(String.format("%s has yo successfully been read.", ResourceModel.TYPE_NAME));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, let me revise that

return awsResponse;
}

logger.log(String.format("%s has successfully been read.", ResourceModel.TYPE_NAME));
return awsResponse;
/**
* We used to return operationStatus.SUCCESS for all workgroup statuses,
* including creating, deleting, modifying.
*
* When CFN contract test checks if the resource has been deleted by calling ReadHandler,
* and when the workgroup == DELETING, we should have returned in_progress to indicate the
* deletion is not finished, and contract test can't start creating the same resource again.
*
* Same scenario would cause customer issues too.
*
* @param getWorkgroupResponse
* @param ctx
* @return
*/
private static ProgressEvent<ResourceModel, CallbackContext> getProgressEventFromReadWorkgroupResponse(GetWorkgroupResponse getWorkgroupResponse,
CallbackContext ctx) {
ResourceModel workgroupModel = Translator.translateFromReadResponse(getWorkgroupResponse);
List<WorkgroupStatus> inProgressWorkgroupStatuses = Arrays.asList(
WorkgroupStatus.CREATING,
WorkgroupStatus.DELETING,
WorkgroupStatus.MODIFYING
);
boolean isInProgress = inProgressWorkgroupStatuses.contains(getWorkgroupResponse.workgroup().status());

ProgressEvent<ResourceModel, CallbackContext> progressEvent = progress(workgroupModel, ctx);
progressEvent.setStatus(isInProgress ? OperationStatus.IN_PROGRESS : OperationStatus.SUCCESS);
return progressEvent;
}

private ProgressEvent<ResourceModel, CallbackContext> readWorkgroupErrorHandler(final GetWorkgroupRequest awsRequest,
Expand All @@ -48,13 +84,10 @@ private ProgressEvent<ResourceModel, CallbackContext> readWorkgroupErrorHandler(
final CallbackContext context) {
if (exception instanceof ResourceNotFoundException) {
return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.NotFound);

} else if (exception instanceof ValidationException) {
return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.InvalidRequest);

} else if (exception instanceof InternalServerException) {
return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.InternalFailure);

} else {
return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.GeneralServiceException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,35 @@ private ProgressEvent<ResourceModel, CallbackContext> operateTagsErrorHandler(fi

private UpdateWorkgroupResponse updateWorkgroup(final UpdateWorkgroupRequest awsRequest,
final ProxyClient<RedshiftServerlessClient> proxyClient) {
UpdateWorkgroupResponse awsResponse;
awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::updateWorkgroup);
final int MAX_RETRIES = 4;
int retryCount = 0;

while (true) {
try {
UpdateWorkgroupResponse awsResponse =
proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::updateWorkgroup);

logger.log(String.format("%s has successfully been updated.", ResourceModel.TYPE_NAME));

return awsResponse;
} catch (ConflictException ex) {
if (retryCount >= MAX_RETRIES || !isRetriableWorkgroupException(ex)) {
throw ex;
}

logger.log(String.format("Retrying UpdateWorkgroup due to expected ConflictException: " +
"%s. Attempt %d/%d", ex.getMessage(), retryCount + 1, MAX_RETRIES));
retryCount++;
}

try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore the interrupted status
throw new RuntimeException("Interrupted during retry wait", ie);
}
}

logger.log(String.format("%s has successfully been updated.", ResourceModel.TYPE_NAME));
return awsResponse;
}

private ProgressEvent<ResourceModel, CallbackContext> updateWorkgroupErrorHandler(final UpdateWorkgroupRequest awsRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.redshiftserverless.RedshiftServerlessClient;
import software.amazon.awssdk.services.redshiftserverless.model.ConflictException;
import software.amazon.awssdk.services.redshiftserverless.model.CreateWorkgroupRequest;
import software.amazon.awssdk.services.redshiftserverless.model.GetWorkgroupRequest;
import software.amazon.awssdk.services.redshiftserverless.model.InternalServerException;
import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
import software.amazon.cloudformation.proxy.HandlerErrorCode;
import software.amazon.cloudformation.proxy.OperationStatus;
import software.amazon.cloudformation.proxy.ProgressEvent;
import software.amazon.cloudformation.proxy.ProxyClient;
Expand Down Expand Up @@ -72,4 +75,69 @@ public void handleRequest_SimpleSuccess() {
assertThat(response.getMessage()).isNull();
assertThat(response.getErrorCode()).isNull();
}

@Test
public void handleRequest_retryOnConflictException() {
final CreateHandler handler = new CreateHandler();

final ResourceModel requestResourceModel = createRequestResourceModel();
final ResourceModel responseResourceModel = getReadResponseResourceModel();

final ResourceHandlerRequest<ResourceModel> request = ResourceHandlerRequest.<ResourceModel>builder()
.desiredResourceState(requestResourceModel)
.build();

ConflictException exception = ConflictException.builder()
.message("There is an operation running on the existing workgroup. Try again later.")
.build();

/**
* The Thread.sleep() is actually called here, making unit longer.
* MockStatic easily will need a Java version upgrade, then the Mockito upgrade,
* I'll leave the upgrade in another commit.
*/
when(proxyClient.client().createWorkgroup(any(CreateWorkgroupRequest.class)))
.thenThrow(exception)
.thenReturn(createResponseSdk());
when(proxyClient.client().getWorkgroup(any(GetWorkgroupRequest.class))).thenReturn(getReadResponseSdk());

final ProgressEvent<ResourceModel, CallbackContext> response = handler.handleRequest(proxy, request, new CallbackContext(), proxyClient, logger);

assertThat(response).isNotNull();
assertThat(response.getStatus()).isEqualTo(OperationStatus.SUCCESS);
assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
assertThat(response.getResourceModel()).isEqualTo(responseResourceModel);
assertThat(response.getResourceModels()).isNull();
assertThat(response.getMessage()).isNull();
assertThat(response.getErrorCode()).isNull();
}

@Test
public void handleRequest_noRetryOnOtherException() throws InterruptedException {
final CreateHandler handler = new CreateHandler();
final ResourceModel requestResourceModel = createRequestResourceModel();
final ResourceHandlerRequest<ResourceModel> request = ResourceHandlerRequest.<ResourceModel>builder()
.desiredResourceState(requestResourceModel)
.build();

/**
* The Thread.sleep() is actually called here, making unit longer.
* MockStatic easily will need a Java version upgrade, then the Mockito upgrade,
* I'll leave the upgrade in another commit.
*/
when(proxyClient.client().createWorkgroup(any(CreateWorkgroupRequest.class)))
.thenThrow(InternalServerException.builder()
.message("test")
.build());

final ProgressEvent<ResourceModel, CallbackContext> response =
handler.handleRequest(proxy, request, new CallbackContext(), proxyClient, logger);

assertThat(response).isNotNull();
assertThat(response.getStatus()).isEqualTo(OperationStatus.FAILED);
assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
assertThat(response.getResourceModels()).isNull();
assertThat(response.getMessage()).isEqualTo("test");
assertThat(response.getErrorCode()).isEqualTo(HandlerErrorCode.InternalFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.redshiftserverless.RedshiftServerlessClient;
import software.amazon.awssdk.services.redshiftserverless.model.GetNamespaceRequest;
import software.amazon.awssdk.services.redshiftserverless.model.GetWorkgroupRequest;
import software.amazon.awssdk.services.redshiftserverless.model.GetWorkgroupResponse;
import software.amazon.awssdk.services.redshiftserverless.model.WorkgroupStatus;
import software.amazon.cloudformation.exceptions.CfnGeneralServiceException;
import software.amazon.cloudformation.exceptions.CfnInvalidRequestException;
import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
import software.amazon.cloudformation.proxy.OperationStatus;
import software.amazon.cloudformation.proxy.ProgressEvent;
import software.amazon.cloudformation.proxy.ProxyClient;
import software.amazon.cloudformation.proxy.ResourceHandlerRequest;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -47,23 +58,41 @@ public void tear_down() {
verifyNoMoreInteractions(sdkClient);
}

@Test
public void handleRequest_SimpleSuccess() {
static Stream<Arguments> provideReadHandlerParams() {
return Stream.of(
Arguments.of(WorkgroupStatus.AVAILABLE, OperationStatus.SUCCESS),
Arguments.of(WorkgroupStatus.CREATING, OperationStatus.IN_PROGRESS),
Arguments.of(WorkgroupStatus.DELETING, OperationStatus.IN_PROGRESS),
Arguments.of(WorkgroupStatus.MODIFYING, OperationStatus.IN_PROGRESS),
Arguments.of(WorkgroupStatus.UNKNOWN_TO_SDK_VERSION, OperationStatus.SUCCESS)
);
}

@ParameterizedTest
@MethodSource("provideReadHandlerParams")
public void handleRequest_callReturns(
WorkgroupStatus returnedWgStatus,
OperationStatus expectedOperationStatus
) {
final ReadHandler handler = new ReadHandler();

final ResourceModel requestResourceModel = getReadRequestResourceModel();
final ResourceModel responseResourceModel = getReadResponseResourceModel();
GetWorkgroupResponse defaultResponse = getReadResponseSdk();
GetWorkgroupResponse testResponse = defaultResponse.toBuilder()
.workgroup(defaultResponse.workgroup().toBuilder().status(returnedWgStatus).build())
.build();

when(proxyClient.client().getWorkgroup(any(GetWorkgroupRequest.class))).thenReturn(testResponse);

final ResourceModel requestResourceModel = getReadRequestResourceModel();
final ResourceModel responseResourceModel = Translator.translateFromReadResponse(testResponse);
final ResourceHandlerRequest<ResourceModel> request = ResourceHandlerRequest.<ResourceModel>builder()
.desiredResourceState(requestResourceModel)
.build();

when(proxyClient.client().getWorkgroup(any(GetWorkgroupRequest.class))).thenReturn(getReadResponseSdk());

final ProgressEvent<ResourceModel, CallbackContext> response = handler.handleRequest(proxy, request, new CallbackContext(), proxyClient, logger);

assertThat(response).isNotNull();
assertThat(response.getStatus()).isEqualTo(OperationStatus.SUCCESS);
assertThat(response.getStatus()).isEqualTo(expectedOperationStatus);
assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
assertThat(response.getResourceModel()).isEqualTo(responseResourceModel);
assertThat(response.getResourceModels()).isNull();
Expand Down
Loading