Skip to content

Commit

Permalink
First pass at pushing down information to the tservers.
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Hughes <[email protected]>
  • Loading branch information
jnh5y committed Sep 14, 2020
1 parent 1b3fef4 commit f3a7306
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.apache.accumulo.core.client.{Connector, IteratorSetting, ScannerBase}
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.io.Text
import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{IteratorSettingMessage, IteratorSettingMessageKey}
import org.locationtech.geomesa.accumulo.util.BatchMultiScanner
import org.locationtech.geomesa.index.PartitionParallelScan
import org.locationtech.geomesa.index.api.QueryPlan.{FeatureReducer, ResultsToFeatures}
Expand Down Expand Up @@ -43,13 +44,19 @@ sealed trait AccumuloQueryPlan extends QueryPlan[AccumuloDataStore] {
override def explain(explainer: Explainer, prefix: String = ""): Unit =
AccumuloQueryPlan.explain(this, explainer, prefix)

protected def configure(scanner: ScannerBase): Unit = {
protected[data] def configure(scanner: ScannerBase): Unit = {
Option(IteratorSettingMessage.get).foreach { msg =>
iterators.headOption.map{ is =>
is.addOption(IteratorSettingMessageKey, msg)}
}
iterators.foreach(scanner.addScanIterator)
columnFamily.foreach(scanner.fetchColumnFamily)
}
}

object AccumuloQueryPlan extends LazyLogging {
val IteratorSettingMessageKey = "message"
val IteratorSettingMessage = new ThreadLocal[String]

import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ package org.locationtech.geomesa.accumulo.data
import java.io.Serializable
import java.util

import org.apache.accumulo.core.client.BatchScanner
import org.apache.accumulo.core.clientImpl.{ScannerOptions, TabletServerBatchReader}
import org.apache.accumulo.core.security.Authorizations
import org.geotools.data._
import org.geotools.data.collection.ListFeatureCollection
import org.geotools.data.simple.SimpleFeatureStore
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
import org.locationtech.geomesa.accumulo.TestWithFeatureType
import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.IteratorSettingMessageKey
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.security.{AuthorizationsProvider, DefaultAuthorizationsProvider, FilteringAuthorizationsProvider, SecurityUtils}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
Expand Down Expand Up @@ -54,7 +57,11 @@ class AccumuloDataStoreAuthTest extends TestWithFeatureType {
val threadedAuths = new ThreadLocal[Authorizations]

val authProvider = new AuthorizationsProvider {
override def getAuthorizations: java.util.List[String] = threadedAuths.get.getAuthorizations.map(new String(_))
override def getAuthorizations: java.util.List[String] = {
val auths = threadedAuths.get.getAuthorizations.map(new String(_))
AccumuloQueryPlan.IteratorSettingMessage.set(s"${auths.head}")
auths
}
override def configure(params: util.Map[String, _ <: Serializable]): Unit = {}
}

Expand Down Expand Up @@ -186,6 +193,37 @@ class AccumuloDataStoreAuthTest extends TestWithFeatureType {
}
}
}

"allow for information from an AuthsProvider to be passed as an iteratorSetting" >> {
val params = dsParams ++ Map(AuthsParam.key -> "user,admin", AuthProviderParam.key -> authProvider)
val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
ds must not(beNull)

val user = Seq("bbox(geom, 44, 49.1, 46, 50.1)", "bbox(geom, 44, 49.1, 46, 50.1) AND dtg DURING 2016-01-01T00:59:30.000Z/2016-01-01T01:00:30.000Z")

forall(user) { filter =>
val q = new Query(sftName, ECQL.toFilter(filter))
threadedAuths.set(new Authorizations("user"))
// Get the auths to push things into the message.
val auths = ds.auths
val plans = ds.getQueryPlan(q)
val bs: BatchScanner = ds.connector.createBatchScanner(ds.getAllTableNames(sftName).head, new Authorizations(), 1)
plans.foreach(_.configure(bs))
val scannerOptions = bs.asInstanceOf[TabletServerBatchReader]
//val clazz = scannerOptions.getClass
val field = classOf[ScannerOptions].getDeclaredField("serverSideIteratorOptions")
field.setAccessible(true)
val map = field.get(scannerOptions).asInstanceOf[java.util.Map[String, java.util.Map[String, String]]]

if (map != null) {
import scala.collection.JavaConverters._
val message: String = map.values.asScala.flatMap(_.asScala.get(IteratorSettingMessageKey)).headOption.orNull
message mustEqual ("user")
}
threadedAuths.remove()
ok
}
}
}

"allow users with sufficient auths to write data" >> {
Expand Down

0 comments on commit f3a7306

Please sign in to comment.