Skip to content

Commit

Permalink
BFD-2842: colima compatibility and images in pom (#1885)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianburton authored Aug 17, 2023
1 parent a4dc839 commit 374bc06
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package gov.cms.bfd.migrator.app;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
Expand Down Expand Up @@ -38,21 +37,13 @@ public String lookupQueueUrl(String queueName) {
* Sends a message to an SQS queue.
*
* @param queueUrl identifies the queue to post the message to
* @param messageGroupId identifies a specific sequence of messages within a FIFO queue
* @param messageId identifies a specific message within a sequence
* @param messageBody text of the message to send
* @throws QueueDoesNotExistException if queue does not exist
* @throws SqsException if the operation cannot be completed
*/
public void sendMessage(
String queueUrl, String messageGroupId, String messageId, String messageBody) {
public void sendMessage(String queueUrl, String messageBody) {
final var request =
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageGroupId(messageGroupId)
.messageDeduplicationId(messageId)
.messageBody(messageBody)
.build();
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build();
sqsClient.sendMessage(request);
}

Expand All @@ -64,12 +55,8 @@ public void sendMessage(
* @return URL of created queue
* @throws SqsException if the operation cannot be completed
*/
public String createFifoQueue(String queueName) {
final var createQueueRequest =
CreateQueueRequest.builder()
.attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))
.queueName(queueName)
.build();
public String createQueue(String queueName) {
final var createQueueRequest = CreateQueueRequest.builder().queueName(queueName).build();
final var response = sqsClient.createQueue(createQueueRequest);
return response.queueUrl();
}
Expand All @@ -89,4 +76,20 @@ public Optional<String> nextMessage(String queueUrl) {
List<Message> messages = sqsClient.receiveMessage(request).messages();
return messages.isEmpty() ? Optional.empty() : Optional.of(messages.get(0).body());
}

/**
* Read all currently available messages and pass them to the provided function.
*
* @param queueUrl identifies the queue to read from
* @param consumer a function to receive each message
* @throws QueueDoesNotExistException if queue does not exist
* @throws SqsException if the operation cannot be completed
*/
public void processAllMessages(String queueUrl, Consumer<String> consumer) {
for (Optional<String> message = nextMessage(queueUrl);
message.isPresent();
message = nextMessage(queueUrl)) {
consumer.accept(message.get());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gov.cms.bfd.migrator.app;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand All @@ -11,6 +13,7 @@
import gov.cms.bfd.sharedutils.database.DatabaseMigrationProgress;
import gov.cms.bfd.sharedutils.exceptions.UncheckedIOException;
import java.io.IOException;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -30,7 +33,7 @@ public class SqsProgressReporter {
* make it more compact. Sorting properties alphabetically can make tests more stable and make it
* easier to find particular fields in test samples.
*/
private final ObjectMapper objectMapper =
private static final ObjectMapper objectMapper =
JsonMapper.builder()
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
Expand Down Expand Up @@ -86,10 +89,13 @@ public SqsProgressReporter(SqsDao sqsDao, String queueUrl, String messageGroupId
public void reportMigratorProgress(MigratorProgress progress) {
final long pid = getPid();
final var message =
new SqsProgressMessage(pid, progress.getStage(), progress.getMigrationProgress());
final var messageId = String.valueOf(nextMessageId.getAndIncrement());
new SqsProgressMessage(
pid,
nextMessageId.getAndIncrement(),
progress.getStage(),
progress.getMigrationProgress());
final var messageText = convertMessageToJson(message);
sqsDao.sendMessage(queueUrl, messageGroupId, messageId, messageText);
sqsDao.sendMessage(queueUrl, messageText);
}

/**
Expand All @@ -108,24 +114,67 @@ long getPid() {
* @param message object to convert into JSON
* @return converted JSON string
*/
private String convertMessageToJson(SqsProgressMessage message) {
static String convertMessageToJson(SqsProgressMessage message) {
try {
return objectMapper.writeValueAsString(message);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}

/**
* Does the conversion and wraps any checked exception in an unchecked one.
*
* @param jsonMessage JSON string representation of a {@link SqsProgressMessage}
* @return converted {@link SqsProgressMessage}
*/
static SqsProgressMessage convertJsonToMessage(String jsonMessage) {
try {
return objectMapper.readValue(jsonMessage, SqsProgressMessage.class);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}

/** Java object from which the JSON message is constructed. */
@Data
public static class SqsProgressMessage {
/** Used to sort messages in original send order when testing. */
static final Comparator<SqsProgressMessage> SORT_BY_IDS =
Comparator.comparingLong(SqsProgressMessage::getPid)
.thenComparingLong(SqsProgressMessage::getMessageId);

/** Our process id. */
private final long pid;

/** Unique id for this message (relative to pid). Can be used for sorting messages. */
private final long messageId;

/** Stage of app processing. */
private final MigratorProgress.Stage appStage;

/** Migration stage if appropriate. */
@Nullable private final DatabaseMigrationProgress migrationStage;

/**
* Initializes an instance. Has approperiate Jackson annotations to allow deserialization of
* JSON into an instance.
*
* @param pid the {@link #pid}
* @param messageId the {@link #messageId}
* @param appStage the {@link #appStage}
* @param migrationStage the {@link #migrationStage}
*/
@JsonCreator
public SqsProgressMessage(
@JsonProperty("pid") long pid,
@JsonProperty("messageId") long messageId,
@JsonProperty("appStage") MigratorProgress.Stage appStage,
@JsonProperty("migrationStage") @Nullable DatabaseMigrationProgress migrationStage) {
this.pid = pid;
this.messageId = messageId;
this.appStage = appStage;
this.migrationStage = migrationStage;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import gov.cms.bfd.DataSourceComponents;
import gov.cms.bfd.DatabaseTestUtils;
import gov.cms.bfd.ProcessOutputConsumer;
import gov.cms.bfd.migrator.app.SqsProgressReporter.SqsProgressMessage;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -22,11 +23,11 @@
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import javax.sql.DataSource;
import lombok.Getter;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
Expand All @@ -42,12 +43,13 @@ public final class MigratorAppIT extends AbstractLocalStackTest {
private static final Logger LOGGER = LoggerFactory.getLogger(MigratorApp.class);

/** Name of SQS queue created in localstack to receive progress messages via SQS. */
private static final String SQS_QUEUE_NAME = "migrator-progress.fifo";
private static final String SQS_QUEUE_NAME = "migrator-progress";

/** Used to communicate with the localstack SQS service. */
private SqsDao sqsDao;

/** Enum for determining which flyway script directory to run a test against. */
@Getter
private enum TestDirectory {
/** Value that will not override the flyway script location, and use the real flyway scripts. */
REAL(""),
Expand All @@ -62,22 +64,13 @@ private enum TestDirectory {
private final String path;

/**
* Instantiates a new Test directory.
* Initializes an instance.
*
* @param path the path to the test files for this selection
*/
TestDirectory(String path) {
this.path = path;
}

/**
* Gets the path for this selection.
*
* @return the path override for the test files
*/
public String getPath() {
return path;
}
}

/** Cleans up the database before each test. */
Expand All @@ -96,7 +89,7 @@ public static void teardown() {
@BeforeEach
void createQueue() {
sqsDao = new SqsDao(SqsDaoIT.createSqsClientForLocalStack(localstack));
sqsDao.createFifoQueue(SQS_QUEUE_NAME);
sqsDao.createQueue(SQS_QUEUE_NAME);
}

/**
Expand Down Expand Up @@ -151,20 +144,13 @@ void testMigrationRunWhenNoErrorsAndAllFilesRunExpectExitCodeZeroAndSchemaMigrat
final var progressMessages = readProgressMessagesFromSQSQueue();
assertThat(progressMessages)
.first()
.asString()
.contains(MigratorProgress.Stage.Started.name());
assertThat(progressMessages)
.hasAtLeastOneElementOfType(String.class)
.asString()
.contains(MigratorProgress.Stage.Connected.name());
.matches(m -> m.getAppStage() == MigratorProgress.Stage.Started);
assertThat(progressMessages)
.hasAtLeastOneElementOfType(String.class)
.asString()
.contains(MigratorProgress.Stage.Migrating.name());
.anyMatch(m -> m.getAppStage() == MigratorProgress.Stage.Connected)
.anyMatch(m -> m.getAppStage() == MigratorProgress.Stage.Migrating);
assertThat(progressMessages)
.last()
.asString()
.contains(MigratorProgress.Stage.Finished.name());
.matches(m -> m.getAppStage() == MigratorProgress.Stage.Finished);

} catch (ConditionTimeoutException e) {
throw new RuntimeException(
Expand Down Expand Up @@ -424,14 +410,13 @@ private ProcessBuilder createAppProcessBuilder(TestDirectory testDirectory) {
*
* @return the list
*/
private List<String> readProgressMessagesFromSQSQueue() {
private List<SqsProgressMessage> readProgressMessagesFromSQSQueue() {
final var queueUrl = sqsDao.lookupQueueUrl(SQS_QUEUE_NAME);
var messages = new ArrayList<String>();
for (Optional<String> message = sqsDao.nextMessage(queueUrl);
message.isPresent();
message = sqsDao.nextMessage(queueUrl)) {
message.ifPresent(messages::add);
}
var messages = new ArrayList<SqsProgressMessage>();
sqsDao.processAllMessages(
queueUrl,
messageJson -> messages.add(SqsProgressReporter.convertJsonToMessage(messageJson)));
messages.sort(SqsProgressMessage.SORT_BY_IDS);
return messages;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import gov.cms.bfd.AbstractLocalStackTest;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
Expand All @@ -29,24 +31,27 @@ void setUp() {
/** Test creating a queue. */
@Test
void createQueue() {
String queueName = "my-created-queue.fifo";
String createdQueueUri = dao.createFifoQueue(queueName);
String queueName = "my-created-queue";
String createdQueueUri = dao.createQueue(queueName);
String lookupQueueUri = dao.lookupQueueUrl(queueName);
assertEquals(createdQueueUri, lookupQueueUri);
}

/** Test sending and receiving. */
@Test
void sendAndReceiveMessages() {
String queueName = "my-test-queue.fifo";
String queueUri = dao.createFifoQueue(queueName);
String messageGroupId = "sendAndReceiveMessages";
String queueName = "my-test-queue";
String queueUri = dao.createQueue(queueName);
String message1 = "this is a first message";
String message2 = "this is a second message";
dao.sendMessage(queueUri, messageGroupId, "1", message1);
dao.sendMessage(queueUri, messageGroupId, "2", message2);
assertEquals(Optional.of(message1), dao.nextMessage(queueUri));
assertEquals(Optional.of(message2), dao.nextMessage(queueUri));
dao.sendMessage(queueUri, message1);
dao.sendMessage(queueUri, message2);

// SQS does not guarantee messages will be received in order so we just collect all
// of them and compare to all that we sent.
Set<String> receivedMessages = new HashSet<>();
dao.processAllMessages(queueUri, receivedMessages::add);
assertEquals(Set.of(message1, message2), receivedMessages);
assertEquals(Optional.empty(), dao.nextMessage(queueUri));
}

Expand All @@ -55,8 +60,7 @@ void sendAndReceiveMessages() {
void variousNonExistentQueueScenarios() {
assertThatThrownBy(() -> dao.lookupQueueUrl("no-such-queue-exists"))
.isInstanceOf(QueueDoesNotExistException.class);
assertThatThrownBy(
() -> dao.sendMessage("no-such-queue-exists", "g1", "m1", "not gonna make it there"))
assertThatThrownBy(() -> dao.sendMessage("no-such-queue-exists", "not gonna make it there"))
.isInstanceOf(QueueDoesNotExistException.class);
assertThatThrownBy(() -> dao.nextMessage("no-such-queue-exists"))
.isInstanceOf(QueueDoesNotExistException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@ void reportProgressAndVerifyMessageText() {
doReturn(5046L).when(reporter).getPid();
reporter.reportMigratorProgress(appProgress);
reporter.reportMigratorProgress(migratorProgress);
verify(sqsDao)
.sendMessage(queueUrl, messageGroupId, "1", "{\"appStage\":\"Started\",\"pid\":5046}");
verify(sqsDao).sendMessage(queueUrl, "{\"appStage\":\"Started\",\"messageId\":1,\"pid\":5046}");
verify(sqsDao)
.sendMessage(
queueUrl,
messageGroupId,
"2",
"{\"appStage\":\"Migrating\",\"migrationStage\":{\"migrationFile\":\"detail\",\"stage\":\"Completed\",\"version\":\"1\"},\"pid\":5046}");
"{\"appStage\":\"Migrating\",\"messageId\":2,\"migrationStage\":{\"stage\":\"Completed\",\"migrationFile\":\"detail\",\"version\":\"1\"},\"pid\":5046}");
}
}
Loading

0 comments on commit 374bc06

Please sign in to comment.