Refactor Kafka processing and Spark session initialization (datacontract#149)

* Refactor Kafka processing and Spark session initialization
* ruff - linting changes
mpearmain authored Apr 23, 2024
1 parent 03aa6ce commit f362a95
Showing 37 changed files with 167 additions and 219 deletions.
3 changes: 1 addition & 2 deletions datacontract/cli.py
@@ -11,8 +11,7 @@
from typing_extensions import Annotated

from datacontract.data_contract import DataContract
from datacontract.init.download_datacontract_file import \
download_datacontract_file, FileExistsException
from datacontract.init.download_datacontract_file import download_datacontract_file, FileExistsException

console = Console()

30 changes: 10 additions & 20 deletions datacontract/data_contract.py
@@ -6,20 +6,16 @@
import yaml
from pyspark.sql import SparkSession

from datacontract.breaking.breaking import models_breaking_changes, \
quality_breaking_changes
from datacontract.breaking.breaking import models_breaking_changes, quality_breaking_changes
from datacontract.engines.datacontract.check_that_datacontract_contains_valid_servers_configuration import (
check_that_datacontract_contains_valid_server_configuration,
)
from datacontract.engines.fastjsonschema.check_jsonschema import \
check_jsonschema
from datacontract.engines.fastjsonschema.check_jsonschema import check_jsonschema
from datacontract.engines.soda.check_soda_execute import check_soda_execute
from datacontract.export.avro_converter import to_avro_schema_json
from datacontract.export.avro_idl_converter import to_avro_idl
from datacontract.export.dbt_converter import to_dbt_models_yaml, \
to_dbt_sources_yaml, to_dbt_staging_sql
from datacontract.export.great_expectations_converter import \
to_great_expectations
from datacontract.export.dbt_converter import to_dbt_models_yaml, to_dbt_sources_yaml, to_dbt_staging_sql
from datacontract.export.great_expectations_converter import to_great_expectations
from datacontract.export.html_export import to_html
from datacontract.export.jsonschema_converter import to_jsonschema_json
from datacontract.export.odcs_converter import to_odcs_yaml
@@ -31,24 +27,18 @@
from datacontract.export.terraform_converter import to_terraform
from datacontract.imports.avro_importer import import_avro
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
publish_datamesh_manager
from datacontract.integration.publish_datamesh_manager import publish_datamesh_manager
from datacontract.integration.publish_opentelemetry import publish_opentelemetry
from datacontract.lint import resolve
from datacontract.lint.linters.description_linter import DescriptionLinter
from datacontract.lint.linters.example_model_linter import ExampleModelLinter
from datacontract.lint.linters.field_pattern_linter import FieldPatternLinter
from datacontract.lint.linters.field_reference_linter import \
FieldReferenceLinter
from datacontract.lint.linters.field_reference_linter import FieldReferenceLinter
from datacontract.lint.linters.notice_period_linter import NoticePeriodLinter
from datacontract.lint.linters.quality_schema_linter import \
QualityUsesSchemaLinter
from datacontract.lint.linters.valid_constraints_linter import \
ValidFieldConstraintsLinter
from datacontract.model.breaking_change import BreakingChanges, BreakingChange, \
Severity
from datacontract.model.data_contract_specification import \
DataContractSpecification, Server
from datacontract.lint.linters.quality_schema_linter import QualityUsesSchemaLinter
from datacontract.lint.linters.valid_constraints_linter import ValidFieldConstraintsLinter
from datacontract.model.breaking_change import BreakingChanges, BreakingChange, Severity
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.model.exceptions import DataContractException
from datacontract.model.run import Run, Check

3 changes: 1 addition & 2 deletions datacontract/engines/fastjsonschema/check_jsonschema.py
@@ -6,8 +6,7 @@

from datacontract.engines.fastjsonschema.s3.s3_read_files import yield_s3_files
from datacontract.export.jsonschema_converter import to_jsonschema
from datacontract.model.data_contract_specification import \
DataContractSpecification, Server
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.model.exceptions import DataContractException
from datacontract.model.run import Run, Check

18 changes: 6 additions & 12 deletions datacontract/engines/soda/check_soda_execute.py
@@ -3,20 +3,14 @@
from pyspark.sql import SparkSession
from soda.scan import Scan

from datacontract.engines.soda.connections.bigquery import \
to_bigquery_soda_configuration
from datacontract.engines.soda.connections.databricks import \
to_databricks_soda_configuration
from datacontract.engines.soda.connections.bigquery import to_bigquery_soda_configuration
from datacontract.engines.soda.connections.databricks import to_databricks_soda_configuration
from datacontract.engines.soda.connections.duckdb import get_duckdb_connection
from datacontract.engines.soda.connections.kafka import create_spark_session, \
read_kafka_topic
from datacontract.engines.soda.connections.postgres import \
to_postgres_soda_configuration
from datacontract.engines.soda.connections.snowflake import \
to_snowflake_soda_configuration
from datacontract.engines.soda.connections.kafka import create_spark_session, read_kafka_topic
from datacontract.engines.soda.connections.postgres import to_postgres_soda_configuration
from datacontract.engines.soda.connections.snowflake import to_snowflake_soda_configuration
from datacontract.export.sodacl_converter import to_sodacl_yaml
from datacontract.model.data_contract_specification import \
DataContractSpecification, Server
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.model.run import Run, Check, Log


1 change: 0 additions & 1 deletion datacontract/engines/soda/connections/duckdb.py
@@ -103,4 +103,3 @@ def setup_azure_connection(con, server):
CLIENT_SECRET '{client_secret}'
);
""")

213 changes: 108 additions & 105 deletions datacontract/engines/soda/connections/kafka.py
@@ -1,40 +1,36 @@
import os

import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (
StructType,
DataType,
NullType,
ArrayType,
BinaryType,
DateType,
TimestampNTZType,
TimestampType,
BooleanType,
LongType,
IntegerType,
DoubleType,
DecimalType,
StringType,
StructField,
StringType,
DecimalType,
DoubleType,
IntegerType,
LongType,
BooleanType,
TimestampType,
TimestampNTZType,
DateType,
BinaryType,
ArrayType,
NullType,
DataType,
)

from datacontract.export.avro_converter import to_avro_schema_json
from datacontract.model.data_contract_specification import \
DataContractSpecification, Server, Field
from datacontract.model.data_contract_specification import DataContractSpecification, Server, Field
from datacontract.model.exceptions import DataContractException


def create_spark_session(tmp_dir) -> SparkSession:
# TODO: Update dependency versions when updating pyspark
# TODO: add protobuf library
def create_spark_session(tmp_dir: str) -> SparkSession:
"""Create and configure a Spark session."""
spark = (
SparkSession.builder.appName("datacontract")
.config("spark.sql.warehouse.dir", tmp_dir + "/spark-warehouse")
.config("spark.streaming.stopGracefullyOnShutdown", True)
.config("spark.sql.warehouse.dir", f"{tmp_dir}/spark-warehouse")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config(
"spark.jars.packages",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0",
@@ -47,106 +43,113 @@ def create_spark_session(tmp_dir: str) -> SparkSession:
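
A minimal usage sketch for the session helper above (illustrative, not part of the commit): it assumes only that pyspark is installed and that the spark-sql-kafka and spark-avro packages listed in spark.jars.packages can be resolved at startup.

import tempfile

from datacontract.engines.soda.connections.kafka import create_spark_session

with tempfile.TemporaryDirectory(prefix="datacontract-") as tmp_dir:
    spark = create_spark_session(tmp_dir)
    print(spark.version)  # expected to be a 3.5.x build, matching the pinned connector jars
    spark.stop()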


def read_kafka_topic(spark: SparkSession, data_contract: DataContractSpecification, server: Server, tmp_dir):
host = server.host
topic = server.topic
auth_options = get_auth_options()

# read full kafka topic
"""Read and process data from a Kafka topic based on the server configuration."""
df = (
spark.read.format("kafka")
.options(**auth_options)
.option("kafka.bootstrap.servers", host)
.option("subscribe", topic)
.options(**get_auth_options())
.option("kafka.bootstrap.servers", server.host)
.option("subscribe", server.topic)
.option("startingOffsets", "earliest")
.load()
)
# TODO a warning if none or multiple models

model_name, model = next(iter(data_contract.models.items()))
if server.format == "avro":
avro_schema = to_avro_schema_json(model_name, model)

# Parse out the extra bytes from the Avro data
# A Kafka message contains a key and a value. Data going through a Kafka topic in Confluent Cloud has five bytes added to the beginning of every Avro value. If you are using Avro format keys, then five bytes will be added to the beginning of those as well. For this example, we’re assuming string keys. These bytes consist of one magic byte and four bytes representing the schema ID of the schema in the registry that is needed to decode that data. The bytes need to be removed so that the schema ID can be determined and the Avro data can be parsed. To manipulate the data, we need a couple of imports:
df2 = df.withColumn("fixedValue", fn.expr("substring(value, 6, length(value)-5)"))

options = {"mode": "PERMISSIVE"}
df3 = df2.select(from_avro(col("fixedValue"), avro_schema, options).alias("avro")).select(col("avro.*"))
elif server.format == "json":
# TODO A good warning when the conversion to json fails
struct_type = to_struct_type(model.fields)
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

options = {"mode": "PERMISSIVE"}
df3 = df2.select(from_json(df2.value, struct_type, options).alias("json")).select(col("json.*"))
else:
raise DataContractException(
type="test",
name="Configuring Kafka checks",
result="warning",
reason=f"Kafka format '{server.format}' is not supported. Skip executing tests.",
engine="datacontract",
)

# df3.writeStream.toTable(model_name, checkpointLocation=tmp_dir + "/checkpoint")
df3.createOrReplaceTempView(model_name)
# print(spark.sql(f"select * from {model_name}").show())
match server.format:
case "avro":
process_avro_format(df, model_name, model)
case "json":
process_json_format(df, model_name, model)
case _:
raise DataContractException(
type="test",
name="Configuring Kafka checks",
result="warning",
reason=f"Kafka format '{server.format}' is not supported. " f"Skip executing tests.",
engine="datacontract",
)
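
The refactored read_kafka_topic registers the decoded records as a temp view named after the model, which the downstream Soda scan then queries. A hedged end-to-end sketch of that flow — preview_topic is a hypothetical helper, and data_contract (a DataContractSpecification) and server (its Kafka server entry) are assumed to be resolved already, e.g. by the loading step in data_contract.py:

import tempfile

from datacontract.engines.soda.connections.kafka import create_spark_session, read_kafka_topic

def preview_topic(data_contract, server, limit: int = 10) -> None:
    """Hypothetical helper: read the topic and show a few decoded rows."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        spark = create_spark_session(tmp_dir)
        read_kafka_topic(spark, data_contract, server, tmp_dir)
        model_name = next(iter(data_contract.models))  # name of the temp view registered above
        spark.sql(f"SELECT * FROM {model_name}").limit(limit).show()
        spark.stop()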


def process_avro_format(df, model_name, model):
avro_schema = to_avro_schema_json(model_name, model)
df2 = df.withColumn("fixedValue", expr("substring(value, 6, length(value)-5)"))
options = {"mode": "PERMISSIVE"}
df2.select(from_avro(col("fixedValue"), avro_schema, options).alias("avro")).select(
col("avro.*")
).createOrReplaceTempView(model_name)
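
The substring(value, 6, length(value)-5) expression in process_avro_format drops the Confluent wire-format header described in the removed comment above: one magic byte plus a big-endian four-byte schema ID precede the Avro payload. A self-contained sketch of that framing, with made-up bytes and no Schema Registry involved:

import struct

# Confluent wire format: magic byte 0x00, then a big-endian 4-byte schema ID,
# then the Avro-encoded value. The schema ID and payload below are illustrative.
schema_id = 100042
avro_payload = b"\x02\x06foo"
message_value = struct.pack(">bI", 0, schema_id) + avro_payload

magic, parsed_schema_id = struct.unpack(">bI", message_value[:5])
stripped = message_value[5:]  # the bytes that substring(value, 6, length(value)-5) keeps

assert magic == 0
assert parsed_schema_id == schema_id
assert stripped == avro_payload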


def process_json_format(df, model_name, model):
struct_type = to_struct_type(model.fields)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").select(
from_json(col("value"), struct_type, {"mode": "PERMISSIVE"}).alias("json")
).select(col("json.*")).createOrReplaceTempView(model_name)


def get_auth_options():
"""Retrieve Kafka authentication options from environment variables."""
kafka_sasl_username = os.getenv("DATACONTRACT_KAFKA_SASL_USERNAME")
kafka_sasl_password = os.getenv("DATACONTRACT_KAFKA_SASL_PASSWORD")

if kafka_sasl_username is None:
auth_options = {}
else:
kafka_sasl_jaas_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_sasl_username}" password="{kafka_sasl_password}";'
auth_options = {
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": kafka_sasl_jaas_config,
}
return auth_options
return {}

return {
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": (
f"org.apache.kafka.common.security.plain.PlainLoginModule required "
f'username="{kafka_sasl_username}" password="{kafka_sasl_password}";'
),
}
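
get_auth_options is driven entirely by environment variables: with no username set it returns an empty dict and the Kafka read proceeds unauthenticated; otherwise the SASL/PLAIN options are passed straight into spark.read.format("kafka").options(**...) in read_kafka_topic. A sketch with placeholder credentials:

import os

from datacontract.engines.soda.connections.kafka import get_auth_options

os.environ["DATACONTRACT_KAFKA_SASL_USERNAME"] = "my-api-key"     # placeholder value
os.environ["DATACONTRACT_KAFKA_SASL_PASSWORD"] = "my-api-secret"  # placeholder value

options = get_auth_options()
assert options["kafka.security.protocol"] == "SASL_SSL"
assert 'username="my-api-key"' in options["kafka.sasl.jaas.config"]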


def to_struct_type(fields):
struct_fields = []
for field_name, field in fields.items():
struct_fields.append(to_struct_field(field_name, field))
return StructType(struct_fields)
"""Convert field definitions to Spark StructType."""
return StructType([to_struct_field(field_name, field) for field_name, field in fields.items()])


def to_struct_field(field_name: str, field: Field) -> StructField:
if field.type is None:
data_type = DataType()
if field.type in ["string", "varchar", "text"]:
data_type = StringType()
elif field.type in ["number", "decimal", "numeric"]:
data_type = DecimalType()
elif field.type in ["float", "double"]:
data_type = DoubleType()
elif field.type in ["integer", "int"]:
data_type = IntegerType()
elif field.type in ["long", "bigint"]:
data_type = LongType()
elif field.type in ["boolean"]:
data_type = BooleanType()
elif field.type in ["timestamp", "timestamp_tz"]:
data_type = TimestampType()
elif field.type in ["timestamp_ntz"]:
data_type = TimestampNTZType()
elif field.type in ["date"]:
data_type = DateType()
elif field.type in ["time"]:
data_type = DataType()
elif field.type in ["object", "record", "struct"]:
data_type = to_struct_type(field.fields)
elif field.type in ["binary"]:
data_type = BinaryType()
elif field.type in ["array"]:
# TODO support array structs
data_type = ArrayType()
elif field.type in ["null"]:
data_type = NullType()
else:
data_type = DataType()
"""Map field definitions to Spark StructField using match-case."""
match field.type:
case "string" | "varchar" | "text":
data_type = StringType()
case "number" | "decimal" | "numeric":
data_type = DecimalType()
case "float" | "double":
data_type = DoubleType()
case "integer" | "int":
data_type = IntegerType()
case "long" | "bigint":
data_type = LongType()
case "boolean":
data_type = BooleanType()
case "timestamp" | "timestamp_tz":
data_type = TimestampType()
case "timestamp_ntz":
data_type = TimestampNTZType()
case "date":
data_type = DateType()
case "time":
data_type = DataType() # Specific handling for time type
case "object" | "record" | "struct":
data_type = StructType(
[to_struct_field(sub_field_name, sub_field) for sub_field_name, sub_field in field.fields.items()]
)
case "binary":
data_type = BinaryType()
case "array":
element_type = (
StructType(
[to_struct_field(sub_field_name, sub_field) for sub_field_name, sub_field in field.fields.items()]
)
if field.fields
else DataType()
)
data_type = ArrayType(element_type)
case "null":
data_type = NullType()
case _:
data_type = DataType() # Fallback generic DataType

return StructField(field_name, data_type, nullable=not field.required)
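
To make the type mapping concrete, a small sketch of to_struct_type on a handful of contract fields. The field names are invented, and it assumes the Field model accepts type/required as keyword arguments:

from datacontract.engines.soda.connections.kafka import to_struct_type
from datacontract.model.data_contract_specification import Field

fields = {
    "order_id": Field(type="string", required=True),  # -> StringType, nullable=False
    "amount": Field(type="decimal"),                   # -> DecimalType (default precision/scale)
    "updated_at": Field(type="timestamp"),             # -> TimestampType
}

schema = to_struct_type(fields)
print(schema.simpleString())  # struct<order_id:string,amount:decimal(10,0),updated_at:timestamp>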
3 changes: 1 addition & 2 deletions datacontract/export/avro_idl_converter.py
@@ -4,8 +4,7 @@
from io import StringIO

from datacontract.lint.resolve import inline_definitions_into_data_contract
from datacontract.model.data_contract_specification import \
DataContractSpecification, Field
from datacontract.model.data_contract_specification import DataContractSpecification, Field
from datacontract.model.exceptions import DataContractException


3 changes: 1 addition & 2 deletions datacontract/export/dbt_converter.py
@@ -3,8 +3,7 @@
import yaml

from datacontract.export.sql_type_converter import convert_to_sql_type
from datacontract.model.data_contract_specification import \
DataContractSpecification, Model, Field
from datacontract.model.data_contract_specification import DataContractSpecification, Model, Field


def to_dbt_models_yaml(data_contract_spec: DataContractSpecification):
3 changes: 1 addition & 2 deletions datacontract/export/great_expectations_converter.py
@@ -3,8 +3,7 @@

import yaml

from datacontract.model.data_contract_specification import \
DataContractSpecification, Field, Quality
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Quality


def to_great_expectations(data_contract_spec: DataContractSpecification, model_key: str) -> str: