Skip to content

Commit

Permalink
GEOMESA-3333 Update to Accumulo 2.1 (#3043)
Browse files Browse the repository at this point in the history
* Use testcontainers for Accumulo tests
* Move iterators into separate module to allow the distributed runtime to be used in tests
* Upgrade json-path to 2.9.0
  • Loading branch information
elahrvivaz authored Feb 27, 2024
1 parent 41b3197 commit 03dd53f
Show file tree
Hide file tree
Showing 103 changed files with 1,248 additions and 867 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-and-test-2.12.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ jobs:
run: ./build/mvn clean install $MAVEN_CLI_OPTS -DskipTests -T4
- name: Unit tests
id: test
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS -o
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
continue-on-error: true
- name: Unit tests (retry)
id: test-retry
if: steps.test.outcome=='failure'
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS -o
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
- name: Integration Tests
run: mvn failsafe:integration-test failsafe:verify $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
- name: Remove geomesa artifacts
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build-and-test-2.13.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ jobs:
run: ./build/mvn clean install $MAVEN_CLI_OPTS -DskipTests -T4
- name: Unit tests
id: test
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS -o
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
continue-on-error: true
- name: Unit tests (retry)
id: test-retry
if: steps.test.outcome=='failure'
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS -o
run: mvn surefire:test $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
- name: Integration Tests
run: mvn failsafe:integration-test failsafe:verify $MAVEN_CLI_OPTS $MAVEN_TEST_OPTS
- name: Remove geomesa artifacts
Expand Down
4 changes: 2 additions & 2 deletions docs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@
.. |geotools_version| replace:: %(gt_version)s
.. |accumulo_supported_versions| replace:: versions %(accumulo_version)s and %(accumulo_version_recommended)s
.. |accumulo_supported_versions| replace:: versions 2.0.1 and %(accumulo_version_recommended)s
.. |accumulo_required_version| replace:: %(accumulo_version)s or %(accumulo_version_recommended)s
.. |accumulo_required_version| replace:: 2.0.1 or %(accumulo_version_recommended)s
.. |hbase_required_version| replace:: %(hbase_version)s
Expand Down
3 changes: 2 additions & 1 deletion docs/user/upgrade.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ Dependency Version Upgrades

The following dependencies have been upgraded:

* accumulo ``2.0.1`` -> ``2.1.2``
* aircompressor ``0.21`` -> ``0.25``
* antlr ``4.7.1`` -> ``4.7.2``
* aws-java-sdk ``1.11.179`` -> ``1.12.625``
* caffeine ``2.9.3`` -> ``3.1.8``
* cassandra-driver ``3.11.3`` -> ``3.11.5``
* com.clearspring.analytics ``2.9.2`` -> ``2.9.8``
* com.fasterxml.jackson ``2.14.1`` -> ``2.16.1``
* com.jayway.jsonpath ``2.7.0`` -> ``2.8.0``
* com.jayway.jsonpath ``2.7.0`` -> ``2.9.0``
* com.typesafe:config ``1.4.2`` -> ``1.4.3``
* commons-cli ``1.2`` -> ``1.6.0``
* commons-codec ``1.15`` -> ``1.16.0``
Expand Down
21 changes: 19 additions & 2 deletions geomesa-accumulo/geomesa-accumulo-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-accumulo-indices_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-accumulo-iterators_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -117,8 +125,17 @@
<artifactId>specs2-mock_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.geomesa.testcontainers</groupId>
<artifactId>testcontainers-accumulo</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-accumulo-distributed-runtime_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.locationtech.geomesa.index.index.z3.{XZ3Index, Z3Index}
import org.locationtech.geomesa.index.metadata.{GeoMesaMetadata, MetadataStringSerializer}
import org.locationtech.geomesa.index.utils.Explainer
import org.locationtech.geomesa.utils.conf.FeatureExpiration.{FeatureTimeExpiration, IngestTimeExpiration}
import org.locationtech.geomesa.utils.conf.IndexId
import org.locationtech.geomesa.utils.conf.{FeatureExpiration, IndexId}
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.AttributeOptions
Expand Down Expand Up @@ -207,9 +207,24 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
stats.configureStatCombiner(connector, sft)
}
sft.getFeatureExpiration.foreach {
case IngestTimeExpiration(ttl) => AgeOffIterator.set(this, sft, ttl)
case FeatureTimeExpiration(dtg, _, ttl) => DtgAgeOffIterator.set(this, sft, ttl, dtg)
case e => throw new IllegalArgumentException(s"Unexpected feature expiration: $e")
case IngestTimeExpiration(ttl) =>
val tableOps = connector.tableOperations()
getAllIndexTableNames(sft.getTypeName).filter(tableOps.exists).foreach { table =>
AgeOffIterator.set(tableOps, table, sft, ttl)
}

case FeatureTimeExpiration(dtg, _, ttl) =>
val tableOps = connector.tableOperations()
manager.indices(sft).foreach { index =>
val indexSft = index match {
case joinIndex: AttributeJoinIndex => joinIndex.indexSft
case _ => sft
}
DtgAgeOffIterator.set(tableOps, indexSft, index, ttl, dtg)
}

case e =>
throw new IllegalArgumentException(s"Unexpected feature expiration: $e")
}
}

