Skip to content

Commit

Permalink
GEOMESA-3373 Add distributed lock around Accumulo table creation (#3132)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Jul 8, 2024
1 parent bc651be commit 638b631
Show file tree
Hide file tree
Showing 34 changed files with 427 additions and 122 deletions.
25 changes: 25 additions & 0 deletions docs/user/accumulo/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,31 @@ Accumulo Configuration
This section details Accumulo specific configuration properties. For general properties,
see :ref:`geomesa_site_xml`.

General Properties
------------------

geomesa.accumulo.table.cache.expiry
+++++++++++++++++++++++++++++++++++

The length of time to cache the existence of tables, defined as a duration, e.g. ``60 seconds`` or ``100 millis``. To avoid
frequent checks for the existence of tables before writing, table checks are cached. If tables are deleted without stopping
any ingest, they will not be re-created until the cache expires.

Default is ``10 minutes``.

geomesa.accumulo.table.sync
+++++++++++++++++++++++++++

Sets the level of synchronization when creating and deleting tables. When using tables backed by S3, synchronization
may prevent table corruption errors in Accumulo. Possible values are:

* ``zookeeper`` (default) - uses a distributed lock that works across JVMs.
* ``local`` - uses an in-memory lock that works within a single JVM.
* ``none`` - does not use any external locking. Generally this is safe when using tables backed by HDFS.

The synchronization level may be adjusted depending on the architecture being used - for example, if tables are created
by a single thread, then a system may safely disable synchronization.

Batch Writer Properties
-----------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package org.locationtech.geomesa.accumulo.audit
import com.typesafe.scalalogging.LazyLogging
import org.apache.accumulo.core.client.{AccumuloClient, BatchWriter}
import org.apache.accumulo.core.data.Mutation
import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableUtils}
import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager}
import org.locationtech.geomesa.utils.audit.AuditedEvent
import org.locationtech.geomesa.utils.concurrent.ExitingExecutor
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty
Expand Down Expand Up @@ -69,7 +69,7 @@ class AccumuloEventWriter(connector: AccumuloClient, table: String) extends Runn

private def getWriter: BatchWriter = synchronized {
if (maybeWriter == null) {
TableUtils.createTableIfNeeded(connector, table)
new TableManager(connector).ensureTableExists(table)
maybeWriter = connector.createBatchWriter(table, batchWriterConfig)
}
maybeWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.accumulo.core.client.{AccumuloClient, BatchWriter}
import org.apache.accumulo.core.data.{Mutation, Range, Value}
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.io.Text
import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableUtils}
import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager}
import org.locationtech.geomesa.index.metadata.{GeoMesaMetadata, KeyValueStoreMetadata, MetadataSerializer}
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.io.{CloseQuietly, CloseWithLogging}
Expand All @@ -32,7 +32,7 @@ class AccumuloBackedMetadata[T](val connector: AccumuloClient, val table: String

override protected def checkIfTableExists: Boolean = connector.tableOperations().exists(table)

override protected def createTable(): Unit = TableUtils.createTableIfNeeded(connector, table)
override protected def createTable(): Unit = new TableManager(connector).ensureTableExists(table)

override protected def createEmptyBackup(timestamp: String): AccumuloBackedMetadata[T] =
new AccumuloBackedMetadata(connector, s"${table}_${timestamp}_bak", serializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreFactory.AccumuloD
import org.locationtech.geomesa.accumulo.data.stats._
import org.locationtech.geomesa.accumulo.index._
import org.locationtech.geomesa.accumulo.iterators.{AgeOffIterator, DtgAgeOffIterator, ProjectVersionIterator, VisibilityIterator}
import org.locationtech.geomesa.accumulo.util.TableUtils
import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
import org.locationtech.geomesa.index.geotools.GeoMesaDataStore
import org.locationtech.geomesa.index.index.attribute.AttributeIndex
Expand Down Expand Up @@ -163,7 +162,9 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
case -1 => ""
case i => config.catalog.substring(0, i)
}
TableUtils.createNamespaceIfNeeded(connector, namespace)
if (namespace.nonEmpty) {
adapter.ensureNamespaceExists(namespace)
}
val canLoad = connector.namespaceOperations().testClassLoad(namespace,
classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName)

Expand Down Expand Up @@ -204,6 +205,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
super.onSchemaCreated(sft)
if (sft.statsEnabled) {
// configure the stats combining iterator on the table for this sft
adapter.ensureTableExists(stats.metadata.table)
stats.configureStatCombiner(connector, sft)
}
sft.getFeatureExpiration.foreach {
Expand Down Expand Up @@ -274,6 +276,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
stats.removeStatCombiner(connector, previous)
}
if (sft.statsEnabled) {
adapter.ensureTableExists(stats.metadata.table)
stats.configureStatCombiner(connector, sft)
}

Expand Down Expand Up @@ -330,7 +333,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
new SingleRowAccumuloMetadata[Stat](stats.metadata).migrate(typeName)
}
} finally {
lock.release()
lock.close()
}
sft = super.getSchema(typeName)
}
Expand Down Expand Up @@ -376,11 +379,12 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
val lock = acquireCatalogLock()
try {
if (!metadata.read(typeName, configuredKey, cache = false).contains("true")) {
adapter.ensureTableExists(stats.metadata.table)
stats.configureStatCombiner(connector, sft)
metadata.insert(typeName, configuredKey, "true")
}
} finally {
lock.release()
lock.close()
}
}
// kick off asynchronous stats run for the existing data - this will set the stat date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.file.keyfunctor.RowFunctor
import org.apache.hadoop.io.Text
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter.{AccumuloResultsToFeatures, ZIterPriority}
import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter._
import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{BatchScanPlan, EmptyPlan}
import org.locationtech.geomesa.accumulo.data.writer.tx.AccumuloAtomicIndexWriter
import org.locationtech.geomesa.accumulo.data.writer.{AccumuloIndexWriter, ColumnFamilyMapper}
import org.locationtech.geomesa.accumulo.index.{AttributeJoinIndex, JoinIndex}
import org.locationtech.geomesa.accumulo.index.AttributeJoinIndex
import org.locationtech.geomesa.accumulo.iterators.ArrowIterator.AccumuloArrowResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators.BinAggregatingIterator.AccumuloBinResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators.DensityIterator.AccumuloDensityResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators.StatsIterator.AccumuloStatsResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators._
import org.locationtech.geomesa.accumulo.util.TableUtils
import org.locationtech.geomesa.accumulo.util.TableManager
import org.locationtech.geomesa.index.api.IndexAdapter.{IndexWriter, RequiredVisibilityWriter}
import org.locationtech.geomesa.index.api.QueryPlan.IndexResultsToFeatures
import org.locationtech.geomesa.index.api._
Expand All @@ -48,7 +48,7 @@ import java.util.Map.Entry
*
* @param ds data store
*/
class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloDataStore] {
class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] {

import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

Expand All @@ -63,7 +63,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD
splits: => Seq[Array[Byte]]): Unit = {
val table = index.configureTableName(partition) // writes table name to metadata
// create table if it doesn't exist
val created = TableUtils.createTableIfNeeded(ds.connector, table, index.sft.isLogicalTime)
val created = ensureTableExists(table, index.sft.isLogicalTime)

def addSplitsAndGroups(): Unit = {
// create splits
Expand Down Expand Up @@ -115,20 +115,8 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD
}
}

