Skip to content

Commit

Permalink
feat(stitch_marketo_whitelist): new hudi property: whitelist (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
Irenez753 authored and doronporat committed Sep 4, 2019
1 parent 52fd264 commit 7c8ea53
Showing 1 changed file with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
tableName: Option[String],
hivePartitions: Option[String],
extraOptions: Option[Map[String, String]],
alignToPreviousSchema: Option[Boolean])
alignToPreviousSchema: Option[Boolean],
whitelist: Option[String])

val hudiOutputProperties = HudiOutputProperties(
props.get("path").asInstanceOf[Option[String]],
Expand All @@ -34,7 +35,8 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
props.get("tableName").asInstanceOf[Option[String]],
props.get("hivePartitions").asInstanceOf[Option[String]],
props.get("extraOptions").asInstanceOf[Option[Map[String, String]]],
props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]])
props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]],
props.get("whitelist").asInstanceOf[Option[String]])


// scalastyle:off cyclomatic.complexity
Expand All @@ -46,6 +48,13 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
}
log.info(s"Starting to write dataframe to hudi")
var df = dataFrame

// Filter dataframe columns according to whitelist
df = this.hudiOutputProperties.whitelist match {
case Some(whitelist) => filterColumnsByWhitelist(df, whitelist)
case _ => df
}

// To support schema evolution all fields should be nullable
df = supportNullableFields(df)

Expand Down Expand Up @@ -263,6 +272,17 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
df
}

def filterColumnsByWhitelist(dataFrame: DataFrame, whitelist: String) : DataFrame = {
whitelist.trim.nonEmpty match {
case true => {
val wlColumns = whitelist.split(",").map(_.trim).toList.map(x => col(x))
dataFrame.select(wlColumns :_*)
}
case false => dataFrame
}

}


}
// scalastyle:on cyclomatic.complexity
Expand Down

0 comments on commit 7c8ea53

Please sign in to comment.