An example of a Spring Boot Kafka consumer application with non-blocking retry.
- Java 17
- Spring Framework 6.0
- Spring Boot 3.0
- Spring Kafka 3.0.4
- @KafkaListener annotated method receives a String message from the main Kafka topic.
- The Application tries to process this String message (trim payload if its length > 5 symbols) using a method that could throw an exception (you can set up error possibility in application.properties). Possible cases:
- app prints the result in logs if processing succeeds;
- app sends failed record to retry topic if @KafkaListener annotated method throws an exception. After delayed time, this method will attempt to process failed record again from a retry topic (see the first step);
- app sends failed record to DLQ topic (dead letter queue) if it has exhausted all attempts to process this record.
- app sends failed record to DLQ topic without retries if specific fatal exception was thrown.
- Create Kafka environment using docker-compose.yml:
docker-compose up -d
-
(Optional) Set up application.properties if needed.
Some useful properties
- kafka.retry.attempts-count — count of attempts to process the message. First attempt is for reading from the main topic, the next ones — from the retry topic.
- kafka.retry.interval-ms — interval between attempts in milliseconds.
- processing.error-percentage — probability in percent of throwing processing exception. When this exception is thrown, application could send failed record to the retry topic.
- processing.fatal-error-percentage — probability in percent of throwing fatal processing exception. When this exception is thrown, application immediately sends failed record to the DLQ topic.
- processing.message-max-length — if an incoming message has longer length, the application will trim it up to this value.
-
Run SpringKafkaApplication.java by using your IDE or by executing next commands:
- Build .jar file:
mvn clean package
- Run Java app from this .jar:
java -jar "target/spring-kafka-non-blocking-retry.jar"
- Run Kafka producer in command line to send passages into the main kafka topic:
sh manualTesting/produce-message.sh
- Look into the SpringKafkaApplication logs and check if the app has processed the message.
- Retry mechanism in this example doesn't block receiving of messages from the main topic. If we didn't configure the retry logic in any way, by default Spring would make 9 retries to process the message using the DefaultErrorHandler. It seeks consumer to the current offset again and again, so it blocks receiving messages from the main topic.
- In this example, we use only one retry topic. By default, if we want application to make N retries (N > 0) using a retry topic, Spring creates N topics: 1 topic per a retry. This is not bad, but on some projects the regulations are aimed at a minimum topics count.
- This example shows how to set up your own retry and DLQ topic names. By default, Spring uses "-retry" and "-dlt" suffixes for the retry and DLQ topics. It can break naming rules for some projects, which don't use dash symbol in names.
- This example demonstrates how to set up informative Kafka record headers for a retry and DLQ topics. By default, Springs wraps any exception going from @KafkaListener method into the ListenerExecutionFailedException. It is used in the internal Spring logic. But information from this exception in the Kafka headers doesn't clarify the occurred problem at all.
- You can find all above-mentioned settings for retry logic in the class KafkaRetryDlqConfiguration.java
- Just like a real-time running application, these tests use the KafkaAutoConfiguration to create the context for the kafka logic. It makes tests more realistic and relieves from the need to manually create the objects necessary for Kafka consumer (e.g. ConsumerFactory, ProducerFactory)
- These tests are running rather quickly. Unlike the case of using @SpringBootTest annotation, they create only the context that is necessary for testing Kafka functionality (see KafkaTestConfiguration.java).
- Dependencies used in tests:
- JUnit5 & Mockito (see KafkaConsumerIntegrationTest.java)
- Testcontainers – to launch Kafka broker in a Docker container (see KafkaTestConfiguration.java)
- Spring Aspects – to catch a moment when Kafka consumer has processed the message (see KafkaConsumerAspect.java)
You can run these tests by executing the next command:
mvn clean test