Skip to content

Commit

Permalink
GEOMESA-3326 Ingest from stdin with converter inference (#3036)
Browse files Browse the repository at this point in the history
* Cache the bytes from the stdin input stream, allowing re-use of the stream for type inference
* Fix lat/lon column order in test data
* Add ThreadLocal hook for testing stdin
* Add srcList logic to convert command

Co-authored-by: Emilio Lahr-Vivaz <[email protected]>
  • Loading branch information
adeet1 and elahrvivaz authored Feb 27, 2024
1 parent e4a88cd commit 41b3197
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 112 deletions.
1 change: 1 addition & 0 deletions docs/user/cli/export.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Argument Description
``--hints`` Query hints used to modify the query
``--gzip`` Level of gzip compression to use for output, from 1-9
``--no-header`` Don't export the type header, for CSV and TSV formats
``--src-list`` Input files are text files with lists of files, one per line, to ingest
``--suppress-empty`` If no features are converted, don't write any headers or other output
``--force`` Force execution without prompt
========================== ===================================================================================
Expand Down
2 changes: 1 addition & 1 deletion docs/user/convert/function_usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ Here's some sample CSV data:

::

ID,Name,Age,LastSeen,Friends,Lat,Lon
ID,Name,Age,LastSeen,Friends,Lon,Lat
23623,Harry,20,2015-05-06,"Will, Mark, Suzan",-100.236523,23
26236,Hermione,25,2015-06-07,"Edward, Bill, Harry",40.232,-53.2356
3233,Severus,30,2015-10-23,"Tom, Riddle, Voldemort",3,-62.23
Expand Down
2 changes: 1 addition & 1 deletion docs/user/convert/usage_tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Given the following sample CSV file ``example.csv``:

::

ID,Name,Age,LastSeen,Friends,Lat,Lon
ID,Name,Age,LastSeen,Friends,Lon,Lat
23623,Harry,20,2015-05-06,"Will, Mark, Suzan",-100.236523,23
26236,Hermione,25,2015-06-07,"Edward, Bill, Harry",40.232,-53.2356
3233,Severus,30,2015-10-23,"Tom, Riddle, Voldemort",3,-62.23
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ID,Name,Age,LastSeen,Friends,Lat,Lon
ID,Name,Age,LastSeen,Friends,Lon,Lat
23623,Harry,20,2015-05-06,"Will, Mark, Suzan",-100.236523,23
26236,Hermione,25,2015-06-07,"Edward, Bill, Harry",40.232,-53.2356
3233,Severus,30,2015-10-23,"Tom, Riddle, Voldemort",3,-62.23
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
ID,Name,Age,LastSeen,Friends,Lat,Lon
ID,Name,Age,LastSeen,Friends,Lon,Lat
33623,Ron,20,2015-05-06,"Will, Mark, Suzan",-100.236523,23
36236,Ginny,25,2015-06-07,"Edward, Bill, Harry",40.232,-53.2356
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import org.locationtech.geomesa.accumulo.MiniCluster
import org.locationtech.geomesa.accumulo.tools.{AccumuloDataStoreCommand, AccumuloRunner}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.io.fs.LocalDelegate.StdInHandle
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

import java.io.{ByteArrayInputStream, File}
import java.io.{BufferedInputStream, ByteArrayInputStream, File}
import java.util.concurrent.atomic.AtomicInteger

@RunWith(classOf[JUnitRunner])
Expand Down Expand Up @@ -81,58 +82,82 @@ class IngestCommandTest extends Specification {
}
}