Expand All @@ -236,7 +251,16 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
// check any previous age-off - previously age-off wasn't tied to the sft metadata
if (!sft.isFeatureExpirationEnabled && !previous.isFeatureExpirationEnabled) {
// explicitly set age-off in the feature type if found
val configured = AgeOffIterator.expiry(this, previous).orElse(DtgAgeOffIterator.expiry(this, previous))
val tableOps = connector.tableOperations()
val tables = getAllIndexTableNames(previous.getTypeName).filter(tableOps.exists)
val ageOff = tables.foldLeft[Option[FeatureExpiration]](None) { (res, table) =>
res.orElse(AgeOffIterator.expiry(tableOps, table))
}
val configured = ageOff.orElse {
tables.foldLeft[Option[FeatureExpiration]](None) { (res, table) =>
res.orElse(DtgAgeOffIterator.expiry(tableOps, previous, table))
}
}
configured.foreach(sft.setFeatureExpiration)
}

Expand All @@ -253,19 +277,35 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
stats.configureStatCombiner(connector, sft)
}

val tableOps = connector.tableOperations()
val previousTables = getAllIndexTableNames(previous.getTypeName).filter(tableOps.exists)
val tables = getAllIndexTableNames(sft.getTypeName).filter(tableOps.exists)

if (previous.isVisibilityRequired != sft.isVisibilityRequired) {
VisibilityIterator.clear(this, previous)
previousTables.foreach(VisibilityIterator.clear(tableOps, _))
if (sft.isVisibilityRequired) {
VisibilityIterator.set(this, sft)
tables.foreach(VisibilityIterator.set(tableOps, _))
}
}

AgeOffIterator.clear(this, previous)
DtgAgeOffIterator.clear(this, previous)
previousTables.foreach { table =>
AgeOffIterator.clear(tableOps, table)
DtgAgeOffIterator.clear(tableOps, table)
}

sft.getFeatureExpiration.foreach {
case IngestTimeExpiration(ttl) => AgeOffIterator.set(this, sft, ttl)
case FeatureTimeExpiration(dtg, _, ttl) => DtgAgeOffIterator.set(this, sft, ttl, dtg)
case IngestTimeExpiration(ttl) =>
tables.foreach(AgeOffIterator.set(tableOps, _, sft, ttl))

case FeatureTimeExpiration(dtg, _, ttl) =>
manager.indices(sft).foreach { index =>
val indexSft = index match {
case joinIndex: AttributeJoinIndex => joinIndex.indexSft
case _ => sft
}
DtgAgeOffIterator.set(tableOps, indexSft, index, ttl, dtg)
}

case e => throw new IllegalArgumentException(s"Unexpected feature expiration: $e")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.apache.hadoop.io.Text
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter.{AccumuloIndexWriter, AccumuloResultsToFeatures, ZIterPriority}
import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{BatchScanPlan, EmptyPlan}
import org.locationtech.geomesa.accumulo.index.{AccumuloJoinIndex, JoinIndex}
import org.locationtech.geomesa.accumulo.index.{AttributeJoinIndex, JoinIndex}
import org.locationtech.geomesa.accumulo.iterators.ArrowIterator.AccumuloArrowResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators.BinAggregatingIterator.AccumuloBinResultsToFeatures
import org.locationtech.geomesa.accumulo.iterators.DensityIterator.AccumuloDensityResultsToFeatures
Expand Down Expand Up @@ -178,8 +178,8 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD
}

index match {
case i: AccumuloJoinIndex =>
i.createQueryPlan(filter, tables, ranges, colFamily, schema, ecql, hints, numThreads)
case i: AttributeJoinIndex =>
AccumuloJoinIndexAdapter.createQueryPlan(ds, i, filter, tables, ranges, colFamily, schema, ecql, hints, numThreads)

case _ =>
val (iter, eToF, reduce) = if (strategy.hints.isBinQuery) {
Expand Down
Loading

0 comments on commit 03dd53f

Please sign in to comment.