diff --git a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala
index e39390b06715..cba0fc40b38d 100644
--- a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala
+++ b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala
@@ -9,18 +9,16 @@ package org.locationtech.geomesa.jobs.accumulo
 
 
 import java.io.{BufferedOutputStream, DataInput, DataOutput, PrintStream}
-import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.{Base64, Scanner}
 
 import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
-import org.apache.accumulo.core.clientImpl.mapreduce.lib.DistributedCacheHelper
 import org.apache.accumulo.core.data.Key
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Options.CreateOpts
+import org.apache.hadoop.fs.{CreateFlag, FileContext, FileSystem, Path}
 import org.apache.hadoop.io.{BinaryComparable, Text, Writable, WritableComparable}
 import org.apache.hadoop.mapreduce.{Job, Partitioner}
-import org.locationtech.geomesa.jobs.accumulo.mapreduce.GeoMesaAccumuloFileOutputFormat.SplitsPath
 import org.locationtech.geomesa.utils.io.WithClose
 
 import scala.collection.mutable.ArrayBuffer
@@ -116,14 +114,16 @@ package object mapreduce {
 
     // must be called after setSplitsPath
     def setTableSplits(job: Job, table: String, splits: Iterable[Text]): Unit = {
-      WithClose(FileSystem.get(job.getConfiguration)) { fs =>
-        val output = s"${getSplitsPath(job.getConfiguration)}/$table.txt"
-        WithClose(new PrintStream(new BufferedOutputStream(fs.create(new Path(output))))) { out =>
-          splits.foreach(split => out.println(Base64.getEncoder.encodeToString(split.copyBytes)))
-        }
-        // this makes the file accessible as a local file on the cluster
-        job.addCacheFile(new URI(s"$output#$table.txt"))
+      val dir = getSplitsPath(job.getConfiguration)
+      val file = s"$table.txt"
+      val output = new Path(s"$dir/$file")
+      val fc = FileContext.getFileContext(output.toUri, job.getConfiguration)
+      val flags = java.util.EnumSet.of(CreateFlag.CREATE)
+      WithClose(new PrintStream(new BufferedOutputStream(fc.create(output, flags, CreateOpts.createParent)))) { out =>
+        splits.foreach(split => out.println(Base64.getEncoder.encodeToString(split.copyBytes)))
       }
+      // this makes the file accessible as a local file on the cluster
+      job.addCacheFile(output.toUri)
     }
 
     def setSplitsPath(conf: Configuration, path: String): Unit = conf.set(SplitsPath, path)
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list b/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
index 333a0a1066a3..b18148c7df6b 100644
--- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
+++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
@@ -1,3 +1,4 @@
+aws-java-sdk
 accumulo-core
 accumulo-fate
 accumulo-hadoop-mapreduce
@@ -7,5 +8,6 @@ commons-collections4
 curator-client
 curator-framework
 curator-recipes
+hadoop-aws
 htrace
 libthrift
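
Note: the core change above replaces FileSystem.get(job.getConfiguration), which always resolves the default filesystem, with FileContext.getFileContext(output.toUri, ...), which resolves the filesystem from the output path's URI scheme. That lets the splits file be written to a non-default scheme such as s3a, which is also why hadoop-aws and aws-java-sdk are added to the libjars list. Below is a minimal standalone sketch of the same create-with-parents pattern; the object name and output path are hypothetical, and this illustrates the Hadoop API rather than the GeoMesa code itself:

    import java.io.PrintStream
    import java.util.EnumSet

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Options.CreateOpts
    import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}

    object FileContextSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // hypothetical location; an s3a:// path would work the same way
        // once hadoop-aws and the AWS SDK are on the classpath
        val output = new Path("hdfs:///tmp/splits/example.txt")
        // resolve the filesystem from the path's URI scheme, instead of
        // always using the default filesystem like FileSystem.get(conf)
        val fc = FileContext.getFileContext(output.toUri, conf)
        // CREATE (without OVERWRITE) fails if the file already exists;
        // createParent creates any missing parent directories
        val flags = EnumSet.of(CreateFlag.CREATE)
        val out = new PrintStream(fc.create(output, flags, CreateOpts.createParent()))
        try { out.println("example split") } finally { out.close() }
      }
    }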