From 03105f51442803279b20a9395adb97787b63b5af Mon Sep 17 00:00:00 2001 From: Chris Richardson Date: Mon, 5 Sep 2022 10:31:22 +0900 Subject: [PATCH] #181 Implement message table 'sharding' - enable configuration of MessageProducerJdbcImpl to write to sharded outbox table. --- build.gradle | 2 +- .../build.gradle | 4 + .../test/resources/custom-db-mysql-schema.sql | 1 + .../build.gradle | 17 ++++ .../AbstractTramIntegrationTest.java | 6 ++ .../EventuateCdcContainer.java | 19 +++++ .../MultipleOutboxTramIntegrationTest.java | 77 +++++++++++++++++++ .../src/test/resources/application.properties | 3 +- .../test/resources/custom-db-mysql-schema.sql | 1 + .../eventuate-tram-embedded-schema.sql | 1 + .../producer/common/MessageProducerImpl.java | 3 + .../eventuate-tram-embedded-schema.sql | 1 + .../TramMessageProducerJdbcConfiguration.java | 8 ++ gradle.properties | 9 ++- 14 files changed, 147 insertions(+), 5 deletions(-) create mode 100644 eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java create mode 100644 eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java diff --git a/build.gradle b/build.gradle index 7e9e639c..942e79cb 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { if (!project.name.endsWith("-bom")) { - dependencies { + dependencies { testCompile "junit:junit:4.12" testCompile "org.mockito:mockito-core:2.23.4" diff --git a/eventuate-tram-commands-db-broker-integration-test/build.gradle b/eventuate-tram-commands-db-broker-integration-test/build.gradle index 4cd750f7..0e702ac2 100644 --- a/eventuate-tram-commands-db-broker-integration-test/build.gradle +++ b/eventuate-tram-commands-db-broker-integration-test/build.gradle @@ -6,7 +6,11 @@ dependencies { compile project(":eventuate-tram-spring-commands") testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion" + testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion" + + testCompile "io.netty:netty-resolver-dns:4.1.79.Final" + } test { diff --git a/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql b/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql index 6b87a3dc..b9f24df6 100644 --- a/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql +++ b/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql @@ -13,6 +13,7 @@ CREATE TABLE message ( headers VARCHAR(1000) NOT NULL, payload VARCHAR(1000) NOT NULL, published SMALLINT DEFAULT 0, + message_partition SMALLINT, creation_time BIGINT ); diff --git a/eventuate-tram-db-broker-integration-test/build.gradle b/eventuate-tram-db-broker-integration-test/build.gradle index c161e282..9ebab000 100644 --- a/eventuate-tram-db-broker-integration-test/build.gradle +++ b/eventuate-tram-db-broker-integration-test/build.gradle @@ -3,9 +3,21 @@ dependencies { compile project(":eventuate-tram-integration-test-common") compile project(":eventuate-tram-spring-messaging") testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion" + testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion" + testCompile "io.eventuate.util:eventuate-util-test:$eventuateUtilVersion" + testImplementation "io.eventuate.common:eventuate-common-testcontainers:$eventuateCommonVersion" + + + testImplementation "io.eventuate.messaging.kafka:eventuate-messaging-kafka-testcontainers:$eventuateMessagingKafkaVersion" + + testImplementation "org.testcontainers:testcontainers:$testContainersVersion" + testImplementation "org.assertj:assertj-core:$assertjVersion" + + testCompile "io.netty:netty-resolver-dns:4.1.79.Final" + } test { @@ -13,5 +25,10 @@ test { if (testCustomDBVariable != 'true') { exclude '**/TramIntegrationCustomDBTest**' } + + if (System.env['BROKER'] != null && System.env['BROKER'] != 'kafka') { + exclude '**/MultipleOutboxTramIntegrationTest**' + } + forkEvery 1 } diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java index 4fa86eef..f4b1f956 100644 --- a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java +++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java @@ -41,8 +41,14 @@ public void shouldDoSomething() throws InterruptedException { messageProducer.send(destination, MessageBuilder.withPayload("\"Hello\"").build()); + preAssertCheck(); + assertTrue(String.format("Expected message. Subscriber %s for destination %s: ", subscriberId, destination), latch.await(60, TimeUnit.SECONDS)); } + protected void preAssertCheck() { + + } + } diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java new file mode 100644 index 00000000..52dde76a --- /dev/null +++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java @@ -0,0 +1,19 @@ +package io.eventuate.tram.broker.db.integrationtests; + +import io.eventuate.common.testcontainers.PropertyProvidingContainer; +import org.testcontainers.containers.GenericContainer; + +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +public class EventuateCdcContainer extends GenericContainer implements PropertyProvidingContainer { + + public EventuateCdcContainer() { + super("eventuateio/eventuate-cdc-service:0.14.0-SNAPSHOT"); + } + + @Override + public void registerProperties(BiConsumer> registry) { + + } +} diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java new file mode 100644 index 00000000..5f0a86af --- /dev/null +++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java @@ -0,0 +1,77 @@ +package io.eventuate.tram.broker.db.integrationtests; + +import io.eventuate.common.testcontainers.EventuateMySqlContainer; +import io.eventuate.common.testcontainers.EventuateZookeeperContainer; +import io.eventuate.common.testcontainers.PropertyProvidingContainer; +import io.eventuate.messaging.kafka.testcontainers.EventuateKafkaContainer; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy; + +import java.util.concurrent.TimeUnit; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = TramIntegrationTestConfiguration.class, properties = {"eventuate.tram.outbox.partitioning.outbox.tables=8", "eventuate.tram.outbox.partitioning.message.partitions=4"}) +public class MultipleOutboxTramIntegrationTest extends AbstractTramIntegrationTest { + + public static final int OUTBOX_TABLES = 8; + public static Network network = Network.newNetwork(); + public static EventuateMySqlContainer mysql = + new EventuateMySqlContainer() + .withNetwork(network) + .withNetworkAliases("mysql") + .withEnv("EVENTUATE_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES)) + .withReuse(true); + + + public static EventuateZookeeperContainer zookeeper = new EventuateZookeeperContainer().withReuse(true) + .withNetwork(network) + .withNetworkAliases("zookeeper"); + + public static EventuateKafkaContainer kafka = + new EventuateKafkaContainer("zookeeper:2181") + .waitingFor(new DockerHealthcheckWaitStrategy()) + .withNetwork(network) + .withNetworkAliases("kafka") + .withReuse(true); + + public static EventuateCdcContainer cdc = + new EventuateCdcContainer() + .withNetwork(network) + .withEnv("SPRING_DATASOURCE_URL", "jdbc:mysql://mysql/eventuate") + .withEnv("SPRING_DATASOURCE_USERNAME", "mysqluser") + .withEnv("SPRING_DATASOURCE_PASSWORD", "mysqlpw") + .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "com.mysql.cj.jdbc.Driver") + .withEnv("EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING", "zookeeper:2181") + .withEnv("EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092") + .withEnv("EVENTUATELOCAL_CDC_DB_USER_NAME", "root") + .withEnv("EVENTUATELOCAL_CDC_DB_PASSWORD", "rootpassword") + .withEnv("EVENTUATELOCAL_CDC_READER_NAME", "MySqlReader") + .withEnv("EVENTUATE_OUTBOX_ID", "1") + .withEnv("EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID", "1234567890") + .withEnv("EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC", "false") + .withEnv("SPRING_PROFILES_ACTIVE", "EventuatePolling") + .withEnv("EVENTUATE_CDC_OUTBOX_PARTITIONING_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES)) + + ; + + @DynamicPropertySource + static void registerMySqlProperties(DynamicPropertyRegistry registry) { + PropertyProvidingContainer.startAndProvideProperties(registry, mysql, zookeeper, kafka, cdc); + } + + + @Override + protected void preAssertCheck() { + try { + TimeUnit.SECONDS.sleep(15); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println(cdc.getLogs()); + } +} diff --git a/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties b/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties index e685567f..4440c6f4 100644 --- a/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties +++ b/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties @@ -1,3 +1,5 @@ +logging.level.io.eventuate.messaging=DEBUG + spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP:localhost}/eventuate?useSSL=false spring.datasource.username=mysqluser spring.datasource.password=mysqlpw @@ -9,7 +11,6 @@ eventuatelocal.cdc.db.user.name=root eventuatelocal.cdc.db.password=rootpassword eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP:localhost}:2181 -logging.level.io.eventuate.messaging=TRACE eventuate.local.kafka.consumer.properties.session.timeout.ms=15000 diff --git a/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql b/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql index 6b87a3dc..b9f24df6 100644 --- a/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql +++ b/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql @@ -13,6 +13,7 @@ CREATE TABLE message ( headers VARCHAR(1000) NOT NULL, payload VARCHAR(1000) NOT NULL, published SMALLINT DEFAULT 0, + message_partition SMALLINT, creation_time BIGINT ); diff --git a/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql b/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql index ca804bf5..cd4255af 100755 --- a/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql +++ b/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql @@ -7,6 +7,7 @@ CREATE TABLE eventuate.message ( DESTINATION VARCHAR(1000) NOT NULL, HEADERS VARCHAR(1000) NOT NULL, PAYLOAD VARCHAR(1000) NOT NULL, + MESSAGE_PARTITION SMALLINT, CREATION_TIME BIGINT ); diff --git a/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java b/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java index c73eb498..fcd9debe 100644 --- a/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java +++ b/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.UUID; public final class MessageProducerImpl implements MessageProducer { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -41,6 +42,8 @@ protected void prepareMessageHeaders(String destination, Message message) { implementation.setMessageIdIfNecessary(message); message.getHeaders().put(Message.DESTINATION, channelMapping.transform(destination)); message.getHeaders().put(Message.DATE, HttpDateHeaderFormatUtil.nowAsHttpDateString()); + if (message.getHeaders().get(Message.PARTITION_ID) == null) + message.getHeaders().put(Message.PARTITION_ID, UUID.randomUUID().toString()); } protected void send(Message message) { diff --git a/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql b/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql index ca804bf5..cd4255af 100755 --- a/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql +++ b/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql @@ -7,6 +7,7 @@ CREATE TABLE eventuate.message ( DESTINATION VARCHAR(1000) NOT NULL, HEADERS VARCHAR(1000) NOT NULL, PAYLOAD VARCHAR(1000) NOT NULL, + MESSAGE_PARTITION SMALLINT, CREATION_TIME BIGINT ); diff --git a/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java b/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java index e14ce0ac..c6d09d98 100644 --- a/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java +++ b/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java @@ -3,12 +3,14 @@ import io.eventuate.common.id.IdGenerator; import io.eventuate.common.jdbc.EventuateCommonJdbcOperations; import io.eventuate.common.jdbc.EventuateSchema; +import io.eventuate.common.jdbc.OutboxPartitioningSpec; import io.eventuate.common.spring.id.IdGeneratorConfiguration; import io.eventuate.common.spring.jdbc.EventuateCommonJdbcOperationsConfiguration; import io.eventuate.common.spring.jdbc.sqldialect.SqlDialectConfiguration; import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation; import io.eventuate.tram.messaging.producer.jdbc.MessageProducerJdbcImpl; import io.eventuate.tram.spring.messaging.producer.common.TramMessagingCommonProducerConfiguration; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -21,6 +23,12 @@ IdGeneratorConfiguration.class}) public class TramMessageProducerJdbcConfiguration { + @Bean + public OutboxPartitioningSpec outboxPartitioningSpec(@Value("${eventuate.tram.outbox.partitioning.outbox.tables:#{null}}") Integer outboxTables, + @Value("${eventuate.tram.outbox.partitioning.message.partitions:#{null}}")Integer outboxTablePartitions) { + return new OutboxPartitioningSpec(outboxTables, outboxTablePartitions); + } + @Bean @ConditionalOnMissingBean(MessageProducerImplementation.class) public MessageProducerImplementation messageProducerImplementation(EventuateCommonJdbcOperations eventuateCommonJdbcOperations, diff --git a/gradle.properties b/gradle.properties index 41b7a9bc..aac72cfd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,8 +5,8 @@ eventuateMavenRepoUrl=file:///Users/cer/.m2/testdeploy,https://snapshots.reposit eventuatePluginsGradleVersion=0.11.0.BUILD-SNAPSHOT -springBootVersion=1.4.7.RELEASE -springBootCdcVersion=2.1.1.RELEASE +springBootVersion=2.6.11 +springBootCdcVersion=2.6.11 micronautVersion=2.4.1 micronautDataVersion=2.3.1 eventuateUtilVersion=0.15.0.BUILD-SNAPSHOT @@ -26,9 +26,12 @@ eventuateMessagingRabbitMQVersion=0.14.0.BUILD-SNAPSHOT eventuateMessagingRedisVersion=0.14.0.BUILD-SNAPSHOT eventuateCommonImageVersion=0.16.0.BUILD-SNAPSHOT -eventuateMessagingKafkaImageVersion=0.15.0.BUILD-SNAPSHOT +eventuateMessagingKafkaImageVersion=0.16.0.BUILD-SNAPSHOT eventuateCdcImageVersion=0.14.0.BUILD-SNAPSHOT +assertjVersion=3.23.1 +testContainersVersion=1.17.3 + reactorVersion=3.4.18 version=0.31.0-SNAPSHOT