Merge pull request vert-x3#113 from emasab/bugfix/112-lost-messages
Bugfix/112 lost messages
ppatierno authored Oct 16, 2018
2 parents 3de08a8 + 4962cc7 commit 00655e9
Showing 2 changed files with 151 additions and 17 deletions.
@@ -62,6 +62,7 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {

  private final AtomicBoolean consuming = new AtomicBoolean(false);
  private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
+  private final AtomicBoolean polling = new AtomicBoolean(false);
  private Handler<ConsumerRecord<K, V>> recordHandler;
  private Handler<Throwable> exceptionHandler;
  private Iterator<ConsumerRecord<K, V>> current; // Accessed on event loop
@@ -137,24 +138,37 @@ private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K,
  }

  private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
-    this.worker.submit(() -> {
-      if (!this.closed.get()) {
-        try {
-          ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
-          if (records != null && records.count() > 0) {
-            this.context.runOnContext(v -> handler.handle(records));
-          } else {
-            // Don't call pollRecords directly, but use schedule() to actually pause when the readStream is paused
-            schedule(0);
-          }
-        } catch (WakeupException ignore) {
-        } catch (Exception e) {
-          if (exceptionHandler != null) {
-            exceptionHandler.handle(e);
-          }
-        }
-      }
-    });
+    if(this.polling.compareAndSet(false, true)){
+      this.worker.submit(() -> {
+        boolean submitted = false;
+        try {
+          if (!this.closed.get()) {
+            try {
+              ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
+              if (records != null && records.count() > 0) {
+                submitted = true; // sets false only when the iterator is overwritten
+                this.context.runOnContext(v -> {
+                  this.polling.set(false);
+                  handler.handle(records);
+                });
+              }
+            } catch (WakeupException ignore) {
+            } catch (Exception e) {
+              if (exceptionHandler != null) {
+                exceptionHandler.handle(e);
+              }
+            }
+          }
+        } finally {
+          if(!submitted){
+            this.context.runOnContext(v -> {
+              this.polling.set(false);
+              schedule(0);
+            });
+          }
+        }
+      });
+    }
  }

  private void schedule(long delay) {
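Why this fixes the lost messages: pollRecords is now guarded by the new polling flag. The compareAndSet(false, true) call lets only one poll task be in flight on the worker thread at a time, and the flag is released back on the event loop either right before the fetched records are handed to the handler or, in the finally branch, right before the next poll is scheduled, so a freshly polled batch can no longer replace one that has not been consumed yet. A minimal, self-contained sketch of the same guard pattern follows; the names PollGuardSketch, fetch and deliver are illustrative placeholders, not the project's API.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the "single poll in flight" guard used by the fix above.
// All names here (PollGuardSketch, fetch, deliver) are illustrative, not the project's API.
public class PollGuardSketch {

  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final AtomicBoolean polling = new AtomicBoolean(false);
  private final AtomicInteger attempts = new AtomicInteger();

  // Simulated blocking fetch: empty on the first two attempts, then one record.
  private List<String> fetch() {
    return attempts.incrementAndGet() < 3 ? Collections.emptyList() : Arrays.asList("record-0");
  }

  private void deliver(List<String> records) {
    records.forEach(r -> System.out.println("delivered " + r));
  }

  public void pollOnce() {
    // At most one poll task may be pending on the worker; a second call is a no-op
    // until the flag is released, so a fetched batch cannot be overwritten.
    if (polling.compareAndSet(false, true)) {
      worker.submit(() -> {
        boolean delivered = false;
        try {
          List<String> records = fetch();
          if (!records.isEmpty()) {
            delivered = true;
            // In KafkaReadStreamImpl this release happens on the Vert.x event loop via runOnContext.
            polling.set(false);
            deliver(records);
          }
        } finally {
          if (!delivered) {
            // Nothing was handed out: release the flag and poll again,
            // mirroring the schedule(0) path of the real implementation.
            polling.set(false);
            pollOnce();
          }
        }
      });
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PollGuardSketch sketch = new PollGuardSketch();
    sketch.pollOnce();
    sketch.pollOnce(); // ignored if the first poll is still in flight
    Thread.sleep(200); // let the empty polls retry and the batch get delivered
    sketch.worker.shutdown();
    sketch.worker.awaitTermination(1, TimeUnit.SECONDS);
  }
}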
120 changes: 120 additions & 0 deletions src/test/java/io/vertx/kafka/client/tests/KafkaReadStreamMockTest.java
@@ -0,0 +1,120 @@
package io.vertx.kafka.client.tests;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.junit.Test;
import org.junit.runner.RunWith;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl;

@RunWith(VertxUnitRunner.class)
public class KafkaReadStreamMockTest extends KafkaTestBase {

  private LinkedList<ConsumerRecord<String,String>> recordsMock = new LinkedList<>();

  private int SEND_BATCH = 5;
  private int TOTAL_MESSAGES = 400;
  private final String TOPIC = "topic";
  private Long timer = null;

  private void initRecords(){
    int numMessages = TOTAL_MESSAGES;
    for (int i = 0;i < numMessages;i++) {
      String key = "key-" + i;
      String value = "value-" + i;
      recordsMock.add(new ConsumerRecord<String,String>(TOPIC, 0, i, key, value));
    }
  }

  private MockConsumer<String, String> createMockConsumer(){
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    Map<org.apache.kafka.common.TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new org.apache.kafka.common.TopicPartition(TOPIC, 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);
    return consumer;
  }

  private void sendNextBatch(MockConsumer<String, String> consumer){
    for(int i=0;i<SEND_BATCH && recordsMock.size()>0;i++)
      consumer.addRecord(recordsMock.pop());
  }

  @Test
  public void shouldNotLoseMessages(TestContext ctx){
    Vertx vertx = Vertx.vertx();

    Async done = ctx.async();

    initRecords();

    MockConsumer<String, String> consumer = createMockConsumer();
    KafkaReadStream<String, String> readStream = KafkaReadStream.create(vertx, consumer);
    KafkaConsumer<String, String> consumerVertx = new KafkaConsumerImpl<>(readStream);

    AtomicLong partitionOffset = new AtomicLong(-1);

    consumerVertx.handler((r)->{
      long offset = r.offset();

      partitionOffset.addAndGet(1);
      ctx.assertEquals(partitionOffset.get(), offset);

      System.out.println("offset " + offset);

      if(offset == TOTAL_MESSAGES-1){
        consumerVertx.close();
        done.complete();
      } else {

        if(timer!=null) vertx.cancelTimer(timer);
        timer = vertx.setTimer(5, (t)->{
          consumerVertx.pause();
          vertx.getOrCreateContext().runOnContext((t1)->{
            consumerVertx.commit();
            consumerVertx.resume();
            sendNextBatch(consumer);
            // sends two batches of messages
            vertx.getOrCreateContext().runOnContext((t2)->{
              sendNextBatch(consumer);
            });
          });
        });

      }

    });

    consumerVertx.exceptionHandler(t->ctx.fail(t));

    Set<TopicPartition> partitions = new LinkedHashSet<>();
    partitions.add(new TopicPartition(TOPIC, 0));

    consumerVertx.assign(partitions, (h)->{
      sendNextBatch(consumer);
    });

  }

}
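For readers who want to reproduce the scenario outside the mock, the regression test above mirrors roughly the following application pattern: a record handler that pauses the stream, commits, and resumes a few milliseconds later while records keep arriving. This is a hypothetical usage sketch, not part of the commit; the broker address, topic and configuration values are placeholders.

import java.util.HashMap;
import java.util.Map;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;

// Hypothetical usage sketch (not from this PR): the pause/commit/resume pattern that
// the shouldNotLoseMessages test exercises, written against the public client API.
public class PauseResumeExample {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();

    Map<String, String> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("group.id", "example-group");
    config.put("auto.offset.reset", "earliest");
    config.put("enable.auto.commit", "false");

    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

    consumer.handler(record -> {
      System.out.println("offset=" + record.offset() + " value=" + record.value());

      // Briefly pause the stream, commit, then resume while the poll loop keeps running.
      // Before the fix, records fetched around such a pause/resume window could be lost;
      // with the polling guard they arrive in order.
      consumer.pause();
      vertx.setTimer(5, id -> {
        consumer.commit();
        consumer.resume();
      });
    });

    consumer.subscribe("topic"); // placeholder topic
  }
}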
