diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala index 37b3d54d9922..dd870a99b9d2 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloQueryPlan.scala @@ -15,6 +15,7 @@ import org.apache.accumulo.core.client.{Connector, IteratorSetting, ScannerBase} import org.apache.accumulo.core.data.{Key, Value} import org.apache.accumulo.core.security.Authorizations import org.apache.hadoop.io.Text +import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{IteratorSettingMessage, IteratorSettingMessageKey} import org.locationtech.geomesa.accumulo.util.BatchMultiScanner import org.locationtech.geomesa.index.PartitionParallelScan import org.locationtech.geomesa.index.api.QueryPlan.{FeatureReducer, ResultsToFeatures} @@ -43,13 +44,19 @@ sealed trait AccumuloQueryPlan extends QueryPlan[AccumuloDataStore] { override def explain(explainer: Explainer, prefix: String = ""): Unit = AccumuloQueryPlan.explain(this, explainer, prefix) - protected def configure(scanner: ScannerBase): Unit = { + protected[data] def configure(scanner: ScannerBase): Unit = { + Option(IteratorSettingMessage.get).foreach { msg => + iterators.headOption.map{ is => + is.addOption(IteratorSettingMessageKey, msg)} + } iterators.foreach(scanner.addScanIterator) columnFamily.foreach(scanner.fetchColumnFamily) } } object AccumuloQueryPlan extends LazyLogging { + val IteratorSettingMessageKey = "message" + val IteratorSettingMessage = new ThreadLocal[String] import scala.collection.JavaConverters._ diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAuthTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAuthTest.scala index a3484d050570..1fe5574d0d9c 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAuthTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreAuthTest.scala @@ -11,6 +11,8 @@ package org.locationtech.geomesa.accumulo.data import java.io.Serializable import java.util +import org.apache.accumulo.core.client.BatchScanner +import org.apache.accumulo.core.clientImpl.{ScannerOptions, TabletServerBatchReader} import org.apache.accumulo.core.security.Authorizations import org.geotools.data._ import org.geotools.data.collection.ListFeatureCollection @@ -18,6 +20,7 @@ import org.geotools.data.simple.SimpleFeatureStore import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith import org.locationtech.geomesa.accumulo.TestWithFeatureType +import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.IteratorSettingMessageKey import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.security.{AuthorizationsProvider, DefaultAuthorizationsProvider, FilteringAuthorizationsProvider, SecurityUtils} import org.locationtech.geomesa.utils.collection.SelfClosingIterator @@ -54,7 +57,11 @@ class AccumuloDataStoreAuthTest extends TestWithFeatureType { val threadedAuths = new ThreadLocal[Authorizations] val authProvider = new AuthorizationsProvider { - override def getAuthorizations: java.util.List[String] = threadedAuths.get.getAuthorizations.map(new String(_)) + override def getAuthorizations: java.util.List[String] = { + val auths = threadedAuths.get.getAuthorizations.map(new String(_)) + AccumuloQueryPlan.IteratorSettingMessage.set(s"${auths.head}") + auths + } override def configure(params: util.Map[String, _ <: Serializable]): Unit = {} } @@ -186,6 +193,37 @@ class AccumuloDataStoreAuthTest extends TestWithFeatureType { } } } + + "allow for information from an AuthsProvider to be passed as an iteratorSetting" >> { + val params = dsParams ++ Map(AuthsParam.key -> "user,admin", AuthProviderParam.key -> authProvider) + val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore] + ds must not(beNull) + + val user = Seq("bbox(geom, 44, 49.1, 46, 50.1)", "bbox(geom, 44, 49.1, 46, 50.1) AND dtg DURING 2016-01-01T00:59:30.000Z/2016-01-01T01:00:30.000Z") + + forall(user) { filter => + val q = new Query(sftName, ECQL.toFilter(filter)) + threadedAuths.set(new Authorizations("user")) + // Get the auths to push things into the message. + val auths = ds.auths + val plans = ds.getQueryPlan(q) + val bs: BatchScanner = ds.connector.createBatchScanner(ds.getAllTableNames(sftName).head, new Authorizations(), 1) + plans.foreach(_.configure(bs)) + val scannerOptions = bs.asInstanceOf[TabletServerBatchReader] + //val clazz = scannerOptions.getClass + val field = classOf[ScannerOptions].getDeclaredField("serverSideIteratorOptions") + field.setAccessible(true) + val map = field.get(scannerOptions).asInstanceOf[java.util.Map[String, java.util.Map[String, String]]] + + if (map != null) { + import scala.collection.JavaConverters._ + val message: String = map.values.asScala.flatMap(_.asScala.get(IteratorSettingMessageKey)).headOption.orNull + message mustEqual ("user") + } + threadedAuths.remove() + ok + } + } } "allow users with sufficient auths to write data" >> {