
Commit

Merge pull request #2 from homeaway/master
Sync from master to fork
markovarghese authored Mar 26, 2021
2 parents ee77025 + 91f8866 commit 044a9a8
Showing 9 changed files with 70 additions and 21 deletions.
16 changes: 16 additions & 0 deletions .gitattributes
@@ -0,0 +1,16 @@
# ref https://docs.github.com/en/github/using-git/configuring-git-to-handle-line-endings
# Set the default behavior, in case people don't have core.autocrlf set.
* text=auto

# Explicitly declare text files you want to always be normalized and converted
# to native line endings on checkout.
*.c text
*.h text

# Declare files that will always have CRLF line endings on checkout.
*.sln text eol=crlf
*.sh text eol=lf

# Denote all files that are truly binary and should not be modified.
*.png binary
*.jpg binary
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html)

## [0.1.25] - 2021-03-24
### Changed
- Migration.scala - Fix port issue

## [0.1.24] - 2021-03-20
Added support for string event format for Kafka destinations
### Added
- .gitattributes - Force git to check out *.sh files with LF line endings regardless of OS
### Changed
- core/src/main/scala/core/{DataFrameFromTo.scala, Migration.scala} - Added support for string event format for Kafka destinations
- core/src/main/resources/Samples/Input_Sample_s3_to_kafka.json - Updated example to use string format for kafka event key
- core/src/main/resources/Samples/Input_Json_Specification.json - Updated spec to support string event format for Kafka destinations
- manual-tests/filesystem_dataset_to_kafka/{README.md, datapull_input.json} - Updated manual test to test for string event format
- api/terraform/*/*.sh - Changed line endings from CRLF to LF
- api/terraform/datapull_task/ecs_deploy.sh - Removed a stray `exit 0` left over from earlier debugging that had been accidentally committed

## [0.1.23] - 2021-03-14
### Added
- manual-tests/filesystem_dataset_to_kafka* - Added manual test instructions to test data movement from a filesystem to a Kafka topic
1 change: 0 additions & 1 deletion api/terraform/datapull_task/ecs_deploy.sh
@@ -141,7 +141,6 @@ echo "Uploading core Jar to s3"
docker run --rm -v "$(pwd)":/data -v "${HOME}/.aws":"/root/.aws" amazon/aws-cli --profile ${AWS_PROFILE} s3 cp /data/target/DataMigrationFramework-1.0-SNAPSHOT-jar-with-dependencies.jar "$jar_file_path"

exitAfterFailure
exit 0

cd ../api/

core/src/main/resources/Samples/Input_Json_Specification.json
@@ -264,7 +264,11 @@
"comment_keysubjectrecordname": "Optional, defaults to empty string. Used only with RecordNameStrategy, TopicRecordNameStrategy. Record Name to use",
"keysubjectrecordname": "somename",
"comment_keyschemaversion": "optional, version of the topic's key schema, if the topic already has a key schema. If this is not specified, the latest version of the key schema is picked. If there are no key schemas registered for the topic in schema registry, DataPull will register the schema of the keyfield to Schema Registry",
"keyschemaversion": "1"
"keyschemaversion": "1",
"comment_keyformat": "Optional, by default will be avro. Can also be string. If string is chosen, the key field should be of type string. If the field is a complex data type like a struct, you can use the to_json() spark sql command",
"keyformat": "avro",
"comment_valueformat": "Optional, by default will be avro. Can also be string. If string is chosen, the value field should be of type string. If the field is a complex data type like a struct, you can use the to_json() spark sql command",
"valueformat": "avro"
},
"source_influxdb": {
"platform": "influxdb",
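The spec comments above note that when `keyformat` or `valueformat` is `string`, the corresponding field must already be a string column, and that a struct can be flattened with `to_json()`. The sketch below shows that preparation step in plain Spark, outside of DataPull itself; the DataFrame, column names, topic, and broker address are hypothetical placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

object StringFormatPrepSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-string-format-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical source data; in DataPull this would come from the configured source.
    val df = Seq((1L, "10.0.0.1", "10.0.0.255")).toDF("locId", "startIpNum", "endIpNum")

    // String key/value: complex columns are flattened with to_json before writing.
    val prepared = df.select(
      col("locId").cast("string").as("key"),
      to_json(struct(col("startIpNum"), col("endIpNum"), col("locId"))).as("value")
    )

    // Plain string key/value columns can be written with Spark's Kafka sink directly
    // (requires the spark-sql-kafka package on the classpath).
    prepared.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "KAFKA_BROKER:9092") // placeholder broker
      .option("topic", "TOPIC_NAME")
      .save()
  }
}
```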
core/src/main/resources/Samples/Input_Sample_s3_to_kafka.json
@@ -16,7 +16,7 @@
"schemaregistries": "http://KAFKA_SCHEMA_REGISTRY:8082",
"topic": "TOPIC_NAME",
"keyfield": "some_keyfield",
"keyFormat": "avro"
"keyformat": "string"
},
"sql": {
"query": "select some_keyfield, struct(startIpNum, endIpNum, locId, isp, organization) as value from stage"
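With `"keyformat": "string"`, the key column produced by the `sql.query` must already be a string. For illustration, the sample's query expressed through Scala, with a defensive cast added; the `spark` session and the `stage` view are assumed to exist, and the column names mirror the sample.

```scala
// Equivalent of the sample's sql.query; the cast is only needed if
// some_keyfield is not already a string column.
val kafkaReady = spark.sql(
  """select cast(some_keyfield as string) as some_keyfield,
    |       struct(startIpNum, endIpNum, locId, isp, organization) as value
    |from stage""".stripMargin
)
```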
30 changes: 21 additions & 9 deletions core/src/main/scala/core/DataFrameFromTo.scala
@@ -67,6 +67,7 @@ import scala.util.control.Breaks._

class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable {
val helper = new Helper(appConfig)

def fileToDataFrame(filePath: String, fileFormat: String, delimiter: String, charset: String, mergeSchema: String, sparkSession: org.apache.spark.sql.SparkSession, isS3: Boolean, secretstore: String, isSFTP: Boolean, login: String, host: String, password: String, pemFilePath: String, awsEnv: String, vaultEnv: String): org.apache.spark.sql.DataFrame = {

if (filePath == null && fileFormat == null && delimiter == null && charset == null && mergeSchema == null && sparkSession == null && login == null && host == null && password == null) {
@@ -104,7 +105,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
format("com.springml.spark.sftp").
option("host", host).
option("username", login).
option( if (pemFilePath == "") "password" else "pem", if (pemFilePath == "") password else pemFilePath).
option(if (pemFilePath == "") "password" else "pem", if (pemFilePath == "") password else pemFilePath).
option("fileType", fileFormat).
load(filePath))

@@ -263,7 +264,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
format("com.springml.spark.sftp").
option("host", host).
option("username", login).
option( if (pemFilePath == "") "password" else "pem", if (pemFilePath == "") password else pemFilePath).
option(if (pemFilePath == "") "password" else "pem", if (pemFilePath == "") password else pemFilePath).
option("fileType", fileFormat).
save(filePath)

@@ -1168,14 +1169,25 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
trustStorePath: Option[String] = None,
keyStorePassword: Option[String] = None,
trustStorePassword: Option[String] = None,
keyPassword: Option[String] = None
keyPassword: Option[String] = None,
keyFormat: String,
valueFormat: String
): Unit = {

var dfavro = spark.emptyDataFrame
var columnsToSelect = Seq(to_avro(df.col(valueField), helper.GetToAvroConfig(topic = topic, schemaRegistryUrl = schemaRegistryUrl, dfColumn = df.col(valueField), schemaVersion = valueSchemaVersion, isKey = false, subjectNamingStrategy = valueSubjectNamingStrategy, subjectRecordName = valueSubjectRecordName, subjectRecordNamespace = valueSubjectRecordNamespace)) as 'value)
val valueFieldCol = df.col(valueField)
val valueAvroConfig = helper.GetToAvroConfig(topic = topic, schemaRegistryUrl = schemaRegistryUrl, dfColumn = valueFieldCol, schemaVersion = valueSchemaVersion, isKey = false, subjectNamingStrategy = valueSubjectNamingStrategy, subjectRecordName = valueSubjectRecordName, subjectRecordNamespace = valueSubjectRecordNamespace)
var columnsToSelect = Seq((valueFormat match {
case "avro" => to_avro(valueFieldCol, valueAvroConfig)
case _ => valueFieldCol
}) as 'value)
if (!keyField.isEmpty) {
val keyFieldCol = df.col(keyField.get)
columnsToSelect = columnsToSelect ++ Seq(to_avro(keyFieldCol, helper.GetToAvroConfig(topic = topic, schemaRegistryUrl = schemaRegistryUrl, dfColumn = keyFieldCol, schemaVersion = keySchemaVersion, isKey = true, subjectNamingStrategy = keySubjectNamingStrategy, subjectRecordName = keySubjectRecordName, subjectRecordNamespace = keySubjectRecordNamespace)) as 'key)
val keyAvroConfig = helper.GetToAvroConfig(topic = topic, schemaRegistryUrl = schemaRegistryUrl, dfColumn = keyFieldCol, schemaVersion = keySchemaVersion, isKey = true, subjectNamingStrategy = keySubjectNamingStrategy, subjectRecordName = keySubjectRecordName, subjectRecordNamespace = keySubjectRecordNamespace)
columnsToSelect = columnsToSelect ++ Seq((keyFormat match {
case "avro" => to_avro(keyFieldCol, keyAvroConfig)
case _ => keyFieldCol
}) as 'key)
}
if (!headerField.isEmpty) {
columnsToSelect = columnsToSelect ++ Seq(df.col(headerField.get) as 'header)
@@ -1208,7 +1220,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
else if (platform == "teradata") {
driver = "com.teradata.jdbc.TeraDriver"

url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt),isWindowsAuthenticated)
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt), isWindowsAuthenticated)
}
} else {
if (platform == "mssql") {
@@ -1222,7 +1234,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
else if (platform == "teradata") {
driver = "com.teradata.jdbc.TeraDriver"

url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt),isWindowsAuthenticated)
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt), isWindowsAuthenticated)

}

@@ -1311,7 +1323,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
driver = "com.teradata.jdbc.TeraDriver"

val helper = new Helper(appConfig)
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt),isWindowsAuthenticated)
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt), isWindowsAuthenticated)
dflocal = dflocal.coalesce(1) // to prevent locking, by ensuring there is only one writer per table
}
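`buildTeradataURI` itself lives in Helper.scala and is outside this diff; as a rough, hypothetical sketch (parameter handling only, Windows-authentication logic omitted), the optional port would feed into the JDBC URL roughly like this:

```scala
// Hypothetical sketch, not the project's implementation: append DBS_PORT only when
// a port was supplied, which is why callers wrap the nullable port in Option.
def buildTeradataURI(server: String, database: String, port: Option[Int]): String = {
  val base = s"jdbc:teradata://$server/DATABASE=$database"
  port.map(p => s"$base,DBS_PORT=$p").getOrElse(base)
}
```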

@@ -1416,7 +1428,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializable
driver = "com.teradata.jdbc.TeraDriver"

val helper = new Helper(appConfig)
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt),isWindowsAuthenticated )
url = helper.buildTeradataURI(server, database, if (port == null) None else Some(port.toInt), isWindowsAuthenticated)

}

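The new keyFormat/valueFormat handling above boils down to one pattern: match on the configured format and either Avro-encode the column or pass it through untouched. Below is a condensed, standalone sketch of that pattern; it uses Spark's bundled `to_avro` from the spark-avro module rather than the project's schema-registry-aware `helper.GetToAvroConfig`, and the function names are illustrative only.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.avro.functions.to_avro

// Condensed version of the format switch added above: "avro" encodes the column,
// anything else (i.e. "string") passes the column through unchanged.
def formatColumn(column: Column, format: String): Column = format match {
  case "avro" => to_avro(column) // the project also supplies a schema-registry config here
  case _      => column          // "string": the field is expected to already be a string
}

// Assemble the key/value columns for the Kafka sink, mirroring the diff's logic.
def kafkaColumns(df: DataFrame, valueField: String, valueFormat: String,
                 keyField: Option[String], keyFormat: String): Seq[Column] = {
  val value = formatColumn(df.col(valueField), valueFormat).as("value")
  val key   = keyField.map(k => formatColumn(df.col(k), keyFormat).as("key"))
  Seq(value) ++ key.toSeq
}
```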
