diff --git a/build.gradle b/build.gradle
index 7e9e639c..942e79cb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -36,7 +36,7 @@ allprojects {
 
     if (!project.name.endsWith("-bom")) {
 
-      dependencies {
+        dependencies {
 
           testCompile "junit:junit:4.12"
           testCompile "org.mockito:mockito-core:2.23.4"
diff --git a/eventuate-tram-commands-db-broker-integration-test/build.gradle b/eventuate-tram-commands-db-broker-integration-test/build.gradle
index 4cd750f7..0e702ac2 100644
--- a/eventuate-tram-commands-db-broker-integration-test/build.gradle
+++ b/eventuate-tram-commands-db-broker-integration-test/build.gradle
@@ -6,7 +6,11 @@ dependencies {
     compile project(":eventuate-tram-spring-commands")
 
     testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion"
+
     testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"
+
+    testCompile "io.netty:netty-resolver-dns:4.1.79.Final"
+
 }
 
 test {
diff --git a/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql b/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
index 6b87a3dc..b9f24df6 100644
--- a/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
+++ b/eventuate-tram-commands-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
@@ -13,6 +13,7 @@ CREATE TABLE message (
   headers VARCHAR(1000) NOT NULL,
   payload VARCHAR(1000) NOT NULL,
   published SMALLINT DEFAULT 0,
+  message_partition SMALLINT,
   creation_time BIGINT
 );
 
diff --git a/eventuate-tram-db-broker-integration-test/build.gradle b/eventuate-tram-db-broker-integration-test/build.gradle
index c161e282..9ebab000 100644
--- a/eventuate-tram-db-broker-integration-test/build.gradle
+++ b/eventuate-tram-db-broker-integration-test/build.gradle
@@ -3,9 +3,21 @@ dependencies {
     compile project(":eventuate-tram-integration-test-common")
     compile project(":eventuate-tram-spring-messaging")
     testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"
+
     testCompile "io.eventuate.cdc:eventuate-local-java-test-util:$eventuateCdcVersion"
+
     testCompile "io.eventuate.util:eventuate-util-test:$eventuateUtilVersion"
 
+    testImplementation "io.eventuate.common:eventuate-common-testcontainers:$eventuateCommonVersion"
+
+
+    testImplementation "io.eventuate.messaging.kafka:eventuate-messaging-kafka-testcontainers:$eventuateMessagingKafkaVersion"
+
+    testImplementation "org.testcontainers:testcontainers:$testContainersVersion"
+    testImplementation "org.assertj:assertj-core:$assertjVersion"
+
+    testCompile "io.netty:netty-resolver-dns:4.1.79.Final"
+
 }
 
 test {
@@ -13,5 +25,10 @@ test {
     if (testCustomDBVariable != 'true') {
         exclude '**/TramIntegrationCustomDBTest**'
     }
+
+    if (System.env['BROKER'] != null && System.env['BROKER'] != 'kafka') {
+        exclude '**/MultipleOutboxTramIntegrationTest**'
+    }
+
     forkEvery 1
 }
diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java
index 4fa86eef..f4b1f956 100644
--- a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java
+++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/AbstractTramIntegrationTest.java
@@ -41,8 +41,14 @@ public void shouldDoSomething() throws InterruptedException {
 
     messageProducer.send(destination, MessageBuilder.withPayload("\"Hello\"").build());
 
+    preAssertCheck();
+
     assertTrue(String.format("Expected message. Subscriber %s for destination %s: ", subscriberId, destination), latch.await(60, TimeUnit.SECONDS));
 
   }
 
+  protected void preAssertCheck() {
+
+  }
+
 }
diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java
new file mode 100644
index 00000000..52dde76a
--- /dev/null
+++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/EventuateCdcContainer.java
@@ -0,0 +1,19 @@
+package io.eventuate.tram.broker.db.integrationtests;
+
+import io.eventuate.common.testcontainers.PropertyProvidingContainer;
+import org.testcontainers.containers.GenericContainer;
+
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+public class EventuateCdcContainer extends GenericContainer<EventuateCdcContainer> implements PropertyProvidingContainer  {
+
+    public EventuateCdcContainer() {
+        super("eventuateio/eventuate-cdc-service:0.14.0-SNAPSHOT");
+    }
+
+    @Override
+    public void registerProperties(BiConsumer<String, Supplier<Object>> registry) {
+
+    }
+}
diff --git a/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java
new file mode 100644
index 00000000..5f0a86af
--- /dev/null
+++ b/eventuate-tram-db-broker-integration-test/src/test/java/io/eventuate/tram/broker/db/integrationtests/MultipleOutboxTramIntegrationTest.java
@@ -0,0 +1,77 @@
+package io.eventuate.tram.broker.db.integrationtests;
+
+import io.eventuate.common.testcontainers.EventuateMySqlContainer;
+import io.eventuate.common.testcontainers.EventuateZookeeperContainer;
+import io.eventuate.common.testcontainers.PropertyProvidingContainer;
+import io.eventuate.messaging.kafka.testcontainers.EventuateKafkaContainer;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy;
+
+import java.util.concurrent.TimeUnit;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = TramIntegrationTestConfiguration.class, properties = {"eventuate.tram.outbox.partitioning.outbox.tables=8", "eventuate.tram.outbox.partitioning.message.partitions=4"})
+public class MultipleOutboxTramIntegrationTest extends AbstractTramIntegrationTest {
+
+    public static final int OUTBOX_TABLES = 8;
+    public static Network network = Network.newNetwork();
+    public static EventuateMySqlContainer mysql =
+            new EventuateMySqlContainer()
+                    .withNetwork(network)
+                    .withNetworkAliases("mysql")
+                    .withEnv("EVENTUATE_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES))
+                    .withReuse(true);
+
+
+    public static EventuateZookeeperContainer zookeeper = new EventuateZookeeperContainer().withReuse(true)
+            .withNetwork(network)
+            .withNetworkAliases("zookeeper");
+
+    public static EventuateKafkaContainer kafka =
+            new EventuateKafkaContainer("zookeeper:2181")
+                    .waitingFor(new DockerHealthcheckWaitStrategy())
+                    .withNetwork(network)
+                    .withNetworkAliases("kafka")
+                    .withReuse(true);
+
+    public static EventuateCdcContainer cdc =
+            new EventuateCdcContainer()
+                    .withNetwork(network)
+                    .withEnv("SPRING_DATASOURCE_URL", "jdbc:mysql://mysql/eventuate")
+                    .withEnv("SPRING_DATASOURCE_USERNAME", "mysqluser")
+                    .withEnv("SPRING_DATASOURCE_PASSWORD", "mysqlpw")
+                    .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "com.mysql.cj.jdbc.Driver")
+                    .withEnv("EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING", "zookeeper:2181")
+                    .withEnv("EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
+                    .withEnv("EVENTUATELOCAL_CDC_DB_USER_NAME", "root")
+                    .withEnv("EVENTUATELOCAL_CDC_DB_PASSWORD", "rootpassword")
+                    .withEnv("EVENTUATELOCAL_CDC_READER_NAME", "MySqlReader")
+                    .withEnv("EVENTUATE_OUTBOX_ID", "1")
+                    .withEnv("EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID", "1234567890")
+                    .withEnv("EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC", "false")
+                    .withEnv("SPRING_PROFILES_ACTIVE", "EventuatePolling")
+                    .withEnv("EVENTUATE_CDC_OUTBOX_PARTITIONING_OUTBOX_TABLES", Integer.toString(OUTBOX_TABLES))
+
+            ;
+
+    @DynamicPropertySource
+    static void registerMySqlProperties(DynamicPropertyRegistry registry) {
+        PropertyProvidingContainer.startAndProvideProperties(registry, mysql, zookeeper, kafka, cdc);
+    }
+
+
+    @Override
+    protected void preAssertCheck() {
+        try {
+            TimeUnit.SECONDS.sleep(15);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        System.out.println(cdc.getLogs());
+    }
+}
diff --git a/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties b/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties
index e685567f..4440c6f4 100644
--- a/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties
+++ b/eventuate-tram-db-broker-integration-test/src/test/resources/application.properties
@@ -1,3 +1,5 @@
+logging.level.io.eventuate.messaging=DEBUG
+
 spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP:localhost}/eventuate?useSSL=false
 spring.datasource.username=mysqluser
 spring.datasource.password=mysqlpw
@@ -9,7 +11,6 @@ eventuatelocal.cdc.db.user.name=root
 eventuatelocal.cdc.db.password=rootpassword
 eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP:localhost}:2181
 
-logging.level.io.eventuate.messaging=TRACE
 
 eventuate.local.kafka.consumer.properties.session.timeout.ms=15000
 
diff --git a/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql b/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
index 6b87a3dc..b9f24df6 100644
--- a/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
+++ b/eventuate-tram-db-broker-integration-test/src/test/resources/custom-db-mysql-schema.sql
@@ -13,6 +13,7 @@ CREATE TABLE message (
   headers VARCHAR(1000) NOT NULL,
   payload VARCHAR(1000) NOT NULL,
   published SMALLINT DEFAULT 0,
+  message_partition SMALLINT,
   creation_time BIGINT
 );
 
diff --git a/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql b/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
index ca804bf5..cd4255af 100755
--- a/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
+++ b/eventuate-tram-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
@@ -7,6 +7,7 @@ CREATE TABLE eventuate.message (
   DESTINATION VARCHAR(1000) NOT NULL,
   HEADERS VARCHAR(1000) NOT NULL,
   PAYLOAD VARCHAR(1000) NOT NULL,
+  MESSAGE_PARTITION SMALLINT,
   CREATION_TIME BIGINT
 );
 
diff --git a/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java b/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java
index c73eb498..fcd9debe 100644
--- a/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java
+++ b/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java
@@ -9,6 +9,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.UUID;
 
 public final class MessageProducerImpl implements MessageProducer {
   private Logger logger = LoggerFactory.getLogger(getClass());
@@ -41,6 +42,8 @@ protected void prepareMessageHeaders(String destination, Message message) {
     implementation.setMessageIdIfNecessary(message);
     message.getHeaders().put(Message.DESTINATION, channelMapping.transform(destination));
     message.getHeaders().put(Message.DATE, HttpDateHeaderFormatUtil.nowAsHttpDateString());
+    if (message.getHeaders().get(Message.PARTITION_ID) == null)
+      message.getHeaders().put(Message.PARTITION_ID, UUID.randomUUID().toString());
   }
 
   protected void send(Message message) {
diff --git a/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql b/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
index ca804bf5..cd4255af 100755
--- a/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
+++ b/eventuate-tram-reactive-in-memory/src/main/resources/eventuate-tram-embedded-schema.sql
@@ -7,6 +7,7 @@ CREATE TABLE eventuate.message (
   DESTINATION VARCHAR(1000) NOT NULL,
   HEADERS VARCHAR(1000) NOT NULL,
   PAYLOAD VARCHAR(1000) NOT NULL,
+  MESSAGE_PARTITION SMALLINT,
   CREATION_TIME BIGINT
 );
 
diff --git a/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java b/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java
index e14ce0ac..c6d09d98 100644
--- a/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java
+++ b/eventuate-tram-spring-producer-jdbc/src/main/java/io/eventuate/tram/spring/messaging/producer/jdbc/TramMessageProducerJdbcConfiguration.java
@@ -3,12 +3,14 @@
 import io.eventuate.common.id.IdGenerator;
 import io.eventuate.common.jdbc.EventuateCommonJdbcOperations;
 import io.eventuate.common.jdbc.EventuateSchema;
+import io.eventuate.common.jdbc.OutboxPartitioningSpec;
 import io.eventuate.common.spring.id.IdGeneratorConfiguration;
 import io.eventuate.common.spring.jdbc.EventuateCommonJdbcOperationsConfiguration;
 import io.eventuate.common.spring.jdbc.sqldialect.SqlDialectConfiguration;
 import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation;
 import io.eventuate.tram.messaging.producer.jdbc.MessageProducerJdbcImpl;
 import io.eventuate.tram.spring.messaging.producer.common.TramMessagingCommonProducerConfiguration;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -21,6 +23,12 @@
     IdGeneratorConfiguration.class})
 public class TramMessageProducerJdbcConfiguration {
 
+  @Bean
+  public OutboxPartitioningSpec outboxPartitioningSpec(@Value("${eventuate.tram.outbox.partitioning.outbox.tables:#{null}}") Integer outboxTables,
+                                                       @Value("${eventuate.tram.outbox.partitioning.message.partitions:#{null}}")Integer outboxTablePartitions) {
+    return new OutboxPartitioningSpec(outboxTables, outboxTablePartitions);
+  }
+
   @Bean
   @ConditionalOnMissingBean(MessageProducerImplementation.class)
   public MessageProducerImplementation messageProducerImplementation(EventuateCommonJdbcOperations eventuateCommonJdbcOperations,
diff --git a/gradle.properties b/gradle.properties
index 41b7a9bc..aac72cfd 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -5,8 +5,8 @@ eventuateMavenRepoUrl=file:///Users/cer/.m2/testdeploy,https://snapshots.reposit
 
 eventuatePluginsGradleVersion=0.11.0.BUILD-SNAPSHOT
 
-springBootVersion=1.4.7.RELEASE
-springBootCdcVersion=2.1.1.RELEASE
+springBootVersion=2.6.11
+springBootCdcVersion=2.6.11
 micronautVersion=2.4.1
 micronautDataVersion=2.3.1
 eventuateUtilVersion=0.15.0.BUILD-SNAPSHOT
@@ -26,9 +26,12 @@ eventuateMessagingRabbitMQVersion=0.14.0.BUILD-SNAPSHOT
 eventuateMessagingRedisVersion=0.14.0.BUILD-SNAPSHOT
 
 eventuateCommonImageVersion=0.16.0.BUILD-SNAPSHOT
-eventuateMessagingKafkaImageVersion=0.15.0.BUILD-SNAPSHOT
+eventuateMessagingKafkaImageVersion=0.16.0.BUILD-SNAPSHOT
 eventuateCdcImageVersion=0.14.0.BUILD-SNAPSHOT
 
+assertjVersion=3.23.1
+testContainersVersion=1.17.3
+
 reactorVersion=3.4.18
 
 version=0.31.0-SNAPSHOT