From e25c3dd3b88561bf794b85df287ffd3666f6e376 Mon Sep 17 00:00:00 2001 From: Jim Hughes Date: Tue, 16 Nov 2021 16:32:39 -0500 Subject: [PATCH] Identifying bug. Signed-off-by: Jim Hughes --- .../accumulo/data/AccumuloIndexAdapter.scala | 4 +++ .../iterators/KryoVisibilityRowEncoder.scala | 23 ++++++++++++ .../geomesa/accumulo/MiniCluster.scala | 2 +- ...muloDataStoreAttributeVisibilityTest.scala | 35 +++++++++++++++++++ .../KryoUserDataSerialization.scala | 7 ++++ .../geomesa/index/api/QueryPlan.scala | 2 +- .../geomesa/index/api/WritableFeature.scala | 3 ++ .../index/planning/LocalQueryRunner.scala | 2 +- .../geomesa/index/planning/QueryPlanner.scala | 2 +- 9 files changed, 76 insertions(+), 4 deletions(-) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala index 9e3541268a01..93192782c304 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala @@ -392,6 +392,7 @@ object AccumuloIndexAdapter { case kv: SingleRowKeyValue[_] => val mutation = new Mutation(kv.row) kv.values.foreach { v => + println(s"Adding for $v") mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value) } writers(i).addMutation(mutation) @@ -400,6 +401,7 @@ object AccumuloIndexAdapter { mkv.rows.foreach { row => val mutation = new Mutation(row) mkv.values.foreach { v => + println(s"Adding for $v") mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value) } writers(i).addMutation(mutation) @@ -416,6 +418,7 @@ object AccumuloIndexAdapter { case SingleRowKeyValue(row, _, _, _, _, _, vals) => val mutation = new Mutation(row) vals.foreach { v => + println(s"Deleting single row with ${new String(v.vis)} for $v") mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis)) } writers(i).addMutation(mutation) @@ -424,6 +427,7 @@ object AccumuloIndexAdapter { rows.foreach { row => val mutation = new Mutation(row) vals.foreach { v => + println(s"Deleting multi row with ${new String(v.vis)} for $v") mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis)) } writers(i).addMutation(mutation) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/KryoVisibilityRowEncoder.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/KryoVisibilityRowEncoder.scala index 85341953f8be..b232ecec1adf 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/KryoVisibilityRowEncoder.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/KryoVisibilityRowEncoder.scala @@ -15,12 +15,19 @@ import org.apache.accumulo.core.data.{Key, Value} import org.apache.accumulo.core.iterators.user.RowEncodingIterator import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator} import org.locationtech.geomesa.features.kryo.impl.KryoFeatureDeserialization +import org.locationtech.geomesa.features.kryo.serialization.KryoUserDataSerialization import org.locationtech.geomesa.features.kryo.{KryoFeatureSerializer, Metadata} import org.locationtech.geomesa.index.iterators.IteratorCache +import org.locationtech.geomesa.security.SecurityUtils.FEATURE_VISIBILITY import org.locationtech.geomesa.utils.collection.IntBitSet +import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.index.VisibilityLevel import org.opengis.feature.simple.SimpleFeatureType +import scala.collection.JavaConverters.asJavaIterableConverter +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` + /** * Assumes cq are byte-encoded attribute number */ @@ -42,6 +49,9 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator { env: IteratorEnvironment): Unit = { super.init(source, options, env) sft = IteratorCache.sft(options.get(KryoVisibilityRowEncoder.SftOpt)) + if (sft.getVisibilityLevel == VisibilityLevel.Attribute) { + println(s"Initializing a reader for $sft") + } count = sft.getAttributeCount if (attributes == null || attributes.length < count) { attributes = Array.ofDim[(Array[Byte], Int, Int)](count) @@ -100,6 +110,10 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator { i += 1 } + // TODO: Calculate size of userData for vis. + + length += 54 // 2 ($s) + 26 "geomesa.feature.visibility" + 2 "$s" + 19 "user,user,user,user" + 4 for two string sizes + val value = Array.ofDim[Byte](length) val output = new Output(value) output.writeByte(KryoFeatureSerializer.Version3) @@ -128,9 +142,18 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator { } output.writeInt(valueCursor - 4) // user-data offset. Note no user data has actually been copied in. + + import scala.collection.JavaConversions._ + val map: Map[String, String] = Map(FEATURE_VISIBILITY -> Array.fill(count)("user").mkString(",")) + // write nulls - we should already be in the right position nulls.serialize(output) + println(s"Output at position after null writer: ${output.position()}") + output.setPosition(valueCursor-3) + println(s"Output at position before userdata: ${output.position()}") + KryoUserDataSerialization.serialize(output, map) + println(s"Output at position after userdata: ${output.position()}") new Value(value) } diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/MiniCluster.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/MiniCluster.scala index 87f47a9b0f06..5eb0a1ffcef1 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/MiniCluster.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/MiniCluster.scala @@ -71,7 +71,7 @@ case object MiniCluster extends LazyLogging { sys.addShutdownHook({ logger.info("Stopping Accumulo minicluster") try { cluster.stop() } finally { - PathUtils.deleteRecursively(miniClusterTempDir) + //PathUtils.deleteRecursively(miniClusterTempDir) } logger.info("Stopped Accumulo minicluster") }) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAttributeVisibilityTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAttributeVisibilityTest.scala index 12d010d772cb..181e36aec190 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAttributeVisibilityTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAttributeVisibilityTest.scala @@ -22,6 +22,7 @@ import org.locationtech.geomesa.security.SecurityUtils import org.locationtech.geomesa.utils.collection.SelfClosingIterator import org.opengis.feature.simple.SimpleFeature import org.specs2.runner.JUnitRunner +import org.geotools.data.simple._ @RunWith(classOf[JUnitRunner]) class AccumuloDataStoreAttributeVisibilityTest extends TestWithFeatureType { @@ -104,5 +105,39 @@ class AccumuloDataStoreAttributeVisibilityTest extends TestWithFeatureType { m.get.getAttribute(3) mustEqual mixedFeature.getAttribute(3) } } + +// "delete one record" in { +// val ds = DataStoreFinder.getDataStore(dsParams).asInstanceOf[AccumuloDataStore] +// val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore] +// +// val query1 = new Query(sftName, ECQL.toFilter("IN('user')")) +// val foo1 = SelfClosingIterator(ds.getFeatureReader(query1, Transaction.AUTO_COMMIT)).toSeq +// foo1.size mustEqual 1 +// +// fs.removeFeatures(ECQL.toFilter("IN('user')")) +// +// val query = new Query(sftName, ECQL.toFilter(filters.head._1)) +// val foo = SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toSeq +// println(foo.head) +// foo.size mustEqual 0 +// ok +// } + + "delete all records" in { + val ds = DataStoreFinder.getDataStore(dsParams).asInstanceOf[AccumuloDataStore] + val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore] + + val query1 = new Query(sftName, ECQL.toFilter("INCLUDE")) + val foo1 = SelfClosingIterator(ds.getFeatureReader(query1, Transaction.AUTO_COMMIT)).toSeq + foo1.size mustEqual 3 + + fs.removeFeatures(ECQL.toFilter(filters.head._1)) + + // import org.geotools.filter._ + val query = new Query(sftName, ECQL.toFilter("INCLUDE")) + val foo = SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toSeq + foo.size mustEqual 0 + ok + } } } diff --git a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/serialization/KryoUserDataSerialization.scala b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/serialization/KryoUserDataSerialization.scala index 0faa97e25284..582fc274cf6b 100644 --- a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/serialization/KryoUserDataSerialization.scala +++ b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/serialization/KryoUserDataSerialization.scala @@ -71,13 +71,20 @@ object KryoUserDataSerialization extends LazyLogging { out.writeInt(toWrite.size) // don't use positive optimized version for back compatibility toWrite.foreach { case (key, value) => + val string = baseClassMappings.getOrElse(key.getClass, key.getClass.getName) out.writeString(baseClassMappings.getOrElse(key.getClass, key.getClass.getName)) + println(s"Writing ${string.length}: $string") + println(s"Writing ${key.toString.length}: $key") write(out, key) if (value == null) { out.writeString(nullMapping) } else { + val valueString = baseClassMappings.getOrElse(value.getClass, value.getClass.getName) + println(s"Writing value ${valueString.length}: $valueString") + println(s"Writing value ${value.toString.length}: $value") out.writeString(baseClassMappings.getOrElse(value.getClass, value.getClass.getName)) write(out, value) + } } } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/QueryPlan.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/QueryPlan.scala index b0700d4f3866..3bc64e23a43e 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/QueryPlan.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/QueryPlan.scala @@ -286,7 +286,7 @@ object QueryPlan { protected def createSerializer: KryoFeatureSerializer = { val builder = KryoFeatureSerializer.builder(sft) - if (index.serializedWithId) { builder.withId.build() } else { builder.withoutId.build() } + if (index.serializedWithId) { builder.withId.withUserData.build() } else { builder.withoutId.withUserData.build() } } def canEqual(other: Any): Boolean = other.isInstanceOf[IndexResultsToFeatures[T]] diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/WritableFeature.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/WritableFeature.scala index 18eb12846ea9..875c5ddea75c 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/WritableFeature.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/WritableFeature.scala @@ -171,6 +171,9 @@ object WritableFeature { private lazy val visibilities: Array[String] = { val count = feature.getFeatureType.getAttributeCount val userData = Option(feature.getUserData.get(FEATURE_VISIBILITY).asInstanceOf[String]) + if (userData.isEmpty) { + println(s"User data is empty for $feature") + } val visibilities = userData.map(_.split(",")).getOrElse(Array.fill(count)("")) require(visibilities.length == count, s"Per-attribute visibilities do not match feature type ($count values expected): ${userData.getOrElse("")}") diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala index 0192856cd96d..b2ce08e45a13 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala @@ -76,7 +76,7 @@ abstract class LocalQueryRunner(stats: GeoMesaStats, authProvider: Option[Author val filter = Option(query.getFilter).filter(_ != Filter.INCLUDE) val visible = LocalQueryRunner.visible(authProvider) - val iter = features(sft, filter).filter(visible.apply) + val iter: CloseableIterator[SimpleFeature] = features(sft, filter).filter(visible.apply) val hook = Some(ArrowDictionaryHook(stats, filter)) var result = transform(sft, iter, query.getHints.getTransform, query.getHints, hook) diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryPlanner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryPlanner.scala index fa7e9292748f..b7c5f8ee92b7 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryPlanner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/QueryPlanner.scala @@ -56,7 +56,7 @@ class QueryPlanner[DS <: GeoMesaDataStore[DS]](ds: DS) extends QueryRunner with override def runQuery(sft: SimpleFeatureType, query: Query, explain: Explainer): CloseableIterator[SimpleFeature] = { val plans = getQueryPlans(sft, query, None, explain) - var iterator = SelfClosingIterator(plans.iterator).flatMap(p => p.scan(ds).map(p.resultsToFeatures.apply)) + var iterator: CloseableIterator[SimpleFeature] = SelfClosingIterator(plans.iterator).flatMap(p => p.scan(ds).map(p.resultsToFeatures.apply)) if (!query.getHints.isSkipReduce) { plans.headOption.flatMap(_.reducer).foreach { reducer =>