diff --git a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
index 70f0ad3e..6c04805b 100644
--- a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
+++ b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
@@ -62,6 +62,7 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {
   private final AtomicBoolean consuming = new AtomicBoolean(false);
   private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
+  private final AtomicBoolean polling = new AtomicBoolean(false);
   private Handler<ConsumerRecord<K, V>> recordHandler;
   private Handler<Throwable> exceptionHandler;
   private Iterator<ConsumerRecord<K, V>> current; // Accessed on event loop
@@ -137,24 +138,37 @@ private void submitTaskWhenStarted(java.util.function.BiConsumer
   private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
-    this.worker.submit(() -> {
-      if (!this.closed.get()) {
-        try {
-          ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
-          if (records != null && records.count() > 0) {
-            this.context.runOnContext(v -> handler.handle(records));
-          } else {
-            // Don't call pollRecords directly, but use schedule() to actually pause when the readStream is paused
-            schedule(0);
-          }
-        } catch (WakeupException ignore) {
-        } catch (Exception e) {
-          if (exceptionHandler != null) {
-            exceptionHandler.handle(e);
-          }
-        }
+    if(this.polling.compareAndSet(false, true)){
+      this.worker.submit(() -> {
+        boolean submitted = false;
+        try {
+          if (!this.closed.get()) {
+            try {
+              ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
+              if (records != null && records.count() > 0) {
+                submitted = true; // sets false only when the iterator is overwritten
+                this.context.runOnContext(v -> {
+                  this.polling.set(false);
+                  handler.handle(records);
+                });
+              }
+            } catch (WakeupException ignore) {
+            } catch (Exception e) {
+              if (exceptionHandler != null) {
+                exceptionHandler.handle(e);
+              }
+            }
+          }
+        } finally {
+          if(!submitted){
+            this.context.runOnContext(v -> {
+              this.polling.set(false);
+              schedule(0);
+            });
+          }
+        }
+      });
       }
-    });
   }
 
   private void schedule(long delay) {
diff --git a/src/test/java/io/vertx/kafka/client/tests/KafkaReadStreamMockTest.java b/src/test/java/io/vertx/kafka/client/tests/KafkaReadStreamMockTest.java
new file mode 100644
index 00000000..6262ea9e
--- /dev/null
+++ b/src/test/java/io/vertx/kafka/client/tests/KafkaReadStreamMockTest.java
@@ -0,0 +1,120 @@
+package io.vertx.kafka.client.tests;
+
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.vertx.core.Vertx;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.kafka.client.common.TopicPartition;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+import io.vertx.kafka.client.consumer.KafkaReadStream;
+import io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl;
+import io.vertx.kafka.client.producer.KafkaProducerRecord;
+import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl;
+
+@RunWith(VertxUnitRunner.class)
+public class KafkaReadStreamMockTest extends KafkaTestBase {
+
+  private LinkedList<ConsumerRecord<String, String>> recordsMock = new LinkedList<>();
+
+  private int SEND_BATCH = 5;
+  private int TOTAL_MESSAGES = 400;
+  private final String TOPIC = "topic";
+  private Long timer = null;
+
+  private void initRecords(){
+    int numMessages = TOTAL_MESSAGES;
+    for (int i = 0; i < numMessages; i++) {
+      String key = "key-" + i;
+      String value = "value-" + i;
+      recordsMock.add(new ConsumerRecord<>(TOPIC, 0, i, key, value));
+    }
+  }
+
+  private MockConsumer<String, String> createMockConsumer(){
+    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+    Map<org.apache.kafka.common.TopicPartition, Long> beginningOffsets = new HashMap<>();
+    beginningOffsets.put(new org.apache.kafka.common.TopicPartition(TOPIC, 0), 0L);
+    consumer.updateBeginningOffsets(beginningOffsets);
+    return consumer;
+  }
+
+  private void sendNextBatch(MockConsumer<String, String> consumer){
+    for (int i = 0; i < SEND_BATCH && recordsMock.size() > 0; i++)
+      consumer.addRecord(recordsMock.pop());
+  }
+
+  @Test
+  public void shouldNotLoseMessages(TestContext ctx){
+    Vertx vertx = Vertx.vertx();
+
+    Async done = ctx.async();
+
+    initRecords();
+
+    MockConsumer<String, String> consumer = createMockConsumer();
+    KafkaReadStream<String, String> readStream = KafkaReadStream.create(vertx, consumer);
+    KafkaConsumer<String, String> consumerVertx = new KafkaConsumerImpl<>(readStream);
+
+    AtomicLong partitionOffset = new AtomicLong(-1);
+
+    consumerVertx.handler((r) -> {
+      long offset = r.offset();
+
+      partitionOffset.addAndGet(1);
+      ctx.assertEquals(partitionOffset.get(), offset);
+
+      System.out.println("offset " + offset);
+
+      if (offset == TOTAL_MESSAGES - 1) {
+        consumerVertx.close();
+        done.complete();
+      } else {
+
+        if (timer != null) vertx.cancelTimer(timer);
+        timer = vertx.setTimer(5, (t) -> {
+          consumerVertx.pause();
+          vertx.getOrCreateContext().runOnContext((t1) -> {
+            consumerVertx.commit();
+            consumerVertx.resume();
+            sendNextBatch(consumer);
+            // sends two batches of messages
+            vertx.getOrCreateContext().runOnContext((t2) -> {
+              sendNextBatch(consumer);
+            });
+          });
+        });
+
+      }
+
+    });
+
+    consumerVertx.exceptionHandler(t -> ctx.fail(t));
+
+    Set<TopicPartition> partitions = new LinkedHashSet<>();
+    partitions.add(new TopicPartition(TOPIC, 0));
+
+    consumerVertx.assign(partitions, (h) -> {
+      sendNextBatch(consumer);
+    });
+
+  }
+
+}