From e88d258880cb283dfd2f7fb2bc817193f36dcf58 Mon Sep 17 00:00:00 2001 From: Emilio Date: Wed, 6 Nov 2024 08:52:49 -0500 Subject: [PATCH] GEOMESA-3411 FSDS - Fix reads of tar files from s3 (#3228) * Replace Hadoop FileContext with FileSystem, as FileContext seems to close the underlying FileSystem prematurely --- build/cqs.tsv | 7 +- build/test/resources/log4j.xml | 7 + geomesa-fs/geomesa-fs-datastore/pom.xml | 19 +++ .../geomesa/fs/data/FileSystemDataStore.scala | 34 +++- .../fs/data/FileSystemDataStoreFactory.scala | 16 +- .../fs/data/FileSystemFeatureStore.scala | 5 +- .../fs/data/FileSystemStorageManager.scala | 24 +-- .../src/test/resources/day-scheme.conf | 0 .../fs/converter/ConverterDataStoreTest.scala | 152 ++++++++++++++---- .../geomesa/fs/storage/api/package.scala | 10 +- .../common/AbstractFileSystemStorage.scala | 10 +- .../common/SizeableFileSystemStorage.scala | 2 +- .../common/jobs/PartitionOutputFormat.scala | 6 +- .../common/metadata/FileBasedMetadata.scala | 36 ++--- .../metadata/FileBasedMetadataFactory.scala | 22 ++- .../common/metadata/MetadataJson.scala | 14 +- .../fs/storage/common/utils/PathCache.scala | 67 ++++---- .../metadata/FileBasedMetadataTest.scala | 25 ++- .../common/metadata/JdbcMetadataTest.scala | 6 +- .../common/metadata/MetadataJsonTest.scala | 8 +- .../converter/ConverterFileSystemReader.scala | 10 +- .../storage/converter/ConverterMetadata.scala | 16 +- .../storage/converter/ConverterStorage.scala | 4 +- .../ConverterFileSystemStorageTest.scala | 4 +- .../orc/OrcFileSystemStorageTest.scala | 18 +-- .../geomesa/parquet/CompactionTest.scala | 12 +- .../geomesa/parquet/ParquetStorageTest.scala | 16 +- .../compact/FileSystemCompactionJob.scala | 4 +- .../tools/compact/PartitionInputFormat.scala | 8 +- .../tools/ingest/FileSystemConverterJob.scala | 8 +- .../fs/tools/ingest/FsIngestCommand.scala | 6 +- .../ingest/FsManageMetadataCommand.scala | 6 +- .../fs/tools/ingest/CompactCommandTest.scala | 2 +- .../ingest/FsManageMetadataCommandTest.scala | 4 +- .../geomesa/tools/export/ExportCommand.scala | 2 +- .../geomesa/tools/export/ExportToFsTest.scala | 5 +- .../geomesa/utils/hadoop/HadoopDelegate.scala | 63 ++++---- .../geomesa/utils/io/CompressionUtils.scala | 10 +- .../geomesa/utils/io/PathUtils.scala | 2 +- .../utils/io/fs/FileSystemDelegate.scala | 20 ++- .../geomesa/utils/io/fs/LocalDelegate.scala | 20 +-- pom.xml | 18 ++- 42 files changed, 436 insertions(+), 292 deletions(-) delete mode 100644 geomesa-fs/geomesa-fs-datastore/src/test/resources/day-scheme.conf diff --git a/build/cqs.tsv b/build/cqs.tsv index aa94d96180b9..81dfb47d01d8 100644 --- a/build/cqs.tsv +++ b/build/cqs.tsv @@ -349,8 +349,9 @@ org.slf4j:jul-to-slf4j 1.7.36 test org.specs2:specs2-core_2.12 4.20.5 test org.specs2:specs2-junit_2.12 4.20.5 test org.specs2:specs2-mock_2.12 4.20.5 test -org.testcontainers:kafka 1.19.7 test -org.testcontainers:postgresql 1.19.7 test -org.testcontainers:testcontainers 1.19.7 test +org.testcontainers:kafka 1.20.3 test +org.testcontainers:minio 1.20.3 test +org.testcontainers:postgresql 1.20.3 test +org.testcontainers:testcontainers 1.20.3 test org.wololo:jts2geojson 0.16.1 test org.xerial.snappy:snappy-java 1.1.10.5 test diff --git a/build/test/resources/log4j.xml b/build/test/resources/log4j.xml index 051b86fef69d..487166bd90da 100644 --- a/build/test/resources/log4j.xml +++ b/build/test/resources/log4j.xml @@ -7,9 +7,16 @@ + + + + + + + diff --git a/geomesa-fs/geomesa-fs-datastore/pom.xml b/geomesa-fs/geomesa-fs-datastore/pom.xml index 
3fa5d9acf837..e77d1417d757 100644
--- a/geomesa-fs/geomesa-fs-datastore/pom.xml
+++ b/geomesa-fs/geomesa-fs-datastore/pom.xml
@@ -83,11 +83,30 @@
       <artifactId>geomesa-fs-storage-orc_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3-transfer-manager</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>minio</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.geomesa.testcontainers</groupId>
       <artifactId>testcontainers-accumulo</artifactId>
diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
index f1d096c8aa5e..3a4dea6519c4 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.data

 import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
 import org.geotools.api.data.Query
 import org.geotools.api.feature.`type`.Name
 import org.geotools.api.feature.simple.SimpleFeatureType
@@ -18,6 +18,7 @@ import org.geotools.data.store.{ContentDataStore, ContentEntry, ContentFeatureSo
 import org.locationtech.geomesa.fs.storage.api._
 import org.locationtech.geomesa.fs.storage.common.StorageKeys
 import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
 import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
 import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats}
 import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
@@ -27,8 +28,19 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal

+/**
+ * File system data store
+ *
+ * @param fs file system - note, this is expected to be a shared resource, and is not cleaned up on data store dispose
+ * @param conf conf
+ * @param root root path
+ * @param readThreads number of threads used for read ops
+ * @param writeTimeout timeout for write ops
+ * @param defaultEncoding default file encoding
+ * @param namespace geoserver namespace
+ */
 class FileSystemDataStore(
-    fc: FileContext,
+    fs: FileSystem,
     conf: Configuration,
     root: Path,
     readThreads: Int,
@@ -37,9 +49,21 @@ class FileSystemDataStore(
     namespace: Option[String]
   ) extends ContentDataStore with HasGeoMesaStats with LazyLogging {

+  // noinspection ScalaUnusedSymbol
+  @deprecated("Use FileSystem instead of FileContext")
+  def this(
+      fc: FileContext,
+      conf: Configuration,
+      root: Path,
+      readThreads: Int,
+      writeTimeout: Duration,
+      defaultEncoding: Option[String],
+      namespace: Option[String]) =
+    this(FileSystem.get(root.toUri, conf), conf, root, readThreads, writeTimeout, defaultEncoding, namespace)
+
   namespace.foreach(setNamespaceURI)

-  private val manager = FileSystemStorageManager(fc, conf, root, namespace)
+  private val manager = FileSystemStorageManager(fs, conf, root, namespace)

   override val stats: GeoMesaStats = new UnoptimizedRunnableStats(this)

@@ -82,13 +106,15 @@ class FileSystemDataStore(
       val fileSize = sft.removeTargetFileSize()

       val path = manager.defaultPath(sft.getTypeName)
-      val context = FileSystemContext(fc, conf,
path, namespace) + val context = FileSystemContext(fs, conf, path, namespace) val metadata = StorageMetadataFactory.create(context, meta, Metadata(sft, encoding, scheme, leafStorage, fileSize)) try { manager.register(path, FileSystemStorageFactory(context, metadata)) } catch { case NonFatal(e) => CloseQuietly(metadata).foreach(e.addSuppressed); throw e } + PathCache.register(fs, root) + PathCache.register(fs, path) } } diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala index d754402e351d..c8714acb364a 100644 --- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala +++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala @@ -8,9 +8,8 @@ package org.locationtech.geomesa.fs.data -import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.data.DataAccessFactory.Param import org.geotools.api.data.{DataStore, DataStoreFactorySpi} import org.locationtech.geomesa.fs.storage.api.FileSystemStorageFactory @@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration class FileSystemDataStoreFactory extends DataStoreFactorySpi { import FileSystemDataStoreFactory.FileSystemDataStoreParams._ - import FileSystemDataStoreFactory.fileContextCache override def createDataStore(params: java.util.Map[String, _]): DataStore = { @@ -45,8 +43,6 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi { conf } - val fc = fileContextCache.get(conf) - val path = new Path(PathParam.lookup(params)) val encoding = EncodingParam.lookupOpt(params).filterNot(_.isEmpty) @@ -63,7 +59,9 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi { val namespace = NamespaceParam.lookupOpt(params) - new FileSystemDataStore(fc, conf, path, readThreads, writeTimeout, encoding, namespace) + val fs = FileSystem.get(path.toUri, conf) + + new FileSystemDataStore(fs, conf, path, readThreads, writeTimeout, encoding, namespace) } override def createNewDataStore(params: java.util.Map[String, _]): DataStore = @@ -105,12 +103,6 @@ object FileSystemDataStoreFactory extends GeoMesaDataStoreInfo { private val configuration = new Configuration() - private val fileContextCache: LoadingCache[Configuration, FileContext] = Caffeine.newBuilder().build( - new CacheLoader[Configuration, FileContext]() { - override def load(conf: Configuration): FileContext = FileContext.getFileContext(conf) - } - ) - object FileSystemDataStoreParams extends NamespaceParams { val WriterFileTimeout: SystemProperty = SystemProperty("geomesa.fs.writer.partition.timeout", "60s") diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala index 239ed4322bb2..76802dbf9b2c 100644 --- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala +++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala @@ -81,13 +81,16 @@ class FileSystemFeatureStore( } override def canLimit: Boolean = false + override 
def canLimit(query: Query): Boolean = false override def canTransact: Boolean = false override def canEvent: Boolean = false override def canReproject: Boolean = false override def canSort: Boolean = false - + override def canSort(query: Query): Boolean = false override def canFilter: Boolean = true + override def canFilter(query: Query): Boolean = true override def canRetype: Boolean = true + override def canRetype(query: Query): Boolean = true override protected def buildQueryCapabilities(): QueryCapabilities = FileSystemFeatureStore.capabilities } diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala index c4631675582c..56273964511c 100644 --- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala +++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala @@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.data import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.utils.PathCache import org.locationtech.geomesa.utils.io.CloseQuietly @@ -23,11 +23,11 @@ import scala.util.control.NonFatal /** * Manages the storages and associated simple feature types underneath a given path * - * @param fc file context + * @param fs file system * @param conf configuration * @param root root path for the data store */ -class FileSystemStorageManager private (fc: FileContext, conf: Configuration, root: Path, namespace: Option[String]) +class FileSystemStorageManager private (fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String]) extends MethodProfiling with LazyLogging { import scala.collection.JavaConverters._ @@ -42,7 +42,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro */ def storage(typeName: String): Option[FileSystemStorage] = { cache.get(typeName).map(_._2) // check cached values - .orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fc, _)).flatMap(loadPath)) // check expected (default) path + .orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fs, _)).flatMap(loadPath)) // check expected (default) path .orElse(loadAll().find(_.metadata.sft.getTypeName == typeName)) // check other paths until we find it } @@ -80,8 +80,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro * @return */ private def loadAll(): Iterator[FileSystemStorage] = { - if (!PathCache.exists(fc, root)) { Iterator.empty } else { - val dirs = PathCache.list(fc, root).filter(_.isDirectory).map(_.getPath) + if (!PathCache.exists(fs, root)) { Iterator.empty } else { + val dirs = PathCache.list(fs, root).filter(_.isDirectory).map(_.getPath) dirs.filterNot(path => cache.exists { case (_, (p, _)) => p == path }).flatMap(loadPath) } } @@ -99,7 +99,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro logger.debug(s"${ if (storage.isDefined) "Loaded" else "No" } storage at path '$path' in ${time}ms") profile(complete _) { - val context = FileSystemContext(fc, conf, path, namespace) + val context = FileSystemContext(fs, 
conf, path, namespace) StorageMetadataFactory.load(context).map { meta => try { val storage = FileSystemStorageFactory(context, meta) @@ -116,8 +116,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro object FileSystemStorageManager { private val cache = Caffeine.newBuilder().build( - new CacheLoader[(FileContext, Configuration, Path, Option[String]), FileSystemStorageManager]() { - override def load(key: (FileContext, Configuration, Path, Option[String])): FileSystemStorageManager = + new CacheLoader[(FileSystem, Configuration, Path, Option[String]), FileSystemStorageManager]() { + override def load(key: (FileSystem, Configuration, Path, Option[String])): FileSystemStorageManager = new FileSystemStorageManager(key._1, key._2, key._3, key._4) } ) @@ -125,11 +125,11 @@ object FileSystemStorageManager { /** * Load a cached storage manager instance * - * @param fc file context + * @param fs file system * @param conf configuration * @param root data store root path * @return */ - def apply(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager = - cache.get((fc, conf, root, namespace)) + def apply(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager = + cache.get((fs, conf, root, namespace)) } diff --git a/geomesa-fs/geomesa-fs-datastore/src/test/resources/day-scheme.conf b/geomesa-fs/geomesa-fs-datastore/src/test/resources/day-scheme.conf deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala b/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala index 0cbca4b563a4..9fc298c83ee5 100644 --- a/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala +++ b/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala @@ -9,53 +9,89 @@ package org.locationtech.geomesa.fs.converter import com.typesafe.config.{ConfigFactory, ConfigRenderOptions} +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream} +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Options.CreateOpts +import org.apache.hadoop.fs.{CreateFlag, FileContext, Path} import org.geotools.api.data.{DataStoreFinder, Query, Transaction} import org.geotools.api.feature.simple.SimpleFeature import org.geotools.api.filter.Filter import org.junit.runner.RunWith +import org.locationtech.geomesa.utils.collection.SelfClosingIterator +import org.locationtech.geomesa.utils.io.WithClose +import org.slf4j.LoggerFactory import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner +import org.specs2.specification.BeforeAfterAll +import org.testcontainers.containers.MinIOContainer +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.utility.DockerImageName +import java.io.{BufferedOutputStream, ByteArrayInputStream} +import java.nio.charset.StandardCharsets import scala.collection.mutable -/** - * Created by hulbert on 6/21/17. 
- */ + @RunWith(classOf[JUnitRunner]) -class ConverterDataStoreTest extends Specification { +class ConverterDataStoreTest extends Specification with BeforeAfterAll { import scala.collection.JavaConverters._ sequential + var minio: MinIOContainer = _ + val bucket = "geomesa" + + override def beforeAll(): Unit = { + minio = + new MinIOContainer( + DockerImageName.parse("minio/minio").withTag(sys.props.getOrElse("minio.docker.tag", "RELEASE.2024-10-29T16-01-48Z"))) + minio.start() + minio.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("minio"))) + minio.execInContainer("mc", "alias", "set", "localhost", "http://localhost:9000", minio.getUserName, minio.getPassword) + minio.execInContainer("mc", "mb", s"localhost/$bucket") + } + + override def afterAll(): Unit = { + if (minio != null) { + minio.close() + } + } + def fsConfig(converter: String, path: String): String = { - s""" - | + val props = Seq( + prop("fs.options.converter.path", path), + prop("fs.partition-scheme.name", "datetime"), + prop("fs.partition-scheme.opts.datetime-format", "yyyy/DDD/HH/mm"), + prop("fs.partition-scheme.opts.step-unit", "MINUTES"), + prop("fs.partition-scheme.opts.step", "15"), + prop("fs.partition-scheme.opts.dtg-attribute", "dtg"), + prop("fs.options.leaf-storage", "true"), + ) + s""" |$converter - | fs.options.converter.path$path - | fs.partition-scheme.namedatetime - | fs.partition-scheme.opts.datetime-formatyyyy/DDD/HH/mm - | fs.partition-scheme.opts.step-unitMINUTES - | fs.partition-scheme.opts.step15 - | fs.partition-scheme.opts.dtg-attributedtg - | fs.options.leaf-storagetrue + |${props.mkString("\n")} | |""".stripMargin } def sftByName(name: String): String = { - s""" - | fs.options.sft.name$name - | fs.options.converter.name$name - |""".stripMargin + Seq( + prop("fs.options.sft.name", name), + prop("fs.options.converter.name", name), + ).mkString("\n") } def sftByConf(conf: String): String = { - s""" - | fs.options.sft.conf$conf - | fs.options.converter.conf$conf - |""".stripMargin + Seq( + prop("fs.options.sft.conf", conf), + prop("fs.options.converter.conf", conf), + ).mkString("\n") } + def prop(key: String, value: String): String = s" $key$value" + "ConverterDataStore" should { "work with one datastore" >> { val ds = DataStoreFinder.getDataStore(Map( @@ -70,12 +106,8 @@ class ConverterDataStoreTest extends Specification { types.head mustEqual "fs-test" val q = new Query("fs-test", Filter.INCLUDE) - val fr = ds.getFeatureReader(q, Transaction.AUTO_COMMIT) - val feats = mutable.ListBuffer.empty[SimpleFeature] - while (fr.hasNext) { - feats += fr.next() - } - feats.size mustEqual 4 + val feats = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).toList + feats must haveLength(4) } "work with something else" >> { @@ -91,12 +123,68 @@ class ConverterDataStoreTest extends Specification { types.head mustEqual "fs-test" val q = new Query("fs-test", Filter.INCLUDE) - val fr = ds.getFeatureReader(q, Transaction.AUTO_COMMIT) - val feats = mutable.ListBuffer.empty[SimpleFeature] - while (fr.hasNext) { - feats += fr.next() + val feats = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).toList + feats must haveLength(4) + } + + "read tar.gz files from s3 storage" >> { + val bucket = s"s3a://${this.bucket}/" + val config = { + val props = Seq( + sftByName("fs-test"), + prop("fs.s3a.endpoint", minio.getS3URL), + prop("fs.s3a.access.key", minio.getUserName), + prop("fs.s3a.secret.key", minio.getPassword), + prop("fs.s3a.path.style.access", "true"), + 
prop("dfs.client.use.datanode.hostname", "true"), + ).mkString("\n") + fsConfig(props, "datastore1") } - feats.size mustEqual 4 + val fc = { + val conf = new Configuration() + conf.addResource(new ByteArrayInputStream(config.getBytes(StandardCharsets.UTF_8))) + FileContext.getFileContext(conf) + } + // number of times to write the sample files into our tgz + // note: we need fairly large files to trigger GEOMESA-3411 + val multiplier = 177156 + Seq("00", "15", "30", "45").foreach { file => + val path = s"datastore1/2017/001/01/$file" + val contents = WithClose(getClass.getClassLoader.getResourceAsStream(s"example/$path"))(IOUtils.toByteArray) + WithClose(fc.create(new Path(s"$bucket$path.tgz"), java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent())) { os => + WithClose(new BufferedOutputStream(os)) { buf => + WithClose(new GzipCompressorOutputStream(buf)) { gz => + WithClose(new TarArchiveOutputStream(gz)) { tar => + val entry = new TarArchiveEntry(file) + entry.setSize(contents.length * multiplier) + tar.putArchiveEntry(entry) + var i = 0 + while (i < multiplier) { + tar.write(contents) + i += 1 + } + tar.closeArchiveEntry() + tar.finish() + } + } + } + } + } + + val ds = DataStoreFinder.getDataStore(Map( + "fs.path" -> bucket, + "fs.encoding" -> "converter", + "fs.config.xml" -> config, + ).asJava) + ds must not(beNull) + + val types = ds.getTypeNames + types must haveSize(1) + types.head mustEqual "fs-test" + + val q = new Query("fs-test", Filter.INCLUDE) + val count = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).length + count mustEqual multiplier * 4 } "load sft as a string" >> { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala index 9259f8a8996c..9bf4a90d1370 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala @@ -9,7 +9,7 @@ package org.locationtech.geomesa.fs.storage import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter @@ -22,11 +22,15 @@ package object api { /** * Holder for file system references * - * @param fc file context + * @param fs file system * @param conf configuration * @param root root path */ - case class FileSystemContext(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String] = None) + case class FileSystemContext(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String] = None) + + object FileSystemContext { + def apply(root: Path, conf: Configuration): FileSystemContext = FileSystemContext(root.getFileSystem(conf), conf, root) + } /** * Identifier plus configuration diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index d8cc86722ff1..5fcfb47ac9cf 100644 --- 
a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -91,7 +91,7 @@ abstract class AbstractFileSystemStorage( val baseDir = StorageUtils.baseDirectory(context.root, partition, metadata.leafStorage) p.files.flatMap { file => val path = new Path(baseDir, file.name) - if (PathCache.exists(context.fc, path)) { + if (PathCache.exists(context.fs, path)) { Seq(StorageFilePath(file, path)) } else { logger.warn(s"Inconsistent metadata for ${metadata.sft.getTypeName}: $path") @@ -201,10 +201,10 @@ abstract class AbstractFileSystemStorage( val failures = ListBuffer.empty[Path] toCompact.foreach { file => - if (!context.fc.delete(file.path, false)) { + if (!context.fs.delete(file.path, false)) { failures.append(file.path) } - PathCache.invalidate(context.fc, file.path) + PathCache.invalidate(context.fs, file.path) } if (failures.nonEmpty) { @@ -233,7 +233,7 @@ abstract class AbstractFileSystemStorage( def pathAndObserver: WriterConfig = { val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType) - PathCache.register(context.fc, path) + PathCache.register(context.fs, path) val updateObserver = new UpdateObserver(partition, path, action) val observer = if (observers.isEmpty) { updateObserver } else { new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)) @@ -278,7 +278,7 @@ abstract class AbstractFileSystemStorage( writer.close() writer = null // adjust our estimate to account for the actual bytes written - total += context.fc.getFileStatus(path).getLen + total += context.fs.getFileStatus(path).getLen estimator.update(total, count) remaining = estimator.estimate(0L) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala index cf41b1af9df6..dbe908323675 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala @@ -39,7 +39,7 @@ trait SizeableFileSystemStorage extends FileSystemStorage { * @return true if the file is appropriately sized */ def fileIsSized(path: Path, target: Long): Boolean = { - val size = context.fc.getFileStatus(path).getLen + val size = context.fs.getFileStatus(path).getLen math.abs((size.toDouble / target) - 1d) <= fileSizeError } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala index dae3c9351c1c..02c8e6f1ea70 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala @@ -9,7 
+9,7 @@ package org.locationtech.geomesa.fs.storage.common.jobs import com.typesafe.scalalogging.LazyLogging -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.InvalidJobConfException import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat} @@ -57,7 +57,7 @@ class PartitionOutputFormat(delegate: SingleFileOutputFormat) extends OutputForm private val storage = { val conf = context.getConfiguration val root = StorageConfiguration.getRootPath(conf) - val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root) + val fsc = FileSystemContext(root, conf) val metadata = StorageMetadataFactory.load(fsc).getOrElse { throw new IllegalArgumentException(s"No storage defined under path '$root'") } @@ -171,7 +171,7 @@ class PartitionOutputFormat(delegate: SingleFileOutputFormat) extends OutputForm writer.close(context) writer = null // adjust our estimate to account for the actual bytes written - total += storage.context.fc.getFileStatus(path).getLen + total += storage.context.fs.getFileStatus(path).getLen estimator.update(total, count) remaining = estimator.estimate(0L) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala index 4c2ef533d127..e8ea0f492ecc 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala @@ -11,7 +11,6 @@ package org.locationtech.geomesa.fs.storage.common.metadata import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import com.typesafe.config._ import com.typesafe.scalalogging.LazyLogging -import org.apache.hadoop.fs.Options.CreateOpts import org.apache.hadoop.fs._ import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata @@ -46,14 +45,14 @@ import scala.util.control.NonFatal * reload. In general this does not cause problems, as reads and writes happen in different JVMs (ingest * vs query). 
* - * @param fc file context + * @param fs file system * @param directory metadata root path * @param sft simple feature type * @param meta basic metadata config * @param converter file converter */ class FileBasedMetadata( - private val fc: FileContext, + private val fs: FileSystem, val directory: Path, val sft: SimpleFeatureType, private val meta: Metadata, @@ -94,7 +93,7 @@ class FileBasedMetadata( override def set(key: String, value: String): Unit = { kvs.put(key, value) - FileBasedMetadataFactory.write(fc, directory.getParent, meta.copy(config = kvs.asScala.toMap)) + FileBasedMetadataFactory.write(fs, directory.getParent, meta.copy(config = kvs.asScala.toMap)) } override def getPartitions(prefix: Option[String]): Seq[PartitionMetadata] = { @@ -204,12 +203,12 @@ class FileBasedMetadata( val encoded = StringSerialization.alphaNumericSafeString(config.name) val name = s"$UpdatePartitionPrefix$encoded-${UUID.randomUUID()}${converter.suffix}" val file = new Path(directory, name) - WithClose(fc.create(file, java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent)) { out => + WithClose(fs.create(file, false)) { out => out.write(data.getBytes(StandardCharsets.UTF_8)) out.hflush() out.hsync() } - PathCache.register(fc, file) + PathCache.register(fs, file) file } } @@ -225,19 +224,18 @@ class FileBasedMetadata( } profile("Persisted compacted partition configuration") { val file = new Path(directory, CompactedPrefix + converter.suffix) - val flags = java.util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - WithClose(fc.create(file, flags, CreateOpts.createParent)) { out => + WithClose(fs.create(file, true)) { out => out.write(data.getBytes(StandardCharsets.UTF_8)) out.hflush() out.hsync() } - PathCache.register(fc, file) + PathCache.register(fs, file) // generally we overwrite the existing file but if we change rendering the name will change val toRemove = new Path(directory, if (converter.suffix == HoconPathSuffix) { CompactedJson } else { CompactedHocon }) - if (PathCache.exists(fc, toRemove, reload = true)) { - fc.delete(toRemove, false) - PathCache.invalidate(fc, toRemove) + if (PathCache.exists(fs, toRemove, reload = true)) { + fs.delete(toRemove, false) + PathCache.invalidate(fs, toRemove) } } } @@ -257,7 +255,7 @@ class FileBasedMetadata( val pool = new CachedThreadPool(threads) // use a phaser to track worker thread completion val phaser = new Phaser(2) // 1 for the initial directory worker + 1 for this thread - pool.submit(new DirectoryWorker(pool, phaser, fc.listStatus(directory), result)) + pool.submit(new DirectoryWorker(pool, phaser, fs.listStatusIterator(directory), result)) // wait for the worker threads to complete try { phaser.awaitAdvanceInterruptibly(phaser.arrive()) @@ -306,7 +304,7 @@ class FileBasedMetadata( private def readPartitionConfig(file: Path): Option[PartitionConfig] = { try { val config = profile("Loaded partition configuration") { - WithClose(new InputStreamReader(fc.open(file), StandardCharsets.UTF_8)) { in => + WithClose(new InputStreamReader(fs.open(file), StandardCharsets.UTF_8)) { in => ConfigFactory.parseReader(in, ConfigParseOptions.defaults().setSyntax(getSyntax(file.getName))) } } @@ -327,7 +325,7 @@ class FileBasedMetadata( private def readCompactedConfig(file: Path): Seq[PartitionConfig] = { try { val config = profile("Loaded compacted partition configuration") { - WithClose(new InputStreamReader(fc.open(file), StandardCharsets.UTF_8)) { in => + WithClose(new InputStreamReader(fs.open(file), StandardCharsets.UTF_8)) { in => 
ConfigFactory.parseReader(in, ConfigParseOptions.defaults().setSyntax(getSyntax(file.getName))) } } @@ -347,11 +345,11 @@ class FileBasedMetadata( */ private def delete(paths: Iterable[Path], threads: Int): Unit = { if (threads < 2) { - paths.foreach(fc.delete(_, false)) + paths.foreach(fs.delete(_, false)) } else { val ec = new CachedThreadPool(threads) try { - paths.toList.map(p => ec.submit(new Runnable() { override def run(): Unit = fc.delete(p, false)})).foreach(_.get) + paths.toList.map(p => ec.submit(new Runnable() { override def run(): Unit = fs.delete(p, false)})).foreach(_.get) } finally { ec.shutdown() } @@ -376,7 +374,7 @@ class FileBasedMetadata( if (status.isDirectory) { i += 1 // use a tiered phaser on each directory avoid the limit of 65535 registered parties - es.submit(new DirectoryWorker(es, new Phaser(phaser, 1), fc.listStatus(path), result)) + es.submit(new DirectoryWorker(es, new Phaser(phaser, 1), fs.listStatusIterator(path), result)) } else if (name.startsWith(UpdatePartitionPrefix)) { // pull out the partition name but don't parse the file yet val encoded = name.substring(8, name.length - 42) // strip out prefix and suffix @@ -491,7 +489,7 @@ object FileBasedMetadata { * @return */ def copy(m: FileBasedMetadata): FileBasedMetadata = - new FileBasedMetadata(m.fc, m.directory, m.sft, m.meta, m.converter) + new FileBasedMetadata(m.fs, m.directory, m.sft, m.meta, m.converter) private def getSyntax(file: String): ConfigSyntax = { if (file.endsWith(HoconPathSuffix)) { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala index a4689347a3a1..1ada242f483a 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala @@ -9,8 +9,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata import com.typesafe.scalalogging.LazyLogging -import org.apache.hadoop.fs.Options.CreateOpts -import org.apache.hadoop.fs.{CreateFlag, FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata.Config import org.locationtech.geomesa.fs.storage.common.utils.PathCache @@ -58,9 +57,9 @@ class FileBasedMetadataFactory extends StorageMetadataFactory { PartitionSchemeFactory.load(sft, meta.scheme) val renderer = config.get(Config.RenderKey).map(MetadataConverter.apply).getOrElse(RenderCompact) MetadataJson.writeMetadata(context, NamedOptions(name, config + (Config.RenderKey -> renderer.name))) - FileBasedMetadataFactory.write(context.fc, context.root, meta) + FileBasedMetadataFactory.write(context.fs, context.root, meta) val directory = new Path(context.root, FileBasedMetadataFactory.MetadataDirectory) - val metadata = new FileBasedMetadata(context.fc, directory, sft, meta, renderer) + val metadata = new FileBasedMetadata(context.fs, directory, sft, meta, renderer) FileBasedMetadataFactory.cache.put(FileBasedMetadataFactory.key(context), metadata) metadata } @@ -80,12 +79,12 @@ object FileBasedMetadataFactory extends MethodProfiling with 
LazyLogging { val loader = new java.util.function.Function[String, FileBasedMetadata]() { override def apply(ignored: String): FileBasedMetadata = { val file = new Path(context.root, StoragePath) - if (!PathCache.exists(context.fc, file)) { null } else { + if (!PathCache.exists(context.fs, file)) { null } else { val directory = new Path(context.root, MetadataDirectory) - val meta = WithClose(context.fc.open(file))(MetadataSerialization.deserialize) + val meta = WithClose(context.fs.open(file))(MetadataSerialization.deserialize) val sft = namespaced(meta.sft, context.namespace) val renderer = config.get(Config.RenderKey).map(MetadataConverter.apply).getOrElse(RenderPretty) - new FileBasedMetadata(context.fc, directory, sft, meta, renderer) + new FileBasedMetadata(context.fs, directory, sft, meta, renderer) } } } @@ -95,18 +94,17 @@ object FileBasedMetadataFactory extends MethodProfiling with LazyLogging { /** * Write basic metadata to disk. This should be done once, when the storage is created * - * @param fc file context + * @param fs file system * @param root root path * @param metadata simple feature type, file encoding, partition scheme, etc */ - private [metadata] def write(fc: FileContext, root: Path, metadata: Metadata): Unit = { + private [metadata] def write(fs: FileSystem, root: Path, metadata: Metadata): Unit = { val file = new Path(root, StoragePath) - val flags = java.util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - WithClose(fc.create(file, flags, CreateOpts.createParent)) { out => + WithClose(fs.create(file, true)) { out => MetadataSerialization.serialize(out, metadata) out.hflush() out.hsync() } - PathCache.register(fc, file) + PathCache.register(fs, file) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala index c1d4d3915b9c..5822d7a08ba5 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala @@ -49,9 +49,9 @@ object MetadataJson extends MethodProfiling { // using an atomic operation or cache loader can cause problems, as we sometimes insert into the // map during the load, which is not allowed val file = new Path(context.root, MetadataPath) - if (PathCache.exists(context.fc, file)) { + if (PathCache.exists(context.fs, file)) { val config = profile("Loaded metadata configuration") { - WithClose(new InputStreamReader(context.fc.open(file), StandardCharsets.UTF_8)) { in => + WithClose(new InputStreamReader(context.fs.open(file), StandardCharsets.UTF_8)) { in => ConfigFactory.load(ConfigFactory.parseReader(in, ParseOptions)) // call load to resolve sys props } } @@ -61,8 +61,8 @@ object MetadataJson extends MethodProfiling { } cache.put(key, cached) } else { - context.fc.rename(file, new Path(context.root, s"$MetadataPath.bak")) - PathCache.invalidate(context.fc, file) + context.fs.rename(file, new Path(context.root, s"$MetadataPath.bak")) + PathCache.invalidate(context.fs, file) transitionMetadata(context, config).foreach { meta => cached = meta // will be set in the cache in the transition code } @@ -80,7 +80,7 @@ object MetadataJson extends MethodProfiling { */ def 
writeMetadata(context: FileSystemContext, metadata: NamedOptions): Unit = { val file = new Path(context.root, MetadataPath) - if (PathCache.exists(context.fc, file, reload = true)) { + if (PathCache.exists(context.fs, file, reload = true)) { throw new IllegalArgumentException( s"Trying to create a new storage instance but metadata already exists at '$file'") } @@ -92,7 +92,7 @@ object MetadataJson extends MethodProfiling { // either side of the expression (typesafe will concatenate them), i.e. "foo ${bar}" -> "foo "${bar}"" val interpolated = data.replaceAll("\\$\\{[a-zA-Z0-9_.]+}", "\"$0\"") profile("Persisted metadata configuration") { - WithClose(context.fc.create(file, java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent)) { out => + WithClose(context.fs.create(file, false)) { out => out.write(interpolated.getBytes(StandardCharsets.UTF_8)) out.hflush() out.hsync() @@ -104,7 +104,7 @@ object MetadataJson extends MethodProfiling { .loadOrThrow[NamedOptions] } cache.put(context.root.toUri.toString, toCache) - PathCache.register(context.fc, file) + PathCache.register(context.fs, file) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala index cf11d66ec340..519fe4efaa38 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala @@ -9,7 +9,7 @@ package org.locationtech.geomesa.fs.storage.common.utils import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} -import org.apache.hadoop.fs.{FileContext, FileStatus, Path, RemoteIterator} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator} import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty import java.util.concurrent.TimeUnit @@ -27,97 +27,96 @@ object PathCache { // cache for checking existence of files private val pathCache = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( - new CacheLoader[(FileContext, Path), java.lang.Boolean]() { - override def load(key: (FileContext, Path)): java.lang.Boolean = key._1.util.exists(key._2) + new CacheLoader[(FileSystem, Path), java.lang.Boolean]() { + override def load(key: (FileSystem, Path)): java.lang.Boolean = key._1.exists(key._2) } ) // cache for individual file status private val statusCache = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( - new CacheLoader[(FileContext, Path), FileStatus]() { - override def load(key: (FileContext, Path)): FileStatus = key._1.getFileStatus(key._2) + new CacheLoader[(FileSystem, Path), FileStatus]() { + override def load(key: (FileSystem, Path)): FileStatus = key._1.getFileStatus(key._2) } ) // cache for checking directory contents private val listCache = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( - new CacheLoader[(FileContext, Path), Stream[FileStatus]]() { - override def load(key: (FileContext, Path)): Stream[FileStatus] = - RemoteIterator(key._1.listStatus(key._2)).toStream + new CacheLoader[(FileSystem, Path), Stream[FileStatus]]() { + override def load(key: (FileSystem, Path)): Stream[FileStatus] = + 
RemoteIterator(key._1.listStatusIterator(key._2)).toStream } ) /** * * Register a path as existing * - * @param fc file context + * @param fs file system * @param path path - * @param status file status, if available - * @param list directory contents, if available */ - def register( - fc: FileContext, - path: Path, - status: Option[FileStatus] = None, - list: Option[Stream[FileStatus]] = None): Unit = { - pathCache.put((fc, path), java.lang.Boolean.TRUE) - status.foreach(statusCache.put((fc, path), _)) - list.foreach(listCache.put((fc, path), _)) + def register(fs: FileSystem, path: Path): Unit = { + pathCache.put((fs, path), java.lang.Boolean.TRUE) + val status = statusCache.refresh((fs, path)) + val parent = path.getParent + if (parent != null) { + listCache.getIfPresent((fs, parent)) match { + case null => // no-op + case list => listCache.put((fs, parent), list :+ status.get()) + } + } } /** * Check to see if a path exists * - * @param fc file context + * @param fs file system * @param path path * @param reload reload the file status from the underlying file system before checking * @return */ - def exists(fc: FileContext, path: Path, reload: Boolean = false): Boolean = { + def exists(fs: FileSystem, path: Path, reload: Boolean = false): Boolean = { if (reload) { - invalidate(fc, path) + invalidate(fs, path) } - pathCache.get((fc, path)).booleanValue() + pathCache.get((fs, path)).booleanValue() } /** * Gets the file status for a path * - * @param fc file context + * @param fs file system * @param path path * @return */ - def status(fc: FileContext, path: Path, reload: Boolean = false): FileStatus = { + def status(fs: FileSystem, path: Path, reload: Boolean = false): FileStatus = { if (reload) { - invalidate(fc, path) + invalidate(fs, path) } - statusCache.get((fc, path)) + statusCache.get((fs, path)) } /** * List the children of a path * - * @param fc file context + * @param fs file system * @param dir directory path * @return */ - def list(fc: FileContext, dir: Path, reload: Boolean = false): Iterator[FileStatus] = { + def list(fs: FileSystem, dir: Path, reload: Boolean = false): Iterator[FileStatus] = { if (reload) { - invalidate(fc, dir) + invalidate(fs, dir) } - listCache.get((fc, dir)).iterator + listCache.get((fs, dir)).iterator } /** * Invalidate any cached values for the path - they will be re-loaded on next access * - * @param fc file context + * @param fs file system * @param path path */ - def invalidate(fc: FileContext, path: Path): Unit = - Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fc, path))) + def invalidate(fs: FileSystem, path: Path): Unit = Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path))) object RemoteIterator { def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala index c5d4094068e3..9c66a0581096 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala @@ -10,8 +10,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata import 
org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.junit.runner.RunWith import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile, StorageFileAction} import org.locationtech.geomesa.fs.storage.api._ @@ -33,7 +32,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { import scala.collection.JavaConverters._ lazy val conf = new Configuration() - lazy val fc = FileContext.getFileContext(conf) + lazy val fs = FileSystem.get(conf) val sft = SimpleFeatureTypes.createType("metadata", "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.user-data.prefix=desc,desc.name=姓名,desc.dtg=ひづけ,desc.geom=좌표") val encoding = "parquet" @@ -53,7 +52,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { "create and persist an empty metadata file" in { withPath { context => val created = factory.create(context, Map.empty, meta) - PathCache.invalidate(fc, context.root) + PathCache.invalidate(fs, context.root) factory.load(context) must beSome(created) foreach(Seq(created, FileBasedMetadata.copy(created))) { metadata => metadata.encoding mustEqual encoding @@ -69,7 +68,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { created.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L)) created.addPartition(PartitionMetadata("1", Seq(f2,f3), new Envelope(-11, 11, -5, 5), 20L)) created.addPartition(PartitionMetadata("2", Seq(f5, f6), new Envelope(-1, 1, -5, 5), 20L)) - PathCache.invalidate(fc, context.root) + PathCache.invalidate(fs, context.root) factory.load(context) must beSome(created) foreach(Seq(created, FileBasedMetadata.copy(created))) { metadata => metadata.encoding mustEqual encoding @@ -87,10 +86,10 @@ class FileBasedMetadataTest extends Specification with AllExpectations { val created = factory.create(context, Map.empty, meta) created.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L)) created.addPartition(PartitionMetadata("1", Seq(f2, f3), new Envelope(-11, 11, -5, 5), 20L)) - fc.mkdir(new Path(context.root, "metadata/nested/"), FsPermission.getDirDefault, false) - fc.util.listStatus(new Path(context.root, "metadata")).foreach { file => + fs.mkdirs(new Path(context.root, "metadata/nested/")) + fs.listStatus(new Path(context.root, "metadata")).foreach { file => if (file.getPath.getName.startsWith("update-")) { - fc.rename(file.getPath, new Path(context.root, s"metadata/nested/${file.getPath.getName}")) + fs.rename(file.getPath, new Path(context.root, s"metadata/nested/${file.getPath.getName}")) } } factory.load(context) must beSome(created) @@ -176,7 +175,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { metadata.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L)) val updates = list(metadata.directory).filter(_.startsWith("update")) updates must haveLength(1) - val update = WithClose(fc.open(new Path(metadata.directory, updates.head))) { in => + val update = WithClose(fs.open(new Path(metadata.directory, updates.head))) { in => IOUtils.toString(in, StandardCharsets.UTF_8) } update must not(beEmpty) @@ -185,7 +184,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { metadata.compact(None) val compactions = 
list(metadata.directory).filter(_.startsWith("compact")) compactions must haveLength(1) - val compaction = WithClose(fc.open(new Path(metadata.directory, compactions.head))) { in => + val compaction = WithClose(fs.open(new Path(metadata.directory, compactions.head))) { in => IOUtils.toString(in, StandardCharsets.UTF_8) } compaction must not(beEmpty) @@ -198,7 +197,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations { val path = new Path(url.toURI) val conf = new Configuration() conf.set("parquet.compression", "gzip") - val context = FileSystemContext(FileContext.getFileContext(url.toURI), conf, path) + val context = FileSystemContext(path, conf) val metadata = StorageMetadataFactory.load(context).orNull metadata must beAnInstanceOf[FileBasedMetadata] val partitions = metadata.getPartitions() @@ -215,14 +214,14 @@ class FileBasedMetadataTest extends Specification with AllExpectations { def withPath[R](code: FileSystemContext => R): R = { val file = Files.createTempDirectory("geomesa").toFile.getPath - try { code(FileSystemContext(fc, conf, new Path(file))) } finally { + try { code(FileSystemContext(fs, conf, new Path(file))) } finally { FileUtils.deleteDirectory(new File(file)) } } def list(dir: Path): Seq[String] = { val builder = Seq.newBuilder[String] - val iter = fc.listStatus(dir) + val iter = fs.listStatusIterator(dir) while (iter.hasNext) { val status = iter.next val path = status.getPath diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala index aa7569bc47f1..bb35a0ac5ba8 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala @@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata import com.typesafe.scalalogging.LazyLogging import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.junit.runner.RunWith import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile, StorageFileAction} import org.locationtech.geomesa.fs.storage.api.{FileSystemContext, Metadata, NamedOptions, PartitionSchemeFactory} @@ -35,7 +35,7 @@ class JdbcMetadataTest extends Specification with LazyLogging with BeforeAfterAl import scala.collection.JavaConverters._ lazy val conf = new Configuration() - lazy val fc = FileContext.getFileContext(conf) + lazy val fs = FileSystem.get(conf) val sft = SimpleFeatureTypes.createType("metadata", "name:String,dtg:Date,*geom:Point:srid=4326;geomesa.user-data.prefix=desc,desc.name=姓名,desc.dtg=ひづけ,desc.geom=좌표") val encoding = "parquet" @@ -252,7 +252,7 @@ class JdbcMetadataTest extends Specification with LazyLogging with BeforeAfterAl def withPath[R](code: FileSystemContext => R): R = { val file = Files.createTempDirectory("geomesa").toFile.getPath - try { code(FileSystemContext(fc, conf, new Path(file))) } finally { + try { code(FileSystemContext(fs, conf, new Path(file))) } finally { FileUtils.deleteDirectory(new File(file)) } } diff --git 
a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala index 8a615fffdb7c..9b2c735e9d12 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.junit.runner.RunWith import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.metadata.MetadataJson.MetadataPath @@ -26,7 +26,7 @@ import java.nio.file.Files class MetadataJsonTest extends Specification { lazy val conf = new Configuration() - lazy val fc = FileContext.getFileContext(conf) + lazy val fs = FileSystem.get(conf) "MetadataJson" should { "persist and replace system properties (and environment variables)" in { @@ -39,7 +39,7 @@ class MetadataJsonTest extends Specification { val opts = NamedOptions("jdbc", Map("user" -> "root", "password" -> interpolated)) MetadataJson.writeMetadata(context, opts) val file = new Path(context.root, MetadataPath) - val serialized = WithClose(context.fc.open(file))(is => IOUtils.toString(is, StandardCharsets.UTF_8)) + val serialized = WithClose(context.fs.open(file))(is => IOUtils.toString(is, StandardCharsets.UTF_8)) serialized must contain(interpolated) serialized must not(contain("bar")) val returned = MetadataJson.readMetadata(context) @@ -53,7 +53,7 @@ class MetadataJsonTest extends Specification { def withPath[R](code: FileSystemContext => R): R = { val file = Files.createTempDirectory("geomesa").toFile.getPath - try { code(FileSystemContext(fc, conf, new Path(file))) } finally { + try { code(FileSystemContext(fs, conf, new Path(file))) } finally { FileUtils.deleteDirectory(new File(file)) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala index 741bc33fa956..6db37de760bc 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.StrictLogging import org.apache.commons.compress.archivers.ArchiveStreamFactory -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileContext, FileSystem, Path} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert.EvaluationContext @@ -25,7 +25,7 @@ import java.util.Locale import scala.util.control.NonFatal class 
ConverterFileSystemReader( - fc: FileContext, + fs: FileSystem, converter: SimpleFeatureConverter, filter: Option[Filter], transform: Option[(String, SimpleFeatureType)] @@ -37,9 +37,9 @@ class ConverterFileSystemReader( logger.debug(s"Opening file $path") val iter = try { val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match { - case TAR => new HadoopTarHandle(fc, path) - case ZIP | JAR => new HadoopZipHandle(fc, path) - case _ => new HadoopFileHandle(fc, path) + case TAR => new HadoopTarHandle(fs, path) + case ZIP | JAR => new HadoopZipHandle(fs, path) + case _ => new HadoopFileHandle(fs, path) } handle.open.flatMap { case (name, is) => val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path)) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala index 0ceda79f1348..c0fd6db0fa62 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala @@ -38,9 +38,9 @@ class ConverterMetadata( override def getPartition(name: String): Option[PartitionMetadata] = { val path = new Path(context.root, name) - if (!PathCache.exists(context.fc, path)) { None } else { + if (!PathCache.exists(context.fs, path)) { None } else { val files = if (leafStorage) { Seq(StorageFile(name, 0L)) } else { - PathCache.list(context.fc, path).map(fs => StorageFile(fs.getPath.getName, 0L)).toList + PathCache.list(context.fs, path).map(fs => StorageFile(fs.getPath.getName, 0L)).toList } Some(PartitionMetadata(name, files, None, -1L)) } @@ -49,7 +49,7 @@ class ConverterMetadata( override def getPartitions(prefix: Option[String]): Seq[PartitionMetadata] = { buildPartitionList(prefix, dirty.compareAndSet(true, false)).map { name => val files = if (leafStorage) { Seq(StorageFile(name, 0L)) } else { - PathCache.list(context.fc, new Path(context.root, name)).map(fs => StorageFile(fs.getPath.getName, 0L)).toList + PathCache.list(context.fs, new Path(context.root, name)).map(fs => StorageFile(fs.getPath.getName, 0L)).toList } PartitionMetadata(name, files, None, -1L) } @@ -73,9 +73,9 @@ class ConverterMetadata( private def buildPartitionList(prefix: Option[String], invalidate: Boolean): List[String] = { if (invalidate) { - PathCache.invalidate(context.fc, context.root) + PathCache.invalidate(context.fs, context.root) } - val top = PathCache.list(context.fc, context.root) + val top = PathCache.list(context.fs, context.root) top.flatMap(f => buildPartitionList(f.getPath, "", prefix, 1, invalidate)).toList } @@ -86,9 +86,9 @@ class ConverterMetadata( curDepth: Int, invalidate: Boolean): List[String] = { if (invalidate) { - PathCache.invalidate(context.fc, path) + PathCache.invalidate(context.fs, path) } - if (curDepth > scheme.depth || !PathCache.status(context.fc, path).isDirectory) { + if (curDepth > scheme.depth || !PathCache.status(context.fs, path).isDirectory) { val file = s"$prefix${path.getName}" if (filter.forall(file.startsWith)) { List(file) } else { List.empty } } else { @@ -97,7 +97,7 @@ class ConverterMetadata( if (next.length >= f.length) { next.startsWith(f) } else { next == f.substring(0, 
next.length) } } if (continue) { - PathCache.list(context.fc, path).toList.flatMap { f => + PathCache.list(context.fs, path).toList.flatMap { f => buildPartitionList(f.getPath, s"$next/", filter, curDepth + 1, invalidate) } } else { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 4e92c5fddb80..8a4503fb2b80 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -36,13 +36,13 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co override protected def createReader( filter: Option[Filter], transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = { - new ConverterFileSystemReader(context.fc, converter, filter, transform) + new ConverterFileSystemReader(context.fs, converter, filter, transform) } override def getFilePaths(partition: String): Seq[StorageFilePath] = { val path = new Path(context.root, partition) if (metadata.leafStorage) { Seq(StorageFilePath(StorageFile(path.getName, 0L), path)) } else { - PathCache.list(context.fc, path).map(p => StorageFilePath(StorageFile(p.getPath.getName, 0L), p.getPath)).toList + PathCache.list(context.fs, path).map(p => StorageFilePath(StorageFile(p.getPath.getName, 0L), p.getPath)).toList } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala index 8bfbe41eb7bc..5d6c2eb6683b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.data.Query import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith @@ -60,7 +60,7 @@ class ConverterFileSystemStorageTest extends Specification with LazyLogging { conf.set(ConverterStorageFactory.PartitionSchemeParam, "daily") conf.set(ConverterStorageFactory.LeafStorageParam, "false") - val context = FileSystemContext(FileContext.getFileContext(dir), conf, new Path(dir)) + val context = FileSystemContext(new Path(dir), conf) val metadata = StorageMetadataFactory.load(context).orNull metadata must not(beNull) val storage = FileSystemStorageFactory(context, metadata) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala index 
18e5608ac4fa..9a53cf9e11a1 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.orc import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileContext, FileSystem, Path} import org.geotools.api.data.Query import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter @@ -59,7 +59,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true)) @@ -145,7 +145,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true)) @@ -202,7 +202,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true)) @@ -264,7 +264,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true)) @@ -315,11 +315,11 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { "transition old metadata files" in { withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val meta = new Path(dir, "metadata.json") - context.fc.util.copy(new Path(getClass.getClassLoader.getResource("metadata-old.json").toURI), meta) - context.fc.util.exists(meta) must beTrue - PathCache.invalidate(context.fc, meta) + context.fs.copyFromLocalFile(new Path(getClass.getClassLoader.getResource("metadata-old.json").toURI), meta) + context.fs.exists(meta) must beTrue + PathCache.invalidate(context.fs, meta) val metadata = new FileBasedMetadataFactory().load(context) metadata must beSome diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala index b016357343f2..19f6096c3a54 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala +++ 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.parquet import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.data.Query import org.geotools.filter.text.ecql.ECQL import org.geotools.util.factory.Hints @@ -35,15 +35,15 @@ class CompactionTest extends Specification with AllExpectations { sequential - val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326") - val tempDir = Files.createTempDirectory("geomesa") - val fc = FileContext.getFileContext(tempDir.toUri) + lazy val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326") + lazy val tempDir = Files.createTempDirectory("geomesa") + lazy val fs = FileSystem.get(tempDir.toUri, new Configuration()) "ParquetFileSystemStorage" should { "compact partitions" >> { val conf = new Configuration() conf.set("parquet.compression", "gzip") - val context = FileSystemContext(fc, conf, new Path(tempDir.toUri)) + val context = FileSystemContext(fs, conf, new Path(tempDir.toUri)) val metadata = new FileBasedMetadataFactory() @@ -107,7 +107,7 @@ class CompactionTest extends Specification with AllExpectations { // compact to a given file size // verify if file is appropriately sized, it won't be modified val paths = fsStorage.getFilePaths(partition).map(_.path) - val size = paths.map(f => fc.getFileStatus(f).getLen).sum + val size = paths.map(f => fs.getFileStatus(f).getLen).sum fsStorage.compact(Some(partition), Some(size)) fsStorage.getFilePaths(partition).map(_.path) mustEqual paths // verify files are split into smaller ones diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala index 995482f535ba..1207d259da17 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala @@ -11,7 +11,7 @@ package org.locationtech.geomesa.parquet import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.data.Query import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter @@ -62,7 +62,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true)) @@ -148,7 +148,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, 
Metadata(sft, "parquet", scheme, leafStorage = true)) @@ -205,7 +205,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true)) @@ -267,7 +267,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog } withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true)) @@ -333,7 +333,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog val targetSize = 2100L withTestDir { dir => - val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val context = FileSystemContext(dir, config) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true, Some(targetSize))) @@ -358,7 +358,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog foreach(partitions) { partition => val paths = storage.getFilePaths(partition) paths.size must beGreaterThan(1) - foreach(paths)(p => context.fc.getFileStatus(p.path).getLen must beCloseTo(targetSize, targetSize / 10)) + foreach(paths)(p => context.fs.getFileStatus(p.path).getLen must beCloseTo(targetSize, targetSize / 10)) } } } @@ -367,7 +367,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog val url = getClass.getClassLoader.getResource("data/2.3.0/example-csv/") url must not(beNull) val path = new Path(url.toURI) - val context = FileSystemContext(FileContext.getFileContext(url.toURI), config, path) + val context = FileSystemContext(path, config) val metadata = StorageMetadataFactory.load(context).orNull metadata must not(beNull) val storage = FileSystemStorageFactory(context, metadata) diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala index 6101117f5786..d8fd5daf6d5d 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala @@ -63,7 +63,7 @@ trait FileSystemCompactionJob extends StorageConfiguration with JobWithLibJars { job.setOutputKeyClass(classOf[Void]) job.setOutputValueClass(classOf[SimpleFeature]) - val qualifiedTempPath = tempPath.map(storage.context.fc.makeQualified) + val qualifiedTempPath = tempPath.map(storage.context.fs.makeQualified) StorageConfiguration.setRootPath(job.getConfiguration, storage.context.root) StorageConfiguration.setPartitions(job.getConfiguration, partitions.map(_.name).toArray) @@ -105,7 +105,7 @@ trait FileSystemCompactionJob extends StorageConfiguration with JobWithLibJars { existingDataFiles.foreach { case (partition, files) => val counter = StorageConfiguration.Counters.partition(partition.name) val count = Option(job.getCounters.findCounter(StorageConfiguration.Counters.Group, counter)).map(_.getValue) - 
files.foreach(f => storage.context.fc.delete(f.path, false)) + files.foreach(f => storage.context.fs.delete(f.path, false)) storage.metadata.removePartition(partition.copy(count = count.getOrElse(0L))) val removed = count.map(c => s"containing $c features ").getOrElse("") Command.user.info(s"Removed ${TextTools.getPlural(files.size, "file")} ${removed}in partition ${partition.name}") diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala index 8af2bfc64b35..8895273e76ca 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala @@ -8,7 +8,7 @@ package org.locationtech.geomesa.fs.tools.compact -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.geotools.api.data.Query @@ -35,7 +35,7 @@ class PartitionInputFormat extends InputFormat[Void, SimpleFeature] { val conf = context.getConfiguration val root = StorageConfiguration.getRootPath(conf) - val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root) + val fsc = FileSystemContext(root, conf) val fileSize = StorageConfiguration.getTargetFileSize(conf) val metadata = StorageMetadataFactory.load(fsc).getOrElse { @@ -49,7 +49,7 @@ class PartitionInputFormat extends InputFormat[Void, SimpleFeature] { var size = 0L val files = storage.getFilePaths(partition).filter { f => if (sizeCheck.exists(_.apply(f.path))) { false } else { - size += PathCache.status(fsc.fc, f.path).getLen + size += PathCache.status(fsc.fs, f.path).getLen true } } @@ -127,7 +127,7 @@ object PartitionInputFormat { override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = { val conf = context.getConfiguration val root = StorageConfiguration.getRootPath(conf) - val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root) + val fsc = FileSystemContext(root, conf) val metadata = StorageMetadataFactory.load(fsc).getOrElse { throw new IllegalArgumentException(s"No storage defined under path '$root'") } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala index 53b0bc5333d8..94a0e71eb696 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.tools.ingest import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat @@ -137,9 +137,8 @@ object FileSystemConverterJob { override def setup(context: Context): Unit = { val root = StorageConfiguration.getRootPath(context.getConfiguration) - val fc = FileContext.getFileContext(root.toUri, 
context.getConfiguration) // note: we don't call `reload` (to get the partition metadata) as we aren't using it - metadata = StorageMetadataFactory.load(FileSystemContext(fc, context.getConfiguration, root)).getOrElse { + metadata = StorageMetadataFactory.load(FileSystemContext(root, context.getConfiguration)).getOrElse { throw new IllegalArgumentException(s"Could not load storage instance at path $root") } serializer = KryoFeatureSerializer(metadata.sft, SerializationOptions.none) @@ -179,9 +178,8 @@ object FileSystemConverterJob { override def setup(context: Context): Unit = { val root = StorageConfiguration.getRootPath(context.getConfiguration) - val fc = FileContext.getFileContext(root.toUri, context.getConfiguration) // note: we don't call `reload` (to get the partition metadata) as we aren't using it - val metadata = StorageMetadataFactory.load(FileSystemContext(fc, context.getConfiguration, root)).getOrElse { + val metadata = StorageMetadataFactory.load(FileSystemContext(root, context.getConfiguration)).getOrElse { throw new IllegalArgumentException(s"Could not load storage instance at path $root") } serializer = KryoFeatureSerializer(metadata.sft, SerializationOptions.none) diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala index f3b7f047c46d..fc900c3e8fe7 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala @@ -53,13 +53,13 @@ class FsIngestCommand extends IngestCommand[FileSystemDataStore] with FsDistribu throw new ParameterException("Please specify --num-reducers for distributed ingest") } val storage = ds.storage(sft.getTypeName) - val tmpPath = Option(params.tempPath).map(d => storage.context.fc.makeQualified(new Path(d))) + val tmpPath = Option(params.tempPath).map(d => storage.context.fs.makeQualified(new Path(d))) val targetFileSize = storage.metadata.get(Metadata.TargetFileSize).map(_.toLong) tmpPath.foreach { tp => - if (storage.context.fc.util.exists(tp)) { + if (storage.context.fs.exists(tp)) { Command.user.info(s"Deleting temp path $tp") - storage.context.fc.delete(tp, true) + storage.context.fs.delete(tp, true) } } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala index ad8c33aa8276..9dafbd8738a1 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala @@ -228,7 +228,7 @@ object FsManageMetadataCommand { * added to onDisk */ private def listRoot(): Unit = { - val iter = storage.context.fc.listStatus(storage.context.root) + val iter = storage.context.fs.listStatusIterator(storage.context.root) // use a phaser to track worker thread completion val phaser = new Phaser(2) // 1 for this thread + 1 for the worker pool.submit(new TopLevelListWorker(phaser, iter)) @@ -263,7 +263,7 @@ object FsManageMetadataCommand { partitions.forall(_.exists(p => p == name || p.startsWith(s"$name/")))) { i += 1 // use a tiered phaser on each directory to avoid the limit 
of 65535 registered parties - pool.submit(new ListWorker(new Phaser(phaser, 1), name, storage.context.fc.listStatus(path))) + pool.submit(new ListWorker(new Phaser(phaser, 1), name, storage.context.fs.listStatusIterator(path))) } } else if (name != MetadataJson.MetadataPath) { onDisk.computeIfAbsent("", computeFunction).put(name, java.lang.Boolean.TRUE) @@ -293,7 +293,7 @@ object FsManageMetadataCommand { if (partitions.forall(_.exists(p => p == nextPartition || p.startsWith(s"$nextPartition/")))) { i += 1 // use a tiered phaser on each directory to avoid the limit of 65535 registered parties - pool.submit(new ListWorker(new Phaser(phaser, 1), nextPartition, storage.context.fc.listStatus(path))) + pool.submit(new ListWorker(new Phaser(phaser, 1), nextPartition, storage.context.fs.listStatusIterator(path))) } } else { val leafPartition = diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala index 3ea889ef558b..f5aa815c7e90 100644 --- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala +++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala @@ -160,7 +160,7 @@ class CompactCommandTest extends Specification { val storage = ds.storage(sft.getTypeName) foreach(storage.metadata.getPartitions()) { partition => partition.files.size must beGreaterThan(1) - val sizes = storage.getFilePaths(partition.name).map(p => storage.context.fc.getFileStatus(p.path).getLen) + val sizes = storage.getFilePaths(partition.name).map(p => storage.context.fs.getFileStatus(p.path).getLen) // hard to get very close with 2 different formats and small files... 
foreach(sizes)(_ must beCloseTo(targetFileSize, 4000)) } diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala index cd74c3229766..88325a39ceb0 100644 --- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala +++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala @@ -61,7 +61,7 @@ class FsManageMetadataCommandTest extends Specification { val files = storage.metadata.getPartitions().flatMap(_.files.map(_.name)).toList // move a file - it's not in the right partition so it won't be matched correctly by filters, // but it's good enough for a test - storage.context.fc.rename(new Path(storage.context.root, "2022"), new Path(storage.context.root, "2019")) + storage.context.fs.rename(new Path(storage.context.root, "2022"), new Path(storage.context.root, "2019")) // verify we can't retrieve the moved file SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toList must containTheSameElementsAs(features.take(2)) @@ -94,7 +94,7 @@ class FsManageMetadataCommandTest extends Specification { } val storage = ds.storage(sft.getTypeName) // delete a file - storage.context.fc.delete(new Path(storage.context.root, "2022"), true) + storage.context.fs.delete(new Path(storage.context.root, "2022"), true) // delete a partition from the metadata storage.metadata.getPartition("2021/01/01") match { case None => ko("Expected Some for partition but got none") diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala index f2ec865e6aab..48f7c8ecee7e 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala @@ -601,7 +601,7 @@ object ExportCommand extends LazyLogging { // lowest level - keep track of the bytes we write // do this before any compression, buffering, etc so we get an accurate count - private val counter = new CountingOutputStream(out.write(CreateMode.Create, createParents = true)) + private val counter = new CountingOutputStream(out.write(CreateMode.Create)) private val stream = { val compressed = gzip match { case None => counter diff --git a/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala b/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala index b32eb4133a0f..47bfc909f608 100644 --- a/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala +++ b/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala @@ -9,7 +9,7 @@ package org.locationtech.geomesa.tools.`export` import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileContext, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.data.{DataStore, Query, SimpleFeatureStore} import org.geotools.data.collection.ListFeatureCollection import org.geotools.data.memory.MemoryDataStore @@ -59,7 +59,8 @@ class ExportToFsTest extends Specification { .addFeatures(new ListFeatureCollection(sft, features: _*)) val storage = { - val context = 
FileSystemContext(FileContext.getFileContext, new Configuration(), new Path(out.toUri)) + val conf = new Configuration() + val context = FileSystemContext(new Path(out.toUri), conf) val metadata = new FileBasedMetadataFactory() .create(context, Map.empty, Metadata(sft, "parquet", NamedOptions("daily"), leafStorage = true)) diff --git a/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala index 8576e9c289f8..9693a6479181 100644 --- a/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala +++ b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala @@ -9,12 +9,11 @@ package org.locationtech.geomesa.utils.hadoop import com.typesafe.scalalogging.LazyLogging -import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream, ArchiveStreamFactory} import org.apache.commons.compress.archivers.zip.ZipFile +import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream, ArchiveStreamFactory} import org.apache.commons.compress.utils.SeekableInMemoryByteChannel import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Options.CreateOpts import org.apache.hadoop.fs._ import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle} @@ -47,19 +46,19 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate { override def getHandle(path: String): FileHandle = { val p = new Path(path) - val fc = FileContext.getFileContext(p.toUri, conf) + val fs = FileSystem.get(p.toUri, conf) PathUtils.getUncompressedExtension(p.getName).toLowerCase(Locale.US) match { - case TAR => new HadoopTarHandle(fc, p) - case ZIP | JAR => new HadoopZipHandle(fc, p) - case _ => new HadoopFileHandle(fc, p) + case TAR => new HadoopTarHandle(fs, p) + case ZIP | JAR => new HadoopZipHandle(fs, p) + case _ => new HadoopFileHandle(fs, p) } } // based on logic from hadoop FileInputFormat override def interpretPath(path: String): Seq[FileHandle] = { val p = new Path(path) - val fc = FileContext.getFileContext(p.toUri, conf) - val files = fc.util.globStatus(p, HiddenFileFilter) + val fs = FileSystem.get(p.toUri, conf) + val files = fs.globStatus(p, HiddenFileFilter) if (files == null) { throw new IllegalArgumentException(s"Input path does not exist: $path") @@ -74,7 +73,7 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate { val file = remaining.dequeue() if (file.isDirectory) { if (recursive) { - val children = fc.listLocatedStatus(file.getPath) + val children = fs.listLocatedStatus(file.getPath) val iter = new Iterator[LocatedFileStatus] { override def hasNext: Boolean = children.hasNext override def next(): LocatedFileStatus = children.next @@ -83,9 +82,9 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate { } } else { PathUtils.getUncompressedExtension(file.getPath.getName).toLowerCase(Locale.US) match { - case TAR => result += new HadoopTarHandle(fc, file.getPath) - case ZIP | JAR => result += new HadoopZipHandle(fc, file.getPath) - case _ => result += new HadoopFileHandle(fc, file.getPath) + case TAR => result += new HadoopTarHandle(fs, file.getPath) + case ZIP | JAR => result += new 
HadoopZipHandle(fs, file.getPath) + case _ => result += new HadoopFileHandle(fs, file.getPath) } } } @@ -129,63 +128,57 @@ object HadoopDelegate extends LazyLogging { } } - class HadoopFileHandle(fc: FileContext, file: Path) extends FileHandle { + class HadoopFileHandle(fs: FileSystem, file: Path) extends FileHandle { override def path: String = file.toString - override def exists: Boolean = fc.util.exists(file) + override def exists: Boolean = fs.exists(file) - override def length: Long = if (exists) { fc.getFileStatus(file).getLen } else { 0L } + override def length: Long = if (exists) { fs.getFileStatus(file).getLen } else { 0L } override def open: CloseableIterator[(Option[String], InputStream)] = { - val is = PathUtils.handleCompression(fc.open(file), file.getName) + val is = PathUtils.handleCompression(fs.open(file), file.getName) CloseableIterator.single(None -> is, is.close()) } - override def write(mode: CreateMode, createParents: Boolean): OutputStream = { + override def write(mode: CreateMode): OutputStream = { mode.validate() - val flags = java.util.EnumSet.noneOf(classOf[CreateFlag]) if (mode.append) { - flags.add(CreateFlag.APPEND) - } else if (mode.overwrite) { - flags.add(CreateFlag.OVERWRITE) - } - if (mode.create) { - flags.add(CreateFlag.CREATE) + fs.append(file) + } else { + fs.create(file, mode.overwrite) // TODO do we need to hsync/hflush? } - val ops = if (createParents) { CreateOpts.createParent() } else { CreateOpts.donotCreateParent() } - fc.create(file, flags, ops) // TODO do we need to hsync/hflush? } override def delete(recursive: Boolean): Unit = { - if (!fc.delete(file, recursive)) { + if (!fs.delete(file, recursive)) { throw new IOException(s"Could not delete file: $path") } } } - class HadoopZipHandle(fc: FileContext, file: Path) extends HadoopFileHandle(fc, file) { + class HadoopZipHandle(fs: FileSystem, file: Path) extends HadoopFileHandle(fs, file) { override def open: CloseableIterator[(Option[String], InputStream)] = { // we have to read the bytes into memory to get random access reads - val bytes = WithClose(PathUtils.handleCompression(fc.open(file), file.getName)) { is => + val bytes = WithClose(PathUtils.handleCompression(fs.open(file), file.getName)) { is => IOUtils.toByteArray(is) } new ZipFileIterator(new ZipFile(new SeekableInMemoryByteChannel(bytes)), file.toString) } - override def write(mode: CreateMode, createParents: Boolean): OutputStream = - factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode, createParents)) + override def write(mode: CreateMode): OutputStream = + factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode)) } - class HadoopTarHandle(fc: FileContext, file: Path) extends HadoopFileHandle(fc, file) { + class HadoopTarHandle(fs: FileSystem, file: Path) extends HadoopFileHandle(fs, file) { override def open: CloseableIterator[(Option[String], InputStream)] = { - val uncompressed = PathUtils.handleCompression(fc.open(file), file.getName) + val uncompressed = PathUtils.handleCompression(fs.open(file), file.getName) val archive: ArchiveInputStream[_ <: ArchiveEntry] = factory.createArchiveInputStream(ArchiveStreamFactory.TAR, uncompressed) new ArchiveFileIterator(archive, file.toString) } - override def write(mode: CreateMode, createParents: Boolean): OutputStream = - factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode, createParents)) + override def write(mode: CreateMode): OutputStream = + factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, 
super.write(mode)) } } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala index 726ee9f80645..0e1e2132ac44 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala @@ -24,7 +24,9 @@ trait CompressionUtils { def isCompressedFilename(filename: String): Boolean def getUncompressedFilename(filename: String): String def getCompressedFilename(filename: String): String - def compress(is: InputStream): InputStream + @deprecated("misleading name - replaced with decompress") + def compress(is: InputStream): InputStream = decompress(is) + def decompress(is: InputStream): InputStream } object CompressionUtils { @@ -59,7 +61,7 @@ object CompressionUtils { org.apache.commons.compress.compressors.gzip.GzipUtils.getUncompressedFilename(filename) override def getCompressedFilename(filename: String): String = org.apache.commons.compress.compressors.gzip.GzipUtils.getCompressedFilename(filename) - override def compress(is: InputStream): InputStream = new GZIPInputStream(is) + override def decompress(is: InputStream): InputStream = new GZIPInputStream(is) } case object XZUtils extends CompressionUtils { @@ -69,7 +71,7 @@ object CompressionUtils { org.apache.commons.compress.compressors.xz.XZUtils.getUncompressedFilename(filename) override def getCompressedFilename(filename: String): String = org.apache.commons.compress.compressors.xz.XZUtils.getCompressedFilename(filename) - override def compress(is: InputStream): InputStream = new XZCompressorInputStream(is) + override def decompress(is: InputStream): InputStream = new XZCompressorInputStream(is) } case object BZip2Utils extends CompressionUtils { @@ -79,6 +81,6 @@ object CompressionUtils { org.apache.commons.compress.compressors.bzip2.BZip2Utils.getUncompressedFilename(filename) override def getCompressedFilename(filename: String): String = org.apache.commons.compress.compressors.bzip2.BZip2Utils.getCompressedFilename(filename) - override def compress(is: InputStream): InputStream = new BZip2CompressorInputStream(is) + override def decompress(is: InputStream): InputStream = new BZip2CompressorInputStream(is) } } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala index 58e11f093822..e17a9c09e55a 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala @@ -93,7 +93,7 @@ object PathUtils extends FileSystemDelegate with LazyLogging { val buffered = new BufferedInputStream(is) CompressionUtils.Utils.find(_.isCompressedFilename(filename)) match { case None => buffered - case Some(utils) => utils.compress(buffered) + case Some(utils) => utils.decompress(buffered) } } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala index c5d4aa6b1272..47fcaec70ebc 100644 --- 
a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala @@ -43,7 +43,7 @@ trait FileSystemDelegate extends LazyLogging { def getUrl(path: String): URL } -object FileSystemDelegate { +object FileSystemDelegate extends LazyLogging { /** * Creation mode for files @@ -125,10 +125,22 @@ object FileSystemDelegate { * Open the file for writing * * @param mode write mode - * @param createParents if the file does not exist, create its parents. Note that this only makes sense - * with `CreateMode.Create` */ - def write(mode: CreateMode, createParents: Boolean = false): OutputStream + def write(mode: CreateMode): OutputStream + + /** + * Open the file for writing + * + * @param mode write mode + * @param createParents create parent dirs as necessary + */ + @deprecated("createParents is always true") + def write(mode: CreateMode, createParents: Boolean): OutputStream = { + if (!createParents) { + logger.warn("Call to write with createParents=false, which is not supported") + } + write(mode) + } /** * Delete the file diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala index 4430472cd712..a4a186413e6f 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala @@ -105,7 +105,7 @@ object LocalDelegate { CloseableIterator.single(None -> is, is.close()) } - override def write(mode: CreateMode, createParents: Boolean): OutputStream = { + override def write(mode: CreateMode): OutputStream = { mode.validate() if (file.exists()) { if (mode.append) { @@ -120,12 +120,8 @@ object LocalDelegate { throw new FileNotFoundException(s"File does not exist: $path") } else { val parent = file.getParentFile - if (parent != null && !parent.exists()) { - if (!createParents) { - throw new FileNotFoundException(s"Parent file does not exist: $path") - } else if (!parent.mkdirs()) { - throw new IOException(s"Parent file does not exist and could not be created: $path") - } + if (parent != null && !parent.exists() && !parent.mkdirs()) { + throw new IOException(s"Parent file does not exist and could not be created: $path") } new FileOutputStream(file) } @@ -143,8 +139,8 @@ object LocalDelegate { class LocalZipHandle(file: File) extends LocalFileHandle(file) { override def open: CloseableIterator[(Option[String], InputStream)] = new ZipFileIterator(new ZipFile(file), file.getAbsolutePath) - override def write(mode: CreateMode, createParents: Boolean): OutputStream = - factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode, createParents)) + override def write(mode: CreateMode): OutputStream = + factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode)) } class LocalTarHandle(file: File) extends LocalFileHandle(file) { @@ -154,8 +150,8 @@ object LocalDelegate { factory.createArchiveInputStream(ArchiveStreamFactory.TAR, uncompressed) new ArchiveFileIterator(archive, file.getAbsolutePath) } - override def write(mode: CreateMode, createParents: Boolean): OutputStream = - factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode, createParents)) + override def 
write(mode: CreateMode): OutputStream = + factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode)) } private class StdInHandle(in: InputStream) extends FileHandle { @@ -164,7 +160,7 @@ object LocalDelegate { override def length: Long = Try(in.available().toLong).getOrElse(0L) // .available will throw if stream is closed override def open: CloseableIterator[(Option[String], InputStream)] = CloseableIterator.single(None -> CloseShieldInputStream.wrap(in)) - override def write(mode: CreateMode, createParents: Boolean): OutputStream = System.out + override def write(mode: CreateMode): OutputStream = System.out override def delete(recursive: Boolean): Unit = {} } diff --git a/pom.xml b/pom.xml index 2683e3301e56..5ec085ea36b9 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 4.20.5 4.13.2 5.9.3 - 1.19.7 + 1.20.3 1.4.1 2.28.2 @@ -202,10 +202,11 @@ 2 false 2.1.3 - 7-alpine - 15.1 - 15-3.4 7.6.0 + RELEASE.2024-10-29T16-01-48Z + 15-3.4 + 15.1 + 7-alpine 3.9.2 @@ -2855,6 +2856,12 @@ ${testcontainers.version} test + + org.testcontainers + minio + ${testcontainers.version} + test + org.geomesa.testcontainers testcontainers-accumulo @@ -3061,10 +3068,11 @@ logging.properties slf4j ${test.accumulo.docker.tag} + ${test.confluent.docker.tag} + ${test.minio.docker.tag} ${test.postgis.docker.tag} ${test.postgres.docker.tag} ${test.redis.docker.tag} - ${test.confluent.docker.tag} ${test.zookeeper.docker.tag}
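
For reference, a minimal sketch of the FileContext-to-FileSystem migration pattern that recurs throughout these hunks: FileContext.getFileContext(uri, conf) becomes FileSystem.get(uri, conf), fc.util.exists becomes fs.exists, fc.listStatus becomes fs.listStatusIterator, fc.makeQualified becomes fs.makeQualified, and the CreateFlag-based fc.create becomes fs.create/fs.append. The path, object name, and file contents below are illustrative only:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    import java.nio.charset.StandardCharsets

    object FileSystemMigrationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val root = new Path("file:///tmp/geomesa-example") // illustrative path
        // FileSystem.get returns a shared, cached instance keyed by scheme/authority,
        // so it is not closed here
        val fs = FileSystem.get(root.toUri, conf)

        val qualified = fs.makeQualified(root) // replaces fc.makeQualified

        // fs.exists replaces fc.util.exists; fs.listStatusIterator replaces
        // fc.listStatus and returns a RemoteIterator[FileStatus]
        if (fs.exists(qualified)) {
          val iter = fs.listStatusIterator(qualified)
          while (iter.hasNext) {
            println(iter.next().getPath)
          }
        }

        // fs.create(path, overwrite) and fs.append(path) replace the
        // CreateFlag-based FileContext.create; parent directories are always created
        val out = fs.create(new Path(qualified, "example.txt"), true)
        try { out.write("hello".getBytes(StandardCharsets.UTF_8)) } finally { out.close() }
      }
    }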
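
Similarly, a short usage sketch of the renamed CompressionUtils API, mirroring the PathUtils.handleCompression change above; the helper name and file name are illustrative, and the deprecated compress(is) forwarder keeps existing callers compiling while they migrate to decompress(is):

    import org.locationtech.geomesa.utils.io.CompressionUtils

    import java.io.{BufferedInputStream, FileInputStream, InputStream}

    // pick the utility matching the file extension and decompress the raw
    // stream, or pass it through unchanged if the name is not compressed
    def openMaybeCompressed(filename: String): InputStream = {
      val raw = new BufferedInputStream(new FileInputStream(filename))
      CompressionUtils.Utils.find(_.isCompressedFilename(filename)) match {
        case Some(utils) => utils.decompress(raw)
        case None        => raw
      }
    }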