diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
index 7d9860096835..9e3541268a01 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala
@@ -12,11 +12,9 @@ import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.Map.Entry
-import org.apache.accumulo.core.client.IteratorSetting
import org.apache.accumulo.core.conf.Property
import org.apache.accumulo.core.data.{Key, Mutation, Range, Value}
import org.apache.accumulo.core.file.keyfunctor.RowFunctor
-import org.apache.accumulo.core.iterators.user.ReqVisFilter
import org.apache.accumulo.core.security.ColumnVisibility
import org.apache.hadoop.io.Text
import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter.{AccumuloIndexWriter, AccumuloResultsToFeatures, ZIterPriority}
@@ -280,15 +278,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD
sft: SimpleFeatureType,
indices: Seq[GeoMesaFeatureIndex[_, _]],
partition: Option[String]): AccumuloIndexWriter = {
- // make sure to provide our index values for attribute join indices if we need them
- val base = WritableFeature.wrapper(sft, groups)
- val wrapper =
- if (indices.exists(_.isInstanceOf[AccumuloJoinIndex])) {
- AccumuloWritableFeature.wrapper(sft, base)
- } else {
- base
- }
-
+ val wrapper = AccumuloWritableFeature.wrapper(sft, groups, indices)
if (sft.isVisibilityRequired) {
new AccumuloIndexWriter(ds, indices, wrapper, partition) with RequiredVisibilityWriter
} else {
@@ -386,9 +376,7 @@ object AccumuloIndexAdapter {
private val colFamilyMappings = indices.map(mapColumnFamily).toArray
private val timestamps = indices.exists(i => !i.sft.isLogicalTime)
-
- private val defaultVisibility = new ColumnVisibility()
- private val visibilities = new java.util.HashMap[VisHolder, ColumnVisibility]()
+ private val visCache = new VisibilityCache()
private var i = 0
@@ -404,16 +392,7 @@ object AccumuloIndexAdapter {
case kv: SingleRowKeyValue[_] =>
val mutation = new Mutation(kv.row)
kv.values.foreach { v =>
- val vis = if (v.vis.isEmpty) { defaultVisibility } else {
- val lookup = new VisHolder(v.vis)
- var cached = visibilities.get(lookup)
- if (cached == null) {
- cached = new ColumnVisibility(v.vis)
- visibilities.put(lookup, cached)
- }
- cached
- }
- mutation.put(colFamilyMappings(i)(v.cf), v.cq, vis, v.value)
+ mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value)
}
writers(i).addMutation(mutation)
@@ -421,16 +400,7 @@ object AccumuloIndexAdapter {
mkv.rows.foreach { row =>
val mutation = new Mutation(row)
mkv.values.foreach { v =>
- val vis = if (v.vis.isEmpty) { defaultVisibility } else {
- val lookup = new VisHolder(v.vis)
- var cached = visibilities.get(lookup)
- if (cached == null) {
- cached = new ColumnVisibility(v.vis)
- visibilities.put(lookup, cached)
- }
- cached
- }
- mutation.put(colFamilyMappings(i)(v.cf), v.cq, vis, v.value)
+ mutation.put(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis), v.value)
}
writers(i).addMutation(mutation)
}
@@ -446,16 +416,7 @@ object AccumuloIndexAdapter {
case SingleRowKeyValue(row, _, _, _, _, _, vals) =>
val mutation = new Mutation(row)
vals.foreach { v =>
- val vis = if (v.vis.isEmpty) { defaultVisibility } else {
- val lookup = new VisHolder(v.vis)
- var cached = visibilities.get(lookup)
- if (cached == null) {
- cached = new ColumnVisibility(v.vis)
- visibilities.put(lookup, cached)
- }
- cached
- }
- mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, vis)
+ mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis))
}
writers(i).addMutation(mutation)
@@ -463,16 +424,7 @@ object AccumuloIndexAdapter {
rows.foreach { row =>
val mutation = new Mutation(row)
vals.foreach { v =>
- val vis = if (v.vis.isEmpty) { defaultVisibility } else {
- val lookup = new VisHolder(v.vis)
- var cached = visibilities.get(lookup)
- if (cached == null) {
- cached = new ColumnVisibility(v.vis)
- visibilities.put(lookup, cached)
- }
- cached
- }
- mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, vis)
+ mutation.putDelete(colFamilyMappings(i)(v.cf), v.cq, visCache(v.vis))
}
writers(i).addMutation(mutation)
}
@@ -532,6 +484,27 @@ object AccumuloIndexAdapter {
}
}
+ /**
+ * Cache for storing column visibilities - not thread safe
+ */
+ class VisibilityCache {
+
+ private val defaultVisibility = new ColumnVisibility()
+ private val visibilities = new java.util.HashMap[VisHolder, ColumnVisibility]()
+
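+ // returns the parsed ColumnVisibility for the given expression, creating and caching it on first use;
+ // an empty expression maps to the shared default (empty) visibility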
+ def apply(vis: Array[Byte]): ColumnVisibility = {
+ if (vis.isEmpty) { defaultVisibility } else {
+ val lookup = new VisHolder(vis)
+ var cached = visibilities.get(lookup)
+ if (cached == null) {
+ cached = new ColumnVisibility(vis)
+ visibilities.put(lookup, cached)
+ }
+ cached
+ }
+ }
+ }
+
/**
* Wrapper for byte array to use as a key in the cached visibilities map
*
diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloWritableFeature.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloWritableFeature.scala
index 592371ebf2a5..3e5ce2ccbf76 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloWritableFeature.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloWritableFeature.scala
@@ -8,10 +8,11 @@
package org.locationtech.geomesa.accumulo.data
-import org.locationtech.geomesa.accumulo.index.IndexValueEncoder
+import org.locationtech.geomesa.accumulo.index.{AccumuloJoinIndex, IndexValueEncoder}
import org.locationtech.geomesa.features.{ScalaSimpleFeature, SimpleFeatureSerializer}
import org.locationtech.geomesa.index.api.WritableFeature.{AttributeLevelWritableFeature, FeatureWrapper}
-import org.locationtech.geomesa.index.api.{KeyValue, WritableFeature}
+import org.locationtech.geomesa.index.api.{GeoMesaFeatureIndex, KeyValue, WritableFeature}
+import org.locationtech.geomesa.index.conf.ColumnGroups
import org.locationtech.geomesa.utils.index.VisibilityLevel
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
@@ -35,12 +36,17 @@ object AccumuloWritableFeature {
def wrapper(
sft: SimpleFeatureType,
- delegate: FeatureWrapper[WritableFeature]): FeatureWrapper[AccumuloWritableFeature] = {
- val serializer = IndexValueEncoder(sft)
- if (sft.getVisibilityLevel == VisibilityLevel.Attribute) {
- new AccumuloAttributeLevelFeatureWrapper(delegate, serializer)
- } else {
- new AccumuloFeatureLevelFeatureWrapper(delegate, serializer)
+ groups: ColumnGroups,
+ indices: Seq[GeoMesaFeatureIndex[_, _]]): FeatureWrapper[WritableFeature] = {
+ // make sure to provide our index values for attribute join indices if we need them
+ val base = WritableFeature.wrapper(sft, groups)
+ if (indices.forall(i => !i.isInstanceOf[AccumuloJoinIndex])) { base } else {
+ val serializer = IndexValueEncoder(sft)
+ if (sft.getVisibilityLevel == VisibilityLevel.Attribute) {
+ new AccumuloAttributeLevelFeatureWrapper(base, serializer)
+ } else {
+ new AccumuloFeatureLevelFeatureWrapper(base, serializer)
+ }
}
}
diff --git a/geomesa-accumulo/geomesa-accumulo-jobs/pom.xml b/geomesa-accumulo/geomesa-accumulo-jobs/pom.xml
index 05f70b024b0a..a7db7f6b32bd 100644
--- a/geomesa-accumulo/geomesa-accumulo-jobs/pom.xml
+++ b/geomesa-accumulo/geomesa-accumulo-jobs/pom.xml
@@ -24,6 +24,10 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-jobs_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
@@ -38,6 +42,10 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-hadoop-mapreduce</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
diff --git a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/index/AttributeIndexJob.scala b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/index/AttributeIndexJob.scala
index 147b7523a829..eabf802dc33e 100644
--- a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/index/AttributeIndexJob.scala
+++ b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/index/AttributeIndexJob.scala
@@ -106,7 +106,7 @@ object AttributeIndexJob {
(i.name == AttributeIndex.name || i.name == JoinIndex.name) &&
i.attributes.headOption.exists(attributes.contains)
}
- wrapper = AccumuloWritableFeature.wrapper(sft, WritableFeature.wrapper(sft, ds.adapter.groups))
+ wrapper = AccumuloWritableFeature.wrapper(sft, ds.adapter.groups, indices)
converters = indices.map(_.createConverter()).zipWithIndex
colFamilyMappings = indices.map(AccumuloIndexAdapter.mapColumnFamily).toIndexedSeq
defaultVis = new ColumnVisibility()
diff --git a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/GeoMesaAccumuloFileOutputFormat.scala b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/GeoMesaAccumuloFileOutputFormat.scala
new file mode 100644
index 000000000000..bea848e00607
--- /dev/null
+++ b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/GeoMesaAccumuloFileOutputFormat.scala
@@ -0,0 +1,209 @@
+/***********************************************************************
+ * Copyright (c) 2013-2021 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.jobs.accumulo.mapreduce
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.accumulo.core.data.{Key, Value}
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, Writable}
+import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs}
+import org.apache.hadoop.mapreduce.{Counter, Job, Mapper, Reducer}
+import org.geotools.data.DataStoreFinder
+import org.locationtech.geomesa.accumulo.data.AccumuloIndexAdapter.VisibilityCache
+import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloWritableFeature}
+import org.locationtech.geomesa.index.api.WritableFeature.FeatureWrapper
+import org.locationtech.geomesa.index.api._
+import org.locationtech.geomesa.index.conf.partition.TablePartition
+import org.locationtech.geomesa.jobs.GeoMesaConfigurator
+import org.locationtech.geomesa.jobs.mapreduce.GeoMesaOutputFormat.OutputCounters
+import org.locationtech.geomesa.utils.index.IndexMode
+import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
+
+import scala.util.control.NonFatal
+
+object GeoMesaAccumuloFileOutputFormat extends LazyLogging {
+
+ import scala.collection.JavaConverters._
+
+ val FilesPath = "files"
+ val SplitsPath = "splits"
+
+ /**
+ * Sets mapper class, reducer class, output format and associated options
+ *
+ * @param job job
+ * @param ds data store for output data
+ * @param params data store parameters for output data
+ * @param sft feature type to write (schema must exist already)
+ * @param output output path for RFiles
+ * @param index optional index to write
+ * @param partitions if writing to a partitioned store, the partitions being written to
+ */
+ def configure(
+ job: Job,
+ ds: AccumuloDataStore,
+ params: Map[String, String],
+ sft: SimpleFeatureType,
+ output: Path,
+ index: Option[String],
+ partitions: Option[Seq[String]]): Unit = {
+
+ val indices = index match {
+ case None => ds.manager.indices(sft, IndexMode.Write)
+ case Some(i) => Seq(ds.manager.index(sft, i, IndexMode.Write))
+ }
+
+ val tables = partitions match {
+ case None => indices.flatMap(_.getTableNames(None))
+ case Some(parts) =>
+ Configurator.setPartitions(job.getConfiguration, parts)
+ logger.debug(s"Creating index tables for ${parts.length} partitions")
+ parts.flatMap { p =>
+ // create the partitions up front so we know the number of splits and reducers - this call is idempotent
+ indices.par.foreach(index => ds.adapter.createTable(index, Some(p), index.getSplits(Some(p))))
+ indices.flatMap(_.getTableNames(Some(p)))
+ }
+ }
+
+ if (tables.isEmpty) {
+ throw new IllegalArgumentException("No tables found for output")
+ }
+
+ GeoMesaConfigurator.setDataStoreOutParams(job.getConfiguration, params)
+ GeoMesaConfigurator.setIndicesOut(job.getConfiguration, indices.map(_.identifier))
+ GeoMesaConfigurator.setSerialization(job.getConfiguration, sft)
+ Configurator.setTypeName(job.getConfiguration, sft.getTypeName)
+ // using LazyOutputFormat prevents creating an empty output file in the default path
+ LazyOutputFormat.setOutputFormatClass(job, classOf[AccumuloFileOutputFormat])
+ // note: this is equivalent to FileOutputFormat.setOutputPath(job, new Path(output, FilesPath))
+ AccumuloFileOutputFormat.configure.outputPath(new Path(output, FilesPath)).store(job)
+
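+ // partition map output by destination table and key so each reducer covers a single tablet's range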
+ job.setPartitionerClass(classOf[TableRangePartitioner])
+ TableRangePartitioner.setSplitsPath(job.getConfiguration, new Path(output, SplitsPath).toString)
+
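+ // allocate one reducer per existing tablet across all output tables, offsetting each table so reducer indices don't overlap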
+ var numReducers = 0
+ tables.foreach { table =>
+ val splits = ds.connector.tableOperations.listSplits(table).asScala
+ TableRangePartitioner.setTableOffset(job.getConfiguration, table, numReducers)
+ TableRangePartitioner.setTableSplits(job, table, splits)
+ numReducers += (splits.size + 1) // add one for the region before the first split point
+ }
+
+ job.setMapperClass(classOf[AccumuloFileMapper])
+ job.setMapOutputKeyClass(classOf[TableAndKey])
+ job.setMapOutputValueClass(classOf[Value])
+ job.setReducerClass(classOf[AccumuloFileReducer])
+ job.setOutputKeyClass(classOf[Key])
+ job.setOutputValueClass(classOf[Value])
+ job.setNumReduceTasks(numReducers)
+ }
+
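+ /**
+ * Mapper that converts simple features into Accumulo key-value pairs, keyed by destination table
+ * so they can be partitioned and sorted for bulk loading
+ */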
+ class AccumuloFileMapper extends Mapper[Writable, SimpleFeature, TableAndKey, Value] with LazyLogging {
+
+ type MapContext = Mapper[Writable, SimpleFeature, TableAndKey, Value]#Context
+
+ private var ds: AccumuloDataStore = _
+ private var sft: SimpleFeatureType = _
+ private var wrapper: FeatureWrapper[WritableFeature] = _
+ private var partitioner: Option[TablePartition] = _
+ private var writers: Seq[(GeoMesaFeatureIndex[_, _], WriteConverter[_])] = _
+
+ private val visCache = new VisibilityCache()
+ private val tableAndKey = new TableAndKey(new Text(), null)
+
+ private var features: Counter = _
+ private var entries: Counter = _
+ private var failed: Counter = _
+
+ override def setup(context: MapContext): Unit = {
+ val params = GeoMesaConfigurator.getDataStoreOutParams(context.getConfiguration).asJava
+ ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
+ require(ds != null, "Could not find data store - check your configuration")
+ sft = ds.getSchema(Configurator.getTypeName(context.getConfiguration))
+ require(sft != null, "Could not find schema - check your configuration")
+
+ val indexIds = GeoMesaConfigurator.getIndicesOut(context.getConfiguration).orNull
+ require(indexIds != null, "Indices to write were not set in the job configuration")
+ val indices = indexIds.map(ds.manager.index(sft, _, IndexMode.Write))
+ wrapper = AccumuloWritableFeature.wrapper(sft, ds.adapter.groups, indices)
+ partitioner = TablePartition(ds, sft)
+ writers = indices.map(i => (i, i.createConverter()))
+
+ features = context.getCounter(OutputCounters.Group, OutputCounters.Written)
+ entries = context.getCounter(OutputCounters.Group, "entries")
+ failed = context.getCounter(OutputCounters.Group, OutputCounters.Failed)
+ }
+
+ override def cleanup(context: MapContext): Unit = if (ds != null) { ds.dispose() }
+
+ override def map(key: Writable, value: SimpleFeature, context: MapContext): Unit = {
+ // TODO create a common writer that will create mutations without writing them
+ try {
+ val feature = wrapper.wrap(value)
+ val partition = partitioner.map(_.partition(value))
+ writers.foreach { case (index, writer) =>
+ index.getTableNames(partition) match {
+ case Seq(table) => tableAndKey.getTable.set(table)
+ case tables =>
+ val msg = if (tables.isEmpty) { "No table found" } else { "Multiple tables found" }
+ throw new IllegalStateException(msg + partition.map(p => s" for partition $p").getOrElse(""))
+ }
+
+ writer.convert(feature) match {
+ case kv: SingleRowKeyValue[_] =>
+ kv.values.foreach { value =>
+ tableAndKey.setKey(new Key(kv.row, value.cf, value.cq, visCache(value.vis), Long.MaxValue))
+ context.write(tableAndKey, new Value(value.value))
+ entries.increment(1L)
+ }
+
+ case mkv: MultiRowKeyValue[_] =>
+ mkv.rows.foreach { row =>
+ mkv.values.foreach { value =>
+ tableAndKey.setKey(new Key(row, value.cf, value.cq, visCache(value.vis), Long.MaxValue))
+ context.write(tableAndKey, new Value(value.value))
+ entries.increment(1L)
+ }
+ }
+ }
+ }
+
+ features.increment(1L)
+ } catch {
+ case NonFatal(e) =>
+ logger.error(s"Error writing feature ${Option(value).orNull}", e)
+ failed.increment(1L)
+ }
+ }
+ }
+
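+ /**
+ * Reducer that writes the sorted key-values for each table to a separate output directory,
+ * named for the table and the job id
+ */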
+ class AccumuloFileReducer extends Reducer[TableAndKey, Value, Key, Value] {
+
+ type ReducerContext = Reducer[TableAndKey, Value, Key, Value]#Context
+
+ private var id: String = _
+ private var out: MultipleOutputs[Key, Value] = _
+
+ override def setup(context: ReducerContext): Unit = {
+ id = context.getJobID.appendTo(new java.lang.StringBuilder("gm")).toString
+ out = new MultipleOutputs(context)
+ }
+ override def cleanup(context: ReducerContext): Unit = if (out != null) { out.close() }
+
+ override def reduce(key: TableAndKey, values: java.lang.Iterable[Value], context: ReducerContext): Unit = {
+ val path = s"${key.getTable}/$id"
+ val iter = values.iterator()
+ while (iter.hasNext) {
+ out.write(key.getKey, iter.next, path)
+ }
+ }
+ }
+}
+
diff --git a/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala
new file mode 100644
index 000000000000..e39390b06715
--- /dev/null
+++ b/geomesa-accumulo/geomesa-accumulo-jobs/src/main/scala/org/locationtech/geomesa/jobs/accumulo/mapreduce/package.scala
@@ -0,0 +1,136 @@
+/***********************************************************************
+ * Copyright (c) 2013-2021 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.jobs.accumulo
+
+import java.io.{BufferedOutputStream, DataInput, DataOutput, PrintStream}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util.{Base64, Scanner}
+
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.DistributedCacheHelper
+import org.apache.accumulo.core.data.Key
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{BinaryComparable, Text, Writable, WritableComparable}
+import org.apache.hadoop.mapreduce.{Job, Partitioner}
+import org.locationtech.geomesa.jobs.accumulo.mapreduce.GeoMesaAccumuloFileOutputFormat.SplitsPath
+import org.locationtech.geomesa.utils.io.WithClose
+
+import scala.collection.mutable.ArrayBuffer
+
+package object mapreduce {
+
+ object Configurator {
+
+ private val TypeNameKey = "org.locationtech.geomesa.accumulo.typename"
+ private val PartitionsKey = "org.locationtech.geomesa.accumulo.partitions"
+
+ def setTypeName(conf: Configuration, typeName: String): Unit = conf.set(TypeNameKey, typeName)
+ def getTypeName(conf: Configuration): String = conf.get(TypeNameKey)
+ def setPartitions(conf: Configuration, partitions: Seq[String]): Unit =
+ conf.set(PartitionsKey, partitions.mkString(","))
+ def getPartitions(conf: Configuration): Option[Seq[String]] = Option(conf.get(PartitionsKey)).map(_.split(","))
+ }
+
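+ /**
+ * Writable composite of the destination table name and the Accumulo key, used as the map output key
+ */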
+ class TableAndKey extends WritableComparable[TableAndKey] {
+
+ private var table: Text = _
+ private var key: Key = _
+
+ def this(table: Text, key: Key) = {
+ this()
+ this.table = table
+ this.key = key
+ }
+
+ def getTable: Text = table
+ def setTable(table: Text): Unit = this.table = table
+ def getKey: Key = key
+ def setKey(key: Key): Unit = this.key = key
+
+ override def write(out: DataOutput): Unit = {
+ table.write(out)
+ key.write(out)
+ }
+
+ override def readFields(in: DataInput): Unit = {
+ table = new Text()
+ table.readFields(in)
+ key = new Key()
+ key.readFields(in)
+ }
+
+ override def compareTo(o: TableAndKey): Int = {
+ val c = table.compareTo(o.table)
+ if (c != 0) { c } else {
+ key.compareTo(o.key)
+ }
+ }
+ }
+
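+ /**
+ * Partitioner that routes each entry to the reducer responsible for the corresponding tablet of its
+ * destination table, based on the table splits shared through the distributed cache
+ */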
+ class TableRangePartitioner extends Partitioner[TableAndKey, Writable] with Configurable {
+
+ private var conf: Configuration = _
+
+ private val splitsPerTable = Caffeine.newBuilder().build(new CacheLoader[Text, (Int, Array[AnyRef])]() {
+ override def load(k: Text): (Int, Array[AnyRef]) = {
+ val splits = ArrayBuffer.empty[Text]
+ // the splits file should be available locally because it was added to the distributed cache via job.addCacheFile
+ WithClose(FileSystem.getLocal(conf)) { fs =>
+ val path = new Path(s"${k.toString}.txt")
+ WithClose(new Scanner(fs.open(path), StandardCharsets.UTF_8.name)) { scanner =>
+ while (scanner.hasNextLine) {
+ splits += new Text(Base64.getDecoder.decode(scanner.nextLine))
+ }
+ }
+ }
+ val sorted = splits.distinct.sorted(Ordering.by[Text, BinaryComparable](identity)).toArray[AnyRef]
+ val offset = TableRangePartitioner.getTableOffset(conf, k.toString)
+ (offset, sorted)
+ }
+ })
+
+ override def getPartition(key: TableAndKey, value: Writable, total: Int): Int = {
+ val (offset, splits) = splitsPerTable.get(key.getTable)
+ val i = java.util.Arrays.binarySearch(splits, key.getKey.getRow)
+ // account for negative results indicating the spot between 2 values
+ val index = if (i < 0) { (i + 1) * -1 } else { i }
+ offset + index
+ }
+
+ override def setConf(configuration: Configuration): Unit = this.conf = configuration
+ override def getConf: Configuration = conf
+ }
+
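+ /**
+ * Helpers for passing table splits and per-table reducer offsets from the job configuration to the partitioner
+ */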
+ object TableRangePartitioner {
+
+ private val SplitsPath = "org.locationtech.geomesa.accumulo.splits.path"
+ private val TableOffset = "org.locationtech.geomesa.accumulo.table.offset"
+
+ // must be called after setSplitsPath
+ def setTableSplits(job: Job, table: String, splits: Iterable[Text]): Unit = {
+ WithClose(FileSystem.get(job.getConfiguration)) { fs =>
+ val output = s"${getSplitsPath(job.getConfiguration)}/$table.txt"
+ WithClose(new PrintStream(new BufferedOutputStream(fs.create(new Path(output))))) { out =>
+ splits.foreach(split => out.println(Base64.getEncoder.encodeToString(split.copyBytes)))
+ }
+ // this makes the file accessible as a local file on the cluster
+ job.addCacheFile(new URI(s"$output#$table.txt"))
+ }
+ }
+
+ def setSplitsPath(conf: Configuration, path: String): Unit = conf.set(SplitsPath, path)
+ def getSplitsPath(conf: Configuration): String = conf.get(SplitsPath)
+
+ def setTableOffset(conf: Configuration, table: String, offset: Int): Unit =
+ conf.setInt(s"$TableOffset.$table", offset)
+ def getTableOffset(conf: Configuration, table: String): Int = conf.get(s"$TableOffset.$table").toInt
+ }
+}
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
index 2a8e704fe24c..34a4b8350cf1 100755
--- a/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
+++ b/geomesa-accumulo/geomesa-accumulo-tools/conf-filtered/dependencies.sh
@@ -61,6 +61,11 @@ function dependencies() {
"org.apache.accumulo:accumulo-fate:${accumulo_version}:jar"
"org.apache.accumulo:accumulo-trace:${accumulo_version}:jar"
)
+ else
+ gavs+=(
+ "org.apache.commons:commons-collections4:4.3:jar"
+ "org.apache.accumulo:accumulo-hadoop-mapreduce:${accumulo_version}:jar"
+ )
fi
# add hadoop 3+ jars if needed
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list b/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
index da53df6decbc..333a0a1066a3 100644
--- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
+++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/resources/org/locationtech/geomesa/accumulo/tools/accumulo-libjars.list
@@ -1,7 +1,9 @@
accumulo-core
accumulo-fate
+accumulo-hadoop-mapreduce
accumulo-start
accumulo-trace
+commons-collections4
curator-client
curator-framework
curator-recipes
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/AccumuloRunner.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/AccumuloRunner.scala
index fb54d1351193..e0d6aec13043 100644
--- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/AccumuloRunner.scala
+++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/AccumuloRunner.scala
@@ -31,6 +31,7 @@ object AccumuloRunner extends RunnerWithAccumuloEnvironment {
new tools.export.AccumuloExplainCommand,
new tools.export.AccumuloExportCommand,
new tools.export.AccumuloPlaybackCommand,
+ new tools.ingest.AccumuloBulkIngestCommand,
new tools.ingest.AccumuloDeleteFeaturesCommand,
new tools.ingest.AccumuloIngestCommand,
new tools.schema.AccumuloCreateSchemaCommand,
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala
new file mode 100644
index 000000000000..e6d2e7360811
--- /dev/null
+++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala
@@ -0,0 +1,161 @@
+/***********************************************************************
+ * Copyright (c) 2013-2021 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * http://www.opensource.org/licenses/apache2.0.php.
+ ***********************************************************************/
+
+package org.locationtech.geomesa.accumulo.tools.ingest
+
+import java.io.File
+
+import com.beust.jcommander.{Parameter, ParameterException, Parameters}
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
+import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams
+import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloIngestCommand.AccumuloIngestParams
+import org.locationtech.geomesa.index.conf.partition.TablePartition
+import org.locationtech.geomesa.jobs.JobResult.JobSuccess
+import org.locationtech.geomesa.jobs.accumulo.mapreduce.GeoMesaAccumuloFileOutputFormat
+import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
+import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback}
+import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
+import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
+import org.locationtech.geomesa.tools.ingest.IngestCommand.Inputs
+import org.locationtech.geomesa.tools.ingest._
+import org.locationtech.geomesa.tools.utils.Prompt
+import org.locationtech.geomesa.tools.{Command, OptionalCqlFilterParam, OptionalIndexParam, OutputPathParam}
+import org.locationtech.geomesa.utils.index.IndexMode
+import org.locationtech.geomesa.utils.io.fs.HadoopDelegate.HiddenFileFilter
+import org.opengis.feature.simple.SimpleFeatureType
+
+class AccumuloBulkIngestCommand extends AccumuloIngestCommand {
+
+ override val name = "bulk-ingest"
+ override val params = new AccumuloBulkIngestParams()
+
+ override protected def startIngest(
+ mode: RunMode,
+ ds: AccumuloDataStore,
+ sft: SimpleFeatureType,
+ converter: Config,
+ inputs: Inputs): Awaitable = {
+
+ val maxSplitSize =
+ if (!params.combineInputs) { None } else { Option(params.maxSplitSize).map(_.intValue()).orElse(Some(0)) }
+
+ // validate index param now that we have a datastore and the sft has been created
+ val index = params.loadIndex(ds, sft.getTypeName, IndexMode.Write).map(_.identifier)
+
+ // disable compaction since we're bulk loading
+ params.compact = false
+
+ val partitions = TablePartition(ds, sft).map { tp =>
+ if (params.cqlFilter == null) {
+ throw new ParameterException(
+ s"Schema '${sft.getTypeName}' is a partitioned store. In order to bulk load, the '--cql' parameter " +
+ "must be used to specify the range of the input data set")
+ }
+ tp.partitions(params.cqlFilter).filter(_.nonEmpty).getOrElse {
+ throw new ParameterException(
+ s"Partition filter does not correspond to partition scheme ${tp.getClass.getSimpleName}. Please specify " +
+ "a valid filter using the '--cql' parameter")
+ }
+ }
+
+ mode match {
+ case RunModes.Local =>
+ throw new IllegalArgumentException("Bulk ingest must be run in distributed mode")
+
+ case RunModes.Distributed =>
+ // file output format doesn't let you write to an existing directory
+ val output = new Path(params.outputPath)
+ val context = FileContext.getFileContext(output.toUri, new Configuration())
+ if (context.util.exists(output)) {
+ val warning = s"Output directory '$output' exists"
+ if (params.force) {
+ Command.user.warn(s"$warning - deleting it")
+ } else if (!Prompt.confirm(s"WARNING DATA MAY BE LOST: $warning. Delete it and continue (y/n)? ")) {
+ throw new ParameterException(s"Output directory '$output' exists")
+ }
+ context.delete(output, true)
+ }
+
+ Command.user.info(s"Running bulk ingestion in distributed ${if (params.combineInputs) "combine " else "" }mode")
+ new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, params.outputPath, maxSplitSize,
+ index, partitions, libjarsFiles, libjarsPaths)
+
+ case _ =>
+ throw new NotImplementedError(s"Missing implementation for mode $mode")
+ }
+ }
+
+ class BulkConverterIngest(
+ ds: AccumuloDataStore,
+ dsParams: Map[String, String],
+ sft: SimpleFeatureType,
+ converterConfig: Config,
+ paths: Seq[String],
+ output: String,
+ maxSplitSize: Option[Int],
+ index: Option[String],
+ partitions: Option[Seq[String]],
+ libjarsFiles: Seq[String],
+ libjarsPaths: Iterator[() => Seq[File]]
+ ) extends ConverterIngestJob(dsParams, sft, converterConfig, paths, libjarsFiles, libjarsPaths) {
+
+ override def configureJob(job: Job): Unit = {
+ super.configureJob(job)
+ GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, new Path(output), index, partitions)
+ maxSplitSize.foreach { max =>
+ job.setInputFormatClass(classOf[ConverterCombineInputFormat])
+ if (max > 0) {
+ FileInputFormat.setMaxInputSplitSize(job, max.toLong)
+ }
+ }
+ }
+
+ override def await(reporter: StatusCallback): JobResult = {
+ super.await(reporter).merge {
+ if (params.skipImport) {
+ Command.user.info("Skipping import of RFiles into Accumulo")
+ Some(JobSuccess(AccumuloBulkIngestCommand.ImportMessage, Map.empty))
+ } else {
+ Command.user.info("Importing RFiles into Accumulo")
+ val tableOps = ds.connector.tableOperations()
+ val filesPath = new Path(output, GeoMesaAccumuloFileOutputFormat.FilesPath)
+ val fc = FileContext.getFileContext(filesPath.toUri, new Configuration())
+ val files = fc.listLocatedStatus(filesPath)
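+ // each sub-directory under the files path is named for the table that its RFiles belong to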
+ while (files.hasNext) {
+ val file = files.next()
+ val path = file.getPath
+ val table = path.getName
+ if (file.isDirectory && HiddenFileFilter.accept(path) && tableOps.exists(table)) {
+ Command.user.info(s"Importing $table")
+ tableOps.importDirectory(path.toString).to(table).load()
+ }
+ }
+ Some(JobSuccess("", Map.empty))
+ }
+ }
+ }
+ }
+}
+
+object AccumuloBulkIngestCommand {
+
+ private val ImportMessage =
+ "\nFiles may be imported for each table through the Accumulo shell with the `importdirectory` command"
+
+ @Parameters(commandDescription = "Convert various file formats into bulk-loaded Accumulo RFiles")
+ class AccumuloBulkIngestParams extends AccumuloIngestParams
+ with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam {
+ @Parameter(names = Array("--skip-import"), description = "Generate the files but skip the bulk import into Accumulo")
+ var skipImport: Boolean = false
+ }
+}
diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala
index 98a8c0989d0f..e7b6fffd0558 100644
--- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala
+++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/CommonParams.scala
@@ -11,6 +11,7 @@ package org.locationtech.geomesa.tools
import java.util
import java.util.regex.Pattern
+import com.beust.jcommander.validators.PositiveInteger
import com.beust.jcommander.{Parameter, ParameterException}
import org.locationtech.geomesa.convert.Modes.ErrorMode
import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
@@ -270,3 +271,11 @@ trait OutputPathParam {
@Parameter(names = Array("--output"), description = "Path to use for writing output", required = true)
var outputPath: String = _
}
+
+trait NumReducersParam {
+ @Parameter(
+ names = Array("--num-reducers"),
+ description = "Number of reducers to use when sorting or merging (for distributed jobs)",
+ validateWith = Array(classOf[PositiveInteger]))
+ var reducers: java.lang.Integer = _
+}
\ No newline at end of file
diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
index 63edb9366bef..1fdf9a4163c2 100644
--- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
+++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
@@ -507,8 +507,8 @@ object ExportCommand extends LazyLogging {
/**
* Export parameters
*/
- trait ExportParams extends OptionalCqlFilterParam
- with QueryHintsParams with DistributedRunParam with TypeNameParam with OptionalForceParam {
+ trait ExportParams extends OptionalCqlFilterParam with QueryHintsParams
+ with DistributedRunParam with TypeNameParam with NumReducersParam with OptionalForceParam {
@Parameter(names = Array("-o", "--output"), description = "Output to a file instead of std out")
var file: String = _
@@ -550,12 +550,6 @@ object ExportCommand extends LazyLogging {
arity = 0)
var sortDescending: Boolean = false
- @Parameter(
- names = Array("--num-reducers"),
- description = "Number of reducers to use when sorting or merging (for distributed export)",
- validateWith = Array(classOf[PositiveInteger]))
- var reducers: java.lang.Integer = _
-
@Parameter(
names = Array("--chunk-size"),
description = "Split the output into multiple files, by specifying the rough number of bytes to store per file",
diff --git a/pom.xml b/pom.xml
index 2828fc64f19c..87bb8d893ac2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1875,6 +1875,12 @@
<version>${accumulo.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-hadoop-mapreduce</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>