diff --git a/docs/user/accumulo/commandline.rst b/docs/user/accumulo/commandline.rst
index c644b146bf37..b6ad4f5e7fa3 100644
--- a/docs/user/accumulo/commandline.rst
+++ b/docs/user/accumulo/commandline.rst
@@ -88,6 +88,61 @@ Argument Description
 
 For a description of index coverage, see :ref:`accumulo_attribute_indices`.
 
+``bulk-ingest``
+^^^^^^^^^^^^^^^
+
+The bulk ingest command writes data directly to Accumulo RFiles and then imports the RFiles into Accumulo, bypassing
+the normal write path. See `Bulk Ingest `__
+in the Accumulo documentation for additional details.
+
+.. note::
+
+    Bulk ingest is currently only implemented for Accumulo 2.0.
+
+The data to be ingested must be in the same distributed file system that Accumulo is using, and the ingest
+must run in ``distributed`` mode as a map/reduce job.
+
+In order to run efficiently, you should ensure that the data tables have appropriate splits, based on
+your input. This will avoid creating extremely large files during the ingest, and will also prevent the cluster
+from having to subsequently split the RFiles. See :ref:`table_split_config` for more information.
+
+Note that some of the options below are inherited from the regular ``ingest`` command, but are not relevant
+to bulk ingest. See :ref:`cli_ingest` for additional details on the available options.
+
+========================== ==================================================================================================
+Argument                   Description
+========================== ==================================================================================================
+``-c, --catalog *``        The catalog table containing schema metadata
+``--output *``             The output directory used to write out RFiles
+``-f, --feature-name``     The name of the schema
+``-s, --spec``             The ``SimpleFeatureType`` specification to create
+``-C, --converter``        The GeoMesa converter used to create ``SimpleFeature``\ s
+``--converter-error-mode`` Override the error mode defined by the converter
+``-q, --cql``              If using a partitioned store, a filter that covers the ingest data
+``-t, --threads``          Number of parallel threads used
+``--input-format``         Format of input files (csv, tsv, avro, shp, json, etc)
+``--index``                Specify a particular GeoMesa index to write to, instead of all indices
+``--no-tracking``          This application closes when ingest job is submitted. Useful for launching jobs with a script
+``--run-mode``             Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
+``--split-max-size``       Maximum size of a split in bytes (distributed jobs)
+``--src-list``             Input files are text files with lists of files, one per line, to ingest
+``--skip-import``          Generate the RFiles but skip the bulk import into Accumulo
+``--force``                Suppress any confirmation prompts
+``...``                    Input files to ingest
+========================== ==================================================================================================
+
+The ``--output`` directory will be interpreted as a distributed file system path. If it already exists, the user will
+be prompted to delete it before running the ingest.
+
+The ``--cql`` parameter is required if using a partitioned schema (see :ref:`partitioned_indices` for details).
+The filter must cover the partitions for all the input data, so that the partition tables can be
+created appropriately. Any feature which doesn't match the filter or correspond to an existing
+table will fail to be ingested.
+
+``--skip-import`` can be used to skip the import of the RFiles into Accumulo. The files can be imported later
+through the ``importdirectory`` command in the Accumulo shell. Note that if ``--no-tracking`` is specified,
+the import will be skipped regardless.
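+
+As an illustrative sketch (the catalog, schema, converter and paths below are placeholders, the standard Accumulo
+connection arguments are omitted, and the ``geomesa-accumulo`` script name assumes the binary distribution), a
+distributed bulk ingest might look like the following:
+
+.. code-block:: bash
+
+    # example values only; substitute your own catalog, schema, converter, paths and connection options
+    geomesa-accumulo bulk-ingest \
+        --catalog geomesa.catalog \
+        --feature-name gdelt \
+        --converter gdelt \
+        --run-mode distributed \
+        --output hdfs://namenode:8020/tmp/gdelt-rfiles \
+        "hdfs://namenode:8020/data/gdelt/*.csv"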
+
 .. _compact_command:
 
 ``compact``
diff --git a/docs/user/cli/ingest.rst b/docs/user/cli/ingest.rst
index 38c530527b22..c188faf932e9 100644
--- a/docs/user/cli/ingest.rst
+++ b/docs/user/cli/ingest.rst
@@ -41,8 +41,10 @@ Argument Description
 ``--converter-error-mode`` Override the error mode defined by the converter
 ``-t, --threads``          Number of parallel threads used
 ``--input-format``         Format of input files (csv, tsv, avro, shp, json, etc)
+``--index``                Specify a particular GeoMesa index to write to, instead of all indices
 ``--no-tracking``          This application closes when ingest job is submitted. Useful for launching jobs with a script
-``--run-mode``             Must be one of ``local``, ``distributed``, or ``distributedcombine``
+``--run-mode``             Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
+``--combine-inputs``       Combine multiple input files into a single input split (distributed jobs only)
 ``--split-max-size``       Maximum size of a split in bytes (distributed jobs)
 ``--src-list``             Input files are text files with lists of files, one per line, to ingest
 ``--force``                Suppress any confirmation prompts
@@ -93,8 +95,8 @@ still provide information about the status of the job submission.
 
 The ``--run-mode`` argument can be used to run ingestion locally or distributed (using map/reduce). Note that in
 order to run in distributed mode, the input files must be in HDFS. By default, input files on the local filesystem
-will be ingested in local mode, and input files in HDFS will be ingested in distributed mode. If using the
-``distributedcombine`` mode, multiple files will be processes by each mapper up to the limit specified by
+will be ingested in local mode, and input files in HDFS will be ingested in distributed mode. The
+``--combine-inputs`` flag can be used to process multiple files in each mapper up to the limit specified by
 ``--split-max-size``.
 
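+For example (all names and paths below are placeholders, the usual connection arguments are omitted, and the
+Accumulo distribution's ``geomesa-accumulo`` script is used for illustration), a large number of small files in
+HDFS might be combined into splits of roughly 512MB each:
+
+.. code-block:: bash
+
+    # example values only; the schema, converter and paths must match your deployment
+    geomesa-accumulo ingest \
+        --catalog geomesa.catalog \
+        --feature-name gdelt \
+        --converter gdelt \
+        --run-mode distributed \
+        --combine-inputs \
+        --split-max-size 536870912 \
+        "hdfs://namenode:8020/data/gdelt/*.csv"
+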
 The ``--threads`` argument can be used to increase local ingest speed. However, there can not be more threads
diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala
index e6d2e7360811..6520ec27803f 100644
--- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala
+++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkIngestCommand.scala
@@ -17,8 +17,9 @@ import org.apache.hadoop.fs.{FileContext, Path}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
+import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.AccumuloDistributedCommand
+import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreParams
 import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams
-import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloIngestCommand.AccumuloIngestParams
 import org.locationtech.geomesa.index.conf.partition.TablePartition
 import org.locationtech.geomesa.jobs.JobResult.JobSuccess
 import org.locationtech.geomesa.jobs.accumulo.mapreduce.GeoMesaAccumuloFileOutputFormat
@@ -26,7 +27,7 @@ import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
 import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback}
 import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
 import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
-import org.locationtech.geomesa.tools.ingest.IngestCommand.Inputs
+import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
 import org.locationtech.geomesa.tools.ingest._
 import org.locationtech.geomesa.tools.utils.Prompt
 import org.locationtech.geomesa.tools.{Command, OptionalCqlFilterParam, OptionalIndexParam, OutputPathParam}
@@ -34,7 +35,7 @@ import org.locationtech.geomesa.utils.index.IndexMode
 import org.locationtech.geomesa.utils.io.fs.HadoopDelegate.HiddenFileFilter
 import org.opengis.feature.simple.SimpleFeatureType
 
-class AccumuloBulkIngestCommand extends AccumuloIngestCommand {
+class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with AccumuloDistributedCommand {
 
   override val name = "bulk-ingest"
   override val params = new AccumuloBulkIngestParams()
@@ -52,9 +53,6 @@ class AccumuloBulkIngestCommand extends AccumuloIngestCommand {
     // validate index param now that we have a datastore and the sft has been created
     val index = params.loadIndex(ds, sft.getTypeName, IndexMode.Write).map(_.identifier)
 
-    // disable compaction since we're bulk loading
-    params.compact = false
-
     val partitions = TablePartition(ds, sft).map { tp =>
       if (params.cqlFilter == null) {
         throw new ParameterException(
@@ -153,7 +151,7 @@ object AccumuloBulkIngestCommand {
       "\nFiles may be imported for each table through the Accumulo shell with the `importdirectory` command"
 
   @Parameters(commandDescription = "Convert various file formats into bulk loaded Accumulo RFiles")
-  class AccumuloBulkIngestParams extends AccumuloIngestParams
+  class AccumuloBulkIngestParams extends IngestParams with AccumuloDataStoreParams
       with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam {
description = "Generate the files but skip the bulk import into Accumulo") var skipImport: Boolean = false diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloIngestCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloIngestCommand.scala index b101c3b50b8f..fff0cf3c5efb 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloIngestCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloIngestCommand.scala @@ -18,6 +18,7 @@ import org.locationtech.geomesa.tools.ingest.IngestCommand import org.locationtech.geomesa.tools.ingest.IngestCommand.IngestParams class AccumuloIngestCommand extends IngestCommand[AccumuloDataStore] with AccumuloDistributedCommand { + override val params = new AccumuloIngestParams() override def execute(): Unit = {