From 73fad1578cf657dea67092a095b967fe408b24e7 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Mon, 29 Jan 2024 16:29:46 +0000 Subject: [PATCH] GEOMESA-3329 FSDS - Receipt time partition scheme * Also fixes the 'pattern' for date-time schemes, which is only used for display purposes. --- docs/user/filesystem/partition_schemes.rst | 15 + ...mesa.fs.storage.api.PartitionSchemeFactory | 1 + .../common/partitions/DateTimeScheme.scala | 30 +- .../common/partitions/ReceiptTimeScheme.scala | 294 ++++++++++++++++++ .../partitions/ReceiptTimeSchemeTest.scala | 102 ++++++ 5 files changed, 431 insertions(+), 11 deletions(-) create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeScheme.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeSchemeTest.scala diff --git a/docs/user/filesystem/partition_schemes.rst b/docs/user/filesystem/partition_schemes.rst index 4af572b8cecc..dc5d1d3a8a34 100644 --- a/docs/user/filesystem/partition_schemes.rst +++ b/docs/user/filesystem/partition_schemes.rst @@ -86,6 +86,21 @@ Julian Schemes Julian schemes partition data by Julian day, instead of month/day. They use the patterns ``yyyy/DDD/HH/mm``, ``yyyy/DDD/HH``, and ``yyyy/DDD`` respectively +Receipt Time Scheme +^^^^^^^^^^^^^^^^^^^ + +**Name:** ``receipt-time`` + +**Configuration:** + +* ``datetime-scheme`` - The name of another date-time scheme describing the layout of the data, e.g. ``weekly`` or +  ``hourly``. Additional options may be required to configure the date-time scheme selected. +* ``buffer`` - The amount of time to buffer queries by, expressed as a duration, e.g. ``30 minutes``. This represents +  the latency in the system. + +The receipt time scheme partitions data based on when a message is received.
Generally this is useful +only for reading existing data that may have been aggregated and stored by an external process. + Spatial Schemes --------------- diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PartitionSchemeFactory b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PartitionSchemeFactory index 51028b66b33b..13112cf87785 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PartitionSchemeFactory +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PartitionSchemeFactory @@ -2,5 +2,6 @@ org.locationtech.geomesa.fs.storage.common.partitions.AttributeScheme$AttributeP org.locationtech.geomesa.fs.storage.common.partitions.CompositeScheme$CompositePartitionSchemeFactory org.locationtech.geomesa.fs.storage.common.partitions.DateTimeScheme$DateTimePartitionSchemeFactory org.locationtech.geomesa.fs.storage.common.partitions.FlatScheme$FlatPartitionSchemeFactory +org.locationtech.geomesa.fs.storage.common.partitions.ReceiptTimeScheme$ReceiptTimePartitionSchemeFactory org.locationtech.geomesa.fs.storage.common.partitions.Z2Scheme$Z2PartitionSchemeFactory org.locationtech.geomesa.fs.storage.common.partitions.XZ2Scheme$XZ2PartitionSchemeFactory diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeScheme.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeScheme.scala index 6e30fc1c8e81..dbdaf4bc0e6c 100644 --- 
a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeScheme.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeScheme.scala @@ -24,8 +24,14 @@ import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal -case class DateTimeScheme(formatter: DateTimeFormatter, stepUnit: ChronoUnit, step: Int, dtg: String, dtgIndex: Int) - extends PartitionScheme { +case class DateTimeScheme( + formatter: DateTimeFormatter, + pattern: String, + stepUnit: ChronoUnit, + step: Int, + dtg: String, + dtgIndex: Int + ) extends PartitionScheme { import FilterHelper.ff import org.locationtech.geomesa.filter.{andOption, isTemporalFilter, partitionSubFilters} @@ -51,9 +57,7 @@ case class DateTimeScheme(formatter: DateTimeFormatter, stepUnit: ChronoUnit, st // TODO This may not be the best way to calculate max depth... 
// especially if we are going to use other separators - override val depth: Int = formatter.toString.count(_ == '/') + 1 - - override val pattern: String = formatter.toString + override val depth: Int = pattern.count(_ == '/') + 1 override def getPartitionName(feature: SimpleFeature): String = formatter.format(toInstant(feature.getAttribute(dtgIndex).asInstanceOf[Date]).atZone(ZoneOffset.UTC)) @@ -162,7 +166,11 @@ object DateTimeScheme { val Name = "datetime" def apply(format: String, stepUnit: ChronoUnit, step: Int, dtg: String, dtgIndex: Int): DateTimeScheme = - DateTimeScheme(DateTimeFormatter.ofPattern(format), stepUnit, step, dtg, dtgIndex) + DateTimeScheme(DateTimeFormatter.ofPattern(format), format, stepUnit, step, dtg, dtgIndex) + + @deprecated("Pattern is not correct when using this constructor") + def apply(formatter: DateTimeFormatter, stepUnit: ChronoUnit, step: Int, dtg: String, dtgIndex: Int): DateTimeScheme = + DateTimeScheme(formatter, formatter.toString, stepUnit, step, dtg, dtgIndex) object Config { val DateTimeFormatOpt: String = "datetime-format" @@ -175,11 +183,11 @@ object DateTimeScheme { def apply(name: String): Option[Format] = all.find(_.name.equalsIgnoreCase(name)) - case class Format private[Formats] (name: String, formatter: DateTimeFormatter, unit: ChronoUnit) + case class Format private[Formats](name: String, formatter: DateTimeFormatter, pattern: String, unit: ChronoUnit) private[Formats] object Format { def apply(name: String, format: String, unit: ChronoUnit): Format = - Format(name, DateTimeFormatter.ofPattern(format), unit) + Format(name, DateTimeFormatter.ofPattern(format), format, unit) } val Minute : Format = Format("minute", "yyyy/MM/dd/HH/mm", ChronoUnit.MINUTES) @@ -200,7 +208,7 @@ object DateTimeScheme { .appendValue(WeekFields.ISO.weekOfWeekBasedYear(), 2) .parseDefaulting(ChronoField.DAY_OF_WEEK, 1) .toFormatter() - Format("weekly", formatter, ChronoUnit.WEEKS) + Format("weekly", formatter, "YYYY/'W'ww", 
ChronoUnit.WEEKS) } private val all = Seq(Minute, Hourly, Daily, Weekly, Monthly, JulianMinute, JulianHourly, JulianDaily) @@ -228,9 +236,9 @@ object DateTimeScheme { val formatter = try { DateTimeFormatter.ofPattern(format) } catch { case NonFatal(e) => throw new IllegalArgumentException(s"Invalid date format '$format':", e) } - Some(DateTimeScheme(formatter, unit, step, dtg, dtgIndex)) + Some(DateTimeScheme(formatter, format, unit, step, dtg, dtgIndex)) } else { - Formats(config.name).map(f => DateTimeScheme(f.formatter, f.unit, step, dtg, dtgIndex)) + Formats(config.name).map(f => DateTimeScheme(f.formatter, f.pattern, f.unit, step, dtg, dtgIndex)) } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeScheme.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeScheme.scala new file mode 100644 index 000000000000..a67029be5617 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeScheme.scala @@ -0,0 +1,294 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.partitions
+
+import org.geotools.filter.visitor.DuplicatingFilterVisitor
+import org.geotools.temporal.`object`.{DefaultInstant, DefaultPeriod, DefaultPosition}
+import org.locationtech.geomesa.fs.storage.api.PartitionScheme.SimplifiedFilter
+import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PartitionScheme, PartitionSchemeFactory}
+import org.locationtech.geomesa.fs.storage.common.partitions.ReceiptTimeScheme.BufferingFilterVisitor
+import org.locationtech.geomesa.utils.geotools.converters.FastConverter
+import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.opengis.filter._
+import org.opengis.filter.expression.{Expression, Literal, PropertyName}
+import org.opengis.filter.temporal.{After, Before, During, TEquals}
+import org.opengis.temporal.{Instant, Period}
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
+
+/**
+ * Scheme for partitioning based on "receipt time", i.e. when a message is received. Generally this is useful
+ * only for reading existing data that may have been aggregated and stored by an external process.
+ *
+ * Partition naming is handled entirely by the delegate. For queries, any predicate on the delegate's date
+ * attribute is widened by `buffer` before the delegate is asked which partitions it intersects.
+ *
+ * @param delegate delegate date time scheme options
+ * @param buffer amount of time to buffer queries by, in order to match a feature date to a receipt time date -
+ *               i.e. the amount of latency in the ingest process
+ */
+case class ReceiptTimeScheme(delegate: DateTimeScheme, buffer: Duration) extends PartitionScheme {
+
+  override val depth: Int = delegate.depth
+
+  override val pattern: String = delegate.pattern
+
+  override def getPartitionName(feature: SimpleFeature): String = delegate.getPartitionName(feature)
+
+  override def getSimplifiedFilters(filter: Filter, partition: Option[String]): Option[Seq[SimplifiedFilter]] = {
+    delegate.getSimplifiedFilters(buffered(filter), partition).map { filters =>
+      // always use the full filter since our dates are not guaranteed to match the partition bounds
+      filters.map(f => f.copy(filter = filter))
+    }
+  }
+
+  override def getIntersectingPartitions(filter: Filter): Option[Seq[String]] =
+    delegate.getIntersectingPartitions(buffered(filter))
+
+  /**
+   * Not supported for this scheme
+   *
+   * @param partition partition name
+   * @return nothing, always throws `NotImplementedError`
+   */
+  override def getCoveringFilter(partition: String): Filter =
+    throw new NotImplementedError("Dates may overlap in multiple partitions")
+
+  // rewrites the filter so that predicates on the date attribute are widened by the configured buffer
+  private def buffered(filter: Filter): Filter =
+    filter.accept(new BufferingFilterVisitor(buffer, delegate.dtg), null).asInstanceOf[Filter]
+}
+
+object ReceiptTimeScheme {
+
+  val Name = "receipt-time"
+
+  object Config {
+    val DateTimeSchemaOpt: String = "datetime-scheme"
+    val BufferOpt        : String = "buffer"
+  }
+
+  /**
+   * Loads a receipt time scheme from configuration options. The buffer defaults to 30 minutes, and the
+   * delegate scheme name defaults to the generic date-time scheme. All options are passed through to the
+   * delegate so that it can be configured as well.
+   */
+  class ReceiptTimePartitionSchemeFactory extends PartitionSchemeFactory {
+    override def load(sft: SimpleFeatureType, config: NamedOptions): Option[PartitionScheme] = {
+      if (config.name != Name) { None } else {
+        val buffer = config.options.get(Config.BufferOpt) match {
+          case None => Duration(30, TimeUnit.MINUTES)
+          case Some(b) =>
+            // fail with the offending value instead of a bare parse error
+            try { Duration(b) } catch {
+              case scala.util.control.NonFatal(e) =>
+                throw new IllegalArgumentException(s"Invalid buffer duration '$b':", e)
+            }
+        }
+        // an infinite duration can't be converted to millis, and a negative one would narrow queries
+        // instead of widening them - reject both up front instead of failing (or dropping data) at query time
+        if (!buffer.isFinite || buffer < Duration.Zero) {
+          throw new IllegalArgumentException(s"Buffer must be a finite, non-negative duration, but got: $buffer")
+        }
+        val dateTimeName = config.options.getOrElse(Config.DateTimeSchemaOpt, DateTimeScheme.Name)
+        val delegate = PartitionSchemeFactory.load(sft, NamedOptions(dateTimeName, config.options)) match {
+          case d: DateTimeScheme => d
+          case s => throw new IllegalArgumentException(s"Expected DateTimeScheme, but got: $s")
+        }
+        Some(ReceiptTimeScheme(delegate, buffer))
+      }
+    }
+  }
+
+  /**
+   * Buffers any filters against the specified date attribute by the amount specified
+   *
+   * Predicates that are not a direct comparison between the date attribute and a literal are copied as-is.
+   * Inside a `Not`, the buffer direction is flipped so that the negated predicate is still widened.
+   *
+   * The visitor keeps mutable state while traversing, so an instance should be used for a single `accept` call.
+   *
+   * @param buffer amount of time to buffer (on each side of) a temporal filter
+   * @param dtg date attribute
+   */
+  class BufferingFilterVisitor(buffer: Duration, dtg: String) extends DuplicatingFilterVisitor {
+
+    // true while visiting the child of an odd number of nested Not filters
+    private var inverted = false
+    private val millis = buffer.toMillis
+
+    override def visit(filter: PropertyIsBetween, extraData: AnyRef): AnyRef = {
+      val factory = getFactory(extraData)
+      def buffer(p: PropertyName, lower: Literal, upper: Literal): Option[Filter] = {
+        for {
+          lo <- Option(FastConverter.evaluate(lower, classOf[Date]))
+          up <- Option(FastConverter.evaluate(upper, classOf[Date]))
+        } yield {
+          val bufferedLo = bufferDown(lo)
+          val bufferedUp = bufferUp(up)
+          // account for inverted filters that may result in invalid clauses after buffering
+          if (bufferedLo.before(bufferedUp)) {
+            factory.between(p, factory.literal(bufferedLo), factory.literal(bufferedUp), filter.getMatchAction)
+          } else {
+            Filter.EXCLUDE
+          }
+        }
+      }
+      val prop = visit(filter.getExpression, extraData)
+      val lowerBoundary = visit(filter.getLowerBoundary, extraData)
+      val upperBoundary = visit(filter.getUpperBoundary, extraData)
+      val buffered = (prop, lowerBoundary, upperBoundary) match {
+        case (p: PropertyName, lower: Literal, upper: Literal) if p.getPropertyName == dtg => buffer(p, lower, upper)
+        case _ => None
+      }
+
+      buffered.getOrElse(super.visit(filter, extraData))
+    }
+
+    override def visit(filter: PropertyIsEqualTo, extraData: AnyRef): AnyRef = {
+      def buffer(p: PropertyName, lit: Literal): Filter = {
+        // a negated equality would buffer into a range whose lower bound is after its upper bound
+        if (collapsesWhenInverted) { matchAll } else {
+          getFactory(extraData).between(p, bufferDown(lit, extraData), bufferUp(lit, extraData), filter.getMatchAction)
+        }
+      }
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg => buffer(p, lit)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg => buffer(p, lit)
+        case _ => super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: PropertyIsNotEqualTo, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      // a not-equals check can't exclude any partition, so replace it with a match-all
+      (expr1, expr2) match {
+        case (p: PropertyName, _: Literal) if p.getPropertyName == dtg => matchAll
+        case (_: Literal, p: PropertyName) if p.getPropertyName == dtg => matchAll
+        case _ => super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: PropertyIsGreaterThan, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).greater(p, bufferDown(lit, extraData), false, filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).greater(bufferUp(lit, extraData), p, false, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: PropertyIsGreaterThanOrEqualTo, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).greaterOrEqual(p, bufferDown(lit, extraData), false, filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).greaterOrEqual(bufferUp(lit, extraData), p, false, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: PropertyIsLessThan, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).less(p, bufferUp(lit, extraData), false, filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).less(bufferDown(lit, extraData), p, false, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: PropertyIsLessThanOrEqualTo, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).lessOrEqual(p, bufferUp(lit, extraData), false, filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).lessOrEqual(bufferDown(lit, extraData), p, false, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: After, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).after(p, bufferDown(lit, extraData), filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).after(bufferUp(lit, extraData), p, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: Before, extraData: AnyRef): AnyRef = {
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg =>
+          getFactory(extraData).before(p, bufferUp(lit, extraData), filter.getMatchAction)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg =>
+          getFactory(extraData).before(bufferDown(lit, extraData), p, filter.getMatchAction)
+        case _ =>
+          super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: During, extraData: AnyRef): AnyRef = {
+      val factory = getFactory(extraData)
+      def instant(date: Date): Instant = new DefaultInstant(new DefaultPosition(date))
+      def buffer(p: PropertyName, lit: Literal): Option[Filter] = {
+        for {
+          period   <- Option(FastConverter.evaluate(lit, classOf[Period]))
+          lowerPos <- Option(period.getBeginning.getPosition)
+          upperPos <- Option(period.getEnding.getPosition)
+          lower    <- Option(lowerPos.getDate)
+          upper    <- Option(upperPos.getDate)
+        } yield {
+          val low = bufferDown(lower)
+          val up = bufferUp(upper)
+          // account for inverted filters that may result in invalid clauses after buffering
+          if (low.before(up)) {
+            factory.during(p, factory.literal(new DefaultPeriod(instant(low), instant(up))), filter.getMatchAction)
+          } else {
+            Filter.EXCLUDE
+          }
+        }
+      }
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      val buffered = (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg => buffer(p, lit)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg => buffer(p, lit)
+        case _ => None
+      }
+      buffered.getOrElse(super.visit(filter, extraData))
+    }
+
+    override def visit(filter: TEquals, extraData: AnyRef): AnyRef = {
+      def buffer(p: PropertyName, lit: Literal): Filter = {
+        // a negated equality would buffer into a range whose lower bound is after its upper bound
+        if (collapsesWhenInverted) { matchAll } else {
+          getFactory(extraData).between(p, bufferDown(lit, extraData), bufferUp(lit, extraData), filter.getMatchAction)
+        }
+      }
+      val expr1 = visit(filter.getExpression1, extraData)
+      val expr2 = visit(filter.getExpression2, extraData)
+      (expr1, expr2) match {
+        case (p: PropertyName, lit: Literal) if p.getPropertyName == dtg => buffer(p, lit)
+        case (lit: Literal, p: PropertyName) if p.getPropertyName == dtg => buffer(p, lit)
+        case _ => super.visit(filter, extraData)
+      }
+    }
+
+    override def visit(filter: Not, extraData: AnyRef): AnyRef = {
+      // flip the buffering direction for the child, and restore it even if the child visit fails
+      inverted = !inverted
+      val res = try { filter.getFilter.accept(this, extraData).asInstanceOf[Filter] } finally {
+        inverted = !inverted
+      }
+      getFactory(extraData).not(res)
+    }
+
+    /**
+     * A clause that matches everything once any enclosing negation is applied - under an odd number of
+     * `Not` filters that is EXCLUDE, since not(EXCLUDE) is logically equivalent to INCLUDE
+     */
+    private def matchAll: Filter = if (inverted) { Filter.EXCLUDE } else { Filter.INCLUDE }
+
+    // under inversion the bounds move towards each other, so a single instant buffers into an empty range
+    private def collapsesWhenInverted: Boolean = inverted && millis > 0
+
+    private def bufferUp(lit: Literal, extraData: AnyRef): Expression =
+      buffer(lit, if (inverted) { -1L * millis } else { millis }, extraData)
+
+    private def bufferUp(date: Date): Date =
+      buffer(date, if (inverted) { -1L * millis } else { millis })
+
+    private def bufferDown(lit: Literal, extraData: AnyRef): Expression =
+      buffer(lit, if (inverted) { millis } else { -1L * millis }, extraData)
+
+    private def bufferDown(date: Date): Date =
+      buffer(date, if (inverted) { millis } else { -1L * millis })
+
+    // shifts a date literal by the offset - literals that can't be converted to a date are returned unchanged
+    private def buffer(lit: Literal, offset: Long, extraData: AnyRef): Expression = {
+      FastConverter.convert(lit.evaluate(null), classOf[Date]) match {
+        case null => lit
+        case date => getFactory(extraData).literal(buffer(date, offset))
+      }
+    }
+
+    private def buffer(date: Date, offset: Long): Date = new Date(date.getTime + offset)
+  }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeSchemeTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeSchemeTest.scala
new file mode 100644
index 000000000000..d891e4b85491
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/partitions/ReceiptTimeSchemeTest.scala
@@ -0,0 +1,102 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.partitions
+
+import org.geotools.filter.text.ecql.ECQL
+import org.junit.runner.RunWith
+import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.fs.storage.api.PartitionSchemeFactory
+import org.locationtech.geomesa.fs.storage.common.StorageSerialization
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
+import org.specs2.mutable.Specification
+import org.specs2.runner.JUnitRunner
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
+
+@RunWith(classOf[JUnitRunner])
+class ReceiptTimeSchemeTest extends Specification {
+
+  val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326")
+  val sf = ScalaSimpleFeature.create(sft, "1", "test", 10, "2017-02-03T10:15:30Z", "POINT (10 10)")
+
+  // receipt-time scheme with a one hour buffer over daily partitions, shared by several tests
+  private val dailyConf =
+    """{ scheme = "receipt-time", options = { buffer = "60 minutes", datetime-scheme = "daily" }}"""
+
+  // deserializes the config and loads the partition scheme for the test feature type
+  private def load(conf: String) = PartitionSchemeFactory.load(sft, StorageSerialization.deserialize(conf))
+
+  "ReceiptTimeScheme" should {
+
+    "load from conf with named datetime scheme" >> {
+      val scheme = load(dailyConf)
+      scheme must beAnInstanceOf[ReceiptTimeScheme]
+      val receipt = scheme.asInstanceOf[ReceiptTimeScheme]
+      receipt.buffer mustEqual Duration(60, TimeUnit.MINUTES)
+      receipt.delegate.dtg mustEqual "dtg"
+      receipt.delegate.pattern mustEqual "yyyy/MM/dd"
+    }
+
+    "load from conf with configured datetime scheme" >> {
+      val conf =
+        """
+          | {
+          |   scheme = "receipt-time"
+          |   options = {
+          |     buffer = "60 minutes"
+          |     datetime-scheme = "datetime"
+          |     datetime-format = "yyyy"
+          |     step-unit = "years"
+          |     step = 1
+          |   }
+          | }
+        """.stripMargin
+
+      val scheme = load(conf)
+      scheme must beAnInstanceOf[ReceiptTimeScheme]
+      val receipt = scheme.asInstanceOf[ReceiptTimeScheme]
+      receipt.buffer mustEqual Duration(60, TimeUnit.MINUTES)
+      receipt.delegate.dtg mustEqual "dtg"
+      receipt.delegate.pattern mustEqual "yyyy"
+    }
+
+    "buffer filters appropriately" >> {
+      val scheme = load(dailyConf)
+
+      // each filter is paired with the daily partitions expected once the one hour buffer is applied
+      val filtersAndResults = Seq(
+        "dtg == '2024-01-02T00:01:00.000Z'" -> Seq("2024/01/01", "2024/01/02"),
+        "dtg == '2024-01-02T01:01:00.000Z'" -> Seq("2024/01/02"),
+        "dtg between '2024-01-02T00:00:00.000Z' and '2024-01-02T23:59:59.999Z'" -> Seq("2024/01/01", "2024/01/02", "2024/01/03"),
+        "dtg >= '2024-01-02T00:00:00.000Z' and dtg <= '2024-01-02T23:59:59.999Z'" -> Seq("2024/01/01", "2024/01/02", "2024/01/03"),
+        "dtg >= '2024-01-02T00:00:00.000Z' and dtg < '2024-01-03T00:00:00.000Z'" -> Seq("2024/01/01", "2024/01/02", "2024/01/03"),
+        "dtg during 2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z" -> Seq("2024/01/01", "2024/01/02", "2024/01/03"),
+        "dtg during 2024-01-02T12:00:00.000Z/2024-01-02T13:00:00.000Z" -> Seq("2024/01/02")
+      )
+      foreach(filtersAndResults) { case (cql, partitions) =>
+        val parsed = ECQL.toFilter(cql)
+        scheme.getIntersectingPartitions(parsed) must beSome(containTheSameElementsAs(partitions))
+        val simplified = scheme.getSimplifiedFilters(parsed).orNull
+        simplified must not(beNull)
+        simplified.flatMap(_.partitions) must containTheSameElementsAs(partitions)
+        // verify that we don't remove the date filter from the simplified filter
+        foreach(simplified.map(_.filter))(_ mustEqual parsed)
+      }
+    }
+
+    "buffer inverted filters appropriately" >> {
+      val scheme = load(dailyConf)
+
+      // negated ranges no wider than twice the buffer can't be used to prune any partitions
+      val filters = Seq(
+        "not(dtg during 2024-01-02T12:00:00.000Z/2024-01-02T13:00:00.000Z)",
+        "not(dtg between '2024-01-02T12:00:00.000Z' and '2024-01-02T12:59:59.999Z')"
+      )
+      foreach(filters) { cql =>
+        scheme.getIntersectingPartitions(ECQL.toFilter(cql)) must beNone
+      }
+    }
+  }
+}