Skip to content

Commit

Permalink
GEOMESA-3306 Send client version info with distributed processing calls
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Nov 6, 2023
1 parent ead9a5a commit cb3cc9e
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
import org.locationtech.geomesa.index.conf.FilterCompatibility
import org.locationtech.geomesa.index.conf.FilterCompatibility.FilterCompatibility
import org.locationtech.geomesa.index.iterators.{IteratorCache, SamplingIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaProperties
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.filter.Filter
Expand Down Expand Up @@ -174,12 +175,15 @@ object FilterTransformIterator {
}
sampling.foreach(SamplingIterator.configure(sft, _).foreach { case (k, v) => is.addOption(k, v) })

compatibility.foreach {
case FilterCompatibility.`1.3` =>
compatibility match {
case None =>
is.addOption(VersionOpt, GeoMesaProperties.ProjectVersion)

case Some(FilterCompatibility.`1.3`) =>
is.setIteratorClass("org.locationtech.geomesa.accumulo.iterators.KryoLazyFilterTransformIterator")
is.addOption(IndexOpt, s"${index.name}:${index.version}")

case c =>
case Some(c) =>
throw new NotImplementedError(s"Unknown compatibility flag: '$c'")
}
Some(is)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.locationtech.geomesa.index.api.{FilterStrategy, GeoMesaFeatureIndex,
import org.locationtech.geomesa.index.conf.FilterCompatibility
import org.locationtech.geomesa.index.conf.QueryHints.RichHints
import org.locationtech.geomesa.index.iterators.{IteratorCache, SamplingIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaProperties
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.index.{ByteArrays, IndexMode}
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
Expand Down Expand Up @@ -143,10 +144,14 @@ object CqlTransformFilter extends StrictLogging {
val samplingFactorBytes = samplingFactor.map(f => ByteBuffer.allocate(4).putFloat(f).array()).getOrElse(Array.empty)
val samplingFieldBytes = samplingField.map(field => field.getBytes(StandardCharsets.UTF_8)).getOrElse(Array.empty)

val version = GeoMesaProperties.ProjectVersion.getBytes(StandardCharsets.UTF_8)

delegate.transform match {
case None =>
val array = Array.ofDim[Byte](sftBytes.length + cqlBytes.length +
indexBytes.length + indexSftBytes.length + samplingFactorBytes.length + samplingFieldBytes.length + 4*7) //4 bytes (length info) per 7 fields
val array =
Array.ofDim[Byte](sftBytes.length + cqlBytes.length +
indexBytes.length + indexSftBytes.length + samplingFactorBytes.length +
samplingFieldBytes.length + version.length + 4*8) // 4 bytes (length info) per 8 fields

var offset = 0
ByteArrays.writeInt(sftBytes.length, array, offset)
Expand Down Expand Up @@ -191,20 +196,29 @@ object CqlTransformFilter extends StrictLogging {

if (samplingFieldBytes.isEmpty) {
ByteArrays.writeInt(0, array, offset)
offset += 4
} else {
ByteArrays.writeInt(samplingFieldBytes.length, array, offset)
offset += 4
System.arraycopy(samplingFieldBytes, 0, array, offset, samplingFieldBytes.length)
offset += samplingFieldBytes.length
}

ByteArrays.writeInt(version.length, array, offset)
offset += 4
System.arraycopy(version, 0, array, offset, version.length)
offset += version.length

array

case Some((tdefs, tsft)) =>
val tdefsBytes = tdefs.getBytes(StandardCharsets.UTF_8)
val tsftBytes = SimpleFeatureTypes.encodeType(tsft).getBytes(StandardCharsets.UTF_8)

val array = Array.ofDim[Byte](sftBytes.length + cqlBytes.length + tdefsBytes.length + tsftBytes.length +
indexBytes.length + indexSftBytes.length +samplingFactorBytes.length + samplingFieldBytes.length + 4*8) //4 bytes (length info) per 8 fields
val array =
Array.ofDim[Byte](sftBytes.length + cqlBytes.length + tdefsBytes.length + tsftBytes.length +
indexBytes.length + indexSftBytes.length + samplingFactorBytes.length +
samplingFieldBytes.length + version.length + 4*9) // 4 bytes (length info) per 9 fields

var offset = 0
ByteArrays.writeInt(sftBytes.length, array, offset)
Expand Down Expand Up @@ -254,12 +268,19 @@ object CqlTransformFilter extends StrictLogging {

if (samplingFieldBytes.isEmpty) {
ByteArrays.writeInt(0, array, offset)
offset += 4
} else {
ByteArrays.writeInt(samplingFieldBytes.length, array, offset)
offset += 4
System.arraycopy(samplingFieldBytes, 0, array, offset, samplingFieldBytes.length)
offset += samplingFieldBytes.length
}

ByteArrays.writeInt(version.length, array, offset)
offset += 4
System.arraycopy(version, 0, array, offset, version.length)
offset += version.length

array
}
}
Expand Down Expand Up @@ -292,7 +313,6 @@ object CqlTransformFilter extends StrictLogging {

val tdefsLength = ByteArrays.readInt(bytes, offset)


if (tdefsLength == -1) {
if (cql == null) {
throw new DeserializationException("No filter or transform defined")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package org.locationtech.geomesa.index.filters
import org.locationtech.geomesa.index.filters.RowFilter.RowFilterFactory
import org.locationtech.geomesa.index.filters.Z3Filter._
import org.locationtech.geomesa.index.index.z2.Z2IndexValues
import org.locationtech.geomesa.utils.conf.GeoMesaProperties
import org.locationtech.geomesa.utils.index.ByteArrays
import org.locationtech.geomesa.zorder.sfcurve.Z2

Expand Down Expand Up @@ -69,7 +70,7 @@ object Z2Filter extends RowFilterFactory[Z2Filter] {

override def serializeToStrings(filter: Z2Filter): Map[String, String] = {
val xy = filter.xy.map(bounds => bounds.mkString(RangeSeparator)).mkString(TermSeparator)
Map(XYKey -> xy)
Map(XYKey -> xy, VersionKey -> GeoMesaProperties.ProjectVersion)
}

override def deserializeFromStrings(serialized: scala.collection.Map[String, String]): Z2Filter = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package org.locationtech.geomesa.index.filters

import org.locationtech.geomesa.index.filters.RowFilter.RowFilterFactory
import org.locationtech.geomesa.index.index.z3.Z3IndexValues
import org.locationtech.geomesa.utils.conf.GeoMesaProperties
import org.locationtech.geomesa.utils.index.ByteArrays
import org.locationtech.geomesa.zorder.sfcurve.Z3

Expand Down Expand Up @@ -69,9 +70,10 @@ object Z3Filter extends RowFilterFactory[Z3Filter] {
private val TermSeparator = ";"
private val EpochSeparator = ","

val XYKey = "zxy"
val TKey = "zt"
val EpochKey = "epoch"
val XYKey = "zxy"
val TKey = "zt"
val EpochKey = "epoch"
val VersionKey = "v"

def apply(values: Z3IndexValues): Z3Filter = {
val Z3IndexValues(sfc, _, spatialBounds, _, temporalBounds, _) = values
Expand Down Expand Up @@ -160,9 +162,10 @@ object Z3Filter extends RowFilterFactory[Z3Filter] {
val epoch = s"${filter.minEpoch}$RangeSeparator${filter.maxEpoch}"

Map(
XYKey -> xy,
TKey -> t,
EpochKey -> epoch
XYKey -> xy,
TKey -> t,
EpochKey -> epoch,
VersionKey -> GeoMesaProperties.ProjectVersion
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.locationtech.geomesa.features.TransformSimpleFeature
import org.locationtech.geomesa.features.kryo.KryoBufferSimpleFeature
import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
import org.locationtech.geomesa.index.iterators.AggregatingScan.{AggregateCallback, CqlSampleValidator, CqlValidator, RowValidator, RowValue, SampleValidator, ValidateAll}
import org.locationtech.geomesa.utils.conf.GeoMesaProperties
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.filter.Filter
Expand Down Expand Up @@ -223,6 +224,7 @@ object AggregatingScan {
val TransformSchemaOpt = "tsft"
val TransformDefsOpt = "tdefs"
val BatchSizeOpt = "batch"
val VersionOpt = "v"
}

def configure(
Expand All @@ -243,7 +245,8 @@ object AggregatingScan {
Configuration.CqlOpt -> filter.map(ECQL.toCQL),
Configuration.TransformDefsOpt -> transform.map(_._1),
Configuration.TransformSchemaOpt -> transform.map(t => SimpleFeatureTypes.encodeType(t._2)),
Configuration.BatchSizeOpt -> batchSize.toString
Configuration.BatchSizeOpt -> batchSize.toString,
Configuration.VersionOpt -> GeoMesaProperties.ProjectVersion
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ case class IndexId(name: String, version: Int, attributes: Seq[String], mode: In
val state = Seq(encoded)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}

override def toString: String = encoded
}

object IndexId {
Expand Down

0 comments on commit cb3cc9e

Please sign in to comment.