GEOMESA-3426 FSDS - Support path filtering for converter queries (#3245)
Co-authored-by: Forrest Feaser <[email protected]>
fdfea and Forrest Feaser authored Dec 16, 2024
1 parent b048860 commit d6d45c8
Showing 19 changed files with 366 additions and 51 deletions.
22 changes: 22 additions & 0 deletions docs/user/filesystem/index_config.rst
@@ -250,3 +250,25 @@ Observers can be specified through the user data key ``geomesa.fs.observers``:
sft.setObservers(factories)
// or set directly in the user data as a comma-delimited string
sft.getUserData.put("geomesa.fs.observers", factories.mkString(","))

Configuring Path Filters
------------------------

.. note::

Path filtering is supported for ``converter`` encoding only.

The FSDS can filter the paths within a partition, for more granular control over which files are read during a
query. Path filtering is configured through the user data key ``geomesa.fs.path-filter.name``.

Currently, the only built-in implementation is the ``dtg`` path filter, which parses a datetime from each file path
and compares it against the query filter to determine whether to include or exclude the file from the query. The
following options are required for the ``dtg`` path filter, and are configured through keys under the prefix
``geomesa.fs.path-filter.opts.``:

* ``attribute`` - The ``Date`` attribute in the query to compare against.
* ``pattern`` - The regular expression, with a single capturing group, to extract a datetime string from the path.
* ``format`` - The datetime formatting pattern to parse a date from the regex capture.
* ``buffer`` - The duration by which to buffer the bounds of the parsed datetime, within the current partition. To
  buffer time across partitions, see the ``receipt-time`` partition scheme.
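
For example, a hypothetical ``dtg`` filter configuration, for input files named like ``data-2024-12-11T03.csv``,
might be set through the user data (the attribute, pattern, format, and buffer values here are illustrative only):

.. code-block:: scala

    sft.getUserData.put("geomesa.fs.path-filter.name", "dtg")
    sft.getUserData.put("geomesa.fs.path-filter.opts.attribute", "dtg")
    sft.getUserData.put("geomesa.fs.path-filter.opts.pattern", "data-(.*)\\.csv")
    sft.getUserData.put("geomesa.fs.path-filter.opts.format", "yyyy-MM-dd'T'HH")
    sft.getUserData.put("geomesa.fs.path-filter.opts.buffer", "1 hour")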

Custom path filters can be loaded via SPI by providing an implementation of
``org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory``.
@@ -119,6 +119,7 @@ object Bounds {
object Bound {
private val unboundedBound = Bound[Any](None, inclusive = false)
def unbounded[T]: Bound[T] = unboundedBound.asInstanceOf[Bound[T]]
// convenience constructor for a closed (inclusive) endpoint
def inclusive[T](value: T): Bound[T] = Bound(Option(value), inclusive = true)
}

private val allValues = Bounds(Bound.unbounded, Bound.unbounded)
@@ -0,0 +1 @@
org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory
@@ -10,13 +10,14 @@ package org.locationtech.geomesa.fs.storage.converter

import com.typesafe.scalalogging.StrictLogging
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.features.{ScalaSimpleFeature, TransformSimpleFeature}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle}
import org.locationtech.geomesa.utils.io.PathUtils
@@ -28,27 +29,34 @@ class ConverterFileSystemReader(
fs: FileSystem,
converter: SimpleFeatureConverter,
filter: Option[Filter],
transform: Option[(String, SimpleFeatureType)]
transform: Option[(String, SimpleFeatureType)],
pathFiltering: Option[PathFiltering]
) extends FileSystemPathReader with StrictLogging {

import ArchiveStreamFactory.{JAR, TAR, ZIP}

// derive a Hadoop PathFilter from the query filter, when both path filtering and a query filter are configured
private lazy val pathFilter: Option[PathFilter] = pathFiltering.flatMap(pf => filter.map(pf.apply))

override def read(path: Path): CloseableIterator[SimpleFeature] = {
logger.debug(s"Opening file $path")
val iter = try {
val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
case TAR => new HadoopTarHandle(fs, path)
case ZIP | JAR => new HadoopZipHandle(fs, path)
case _ => new HadoopFileHandle(fs, path)
}
handle.open.flatMap { case (name, is) =>
val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
converter.process(is, converter.createEvaluationContext(params))
// read the file only when no path filter is configured or the filter accepts the path
if (pathFilter.forall(_.accept(path))) {
logger.debug(s"Opening file $path")
val iter = try {
val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
case TAR => new HadoopTarHandle(fs, path)
case ZIP | JAR => new HadoopZipHandle(fs, path)
case _ => new HadoopFileHandle(fs, path)
}
handle.open.flatMap { case (name, is) =>
val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
converter.process(is, converter.createEvaluationContext(params))
}
} catch {
case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty
}
} catch {
case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty
transformed(filtered(iter))
} else {
CloseableIterator.empty
}
transformed(filtered(iter))
}

private def filtered(in: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] = {
@@ -19,8 +19,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering

class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, converter: SimpleFeatureConverter)
class ConverterStorage(context: FileSystemContext,
metadata: StorageMetadata,
converter: SimpleFeatureConverter,
pathFiltering: Option[PathFiltering])
extends AbstractFileSystemStorage(context, metadata, "") {

// TODO close converter...
@@ -36,7 +40,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
override protected def createReader(
filter: Option[Filter],
transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = {
new ConverterFileSystemReader(context.fs, converter, filter, transform)
new ConverterFileSystemReader(context.fs, converter, filter, transform, pathFiltering)
}

override def getFilePaths(partition: String): Seq[StorageFilePath] = {
@@ -12,10 +12,15 @@ import com.typesafe.scalalogging.LazyLogging
import org.locationtech.geomesa.convert.{ConfArgs, ConverterConfigResolver}
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{ConverterConfigParam, ConverterNameParam}
import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory._
import org.locationtech.geomesa.fs.storage.converter.pathfilter.{PathFiltering, PathFilteringFactory}

import java.util.regex.Pattern

class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging {

import scala.collection.JavaConverters._

override val encoding: String = "converter"

override def apply(context: FileSystemContext, metadata: StorageMetadata): FileSystemStorage = {
@@ -29,9 +34,22 @@ class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging
}
SimpleFeatureConverter(metadata.sft, converterConfig)
}
new ConverterStorage(context, metadata, converter)
}

// collect the path filter options from the configuration, stripping the key prefix
val pathFilteringOpts =
context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map {
case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v
}

// load the named path filter via SPI, failing fast if no factory matches
val pathFiltering = Option(context.conf.get(PathFilterName)).flatMap { name =>
val factory = PathFilteringFactory.load(NamedOptions(name, pathFilteringOpts.toMap))
if (factory.isEmpty) {
throw new IllegalArgumentException(s"Failed to load ${classOf[PathFiltering].getName} for config '$name'")
}
factory
}

new ConverterStorage(context, metadata, converter, pathFiltering)
}
}

object ConverterStorageFactory {
@@ -43,4 +61,6 @@ object ConverterStorageFactory {
val LeafStorageParam = "fs.options.leaf-storage"
val PartitionSchemeParam = "fs.partition-scheme.name"
val PartitionOptsPrefix = "fs.partition-scheme.opts."
val PathFilterName = "fs.path-filter.name"
val PathFilterOptsPrefix = "fs.path-filter.opts."
}
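
As a usage sketch (the option values are hypothetical), a path filter can be requested by setting these keys on the
Hadoop configuration backing the FileSystemContext; the "fs.path-filter.opts." prefix is stripped from the option
keys before they reach the filter factory:

import org.apache.hadoop.conf.Configuration

// hypothetical values; the key names match the constants in ConverterStorageFactory
val conf = new Configuration()
conf.set("fs.path-filter.name", "dtg")
conf.set("fs.path-filter.opts.attribute", "dtg")
conf.set("fs.path-filter.opts.pattern", "data-(.*)\\.csv")
conf.set("fs.path-filter.opts.format", "yyyy-MM-dd'T'HH")
conf.set("fs.path-filter.opts.buffer", "1 hour")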
@@ -0,0 +1,101 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.Bounds.Bound
import org.locationtech.geomesa.filter.{Bounds, FilterHelper}
import org.locationtech.geomesa.fs.storage.api.NamedOptions

import java.time.format.DateTimeFormatter
import java.time.{ZoneOffset, ZonedDateTime}
import java.util.regex.Pattern
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

class DtgPathFiltering(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration)
extends PathFiltering with LazyLogging {

def apply(filter: Filter): PathFilter = {
// extract the query's time intervals for the configured date attribute
val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true)
// accept a path when its parsed, buffered datetime intersects any query interval
path => try {
val time = parseDtg(path.getName).toInstant
val millis = buffer.toMillis
val lower = ZonedDateTime.ofInstant(time.minusMillis(millis), ZoneOffset.UTC)
val upper = ZonedDateTime.ofInstant(time.plusMillis(millis), ZoneOffset.UTC)
val buffered = Bounds(Bound.inclusive(lower), Bound.inclusive(upper))
val included = filterIntervals.exists(bounds => bounds.intersects(buffered))
logger.whenDebugEnabled {
if (included) {
logger.debug(s"Including path ${path.getName} for filter $filter")
} else {
logger.debug(s"Excluding path ${path.getName} for filter $filter")
}
}
included
} catch {
case NonFatal(ex) =>
logger.warn(s"Failed to evaluate filter for path '${path.getName}'", ex)
true
}
}

private def parseDtg(name: String): ZonedDateTime = {
Option(name)
.map(pattern.matcher)
.filter(_.matches)
.filter(_.groupCount > 0)
.map(_.group(1))
.map(ZonedDateTime.parse(_, format))
.getOrElse {
throw new IllegalArgumentException(s"Failed to parse ${classOf[ZonedDateTime].getName} " +
s"from file name '$name' for pattern '$pattern' and format '$format'")
}
}

override def toString: String = {
s"${this.getClass.getName}(attribute = $attribute, pattern = $pattern, format = $format, buffer = $buffer)"
}
}

object DtgPathFiltering extends LazyLogging {

val Name = "dtg"

object Config {
val Attribute = "attribute"
val Pattern = "pattern"
val Format = "format"
val Buffer = "buffer"
}

class DtgPathFilteringFactory extends PathFilteringFactory {
override def load(config: NamedOptions): Option[PathFiltering] = {
if (config.name != Name) { None } else {
val attribute = config.options.getOrElse(Config.Attribute, null)
require(attribute != null, s"$Name path filter requires a dtg attribute config '${Config.Attribute}'")
val patternConfig = config.options.getOrElse(Config.Pattern, null)
require(patternConfig != null, s"$Name path filter requires a dtg pattern config '${Config.Pattern}'")
val formatConfig = config.options.getOrElse(Config.Format, null)
require(formatConfig != null, s"$Name path filter requires a dtg format config '${Config.Format}'")
val bufferConfig = config.options.getOrElse(Config.Buffer, null)
require(bufferConfig != null, s"$Name path filter requires a buffer duration config '${Config.Buffer}'")

val pattern = Pattern.compile(patternConfig)
val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC)
val buffer = Duration.apply(bufferConfig)
val pathFiltering = new DtgPathFiltering(attribute, pattern, format, buffer)
logger.info(s"Loaded PathFiltering: $pathFiltering")
Some(pathFiltering)
}
}
}
}
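
As a rough illustration (the file name pattern and query below are hypothetical), the filter above could be
exercised directly:

import org.apache.hadoop.fs.Path
import org.geotools.filter.text.ecql.ECQL

import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.regex.Pattern
import scala.concurrent.duration.Duration

// hypothetical setup: files named like 'data-2024-12-11T03.csv', buffered by one hour
val filtering = new DtgPathFiltering(
  "dtg",
  Pattern.compile("data-(.*)\\.csv"),
  DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH").withZone(ZoneOffset.UTC),
  Duration("1 hour"))

val pathFilter = filtering.apply(ECQL.toFilter("dtg DURING 2024-12-11T00:00:00Z/2024-12-11T06:00:00Z"))
// 03:00 buffered by +/- 1 hour intersects the query interval, so the file is included
pathFilter.accept(new Path("data-2024-12-11T03.csv")) // true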
@@ -0,0 +1,16 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter

trait PathFiltering {
def apply(filter: Filter): PathFilter
}
@@ -0,0 +1,26 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import org.locationtech.geomesa.fs.storage.api.NamedOptions

import java.util.ServiceLoader

trait PathFilteringFactory {
def load(config: NamedOptions): Option[PathFiltering]
}

object PathFilteringFactory {

import scala.collection.JavaConverters._

private lazy val factories = ServiceLoader.load(classOf[PathFilteringFactory]).asScala.toSeq

def load(config: NamedOptions): Option[PathFiltering] = factories.toStream.flatMap(_.load(config)).headOption
}
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
0,Harry,17,2024-12-11T01:00:00.000Z,0.0,0.0
1,Hermione,18,2024-12-11T11:00:00.000Z,0.0,0.0
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
2,Ronald,17,2024-12-11T07:00:00.000Z,0.0,0.0
3,Draco,18,2024-12-11T11:00:00.000Z,0.0,0.0
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
4,Neville,17,2024-12-11T13:00:00.000Z,0.0,0.0
5,Rubeus,43,2024-12-11T08:00:00.000Z,0.0,0.0
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
6,Severus,52,2024-12-11T19:00:00.000Z,0.0,0.0
7,Alfred,78,2024-12-11T23:00:00.000Z,0.0,0.0
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0
9,Minerva,57,2024-12-12T00:30:00.000Z,0.0,0.0
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0
11,Dudley,19,2024-12-12T05:00:00.000Z,0.0,0.0