
Support external offset storage (Elasticsearch, JDBC, etc) #34

Open
dialtahi opened this issue Mar 31, 2020 · 3 comments

@dialtahi
For the read side we could use Elasticsearch to index the data for searching. If data in Elasticsearch is lost and a restore is required, it is difficult to re-sync the command side and the read side. One strategy is to store the consumer offsets in the same datastore as the read-side data, so we could use the hooks of the Kafka consumer to store offsets not only in Kafka/ZooKeeper but also in that other datastore.

For a regular subscription via KafkaConsumer.subscribe, we can attach a ConsumerRebalanceListener to get notified on partition assignment/revocation for the subscribed topics, and use it to set and get the position from the last offset stored per topic/partition in the datastore.

https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe-java.util.Collection-org.apache.kafka.clients.consumer.ConsumerRebalanceListener-

https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
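As an illustrative sketch of the bookkeeping those two callbacks would do (the `TopicPartition` and `OffsetStore` types below are self-contained stand-ins, not the real Kafka client classes or any existing Eventuate API): on assignment, look up the last stored offset per partition and seek just past it; on revocation, persist the last processed positions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical external offset store (e.g. backed by Elasticsearch or JDBC).
interface OffsetStore {
    Long read(String subscriberId, TopicPartition tp);            // null if unknown
    void save(String subscriberId, TopicPartition tp, long offset);
}

// Simple in-memory implementation, useful for tests.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> data = new HashMap<>();
    public Long read(String sid, TopicPartition tp) { return data.get(sid + tp); }
    public void save(String sid, TopicPartition tp, long offset) { data.put(sid + tp, offset); }
}

// Mirrors the two ConsumerRebalanceListener callbacks.
class ExternalOffsetRebalanceHandler {
    private final String subscriberId;
    private final OffsetStore store;

    ExternalOffsetRebalanceHandler(String subscriberId, OffsetStore store) {
        this.subscriberId = subscriberId;
        this.store = store;
    }

    // onPartitionsAssigned: compute the offset to seek to for each assigned
    // partition; partitions with no stored offset keep the consumer's default.
    Map<TopicPartition, Long> seekPositions(List<TopicPartition> assigned) {
        Map<TopicPartition, Long> seeks = new HashMap<>();
        for (TopicPartition tp : assigned) {
            Long stored = store.read(subscriberId, tp);
            if (stored != null) seeks.put(tp, stored + 1); // resume after last processed
        }
        return seeks;
    }

    // onPartitionsRevoked: persist the last processed offset per partition.
    void persist(Map<TopicPartition, Long> lastProcessed) {
        lastProcessed.forEach((tp, off) -> store.save(subscriberId, tp, off));
    }
}
```

In the real listener, `seekPositions` would drive `consumer.seek(tp, offset)` and `persist` would be fed from `consumer.position(tp)`.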

The Eventuate consumer API should let users specify an implementation or configuration to choose the ExternalOffsetStorage, since the message consumer and the offset storage should write to the same datastore.

@Bean("customMessageConsumer")
MessageConsumer customMessageConsumer(MessageConsumerBuilder builder, RestHighLevelClient client, ObjectMapper objectMapper) {
    return builder.build(new ElasticsearchOffsetStorage(client, objectMapper));
}

The subscription would then go directly through the custom message consumer:

@Autowired
@Qualifier("customMessageConsumer")
private MessageConsumer customMessageConsumer;
cer commented Mar 31, 2020

Another possibility is to still have a global MessageConsumer @Bean (actually it's MessageConsumerKafkaImpl), but define an ElasticsearchOffsetStorage @Bean that gets injected into it and automatically overrides the default behavior.
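A minimal sketch of that "optional bean overrides the default" idea, using stand-in type names rather than the real Eventuate classes: the consumer takes an `Optional` dependency and falls back to Kafka-native offset commits when no external storage bean is defined.

```java
import java.util.Optional;

// Hypothetical offset-storage abstraction; all names here are illustrative.
interface OffsetStorage {
    void save(String subscriberId, String topic, int partition, long offset);
}

// Default behavior: offsets are committed to Kafka itself (no-op stand-in here).
class KafkaNativeOffsetStorage implements OffsetStorage {
    public void save(String subscriberId, String topic, int partition, long offset) { }
}

class MessageConsumerKafkaImpl {
    private final OffsetStorage offsetStorage;

    // Spring would inject Optional.empty() when no external storage bean
    // (e.g. an ElasticsearchOffsetStorage) is defined; defining one
    // automatically overrides the default.
    MessageConsumerKafkaImpl(Optional<OffsetStorage> externalStorage) {
        this.offsetStorage = externalStorage.orElseGet(KafkaNativeOffsetStorage::new);
    }

    OffsetStorage offsetStorage() { return offsetStorage; }
}
```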

cer commented Apr 1, 2020

Here are some design diagrams.

[Diagram: message-consumer-design]
[Diagram: subscribing]

cer commented Apr 1, 2020

Here are some thoughts:

EventuateKafkaConsumer is responsible for creating the KafkaConsumer and calling subscribe().
I'd propose that it do that using:

interface KafkaConsumerConfigurer {
   KafkaConsumer make(Properties consumerProperties);
   void subscribe(KafkaConsumer consumer, String subscriberId, Set<String> topics);
}

The MessageConsumerKafkaImpl @Bean would be injected with a KafkaConsumerConfigurer @Bean and would pass it down the call chain to EventuateKafkaConsumer.
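To make the proposed split concrete, here is a self-contained sketch of a default configurer. The `KafkaConsumer` interface below is a minimal stand-in for `org.apache.kafka.clients.consumer.KafkaConsumer` so the example compiles on its own; the class and method names beyond the proposed interface are assumptions, not existing code.

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Minimal stand-in for the real Kafka consumer.
interface KafkaConsumer {
    void subscribe(Set<String> topics);
}

// The interface proposed above, with explicit types filled in.
interface KafkaConsumerConfigurer {
    KafkaConsumer make(Properties consumerProperties);
    void subscribe(KafkaConsumer consumer, String subscriberId, Set<String> topics);
}

// Default configurer: plain subscription, offsets stay in Kafka. An
// external-offset variant would override subscribe() to also register a
// ConsumerRebalanceListener that seeks to offsets read from the datastore.
class DefaultKafkaConsumerConfigurer implements KafkaConsumerConfigurer {
    private final Set<String> subscribed = new HashSet<>();

    public KafkaConsumer make(Properties consumerProperties) {
        // Real code: return new KafkaConsumer<>(consumerProperties);
        return subscribed::addAll;
    }

    public void subscribe(KafkaConsumer consumer, String subscriberId, Set<String> topics) {
        consumer.subscribe(topics);
    }

    Set<String> subscribedTopics() { return subscribed; }
}
```

An external-offset implementation could then be swapped in purely by binding a different `KafkaConsumerConfigurer` bean, without touching EventuateKafkaConsumer itself.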
