Skip to content

Commit

Permalink
GEOMESA-3370,GEOMESA-3368 Kafka - Java-friendly at-least-once consumer (
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Jun 10, 2024
1 parent 80e283e commit 6e2056a
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 5 deletions.
2 changes: 0 additions & 2 deletions docs/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
<condition property="build.directory" value="${build.directory}" else="${basedir}/target">
<isset property="build.directory"/>
</condition>

<echo message="Build directory: ${build.directory}"/>
</target>

<target name="clean" depends="init">
Expand Down
72 changes: 71 additions & 1 deletion docs/user/kafka/feature_events.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
.. _kafka_feature_events:

Kafka Feature Events
--------------------

Listening for Feature Events
----------------------------
============================

.. warning::

The feature events API does not guarantee all messages will be fully processed. To ensure
at-least-once processing, use :ref:`kafka_guaranteed_processing`.

The GeoTools API includes a mechanism to fire off a `FeatureEvent`_ object each time
that there is an "event," which occurs when data is added, changed, or deleted in a
Expand Down Expand Up @@ -81,6 +89,68 @@ be used to mark the method that does the deregistration:
// other cleanup
}
.. _kafka_guaranteed_processing:

Guaranteed Message Processing
=============================

In order to guarantee at-least-once processing of **all** messages, provide an implementation of ``GeoMessageProcessor``. The
underlying Kafka consumer will not acknowledge messages until the processor returns, ensuring that they are fully processed
without any errors:

.. tabs::

.. code-tab:: java

import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.utils.GeoMessage;
import org.locationtech.geomesa.kafka.utils.interop.GeoMessageProcessor;

GeoMessageProcessor processor = new GeoMessageProcessor() {
@Override
public BatchResult consume(List<GeoMessage> records) {
records.forEach((r) -> {
if (r instanceof GeoMessage.Change) {
System.out.println(((GeoMessage.Change) r).feature());
} else if (r instanceof GeoMessage.Delete) {
System.out.println(((GeoMessage.Delete) r).id());
} else if (r instanceof GeoMessage.Clear) {
System.out.println("clear");
}
});
return BatchResult.COMMIT;
}
};
// use try-with-resources to close the consumer
try (java.io.Closeable consumer = ((KafkaDataStore) ds).createConsumer(sftName, "my-group-id", processor)) {
Thread.sleep(10000);
}

.. code-tab:: scala

import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.BatchResult
import org.locationtech.geomesa.kafka.data.KafkaDataStore
import org.locationtech.geomesa.kafka.utils.{GeoMessage, GeoMessageProcessor}

val processor = new GeoMessageProcessor() {
override def consume(records: Seq[GeoMessage]): BatchResult = {
records.foreach {
case GeoMessage.Change(sf) => println(sf)
case GeoMessage.Delete(id) => println(id)
case GeoMessage.Clear => println("clear")
}
BatchResult.Commit
}
}

val consumer = ds.asInstanceOf[KafkaDataStore].createConsumer(sftName, "my-group-id", processor)
try {
???
} finally {
consumer.close()
}

.. _FeatureEvent: https://docs.geotools.org/stable/javadocs/org/geotools/api/data/FeatureEvent.html
.. _FeatureEvent.Type: https://docs.geotools.org/stable/javadocs/org/geotools/api/data/FeatureEvent.Type.html
.. _FeatureListener: https://docs.geotools.org/stable/javadocs/org/geotools/api/data/FeatureListener.html
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.kafka.utils.interop;

import org.locationtech.geomesa.kafka.consumer.BatchConsumer;
import org.locationtech.geomesa.kafka.utils.GeoMessage;
import scala.Enumeration;

import java.util.List;

/**
* Message processor class. Guarantees 'at-least-once' processing.
*/
public interface GeoMessageProcessor extends org.locationtech.geomesa.kafka.utils.GeoMessageProcessor {

    /**
     * Consume a batch of records.
     * <p>
     * The response from this method will determine the continued processing of messages. If `Commit`
     * is returned, the batch is considered complete and won't be presented again. If `Continue` is
     * returned, the batch will be presented again in the future, and more messages will be read off the topic
     * in the meantime. If `Pause` is returned, the batch will be presented again in the future, but
     * no more messages will be read off the topic in the meantime.
     * <p>
     * This method should return in a reasonable amount of time. If too much time is spent processing
     * messages, consumers may be considered inactive and be dropped from processing. See
     * <a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html">https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
     * <p>
     * Note: if there is an error committing the batch or something else goes wrong, some messages may
     * be repeated in a subsequent call, regardless of the response from this method
     *
     * @param records records
     * @return indication to continue, pause, or commit
     */
    BatchResult consume(List<GeoMessage> records);

    // scala 2.12 - note, can't @Override these due to scala version differences
    default Enumeration.Value consume(scala.collection.Seq<GeoMessage> records) {
        List<GeoMessage> list = scala.collection.JavaConverters.seqAsJavaListConverter(records).asJava();
        return toScalaResult(consume(list));
    }

