From 7c8ea53323eb769c0a9ee34141978781502e95b3 Mon Sep 17 00:00:00 2001 From: Irena Reznikov Date: Wed, 4 Sep 2019 10:25:22 +0300 Subject: [PATCH] feat(stitch_marketo_whitelist): new hudi property: whitelist (#219) --- .../writers/file/HudiOutputWriter.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala index 89160a32b..6fa6f2453 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala @@ -23,7 +23,8 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext tableName: Option[String], hivePartitions: Option[String], extraOptions: Option[Map[String, String]], - alignToPreviousSchema: Option[Boolean]) + alignToPreviousSchema: Option[Boolean], + whitelist: Option[String]) val hudiOutputProperties = HudiOutputProperties( props.get("path").asInstanceOf[Option[String]], @@ -34,7 +35,8 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext props.get("tableName").asInstanceOf[Option[String]], props.get("hivePartitions").asInstanceOf[Option[String]], props.get("extraOptions").asInstanceOf[Option[Map[String, String]]], - props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]]) + props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]], + props.get("whitelist").asInstanceOf[Option[String]]) // scalastyle:off cyclomatic.complexity @@ -46,6 +48,13 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext } log.info(s"Starting to write dataframe to hudi") var df = dataFrame + + // Filter dataframe columns according to whitelist + df = this.hudiOutputProperties.whitelist match { + case Some(whitelist) => filterColumnsByWhitelist(df, whitelist) + case _ => df + } + // To 
support schema evolution all fields should be nullable
     df = supportNullableFields(df)
 
@@ -263,6 +272,17 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
     df
   }
 
+  /**
+   * Keeps only the columns named in the comma-separated whitelist.
+   * Names are trimmed and blank entries dropped; if no usable name remains
+   * (empty or separator-only whitelist) the dataframe is returned unchanged.
+   */
+  def filterColumnsByWhitelist(dataFrame: DataFrame, whitelist: String): DataFrame = {
+    // Drop blank names (e.g. from "a,,b" or " , ") so an empty column name is never selected.
+    val wlColumns = whitelist.split(",").map(_.trim).filter(_.nonEmpty).map(col)
+    if (wlColumns.isEmpty) dataFrame else dataFrame.select(wlColumns: _*)
+  }
+
 }
 // scalastyle:on cyclomatic.complexity