Skip to content

Commit

Permalink
GEOMESA-3412 Accumulo - Option for disabling table block cache (#3229)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Nov 7, 2024
1 parent e88d258 commit c7e71b7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 15 deletions.
18 changes: 16 additions & 2 deletions docs/user/accumulo/index_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Accumulo Index Configuration
============================

GeoMesa exposes a variety of configuration options that can be used to customize and optimize a given installation.
The Accumulo data store supports most of the general options described under :ref:`index_config`.
In addition to the ones here, the Accumulo data store supports most of the general options described under :ref:`index_config`.

.. _accumulo_attribute_indices:

Expand Down Expand Up @@ -87,7 +87,7 @@ Logical Timestamps

By default, GeoMesa index tables are created using Accumulo's logical time. This ensures that updates to a given
simple feature will be ordered correctly, however it obscures the actual insert time for the underlying data
row. For advanced use cases, standard system time can be used instead of logical time. To disble logical
row. For advanced use cases, standard system time can be used instead of logical time. To disable logical
time, add the following user data hint to the simple feature type before calling ``createSchema``:

.. code-block:: java
Expand All @@ -96,6 +96,20 @@ time, add the following user data hint to the simple feature type before calling
String spec = "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.logical.time='false'";
SimpleFeatureType sft = SimpleFeatureTypes.createType("mySft", spec);
Table Block Cache
-----------------

By default, GeoMesa will enable the Accumulo `data block cache <https://accumulo.apache.org/docs/2.x/administration/caching>`__
on all index tables. To customize this, set the key ``table.cache.enabled`` in the ``SimpleFeatureType`` user data to a
comma-separated list of the indices for which the block cache will be enabled; the block cache will not be enabled for any
index that is not listed. An empty string will disable the block cache on all tables. Indices can be indicated by name
(e.g. ``z3``), by name and attributes (e.g. ``z3:geom:dtg``), or by the full index id (which includes the index version,
e.g. ``z3:7:geom:dtg``); matching is case-insensitive. Set the user data before calling ``createSchema``:

.. code-block:: java

    sft.getUserData().put("table.cache.enabled", "z3,attr");
    datastore.createSchema(sft);

.. _index_upgrades:

Upgrading Existing Indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.locationtech.geomesa.accumulo.data

import org.apache.accumulo.core.conf.Property
import com.typesafe.scalalogging.LazyLogging
import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.file.keyfunctor.RowFunctor
import org.apache.hadoop.io.Text
Expand Down Expand Up @@ -37,6 +37,7 @@ import org.locationtech.geomesa.index.iterators.StatsScan
import org.locationtech.geomesa.index.planning.LocalQueryRunner.LocalTransformReducer
import org.locationtech.geomesa.security.SecurityUtils
import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{Configs, InternalConfigs}
import org.locationtech.geomesa.utils.index.VisibilityLevel
import org.locationtech.geomesa.utils.io.WithClose

Expand All @@ -48,7 +49,8 @@ import java.util.Map.Entry
*
* @param ds data store
*/
class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] {
class AccumuloIndexAdapter(ds: AccumuloDataStore)
extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] with LazyLogging {

import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

Expand Down Expand Up @@ -95,15 +97,31 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connec
}

if (created) {
import org.apache.accumulo.core.conf.Property.{TABLE_BLOCKCACHE_ENABLED, TABLE_BLOOM_ENABLED, TABLE_BLOOM_KEY_FUNCTOR}

addSplitsAndGroups()

// enable block cache
tableOps.setProperty(table, Property.TABLE_BLOCKCACHE_ENABLED.getKey, "true")
// block cache config
// Decide whether the block cache should be turned on for this index table. With no user config the cache
// is enabled; otherwise it is enabled only if the comma-separated config names this index.
val enableBlockCache = {
  // partitioned schemas are looked up under the internal key, non-partitioned ones under the user-facing key
  val key = if (index.sft.isPartitioned) { InternalConfigs.PartitionTableCache } else { Configs.TableCacheEnabled }
  // Option(...) maps a missing (null) entry to None, and None.forall(...) is true, i.e. enabled by default
  Option(index.sft.getUserData.get(key).asInstanceOf[String]).forall { config =>
    // an index can be referenced by name, by name plus attributes, or by its full identifier
    val candidates = Seq(index.name, (Seq(index.name) ++ index.attributes).mkString(":"), index.identifier)
    // trim each entry so that lists written with spaces (e.g. "z3, attr") are honored;
    // an empty config yields a single empty hint, which matches nothing
    val enabled = config.split(',').map(_.trim).exists { hint =>
      candidates.exists(candidate => hint.equalsIgnoreCase(candidate))
    }
    logger.debug(s"Setting ${TABLE_BLOCKCACHE_ENABLED.getKey}=$enabled for index ${index.identifier} based on user config: $config")
    enabled
  }
}
// the property is only written when enabling; when disabled it is left unset
// NOTE(review): this relies on the effective Accumulo default for the property being "false" - confirm
if (enableBlockCache) {
  tableOps.setProperty(table, TABLE_BLOCKCACHE_ENABLED.getKey, "true")
}

if (index.name == IdIndex.name) {
// enable the row functor as the feature ID is stored in the Row ID
tableOps.setProperty(table, Property.TABLE_BLOOM_KEY_FUNCTOR.getKey, classOf[RowFunctor].getCanonicalName)
tableOps.setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey, "true")
tableOps.setProperty(table, TABLE_BLOOM_KEY_FUNCTOR.getKey, classOf[RowFunctor].getCanonicalName)
tableOps.setProperty(table, TABLE_BLOOM_ENABLED.getKey, "true")
}

