Skip to content

Commit

Permalink
#181 Implement message table 'sharding' - enable configuration of Mes…
Browse files Browse the repository at this point in the history
…sageProducerJdbcImpl to write to sharded outbox table.
  • Loading branch information
cer committed Sep 5, 2022
1 parent 859c9da commit 03105f5
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 5 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ allprojects {

if (!project.name.endsWith("-bom")) {

dependencies {
dependencies {

testCompile "junit:junit:4.12"
testCompile "org.mockito:mockito-core:2.23.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ dependencies {
compile project(":eventuate-tram-spring-commands")

testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion"

testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"

testCompile "io.netty:netty-resolver-dns:4.1.79.Final"

}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CREATE TABLE message (
headers VARCHAR(1000) NOT NULL,
payload VARCHAR(1000) NOT NULL,
published SMALLINT DEFAULT 0,
message_partition SMALLINT,
creation_time BIGINT
);

Expand Down
17 changes: 17 additions & 0 deletions eventuate-tram-db-broker-integration-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,32 @@ dependencies {
compile project(":eventuate-tram-integration-test-common")
compile project(":eventuate-tram-spring-messaging")
testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"

testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion"

testCompile "io.eventuate.util:eventuate-util-test:$eventuateUtilVersion"

testImplementation "io.eventuate.common:eventuate-common-testcontainers:$eventuateCommonVersion"


testImplementation "io.eventuate.messaging.kafka:eventuate-messaging-kafka-testcontainers:$eventuateMessagingKafkaVersion"

testImplementation "org.testcontainers:testcontainers:$testContainersVersion"
testImplementation "org.assertj:assertj-core:$assertjVersion"

testCompile "io.netty:netty-resolver-dns:4.1.79.Final"

}

test {
def testCustomDBVariable = System.env['TEST_CUSTOM_DB']
if (testCustomDBVariable != 'true') {
exclude '**/TramIntegrationCustomDBTest**'
}

if (System.env['BROKER'] != null && System.env['BROKER'] != 'kafka') {
exclude '**/MultipleOutboxTramIntegrationTest**'
}

forkEvery 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ public void shouldDoSomething() throws InterruptedException {

messageProducer.send(destination, MessageBuilder.withPayload("\"Hello\"").build());

preAssertCheck();

assertTrue(String.format("Expected message. Subscriber %s for destination %s: ", subscriberId, destination), latch.await(60, TimeUnit.SECONDS));

}

protected void preAssertCheck() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.eventuate.tram.broker.db.integrationtests;

import io.eventuate.common.testcontainers.PropertyProvidingContainer;
import org.testcontainers.containers.GenericContainer;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class EventuateCdcContainer extends GenericContainer<EventuateCdcContainer> implements PropertyProvidingContainer {

public EventuateCdcContainer() {
super("eventuateio/eventuate-cdc-service:0.14.0-SNAPSHOT");
}

@Override
public void registerProperties(BiConsumer<String, Supplier<Object>> registry) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.eventuate.tram.broker.db.integrationtests;

import io.eventuate.common.testcontainers.EventuateMySqlContainer;
import io.eventuate.common.testcontainers.EventuateZookeeperContainer;
import io.eventuate.common.testcontainers.PropertyProvidingContainer;
import io.eventuate.messaging.kafka.testcontainers.EventuateKafkaContainer;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy;

import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = TramIntegrationTestConfiguration.class, properties = {"eventuate.tram.outbox.partitioning.outbox.tables=8", "eventuate.tram.outbox.partitioning.message.partitions=4"})
public class MultipleOutboxTramIntegrationTest extends AbstractTramIntegrationTest {

public static final int OUTBOX_TABLES = 8;
public static Network network = Network.newNetwork();
public static EventuateMySqlContainer mysql =
new EventuateMySqlContainer()
.withNetwork(network)
.withNetworkAliases("mysql")
.withEnv("EVENTUATE_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES))
.withReuse(true);


public static EventuateZookeeperContainer zookeeper = new EventuateZookeeperContainer().withReuse(true)
.withNetwork(network)
.withNetworkAliases("zookeeper");

public static EventuateKafkaContainer kafka =
new EventuateKafkaContainer("zookeeper:2181")
.waitingFor(new DockerHealthcheckWaitStrategy())
.withNetwork(network)
.withNetworkAliases("kafka")
.withReuse(true);

public static EventuateCdcContainer cdc =
new EventuateCdcContainer()
.withNetwork(network)
.withEnv("SPRING_DATASOURCE_URL", "jdbc:mysql://mysql/eventuate")
.withEnv("SPRING_DATASOURCE_USERNAME", "mysqluser")
.withEnv("SPRING_DATASOURCE_PASSWORD", "mysqlpw")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "com.mysql.cj.jdbc.Driver")
.withEnv("EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING", "zookeeper:2181")
.withEnv("EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
.withEnv("EVENTUATELOCAL_CDC_DB_USER_NAME", "root")
.withEnv("EVENTUATELOCAL_CDC_DB_PASSWORD", "rootpassword")
.withEnv("EVENTUATELOCAL_CDC_READER_NAME", "MySqlReader")
.withEnv("EVENTUATE_OUTBOX_ID", "1")
.withEnv("EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID", "1234567890")
.withEnv("EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC", "false")
.withEnv("SPRING_PROFILES_ACTIVE", "EventuatePolling")
.withEnv("EVENTUATE_CDC_OUTBOX_PARTITIONING_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES))

;

@DynamicPropertySource
static void registerMySqlProperties(DynamicPropertyRegistry registry) {
PropertyProvidingContainer.startAndProvideProperties(registry, mysql, zookeeper, kafka, cdc);
}


@Override
protected void preAssertCheck() {
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(cdc.getLogs());
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
logging.level.io.eventuate.messaging=DEBUG

spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP:localhost}/eventuate?useSSL=false
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpw
Expand All @@ -9,7 +11,6 @@ eventuatelocal.cdc.db.user.name=root
eventuatelocal.cdc.db.password=rootpassword
eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP:localhost}:2181

logging.level.io.eventuate.messaging=TRACE

eventuate.local.kafka.consumer.properties.session.timeout.ms=15000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CREATE TABLE message (
headers VARCHAR(1000) NOT NULL,
payload VARCHAR(1000) NOT NULL,
published SMALLINT DEFAULT 0,
message_partition SMALLINT,
creation_time BIGINT
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ CREATE TABLE eventuate.message (
DESTINATION VARCHAR(1000) NOT NULL,
HEADERS VARCHAR(1000) NOT NULL,
PAYLOAD VARCHAR(1000) NOT NULL,
MESSAGE_PARTITION SMALLINT,
CREATION_TIME BIGINT
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.UUID;

public final class MessageProducerImpl implements MessageProducer {
private Logger logger = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -41,6 +42,8 @@ protected void prepareMessageHeaders(String destination, Message message) {
implementation.setMessageIdIfNecessary(message);
message.getHeaders().put(Message.DESTINATION, channelMapping.transform(destination));
message.getHeaders().put(Message.DATE, HttpDateHeaderFormatUtil.nowAsHttpDateString());
if (message.getHeaders().get(Message.PARTITION_ID) == null)
message.getHeaders().put(Message.PARTITION_ID, UUID.randomUUID().toString());
}

protected void send(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ CREATE TABLE eventuate.message (
DESTINATION VARCHAR(1000) NOT NULL,
HEADERS VARCHAR(1000) NOT NULL,
PAYLOAD VARCHAR(1000) NOT NULL,
MESSAGE_PARTITION SMALLINT,
CREATION_TIME BIGINT
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import io.eventuate.common.id.IdGenerator;
import io.eventuate.common.jdbc.EventuateCommonJdbcOperations;
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.jdbc.OutboxPartitioningSpec;
import io.eventuate.common.spring.id.IdGeneratorConfiguration;
import io.eventuate.common.spring.jdbc.EventuateCommonJdbcOperationsConfiguration;
import io.eventuate.common.spring.jdbc.sqldialect.SqlDialectConfiguration;
import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation;
import io.eventuate.tram.messaging.producer.jdbc.MessageProducerJdbcImpl;
import io.eventuate.tram.spring.messaging.producer.common.TramMessagingCommonProducerConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -21,6 +23,12 @@
IdGeneratorConfiguration.class})
public class TramMessageProducerJdbcConfiguration {

@Bean
public OutboxPartitioningSpec outboxPartitioningSpec(@Value("${eventuate.tram.outbox.partitioning.outbox.tables:#{null}}") Integer outboxTables,
@Value("${eventuate.tram.outbox.partitioning.message.partitions:#{null}}")Integer outboxTablePartitions) {
return new OutboxPartitioningSpec(outboxTables, outboxTablePartitions);
}

@Bean
@ConditionalOnMissingBean(MessageProducerImplementation.class)
public MessageProducerImplementation messageProducerImplementation(EventuateCommonJdbcOperations eventuateCommonJdbcOperations,
Expand Down
9 changes: 6 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ eventuateMavenRepoUrl=file:///Users/cer/.m2/testdeploy,https://snapshots.reposit

eventuatePluginsGradleVersion=0.11.0.BUILD-SNAPSHOT

springBootVersion=1.4.7.RELEASE
springBootCdcVersion=2.1.1.RELEASE
springBootVersion=2.6.11
springBootCdcVersion=2.6.11
micronautVersion=2.4.1
micronautDataVersion=2.3.1
eventuateUtilVersion=0.15.0.BUILD-SNAPSHOT
Expand All @@ -26,9 +26,12 @@ eventuateMessagingRabbitMQVersion=0.14.0.BUILD-SNAPSHOT
eventuateMessagingRedisVersion=0.14.0.BUILD-SNAPSHOT

eventuateCommonImageVersion=0.16.0.BUILD-SNAPSHOT
eventuateMessagingKafkaImageVersion=0.15.0.BUILD-SNAPSHOT
eventuateMessagingKafkaImageVersion=0.16.0.BUILD-SNAPSHOT
eventuateCdcImageVersion=0.14.0.BUILD-SNAPSHOT

assertjVersion=3.23.1
testContainersVersion=1.17.3

reactorVersion=3.4.18

version=0.31.0-SNAPSHOT

0 comments on commit 03105f5

Please sign in to comment.