"ingest from stdin" in {
val confFile = new File(getClass.getClassLoader.getResource("examples/example1-csv.conf").getFile)
"require sft or sft name to be specified during ingest from stdin with type inference" in {
val dataFile = WithClose(getClass.getClassLoader.getResourceAsStream("examples/example1.csv")) { in =>
IOUtils.toByteArray(in)
}
val input = new ByteArrayInputStream(dataFile)
val input = new BufferedInputStream(new ByteArrayInputStream(dataFile))

val args = baseArgs ++ Array("--converter", confFile.getPath, "-s", confFile.getPath, "-")
val args = baseArgs ++ Array("--force", "-")

val command = AccumuloRunner.parseCommand(args).asInstanceOf[AccumuloDataStoreCommand]

val in = System.in
in.synchronized {
try {
System.setIn(input)
command.execute()
} finally {
System.setIn(in)
StdInHandle.SystemIns.set(input)
try {
command.execute() must throwA[ParameterException].like { case e =>
e.getMessage mustEqual
"SimpleFeatureType name not specified. Please ensure the -f or --feature-name flag is set."
}
} finally {
StdInHandle.SystemIns.remove()
}
}

"ingest from stdin if no sft and converter are specified" in {
val dataFile = WithClose(getClass.getClassLoader.getResourceAsStream("examples/example1.csv")) { in =>
IOUtils.toByteArray(in)
}
val input = new BufferedInputStream(new ByteArrayInputStream(dataFile))

val sftName = "test"

val args = baseArgs ++ Array("--force", "-f", sftName, "-")

val command = AccumuloRunner.parseCommand(args).asInstanceOf[AccumuloDataStoreCommand]

StdInHandle.SystemIns.set(input)
try {
command.execute()
} finally {
StdInHandle.SystemIns.remove()
}

command.withDataStore { ds =>
try {
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
val features = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures.features).toList
features.size mustEqual 3
features.map(_.getAttribute("name")) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
features.map(_.getAttribute(1)) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
} finally {
ds.delete()
}
}
}

"fail to ingest from stdin if no converter is specified" in {
"ingest from stdin" in {
val confFile = new File(getClass.getClassLoader.getResource("examples/example1-csv.conf").getFile)
val dataFile = WithClose(getClass.getClassLoader.getResourceAsStream("examples/example1.csv")) { in =>
IOUtils.toByteArray(in)
}
val input = new ByteArrayInputStream(dataFile)
val input = new BufferedInputStream(new ByteArrayInputStream(dataFile))

val args = baseArgs ++ Array("-s", confFile.getPath, "-")
val args = baseArgs ++ Array("--converter", confFile.getPath, "-s", confFile.getPath, "-")

val command = AccumuloRunner.parseCommand(args).asInstanceOf[AccumuloDataStoreCommand]

val in = System.in
in.synchronized {
StdInHandle.SystemIns.set(input)
try {
command.execute()
} finally {
StdInHandle.SystemIns.remove()
}

command.withDataStore { ds =>
try {
System.setIn(input)
command.execute() must throwA[ParameterException].like {
case e => e.getMessage mustEqual "Cannot infer types from stdin - please specify a converter/sft or ingest from a file"
}
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
features.size mustEqual 3
features.map(_.getAttribute("name")) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
} finally {
System.setIn(in)
ds.delete()
}
}
}
Expand All @@ -142,22 +167,19 @@ class IngestCommandTest extends Specification {
val dataFile = WithClose(getClass.getClassLoader.getResourceAsStream("examples/example1.csv")) { in =>
IOUtils.toByteArray(in)
}
val input = new ByteArrayInputStream(dataFile)
val input = new BufferedInputStream(new ByteArrayInputStream(dataFile))

val args = baseArgs ++ Array("--converter", confFile.getPath, "-")

val command = AccumuloRunner.parseCommand(args).asInstanceOf[AccumuloDataStoreCommand]

val in = System.in
in.synchronized {
try {
System.setIn(input)
command.execute() must throwA[ParameterException].like {
case e => e.getMessage mustEqual "SimpleFeatureType name and/or specification argument is required"
}
} finally {
System.setIn(in)
StdInHandle.SystemIns.set(input)
try {
command.execute() must throwA[ParameterException].like {
case e => e.getMessage mustEqual "SimpleFeatureType name and/or specification argument is required"
}
} finally {
StdInHandle.SystemIns.remove()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DelimitedTextConverterFactory

import scala.collection.JavaConverters._

// @param path is unused in this class
override def infer(
is: InputStream,
sft: Option[SimpleFeatureType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.locationtech.geomesa.tools.export

import com.beust.jcommander.{ParameterException, Parameters}
import com.beust.jcommander.{Parameter, ParameterException, Parameters}
import com.typesafe.scalalogging.LazyLogging
import org.geotools.api.data.Query
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
Expand All @@ -19,14 +19,15 @@ import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.index.planning.LocalQueryRunner
import org.locationtech.geomesa.index.planning.LocalQueryRunner.ArrowDictionaryHook
import org.locationtech.geomesa.index.stats.RunnableStats
import org.locationtech.geomesa.tools._
import org.locationtech.geomesa.tools.export.ConvertCommand.ConvertParameters
import org.locationtech.geomesa.tools.export.ExportCommand.{ChunkedExporter, ExportOptions, ExportParams, Exporter}
import org.locationtech.geomesa.tools.ingest.IngestCommand
import org.locationtech.geomesa.tools._
import org.locationtech.geomesa.tools.ingest.IngestCommand.Inputs
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.io.fs.FileSystemDelegate.FileHandle
import org.locationtech.geomesa.utils.io.fs.LocalDelegate.StdInHandle
import org.locationtech.geomesa.utils.io.{PathUtils, WithClose}
import org.locationtech.geomesa.utils.stats.{MethodProfiling, Stat}
import org.locationtech.geomesa.utils.text.TextTools.getPlural

Expand All @@ -48,18 +49,27 @@ class ConvertCommand extends Command with MethodProfiling with LazyLogging {

import scala.collection.JavaConverters._

if (params.files.isEmpty && !StdInHandle.isAvailable) {
throw new ParameterException("Missing option: <files>... is required")
val inputs: Inputs = {
val files = Inputs(params.files.asScala.toSeq)
if (files.stdin && !StdInHandle.isAvailable) {
if (files.paths.isEmpty) {
throw new ParameterException("Missing option: <files>... is required, or use `-` to ingest from standard in")
} else {
Command.user.info("Waiting for input...")
while (!StdInHandle.isAvailable) {
Thread.sleep(10)
}
}
}
if (params.srcList) { files.asSourceList } else { files }
}

val inputs = params.files.asScala
val format = IngestCommand.getDataFormat(params, inputs.toSeq)
val format = IngestCommand.getDataFormat(params, inputs.paths)

// use .get to re-throw the exception if we fail
IngestCommand.getSftAndConverter(params, inputs.toSeq, format, None).get.flatMap { case (sft: SimpleFeatureType, config: com.typesafe.config.Config) =>
val files = if (inputs.isEmpty) { StdInHandle.available().iterator } else {
inputs.iterator.flatMap(PathUtils.interpretPath)
}
IngestCommand.getSftAndConverter(params, inputs, format, None).get.flatMap { case (sft: SimpleFeatureType, config: com.typesafe.config.Config) =>
val files = inputs.handles.iterator

WithClose(SimpleFeatureConverter(sft, config)) { converter =>
val ec = converter.createEvaluationContext()
val query = ExportCommand.createQuery(sft, params)
Expand Down Expand Up @@ -152,5 +162,8 @@ object ConvertCommand extends LazyLogging {

@Parameters(commandDescription = "Convert files using GeoMesa's internal converter framework")
class ConvertParameters extends ExportParams with OptionalInputFormatParam with OptionalTypeNameParam
with OptionalFeatureSpecParam with ConverterConfigParam with OptionalForceParam
with OptionalFeatureSpecParam with ConverterConfigParam with OptionalForceParam {
@Parameter(names = Array("--src-list"), description = "Input files are text files with lists of files, one per line, to ingest.")
var srcList: Boolean = false
}
}
Loading

0 comments on commit 41b3197

Please sign in to comment.