    // scala 2.13 - note, can't @Override these due to scala version differences
    default Enumeration.Value consume(scala.collection.immutable.Seq<GeoMessage> records) {
        List<GeoMessage> list = scala.collection.JavaConverters.seqAsJavaListConverter(records).asJava();
        return toScalaResult(consume(list));
    }

    /**
     * Convert the Java-friendly result enum into the Scala enumeration used by the underlying consumer.
     * <p>
     * Shared by the 2.12 and 2.13 bridge methods above; fails fast instead of returning null so that an
     * unexpected value surfaces here rather than as a NullPointerException deep inside the consumer.
     *
     * @param result java-api batch result (must not be null)
     * @return the corresponding scala enumeration value
     */
    static Enumeration.Value toScalaResult(BatchResult result) {
        switch (result) {
            case COMMIT: return BatchConsumer.BatchResult$.MODULE$.Commit();
            case CONTINUE: return BatchConsumer.BatchResult$.MODULE$.Continue();
            case PAUSE: return BatchConsumer.BatchResult$.MODULE$.Pause();
            default: throw new IllegalStateException("Unexpected batch result: " + result);
        }
    }

    /**
     * Java-friendly mirror of the Scala BatchResult enumeration
     */
    enum BatchResult {
        COMMIT, CONTINUE, PAUSE
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,24 @@ class KafkaDataStore(
* @param processor message processor
* @return
*/
// convenience overload: delegates to the 4-arg variant with no custom error handler
def createConsumer(typeName: String, groupId: String, processor: GeoMessageProcessor): Closeable =
createConsumer(typeName, groupId, processor, None)

/**
* Create a message consumer for the given feature type. This can be used for guaranteed at-least-once
* message processing
*
* @param typeName type name
* @param groupId consumer group id
* @param processor message processor
* @param errorHandler error handler
* @return
*/
def createConsumer(
typeName: String,
groupId: String,
processor: GeoMessageProcessor,
errorHandler: Option[ConsumerErrorHandler] = None): Closeable = {
errorHandler: Option[ConsumerErrorHandler]): Closeable = {
val sft = getSchema(typeName)
if (sft == null) {
throw new IllegalArgumentException(s"Schema '$typeName' does not exist; call `createSchema` first")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.kafka.data;

import org.junit.Test;
import org.locationtech.geomesa.kafka.utils.GeoMessage;
import org.locationtech.geomesa.kafka.utils.interop.GeoMessageProcessor;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.Mockito.mock;

public class GeoMessageProcessorApiTest {

    /**
     * Simple processor that records every message it sees, bucketed by message type,
     * so that tests can inspect exactly what was consumed.
     */
    public static class TestProcessor implements GeoMessageProcessor {
        public List<GeoMessage.Change> added = new ArrayList<>();
        public List<GeoMessage.Delete> removed = new ArrayList<>();
        public int cleared = 0;
        @Override
        public BatchResult consume(List<GeoMessage> records) {
            // tally each message by its concrete type
            for (GeoMessage message : records) {
                if (message instanceof GeoMessage.Change) {
                    added.add((GeoMessage.Change) message);
                } else if (message instanceof GeoMessage.Delete) {
                    removed.add((GeoMessage.Delete) message);
                } else if (message instanceof GeoMessage.Clear) {
                    cleared += 1;
                }
            }
            // always acknowledge the batch
            return BatchResult.COMMIT;
        }
    }

    @Test
    public void testJavaApi() {
        GeoMessageProcessor processor = new TestProcessor();
        KafkaDataStore kds = mock();
        // verify that things compile from java
        // noinspection resource
        kds.createConsumer("type-name", "group-id", processor);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,6 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}

"support at-least-once consumers" >> {
skipped("inconsistent")
val params = Map(
KafkaDataStoreParams.ConsumerConfig.key -> "auto.offset.reset=earliest",
KafkaDataStoreParams.ConsumerCount.key -> "2",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.kafka.utils

import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
import org.locationtech.geomesa.kafka.data.GeoMessageProcessorApiTest
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
class GeoMessageProcessorJavaTest extends Specification {

  import scala.collection.JavaConverters._

  lazy val sft = SimpleFeatureTypes.createType("KafkaGeoMessageTest", "name:String,*geom:Point:srid=4326")
  lazy val feature = ScalaSimpleFeature.create(sft, "test_id", "foo", "POINT(1 -1)")

  "GeoMessageProcessor" should {
    "work through the java api" in {
      // one message of each type, delivered through the Java-friendly interface
      val messages = Seq(GeoMessage.change(feature), GeoMessage.delete(feature.getID), GeoMessage.clear())
      val processor = new GeoMessageProcessorApiTest.TestProcessor()
      // the batch should be committed, and each message routed to the right bucket
      processor.consume(messages) mustEqual BatchResult.Commit
      processor.added.asScala mustEqual Seq(messages.head)
      processor.removed.asScala mustEqual Seq(messages(1))
      processor.cleared mustEqual 1
    }
  }
}

0 comments on commit 6e2056a

Please sign in to comment.