From 444a9fd6f8f73b5e3b453847327fdd453a50e846 Mon Sep 17 00:00:00 2001 From: Emilio Date: Thu, 2 Jan 2025 10:36:39 -0500 Subject: [PATCH] GEOMESA-3411 FSDS - Fix path cache registration order (#3249) --- .../exporters/FileSystemExporter.scala | 11 ++--- .../common/AbstractFileSystemStorage.scala | 1 - .../fs/storage/common/utils/PathCache.scala | 2 +- .../fs/storage/orc/OrcFileSystemStorage.scala | 2 +- .../fs/storage/orc/OrcFileSystemWriter.scala | 8 ++-- .../storage/orc/OrcFileSystemWriterTest.scala | 31 +++++++------- .../parquet/ParquetFileSystemStorage.scala | 41 +++++++++++++------ 7 files changed, 55 insertions(+), 41 deletions(-) diff --git a/geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/FileSystemExporter.scala b/geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/FileSystemExporter.scala index 47421e7927c5..f6e61cb7f5e1 100644 --- a/geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/FileSystemExporter.scala +++ b/geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/FileSystemExporter.scala @@ -12,8 +12,8 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.locationtech.geomesa.fs.storage.api.FileSystemContext import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter -import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemWriter import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter import org.locationtech.geomesa.utils.io.PathUtils @@ -52,22 +52,23 @@ object FileSystemExporter extends LazyLogging { class ParquetFileSystemExporter(path: String) extends FileSystemExporter { override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = { + // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs + val file = new Path(PathUtils.getUrl(path).toURI) val conf = new Configuration() - StorageConfiguration.setSft(conf, sft) try { Class.forName("org.xerial.snappy.Snappy") } catch { case _: ClassNotFoundException => logger.warn("SNAPPY compression is not available on the classpath - falling back to GZIP") conf.set("parquet.compression", "GZIP") } - // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs - new ParquetFileSystemWriter(sft, new Path(PathUtils.getUrl(path).toURI), conf) + new ParquetFileSystemWriter(sft, FileSystemContext(file, conf), file) } } class OrcFileSystemExporter(path: String) extends FileSystemExporter { override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = { // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs - new OrcFileSystemWriter(sft, new Configuration(), new Path(PathUtils.getUrl(path).toURI)) + val file = new Path(PathUtils.getUrl(path).toURI) + new OrcFileSystemWriter(sft, FileSystemContext(file, new Configuration()), file) } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index 5fcfb47ac9cf..389562243f56 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -233,7 +233,6 @@ abstract class AbstractFileSystemStorage( def pathAndObserver: WriterConfig = { val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType) - PathCache.register(context.fs, path) val updateObserver = new UpdateObserver(partition, path, action) val observer = if (observers.isEmpty) { updateObserver } else { new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala index 519fe4efaa38..4385e15d1831 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala @@ -50,7 +50,7 @@ object PathCache { ) /** - * * Register a path as existing + * Register a path as existing * * @param fs file system * @param path path diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala index a69b92690162..10a4f811fb9e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala @@ -33,7 +33,7 @@ class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) { override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = - new OrcFileSystemWriter(metadata.sft, context.conf, file, observer) + new OrcFileSystemWriter(metadata.sft, context, file, observer) override protected def createReader( filter: Option[Filter], diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala index 58f4ae822638..498c183f8102 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala @@ -8,13 +8,14 @@ package org.locationtech.geomesa.fs.storage.orc -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.orc.OrcFile import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.locationtech.geomesa.fs.storage.api.FileSystemContext import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver +import org.locationtech.geomesa.fs.storage.common.utils.PathCache import org.locationtech.geomesa.fs.storage.orc.utils.OrcAttributeWriter import org.locationtech.geomesa.utils.io.CloseQuietly @@ -22,14 +23,14 @@ import scala.util.control.NonFatal class OrcFileSystemWriter( sft: SimpleFeatureType, - config: Configuration, + context: FileSystemContext, file: Path, observer: FileSystemObserver = NoOpObserver ) extends FileSystemWriter { private val schema = OrcFileSystemStorage.createTypeDescription(sft) - private val options = OrcFile.writerOptions(config).setSchema(schema) + private val options = OrcFile.writerOptions(context.conf).setSchema(schema) private val writer = OrcFile.createWriter(file, options) private val batch = schema.createRowBatch() @@ -56,6 +57,7 @@ class OrcFileSystemWriter( case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e } CloseQuietly.raise(Seq(writer, observer)) + PathCache.register(context.fs, file) } private def flushBatch(): Unit = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriterTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriterTest.scala index be8402137c38..4eb530acb6f9 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriterTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriterTest.scala @@ -13,6 +13,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature +import org.locationtech.geomesa.fs.storage.api.FileSystemContext import org.locationtech.geomesa.utils.collection.SelfClosingIterator import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.locationtech.geomesa.utils.io.WithClose @@ -32,26 +33,22 @@ class OrcFileSystemWriterTest extends Specification { ScalaSimpleFeature.create(sft, "1", "name1", "1", "2017-01-01T00:00:01.000Z", "LINESTRING (10 1, 5 1)") ) - val config = new Configuration() - "OrcFileSystemWriter" should { "write and read simple features" in { - - withPath { path => - withTestFile { file => - WithClose(new OrcFileSystemWriter(sft, config, file)) { writer => features.foreach(writer.write) } - val reader = new OrcFileSystemReader(sft, config, None, None) - val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList } - read mustEqual features - // test out not calling 'hasNext' - var i = 0 - WithClose(reader.read(file)) { iter => - while (i < features.size) { - iter.next() mustEqual features(i) - i += 1 - } - iter.next must throwA[NoSuchElementException] + withTestFile { file => + val fc = FileSystemContext(file.getParent, new Configuration()) + WithClose(new OrcFileSystemWriter(sft, fc, file)) { writer => features.foreach(writer.write) } + val reader = new OrcFileSystemReader(sft, fc.conf, None, None) + val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList } + read mustEqual features + // test out not calling 'hasNext' + var i = 0 + WithClose(reader.read(file)) { iter => + while (i < features.size) { + iter.next() mustEqual features(i) + i += 1 } + iter.next must throwA[NoSuchElementException] } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala index 64a640167dd8..d77fe7a52c86 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala @@ -13,6 +13,7 @@ import org.apache.parquet.hadoop.ParquetReader import org.apache.parquet.hadoop.example.GroupReadSupport import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.parquet.example.data.Group import org.apache.parquet.filter2.compat.FilterCompat import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter @@ -24,9 +25,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.File import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver +import org.locationtech.geomesa.fs.storage.common.utils.PathCache import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter import org.locationtech.geomesa.utils.io.CloseQuietly +import scala.util.control.NonFatal + /** * * @param context file system context @@ -35,11 +39,8 @@ import org.locationtech.geomesa.utils.io.CloseQuietly class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) { - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = { - val sftConf = new Configuration(context.conf) - StorageConfiguration.setSft(sftConf, metadata.sft) - new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer) - } + override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = + new ParquetFileSystemWriter(metadata.sft, context, file, observer) override protected def createReader( filter: Option[Filter], @@ -72,11 +73,17 @@ object ParquetFileSystemStorage extends LazyLogging { class ParquetFileSystemWriter( sft: SimpleFeatureType, + context: FileSystemContext, file: Path, - conf: Configuration, observer: FileSystemObserver = NoOpObserver ) extends FileSystemWriter { + private val conf = { + val conf = new Configuration(context.conf) + StorageConfiguration.setSft(conf, sft) + conf + } + private val writer = SimpleFeatureParquetWriter.builder(file, conf).build() override def write(f: SimpleFeature): Unit = { @@ -86,27 +93,35 @@ object ParquetFileSystemStorage extends LazyLogging { override def flush(): Unit = observer.flush() override def close(): Unit = { CloseQuietly(Seq(writer, observer)).foreach(e => throw e) - if (FileValidationEnabled.get.toBoolean) { + PathCache.register(context.fs, file) + if (FileValidationEnabled.toBoolean.get) { validateParquetFile(file) } } } + /** + * Validate a file by reading it back + * + * @param file file to validate + */ def validateParquetFile(file: Path): Unit = { - val reader = ParquetReader.builder(new GroupReadSupport(), file).build() - + var reader: ParquetReader[Group] = null try { - // Read Parquet file content + // read Parquet file content + reader = ParquetReader.builder(new GroupReadSupport(), file).build() var record = reader.read() while (record != null) { // Process the record record = reader.read() } - logger.debug(s"${file} is a valid Parquet file") + logger.trace(s"$file is a valid Parquet file") } catch { - case e: Exception => throw new RuntimeException(s"Unable to validate ${file}: File may be corrupted", e) + case NonFatal(e) => throw new RuntimeException(s"Unable to validate $file: File may be corrupted", e) } finally { - reader.close() + if (reader != null) { + reader.close() + } } } }