Skip to content

Commit

Permalink
GEOMESA-3411 FSDS - Fix reads of tar files from s3 (#3228)
Browse files Browse the repository at this point in the history
* Replace Hadoop FileContext with FileSystem, as FileContext
  seems to close the underlying FileSystem prematurely
  • Loading branch information
elahrvivaz committed Nov 6, 2024
1 parent 457b215 commit a207801
Show file tree
Hide file tree
Showing 42 changed files with 433 additions and 293 deletions.
7 changes: 4 additions & 3 deletions build/cqs.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ org.slf4j:jul-to-slf4j 1.7.36 test
org.specs2:specs2-core_2.12 4.20.5 test
org.specs2:specs2-junit_2.12 4.20.5 test
org.specs2:specs2-mock_2.12 4.20.5 test
org.testcontainers:kafka 1.19.7 test
org.testcontainers:postgresql 1.19.7 test
org.testcontainers:testcontainers 1.19.7 test
org.testcontainers:kafka 1.20.3 test
org.testcontainers:minio 1.20.3 test
org.testcontainers:postgresql 1.20.3 test
org.testcontainers:testcontainers 1.20.3 test
org.wololo:jts2geojson 0.16.1 test
org.xerial.snappy:snappy-java 1.1.10.5 test
7 changes: 7 additions & 0 deletions build/test/resources/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
</layout>
</appender>

<category name="org.locationtech.geomesa">
<priority value="info"/>
</category>

<category name="org.apache.hadoop">
<priority value="error"/>
</category>
<category name="software.amazon.awssdk.transfer.s3.S3TransferManager">
<priority value="error"/>
</category>
<category name="org.apache.accumulo">
<priority value="error"/>
</category>
Expand Down
19 changes: 19 additions & 0 deletions geomesa-fs/geomesa-fs-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,30 @@
<artifactId>geomesa-fs-storage-orc_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>org.geomesa.testcontainers</groupId>
<artifactId>testcontainers-accumulo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ package org.locationtech.geomesa.fs.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.api.feature.`type`.Name
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.data.store.{ContentDataStore, ContentEntry, ContentFeatureSource}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.StorageKeys
import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
Expand All @@ -27,8 +28,19 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

/**
* File system data store
*
* @param fs file system - note, this is expected to be a shared resource, and is not cleaned up on data store dispose
* @param conf conf
* @param root root path
* @param readThreads number of threads used for read ops
* @param writeTimeout timeout for write ops
* @param defaultEncoding default file encoding
* @param namespace geoserver namespace
*/
class FileSystemDataStore(
fc: FileContext,
fs: FileSystem,
conf: Configuration,
root: Path,
readThreads: Int,
Expand All @@ -37,9 +49,21 @@ class FileSystemDataStore(
namespace: Option[String]
) extends ContentDataStore with HasGeoMesaStats with LazyLogging {

// noinspection ScalaUnusedSymbol
@deprecated("Use FileSystem instead of FileContext")
def this(
fc: FileContext,
conf: Configuration,
root: Path,
readThreads: Int,
writeTimeout: Duration,
defaultEncoding: Option[String],
namespace: Option[String]) =
this(FileSystem.get(root.toUri, conf), conf, root, readThreads, writeTimeout, defaultEncoding, namespace)

namespace.foreach(setNamespaceURI)

private val manager = FileSystemStorageManager(fc, conf, root, namespace)
private val manager = FileSystemStorageManager(fs, conf, root, namespace)

override val stats: GeoMesaStats = new UnoptimizedRunnableStats(this)

Expand Down Expand Up @@ -82,13 +106,15 @@ class FileSystemDataStore(
val fileSize = sft.removeTargetFileSize()

val path = manager.defaultPath(sft.getTypeName)
val context = FileSystemContext(fc, conf, path, namespace)
val context = FileSystemContext(fs, conf, path, namespace)

val metadata =
StorageMetadataFactory.create(context, meta, Metadata(sft, encoding, scheme, leafStorage, fileSize))
try { manager.register(path, FileSystemStorageFactory(context, metadata)) } catch {
case NonFatal(e) => CloseQuietly(metadata).foreach(e.addSuppressed); throw e
}
PathCache.register(fs, root)
PathCache.register(fs, path)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

package org.locationtech.geomesa.fs.data

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.DataAccessFactory.Param
import org.geotools.api.data.{DataStore, DataStoreFactorySpi}
import org.locationtech.geomesa.fs.storage.api.FileSystemStorageFactory
Expand All @@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration
class FileSystemDataStoreFactory extends DataStoreFactorySpi {

import FileSystemDataStoreFactory.FileSystemDataStoreParams._
import FileSystemDataStoreFactory.fileContextCache

override def createDataStore(params: java.util.Map[String, _]): DataStore = {

Expand All @@ -45,8 +43,6 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {
conf
}

val fc = fileContextCache.get(conf)

val path = new Path(PathParam.lookup(params))
val encoding = EncodingParam.lookupOpt(params).filterNot(_.isEmpty)

Expand All @@ -63,7 +59,9 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {

val namespace = NamespaceParam.lookupOpt(params)

new FileSystemDataStore(fc, conf, path, readThreads, writeTimeout, encoding, namespace)
val fs = FileSystem.get(path.toUri, conf)

new FileSystemDataStore(fs, conf, path, readThreads, writeTimeout, encoding, namespace)
}

override def createNewDataStore(params: java.util.Map[String, _]): DataStore =
Expand Down Expand Up @@ -105,12 +103,6 @@ object FileSystemDataStoreFactory extends GeoMesaDataStoreInfo {

private val configuration = new Configuration()

private val fileContextCache: LoadingCache[Configuration, FileContext] = Caffeine.newBuilder().build(
new CacheLoader[Configuration, FileContext]() {
override def load(conf: Configuration): FileContext = FileContext.getFileContext(conf)
}
)

object FileSystemDataStoreParams extends NamespaceParams {

val WriterFileTimeout: SystemProperty = SystemProperty("geomesa.fs.writer.partition.timeout", "60s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class FileSystemFeatureStore(
override def canEvent: Boolean = false
override def canReproject: Boolean = false
override def canSort: Boolean = false

override def canFilter: Boolean = true
override def canRetype: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.data
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.utils.io.CloseQuietly
Expand All @@ -23,11 +23,11 @@ import scala.util.control.NonFatal
/**
* Manages the storages and associated simple feature types underneath a given path
*
* @param fc file context
* @param fs file system
* @param conf configuration
* @param root root path for the data store
*/
class FileSystemStorageManager private (fc: FileContext, conf: Configuration, root: Path, namespace: Option[String])
class FileSystemStorageManager private (fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String])
extends MethodProfiling with LazyLogging {

import scala.collection.JavaConverters._
Expand All @@ -42,7 +42,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
*/
def storage(typeName: String): Option[FileSystemStorage] = {
cache.get(typeName).map(_._2) // check cached values
.orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fc, _)).flatMap(loadPath)) // check expected (default) path
.orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fs, _)).flatMap(loadPath)) // check expected (default) path
.orElse(loadAll().find(_.metadata.sft.getTypeName == typeName)) // check other paths until we find it
}

Expand Down Expand Up @@ -80,8 +80,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
* @return
*/
private def loadAll(): Iterator[FileSystemStorage] = {
if (!PathCache.exists(fc, root)) { Iterator.empty } else {
val dirs = PathCache.list(fc, root).filter(_.isDirectory).map(_.getPath)
if (!PathCache.exists(fs, root)) { Iterator.empty } else {
val dirs = PathCache.list(fs, root).filter(_.isDirectory).map(_.getPath)
dirs.filterNot(path => cache.exists { case (_, (p, _)) => p == path }).flatMap(loadPath)
}
}
Expand All @@ -99,7 +99,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
logger.debug(s"${ if (storage.isDefined) "Loaded" else "No" } storage at path '$path' in ${time}ms")

profile(complete _) {
val context = FileSystemContext(fc, conf, path, namespace)
val context = FileSystemContext(fs, conf, path, namespace)
StorageMetadataFactory.load(context).map { meta =>
try {
val storage = FileSystemStorageFactory(context, meta)
Expand All @@ -116,20 +116,20 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
object FileSystemStorageManager {

private val cache = Caffeine.newBuilder().build(
new CacheLoader[(FileContext, Configuration, Path, Option[String]), FileSystemStorageManager]() {
override def load(key: (FileContext, Configuration, Path, Option[String])): FileSystemStorageManager =
new CacheLoader[(FileSystem, Configuration, Path, Option[String]), FileSystemStorageManager]() {
override def load(key: (FileSystem, Configuration, Path, Option[String])): FileSystemStorageManager =
new FileSystemStorageManager(key._1, key._2, key._3, key._4)
}
)

/**
* Load a cached storage manager instance
*
* @param fc file context
* @param fs file system
* @param conf configuration
* @param root data store root path
* @return
*/
def apply(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
cache.get((fc, conf, root, namespace))
def apply(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
cache.get((fs, conf, root, namespace))
}
Empty file.
Loading

0 comments on commit a207801

Please sign in to comment.