From 7f4948cb4cf007956a42bd41657e782e2ab42dc7 Mon Sep 17 00:00:00 2001 From: Emilio Date: Tue, 12 Nov 2024 11:22:02 -0500 Subject: [PATCH] GEOMESA-3415 Allow for custom index table prefixes (#3233) * `index.table.prefix` user data specifies the prefix to use --- docs/user/datastores/index_config.rst | 20 ++++++++++ .../accumulo/data/AccumuloDataStore.scala | 40 ++++++++++--------- .../accumulo/data/AccumuloDataStoreTest.scala | 27 ++++++++++--- .../index/api/GeoMesaFeatureIndex.scala | 4 +- .../index/geotools/GeoMesaDataStore.scala | 15 +++++-- .../geomesa/utils/geotools/Conversions.scala | 7 +++- .../utils/geotools/SimpleFeatureTypes.scala | 11 +++++ 7 files changed, 95 insertions(+), 29 deletions(-) diff --git a/docs/user/datastores/index_config.rst b/docs/user/datastores/index_config.rst index 3b00ad9309a2..901254faa15a 100644 --- a/docs/user/datastores/index_config.rst +++ b/docs/user/datastores/index_config.rst @@ -173,6 +173,26 @@ you may instead call the ``indexes`` methods: .indices(List("id", "z3", "attr")) .build("mySft") +Configuring Index Table Names +----------------------------- + +The names used for index tables attempt to be unique, usually being composed of the catalog table name, the feature type name, +and the index identifier. In certain situations, it may be useful to modify the index table names. For example, in Accumulo +you may want to put index tables in different namespaces that have custom configurations. Table name prefixes can be set +using the user data key ``index.table.prefix``, or, to configure prefixes for a specific index type, ``index.table.prefix.`` +where ```` is an index name such as ``z3`` or ``id``: + +.. code-block:: java + + import org.locationtech.geomesa.utils.interop.SimpleFeatureTypes; + + String spec = "name:String,dtg:Date,*geom:Point:srid=4326"; + SimpleFeatureType sft = SimpleFeatureTypes.createType("mySft", spec); + // table names will look like geomesa.custom_mySft_id_v4 + sft.getUserData().put("index.table.prefix", "geomesa.custom"); + // override table names for just the z3 index + sft.getUserData().put("index.table.prefix.z3", "geomesa_z3.custom"); + Configuring Feature ID Encoding ------------------------------- diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala index 936c92cbf31f..f48c4e9f4bef 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala @@ -152,26 +152,33 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = { import org.locationtech.geomesa.index.conf.SchemaProperties.ValidateDistributedClasspath - // validate that the accumulo runtime is available - val namespace = config.catalog.indexOf('.') match { + // call super first so that user data keys are updated + super.preSchemaCreate(sft) + + def getNamespace(prefix: String): String = prefix.indexOf('.') match { case -1 => "" - case i => config.catalog.substring(0, i) + case i => prefix.substring(0, i) } - if (namespace.nonEmpty) { - adapter.ensureNamespaceExists(namespace) - } - val canLoad = connector.namespaceOperations().testClassLoad(namespace, - classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName) - - if (!canLoad) { - val msg = s"Could not load GeoMesa distributed code from the Accumulo classpath for table '${config.catalog}'" - logger.error(msg) - if (ValidateDistributedClasspath.toBoolean.contains(true)) { - val nsMsg = if (namespace.isEmpty) { "" } else { s" for the namespace '$namespace'" } - throw new RuntimeException(s"$msg. You may override this check by setting the system property " + + + val prefixes = Seq(config.catalog) ++ sft.getIndices.flatMap(i => sft.getTablePrefix(i.name)) + prefixes.map(getNamespace).distinct.foreach { namespace => + if (namespace.nonEmpty) { + adapter.ensureNamespaceExists(namespace) + } + // validate that the accumulo runtime is available + val canLoad = connector.namespaceOperations().testClassLoad(namespace, + classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName) + + if (!canLoad) { + val msg = s"Could not load GeoMesa distributed code from the Accumulo classpath" + logger.error(s"$msg for catalog ${config.catalog}") + if (ValidateDistributedClasspath.toBoolean.contains(true)) { + val nsMsg = if (namespace.isEmpty) { "" } else { s" for the namespace '$namespace'" } + throw new RuntimeException(s"$msg. You may override this check by setting the system property " + s"'${ValidateDistributedClasspath.property}=false'. Otherwise, please verify that the appropriate " + s"JARs are installed$nsMsg - see http://www.geomesa.org/documentation/user/accumulo/install.html" + "#installing-the-accumulo-distributed-runtime-library") + } } } @@ -179,9 +186,6 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu throw new IllegalArgumentException("Attribute level visibility only supports up to 255 attributes") } - super.preSchemaCreate(sft) - - // note: dtg should be set appropriately before calling this method sft.getDtgField.foreach { dtg => if (sft.getIndices.exists(i => i.name == JoinIndex.name && i.attributes.headOption.contains(dtg))) { if (!GeoMesaSchemaValidator.declared(sft, OverrideDtgJoin)) { diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala index 22beb62252fb..1ab09d6940c2 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala @@ -521,7 +521,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { } } - "delete all associated tables" >> { + "delete all associated tables" in { val catalog = "AccumuloDataStoreDeleteAllTablesTest" // note the table needs to be different to prevent testing errors val ds = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> catalog)).asJava).asInstanceOf[AccumuloDataStore] @@ -534,7 +534,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { ds.connector.tableOperations().list().asScala.toSeq must not(containAnyOf(tables)) } - "query on bbox and unbounded temporal" >> { + "query on bbox and unbounded temporal" in { val sft = createNewSchema("name:String,dtg:Date,*geom:Point:srid=4326") addFeatures((0 until 6).map { i => @@ -554,7 +554,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { read.map(_.getID) must containAllOf(Seq("2", "3", "4")) } - "create tables with an accumulo namespace" >> { + "create tables with an accumulo namespace" in { val table = "test.AccumuloDataStoreNamespaceTest" val params = dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> table) val dsWithNs = DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore] @@ -563,7 +563,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { dsWithNs.connector.namespaceOperations().exists("test") must beTrue } - "only create catalog table when necessary" >> { + "only create catalog table when necessary" in { val table = "AccumuloDataStoreTableTest" val params = dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> table) val ds = DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore] @@ -581,7 +581,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { ds.getSchema("test") must not(beNull) } - "create tables with block cache enabled/disabled" >> { + "create tables with block cache enabled/disabled" in { foreach(Seq(",geomesa.table.partition=time", "")) { partitioned => val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;table.cache.enabled='z3,attr'$partitioned") addFeatures((0 until 6).map { i => @@ -606,5 +606,22 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts { } } } + + "create index tables with a different prefix than the catalog table" in { + val prefix = s"custom.${catalog.replaceFirst(".*\\.", "")}" + foreach(Seq(",geomesa.table.partition=time", "")) { partitioned => + val userData = s"index.table.prefix='$prefix',index.table.prefix.z3='z3$prefix'$partitioned" + val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;$userData") + addFeatures((0 until 6).map { i => + val sf = new ScalaSimpleFeature(sft, i.toString) + sf.setAttributes(Array[AnyRef](i.toString, s"2012-01-02T05:0$i:07.000Z", s"POINT(45.0 4$i.0)")) + sf + }) + foreach(ds.manager.indices(sft)) { index => + val p = if (index.name == Z3Index.name) { s"z3$prefix" } else { prefix } + foreach(index.getTableNames())(_ must startWith(s"${p}_${sft.getTypeName}_")) + } + } + } } } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala index fa4deda931d1..0c50a128c296 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala @@ -366,8 +366,10 @@ abstract class GeoMesaFeatureIndex[T, U](val ds: GeoMesaDataStore[_], */ protected def generateTableName(partition: Option[String] = None, limit: Option[Int] = None): String = { import StringSerialization.alphaNumericSafeString + import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType - val prefix = (ds.config.catalog +: Seq(sft.getTypeName, name).map(alphaNumericSafeString)).mkString("_") + val namespace = sft.getTablePrefix(name).getOrElse(ds.config.catalog) + val prefix = (namespace +: Seq(sft.getTypeName, name).map(alphaNumericSafeString)).mkString("_") val suffix = s"v$version${partition.map(p => s"_$p").getOrElse("")}" def build(attrs: Seq[String]): String = (prefix +: attrs :+ suffix).mkString("_") diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala index 40812c4af0eb..9b52a67a8769 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala @@ -115,10 +115,12 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD @throws(classOf[IllegalArgumentException]) override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = { - // check for old enabled indices and re-map them - // noinspection ScalaDeprecation - SimpleFeatureTypes.Configs.ENABLED_INDEX_OPTS.drop(1).find(sft.getUserData.containsKey).foreach { key => - sft.getUserData.put(SimpleFeatureTypes.Configs.EnabledIndices, sft.getUserData.remove(key)) + // check for old user data keys and re-map them + InternalConfigs.DeprecatedConfigMappings.foreach { case (from, to) => + val v = sft.getUserData.remove(from) + if (v != null) { + sft.getUserData.put(to, v) + } } // validate column groups @@ -155,6 +157,11 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD InternalConfigs.PartitionConfigMappings.foreach { case (from, to) => Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _)) } + InternalConfigs.PartitionConfigPrefixMappings.foreach { case (from, to) => + sft.getUserData.asScala.toMap.collect { + case (k: String, v) if k.startsWith(from) => sft.getUserData.put(to + k.substring(from.length), v) + } + } } // set stats enabled based on the data store config if not explicitly set diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala index 34a81ce8dddc..a6058cdb7805 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala @@ -16,7 +16,7 @@ import org.locationtech.geomesa.curve.TimePeriod.TimePeriod import org.locationtech.geomesa.curve.{TimePeriod, XZSFC} import org.locationtech.geomesa.utils.conf.{FeatureExpiration, IndexId, SemanticVersion} import org.locationtech.geomesa.utils.geometry.GeometryPrecision -import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{Configs, InternalConfigs} import org.locationtech.geomesa.utils.index.VisibilityLevel import org.locationtech.geomesa.utils.index.VisibilityLevel.VisibilityLevel import org.locationtech.geomesa.utils.stats.Cardinality @@ -333,6 +333,11 @@ object RichSimpleFeatureType extends Conversions { def isPartitioned: Boolean = sft.getUserData.containsKey(Configs.TablePartitioning) + def getTablePrefix(indexName: String): Option[String] = { + val key = if (isPartitioned) { InternalConfigs.PartitionTablePrefix } else { Configs.IndexTablePrefix } + userData[String](s"$key.$indexName").orElse(userData[String](key)) + } + def getRemoteVersion: Option[SemanticVersion] = userData[String](RemoteVersion).map(SemanticVersion.apply) def getKeywords: Set[String] = diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala index 7d0887824945..40b2f13d0330 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala @@ -42,6 +42,7 @@ object SimpleFeatureTypes { val IndexAttributeShards = "geomesa.attr.splits" val IndexIdShards = "geomesa.id.splits" val IndexIgnoreDtg = "geomesa.ignore.dtg" + val IndexTablePrefix = "index.table.prefix" val IndexVisibilityLevel = "geomesa.visibility.level" val IndexXzPrecision = "geomesa.xz.precision" val IndexZ3Interval = "geomesa.z3.interval" @@ -80,6 +81,7 @@ object SimpleFeatureTypes { val IndexVersions = "geomesa.indices" val PartitionSplitterClass = "geomesa.splitter.class" val PartitionSplitterOpts = "geomesa.splitter.opts" + val PartitionTablePrefix = "geomesa.table.prefix" val RemoteVersion = "gm.remote.version" // note: doesn't start with geomesa so we don't persist it val PartitionTableCache = "geomesa.table.cache" val KeywordsDelimiter = "\u0000" @@ -90,6 +92,15 @@ object SimpleFeatureTypes { Configs.TableSplitterClass -> PartitionSplitterClass, Configs.TableSplitterOpts -> PartitionSplitterOpts, ) + val PartitionConfigPrefixMappings: Map[String, String] = Map( + Configs.IndexTablePrefix -> PartitionTablePrefix, + ) + + // deprecated configs that we want to re-map for back-compatibility + val DeprecatedConfigMappings: Map[String, String] = Map( + "geomesa.indexes.enabled" -> Configs.EnabledIndices, + "table.indexes.enabled" -> Configs.EnabledIndices, + ) } object AttributeOptions {