From c7e71b71f575f983c7133d81db3c66266a62aaf4 Mon Sep 17 00:00:00 2001
From: Emilio
Date: Thu, 7 Nov 2024 09:40:53 -0500
Subject: [PATCH] GEOMESA-3412 Accumulo - Option for disabling table block cache (#3229)

---
 docs/user/accumulo/index_config.rst           | 18 +++++++++--
 .../accumulo/data/AccumuloIndexAdapter.scala  | 30 +++++++++++++++----
 .../accumulo/data/AccumuloDataStoreTest.scala | 28 +++++++++++++++++
 .../index/geotools/GeoMesaDataStore.scala     | 11 +++----
 .../utils/geotools/SimpleFeatureTypes.scala   |  9 ++++++
 5 files changed, 81 insertions(+), 15 deletions(-)

diff --git a/docs/user/accumulo/index_config.rst b/docs/user/accumulo/index_config.rst
index 57eb480dc2fe..5171acf1c13d 100644
--- a/docs/user/accumulo/index_config.rst
+++ b/docs/user/accumulo/index_config.rst
@@ -2,7 +2,7 @@ Accumulo Index Configuration
 ============================
 
 GeoMesa exposes a variety of configuration options that can be used to customize and optimize a given installation.
-The Accumulo data store supports most of the general options described under :ref:`index_config`.
+In addition to the ones here, the Accumulo data store supports most of the general options described under :ref:`index_config`.
 
 .. _accumulo_attribute_indices:
 
@@ -87,7 +87,7 @@ Logical Timestamps
 
 By default, GeoMesa index tables are created using Accumulo's logical time. This ensures that updates to a
 given simple feature will be ordered correctly, however it obscures the actual insert time for the underlying data
-row. For advanced use cases, standard system time can be used instead of logical time. To disble logical
+row. For advanced use cases, standard system time can be used instead of logical time. To disable logical
 time, add the following user data hint to the simple feature type before calling ``createSchema``:
 
 .. code-block:: java
@@ -96,6 +96,20 @@ time, add the following user data hint to the simple feature type before calling
     String spec = "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.logical.time='false'";
     SimpleFeatureType sft = SimpleFeatureTypes.createType("mySft", spec);
 
+Table Block Cache
+-----------------
+
+By default, GeoMesa will enable the Accumulo `data block cache `__
+on all index tables. To customize this, add the key ``table.cache.enabled`` to the ``SimpleFeatureType`` user data containing a
+comma-separated list of indices for which the block cache will be enabled. An empty string will disable the block cache on
+all tables. Indices can be indicated by name (e.g. ``z3``), by name and attributes (e.g. ``z3:geom:dtg``), or by the full index
+id (which includes the index version, e.g. ``z3:7:geom:dtg``). Set the user data before calling ``createSchema``:
+
+.. code-block:: java
+
+    sft.getUserData().put("table.cache.enabled", "z3,attr");
+    datastore.createSchema(sft);
+
 .. _index_upgrades:
 
 Upgrading Existing Indices
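For illustration, a minimal Scala sketch of the other two forms the documentation above describes (the empty string and the full index id); the feature type name and the ``ds`` data store here are placeholders, not part of the patch:

.. code-block:: scala

    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

    // placeholder schema, mirroring the spec used in the docs example above
    val sft = SimpleFeatureTypes.createType("mySft", "name:String,dtg:Date,*geom:Point:srid=4326")

    // empty string: disable the block cache on every index table
    sft.getUserData.put("table.cache.enabled", "")

    // or enable it only for a single index, using the full index id (version included)
    // sft.getUserData.put("table.cache.enabled", "z3:7:geom:dtg")

    // `ds` is assumed to be an existing, already-configured Accumulo data store
    ds.createSchema(sft)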
diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
index 1e47d6bdd048..3bb36cf1ce39 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
@@ -8,7 +8,7 @@
 
 package org.locationtech.geomesa.accumulo.data
 
-import org.apache.accumulo.core.conf.Property
+import com.typesafe.scalalogging.LazyLogging
 import org.apache.accumulo.core.data.{Key, Range, Value}
 import org.apache.accumulo.core.file.keyfunctor.RowFunctor
 import org.apache.hadoop.io.Text
@@ -37,6 +37,7 @@ import org.locationtech.geomesa.index.iterators.StatsScan
 import org.locationtech.geomesa.index.planning.LocalQueryRunner.LocalTransformReducer
 import org.locationtech.geomesa.security.SecurityUtils
 import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{Configs, InternalConfigs}
 import org.locationtech.geomesa.utils.index.VisibilityLevel
 import org.locationtech.geomesa.utils.io.WithClose
 
@@ -48,7 +49,8 @@ import java.util.Map.Entry
  *
  * @param ds data store
  */
-class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] {
+class AccumuloIndexAdapter(ds: AccumuloDataStore)
+    extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] with LazyLogging {
 
   import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
 
@@ -95,15 +97,31 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connec
     }
 
     if (created) {
+      import org.apache.accumulo.core.conf.Property.{TABLE_BLOCKCACHE_ENABLED, TABLE_BLOOM_ENABLED, TABLE_BLOOM_KEY_FUNCTOR}
+
       addSplitsAndGroups()
 
-      // enable block cache
-      tableOps.setProperty(table, Property.TABLE_BLOCKCACHE_ENABLED.getKey, "true")
+      // block cache config
+      val enableBlockCache = {
+        val key = if (index.sft.isPartitioned) { InternalConfigs.PartitionTableCache } else { Configs.TableCacheEnabled }
+        val config = index.sft.getUserData.get(key).asInstanceOf[String]
+        if (config == null) { true } else {
+          val enabled = config.split(',').exists { hint =>
+            hint.equalsIgnoreCase(index.name) || hint.equalsIgnoreCase((Seq(index.name) ++ index.attributes).mkString(":")) ||
+              hint.equalsIgnoreCase(index.identifier)
+          }
+          logger.debug(s"Setting ${TABLE_BLOCKCACHE_ENABLED.getKey}=$enabled for index ${index.identifier} based on user config: $config")
+          enabled
+        }
+      }
+      if (enableBlockCache) {
+        tableOps.setProperty(table, TABLE_BLOCKCACHE_ENABLED.getKey, "true")
+      }
 
       if (index.name == IdIndex.name) {
         // enable the row functor as the feature ID is stored in the Row ID
-        tableOps.setProperty(table, Property.TABLE_BLOOM_KEY_FUNCTOR.getKey, classOf[RowFunctor].getCanonicalName)
-        tableOps.setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey, "true")
+        tableOps.setProperty(table, TABLE_BLOOM_KEY_FUNCTOR.getKey, classOf[RowFunctor].getCanonicalName)
+        tableOps.setProperty(table, TABLE_BLOOM_ENABLED.getKey, "true")
       }
 
       if (index.sft.isVisibilityRequired) {
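To make the matching rule in the adapter change above easier to follow, here is a self-contained Scala sketch of the same logic, with the index name, attributes, and identifier passed in directly; the identifier values below are illustrative only:

.. code-block:: scala

    object BlockCacheHintCheck extends App {

      // a hint enables the block cache if it matches the index name, the name plus
      // attributes joined with ':', or the full index identifier (case-insensitive);
      // no config at all leaves the cache enabled
      def blockCacheEnabled(config: Option[String], name: String, attributes: Seq[String], identifier: String): Boolean =
        config.forall { c =>
          c.split(',').exists { hint =>
            hint.equalsIgnoreCase(name) ||
              hint.equalsIgnoreCase((name +: attributes).mkString(":")) ||
              hint.equalsIgnoreCase(identifier)
          }
        }

      // with the user data from the docs example ("z3,attr"):
      println(blockCacheEnabled(Some("z3,attr"), "z3", Seq("geom", "dtg"), "z3:7:geom:dtg")) // true
      println(blockCacheEnabled(Some("z3,attr"), "id", Seq.empty, "id:4"))                   // false
      println(blockCacheEnabled(None, "id", Seq.empty, "id:4"))                              // true - no config keeps the cache on
      println(blockCacheEnabled(Some(""), "z3", Seq("geom", "dtg"), "z3:7:geom:dtg"))        // false - empty string disables all
    }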
diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala
index 51c51e8c5879..22beb62252fb 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreTest.scala
@@ -8,6 +8,7 @@
 
 package org.locationtech.geomesa.accumulo.data
 
+import org.apache.accumulo.core.conf.Property
 import org.apache.accumulo.core.security.Authorizations
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.io.Text
@@ -24,6 +25,7 @@ import org.locationtech.geomesa.accumulo.TestWithMultipleSfts
 import org.locationtech.geomesa.accumulo.index._
 import org.locationtech.geomesa.accumulo.iterators.Z2Iterator
 import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.index.index.NamedIndex
 import org.locationtech.geomesa.index.index.attribute.AttributeIndex
 import org.locationtech.geomesa.index.index.id.IdIndex
 import org.locationtech.geomesa.index.index.z2.Z2Index
@@ -578,5 +580,31 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts
       exists must beTrue
       ds.getSchema("test") must not(beNull)
     }
+
+    "create tables with block cache enabled/disabled" >> {
+      foreach(Seq(",geomesa.table.partition=time", "")) { partitioned =>
+        val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;table.cache.enabled='z3,attr'$partitioned")
+        addFeatures((0 until 6).map { i =>
+          val sf = new ScalaSimpleFeature(sft, i.toString)
+          sf.setAttributes(Array[AnyRef](i.toString, s"2012-01-02T05:0$i:07.000Z", s"POINT(45.0 4$i.0)"))
+          sf
+        })
+        val indices = ds.manager.indices(sft)
+        def getBlockCacheConfig(i: NamedIndex): String = {
+          val index = indices.find(_.name == i.name).orNull
+          index must not(beNull)
+          val tables = index.getTableNames()
+          tables must haveLength(1)
+          ds.connector.tableOperations().getProperties(tables.head).asScala
+              .find(_.getKey == Property.TABLE_BLOCKCACHE_ENABLED.getKey).map(_.getValue).orNull
+        }
+        foreach(Seq(Z3Index, AttributeIndex)) { i =>
+          getBlockCacheConfig(i) mustEqual "true"
+        }
+        foreach(Seq(Z2Index, IdIndex)) { i =>
+          getBlockCacheConfig(i) mustEqual "false"
+        }
+      }
+    }
   }
 }
diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala
index 17eca65fe7e0..40812c4af0eb 100644
--- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala
+++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala
@@ -115,9 +115,6 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
 
   @throws(classOf[IllegalArgumentException])
   override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = {
-    import Configs.{TableSplitterClass, TableSplitterOpts}
-    import InternalConfigs.{PartitionSplitterClass, PartitionSplitterOpts}
-
     // check for old enabled indices and re-map them
     // noinspection ScalaDeprecation
     SimpleFeatureTypes.Configs.ENABLED_INDEX_OPTS.drop(1).find(sft.getUserData.containsKey).foreach { key =>
@@ -154,9 +151,9 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
     sft.getAttributeDescriptors.asScala.foreach(_.getUserData.remove(AttributeOptions.OptIndex))
 
     // for partitioned schemas, persist the table partitioning keys
-    if (TablePartition.partitioned(sft)) {
-      Seq((TableSplitterClass, PartitionSplitterClass), (TableSplitterOpts, PartitionSplitterOpts)).foreach {
-        case (from, to) => Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _))
+    if (sft.isPartitioned) {
+      InternalConfigs.PartitionConfigMappings.foreach { case (from, to) =>
+        Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _))
       }
     }
 
@@ -182,7 +179,7 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
 
   // create the index tables (if not using partitioned tables)
   override protected def onSchemaCreated(sft: SimpleFeatureType): Unit = {
     val indices = manager.indices(sft)
-    if (TablePartition.partitioned(sft)) {
+    if (sft.isPartitioned) {
       logger.debug(s"Delaying creation of partitioned indices ${indices.map(_.identifier).mkString(", ")}")
     } else {
       logger.debug(s"Creating indices ${indices.map(_.identifier).mkString(", ")}")
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
index 88f80ced2e20..7d0887824945 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
@@ -59,6 +59,7 @@ object SimpleFeatureTypes {
     val TableSharing = "geomesa.table.sharing"
     val TableSplitterClass = "table.splitter.class"
     val TableSplitterOpts = "table.splitter.options"
+    val TableCacheEnabled = "table.cache.enabled"
     val TemporalPriority = "geomesa.temporal.priority"
     val UpdateBackupMetadata = "schema.update.backup.metadata"
     val UpdateRenameTables = "schema.update.rename.tables"
@@ -80,7 +81,15 @@ object SimpleFeatureTypes {
     val PartitionSplitterClass = "geomesa.splitter.class"
     val PartitionSplitterOpts = "geomesa.splitter.opts"
     val RemoteVersion = "gm.remote.version" // note: doesn't start with geomesa so we don't persist it
+    val PartitionTableCache = "geomesa.table.cache"
 
     val KeywordsDelimiter = "\u0000"
+
+    // configs that are not normally persisted, but that we want to persist for creating partitioned tables down the line
+    val PartitionConfigMappings: Map[String, String] = Map(
+      Configs.TableCacheEnabled -> PartitionTableCache,
+      Configs.TableSplitterClass -> PartitionSplitterClass,
+      Configs.TableSplitterOpts -> PartitionSplitterOpts,
+    )
   }
   object AttributeOptions {
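Finally, a rough sketch of reading the resulting table setting back, along the lines of what the new test above checks; ``tableOps`` and ``table`` are placeholders for an existing Accumulo ``TableOperations`` handle and a physical GeoMesa index table name:

.. code-block:: scala

    import scala.collection.JavaConverters._

    import org.apache.accumulo.core.client.admin.TableOperations
    import org.apache.accumulo.core.conf.Property

    // returns the effective block cache setting (Property.TABLE_BLOCKCACHE_ENABLED) for an
    // index table; getProperties also reports defaults, so tables GeoMesa did not enable
    // will reflect whatever the Accumulo configuration specifies
    def blockCacheSetting(tableOps: TableOperations, table: String): Option[String] =
      tableOps.getProperties(table).asScala
        .find(_.getKey == Property.TABLE_BLOCKCACHE_ENABLED.getKey)
        .map(_.getValue)

With the ``table.cache.enabled='z3,attr'`` schema from the test, this would read back ``true`` for the z3 and attribute tables and the Accumulo default (``false`` in the test environment) for the others.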