GEOMESA-3371 Lambda - Allow for customization of Kafka topic name (#3153)
elahrvivaz authored Aug 26, 2024
1 parent 9d9341f commit a667fa3
Showing 7 changed files with 96 additions and 29 deletions.
1 change: 1 addition & 0 deletions docs/user/lambda/index.rst
@@ -20,4 +20,5 @@ To get started with the Lambda data store, try the :doc:`/tutorials/geomesa-quic
geoserver
commandline
configuration
index_config
advanced
21 changes: 21 additions & 0 deletions docs/user/lambda/index_config.rst
@@ -0,0 +1,21 @@
Lambda Index Configuration
==========================

GeoMesa exposes a variety of configuration options that can be used to customize and optimize a given installation.
The Lambda data store supports most of the general options described under :ref:`index_config`.

Kafka Topic Name
----------------

Each SimpleFeatureType (or schema) will be written to a unique Kafka topic. By default, the topic
is named by combining the data store namespace with the SimpleFeatureType name, with any character
that is not alphanumeric, an underscore, or a hyphen replaced by an underscore. For example, a
namespace of ``gm:lambda`` and a type named ``mytype`` yield the topic ``gm_lambda_mytype``.

If desired, the topic name can be set to an arbitrary value by setting the user data key ``geomesa.lambda.topic``
before calling ``createSchema``:

.. code-block:: java

    SimpleFeatureType sft = ....;
    sft.getUserData().put("geomesa.lambda.topic", "myTopicName");

For more information on how to set schema options, see :ref:`set_sft_options`.
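
As an alternative to setting the key programmatically, the option can typically be embedded in
the feature type specification string, where user data entries follow the attribute list after a
semicolon (see :ref:`set_sft_options`). A minimal sketch, assuming the standard GeoMesa
spec-string syntax; the type and attribute names here are illustrative:

.. code-block:: scala

    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

    // the user data entry after ';' is placed in sft.getUserData ahead of createSchema
    val sft = SimpleFeatureTypes.createType("mySft",
      "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.lambda.topic='myTopicName'")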
@@ -68,13 +68,16 @@ class LambdaDataStore(val persistence: DataStore, config: LambdaConfig)(implicit
override def getNames: java.util.List[Name] = persistence.getNames

override def createSchema(sft: SimpleFeatureType): Unit = {
val topic = LambdaDataStore.topic(sft, config.zkNamespace)
if (topic.contains("/")) {
// note: kafka doesn't allow slashes in topic names
throw new IllegalArgumentException(s"Topic cannot contain '/': $topic")
}
persistence.createSchema(sft)
// TODO for some reason lambda qs consumers don't rebalance when the topic is created after the consumers...
// transients.get(sft.getTypeName).createSchema()
val topic = KafkaStore.topic(config.zkNamespace, sft)
val props = new Properties()
config.producerConfig.foreach { case (k, v) => props.put(k, v) }

WithClose(AdminClient.create(props)) { admin =>
if (admin.listTopics().names().get.contains(topic)) {
logger.warn(s"Topic [$topic] already exists - it may contain stale data")
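// A hedged sketch of the create-if-missing step that likely follows in the collapsed lines;
// the partition and replication-factor values here are illustrative assumptions, not the
// commit's actual code:
//
//   if (!admin.listTopics().names().get.contains(topic)) {
//     val newTopic = new NewTopic(topic, 1, 1.toShort) // org.apache.kafka.clients.admin.NewTopic
//     admin.createTopics(java.util.Collections.singletonList(newTopic)).all().get()
//   }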
@@ -189,6 +192,23 @@ }
}

object LambdaDataStore {

val TopicKey = "geomesa.lambda.topic"

/**
* Gets the kafka topic configured in the sft, or a default topic if nothing is configured.
*
* @param sft simple feature type
* @param namespace namespace to use for default topic
* @return
*/
def topic(sft: SimpleFeatureType, namespace: String): String = {
sft.getUserData.get(TopicKey) match {
case topic: String => topic
case _ => s"${namespace}_${sft.getTypeName}".replaceAll("[^a-zA-Z0-9_\\-]", "_")
}
}

case class LambdaConfig(
zookeepers: String,
zkNamespace: String,
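
To illustrate the new resolution logic, here is a brief, hedged usage sketch; the type name,
namespace, and override value are illustrative, not taken from the commit:

.. code-block:: scala

    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

    val sft = SimpleFeatureTypes.mutable(
      SimpleFeatureTypes.createType("mytype", "name:String,*geom:Point:srid=4326"))

    // no override: namespace + type name, sanitized to Kafka-legal characters
    LambdaDataStore.topic(sft, "gm:lambda") // "gm_lambda_mytype"

    // with an override, the configured value is used verbatim
    sft.getUserData.put(LambdaDataStore.TopicKey, "my-custom-topic")
    LambdaDataStore.topic(sft, "gm:lambda") // "my-custom-topic"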
@@ -25,6 +25,7 @@ import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptor
import org.locationtech.geomesa.index.planning.QueryRunner.QueryResult
import org.locationtech.geomesa.index.utils.{ExplainLogging, Explainer}
import org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions
import org.locationtech.geomesa.lambda.data.LambdaDataStore
import org.locationtech.geomesa.lambda.data.LambdaDataStore.LambdaConfig
import org.locationtech.geomesa.lambda.stream.kafka.KafkaStore.MessageTypes
import org.locationtech.geomesa.lambda.stream.{OffsetManager, TransientStore}
@@ -53,7 +54,7 @@ class KafkaStore(

private val producer = KafkaStore.producer(sft, config.producerConfig)

private val topic = KafkaStore.topic(config.zkNamespace, sft)
private val topic = LambdaDataStore.topic(sft, config.zkNamespace)

private val cache = new KafkaFeatureCache(topic)

@@ -173,8 +174,10 @@ object KafkaStore {
val Delete: Byte = 1
}

def topic(ns: String, sft: SimpleFeatureType): String = topic(ns, sft.getTypeName)
@deprecated("Replaced with LambdaDataStore.topic")
def topic(ns: String, sft: SimpleFeatureType): String = LambdaDataStore.topic(sft, ns)

@deprecated("Does not return correct topic if topic is overridden in the feature type - replaced with LambdaDataStore.topic")
def topic(ns: String, typeName: String): String = s"${ns}_$typeName".replaceAll("[^a-zA-Z0-9_\\-]", "_")

def producer(sft: SimpleFeatureType, connect: Map[String, String]): Producer[Array[Byte], Array[Byte]] = {
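
Note for callers migrating off the deprecated ``KafkaStore.topic`` overloads: the replacement
also flips the argument order, as sketched below.

.. code-block:: scala

    // before (deprecated; now simply delegates):
    KafkaStore.topic(ns, sft)
    // replacement; note the flipped argument order:
    LambdaDataStore.topic(sft, ns)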
@@ -22,18 +22,18 @@ class LambdaContainerTest extends KafkaContainerTest {
val offsetManager = new InMemoryOffsetManager

lazy val dsParams = Map(
"lambda.accumulo.instance.id" -> AccumuloContainer.instanceName,
"lambda.accumulo.zookeepers" -> AccumuloContainer.zookeepers,
"lambda.accumulo.user" -> AccumuloContainer.user,
"lambda.accumulo.password" -> AccumuloContainer.password,
"lambda.accumulo.instance.name" -> AccumuloContainer.instanceName,
"lambda.accumulo.zookeepers" -> AccumuloContainer.zookeepers,
"lambda.accumulo.user" -> AccumuloContainer.user,
"lambda.accumulo.password" -> AccumuloContainer.password,
// note the table needs to be different to prevent testing errors
"lambda.accumulo.catalog" -> sftName,
"lambda.kafka.brokers" -> brokers,
"lambda.kafka.zookeepers" -> zookeepers,
"lambda.kafka.partitions" -> 2,
"lambda.expiry" -> "100ms",
"lambda.clock" -> clock,
"lambda.offset-manager" -> offsetManager
"lambda.accumulo.catalog" -> sftName,
"lambda.kafka.brokers" -> brokers,
"lambda.kafka.zookeepers" -> zookeepers,
"lambda.kafka.partitions" -> 2,
"lambda.expiry" -> "100ms",
"lambda.clock" -> clock,
"lambda.offset-manager" -> offsetManager
)
}

@@ -107,24 +107,45 @@ class LambdaDataStoreTest extends LambdaContainerTest {

"LambdaDataStore" should {
"write and read features" in {
val ds = DataStoreFinder.getDataStore(dsParams.asJava).asInstanceOf[LambdaDataStore]
readAndWriteTest()
}
}


def readAndWriteTest(): MatchResult[Any] = {
foreach(Seq(None, Some("my-lambda-topic"))) { customTopic =>
def dsParams(extras: (String, String)*): java.util.Map[String, _] = {
val catalog = s"${getClass.getSimpleName}${customTopic.getOrElse("").replaceAll("[^A-Za-z0-9]", "_")}"
(this.dsParams ++ Map("lambda.accumulo.catalog" -> catalog) ++ extras.toMap).asJava
}

clock.tick = 0

val sft = SimpleFeatureTypes.mutable(SimpleFeatureTypes.copy(this.sft))
customTopic.foreach(sft.getUserData.put(LambdaDataStore.TopicKey, _))

val ds = DataStoreFinder.getDataStore(dsParams()).asInstanceOf[LambdaDataStore]
ds must not(beNull)

try {
ds.createSchema(sft)
ds.getSchema(sft.getTypeName) mustEqual sft
SimpleFeatureTypes.compare(ds.getSchema(sft.getTypeName), sft) mustEqual 0

customTopic.foreach { topic =>
LambdaDataStore.topic(ds.getSchema(sft.getTypeName), "") mustEqual topic
}

// check namespaces
val ns = DataStoreFinder.getDataStore((dsParams ++ Map("namespace" -> "ns0")).asJava).getSchema(sft.getTypeName).getName
val ns = DataStoreFinder.getDataStore(dsParams("namespace" -> "ns0")).getSchema(sft.getTypeName).getName
ns.getNamespaceURI mustEqual "ns0"
ns.getLocalPart mustEqual sft.getTypeName

// note: instantiate after creating the schema so it's not cached as missing
val readOnly = DataStoreFinder.getDataStore((dsParams ++ Map("expiry" -> "Inf")).asJava).asInstanceOf[LambdaDataStore]
val readOnly = DataStoreFinder.getDataStore(dsParams("expiry" -> "Inf")).asInstanceOf[LambdaDataStore]
readOnly must not(beNull)

try {
readOnly.getSchema(sft.getTypeName) mustEqual sft
SimpleFeatureTypes.compare(readOnly.getSchema(sft.getTypeName), sft) mustEqual 0

WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
features.foreach { feature =>
@@ -136,9 +157,9 @@ class LambdaDataStoreTest extends LambdaContainerTest {
// test queries against the transient store
forall(Seq(ds, readOnly)) { store =>
eventually(40, 100.millis)(SelfClosingIterator(store.transients.get(sft.getTypeName).read().iterator()).toSeq must
containTheSameElementsAs(features))
containTheSameElementsAs(features))
SelfClosingIterator(store.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toSeq must
containTheSameElementsAs(features)
containTheSameElementsAs(features)
}
testTransforms(ds, SimpleFeatureTypes.createType("lambda", "*geom:Point:srid=4326"))
testBin(ds)
@@ -151,9 +172,9 @@ class LambdaDataStoreTest extends LambdaContainerTest {
// test mixed queries against both stores
forall(Seq(ds, readOnly)) { store =>
eventually(40, 100.millis)(SelfClosingIterator(store.transients.get(sft.getTypeName).read().iterator()).toSeq must
beEqualTo(features.drop(1)))
beEqualTo(features.drop(1)))
SelfClosingIterator(store.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toSeq must
containTheSameElementsAs(features)
containTheSameElementsAs(features)
}
testTransforms(ds, SimpleFeatureTypes.createType("lambda", "*geom:Point:srid=4326"))
testBin(ds)
@@ -162,7 +183,7 @@ class LambdaDataStoreTest extends LambdaContainerTest {

// test query_persistent/query_transient hints
forall(Seq((features.take(1), QueryHints.LAMBDA_QUERY_TRANSIENT, "LAMBDA_QUERY_TRANSIENT"),
(features.drop(1) , QueryHints.LAMBDA_QUERY_PERSISTENT, "LAMBDA_QUERY_PERSISTENT"))) {
(features.drop(1) , QueryHints.LAMBDA_QUERY_PERSISTENT, "LAMBDA_QUERY_PERSISTENT"))) {
case (feature, hint, string) =>
val hints = Seq((hint, java.lang.Boolean.FALSE),
(Hints.VIRTUAL_TABLE_PARAMETERS, Map(string -> "false").asJava))
@@ -180,7 +201,7 @@ class LambdaDataStoreTest extends LambdaContainerTest {
forall(Seq(ds, readOnly)) { store =>
eventually(40, 100.millis)(SelfClosingIterator(store.transients.get(sft.getTypeName).read().iterator()) must beEmpty)
SelfClosingIterator(store.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toSeq must
containTheSameElementsAs(features)
containTheSameElementsAs(features)
}
testTransforms(ds, SimpleFeatureTypes.createType("lambda", "*geom:Point:srid=4326"))
testBin(ds)
@@ -195,7 +216,7 @@ class LambdaDataStoreTest extends LambdaContainerTest {
forall(Seq(ds, readOnly)) { store =>
eventually(40, 100.millis)(
SelfClosingIterator(store.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toSeq must
containTheSameElementsAs(Seq(update, features.last))
containTheSameElementsAs(Seq(update, features.last))
)
}

@@ -205,7 +226,7 @@ class LambdaDataStoreTest extends LambdaContainerTest {
forall(Seq(ds, readOnly)) { store =>
eventually(40, 100.millis)(SelfClosingIterator(store.transients.get(sft.getTypeName).read().iterator()) must beEmpty)
SelfClosingIterator(store.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toSeq must
containTheSameElementsAs(Seq(update, features.last))
containTheSameElementsAs(Seq(update, features.last))
}
} finally {
readOnly.dispose()
@@ -17,6 +17,7 @@ import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.lambda.LambdaContainerTest.TestClock
import org.locationtech.geomesa.lambda.data.LambdaDataStore
import org.locationtech.geomesa.lambda.data.LambdaDataStore.LambdaConfig
import org.locationtech.geomesa.lambda.{InMemoryOffsetManager, LambdaContainerTest}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
@@ -41,7 +42,7 @@ class KafkaStoreTest extends LambdaContainerTest {
def newNamespace(): String = s"ks-test-${namespaces.getAndIncrement()}"

def createTopic(ns: String, sft: SimpleFeatureType): Unit = {
val topic = KafkaStore.topic(ns, sft)
val topic = LambdaDataStore.topic(sft, ns)
val props = new Properties()
props.put("bootstrap.servers", brokers)
WithClose(AdminClient.create(props)) { admin =>
