Skip to content

Commit

Permalink
Identifying bug.
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Hughes <[email protected]>
  • Loading branch information
Jim Hughes committed Nov 16, 2021
1 parent 09a0a50 commit e25c3dd
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ object AccumuloIndexAdapter {
case kv: SingleRowKeyValue[_] =>
val mutation = new Mutation(kv.row)
kv.values.foreach { v =>
println(s"Adding for $v")
mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value)
}
writers(i).addMutation(mutation)
Expand All @@ -400,6 +401,7 @@ object AccumuloIndexAdapter {
mkv.rows.foreach { row =>
val mutation = new Mutation(row)
mkv.values.foreach { v =>
println(s"Adding for $v")
mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value)
}
writers(i).addMutation(mutation)
Expand All @@ -416,6 +418,7 @@ object AccumuloIndexAdapter {
case SingleRowKeyValue(row, _, _, _, _, _, vals) =>
val mutation = new Mutation(row)
vals.foreach { v =>
println(s"Deleting single row with ${new String(v.vis)} for $v")
mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis))
}
writers(i).addMutation(mutation)
Expand All @@ -424,6 +427,7 @@ object AccumuloIndexAdapter {
rows.foreach { row =>
val mutation = new Mutation(row)
vals.foreach { v =>
println(s"Deleting multi row with ${new String(v.vis)} for $v")
mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis))
}
writers(i).addMutation(mutation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.iterators.user.RowEncodingIterator
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator}
import org.locationtech.geomesa.features.kryo.impl.KryoFeatureDeserialization
import org.locationtech.geomesa.features.kryo.serialization.KryoUserDataSerialization
import org.locationtech.geomesa.features.kryo.{KryoFeatureSerializer, Metadata}
import org.locationtech.geomesa.index.iterators.IteratorCache
import org.locationtech.geomesa.security.SecurityUtils.FEATURE_VISIBILITY
import org.locationtech.geomesa.utils.collection.IntBitSet
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.index.VisibilityLevel
import org.opengis.feature.simple.SimpleFeatureType

