From 86da2f5161d533c456654413b02d3befd9ceff76 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Fri, 19 Jan 2024 15:29:18 +0000 Subject: [PATCH] GEOMESA-3325 Fix Avro inferred ingest for list/map types --- .../convert/avro/AvroConverterFactory.scala | 25 +++++++++++-------- .../geomesa/convert/avro/AvroPath.scala | 24 +++++++++++++----- .../convert/avro/AvroConverterTest.scala | 14 ++++++++--- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroConverterFactory.scala b/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroConverterFactory.scala index e8a1e29614a4..d784af78d733 100644 --- a/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroConverterFactory.scala +++ b/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroConverterFactory.scala @@ -177,15 +177,8 @@ object AvroConverterFactory { } // checks for nested array/map types we can handle - def isSimple: Boolean = field.schema().getFields.asScala.map(_.schema().getType).forall { - case Schema.Type.STRING => true - case Schema.Type.INT => true - case Schema.Type.LONG => true - case Schema.Type.FLOAT => true - case Schema.Type.DOUBLE => true - case Schema.Type.BOOLEAN => true - case _ => false - } + def isSimpleArray: Boolean = isSimple(field.schema().getElementType) + def isSimpleMap: Boolean = isSimple(field.schema().getValueType) val transform = FunctionTransform("avroPath(", s",'$path/${field.name}')") field.schema().getType match { @@ -196,8 +189,8 @@ object AvroConverterFactory { case Schema.Type.FLOAT => types += InferredType(name, ObjectType.FLOAT, transform) case Schema.Type.DOUBLE => types += InferredType(name, ObjectType.DOUBLE, transform) case Schema.Type.BOOLEAN => types += InferredType(name, ObjectType.BOOLEAN, transform) - case Schema.Type.ARRAY => if (isSimple) { types += InferredType(name, ObjectType.LIST, transform) } - case Schema.Type.MAP => if (isSimple) { types += InferredType(name, ObjectType.MAP, transform) } + case Schema.Type.ARRAY => if (isSimpleArray) { types += InferredType(name, ObjectType.LIST, transform) } + case Schema.Type.MAP => if (isSimpleMap) { types += InferredType(name, ObjectType.MAP, transform) } case Schema.Type.FIXED => types += InferredType(name, ObjectType.BYTES, transform) case Schema.Type.ENUM => types += InferredType(name, ObjectType.STRING, transform.copy(suffix = transform.suffix + "::string")) case Schema.Type.UNION => types += InferredType(name, ObjectType.STRING, transform.copy(suffix = transform.suffix + "::string")) @@ -214,6 +207,16 @@ object AvroConverterFactory { types.toSeq } + private def isSimple(s: Schema): Boolean = s.getType match { + case Schema.Type.STRING => true + case Schema.Type.INT => true + case Schema.Type.LONG => true + case Schema.Type.FLOAT => true + case Schema.Type.DOUBLE => true + case Schema.Type.BOOLEAN => true + case _ => false + } + object AvroConfigConvert extends ConverterConfigConvert[AvroConfig] with OptionConvert { override protected def decodeConfig( diff --git a/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroPath.scala b/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroPath.scala index 1fe6fc142c24..f177bd661764 100644 --- a/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroPath.scala +++ b/geomesa-convert/geomesa-convert-avro/src/main/scala/org/locationtech/geomesa/convert/avro/AvroPath.scala @@ -40,12 +40,13 @@ object AvroPath extends BasicParser { private def convert(record: Any): Any = { record match { - case x: Utf8 => x.toString - case x: ByteBuffer => convertBytes(x) - case x: GenericFixed => x.bytes() - case x: GenericEnumSymbol[_] => x.toString - case x: GenericArray[Any] => convertList(x) - case x => x + case x: Utf8 => x.toString + case x: ByteBuffer => convertBytes(x) + case x: GenericFixed => x.bytes() + case x: GenericEnumSymbol[_] => x.toString + case x: GenericArray[Any] => convertList(x) + case x: java.util.Map[String, Any] => convertMap(x) + case x => x } } @@ -67,6 +68,17 @@ object AvroPath extends BasicParser { result } + // note: maps get re-used, so we need to copy to a new structure + private def convertMap(map: java.util.Map[String, Any]): java.util.Map[String, Any] = { + val result = new java.util.HashMap[String, Any](map.size()) + val iter = map.entrySet().iterator() + while (iter.hasNext) { + val entry = iter.next + result.put(convert(entry.getKey).asInstanceOf[String], convert(entry.getValue)) + } + result + } + case class PathExpr(field: String, predicate: AvroPredicate) extends AvroPath { override def eval(record: Any): Option[Any] = { record match { diff --git a/geomesa-convert/geomesa-convert-avro/src/test/scala/org/locationtech/geomesa/convert/avro/AvroConverterTest.scala b/geomesa-convert/geomesa-convert-avro/src/test/scala/org/locationtech/geomesa/convert/avro/AvroConverterTest.scala index 0d4423e08ee6..0e57c2c36757 100644 --- a/geomesa-convert/geomesa-convert-avro/src/test/scala/org/locationtech/geomesa/convert/avro/AvroConverterTest.scala +++ b/geomesa-convert/geomesa-convert-avro/src/test/scala/org/locationtech/geomesa/convert/avro/AvroConverterTest.scala @@ -27,6 +27,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} @RunWith(classOf[JUnitRunner]) class AvroConverterTest extends Specification with AvroUtils with LazyLogging { + import scala.collection.JavaConverters._ + sequential val sft = SimpleFeatureTypes.createType(ConfigFactory.load("sft_testsft.conf")) @@ -247,6 +249,8 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging { | { "name": "lat", "type": "double" }, | { "name": "lon", "type": "double" }, | { "name": "label", "type": [ "string", "null" ] }, + | { "name": "list", "type": { "type": "array", "items": "string" }}, + { "name": "map", "type": { "type": "map", "values": "int" }}, | { "name": "props", | "type": { | "name": "properties", @@ -271,6 +275,9 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging { rec.put("lat", 40d + i) rec.put("lon", 50d + i) rec.put("label", s"name$i") + val list = new GenericData.Array[String](schema.getField("list").schema(), Seq(s"$i", s"${i+1}").asJava) + rec.put("list", list) + rec.put("map", Map[String, Int]("one" -> i, "two" -> {i + 1}).asJava) val props = new GenericData.Record(schema.getField("props").schema()) props.put("age", i) props.put("weight", 10f + i) @@ -287,7 +294,7 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging { inferred must beSome val expectedSft = SimpleFeatureTypes.createType(inferred.get._1.getTypeName, - "lat:Double,lon:Double,label:String,age:Int,weight:Float,*geom:Point:srid=4326") + "lat:Double,lon:Double,label:String,list:List[String],map:Map[String,Int],age:Int,weight:Float,*geom:Point:srid=4326") inferred.get._1 mustEqual expectedSft logger.trace(inferred.get._2.root().render(ConfigRenderOptions.concise().setFormatted(true))) @@ -299,8 +306,9 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging { converted must not(beEmpty) val expected = Seq.tabulate(10) { i => - ScalaSimpleFeature.create(expectedSft, s"$i", 40d + i, 50d + i, s"name$i", i, 10f + i, - s"POINT (${ 50d + i } ${ 40d + i })") + ScalaSimpleFeature.create(expectedSft, s"$i", + 40d + i, 50d + i, s"name$i", Seq(s"$i", s"${i+1}").asJava, + Map[String, Int]("one" -> i, "two" -> {i + 1}).asJava, i, 10f + i, s"POINT (${ 50d + i } ${ 40d + i })") } // note: feature ids won't be the same