From f685295aadd62ac8c23bbb39776ad0008398f328 Mon Sep 17 00:00:00 2001 From: Emilio Date: Thu, 3 Oct 2024 11:31:05 -0400 Subject: [PATCH] GEOMESA-3383 Accumulo - Bulk copy returns status (#3208) * Add redundant close check * Add flush to clone table op * Update LazyCloseable to use Closeable implicits --- .../geomesa/accumulo/util/SchemaCopier.scala | 159 ++++++++++++++---- .../ingest/AccumuloBulkCopyCommand.scala | 2 +- .../utils/concurrent/LazyCloseable.scala | 6 +- 3 files changed, 130 insertions(+), 37 deletions(-) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala index d7c460990108..c4369d9f9c09 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala @@ -17,7 +17,7 @@ import org.apache.hadoop.tools.DistCp import org.geotools.api.data.DataStoreFinder import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams} -import org.locationtech.geomesa.accumulo.util.SchemaCopier.{Cluster, ClusterConfig, CopyOptions, PartitionId, PartitionName, PartitionValue} +import org.locationtech.geomesa.accumulo.util.SchemaCopier._ import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex import org.locationtech.geomesa.index.conf.partition.TablePartition @@ -29,8 +29,8 @@ import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} import java.io.{Closeable, File, IOException} import java.nio.charset.StandardCharsets import java.util.Collections -import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable.ListBuffer +import java.util.concurrent.{Callable, ConcurrentHashMap} +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.Try import scala.util.control.NonFatal @@ -54,7 +54,7 @@ class SchemaCopier( indices: Seq[String], partitions: Seq[PartitionId], options: CopyOptions, - ) extends Runnable with Closeable with StrictLogging { + ) extends Callable[Set[CopyResult]] with Closeable with StrictLogging { import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType @@ -62,6 +62,7 @@ class SchemaCopier( private val tryFrom = Try(Cluster(fromCluster)) private val tryTo = Try(Cluster(toCluster)) + private var closed = false // note: all other class variables are lazy, so that we can instantiate an instance and then clean up connections on close() @@ -100,9 +101,9 @@ class SchemaCopier( sft } - lazy private val fromIndices = { + lazy private val indexPairs: Seq[(GeoMesaFeatureIndex[_, _], GeoMesaFeatureIndex[_, _])] = { val all = from.ds.manager.indices(sft) - if (indices.isEmpty) { all } else { + val fromIndices = if (indices.isEmpty) { all } else { val builder = Seq.newBuilder[GeoMesaFeatureIndex[_, _]] indices.foreach { ident => val filtered = all.filter(_.identifier.contains(ident)) @@ -115,6 +116,7 @@ class SchemaCopier( } builder.result.distinct } + fromIndices.map(from => from -> to.ds.manager.index(sft, from.identifier)) } // these get passed into our index method calls - for partitioned schemas, it must be a Seq[Some[_]], @@ -134,7 +136,7 @@ class SchemaCopier( } if (builder.isEmpty) { logger.debug("No partitions specified - loading all partitions from store") - builder ++= fromIndices.flatMap(_.getPartitions) + builder ++= indexPairs.flatMap(_._1.getPartitions) } builder.result.distinct.sorted.map(Option.apply) } else { @@ -145,38 +147,50 @@ class SchemaCopier( } } + // planned copies + lazy val plans: Set[CopyPlan] = + fromPartitions.flatMap { partition => + indexPairs.map { case (fromIndex, _) => + CopyPlan(fromIndex.identifier, partition) + } + }.toSet + /** * Execute the copy + * + * @return results */ - override def run(): Unit = run(false) + override def call(): Set[CopyResult] = call(false) /** * Execute the copy * * @param resume resume from a previously interrupted run, vs overwrite any existing output + * @return results */ - def run(resume: Boolean): Unit = { + def call(resume: Boolean): Set[CopyResult] = { + val results = Collections.newSetFromMap(new ConcurrentHashMap[CopyResult, java.lang.Boolean]()) CachedThreadPool.executor(options.tableConcurrency) { executor => fromPartitions.foreach { partition => - fromIndices.foreach { fromIndex => - val toIndex = to.ds.manager.index(sft, fromIndex.identifier) + indexPairs.foreach { case (fromIndex, toIndex) => val partitionLogId = s"${partition.fold(s"index")(p => s"partition $p")} ${fromIndex.identifier}" val runnable: Runnable = () => { - try { - logger.info(s"Copying $partitionLogId") - copy(fromIndex, toIndex, partition, resume, partitionLogId) - logger.info(s"Bulk copy complete for $partitionLogId") - } catch { - // catch Throwable so NoClassDefFound still gets logged - case e: Throwable => + logger.info(s"Copying $partitionLogId") + val result = copy(fromIndex, toIndex, partition, resume, partitionLogId) + result.error match { + case None => + logger.info(s"Bulk copy complete for $partitionLogId") + case Some(e) => logger.error(s"Error copying $partitionLogId: ${e.getMessage}") logger.debug(s"Error copying $partitionLogId", e) } + results.add(result) } executor.submit(runnable) } } } + results.asScala.toSet } /** @@ -187,19 +201,50 @@ class SchemaCopier( * @param partition partition name - must be Some if schema is partitioned * @param resume use any partial results from a previous run, if present * @param partitionLogId identifier for log messages + * @return result */ private def copy( fromIndex: GeoMesaFeatureIndex[_, _], toIndex: GeoMesaFeatureIndex[_, _], partition: Option[String], resume: Boolean, - partitionLogId: String): Unit = { + partitionLogId: String): CopyResult = { + val start = System.currentTimeMillis() + val files = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + var fromTable: String = "" + // lazy so that table, files and finish time are filled in appropriately + lazy val result = + CopyResult(fromIndex.identifier, partition, fromTable, files.asScala.toSeq, None, start, System.currentTimeMillis()) + try { + fromTable = fromIndex.getTableName(partition) + copy(fromTable, toIndex, partition, resume, partitionLogId, files) + result + } catch { + // catch Throwable so NoClassDefFound still gets logged + case e: Throwable => result.withError(e) + } + } - require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup + /** + * Copy a single index + partition + * + * @param fromTable from table + * @param toIndex to index + * @param partition partition name - must be Some if schema is partitioned + * @param resume use any partial results from a previous run, if present + * @param partitionLogId identifier for log messages + * @param fileResults set to hold files that we've copied successfully + * @return result + */ + private def copy( + fromTable: String, + toIndex: GeoMesaFeatureIndex[_, _], + partition: Option[String], + resume: Boolean, + partitionLogId: String, + fileResults: java.util.Set[String]): Unit = { - val fromTable = try { fromIndex.getTableName(partition) } catch { - case NonFatal(e) => throw new RuntimeException("Could not get source table", e) - } + require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup val completeMarker = new Path(exportPath, s"$fromTable.complete") if (exportFs.exists(completeMarker)) { @@ -228,7 +273,7 @@ class SchemaCopier( logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable") from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist logger.debug(s"Cloning $fromTable to $cloneTable") - from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility + from.tableOps.clone(fromTable, cloneTable, true, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility logger.debug(s"Taking $cloneTable offline") from.tableOps.offline(cloneTable, true) } @@ -268,7 +313,7 @@ class SchemaCopier( to.tableOps.addSplits(toTable, splits) } - val hadCopyError = new AtomicBoolean(false) + val copyErrors = Collections.newSetFromMap(new ConcurrentHashMap[Throwable, java.lang.Boolean]()) // read the distcp.txt file produced by the table export // consumer: (src, dest) => Unit def distCpConsumer(threads: Int)(consumer: (Path, Path) => Unit): Unit = { @@ -289,7 +334,7 @@ class SchemaCopier( } catch { // catch Throwable so NoClassDefFound still gets logged case e: Throwable => - hadCopyError.set(true) + copyErrors.add(e) logger.error(s"Failed to copy $path to $copy: ${e.getMessage}") logger.debug(s"Failed to copy $path to $copy", e) } @@ -302,6 +347,7 @@ class SchemaCopier( if (options.distCp) { var inputPath = distcpPath + val distCpFiles = ArrayBuffer.empty[String] if (resume) { logger.debug(s"Checking copy status of files in $distcpPath") inputPath = new Path(tableExportPath, "distcp-remaining.txt") @@ -309,8 +355,14 @@ class SchemaCopier( distCpConsumer(1) { (path, _) => logger.debug(s"Adding $path to distcp") out.writeUTF(s"$path\n") + distCpFiles += path.getName } } + } else { + logger.debug(s"Checking file list at $distcpPath") + distCpConsumer(1) { (path, _) => + distCpFiles += path.getName + } } val job = new DistCp(from.conf, DistributedCopyOptions(inputPath, copyToDir)).execute() logger.info(s"Tracking available at ${job.getStatus.getTrackingUrl}") @@ -319,22 +371,28 @@ class SchemaCopier( } if (job.isSuccessful) { logger.info(s"Successfully copied data to $copyToDir") + fileResults.addAll(distCpFiles.asJava) } else { - hadCopyError.set(true) - logger.error(s"Job failed with state ${job.getStatus.getState} due to: ${job.getStatus.getFailureInfo}") + val msg = s"DistCp job failed with state ${job.getStatus.getState} due to: ${job.getStatus.getFailureInfo}" + copyErrors.add(new RuntimeException(msg)) + logger.error(msg) } } else { distCpConsumer(options.fileConcurrency) { (path, copy) => logger.debug(s"Copying $path to $copy") val fs = path.getFileSystem(from.conf) - if (!FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) { + if (FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) { + fileResults.add(path.getName) + } else { // consolidate error handling in the catch block throw new IOException(s"Failed to copy $path to $copy, copy returned false") } } } - if (hadCopyError.get) { - throw new RuntimeException("Error copying data files") + if (!copyErrors.isEmpty) { + val e = new RuntimeException("Error copying data files") + copyErrors.asScala.foreach(e.addSuppressed) + throw e } logger.debug(s"Loading rfiles from $copyToDir to $toTable") @@ -357,9 +415,12 @@ class SchemaCopier( from.tableOps.delete(cloneTable) } - override def close(): Unit = { - CloseWithLogging(tryFrom.toOption) - CloseWithLogging(tryTo.toOption) + override def close(): Unit = synchronized { + if (!closed) { + closed = true + CloseWithLogging(tryFrom.toOption) + CloseWithLogging(tryTo.toOption) + } } } @@ -395,6 +456,36 @@ object SchemaCopier { */ case class CopyOptions(tableConcurrency: Int = 1, fileConcurrency: Int = 4, distCp: Boolean = false) + /** + * Planned copy operations + * + * @param index index id planned to copy + * @param partition partition planned to copy + */ + case class CopyPlan(index: String, partition: Option[String]) + + /** + * Result of a copy operation + * + * @param index index id being copied + * @param partition partition being copied, if table is partitioned + * @param table table being copied + * @param files list of files that were successfully copied + * @param error error, if any + * @param start start of operation, in unix time + * @param finish end of operation, in unix time + */ + case class CopyResult( + index: String, + partition: Option[String], + table: String, + files: Seq[String], + error: Option[Throwable], + start: Long, + finish: Long) { + def withError(e: Throwable): CopyResult = copy(error = Option(e)) + } + /** * Holds state for a given Accumulo cluster * diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala index 19daef7344c8..b89ff74d0371 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala @@ -48,7 +48,7 @@ class AccumuloBulkCopyCommand extends Command with StrictLogging { WithClose(new SchemaCopier(fromCluster, toCluster, params.featureName, params.exportPath, indices, partitions, opts)) { copier => try { - copier.run(params.resume) + copier.call(params.resume) } catch { case e: IllegalArgumentException => throw new ParameterException(e.getMessage) } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/LazyCloseable.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/LazyCloseable.scala index 1cf0c4027e4f..5b28a556dad1 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/LazyCloseable.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/LazyCloseable.scala @@ -8,9 +8,11 @@ package org.locationtech.geomesa.utils.concurrent +import org.locationtech.geomesa.utils.io.IsCloseable + import java.io.Closeable -class LazyCloseable[T <: Closeable](create: => T) extends Closeable { +class LazyCloseable[T: IsCloseable](create: => T) extends Closeable { @volatile private var initialized = false @@ -22,7 +24,7 @@ class LazyCloseable[T <: Closeable](create: => T) extends Closeable { override def close(): Unit = { if (initialized) { - instance.close() + implicitly[IsCloseable[T]].close(instance).get } } }