diff --git a/docs/user/accumulo/configuration.rst b/docs/user/accumulo/configuration.rst index aaa16c1fd73c..7a1d0b5a2e02 100644 --- a/docs/user/accumulo/configuration.rst +++ b/docs/user/accumulo/configuration.rst @@ -6,6 +6,31 @@ Accumulo Configuration This section details Accumulo specific configuration properties. For general properties, see :ref:`geomesa_site_xml`. +General Properties +------------------ + +geomesa.accumulo.table.cache.expiry ++++++++++++++++++++++++++++++++++++ + +The length of time to cache the existence of tables, defined as a duration, e.g. ``60 seconds`` or ``100 millis``. To avoid +frequent checks for the existence of tables before writing, table checks are cached. If tables are deleted without stopping any +ingest, they will not be re-created until the cache expires. + +Default is ``10 minutes``. + +geomesa.accumulo.table.sync ++++++++++++++++++++++++++++ + +Sets the level of synchronization when creating and deleting tables. When using tables backed by S3, synchronization +may prevent table corruption errors in Accumulo. Possible values are: + +* ``zookeeper`` (default) - uses a distributed lock that works across JVMs. +* ``local`` - uses an in-memory lock that works within a single JVM. +* ``none`` - does not use any external locking. Generally this is safe when using tables backed by HDFS. + +The synchronization level may be adjusted depending on the architecture being used - for example, if tables are only ever +created by a single thread, then synchronization may safely be disabled. + Batch Writer Properties ----------------------- diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/audit/AccumuloEventWriter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/audit/AccumuloEventWriter.scala index 45a593ee1429..c3d24e9a85f3 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/audit/AccumuloEventWriter.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/audit/AccumuloEventWriter.scala @@ -11,7 +11,7 @@ package org.locationtech.geomesa.accumulo.audit import com.typesafe.scalalogging.LazyLogging import org.apache.accumulo.core.client.{AccumuloClient, BatchWriter} import org.apache.accumulo.core.data.Mutation -import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableUtils} +import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager} import org.locationtech.geomesa.utils.audit.AuditedEvent import org.locationtech.geomesa.utils.concurrent.ExitingExecutor import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty @@ -69,7 +69,7 @@ class AccumuloEventWriter(connector: AccumuloClient, table: String) extends Runn private def getWriter: BatchWriter = synchronized { if (maybeWriter == null) { - TableUtils.createTableIfNeeded(connector, table) + new TableManager(connector).ensureTableExists(table) maybeWriter = connector.createBatchWriter(table, batchWriterConfig) } maybeWriter diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloBackedMetadata.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloBackedMetadata.scala index f9836b8c2ec3..c5ee5aa4e583 100644 ---
a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloBackedMetadata.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloBackedMetadata.scala @@ -12,7 +12,7 @@ import org.apache.accumulo.core.client.{AccumuloClient, BatchWriter} import org.apache.accumulo.core.data.{Mutation, Range, Value} import org.apache.accumulo.core.security.Authorizations import org.apache.hadoop.io.Text -import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableUtils} +import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager} import org.locationtech.geomesa.index.metadata.{GeoMesaMetadata, KeyValueStoreMetadata, MetadataSerializer} import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.io.{CloseQuietly, CloseWithLogging} @@ -32,7 +32,7 @@ class AccumuloBackedMetadata[T](val connector: AccumuloClient, val table: String override protected def checkIfTableExists: Boolean = connector.tableOperations().exists(table) - override protected def createTable(): Unit = TableUtils.createTableIfNeeded(connector, table) + override protected def createTable(): Unit = new TableManager(connector).ensureTableExists(table) override protected def createEmptyBackup(timestamp: String): AccumuloBackedMetadata[T] = new AccumuloBackedMetadata(connector, s"${table}_${timestamp}_bak", serializer) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala index d5abaf54ed90..afecae0070c2 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStore.scala @@ -23,7 +23,6 @@ import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreFactory.AccumuloD import org.locationtech.geomesa.accumulo.data.stats._ import org.locationtech.geomesa.accumulo.index._ import org.locationtech.geomesa.accumulo.iterators.{AgeOffIterator, DtgAgeOffIterator, ProjectVersionIterator, VisibilityIterator} -import org.locationtech.geomesa.accumulo.util.TableUtils import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex import org.locationtech.geomesa.index.geotools.GeoMesaDataStore import org.locationtech.geomesa.index.index.attribute.AttributeIndex @@ -163,7 +162,9 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu case -1 => "" case i => config.catalog.substring(0, i) } - TableUtils.createNamespaceIfNeeded(connector, namespace) + if (namespace.nonEmpty) { + adapter.ensureNamespaceExists(namespace) + } val canLoad = connector.namespaceOperations().testClassLoad(namespace, classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName) @@ -204,6 +205,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu super.onSchemaCreated(sft) if (sft.statsEnabled) { // configure the stats combining iterator on the table for this sft + adapter.ensureTableExists(stats.metadata.table) stats.configureStatCombiner(connector, sft) } sft.getFeatureExpiration.foreach { @@ -274,6 +276,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu 
stats.removeStatCombiner(connector, previous) } if (sft.statsEnabled) { + adapter.ensureTableExists(stats.metadata.table) stats.configureStatCombiner(connector, sft) } @@ -330,7 +333,7 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu new SingleRowAccumuloMetadata[Stat](stats.metadata).migrate(typeName) } } finally { - lock.release() + lock.close() } sft = super.getSchema(typeName) } @@ -376,11 +379,12 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu val lock = acquireCatalogLock() try { if (!metadata.read(typeName, configuredKey, cache = false).contains("true")) { + adapter.ensureTableExists(stats.metadata.table) stats.configureStatCombiner(connector, sft) metadata.insert(typeName, configuredKey, "true") } } finally { - lock.release() + lock.close() } } // kick off asynchronous stats run for the existing data - this will set the stat date diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala index 22247b2b3cfa..3957f0ec2611 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala @@ -13,17 +13,17 @@ import org.apache.accumulo.core.data.{Key, Range, Value} import org.apache.accumulo.core.file.keyfunctor.RowFunctor import org.apache.hadoop.io.Text import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} -import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter.{AccumuloResultsToFeatures, ZIterPriority} +import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter._ import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{BatchScanPlan, EmptyPlan} import org.locationtech.geomesa.accumulo.data.writer.tx.AccumuloAtomicIndexWriter import org.locationtech.geomesa.accumulo.data.writer.{AccumuloIndexWriter, ColumnFamilyMapper} -import org.locationtech.geomesa.accumulo.index.{AttributeJoinIndex, JoinIndex} +import org.locationtech.geomesa.accumulo.index.AttributeJoinIndex import org.locationtech.geomesa.accumulo.iterators.ArrowIterator.AccumuloArrowResultsToFeatures import org.locationtech.geomesa.accumulo.iterators.BinAggregatingIterator.AccumuloBinResultsToFeatures import org.locationtech.geomesa.accumulo.iterators.DensityIterator.AccumuloDensityResultsToFeatures import org.locationtech.geomesa.accumulo.iterators.StatsIterator.AccumuloStatsResultsToFeatures import org.locationtech.geomesa.accumulo.iterators._ -import org.locationtech.geomesa.accumulo.util.TableUtils +import org.locationtech.geomesa.accumulo.util.TableManager import org.locationtech.geomesa.index.api.IndexAdapter.{IndexWriter, RequiredVisibilityWriter} import org.locationtech.geomesa.index.api.QueryPlan.IndexResultsToFeatures import org.locationtech.geomesa.index.api._ @@ -48,7 +48,7 @@ import java.util.Map.Entry * * @param ds data store */ -class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloDataStore] { +class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connector) with IndexAdapter[AccumuloDataStore] { import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType @@ -63,7 +63,7 @@ class AccumuloIndexAdapter(ds: 
AccumuloDataStore) extends IndexAdapter[AccumuloD splits: => Seq[Array[Byte]]): Unit = { val table = index.configureTableName(partition) // writes table name to metadata // create table if it doesn't exist - val created = TableUtils.createTableIfNeeded(ds.connector, table, index.sft.isLogicalTime) + val created = ensureTableExists(table, index.sft.isLogicalTime) def addSplitsAndGroups(): Unit = { // create splits @@ -115,20 +115,8 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD } } - override def renameTable(from: String, to: String): Unit = { - if (tableOps.exists(from)) { - tableOps.rename(from, to) - } - } - - override def deleteTables(tables: Seq[String]): Unit = { - def deleteOne(table: String): Unit = { - if (tableOps.exists(table)) { - tableOps.delete(table) - } - } - tables.toList.map(table => CachedThreadPool.submit(() => deleteOne(table))).foreach(_.get) - } + override def deleteTables(tables: Seq[String]): Unit = + tables.toList.map(table => CachedThreadPool.submit(() => deleteTable(table))).foreach(_.get) override def clearTables(tables: Seq[String], prefix: Option[Array[Byte]]): Unit = { val auths = ds.auths // get the auths once up front diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/AccumuloGeoMesaStats.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/AccumuloGeoMesaStats.scala index 7fe6a3f4ae3e..09ef39cb0a18 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/AccumuloGeoMesaStats.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/AccumuloGeoMesaStats.scala @@ -13,7 +13,6 @@ import org.apache.hadoop.io.Text import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.accumulo.combiners.StatsCombiner import org.locationtech.geomesa.accumulo.data._ -import org.locationtech.geomesa.accumulo.util.TableUtils import org.locationtech.geomesa.index.stats.GeoMesaStats.{GeoMesaStatWriter, StatUpdater} import org.locationtech.geomesa.index.stats.MetadataBackedStats.{StatsMetadataSerializer, WritableStat} import org.locationtech.geomesa.index.stats._ @@ -77,7 +76,6 @@ class AccumuloGeoMesaStats(ds: AccumuloDataStore, val metadata: AccumuloBackedMe def configureStatCombiner(connector: AccumuloClient, sft: SimpleFeatureType): Unit = { import MetadataBackedStats._ - TableUtils.createTableIfNeeded(connector, metadata.table) StatsCombiner.configure(sft, connector, metadata.table, metadata.typeNameSeparator.toString) val keys = Seq(CountKey, BoundsKeyPrefix, TopKKeyPrefix, FrequencyKeyPrefix, HistogramKeyPrefix) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/StatsRunner.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/StatsRunner.scala index 733abf4e02d9..847267b28834 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/StatsRunner.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/StatsRunner.scala @@ -141,7 +141,7 @@ class StatRunner(ds: AccumuloDataStore, sft: SimpleFeatureType, lockTimeout: Opt Instant.now(Clock.systemUTC()).plus(updateInterval, ChronoUnit.MINUTES) } } finally { - lock.release() + lock.close() 
} } } diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/package.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/package.scala index e06029499cd6..71c5c8322b6b 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/package.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/package.scala @@ -8,12 +8,19 @@ package org.locationtech.geomesa +import org.locationtech.geomesa.accumulo.util.TableManager import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty package object accumulo { object AccumuloProperties { + object TableProperties { + val TableCreationSync: SystemProperty = + SystemProperty("geomesa.accumulo.table.sync", TableManager.TableSynchronization.ZooKeeper.toString) + val TableCacheExpiry: SystemProperty = SystemProperty("geomesa.accumulo.table.cache.expiry", "10 minutes") + } + object AccumuloMapperProperties { val DESIRED_SPLITS_PER_TSERVER = SystemProperty("geomesa.mapreduce.splits.tserver.max") val DESIRED_ABSOLUTE_SPLITS = SystemProperty("geomesa.mapreduce.splits.max") diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala new file mode 100644 index 000000000000..1f1a0c690956 --- /dev/null +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala @@ -0,0 +1,251 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/ + +package org.locationtech.geomesa.accumulo.util + +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} +import com.typesafe.scalalogging.LazyLogging +import org.apache.accumulo.core.client.admin.{NewTableConfiguration, TimeType} +import org.apache.accumulo.core.client.{AccumuloClient, NamespaceExistsException, TableExistsException} +import org.apache.accumulo.core.conf.ClientProperty +import org.locationtech.geomesa.accumulo.AccumuloProperties.TableProperties.{TableCacheExpiry, TableCreationSync} +import org.locationtech.geomesa.accumulo.util.TableManager._ +import org.locationtech.geomesa.index.DistributedLockTimeout +import org.locationtech.geomesa.index.utils.DistributedLocking +import org.locationtech.geomesa.index.utils.DistributedLocking.LocalLocking +import org.locationtech.geomesa.utils.text.StringSerialization +import org.locationtech.geomesa.utils.zk.ZookeeperLocking + +import java.io.Closeable +import java.util.concurrent.TimeUnit + +/** + * Manages table creation/deletion + * + * @param client accumulo client - note, this client must be cleaned up externally (as it's usually shared) + */ +class TableManager(client: AccumuloClient) { + + private val delegate: TableLock = TableSynchronization(TableCreationSync.get) match { + case TableSynchronization.ZooKeeper => new TableLockZk(client) + case TableSynchronization.Local => new TableLockLocal(client) + case TableSynchronization.None => new TableLockNone(client) + } + + /** + * Create table if it does not exist. + * + * Note: to avoid having to lock and check namespaces, the namespace for the table must already exist. Generally + * all tables for a data store share a namespace, and the namespace is created up front, so it will always exist. 
+ * + * @param table table name + * @param useLogicalTime use logical table time + * @return true if table was created, false if table already exists + */ + def ensureTableExists(table: String, useLogicalTime: Boolean = true): Boolean = + delegate.ensureTableExists(table, if (useLogicalTime) { TimeType.LOGICAL } else { TimeType.MILLIS }) + + /** + * Create namespace if it does not exist + * + * @param namespace namespace + */ + def ensureNamespaceExists(namespace: String): Unit = delegate.ensureNamespaceExists(namespace) + + /** + * Delete a table + * + * @param table table to delete + */ + def deleteTable(table: String): Unit = delegate.deleteTable(table) + + // note: shadows IndexAdapter.renameTable + // noinspection ScalaUnusedSymbol + def renameTable(from: String, to: String): Unit = delegate.renameTable(from, to) +} + +object TableManager { + + /** + * Types of synchronization available + */ + object TableSynchronization extends Enumeration { + + val ZooKeeper, Local, None = Value + + def apply(value: String): TableSynchronization.Value = { + Seq(ZooKeeper, Local, None).find(_.toString.equalsIgnoreCase(value)).getOrElse { + throw new IllegalArgumentException( + s"No matching value for '$value' - available sync types: ${Seq(ZooKeeper, Local, None).mkString(", ")}") + } + } + } + + /** + * No-op locking implementation of table utils + * + * @param client accumulo client + */ + private class TableLockNone(client: AccumuloClient) extends TableLock(client) { + override protected def acquireDistributedLock(key: String): Closeable = () => {} + override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] = Some(() => {}) + } + + /** + * Local locking implementation of table utils + * + * @param client accumulo client + */ + private class TableLockLocal(client: AccumuloClient) extends TableLock(client) with LocalLocking + + /** + * Distributed zookeeper locking implementation of table utils + * + * @param client accumulo client + */ + private class TableLockZk(client: AccumuloClient) extends TableLock(client) with ZookeeperLocking with LazyLogging { + + override protected val zookeepers: String = + client.properties().getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey) + + override protected def onTableExists(table: String): Unit = { + logger.warn( + s"TableExistsException when creating '$table' - this indicates another " + + "GeoMesa client is creating tables in an unsafe manner") + } + + override protected def onNamespaceExists(namespace: String): Unit = { + logger.warn( + s"NamespaceExistsException when creating '$namespace' - this indicates another " + + "GeoMesa client is creating tables in an unsafe manner") + } + } + + /** + * Table utility class + * + * @param client accumulo client + */ + private abstract class TableLock(client: AccumuloClient) extends DistributedLocking { + + private val timeoutMillis = DistributedLockTimeout.toDuration.map(_.toMillis).getOrElse { + // note: this property has a valid default value so this exception should never be triggered + throw new IllegalArgumentException(s"Couldn't convert '${DistributedLockTimeout.get}' to a duration") + } + + // note: value is not used here, we're treating this as a set + private val tableCache: Cache[String, java.lang.Boolean] = + Caffeine.newBuilder().expireAfterWrite(TableCacheExpiry.toDuration.get.toMillis, TimeUnit.MILLISECONDS).build() + + private val nsCache: Cache[String, java.lang.Boolean] = + Caffeine.newBuilder().expireAfterWrite(TableCacheExpiry.toDuration.get.toMillis, 
TimeUnit.MILLISECONDS).build() + + /** + * Create the table if it doesn't exist + * + * @param table table name + * @param timeType table time type + * @return true if table was created, false if it already exists + */ + def ensureTableExists(table: String, timeType: TimeType): Boolean = { + var created = false + tableCache.get(table, _ => { + withLock(tablePath(table), timeoutMillis, { + val tableOps = client.tableOperations() + if (!tableOps.exists(table)) { + try { + tableOps.create(table, new NewTableConfiguration().setTimeType(timeType)) + created = true + } catch { + case _: TableExistsException => onTableExists(table) + } + } + }) + java.lang.Boolean.FALSE + }) + created + } + + /** + * Creates the namespace if it doesn't exist. Note that the existence check is cached, + * so repeated calls are inexpensive. + * + * @param namespace namespace + */ + def ensureNamespaceExists(namespace: String): Unit = { + nsCache.get(namespace, _ => { + withLock(nsPath(namespace), timeoutMillis, { + val nsOps = client.namespaceOperations + if (!nsOps.exists(namespace)) { + try { nsOps.create(namespace) } catch { + case _: NamespaceExistsException => onNamespaceExists(namespace) + } + } + }) + java.lang.Boolean.FALSE + }) + } + + /** + * Rename a table + * + * @param from current name + * @param to new name + */ + def renameTable(from: String, to: String): Unit = { + withLock(tablePath(from), timeoutMillis, { + val tableOps = client.tableOperations() + if (tableOps.exists(from)) { + withLock(tablePath(to), timeoutMillis, { + tableOps.rename(from, to) + tableCache.put(to, java.lang.Boolean.FALSE) + }) + } + tableCache.invalidate(from) + }) + } + + /** + * Delete a table + * + * @param table table to delete + */ + def deleteTable(table: String): Unit = { + withLock(tablePath(table), timeoutMillis, { + val tableOps = client.tableOperations() + if (tableOps.exists(table)) { + tableOps.delete(table) + } + tableCache.invalidate(table) + }) + } + + // can happen sometimes with multiple threads but usually not a problem + protected def onTableExists(table: String): Unit = {} + // can happen sometimes with multiple threads but usually not a problem + protected def onNamespaceExists(namespace: String): Unit = {} + + /** + * ZK path for acquiring a table lock + * + * @param table table name + * @return + */ + private def tablePath(table: String): String = + s"/org.locationtech.geomesa/table-locks/${StringSerialization.alphaNumericSafeString(table)}" + + /** + * ZK path for acquiring a namespace lock + * + * @param namespace namespace + * @return + */ + private def nsPath(namespace: String): String = + s"/org.locationtech.geomesa/ns-locks/${StringSerialization.alphaNumericSafeString(namespace)}" + } +} diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableUtils.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableUtils.scala index cfe1de79ab27..a715f1c4c511 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableUtils.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableUtils.scala @@ -11,6 +11,7 @@ package org.locationtech.geomesa.accumulo.util import org.apache.accumulo.core.client.admin.{NewTableConfiguration, TimeType} import org.apache.accumulo.core.client.{AccumuloClient, NamespaceExistsException, TableExistsException} +@deprecated("use AccumuloDataStore.adapter or TableManager
to ensure caching and distributed synchronization") object TableUtils { /** diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/AccumuloContainer.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/AccumuloContainer.scala index 7c308cb25b3e..9f9402100f4b 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/AccumuloContainer.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/AccumuloContainer.scala @@ -34,13 +34,11 @@ case object AccumuloContainer extends StrictLogging { lazy val Container: AccumuloContainer = { val container = tryContainer.get WithClose(container.client()) { client => - client.namespaceOperations().create(Namespace) val secOps = client.securityOperations() secOps.changeUserAuthorizations(Users.root.name, Users.root.auths) Seq(Users.admin, Users.user).foreach { case UserWithAuths(name, password, auths) => secOps.createLocalUser(name, new PasswordToken(password)) SystemPermissions.foreach(p => secOps.grantSystemPermission(name, p)) - NamespacePermissions.foreach(p => secOps.grantNamespacePermission(name, Namespace, p)) client.securityOperations().changeUserAuthorizations(name, auths) } } @@ -77,14 +75,9 @@ case object AccumuloContainer extends StrictLogging { private val SystemPermissions = Seq( SystemPermission.CREATE_NAMESPACE, SystemPermission.ALTER_NAMESPACE, - SystemPermission.DROP_NAMESPACE - ) - - private val NamespacePermissions = Seq( - NamespacePermission.READ, - NamespacePermission.WRITE, - NamespacePermission.CREATE_TABLE, - NamespacePermission.ALTER_TABLE, - NamespacePermission.DROP_TABLE + SystemPermission.DROP_NAMESPACE, + SystemPermission.CREATE_TABLE, + SystemPermission.ALTER_TABLE, + SystemPermission.DROP_TABLE, ) } diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/TestWithDataStore.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/TestWithDataStore.scala index 998c041a8add..11db24b53955 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/TestWithDataStore.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/TestWithDataStore.scala @@ -27,7 +27,8 @@ import scala.collection.JavaConverters._ trait TestWithDataStore extends Specification { // we use class name to prevent spillage between unit tests - lazy val catalog = s"${AccumuloContainer.Namespace}.${getClass.getSimpleName}" + // use different namespaces to verify namespace creation works correctly + lazy val catalog = s"${getClass.getSimpleName.take(2)}.${getClass.getSimpleName}" // note the table needs to be different to prevent tests from conflicting with each other lazy val dsParams: Map[String, String] = Map( diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreFactoryTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreFactoryTest.scala index 73b0b51bde4b..f583a1770412 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreFactoryTest.scala +++ 
b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreFactoryTest.scala @@ -23,7 +23,7 @@ class AccumuloDataStoreFactoryTest extends Specification { import scala.collection.JavaConverters._ // we use class name to prevent spillage between unit tests - lazy val catalog = s"${AccumuloContainer.Namespace}.${getClass.getSimpleName}" + lazy val catalog = s"gm.${getClass.getSimpleName}" "AccumuloDataStoreFactory" should { diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/VisibilitiesTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/VisibilitiesTest.scala index fc74ef1466c7..4955b2e3d588 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/VisibilitiesTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/VisibilitiesTest.scala @@ -8,11 +8,14 @@ package org.locationtech.geomesa.accumulo.data +import org.apache.accumulo.core.security.NamespacePermission import org.geotools.api.data._ import org.geotools.filter.text.ecql.ECQL import org.geotools.util.factory.Hints import org.junit.runner.RunWith -import org.locationtech.geomesa.accumulo.TestWithFeatureType +import org.locationtech.geomesa.accumulo.AccumuloContainer.Users +import org.locationtech.geomesa.accumulo.util.TableManager +import org.locationtech.geomesa.accumulo.{AccumuloContainer, TestWithFeatureType} import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.security.SecurityUtils import org.locationtech.geomesa.utils.collection.SelfClosingIterator @@ -41,10 +44,20 @@ class VisibilitiesTest extends TestWithFeatureType { sf.getUserData.put(Hints.USE_PROVIDED_FID, Boolean.box(true)) sf } - val privDS = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.UserParam.key -> admin.name)).asJava) - val unprivDS = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.UserParam.key -> user.name)).asJava) - + + lazy val privDS = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.UserParam.key -> admin.name)).asJava) + lazy val unprivDS = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.UserParam.key -> user.name)).asJava) + step { + val ns = catalog.substring(0, catalog.indexOf(".")) + WithClose(AccumuloContainer.Container.client()) { client => + new TableManager(client).ensureNamespaceExists(ns) + Seq(Users.user.name, Users.admin.name).foreach { user => + Seq(NamespacePermission.READ, NamespacePermission.WRITE).foreach { p => + client.securityOperations().grantNamespacePermission(user, ns, p) + } + } + } addFeatures(privFeatures ++ unprivFeatures) } diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/QueryStatTransformTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/QueryStatTransformTest.scala index 7d8d2f4dd785..062221863c2b 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/QueryStatTransformTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/QueryStatTransformTest.scala @@ -32,7 +32,8 @@ class QueryStatTransformTest extends TestWithDataStore { "convert 
query stats to and from accumulo" in { - ds.connector.tableOperations().create(catalog) + ds.adapter.ensureNamespaceExists(catalog.substring(0, catalog.indexOf("."))) + ds.adapter.ensureTableExists(catalog) // currently we don't restore table and feature in the query stat - thus setting them null here val stat = QueryEvent(AccumuloAuditService.StoreType, featureName, 500L, "user1", "attr=1", "hint1=true", 101L, 201L, 11) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/UsageStatReaderTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/UsageStatReaderTest.scala index 11691fa5f2e4..7c0aed1e1564 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/UsageStatReaderTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/stats/usage/UsageStatReaderTest.scala @@ -32,6 +32,8 @@ class UsageStatReaderTest extends TestWithDataStore { lazy val reader = new AccumuloEventReader(ds.connector, statsTable) step { + // we have to create the namespace here (normally created by the ds when you create a feature type) + ds.adapter.ensureNamespaceExists(catalog.substring(0, catalog.indexOf("."))) val stats = Seq( QueryEvent(AccumuloAuditService.StoreType, featureName, DateParsing.parseMillis("2014-07-26T13:20:01Z"), "user1", "query1", "hint1=true", 101L, 201L, 11), QueryEvent(AccumuloAuditService.StoreType, featureName, DateParsing.parseMillis("2014-07-26T14:20:01Z"), "user1", "query2", "hint2=true", 102L, 202L, 12), diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/DtgAgeOffTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/DtgAgeOffTest.scala index 5f7a1c026053..a0d1b2953b41 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/DtgAgeOffTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/DtgAgeOffTest.scala @@ -8,11 +8,14 @@ package org.locationtech.geomesa.accumulo.iterators +import org.apache.accumulo.core.security.NamespacePermission import org.geotools.api.data.{DataStore, DataStoreFinder} import org.geotools.api.feature.simple.SimpleFeature import org.geotools.api.filter.Filter import org.junit.runner.RunWith +import org.locationtech.geomesa.accumulo.AccumuloContainer.Users import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams +import org.locationtech.geomesa.accumulo.util.TableManager import org.locationtech.geomesa.accumulo.{AccumuloContainer, TestWithFeatureType} import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.security.SecurityUtils @@ -58,6 +61,15 @@ class DtgAgeOffTest extends Specification with TestWithFeatureType { def query(ds: DataStore): Seq[SimpleFeature] = SelfClosingIterator(ds.getFeatureSource(sft.getTypeName).getFeatures(Filter.INCLUDE).features).toList + step { + val ns = catalog.substring(0, catalog.indexOf(".")) + WithClose(AccumuloContainer.Container.client()) { client => + new TableManager(client).ensureNamespaceExists(ns) + client.securityOperations().grantNamespacePermission(Users.user.name, ns, NamespacePermission.READ) + client.securityOperations().grantNamespacePermission(Users.admin.name, ns, 
NamespacePermission.READ) + } + } + "DTGAgeOff" should { "run at scan time with vis" in { add(1 to 10, "id", "user") diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloStatsConfigureCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloStatsConfigureCommand.scala index bbedf4d7650c..199804d2abc2 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloStatsConfigureCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloStatsConfigureCommand.scala @@ -32,10 +32,11 @@ class AccumuloStatsConfigureCommand extends StatsConfigureCommand[AccumuloDataSt try { ds.getTypeNames.map(ds.getSchema).foreach { sft => Command.user.info(s"Configuring stats iterator for '${sft.getTypeName}'...") + ds.adapter.ensureTableExists(ds.stats.metadata.table) ds.stats.configureStatCombiner(ds.connector, sft) } } finally { - lock.release() + lock.close() } Command.user.info("done") } @@ -51,7 +52,7 @@ class AccumuloStatsConfigureCommand extends StatsConfigureCommand[AccumuloDataSt ds.stats.removeStatCombiner(ds.connector, sft) } } finally { - lock.release() + lock.close() } Command.user.info("done") } diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/IngestCommandTest.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/IngestCommandTest.scala index 4a523a4f17aa..ca9b8ecae142 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/IngestCommandTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/IngestCommandTest.scala @@ -34,7 +34,7 @@ class IngestCommandTest extends Specification { "--zookeepers", AccumuloContainer.zookeepers, "--user", AccumuloContainer.user, "--password", AccumuloContainer.password, - "--catalog", s"${AccumuloContainer.Namespace}.${getClass.getSimpleName}${sftCounter.getAndIncrement()}", + "--catalog", s"gm.${getClass.getSimpleName}${sftCounter.getAndIncrement()}", "--compact-stats", "false" ) diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/ShpIngestTest.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/ShpIngestTest.scala index f9723d9a64bf..be991779c5d7 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/ShpIngestTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/ShpIngestTest.scala @@ -53,7 +53,7 @@ class ShpIngestTest extends Specification { command.params.instance = AccumuloContainer.instanceName command.params.zookeepers = AccumuloContainer.zookeepers command.params.password = AccumuloContainer.password - command.params.catalog = s"${AccumuloContainer.Namespace}.${getClass.getSimpleName}${sftCounter.getAndIncrement()}" + command.params.catalog = s"gm.${getClass.getSimpleName}${sftCounter.getAndIncrement()}" command.params.force = true command.params.files = Collections.singletonList(new File(dir.toFile, s"$file.shp").getAbsolutePath) command.params.compact = false diff --git 
a/geomesa-cassandra/geomesa-cassandra-datastore/src/main/scala/org/locationtech/geomesa/cassandra/data/CassandraDataStore.scala b/geomesa-cassandra/geomesa-cassandra-datastore/src/main/scala/org/locationtech/geomesa/cassandra/data/CassandraDataStore.scala index 85f5a06e1302..d1f5ee112e58 100644 --- a/geomesa-cassandra/geomesa-cassandra-datastore/src/main/scala/org/locationtech/geomesa/cassandra/data/CassandraDataStore.scala +++ b/geomesa-cassandra/geomesa-cassandra-datastore/src/main/scala/org/locationtech/geomesa/cassandra/data/CassandraDataStore.scala @@ -20,7 +20,8 @@ import org.locationtech.geomesa.index.index.z2.{XZ2Index, Z2Index} import org.locationtech.geomesa.index.index.z3.{XZ3Index, Z3Index} import org.locationtech.geomesa.index.metadata.{GeoMesaMetadata, MetadataStringSerializer} import org.locationtech.geomesa.index.stats.{GeoMesaStats, RunnableStats} -import org.locationtech.geomesa.index.utils.{Explainer, LocalLocking} +import org.locationtech.geomesa.index.utils.DistributedLocking.LocalLocking +import org.locationtech.geomesa.index.utils.Explainer import org.locationtech.geomesa.utils.concurrent.CachedThreadPool import org.locationtech.geomesa.utils.conf.IndexId import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala index d3bc5a759de5..3417bea9d51d 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala @@ -186,7 +186,7 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD logger.debug(s"Delaying creation of partitioned indices ${indices.map(_.identifier).mkString(", ")}") } else { logger.debug(s"Creating indices ${indices.map(_.identifier).mkString(", ")}") - indices.foreach(index => adapter.createTable(index, None, index.getSplits(None))) + indices.toList.map(i => CachedThreadPool.submit(() => adapter.createTable(i, None, i.getSplits(None)))).foreach(_.get) } } @@ -227,7 +227,7 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD logger.debug("Delaying creation of partitioned indices") } else { logger.debug(s"Ensuring indices ${manager.indices(sft).map(_.identifier).mkString(", ")}") - manager.indices(sft).foreach(index => adapter.createTable(index, None, index.getSplits(None))) + manager.indices(sft).toList.map(i => CachedThreadPool.submit(() => adapter.createTable(i, None, i.getSplits(None)))).foreach(_.get) } // update stats diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala index 7a075961a806..98bd513ff34b 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala @@ -22,7 +22,7 @@ import org.locationtech.geomesa.index.geotools.GeoMesaFeatureReader.HasGeoMesaFe import org.locationtech.geomesa.index.metadata.GeoMesaMetadata._ import org.locationtech.geomesa.index.metadata.HasGeoMesaMetadata import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory -import 
org.locationtech.geomesa.index.utils.{DistributedLocking, Releasable} +import org.locationtech.geomesa.index.utils.DistributedLocking import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeComparator.TypeComparison import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs @@ -32,7 +32,7 @@ import org.locationtech.geomesa.utils.geotools.{FeatureUtils, GeoToolsDateFormat import org.locationtech.geomesa.utils.index.{GeoMesaSchemaValidator, ReservedWordCheck} import org.locationtech.geomesa.utils.io.CloseWithLogging -import java.io.IOException +import java.io.{Closeable, IOException} import java.time.{Instant, ZoneOffset} import java.util.{Locale, List => jList} import scala.util.control.NonFatal @@ -172,7 +172,7 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor } } } finally { - lock.release() + lock.close() } } } @@ -269,7 +269,7 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor onSchemaUpdated(sft, previousSft) } finally { - lock.release() + lock.close() } } @@ -287,7 +287,7 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor metadata.delete(typeName) } } finally { - lock.release() + lock.close() } } @@ -464,10 +464,10 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor } /** - * Acquires a distributed lock for all data stores sharing this catalog table. - * Make sure that you 'release' the lock in a finally block. - */ - protected [geomesa] def acquireCatalogLock(): Releasable = { + * Acquires a distributed lock for all data stores sharing this catalog table. + * Make sure that you 'release' the lock in a finally block. 
+ */ + protected[geomesa] def acquireCatalogLock(): Closeable = { import org.locationtech.geomesa.index.DistributedLockTimeout val dsTypeName = getClass.getSimpleName.replaceAll("[^A-Za-z]", "") val path = s"/org.locationtech.geomesa/ds/$dsTypeName/${config.catalog}" diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/package.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/package.scala index 94a99ebe6bac..4460a062f2e4 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/package.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/package.scala @@ -18,10 +18,9 @@ package object index { type FlushableFeatureWriter = SimpleFeatureWriter with Flushable - val FilterCacheSize = SystemProperty("geomesa.cache.filters.size", "1000") - val ZFilterCacheSize = SystemProperty("geomesa.cache.z-filters.size", "1000") + val FilterCacheSize : SystemProperty = SystemProperty("geomesa.cache.filters.size", "1000") + val ZFilterCacheSize: SystemProperty = SystemProperty("geomesa.cache.z-filters.size", "1000") - val PartitionParallelScan = SystemProperty("geomesa.partition.scan.parallel", "false") - - val DistributedLockTimeout = SystemProperty("geomesa.distributed.lock.timeout", "2 minutes") + val PartitionParallelScan : SystemProperty = SystemProperty("geomesa.partition.scan.parallel", "false") + val DistributedLockTimeout: SystemProperty = SystemProperty("geomesa.distributed.lock.timeout", "2 minutes") } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/DistributedLocking.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/DistributedLocking.scala index 911048bb755d..80b308fb0882 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/DistributedLocking.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/DistributedLocking.scala @@ -8,6 +8,7 @@ package org.locationtech.geomesa.index.utils +import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.locks.{Lock, ReentrantLock} @@ -20,7 +21,7 @@ trait DistributedLocking { * @param key key to lock on - equivalent to a path in zookeeper * @return the lock */ - protected def acquireDistributedLock(key: String): Releasable + protected def acquireDistributedLock(key: String): Closeable /** * Gets and acquires a distributed lock based on the key. 
@@ -30,7 +31,7 @@ trait DistributedLocking { * @param timeOut how long to wait to acquire the lock, in millis * @return the lock, if obtained */ - protected def acquireDistributedLock(key: String, timeOut: Long): Option[Releasable] + protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] /** * Execute a function wrapped in a lock @@ -42,9 +43,21 @@ trait DistributedLocking { */ protected def withLock[T](key: String, fn: => T): T = { val lock = acquireDistributedLock(key) - try { fn } finally { lock.release() } + try { fn } finally { lock.close() } } + /** + * Execute a function wrapped in a lock + * + * @param key key to lock on + * @param timeOut how long to wait to acquire the lock, in millis + * @param fn function to run with the lock + * @tparam T result type + * @return + */ + protected def withLock[T](key: String, timeOut: Long, fn: => T): T = + withLock(key, timeOut, fn, throw new RuntimeException(s"Could not acquire distributed lock at '$key' within ${timeOut}ms")) + /** * Execute a function wrapped in a lock * @@ -58,35 +71,28 @@ trait DistributedLocking { protected def withLock[T](key: String, timeOut: Long, fn: => T, fallback: => T): T = { acquireDistributedLock(key, timeOut) match { case None => fallback - case Some(lock) => try { fn } finally { lock.release() } + case Some(lock) => try { fn } finally { lock.close() } } } } -trait LocalLocking extends DistributedLocking { +object DistributedLocking { - import LocalLocking.locks + private val locks = scala.collection.mutable.Map.empty[String, Lock] - override protected def acquireDistributedLock(key: String): Releasable = { - val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock())) - lock.lock() - Releasable(lock) - } + def releasable(lock: Lock): Closeable = () => lock.unlock() - override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Releasable] = { - val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock())) - if (lock.tryLock(timeOut, TimeUnit.MILLISECONDS)) { Some(Releasable(lock)) } else { None } - } -} + trait LocalLocking extends DistributedLocking { -object LocalLocking { - private val locks = scala.collection.mutable.Map.empty[String, Lock] -} - -trait Releasable { - def release(): Unit -} + override protected def acquireDistributedLock(key: String): Closeable = { + val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock())) + lock.lock() + releasable(lock) + } -object Releasable { - def apply(lock: Lock): Releasable = new Releasable { override def release(): Unit = lock.unlock() } + override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] = { + val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock())) + if (lock.tryLock(timeOut, TimeUnit.MILLISECONDS)) { Some(releasable(lock)) } else { None } + } + } } diff --git a/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/TestGeoMesaDataStore.scala b/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/TestGeoMesaDataStore.scala index 02f9460d10a4..33ee08b69f0a 100644 --- a/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/TestGeoMesaDataStore.scala +++ b/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/TestGeoMesaDataStore.scala @@ -26,8 +26,9 @@ import org.locationtech.geomesa.index.metadata.GeoMesaMetadata import org.locationtech.geomesa.index.planning.LocalQueryRunner.{ArrowDictionaryHook, LocalTransformReducer} import 
org.locationtech.geomesa.index.stats.MetadataBackedStats.WritableStat
 import org.locationtech.geomesa.index.stats._
+import org.locationtech.geomesa.index.utils.DistributedLocking.LocalLocking
+import org.locationtech.geomesa.index.utils.Explainer
 import org.locationtech.geomesa.index.utils.Reprojection.QueryReferenceSystems
-import org.locationtech.geomesa.index.utils.{Explainer, LocalLocking}
 import org.locationtech.geomesa.security.DefaultAuthorizationsProvider
 import org.locationtech.geomesa.utils.audit.{AuditProvider, AuditWriter}
 import org.locationtech.geomesa.utils.collection.CloseableIterator
diff --git a/geomesa-kafka/geomesa-kafka-confluent/src/main/scala/org/locationtech/geomesa/kafka/confluent/ConfluentKafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-confluent/src/main/scala/org/locationtech/geomesa/kafka/confluent/ConfluentKafkaDataStore.scala
index db19aa748e6e..4a8ec75fcc48 100644
--- a/geomesa-kafka/geomesa-kafka-confluent/src/main/scala/org/locationtech/geomesa/kafka/confluent/ConfluentKafkaDataStore.scala
+++ b/geomesa-kafka/geomesa-kafka-confluent/src/main/scala/org/locationtech/geomesa/kafka/confluent/ConfluentKafkaDataStore.scala
@@ -11,7 +11,6 @@ package org.locationtech.geomesa.kafka.confluent
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
 import org.apache.avro.Schema
 import org.geotools.api.feature.simple.SimpleFeatureType
-import org.locationtech.geomesa.index.utils.LocalLocking
 import org.locationtech.geomesa.kafka.confluent.ConfluentGeoMessageSerializer.ConfluentGeoMessageSerializerFactory
 import org.locationtech.geomesa.kafka.data.KafkaDataStore
 import org.locationtech.geomesa.kafka.data.KafkaDataStore.KafkaDataStoreConfig
@@ -31,7 +30,7 @@ object ConfluentKafkaDataStore {
     val metadata = new ConfluentMetadata(client, topicToSft)
     val serialization = new ConfluentGeoMessageSerializerFactory(schemaRegistryUrl, topicToSchema)
-    new KafkaDataStore(config, metadata, serialization) with LocalLocking {
+    new KafkaDataStore(config, metadata, serialization) {
       override protected def preSchemaCreate(sft: SimpleFeatureType): Unit =
         throw new NotImplementedError(
           "Confluent Kafka stores do not support creating schemas, " +
diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
index b7bb505b785c..1101b2407c1c 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
@@ -27,7 +27,7 @@ import org.locationtech.geomesa.index.geotools.GeoMesaDataStoreFactory.Namespace
 import org.locationtech.geomesa.index.geotools.{GeoMesaFeatureReader, MetadataBackedDataStore}
 import org.locationtech.geomesa.index.metadata.GeoMesaMetadata
 import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats, RunnableStats}
-import org.locationtech.geomesa.index.utils.LocalLocking
+import org.locationtech.geomesa.index.utils.DistributedLocking.LocalLocking
 import org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.ConsumerErrorHandler
 import org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl
 import org.locationtech.geomesa.kafka.data.KafkaDataStore.KafkaDataStoreConfig
diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/OffsetManager.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/OffsetManager.scala
index a497ce981baa..dce99e6d6d94 100644
--- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/OffsetManager.scala
+++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/OffsetManager.scala
@@ -8,7 +8,7 @@ package org.locationtech.geomesa.lambda.stream
-import org.locationtech.geomesa.index.utils.{DistributedLocking, Releasable}
+import org.locationtech.geomesa.index.utils.DistributedLocking
 import org.locationtech.geomesa.lambda.stream.OffsetManager.OffsetListener
 import java.io.Closeable
@@ -20,7 +20,7 @@ trait OffsetManager extends DistributedLocking with Closeable {
   def getOffset(topic: String, partition: Int): Long
   def setOffset(topic: String, partition: Int, offset: Long): Unit
   def deleteOffsets(topic: String): Unit
-  def acquireLock(topic: String, partition: Int, timeOut: Long): Option[Releasable] =
+  def acquireLock(topic: String, partition: Int, timeOut: Long): Option[Closeable] =
     acquireDistributedLock(s"$topic/$partition", timeOut)
   def addOffsetListener(topic: String, listener: OffsetListener): Unit
   def removeOffsetListener(topic: String, listener: OffsetListener): Unit
diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/ZookeeperOffsetManager.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/ZookeeperOffsetManager.scala
index 456cf38e67a6..6071de834823 100644
--- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/ZookeeperOffsetManager.scala
+++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/ZookeeperOffsetManager.scala
@@ -12,7 +12,6 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
-import org.locationtech.geomesa.index.utils.Releasable
 import org.locationtech.geomesa.lambda.stream.OffsetManager.OffsetListener
 import org.locationtech.geomesa.lambda.stream.ZookeeperOffsetManager.CuratorOffsetListener
 import org.locationtech.geomesa.utils.io.CloseWithLogging
@@ -68,16 +67,16 @@ class ZookeeperOffsetManager(zookeepers: String, namespace: String = "geomesa")
     CloseWithLogging(client)
   }
-  override protected def acquireDistributedLock(path: String): Releasable =
+  override protected def acquireDistributedLock(path: String): Closeable =
     acquireLock(path, lock => { lock.acquire(); true })
-  override protected def acquireDistributedLock(path: String, timeOut: Long): Option[Releasable] =
+  override protected def acquireDistributedLock(path: String, timeOut: Long): Option[Closeable] =
     Option(acquireLock(path, lock => lock.acquire(timeOut, TimeUnit.MILLISECONDS)))
-  private def acquireLock(path: String, acquire: InterProcessSemaphoreMutex => Boolean): Releasable = {
+  private def acquireLock(path: String, acquire: InterProcessSemaphoreMutex => Boolean): Closeable = {
     val lock = new InterProcessSemaphoreMutex(client, s"/$path/locks")
     if (acquire(lock)) {
-      new Releasable { override def release(): Unit = lock.release() }
+      () => lock.release()
     } else {
       null
     }
diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/DataStorePersistence.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/DataStorePersistence.scala
index 4a4b196f4f22..764d5278a3e3 100644
--- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/DataStorePersistence.scala
+++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/DataStorePersistence.scala
@@ -73,7 +73,7 @@ class DataStorePersistence(ds: DataStore,
           logger.trace(s"Acquired lock for [$topic:$partition]")
           persist(partition, clock.millis() - ageOffMillis)
         } finally {
-          lock.release()
+          lock.close()
           logger.trace(s"Released lock for [$topic:$partition]")
         }
       }
diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/test/scala/org/locationtech/geomesa/lambda/InMemoryOffsetManager.scala b/geomesa-lambda/geomesa-lambda-datastore/src/test/scala/org/locationtech/geomesa/lambda/InMemoryOffsetManager.scala
index 4a5da24d178b..5c19bcba0dc1 100644
--- a/geomesa-lambda/geomesa-lambda-datastore/src/test/scala/org/locationtech/geomesa/lambda/InMemoryOffsetManager.scala
+++ b/geomesa-lambda/geomesa-lambda-datastore/src/test/scala/org/locationtech/geomesa/lambda/InMemoryOffsetManager.scala
@@ -8,10 +8,11 @@ package org.locationtech.geomesa.lambda
-import org.locationtech.geomesa.index.utils.Releasable
+import org.locationtech.geomesa.index.utils.DistributedLocking
 import org.locationtech.geomesa.lambda.stream.OffsetManager
 import org.locationtech.geomesa.lambda.stream.OffsetManager.OffsetListener
+import java.io.Closeable
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -43,16 +44,16 @@ class InMemoryOffsetManager extends OffsetManager {
   override def close(): Unit = {}
-  override protected def acquireDistributedLock(key: String): Releasable = {
+  override protected def acquireDistributedLock(key: String): Closeable = {
     val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock()))
     lock.lock()
-    Releasable(lock)
+    DistributedLocking.releasable(lock)
   }
-  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Releasable] = {
+  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] = {
     val lock = locks.synchronized(locks.getOrElseUpdate(key, new ReentrantLock()))
     if (lock.tryLock(timeOut, TimeUnit.MILLISECONDS)) {
-      Some(Releasable(lock))
+      Some(DistributedLocking.releasable(lock))
     } else {
       None
     }
diff --git a/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/util/RedisLocking.scala b/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/util/RedisLocking.scala
index bdcfd81406e8..d65c3c67ec6e 100644
--- a/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/util/RedisLocking.scala
+++ b/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/util/RedisLocking.scala
@@ -9,11 +9,12 @@ package org.locationtech.geomesa.redis.data.util
 import org.locationtech.geomesa.index.DistributedLockTimeout
-import org.locationtech.geomesa.index.utils.{DistributedLocking, Releasable}
+import org.locationtech.geomesa.index.utils.DistributedLocking
 import org.locationtech.geomesa.utils.io.WithClose
 import redis.clients.jedis.JedisPool
 import redis.clients.jedis.params.SetParams
+import java.io.Closeable
 import java.util.UUID
@@ -37,12 +38,12 @@ trait RedisLocking extends DistributedLocking {
   def connection: JedisPool
-  override protected def acquireDistributedLock(key: String): Releasable =
+  override protected def acquireDistributedLock(key: String): Closeable =
     acquireDistributedLock(key, Long.MaxValue).orNull
-  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Releasable] = {
+  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] = {
     val start = System.currentTimeMillis()
-    var lock: Releasable = null
+    var lock: Closeable = null
     while (lock == null && System.currentTimeMillis() - start < timeOut) {
       if (WithClose(connection.getResource)(_.set(key, id, params)) != null) {
@@ -53,8 +54,8 @@ trait RedisLocking extends DistributedLocking {
     Option(lock)
   }
-  private class JedisReleasable(key: String) extends Releasable {
-    override def release(): Unit = {
+  private class JedisReleasable(key: String) extends Closeable {
+    override def close(): Unit = {
       WithClose(connection.getResource) { jedis =>
         if (jedis.get(key) == id) {
           jedis.del(key)
diff --git a/geomesa-utils-parent/geomesa-zk-utils/src/main/scala/org/locationtech/geomesa/utils/zk/ZookeeperLocking.scala b/geomesa-utils-parent/geomesa-zk-utils/src/main/scala/org/locationtech/geomesa/utils/zk/ZookeeperLocking.scala
index 96b3fcbc122e..a3e75895a138 100644
--- a/geomesa-utils-parent/geomesa-zk-utils/src/main/scala/org/locationtech/geomesa/utils/zk/ZookeeperLocking.scala
+++ b/geomesa-utils-parent/geomesa-zk-utils/src/main/scala/org/locationtech/geomesa/utils/zk/ZookeeperLocking.scala
@@ -10,9 +10,10 @@ package org.locationtech.geomesa.utils.zk
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
-import org.locationtech.geomesa.index.utils.{DistributedLocking, Releasable}
+import org.locationtech.geomesa.index.utils.DistributedLocking
 import org.locationtech.geomesa.utils.io.CloseQuietly
+import java.io.Closeable
 import java.util.concurrent.TimeUnit
@@ -26,7 +27,7 @@ trait ZookeeperLocking extends DistributedLocking {
    * @param key key to lock on - equivalent to a path in zookeeper
    * @return the lock
    */
-  override protected def acquireDistributedLock(key: String): Releasable = {
+  override protected def acquireDistributedLock(key: String): Closeable = {
     val (client, lock) = distributedLock(key)
     try {
       lock.acquire()
@@ -44,7 +45,7 @@ trait ZookeeperLocking extends DistributedLocking {
    * @param timeOut how long to wait to acquire the lock, in millis
    * @return the lock, if obtained
    */
-  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Releasable] = {
+  override protected def acquireDistributedLock(key: String, timeOut: Long): Option[Closeable] = {
    val (client, lock) = distributedLock(key)
     try {
       if (lock.acquire(timeOut, TimeUnit.MILLISECONDS)) {
@@ -69,6 +70,6 @@ trait ZookeeperLocking extends DistributedLocking {
 object ZookeeperLocking {
   // delegate lock that will close the curator client upon release
-  def releasable(lock: InterProcessSemaphoreMutex, client: CuratorFramework): Releasable =
-    new Releasable { override def release(): Unit = try { lock.release() } finally { client.close() } }
+  def releasable(lock: InterProcessSemaphoreMutex, client: CuratorFramework): Closeable =
+    () => try { lock.release() } finally { client.close() }
 }
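
Note on the locking changes above: the ``Releasable`` trait is replaced by plain ``java.io.Closeable`` return types, so callers now release a distributed lock by closing it (see the ``lock.close()`` call in ``DataStorePersistence``). The caller-side sketch below is illustrative only and is not part of this patch; the object name, topic, partition, and timeout values are assumptions::

    import org.locationtech.geomesa.lambda.stream.OffsetManager

    object LockUsageSketch {
      // minimal sketch: run a block of work while holding a per-partition lock,
      // releasing it via Closeable.close() instead of the removed Releasable.release()
      def withPartitionLock[T](manager: OffsetManager, topic: String, partition: Int)(work: => T): Option[T] = {
        manager.acquireLock(topic, partition, timeOut = 10000L) match {
          case None       => None // lock was not acquired within the timeout
          case Some(lock) => try { Some(work) } finally { lock.close() } // close() releases the lock
        }
      }

      // hypothetical usage, assuming an OffsetManager implementation is in scope:
      // withPartitionLock(offsetManager, "my-topic", 0) { persistPartition() }
    }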