docs, remove compact param
elahrvivaz committed Jun 11, 2021
1 parent 691e0f7 commit 5bd5b57
Showing 4 changed files with 66 additions and 10 deletions.
55 changes: 55 additions & 0 deletions docs/user/accumulo/commandline.rst
@@ -88,6 +88,61 @@ Argument Description

For a description of index coverage, see :ref:`accumulo_attribute_indices`.

``bulk-ingest``
^^^^^^^^^^^^^^^

The bulk ingest command will ingest directly to Accumulo RFiles and then import the RFiles into Accumulo, bypassing
the normal write path. See `Bulk Ingest <https://accumulo.apache.org/docs/2.x/development/high_speed_ingest#bulk-ingest>`__
in the Accumulo documentation for additional details.

.. note::

Bulk ingest is currently only implemented for Accumulo 2.0.

The data to be ingested must be in the same distributed file system that Accumulo is using, and the ingest
must run in ``distributed`` mode as a map/reduce job.

In order to run efficiently, you should ensure that the data tables have appropriate splits, based on
your input. This will avoid creating extremely large files during the ingest, and will also prevent the cluster
from having to subsequently split the RFiles. See :ref:`table_split_config` for more information.

Note that some of the options below are inherited from the regular ``ingest`` command, but are not relevant
to bulk ingest. See :ref:`cli_ingest` for additional details on the available options.

========================== ==================================================================================================
Argument                   Description
========================== ==================================================================================================
``-c, --catalog *``        The catalog table containing schema metadata
``--output *``             The output directory used to write out RFiles
``-f, --feature-name``     The name of the schema
``-s, --spec``             The ``SimpleFeatureType`` specification to create
``-C, --converter``        The GeoMesa converter used to create ``SimpleFeature``\ s
``--converter-error-mode`` Override the error mode defined by the converter
``-q, --cql``              If using a partitioned store, a filter that covers the ingest data
``-t, --threads``          Number of parallel threads used
``--input-format``         Format of input files (csv, tsv, avro, shp, json, etc)
``--index``                Specify a particular GeoMesa index to write to, instead of all indices
``--no-tracking``          This application closes when ingest job is submitted. Useful for launching jobs with a script
``--run-mode``             Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
``--split-max-size``       Maximum size of a split in bytes (distributed jobs)
``--src-list``             Input files are text files with lists of files, one per line, to ingest
``--skip-import``          Generate the RFiles but skip the bulk import into Accumulo
``--force``                Suppress any confirmation prompts
``<files>...``             Input files to ingest
========================== ==================================================================================================

The ``--output`` directory will be interpreted as a distributed file system path. If it already exists, the user will
be prompted to delete it before running the ingest.
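
For example, a minimal bulk ingest of CSV files already staged in HDFS might look like the following. The
schema name, converter name, and paths are illustrative, and the standard Accumulo connection options
(``-u``, ``-p``, ``-i``, ``-z``) are assumed:

.. code-block:: bash

    geomesa-accumulo bulk-ingest \
      -u <user> -p <password> -i <instance> -z <zookeepers> \
      -c geomesa.catalog \
      -f gdelt \
      -C gdelt-converter \
      --run-mode distributed \
      --output 'hdfs:///tmp/gdelt/rfiles' \
      'hdfs:///data/gdelt/*.csv'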

The ``--cql`` parameter is required if using a partitioned schema (see :ref:`partitioned_indices` for details).
The filter must cover the partitions for all the input data, so that the partition tables can be
created appropriately. Any feature that doesn't match the filter or doesn't correspond to an existing
table will fail to be ingested.
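
For instance, with a schema partitioned by time on a ``dtg`` attribute, a filter covering one month of input
data might look like the following (the attribute name, dates, and other arguments are hypothetical):

.. code-block:: bash

    geomesa-accumulo bulk-ingest \
      -u <user> -p <password> -i <instance> -z <zookeepers> \
      -c geomesa.catalog -f gdelt -C gdelt-converter \
      --run-mode distributed \
      --output 'hdfs:///tmp/gdelt/rfiles' \
      --cql "dtg DURING 2021-01-01T00:00:00.000Z/2021-02-01T00:00:00.000Z" \
      'hdfs:///data/gdelt/*.csv'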

``--skip-import`` can be used to skip the import of the RFiles into Accumulo. The files can be imported later
through the ``importdirectory`` command in the Accumulo shell. Note that if ``--no-tracking`` is specified,
the import will be skipped regardless.
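
As a sketch, RFiles generated with ``--skip-import`` could later be imported from the Accumulo 2.0 shell.
The table name and directory below are placeholders (this assumes the RFiles for each table are written to
corresponding sub-directories of the ``--output`` path), and the exact ``importdirectory`` arguments may
vary between Accumulo versions:

.. code-block:: bash

    accumulo shell -u root
    # at the shell prompt, for each table written under the output directory:
    #   table geomesa.catalog_gdelt_z3
    #   importdirectory /tmp/gdelt/rfiles/z3 true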

.. _compact_command:

``compact``
8 changes: 5 additions & 3 deletions docs/user/cli/ingest.rst
@@ -41,8 +41,10 @@ Argument Description
``--converter-error-mode`` Override the error mode defined by the converter
``-t, --threads``          Number of parallel threads used
``--input-format``         Format of input files (csv, tsv, avro, shp, json, etc)
``--index``                Specify a particular GeoMesa index to write to, instead of all indices
``--no-tracking``          This application closes when ingest job is submitted. Useful for launching jobs with a script
``--run-mode``             Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
``--combine-inputs``       Combine multiple input files into a single input split (distributed jobs only)
``--split-max-size``       Maximum size of a split in bytes (distributed jobs)
``--src-list``             Input files are text files with lists of files, one per line, to ingest
``--force``                Suppress any confirmation prompts
@@ -93,8 +95,8 @@ still provide information about the status of the job submission.

The ``--run-mode`` argument can be used to run ingestion locally or distributed (using map/reduce). Note that in
order to run in distributed mode, the input files must be in HDFS. By default, input files on the local filesystem
will be ingested in local mode, and input files in HDFS will be ingested in distributed mode. The
``--combine-inputs`` flag can be used to process multiple files in each mapper up to the limit specified by
``--split-max-size``.
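
For example, a distributed ingest of many small files in HDFS could combine them into splits of at most
512 MB (536870912 bytes) per mapper; the connection options, schema, converter, and paths below are
illustrative:

.. code-block:: bash

    geomesa-accumulo ingest \
      -u <user> -p <password> -i <instance> -z <zookeepers> \
      -c geomesa.catalog -f gdelt -C gdelt-converter \
      --run-mode distributed \
      --combine-inputs \
      --split-max-size 536870912 \
      'hdfs:///data/gdelt/*.csv'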

The ``--threads`` argument can be used to increase local ingest speed. However, there cannot be more threads
@@ -17,24 +17,25 @@ import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.AccumuloDistributedCommand
import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreParams
import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams
import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloIngestCommand.AccumuloIngestParams
import org.locationtech.geomesa.index.conf.partition.TablePartition
import org.locationtech.geomesa.jobs.JobResult.JobSuccess
import org.locationtech.geomesa.jobs.accumulo.mapreduce.GeoMesaAccumuloFileOutputFormat
import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback}
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
import org.locationtech.geomesa.tools.ingest.IngestCommand.Inputs
import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
import org.locationtech.geomesa.tools.ingest._
import org.locationtech.geomesa.tools.utils.Prompt
import org.locationtech.geomesa.tools.{Command, OptionalCqlFilterParam, OptionalIndexParam, OutputPathParam}
import org.locationtech.geomesa.utils.index.IndexMode
import org.locationtech.geomesa.utils.io.fs.HadoopDelegate.HiddenFileFilter
import org.opengis.feature.simple.SimpleFeatureType

class AccumuloBulkIngestCommand extends AccumuloIngestCommand {
class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with AccumuloDistributedCommand {

override val name = "bulk-ingest"
override val params = new AccumuloBulkIngestParams()
@@ -52,9 +53,6 @@ class AccumuloBulkIngestCommand extends AccumuloIngestCommand {
// validate index param now that we have a datastore and the sft has been created
val index = params.loadIndex(ds, sft.getTypeName, IndexMode.Write).map(_.identifier)

// disable compaction since we're bulk loading
params.compact = false

val partitions = TablePartition(ds, sft).map { tp =>
if (params.cqlFilter == null) {
throw new ParameterException(
@@ -153,7 +151,7 @@ object AccumuloBulkIngestCommand {
"\nFiles may be imported for each table through the Accumulo shell with the `importdirectory` command"

@Parameters(commandDescription = "Convert various file formats into bulk loaded Accumulo RFiles")
class AccumuloBulkIngestParams extends AccumuloIngestParams
class AccumuloBulkIngestParams extends IngestParams with AccumuloDataStoreParams
with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam {
@Parameter(names = Array("--skip-import"), description = "Generate the files but skip the bulk import into Accumulo")
var skipImport: Boolean = false
@@ -18,6 +18,7 @@ import org.locationtech.geomesa.tools.ingest.IngestCommand
import org.locationtech.geomesa.tools.ingest.IngestCommand.IngestParams

class AccumuloIngestCommand extends IngestCommand[AccumuloDataStore] with AccumuloDistributedCommand {

override val params = new AccumuloIngestParams()

override def execute(): Unit = {