Skip to content

Commit

Permalink
Revert "feat(stitch_marketo_whitelist): new hudi property: whitelist (#…
Browse files Browse the repository at this point in the history
…219)" (#222)

This reverts commit 7c8ea53.
  • Loading branch information
doronporat authored Sep 4, 2019
1 parent 7c8ea53 commit c1f9ab7
Showing 1 changed file with 2 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
tableName: Option[String],
hivePartitions: Option[String],
extraOptions: Option[Map[String, String]],
alignToPreviousSchema: Option[Boolean],
whitelist: Option[String])
alignToPreviousSchema: Option[Boolean])

val hudiOutputProperties = HudiOutputProperties(
props.get("path").asInstanceOf[Option[String]],
Expand All @@ -35,8 +34,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
props.get("tableName").asInstanceOf[Option[String]],
props.get("hivePartitions").asInstanceOf[Option[String]],
props.get("extraOptions").asInstanceOf[Option[Map[String, String]]],
props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]],
props.get("whitelist").asInstanceOf[Option[String]])
props.get("alignToPreviousSchema").asInstanceOf[Option[Boolean]])


// scalastyle:off cyclomatic.complexity
Expand All @@ -48,13 +46,6 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
}
log.info(s"Starting to write dataframe to hudi")
var df = dataFrame

// Filter dataframe columns according to whitelist
df = this.hudiOutputProperties.whitelist match {
case Some(whitelist) => filterColumnsByWhitelist(df, whitelist)
case _ => df
}

// To support schema evolution all fields should be nullable
df = supportNullableFields(df)

Expand Down Expand Up @@ -272,17 +263,6 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
df
}

def filterColumnsByWhitelist(dataFrame: DataFrame, whitelist: String) : DataFrame = {
whitelist.trim.nonEmpty match {
case true => {
val wlColumns = whitelist.split(",").map(_.trim).toList.map(x => col(x))
dataFrame.select(wlColumns :_*)
}
case false => dataFrame
}

}


}
// scalastyle:on cyclomatic.complexity
Expand Down

0 comments on commit c1f9ab7

Please sign in to comment.