From ef057e19c2357b3cb9c4d07cc63918b2f9f84a91 Mon Sep 17 00:00:00 2001
From: Emilio Lahr-Vivaz
Date: Mon, 28 Aug 2023 16:46:54 +0000
Subject: [PATCH] GEOMESA-3291 Fix implementation of parallel table scans

* Fixes `geomesa.partition.scan.parallel`: chain partition scans with
  `concat` instead of `++`, since `Iterator.++` is final in Scala 2.13 and
  does not return a `CloseableIterator`, which can leak resources
---
 .../accumulo/data/AccumuloQueryPlan.scala    |  4 +-
 .../data/AccumuloPartitioningTest.scala      | 38 +++++++++++--------
 .../geomesa/hbase/data/HBaseQueryPlan.scala  |  2 +-
 .../hbase/data/HBasePartitioningTest.scala   | 22 ++++++-----
 .../redis/data/index/RedisQueryPlan.scala    |  2 +-
 .../utils/collection/CloseableIterator.scala |  7 +---
 6 files changed, 41 insertions(+), 34 deletions(-)

diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala
index 0d70a930b938..284641e93c4d 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala
@@ -131,7 +131,7 @@ object AccumuloQueryPlan extends LazyLogging {
       timeout: Option[Timeout]): CloseableIterator[Entry[Key, Value]] = {
     if (partitionParallelScans) {
       // kick off all the scans at once
-      tables.map(scanner(connector, _, auths, timeout)).foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ ++ _)
+      tables.map(scanner(connector, _, auths, timeout)).foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(tables.iterator).flatMap(scanner(connector, _, auths, timeout))
@@ -189,7 +189,7 @@ object AccumuloQueryPlan extends LazyLogging {
         if (ds.config.queries.parallelPartitionScans) {
           // kick off all the scans at once
           tables.map(scanner(ds.connector, _, joinTables.next, auths, partitionParallelScans = true, timeout))
-              .foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ ++ _)
+              .foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ concat _)
         } else {
           // kick off the scans sequentially as they finish
           SelfClosingIterator(tables.iterator)
diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloPartitioningTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloPartitioningTest.scala
index 5323c4a5a295..7b7283ca9211 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloPartitioningTest.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloPartitioningTest.scala
@@ -39,6 +39,11 @@ class AccumuloPartitioningTest extends Specification with TestWithFeatureType {
   override val spec: String =
     s"name:String:index=true,attr:String,dtg:Date,*geom:Point:srid=4326;${Configs.TablePartitioning}=${TimePartition.Name}"
 
+  lazy val parallelDs = {
+    val params = dsParams + (AccumuloDataStoreParams.PartitionParallelScansParam.key -> "true")
+    DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]
+  }
+
   val features = (0 until 10).map { i =>
     val sf = new ScalaSimpleFeature(sft, i.toString)
     sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
@@ -92,22 +97,25 @@
   }
 
   def testQuery(filter: String, transforms: Array[String], results: Seq[SimpleFeature]): Unit = {
-    val query = new Query(sftName, ECQL.toFilter(filter), transforms: _*)
-    val fr = ds.getFeatureReader(query, Transaction.AUTO_COMMIT)
-    val features = SelfClosingIterator(fr).toList
-    if (features.length != results.length) {
-      ds.getQueryPlan(query, explainer = new ExplainPrintln)
-    }
-    val attributes = Option(transforms).getOrElse(ds.getSchema(sftName).getAttributeDescriptors.asScala.map(_.getLocalName).toArray)
-    features.map(_.getID) must containTheSameElementsAs(results.map(_.getID))
-    forall(features) { feature =>
-      feature.getAttributes must haveLength(attributes.length)
-      forall(attributes.zipWithIndex) { case (attribute, i) =>
-        feature.getAttribute(attribute) mustEqual feature.getAttribute(i)
-        feature.getAttribute(attribute) mustEqual results.find(_.getID == feature.getID).get.getAttribute(attribute)
+    foreach(Seq(ds, parallelDs)) { ds =>
+      val query = new Query(sftName, ECQL.toFilter(filter), transforms: _*)
+      val fr = ds.getFeatureReader(query, Transaction.AUTO_COMMIT)
+      val features = SelfClosingIterator(fr).toList
+      if (features.length != results.length) {
+        ds.getQueryPlan(query, explainer = new ExplainPrintln)
+      }
+      val attributes = Option(transforms).getOrElse(ds.getSchema(sftName).getAttributeDescriptors.asScala.map(_
+          .getLocalName).toArray)
+      features.map(_.getID) must containTheSameElementsAs(results.map(_.getID))
+      forall(features) { feature =>
+        feature.getAttributes must haveLength(attributes.length)
+        forall(attributes.zipWithIndex) { case (attribute, i) => feature.getAttribute(attribute) mustEqual
+            feature.getAttribute(i)
+          feature.getAttribute(attribute) mustEqual results.find(_.getID == feature.getID).get.getAttribute(attribute)
+        }
       }
+      query.getHints.put(QueryHints.EXACT_COUNT, java.lang.Boolean.TRUE)
+      ds.getFeatureSource(sftName).getFeatures(query).size() mustEqual results.length
     }
-    query.getHints.put(QueryHints.EXACT_COUNT, java.lang.Boolean.TRUE)
-    ds.getFeatureSource(sftName).getFeatures(query).size() mustEqual results.length
   }
 }
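The test above exercises both code paths by opening a second data store with parallel scans enabled. The same opt-in applies to client code; here is a minimal sketch (not part of the patch), assuming `dsParams` already holds a working set of Accumulo connection parameters, with `PartitionParallelScansParam` being the `geomesa.partition.scan.parallel` option named in the commit message:

    import scala.collection.JavaConverters._

    import org.geotools.data.DataStoreFinder
    import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams}

    // `dsParams` is an assumption: a Map[String, String] of working Accumulo
    // connection parameters, like the one the test harness supplies above
    def openParallelStore(dsParams: Map[String, String]): AccumuloDataStore = {
      // opt in to parallel partition scans, mirroring the test's `parallelDs`
      val params = dsParams + (AccumuloDataStoreParams.PartitionParallelScansParam.key -> "true")
      DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]
    }

The HBase test below does the same thing through `HBaseDataStoreParams.PartitionParallelScansParam`.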
diff --git a/geomesa-hbase/geomesa-hbase-datastore/src/main/scala/org/locationtech/geomesa/hbase/data/HBaseQueryPlan.scala b/geomesa-hbase/geomesa-hbase-datastore/src/main/scala/org/locationtech/geomesa/hbase/data/HBaseQueryPlan.scala
index 1927813ac671..e6c3c5c1e326 100644
--- a/geomesa-hbase/geomesa-hbase-datastore/src/main/scala/org/locationtech/geomesa/hbase/data/HBaseQueryPlan.scala
+++ b/geomesa-hbase/geomesa-hbase-datastore/src/main/scala/org/locationtech/geomesa/hbase/data/HBaseQueryPlan.scala
@@ -46,7 +46,7 @@ sealed trait HBaseQueryPlan extends QueryPlan[HBaseDataStore] {
     val iter = scans.iterator.map(singleTableScan(_, ds.connection, threads(ds), timeout))
     if (ds.config.queries.parallelPartitionScans) {
       // kick off all the scans at once
-      iter.foldLeft(CloseableIterator.empty[Results])(_ ++ _)
+      iter.foldLeft(CloseableIterator.empty[Results])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(iter).flatMap(s => s)
diff --git a/geomesa-hbase/geomesa-hbase-datastore/src/test/scala/org/locationtech/geomesa/hbase/data/HBasePartitioningTest.scala b/geomesa-hbase/geomesa-hbase-datastore/src/test/scala/org/locationtech/geomesa/hbase/data/HBasePartitioningTest.scala
index 6bd1d2df9f75..4b3e1b66adbf 100644
--- a/geomesa-hbase/geomesa-hbase-datastore/src/test/scala/org/locationtech/geomesa/hbase/data/HBasePartitioningTest.scala
+++ b/geomesa-hbase/geomesa-hbase-datastore/src/test/scala/org/locationtech/geomesa/hbase/data/HBasePartitioningTest.scala
@@ -109,15 +109,19 @@ class HBasePartitioningTest extends Specification with LazyLogging {
       val transformsList =
         Seq(null, Array("geom"), Array("geom", "dtg"), Array("name"), Array("dtg", "geom", "attr", "name"))
 
-      foreach(transformsList) { transforms =>
-        testQuery(ds, typeName, "IN('0', '2')", transforms, Seq(toAdd(0), toAdd(2)))
-        testQuery(ds, typeName, "bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
-        testQuery(ds, typeName, "bbox(geom,42,48,52,62) and dtg DURING 2017-12-15T00:00:00.000Z/2018-01-15T00:00:00.000Z", transforms, toAdd.drop(2))
-        testQuery(ds, typeName, "bbox(geom,42,48,52,62)", transforms, toAdd.drop(2))
-        testQuery(ds, typeName, "dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
-        testQuery(ds, typeName, "attr = 'name5' and bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, Seq(toAdd(5)))
-        testQuery(ds, typeName, "name < 'name5'", transforms, toAdd.take(5))
-        testQuery(ds, typeName, "name = 'name5'", transforms, Seq(toAdd(5)))
+      WithClose(DataStoreFinder.getDataStore((params + (HBaseDataStoreParams.PartitionParallelScansParam.key -> "true")).asJava).asInstanceOf[HBaseDataStore]) { parallelDs =>
+        foreach(Seq(ds, parallelDs)) { ds =>
+          foreach(transformsList) { transforms =>
+            testQuery(ds, typeName, "IN('0', '2')", transforms, Seq(toAdd(0), toAdd(2)))
+            testQuery(ds, typeName, "bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
+            testQuery(ds, typeName, "bbox(geom,42,48,52,62) and dtg DURING 2017-12-15T00:00:00.000Z/2018-01-15T00:00:00.000Z", transforms, toAdd.drop(2))
+            testQuery(ds, typeName, "bbox(geom,42,48,52,62)", transforms, toAdd.drop(2))
+            testQuery(ds, typeName, "dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
+            testQuery(ds, typeName, "attr = 'name5' and bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, Seq(toAdd(5)))
+            testQuery(ds, typeName, "name < 'name5'", transforms, toAdd.take(5))
+            testQuery(ds, typeName, "name = 'name5'", transforms, Seq(toAdd(5)))
+          }
+        }
       }
 
       {
diff --git a/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/index/RedisQueryPlan.scala b/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/index/RedisQueryPlan.scala
index 239e8ad7e3d2..1eceb11fc657 100644
--- a/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/index/RedisQueryPlan.scala
+++ b/geomesa-redis/geomesa-redis-datastore/src/main/scala/org/locationtech/geomesa/redis/data/index/RedisQueryPlan.scala
@@ -109,7 +109,7 @@ object RedisQueryPlan {
     val scans = iter.map(singleTableScan(ds, _))
     if (ds.config.queries.parallelPartitionScans) {
       // kick off all the scans at once
-      scans.foldLeft(CloseableIterator.empty[Array[Byte]])(_ ++ _)
+      scans.foldLeft(CloseableIterator.empty[Array[Byte]])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(scans).flatMap(s => s)
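All three query plans above change in the same way: the parallel branch eagerly creates one scanner per partition table and folds the results together with `concat`, so every scan is in flight before the first result is consumed, while the sequential branch wraps a lazy iterator, so each scan starts only once the previous one is exhausted. A self-contained sketch of the two behaviors, using simplified stand-ins rather than the actual GeoMesa classes:

    import java.io.Closeable

    object ParallelScanSketch {

      // simplified stand-in for GeoMesa's CloseableIterator
      trait CloseableIter[A] extends Iterator[A] with Closeable {
        // chain two closeable iterators; analogous to the concat used above
        def concat(other: CloseableIter[A]): CloseableIter[A] = {
          val self = this
          new CloseableIter[A] {
            // plain Iterator.++ is fine for chaining the elements; what it
            // loses is the closeable type, which this wrapper restores
            private val both = self ++ other
            override def hasNext: Boolean = both.hasNext
            override def next(): A = both.next()
            override def close(): Unit = { self.close(); other.close() }
          }
        }
      }

      // stands in for scanner()/singleTableScan(): starting a scan has a side effect
      def startScan(table: String): CloseableIter[String] = {
        println(s"scan started: $table")
        new CloseableIter[String] {
          private val rows = Iterator(s"$table-row1", s"$table-row2")
          override def hasNext: Boolean = rows.hasNext
          override def next(): String = rows.next()
          override def close(): Unit = println(s"scan closed: $table")
        }
      }

      def main(args: Array[String]): Unit = {
        val tables = Seq("table_2023_01", "table_2023_02", "table_2023_03")

        // parallel partition scans: Seq.map is eager, so all three scans start
        // here, before any results are read; concat then chains the results
        val parallel = tables.map(startScan).reduce(_ concat _)
        parallel.foreach(println)
        parallel.close()

        // sequential: Iterator.flatMap is lazy, so each scan starts only when
        // the previous one is exhausted
        val sequential = tables.iterator.flatMap(startScan)
        sequential.foreach(println)
      }
    }

In the parallel case all three "scan started" lines print before any rows; in the sequential case they interleave with the rows. (The real code's `SelfClosingIterator` also closes each scan as it completes, which the sketch elides.)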
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala_2.12/org/locationtech/geomesa/utils/collection/CloseableIterator.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala_2.12/org/locationtech/geomesa/utils/collection/CloseableIterator.scala
index 5ec99c5a9399..ffa4dc0e919b 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala_2.12/org/locationtech/geomesa/utils/collection/CloseableIterator.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala_2.12/org/locationtech/geomesa/utils/collection/CloseableIterator.scala
@@ -122,11 +122,6 @@ object CloseableIterator {
       queue.foreach(_.apply().close())
       queue.clear()
     }
-
-    override def ++[B >: A](that: => GenTraversableOnce[B]): CloseableIterator[B] = {
-      lazy val applied = CloseableIterator.wrap(that)
-      new ConcatCloseableIterator[B](queue.+:(() => current).:+(() => applied))
-    }
   }
 
   private final class FlatMapCloseableIterator[A, B](source: CloseableIterator[A], f: A => GenTraversableOnce[B])
@@ -177,7 +172,7 @@ trait CloseableIterator[+A] extends Iterator[A] with Closeable {
     new ConcatCloseableIterator[B](queue)
   }
 
-  // in scala 2.13 this metho is final, and can cause resource leaks due to not returning a closeable iterator
+  // in scala 2.13 this method is final, and can cause resource leaks due to not returning a closeable iterator
   override def ++[B >: A](that: => GenTraversableOnce[B]): CloseableIterator[B] =
     throw new NotImplementedError("Not safe for cross-scala usage")
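This last hunk is the root cause: in Scala 2.13, `Iterator.++` is final, so `CloseableIterator` cannot override it to return a closeable result, and chaining with `++` degrades to a plain `Iterator` whose underlying scanners can never be closed. The Scala 2.12 sources keep the throwing trait-level `++` so any remaining cross-version usage fails fast, remove the one working override, and the scan code now folds with `concat` instead. A short worksheet-style sketch of the type loss, using a simplified stand-in trait rather than the real one:

    import java.io.Closeable

    // simplified stand-in for org.locationtech.geomesa.utils.collection.CloseableIterator
    trait MyCloseableIterator[+A] extends Iterator[A] with Closeable

    def single[A](value: A, name: String): MyCloseableIterator[A] =
      new MyCloseableIterator[A] {
        private val it = Iterator.single(value)
        override def hasNext: Boolean = it.hasNext
        override def next(): A = it.next()
        override def close(): Unit = println(s"closed $name")
      }

    val a = single(1, "a")
    val b = single(2, "b")

    // Iterator.++ is final in 2.13, so it cannot be overridden to preserve the
    // closeable type: the static result type is a plain Iterator[Int], there is
    // no close() to call, and whatever `a` and `b` hold open would leak
    val leaky: Iterator[Int] = a ++ b

`concat`, by contrast, is defined on `CloseableIterator` itself and builds the `ConcatCloseableIterator` shown above, so the chained result keeps its `close` and the fold in each query plan can release every scanner.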