override def renameTable(from: String, to: String): Unit = {
if (tableOps.exists(from)) {
tableOps.rename(from, to)
}
}

override def deleteTables(tables: Seq[String]): Unit = {
def deleteOne(table: String): Unit = {
if (tableOps.exists(table)) {
tableOps.delete(table)
}
}
tables.toList.map(table => CachedThreadPool.submit(() => deleteOne(table))).foreach(_.get)
}
override def deleteTables(tables: Seq[String]): Unit =
tables.toList.map(table => CachedThreadPool.submit(() => deleteTable(table))).foreach(_.get)

override def clearTables(tables: Seq[String], prefix: Option[Array[Byte]]): Unit = {
val auths = ds.auths // get the auths once up front
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import org.apache.hadoop.io.Text
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.accumulo.combiners.StatsCombiner
import org.locationtech.geomesa.accumulo.data._
import org.locationtech.geomesa.accumulo.util.TableUtils
import org.locationtech.geomesa.index.stats.GeoMesaStats.{GeoMesaStatWriter, StatUpdater}
import org.locationtech.geomesa.index.stats.MetadataBackedStats.{StatsMetadataSerializer, WritableStat}
import org.locationtech.geomesa.index.stats._
Expand Down Expand Up @@ -77,7 +76,6 @@ class AccumuloGeoMesaStats(ds: AccumuloDataStore, val metadata: AccumuloBackedMe
def configureStatCombiner(connector: AccumuloClient, sft: SimpleFeatureType): Unit = {
import MetadataBackedStats._

TableUtils.createTableIfNeeded(connector, metadata.table)
StatsCombiner.configure(sft, connector, metadata.table, metadata.typeNameSeparator.toString)

val keys = Seq(CountKey, BoundsKeyPrefix, TopKKeyPrefix, FrequencyKeyPrefix, HistogramKeyPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class StatRunner(ds: AccumuloDataStore, sft: SimpleFeatureType, lockTimeout: Opt
Instant.now(Clock.systemUTC()).plus(updateInterval, ChronoUnit.MINUTES)
}
} finally {
lock.release()
lock.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@

package org.locationtech.geomesa

import org.locationtech.geomesa.accumulo.util.TableManager
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty

package object accumulo {

object AccumuloProperties {

object TableProperties {
val TableCreationSync: SystemProperty =
SystemProperty("geomesa.accumulo.table.sync", TableManager.TableSynchronization.ZooKeeper.toString)
val TableCacheExpiry: SystemProperty = SystemProperty("geomesa.accumulo.table.cache.expiry", "10 minutes")
}

object AccumuloMapperProperties {
val DESIRED_SPLITS_PER_TSERVER = SystemProperty("geomesa.mapreduce.splits.tserver.max")
val DESIRED_ABSOLUTE_SPLITS = SystemProperty("geomesa.mapreduce.splits.max")
Expand Down
Loading

0 comments on commit 638b631

Please sign in to comment.