if (index.sft.isVisibilityRequired) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.locationtech.geomesa.accumulo.data

import org.apache.accumulo.core.conf.Property
import org.apache.accumulo.core.security.Authorizations
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.io.Text
Expand All @@ -24,6 +25,7 @@ import org.locationtech.geomesa.accumulo.TestWithMultipleSfts
import org.locationtech.geomesa.accumulo.index._
import org.locationtech.geomesa.accumulo.iterators.Z2Iterator
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.index.index.NamedIndex
import org.locationtech.geomesa.index.index.attribute.AttributeIndex
import org.locationtech.geomesa.index.index.id.IdIndex
import org.locationtech.geomesa.index.index.z2.Z2Index
Expand Down Expand Up @@ -578,5 +580,31 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
exists must beTrue
ds.getSchema("test") must not(beNull)
}

// Checks that the `table.cache.enabled` user data hint controls which index tables have the
// Accumulo block cache property enabled
"create tables with block cache enabled/disabled" >> {
// run once with `geomesa.table.partition=time` appended to the spec and once without it
foreach(Seq(",geomesa.table.partition=time", "")) { partitioned =>
// only the z3 and attribute indices are listed in the cache hint
val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;table.cache.enabled='z3,attr'$partitioned")
// NOTE(review): presumably features are written so that the partitioned tables actually get created - confirm
addFeatures((0 until 6).map { i =>
val sf = new ScalaSimpleFeature(sft, i.toString)
sf.setAttributes(Array[AnyRef](i.toString, s"2012-01-02T05:0$i:07.000Z", s"POINT(45.0 4$i.0)"))
sf
})
val indices = ds.manager.indices(sft)
// looks up the block cache property value on the single table backing the given index type,
// returning null if the property is not present in the table properties
def getBlockCacheConfig(i: NamedIndex): String = {
val index = indices.find(_.name == i.name).orNull
index must not(beNull)
val tables = index.getTableNames()
tables must haveLength(1)
ds.connector.tableOperations().getProperties(tables.head).asScala
.find(_.getKey == Property.TABLE_BLOCKCACHE_ENABLED.getKey).map(_.getValue).orNull
}
// indices named in the hint must have the cache enabled
foreach(Seq(Z3Index, AttributeIndex)) { i =>
getBlockCacheConfig(i) mustEqual "true"
}
// indices not named in the hint must report the cache as disabled
foreach(Seq(Z2Index, IdIndex)) { i =>
getBlockCacheConfig(i) mustEqual "false"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD

@throws(classOf[IllegalArgumentException])
override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = {
import Configs.{TableSplitterClass, TableSplitterOpts}
import InternalConfigs.{PartitionSplitterClass, PartitionSplitterOpts}

// check for old enabled indices and re-map them
// noinspection ScalaDeprecation
SimpleFeatureTypes.Configs.ENABLED_INDEX_OPTS.drop(1).find(sft.getUserData.containsKey).foreach { key =>
Expand Down Expand Up @@ -154,9 +151,9 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
sft.getAttributeDescriptors.asScala.foreach(_.getUserData.remove(AttributeOptions.OptIndex))

// for partitioned schemas, persist the table partitioning keys
if (TablePartition.partitioned(sft)) {
Seq((TableSplitterClass, PartitionSplitterClass), (TableSplitterOpts, PartitionSplitterOpts)).foreach {
case (from, to) => Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _))
if (sft.isPartitioned) {
InternalConfigs.PartitionConfigMappings.foreach { case (from, to) =>
Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _))
}
}

Expand All @@ -182,7 +179,7 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
// create the index tables (if not using partitioned tables)
override protected def onSchemaCreated(sft: SimpleFeatureType): Unit = {
val indices = manager.indices(sft)
if (TablePartition.partitioned(sft)) {
if (sft.isPartitioned) {
logger.debug(s"Delaying creation of partitioned indices ${indices.map(_.identifier).mkString(", ")}")
} else {
logger.debug(s"Creating indices ${indices.map(_.identifier).mkString(", ")}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ object SimpleFeatureTypes {
val TableSharing = "geomesa.table.sharing"
val TableSplitterClass = "table.splitter.class"
val TableSplitterOpts = "table.splitter.options"
val TableCacheEnabled = "table.cache.enabled"
val TemporalPriority = "geomesa.temporal.priority"
val UpdateBackupMetadata = "schema.update.backup.metadata"
val UpdateRenameTables = "schema.update.rename.tables"
Expand All @@ -80,7 +81,15 @@ object SimpleFeatureTypes {
val PartitionSplitterClass = "geomesa.splitter.class"
val PartitionSplitterOpts = "geomesa.splitter.opts"
val RemoteVersion = "gm.remote.version" // note: doesn't start with geomesa so we don't persist it
val PartitionTableCache = "geomesa.table.cache"
val KeywordsDelimiter = "\u0000"

// configs that are not normally persisted, but that we want to persist for creating partitioned tables down the line
// each entry maps the user-facing config key (from `Configs`) to the internal key its value is stored under
val PartitionConfigMappings: Map[String, String] = Map(
Configs.TableCacheEnabled -> PartitionTableCache,
Configs.TableSplitterClass -> PartitionSplitterClass,
Configs.TableSplitterOpts -> PartitionSplitterOpts,
)
}

object AttributeOptions {
Expand Down

0 comments on commit c7e71b7

Please sign in to comment.