import scala.collection.JavaConverters.asJavaIterableConverter
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`

/**
* Assumes cq are byte-encoded attribute number
*/
Expand All @@ -42,6 +49,9 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator {
env: IteratorEnvironment): Unit = {
super.init(source, options, env)
sft = IteratorCache.sft(options.get(KryoVisibilityRowEncoder.SftOpt))
if (sft.getVisibilityLevel == VisibilityLevel.Attribute) {
println(s"Initializing a reader for $sft")
}
count = sft.getAttributeCount
if (attributes == null || attributes.length < count) {
attributes = Array.ofDim[(Array[Byte], Int, Int)](count)
Expand Down Expand Up @@ -100,6 +110,10 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator {
i += 1
}

// TODO: Calculate size of userData for vis.

length += 54 // 2 ($s) + 26 "geomesa.feature.visibility" + 2 "$s" + 19 "user,user,user,user" + 4 for two string sizes

val value = Array.ofDim[Byte](length)
val output = new Output(value)
output.writeByte(KryoFeatureSerializer.Version3)
Expand Down Expand Up @@ -128,9 +142,18 @@ class KryoVisibilityRowEncoder extends RowEncodingIterator {
}
output.writeInt(valueCursor - 4) // user-data offset. Note no user data has actually been copied in.


import scala.collection.JavaConversions._
val map: Map[String, String] = Map(FEATURE_VISIBILITY -> Array.fill(count)("user").mkString(","))

// write nulls - we should already be in the right position
nulls.serialize(output)
println(s"Output at position after null writer: ${output.position()}")
output.setPosition(valueCursor-3)

println(s"Output at position before userdata: ${output.position()}")
KryoUserDataSerialization.serialize(output, map)
println(s"Output at position after userdata: ${output.position()}")
new Value(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case object MiniCluster extends LazyLogging {
sys.addShutdownHook({
logger.info("Stopping Accumulo minicluster")
try { cluster.stop() } finally {
PathUtils.deleteRecursively(miniClusterTempDir)
//PathUtils.deleteRecursively(miniClusterTempDir)
}
logger.info("Stopped Accumulo minicluster")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.locationtech.geomesa.security.SecurityUtils
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.opengis.feature.simple.SimpleFeature
import org.specs2.runner.JUnitRunner
import org.geotools.data.simple._

@RunWith(classOf[JUnitRunner])
class AccumuloDataStoreAttributeVisibilityTest extends TestWithFeatureType {
Expand Down Expand Up @@ -104,5 +105,39 @@ class AccumuloDataStoreAttributeVisibilityTest extends TestWithFeatureType {
m.get.getAttribute(3) mustEqual mixedFeature.getAttribute(3)
}
}

// "delete one record" in {
// val ds = DataStoreFinder.getDataStore(dsParams).asInstanceOf[AccumuloDataStore]
// val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore]
//
// val query1 = new Query(sftName, ECQL.toFilter("IN('user')"))
// val foo1 = SelfClosingIterator(ds.getFeatureReader(query1, Transaction.AUTO_COMMIT)).toSeq
// foo1.size mustEqual 1
//
// fs.removeFeatures(ECQL.toFilter("IN('user')"))
//
// val query = new Query(sftName, ECQL.toFilter(filters.head._1))
// val foo = SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toSeq
// println(foo.head)
// foo.size mustEqual 0
// ok
// }

"delete all records" in {
val ds = DataStoreFinder.getDataStore(dsParams).asInstanceOf[AccumuloDataStore]
val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore]

val query1 = new Query(sftName, ECQL.toFilter("INCLUDE"))
val foo1 = SelfClosingIterator(ds.getFeatureReader(query1, Transaction.AUTO_COMMIT)).toSeq
foo1.size mustEqual 3

fs.removeFeatures(ECQL.toFilter(filters.head._1))

// import org.geotools.filter._
val query = new Query(sftName, ECQL.toFilter("INCLUDE"))
val foo = SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toSeq
foo.size mustEqual 0
ok
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,20 @@ object KryoUserDataSerialization extends LazyLogging {
out.writeInt(toWrite.size) // don't use positive optimized version for back compatibility

toWrite.foreach { case (key, value) =>
val string = baseClassMappings.getOrElse(key.getClass, key.getClass.getName)
out.writeString(baseClassMappings.getOrElse(key.getClass, key.getClass.getName))
println(s"Writing ${string.length}: $string")
println(s"Writing ${key.toString.length}: $key")
write(out, key)
if (value == null) {
out.writeString(nullMapping)
} else {
val valueString = baseClassMappings.getOrElse(value.getClass, value.getClass.getName)
println(s"Writing value ${valueString.length}: $valueString")
println(s"Writing value ${value.toString.length}: $value")
out.writeString(baseClassMappings.getOrElse(value.getClass, value.getClass.getName))
write(out, value)

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ object QueryPlan {

protected def createSerializer: KryoFeatureSerializer = {
val builder = KryoFeatureSerializer.builder(sft)
if (index.serializedWithId) { builder.withId.build() } else { builder.withoutId.build() }
if (index.serializedWithId) { builder.withId.withUserData.build() } else { builder.withoutId.withUserData.build() }
}

def canEqual(other: Any): Boolean = other.isInstanceOf[IndexResultsToFeatures[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ object WritableFeature {
private lazy val visibilities: Array[String] = {
val count = feature.getFeatureType.getAttributeCount
val userData = Option(feature.getUserData.get(FEATURE_VISIBILITY).asInstanceOf[String])
if (userData.isEmpty) {
println(s"User data is empty for $feature")
}
val visibilities = userData.map(_.split(",")).getOrElse(Array.fill(count)(""))
require(visibilities.length == count,
s"Per-attribute visibilities do not match feature type ($count values expected): ${userData.getOrElse("")}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ abstract class LocalQueryRunner(stats: GeoMesaStats, authProvider: Option[Author

val filter = Option(query.getFilter).filter(_ != Filter.INCLUDE)
val visible = LocalQueryRunner.visible(authProvider)
val iter = features(sft, filter).filter(visible.apply)
val iter: CloseableIterator[SimpleFeature] = features(sft, filter).filter(visible.apply)

val hook = Some(ArrowDictionaryHook(stats, filter))
var result = transform(sft, iter, query.getHints.getTransform, query.getHints, hook)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class QueryPlanner[DS <: GeoMesaDataStore[DS]](ds: DS) extends QueryRunner with
override def runQuery(sft: SimpleFeatureType, query: Query, explain: Explainer): CloseableIterator[SimpleFeature] = {
val plans = getQueryPlans(sft, query, None, explain)

var iterator = SelfClosingIterator(plans.iterator).flatMap(p => p.scan(ds).map(p.resultsToFeatures.apply))
var iterator: CloseableIterator[SimpleFeature] = SelfClosingIterator(plans.iterator).flatMap(p => p.scan(ds).map(p.resultsToFeatures.apply))

if (!query.getHints.isSkipReduce) {
plans.headOption.flatMap(_.reducer).foreach { reducer =>
Expand Down

0 comments on commit e25c3dd

Please sign in to comment.