From e4eceafc8abd72740addb64d7bc0be850338ce2e Mon Sep 17 00:00:00 2001 From: Emilio Date: Fri, 7 Jun 2024 08:28:28 -0400 Subject: [PATCH] GEOMESA-3366 FSDS - fix Hadoop 3.4 AWS compatibility (#3124) --- .../conf-filtered/dependencies.sh | 2 - geomesa-fs/geomesa-fs-dist/pom.xml | 9 ++ .../geomesa-fs-storage-common/pom.xml | 21 +++ ...ala => AbstractS3VisibilityObserver.scala} | 34 ++-- .../common/s3/S3ObjectTagObserver.scala | 54 ------- .../s3/S3VisibilityObserverFactory.scala | 52 +++---- .../common/s3/v1/S3VisibilityObserver.scala | 31 ++++ .../s3/v1/S3VisibilityObserverFactory.scala | 53 +++++++ .../common/s3/v2/S3VisibilityObserver.scala | 29 ++++ .../s3/v2/S3VisibilityObserverFactory.scala | 51 ++++++ .../common/s3/S3VisibilityObserverTest.scala | 96 +++++------- .../s3/v1/S3VisibilityObserverTest.scala | 147 ++++++++++++++++++ .../conf-filtered/dependencies.sh | 63 ++++++-- .../conf-filtered/dependencies.sh | 2 - .../conf-filtered/dependencies.sh | 2 - .../conf-filtered/dependencies.sh | 2 - geomesa-tools/conf-filtered/geomesa-env.sh | 3 + .../conf-filtered/parquet-dependencies.sh | 2 - .../geomesa/utils/io/CloseablePool.scala | 17 +- pom.xml | 31 +++- 20 files changed, 526 insertions(+), 175 deletions(-) rename geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/{S3VisibilityObserver.scala => AbstractS3VisibilityObserver.scala} (57%) delete mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala create mode 100644 
geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala diff --git a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh index 2a926602d3f9..c3a9c6a9a091 100755 --- a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh +++ b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh @@ -17,8 +17,6 @@ zookeeper_install_version="%%zookeeper.version.recommended%%" # required for hadoop - make sure it corresponds to the hadoop installed version guava_install_version="%%accumulo.guava.version%%" -function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; } - # gets the dependencies for this module # args: # $1 - current classpath diff --git a/geomesa-fs/geomesa-fs-dist/pom.xml b/geomesa-fs/geomesa-fs-dist/pom.xml index 5093d6cc6cc6..01d041a7c6bd 100644 --- a/geomesa-fs/geomesa-fs-dist/pom.xml +++ b/geomesa-fs/geomesa-fs-dist/pom.xml @@ -68,6 +68,15 @@ org.locationtech.geomesa geomesa-fs-datastore_${scala.binary.version} + + io.netty + netty-all + + + io.netty + netty-transport-native-epoll + linux-x86_64 + diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml index b5ada8ccc5f0..45453837dfb2 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml @@ -65,11 +65,32 @@ org.apache.hadoop hadoop-mapreduce-client-core + + 
org.apache.hadoop + hadoop-aws + + com.amazonaws aws-java-sdk-s3 provided + + + software.amazon.awssdk + s3 + provided + + + software.amazon.awssdk + s3-transfer-manager + provided + + + software.amazon.awssdk.crt + aws-crt + provided + diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala similarity index 57% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala index 5c0f399679db..ed721be13633 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserver.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/AbstractS3VisibilityObserver.scala @@ -8,26 +8,34 @@ package org.locationtech.geomesa.fs.storage.common.s3 -import com.amazonaws.services.s3.AmazonS3 import org.apache.accumulo.access.AccessExpression import org.apache.hadoop.fs.Path import org.geotools.api.feature.simple.SimpleFeature +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.security.SecurityUtils +import java.io.IOException import java.nio.charset.StandardCharsets import java.util.Base64 -/** - * Creates a tag containing the base64 encoded summary visibility for the observed file - * - * @param s3 s3 client - * @param path file path - * @param tag tag name to use - */ -class S3VisibilityObserver(s3: AmazonS3, path: Path, tag: String) extends S3ObjectTagObserver(s3, 
path) { +abstract class AbstractS3VisibilityObserver(path: Path) extends FileSystemObserver { private val visibilities = scala.collection.mutable.Set.empty[String] + private val (bucket, key) = { + val uri = path.toUri + val uriPath = uri.getPath + val key = if (uriPath.startsWith("/")) { uriPath.substring(1) } else { uriPath } + (uri.getHost, key) + } + + override def flush(): Unit = {} + + override def close(): Unit = { + try { makeTagRequest(bucket, key) } catch { + case e: Exception => throw new IOException("Error tagging object", e) + } + } override def write(feature: SimpleFeature): Unit = { val vis = SecurityUtils.getVisibility(feature) if (vis != null) { @@ -35,12 +43,14 @@ class S3VisibilityObserver(s3: AmazonS3, path: Path, tag: String) extends S3Obje } } - override protected def tags(): Iterable[(String, String)] = { - if (visibilities.isEmpty) { Seq.empty } else { + private def makeTagRequest(bucket: String, key: String): Unit = { + if (visibilities.nonEmpty) { val vis = visibilities.mkString("(", ")&(", ")") // this call simplifies and de-duplicates the expression val expression = AccessExpression.of(vis, /*normalize = */true).getExpression - Seq(tag -> Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8))) + makeTagRequest(bucket: String, key: String, Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8))) } } + + protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala deleted file mode 100644 index 65cbabe55b5c..000000000000 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3ObjectTagObserver.scala +++ 
/dev/null @@ -1,54 +0,0 @@ -/*********************************************************************** - * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Apache License, Version 2.0 - * which accompanies this distribution and is available at - * http://www.opensource.org/licenses/apache2.0.php. - ***********************************************************************/ - -package org.locationtech.geomesa.fs.storage.common.s3 - -import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.{ObjectTagging, SetObjectTaggingRequest, Tag} -import org.apache.hadoop.fs.Path -import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver - -import java.io.IOException - -/** - * Abstract baseclass for writing s3 object tags - * - * @param s3 s3 client - * @param path file path - */ -abstract class S3ObjectTagObserver(s3: AmazonS3, path: Path) extends FileSystemObserver { - - private val (bucket, key) = { - val uri = path.toUri - val uriPath = uri.getPath - val key = if (uriPath.startsWith("/")) { uriPath.substring(1) } else { uriPath } - (uri.getHost, key) - } - - /** - * Return the tags to set on this file - * - * @return - */ - protected def tags(): Iterable[(String, String)] - - override def flush(): Unit = {} - - override def close(): Unit = { - val iter = tags() - if (iter.nonEmpty) { - try { - val list = new java.util.ArrayList[Tag]() - iter.foreach { case (k, v) => list.add(new Tag(k, v)) } - s3.setObjectTagging(new SetObjectTaggingRequest(bucket, key, new ObjectTagging(list))) - } catch { - case e: Exception => throw new IOException("Error tagging object", e) - } - } - } -} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala index 3484a6c8d135..c4023c81f335 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverFactory.scala @@ -8,53 +8,49 @@ package org.locationtech.geomesa.fs.storage.common.s3 -import com.amazonaws.services.s3.AmazonS3 +import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory} -import java.io.IOException +import scala.util.control.NonFatal /** * Factory for S3VisibilityObserver */ -class S3VisibilityObserverFactory extends FileSystemObserverFactory { +class S3VisibilityObserverFactory extends FileSystemObserverFactory with LazyLogging { - private var fs: FileSystem = _ - private var s3: AmazonS3 = _ - private var tag: String = _ + private var delegate: FileSystemObserverFactory = _ override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = { try { - // use reflection to access to private client factory used by the s3a hadoop impl - fs = root.getFileSystem(conf) - val field = fs.getClass.getDeclaredField("s3") - field.setAccessible(true) - s3 = field.get(fs).asInstanceOf[AmazonS3] - tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag) + if (S3VisibilityObserverFactory.UseV2) { + delegate = new v2.S3VisibilityObserverFactory() + } else { + delegate = new v1.S3VisibilityObserverFactory() + } + delegate.init(conf, root, sft) } catch { 
case e: Exception => throw new RuntimeException("Unable to get s3 client", e) } } - override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(s3, path, tag) - - override def close(): Unit = { - s3 = null - if (fs != null) { - try { - fs.close() - } catch { - case e: Exception => throw new IOException("Error closing S3 filesystem", e) - } finally { - fs = null - } - } - } + override def apply(path: Path): FileSystemObserver = delegate.apply(path) + + override def close(): Unit = if (delegate != null) { delegate.close() } } -object S3VisibilityObserverFactory { +object S3VisibilityObserverFactory extends LazyLogging { + val TagNameConfig = "geomesa.fs.vis.tag" val DefaultTag = "geomesa.file.visibility" + + lazy private val UseV2: Boolean = try { + val versionRegex = """(\d+)\.(\d+)\..*""".r + val versionRegex(maj, min) = org.apache.hadoop.util.VersionInfo.getVersion + /* compare (major, minor) against 3.4 as a pair, so that a later major version with a low minor still selects v2 */ maj.toInt > 3 || (maj.toInt == 3 && min.toInt >= 4) + } catch { + case NonFatal(e) => logger.warn("Unable to evaluate hadoop version, defaulting to aws sdk v2: ", e); true + } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala new file mode 100644 index 000000000000..aad15d0a1ab0 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserver.scala @@ -0,0 +1,31 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved.
This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.s3 +package v1 + +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.model.{ObjectTagging, SetObjectTaggingRequest, Tag} +import org.apache.hadoop.fs.Path + +import java.util.Collections + +/** + * Creates a tag containing the base64 encoded summary visibility for the observed file + * + * @param path file path + * @param s3 s3 client + * @param tag tag name to use + */ +class S3VisibilityObserver(val path: Path, s3: AmazonS3, tag: String) extends AbstractS3VisibilityObserver(path) { + override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = { + val tagging = new ObjectTagging(Collections.singletonList(new Tag(tag, visibility))) + val request = new SetObjectTaggingRequest(bucket, key, tagging) + s3.setObjectTagging(request) + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala new file mode 100644 index 000000000000..e615203e3ba9 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverFactory.scala @@ -0,0 +1,53 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.s3 +package v1 + +import com.amazonaws.services.s3.AmazonS3 +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.geotools.api.feature.simple.SimpleFeatureType +import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory} +import org.locationtech.geomesa.utils.io.CloseQuietly + +import java.io.IOException + +/** + * Visibility observer for aws sdk v1 + */ +class S3VisibilityObserverFactory extends FileSystemObserverFactory { + + private var fs: FileSystem = _ + private var s3: AmazonS3 = _ + private var tag: String = _ + + override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = { + try { + // use reflection to access to private client factory used by the s3a hadoop impl + fs = root.getFileSystem(conf) + val field = fs.getClass.getDeclaredField("s3") + field.setAccessible(true) + s3 = field.get(fs).asInstanceOf[AmazonS3] + tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag) + } catch { + case e: Exception => throw new RuntimeException("Unable to get s3 client", e) + } + } + + override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(path, s3, tag) + + override def close(): Unit = { + if (fs != null) { + val err = CloseQuietly(fs) + s3 = null + fs = null + err.foreach(e => throw new IOException("Error closing S3 filesystem", e)) + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala new file mode 100644 index 000000000000..ba85575c8418 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserver.scala @@ -0,0 +1,29 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.s3 +package v2 + +import org.apache.hadoop.fs.Path +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.{PutObjectTaggingRequest, Tag, Tagging} + +/** + * Creates a tag containing the base64 encoded summary visibility for the observed file + * + * @param path file path + * @param s3 s3 client + * @param tag tag name to use + */ +class S3VisibilityObserver(path: Path, s3: S3Client, tag: String) extends AbstractS3VisibilityObserver(path) { + override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = { + val tagging = Tagging.builder().tagSet(Tag.builder.key(tag).value(visibility).build()).build() + val request = PutObjectTaggingRequest.builder.bucket(bucket).key(key).tagging(tagging).build() + s3.putObjectTagging(request) + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala new file mode 100644 index 000000000000..008ddfccceee --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/s3/v2/S3VisibilityObserverFactory.scala @@ -0,0 +1,51 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.s3 +package v2 + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.s3a.S3AFileSystem +import org.geotools.api.feature.simple.SimpleFeatureType +import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory} +import org.locationtech.geomesa.utils.io.CloseQuietly +import software.amazon.awssdk.services.s3.S3Client + +import java.io.IOException + +/** + * Visibility observer for aws sdk v2 + */ +class S3VisibilityObserverFactory extends FileSystemObserverFactory { + + private var fs: S3AFileSystem = _ + private var s3: S3Client = _ + private var tag: String = _ + + override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = { + try { + fs = root.getFileSystem(conf).asInstanceOf[S3AFileSystem] + s3 = fs.getS3AInternals.getAmazonS3Client("tags") + tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag) + } catch { + case e: Exception => throw new RuntimeException("Unable to get s3 client", e) + } + 
} + + override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(path, s3, tag) + + override def close(): Unit = { + if (fs != null) { + val err = CloseQuietly(fs) + s3 = null + fs = null + err.foreach(e => throw new IOException("Error closing S3 filesystem", e)) + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala index cc0987c9a45b..39b1b4626eeb 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/S3VisibilityObserverTest.scala @@ -8,12 +8,9 @@ package org.locationtech.geomesa.fs.storage.common.s3 -import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.{SetObjectTaggingRequest, Tag} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.util.Progressable +import org.apache.hadoop.fs.s3a.{S3AFileSystem, S3AInternals} import org.geotools.api.feature.simple.SimpleFeature import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature @@ -24,8 +21,9 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.specs2.mock.Mockito import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.{PutObjectTaggingRequest, Tag} -import java.net.URI import java.nio.charset.StandardCharsets import java.util.Base64 @@ -42,105 +40,91 @@ class S3VisibilityObserverTest extends Specification with 
Mockito { sf } - // noinspection NotImplementedCode - class MockFileSystem extends FileSystem{ - - val s3: AmazonS3 = mock[AmazonS3] - - override def getUri: URI = ??? - override def open(path: Path, i: Int): FSDataInputStream = ??? - override def create(path: Path, fsp: FsPermission, b: Boolean, i: Int, i1: Short, l: Long, p: Progressable): FSDataOutputStream = ??? - override def append(path: Path, i: Int, p: Progressable): FSDataOutputStream = ??? - override def rename(path: Path, path1: Path): Boolean = ??? - override def delete(path: Path, b: Boolean): Boolean = ??? - override def listStatus(path: Path): Array[FileStatus] = ??? - override def setWorkingDirectory(path: Path): Unit = ??? - override def getWorkingDirectory: Path = ??? - override def mkdirs(path: Path, fsp: FsPermission): Boolean = ??? - override def getFileStatus(path: Path): FileStatus = ??? + def mockS3(factory: S3VisibilityObserverFactory): S3Client = { + val root = mock[Path] + val s3Internals = mock[S3AInternals] + val s3 = mock[S3Client] + val fs = new S3AFileSystem() { + override def getS3AInternals: S3AInternals = s3Internals + } + + root.getFileSystem(ArgumentMatchers.any()) returns fs + s3Internals.getAmazonS3Client(ArgumentMatchers.any()) returns s3 + factory.init(new Configuration(), root, sft) + s3 } "S3VisibilityObserver" should { "initialize factory correctly" >> { // mimic construction through reflection - WithClose(classOf[S3VisibilityObserverFactory].newInstance()) { factory => - val root = mock[Path] - root.getFileSystem(ArgumentMatchers.any()) returns new MockFileSystem() - factory.init(new Configuration(), root, sft) must not(throwAn[Exception]) + WithClose(classOf[S3VisibilityObserverFactory].getDeclaredConstructor().newInstance()) { factory => + mockS3(factory) must not(throwAn[Exception]) } } "tag a single visibility label" >> { WithClose(new S3VisibilityObserverFactory) { factory => - val fs = new MockFileSystem() - val root = mock[Path] - 
root.getFileSystem(ArgumentMatchers.any()) returns fs - factory.init(new Configuration(), root, sft) + val s3 = mockS3(factory) val observer = factory.apply(new Path("s3a://foo/bar/baz.json")) observer.write(feature(0, "user")) observer.close() - val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest]) - there was one(fs.s3).setObjectTagging(captor.capture()) + val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest]) + there was one(s3).putObjectTagging(captor.capture()) val request = captor.getValue - request.getBucketName mustEqual "foo" - request.getKey mustEqual "bar/baz.json" + request.bucket mustEqual "foo" + request.key mustEqual "bar/baz.json" val encoded = Base64.getEncoder.encodeToString("user".getBytes(StandardCharsets.UTF_8)) - request.getTagging.getTagSet.asScala mustEqual Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encoded)) + request.tagging.tagSet.asScala mustEqual + Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encoded).build()) } } "tag multiple visibility labels" >> { WithClose(new S3VisibilityObserverFactory) { factory => - val fs = new MockFileSystem() - val root = mock[Path] - root.getFileSystem(ArgumentMatchers.any()) returns fs - factory.init(new Configuration(), root, sft) + val s3 = mockS3(factory) val observer = factory.apply(new Path("s3a://foo/bar/baz.json")) observer.write(feature(0, "user")) observer.write(feature(1, "admin")) observer.write(feature(2, "user")) observer.close() - val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest]) - there was one(fs.s3).setObjectTagging(captor.capture()) + val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest]) + there was one(s3).putObjectTagging(captor.capture()) val request = captor.getValue - request.getBucketName mustEqual "foo" - request.getKey 
mustEqual "bar/baz.json" + request.bucket mustEqual "foo" + request.key mustEqual "bar/baz.json" // since the vis are kept in a set, the order is not defined val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8)) val encodedBack = Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8)) - request.getTagging.getTagSet.asScala must beOneOf( - Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedFront)), - Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedBack)) + request.tagging.tagSet.asScala must beOneOf( + Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedFront).build()), + Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedBack).build()) ) } } "simplify tag expressions" >> { WithClose(new S3VisibilityObserverFactory) { factory => - val fs = new MockFileSystem() - val root = mock[Path] - root.getFileSystem(ArgumentMatchers.any()) returns fs - factory.init(new Configuration(), root, sft) + val s3 = mockS3(factory) val observer = factory.apply(new Path("s3a://foo/bar/baz.json")) observer.write(feature(0, "user&admin")) observer.write(feature(1, "admin")) observer.write(feature(2, "user")) observer.close() - val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest]) - there was one(fs.s3).setObjectTagging(captor.capture()) + val captor: ArgumentCaptor[PutObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[PutObjectTaggingRequest]) + there was one(s3).putObjectTagging(captor.capture()) val request = captor.getValue - request.getBucketName mustEqual "foo" - request.getKey mustEqual "bar/baz.json" + request.bucket mustEqual "foo" + request.key mustEqual "bar/baz.json" // since the vis are kept in a set, the order is not defined val encodedFront = Base64.getEncoder.encodeToString("user&admin".getBytes(StandardCharsets.UTF_8)) val encodedBack = 
Base64.getEncoder.encodeToString("admin&user".getBytes(StandardCharsets.UTF_8)) - request.getTagging.getTagSet.asScala must beOneOf( - Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedFront)), - Seq(new Tag(S3VisibilityObserverFactory.DefaultTag, encodedBack)) + request.tagging.tagSet.asScala must beOneOf( + Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedFront).build()), + Seq(Tag.builder.key(S3VisibilityObserverFactory.DefaultTag).value(encodedBack).build()) ) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala new file mode 100644 index 000000000000..da217fa6c1cf --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/s3/v1/S3VisibilityObserverTest.scala @@ -0,0 +1,147 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.common.s3.v1
+
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.model.{SetObjectTaggingRequest, Tag}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.util.Progressable
+import org.geotools.api.feature.simple.SimpleFeature
+import org.junit.runner.RunWith
+import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.security.SecurityUtils
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
+import org.locationtech.geomesa.utils.io.WithClose
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.specs2.mock.Mockito
+import org.specs2.mutable.Specification
+import org.specs2.runner.JUnitRunner
+
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+/**
+ * Tests for the AWS SDK v1 visibility observer - verifies the tag that is set on an object
+ * based on the visibilities of the features written to it
+ */
+@RunWith(classOf[JUnitRunner])
+class S3VisibilityObserverTest extends Specification with Mockito {
+
+  import scala.collection.JavaConverters._
+
+  val sft = SimpleFeatureTypes.createType("s3", "dtg:Date,*geom:Point:srid=4326")
+  val defaultTag = org.locationtech.geomesa.fs.storage.common.s3.S3VisibilityObserverFactory.DefaultTag
+
+  // path of the object being written - bucket "foo", key "bar/baz.json"
+  private val objectPath = "s3a://foo/bar/baz.json"
+
+  /**
+   * Creates a test feature with the given id and visibility expression
+   */
+  def feature(id: Int, vis: String): SimpleFeature = {
+    val sf = ScalaSimpleFeature.create(sft, s"$id", "2020-01-01T00:00:00.000Z", "POINT (45 55)")
+    SecurityUtils.setFeatureVisibility(sf, vis)
+    sf
+  }
+
+  /**
+   * The tag set expected on the object for a combined visibility expression - the tag value is
+   * the base64 encoding of the UTF-8 bytes of the expression
+   */
+  private def expectedTags(vis: String): Seq[Tag] =
+    Seq(new Tag(defaultTag, Base64.getEncoder.encodeToString(vis.getBytes(StandardCharsets.UTF_8))))
+
+  /**
+   * Writes one feature per visibility through a newly initialized observer, closes the observer,
+   * verifies that exactly one tagging call was made on the mocked s3 client, and returns it
+   *
+   * @param visibilities visibility of each feature to write, in order (feature ids are 0..n-1)
+   * @return the captured tagging request
+   */
+  private def tagRequest(visibilities: String*): SetObjectTaggingRequest = {
+    WithClose(new S3VisibilityObserverFactory) { factory =>
+      val fs = new MockFileSystem()
+      val root = mock[Path]
+      root.getFileSystem(ArgumentMatchers.any()) returns fs
+      factory.init(new Configuration(), root, sft)
+      val observer = factory.apply(new Path(objectPath))
+      // always close the observer, even if a write fails, so that a failing test doesn't leak it
+      try {
+        visibilities.zipWithIndex.foreach { case (vis, id) => observer.write(feature(id, vis)) }
+      } finally {
+        observer.close()
+      }
+      val captor: ArgumentCaptor[SetObjectTaggingRequest] = ArgumentCaptor.forClass(classOf[SetObjectTaggingRequest])
+      there was one(fs.s3).setObjectTagging(captor.capture())
+      captor.getValue
+    }
+  }
+
+  /**
+   * Stub file system that exposes a mocked s3 client for verification - all file system
+   * operations are left unimplemented
+   */
+  // noinspection NotImplementedCode
+  class MockFileSystem extends FileSystem {
+
+    val s3: AmazonS3 = mock[AmazonS3]
+
+    override def getUri: URI = ???
+    override def open(path: Path, i: Int): FSDataInputStream = ???
+    override def create(path: Path, fsp: FsPermission, b: Boolean, i: Int, i1: Short, l: Long, p: Progressable): FSDataOutputStream = ???
+    override def append(path: Path, i: Int, p: Progressable): FSDataOutputStream = ???
+    override def rename(path: Path, path1: Path): Boolean = ???
+    override def delete(path: Path, b: Boolean): Boolean = ???
+    override def listStatus(path: Path): Array[FileStatus] = ???
+    override def setWorkingDirectory(path: Path): Unit = ???
+    override def getWorkingDirectory: Path = ???
+    override def mkdirs(path: Path, fsp: FsPermission): Boolean = ???
+    override def getFileStatus(path: Path): FileStatus = ???
+  }
+
+  "S3VisibilityObserver" should {
+
+    "initialize factory correctly" >> {
+      // mimic construction through reflection
+      WithClose(classOf[S3VisibilityObserverFactory].getDeclaredConstructor().newInstance()) { factory =>
+        val root = mock[Path]
+        root.getFileSystem(ArgumentMatchers.any()) returns new MockFileSystem()
+        factory.init(new Configuration(), root, sft) must not(throwAn[Exception])
+      }
+    }
+
+    "tag a single visibility label" >> {
+      val request = tagRequest("user")
+      request.getBucketName mustEqual "foo"
+      request.getKey mustEqual "bar/baz.json"
+      request.getTagging.getTagSet.asScala mustEqual expectedTags("user")
+    }
+
+    "tag multiple visibility labels" >> {
+      val request = tagRequest("user", "admin", "user")
+      request.getBucketName mustEqual "foo"
+      request.getKey mustEqual "bar/baz.json"
+      // since the vis are kept in a set, the order is not defined
+      request.getTagging.getTagSet.asScala must beOneOf(expectedTags("user&admin"), expectedTags("admin&user"))
+    }
+
+    "simplify tag expressions" >> {
+      // writing the combined expression along with the individual labels is expected to produce
+      // the same tag as the individual labels alone
+      val request = tagRequest("user&admin", "admin", "user")
+      request.getBucketName mustEqual "foo"
+      request.getKey mustEqual "bar/baz.json"
+      // since the vis are kept in a set, the order is not defined
+      request.getTagging.getTagSet.asScala must beOneOf(expectedTags("user&admin"), expectedTags("admin&user"))
+    }
+  }
+}
diff --git a/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh b/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh index fca2dbb83267..d9696e586f5b 100755 --- a/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh +++ b/geomesa-fs/geomesa-fs-tools/conf-filtered/dependencies.sh @@ -12,7 +12,9 @@ # Update the versions as required to match the target environment. hadoop_install_version="%%hadoop.version.recommended%%" -aws_sdk_install_version="1.12.385" # latest version as of 2023/01 +aws_sdk_v1_install_version="1.12.735" # latest version as of 2024/06 +aws_sdk_v2_install_version="2.25.64" # latest version as of 2024/06 +aws_crt_install_version="0.29.18" # this should match the parquet desired version snappy_install_version="1.1.1.6" @@ -23,12 +25,14 @@ function dependencies() { local classpath="$1" local hadoop_version="$hadoop_install_version" - local aws_sdk_version="$aws_sdk_install_version" + local aws_sdk_v1_version="$aws_sdk_v1_install_version" + local aws_sdk_v2_version="$aws_sdk_v2_install_version" local snappy_version="$snappy_install_version" if [[ -n "$classpath" ]]; then hadoop_version="$(get_classpath_version hadoop-common "$classpath" "$hadoop_version")" - aws_sdk_version="$(get_classpath_version aws-java-sdk-core "$classpath" "$aws_sdk_version")" + aws_sdk_v1_version="$(get_classpath_version aws-java-sdk-core "$classpath" "$aws_sdk_v1_version")" + aws_sdk_v2_version="$(get_classpath_version aws-core "$classpath" "$aws_sdk_v2_version")" snappy_version="$(get_classpath_version snappy-java "$classpath" "$snappy_version")" fi @@ -52,22 +56,14 @@ function dependencies() { "com.google.protobuf:protobuf-java:2.5.0:jar" "org.apache.htrace:htrace-core:3.1.0-incubating:jar" "org.apache.htrace:htrace-core4:4.1.0-incubating:jar" - "com.amazonaws:aws-java-sdk-core:${aws_sdk_version}:jar" - "com.amazonaws:aws-java-sdk-s3:${aws_sdk_version}:jar" - "com.amazonaws:aws-java-sdk-dynamodb:${aws_sdk_version}:jar" - # 
joda-time required for aws sdk - "joda-time:joda-time:2.8.1:jar" # these are the versions used by hadoop 2.8 and 3.1 "org.apache.httpcomponents:httpclient:4.5.2:jar" "org.apache.httpcomponents:httpcore:4.4.4:jar" "commons-httpclient:commons-httpclient:3.1:jar" - ) # add hadoop 3+ jars if needed - local hadoop_maj_ver - hadoop_maj_ver="$([[ "$hadoop_version" =~ ([0-9][0-9]*)\. ]] && echo "${BASH_REMATCH[1]}")" - if [[ "$hadoop_maj_ver" -ge 3 ]]; then + if version_ge "${hadoop_version}" 3.0.0; then gavs+=( "org.apache.hadoop:hadoop-client-api:${hadoop_version}:jar" "org.apache.hadoop:hadoop-client-runtime:${hadoop_version}:jar" @@ -79,6 +75,49 @@ function dependencies() { ) fi + # aws sdk + if version_ge "${hadoop_version}" 3.4.0; then + gavs+=( + "software.amazon.awssdk:annotations:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:apache-client:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:arns:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:auth:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:aws-core:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:aws-query-protocol:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:aws-xml-protocol:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:checksums:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:checksums-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:crt-core:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:endpoints-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:http-auth:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:http-auth-aws:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:http-auth-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:http-client-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:identity-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:json-utils:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:metrics-spi:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:netty-nio-client:${aws_sdk_v2_version}:jar" + 
"software.amazon.awssdk:profiles:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:protocol-core:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:regions:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:s3:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:s3-transfer-manager:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:sdk-core:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:third-party-jackson-core:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk:utils:${aws_sdk_v2_version}:jar" + "software.amazon.awssdk.crt:aws-crt:${aws_crt_install_version}:jar" + "software.amazon.eventstream:eventstream:1.0.1:jar" + "org.reactivestreams:reactive-streams:1.0.4:jar" + ) + else + gavs+=( + "com.amazonaws:aws-java-sdk-core:${aws_sdk_v1_version}:jar" + "com.amazonaws:aws-java-sdk-s3:${aws_sdk_v1_version}:jar" + "com.amazonaws:aws-java-sdk-dynamodb:${aws_sdk_v1_version}:jar" + "joda-time:joda-time:2.8.1:jar" + ) + fi + echo "${gavs[@]}" | tr ' ' '\n' | sort | tr '\n' ' ' } diff --git a/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh b/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh index c3abf0235405..e04512c1fbdc 100755 --- a/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh +++ b/geomesa-hbase/geomesa-hbase-tools/conf-filtered/dependencies.sh @@ -18,8 +18,6 @@ zookeeper_install_version="%%zookeeper.version.recommended%%" # required for hadoop - make sure it corresponds to the hadoop installed version guava_install_version="%%hbase.guava.version%%" -function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; } - # gets the dependencies for this module # args: # $1 - current classpath diff --git a/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh b/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh index b415fa9edbbd..bb97eca55c42 100755 --- a/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh +++ 
b/geomesa-kafka/geomesa-kafka-tools/conf-filtered/dependencies.sh @@ -14,8 +14,6 @@ kafka_install_version="%%kafka.version%%" zookeeper_install_version="%%zookeeper.version.recommended%%" -function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; } - # gets the dependencies for this module # args: # $1 - current classpath diff --git a/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh b/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh index a46969c11083..c607a986a191 100755 --- a/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh +++ b/geomesa-lambda/geomesa-lambda-tools/conf-filtered/dependencies.sh @@ -18,8 +18,6 @@ kafka_install_version="%%kafka.version%%" # required for hadoop - make sure it corresponds to the hadoop installed version guava_install_version="%%accumulo.guava.version%%" -function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; } - # gets the dependencies for this module # args: # $1 - current classpath diff --git a/geomesa-tools/conf-filtered/geomesa-env.sh b/geomesa-tools/conf-filtered/geomesa-env.sh index 1a1ce5bf8f0b..f6338f9fb215 100644 --- a/geomesa-tools/conf-filtered/geomesa-env.sh +++ b/geomesa-tools/conf-filtered/geomesa-env.sh @@ -84,6 +84,9 @@ fi newline=$'\n' +# checks that the version in $1 is >= every other version passed in, using version ordering (sort -V) +function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; } + # setup opts for invoking the geomesa java process function get_options() { # create log dir if needed diff --git a/geomesa-tools/conf-filtered/parquet-dependencies.sh b/geomesa-tools/conf-filtered/parquet-dependencies.sh index 7bdf14ecd5cb..228d13a74dda 100755 --- a/geomesa-tools/conf-filtered/parquet-dependencies.sh +++ b/geomesa-tools/conf-filtered/parquet-dependencies.sh @@ -12,8 +12,6 @@ hadoop_install_version="%%hadoop.version.recommended%%" -function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; 
} - # gets the dependencies for this module # args: # $1 - current classpath diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala index 2e55641f29c4..f2709d5c6382 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CloseablePool.scala @@ -9,7 +9,7 @@ package org.locationtech.geomesa.utils.io import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig} -import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject} +import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject, SwallowedExceptionListener} import java.io.Closeable
@@ -70,6 +70,29 @@ object CloseablePool { } }
-    override def close(): Unit = pool.close()
+    /**
+     * Closes the underlying pool
+     *
+     * Exceptions that the pool swallows during close are collected through a listener and re-thrown
+     * after the pool has been closed - the first error is thrown, with any further errors attached
+     * to it as suppressed exceptions
+     */
+    override def close(): Unit = {
+      // concurrent queue, in case the listener is invoked from a thread other than the caller
+      val errors = new java.util.concurrent.LinkedBlockingQueue[Exception]()
+      pool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
+        override def onSwallowException(e: Exception): Unit = errors.offer(e)
+      })
+      pool.close()
+      Option(errors.poll()).foreach { first =>
+        Iterator.continually(errors.poll()).takeWhile(_ != null).foreach { e =>
+          // an exception can't suppress itself - addSuppressed would throw an IllegalArgumentException
+          if (e ne first) {
+            first.addSuppressed(e)
+          }
+        }
+        throw first
+      }
+    }
 } } diff --git a/pom.xml b/pom.xml index 39994a3a502c..7b03b6387398 100644 --- a/pom.xml +++ b/pom.xml @@ -68,7 +68,9 @@ 1.0.0-beta 16.1.0 1.11.3 - 1.12.625 + 1.12.735 + 2.25.64 + 0.29.18 3.1.8 2.9.8 1.16.0 @@ -1587,6 +1589,21 @@ aws-java-sdk-cloudwatch ${aws.sdk.version} + + software.amazon.awssdk + s3 + ${aws.sdk.v2.version} + + + software.amazon.awssdk + s3-transfer-manager + ${aws.sdk.v2.version} + + + software.amazon.awssdk.crt + aws-crt + ${aws.crt.version} + commons-io commons-io @@ -2206,6 +2223,18 @@ ${hadoop.version} provided + + org.apache.hadoop + hadoop-aws + 
${hadoop.version} + provided + + + software.amazon.awssdk + bundle + + + xerces