Demos of using Spark to write MySQL and Kafka data into data lakes such as Delta Lake, Apache Hudi, and Apache Iceberg.
| Component | Version |
| --------- | ------- |
| Spark     | 3.1.2   |
| Delta     | 1.0.0   |
| Hudi      | 0.10.1  |
| Iceberg   | 0.12.1  |
| MySQL     | 8.0.24  |
Use `kafka.py` to generate test data.
The `kafka.bootstrap.servers` and `subscribe` options are required.
import sparkSession.implicits._

// Read the Kafka topic as an unbounded stream. `kafka.bootstrap.servers` and
// `subscribe` are the options this source cannot run without; the others tune
// where reading starts, whether lost offsets fail the query, and batch size.
val df: DataFrame = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaConsumer)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", startingOffsets)
  .option("failOnDataLoss", failOnDataLoss)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

// The message payload arrives in the `value` column. Cast it to a string,
// parse it as JSON with `schema`, then flatten the parsed struct into
// top-level columns.
// `$"value"` (from sparkSession.implicits._) is used instead of the Symbol
// literal 'value, which is deprecated since Scala 2.13 and removed in Scala 3.
val frame: Dataset[Row] = df
  .select(from_json($"value".cast("string"), schema).as("value"))
  .select($"value.*")
// Stream the parsed Kafka rows into a Delta table at `deltaPath`,
// partitioned on disk by `partitionColumns`.
//
// The checkpoint must NOT live directly in the table root. Delta's VACUUM
// removes files under the table directory that the transaction log does not
// reference, so it could delete the stream's offset/commit files. Directories
// whose names start with `_` are ignored by Delta, hence `_checkpoints`.
// Moving the checkpoint means an existing stream restarts from
// `startingOffsets` on its first run with this change.
val query: StreamingQuery = frame
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("path", deltaPath)
  .option("checkpointLocation", s"$deltaPath/_checkpoints")
  .partitionBy(partitionColumns: _*)
  .start()
// Stream the parsed Kafka rows into the Hudi table `hoodieTableName` stored at
// `lakePath`, firing a micro-batch every 10 seconds.
//   record key      -> user_id
//   precombine field -> user_id
//   partition path  -> partitionFields
// NOTE(review): the precombine field is the same column as the record key, so
// Hudi has no value to order duplicate keys by. A time column (e.g. local_time,
// if present in `schema`) was probably intended; confirm.
val query: StreamingQuery = frame
  .writeStream
  .format("hudi")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .option(HoodieWriteConfig.TBL_NAME.key, hoodieTableName)
  .option(RECORDKEY_FIELD.key, "user_id")
  .option(PRECOMBINE_FIELD.key, "user_id")
  .option(PARTITIONPATH_FIELD.key, partitionFields)
  .option("checkpointLocation", checkpointLocation)
  .option("path", lakePath)
  .start()
Create the Iceberg table first:
// The Iceberg table must exist before the streaming query is started.
// See the CREATE TABLE section of Iceberg's Spark DDL documentation for the
// available table-creation options.
// `if not exists` keeps the demo re-runnable: a plain CREATE TABLE fails on
// every run after the first because the table is already in the catalog.
sparkSession.sql(
  "create table if not exists czl_iceberg.demo.kafka_spark_iceberg (" +
    "user_id bigint, station_time string, score integer, local_time timestamp" +
    ") using iceberg")
// Stream the parsed Kafka rows into the Iceberg table at `icebergPath`,
// firing a micro-batch every 10 seconds. The table has to exist before this
// query starts.
// Iceberg streaming sinks accept two output modes:
//   append   - adds each micro-batch's rows to the table (used here)
//   complete - replaces the table's contents on every micro-batch
val query: StreamingQuery = frame
  .writeStream
  .format("iceberg")
  .option("checkpointLocation", checkpointPath)
  .option("path", icebergPath)
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .outputMode("append")
  .start()
// Load the MySQL table `tableName` as a batch DataFrame through Spark's JDBC
// source, using the MySQL Connector/J 8 driver class.
val readMysql: DataFrame = sparkSession.read
  .format("jdbc")
  .options(Map(
    "url"      -> url,
    "driver"   -> "com.mysql.cj.jdbc.Driver",
    "dbtable"  -> tableName,
    "user"     -> username,
    "password" -> password
  ))
  .load()
// Replace the Delta table at `path` with the MySQL snapshot,
// partitioned on disk by `partitionColumns`.
readMysql.write
  .mode(SaveMode.Overwrite)
  .format("delta")
  .partitionBy(partitionColumns: _*)
  .save(path)
// Write the MySQL snapshot to the Hudi table `hoodieTableName` at `path`,
// replacing any existing table (SaveMode.Overwrite). `id` is the record key.
// Partitioning is supplied via DataFrameWriter.partitionBy instead of an
// explicit partition-path option.
// NOTE(review): the precombine field is also `id`, so Hudi has no column to
// order duplicate keys by. Confirm whether an update-time column was intended.
readMysql.write
  .format("hudi")
  .mode(SaveMode.Overwrite)
  .partitionBy(partitionColumns: _*)
  .option(HoodieWriteConfig.TBL_NAME.key, hoodieTableName)
  .option(PRECOMBINE_FIELD.key, "id")
  .option(RECORDKEY_FIELD.key, "id")
  .save(path)
// Create an Iceberg table from the MySQL snapshot with the DataFrameWriterV2
// API. createOrReplace() is used instead of create() so the write can be
// re-run. create() fails with TableAlreadyExistsException once the table
// exists, whereas this replaces it, like a SaveMode.Overwrite batch write.
// NOTE(review): writeTo expects a catalog table identifier
// (e.g. "catalog.db.table"), not a filesystem path. Confirm that `path` holds
// such an identifier here.
readMysql
  .writeTo(path)
  .createOrReplace()