Add Python Wrapper #43

Open · wants to merge 7 commits into `master`
Changes from 2 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
/python/.idea
/.idea/
*.iml
#local spark context data from unit tests
6 changes: 6 additions & 0 deletions README.md
@@ -328,6 +328,12 @@ evaluation specs and results
The summary report is meant to be just that, a summary of the failed rules. This will return only the records that
failed and only the rules that failed for that record; thus, if `summaryReport.isEmpty`, then all rules passed.


## Python Wrapper
The Python Wrapper allows users to validate data quality of their PySpark DataFrames using Python.

The Python Wrapper can be found under the `/python` directory. A quickstart notebook is also located under `/python/examples`.
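
A minimal sketch of the wrapper's builder-style API, assuming `Rule` and `RuleSet` have been loaded from the wrapper (the quickstart notebook loads them with a `%run` helper) and `df` is an existing PySpark DataFrame; the `temperature` column and rule name are illustrative:

```python
import pyspark.sql.functions as F

# Build a Rule from a Boolean column expression, then validate the DataFrame
valid_temp_rule = Rule("valid_temperature", F.col("temperature") > -100.0)
validation_results = (RuleSet(df)
                      .add(valid_temp_rule)
                      .validate())
```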

## Next Steps
Clearly, this is just a start. This is a small package and, as such, a GREAT place to start if you've never
contributed to a project before. Please feel free to fork the repo and/or submit PRs. We'd love to see what
47 changes: 47 additions & 0 deletions python/README.md
@@ -0,0 +1,47 @@
# Python Connector for the DataFrame Rules Engine
The Python Connector allows users to validate data quality of their PySpark DataFrames using Python.

```python
validation_results = (RuleSet(df)
                      .add(myRules)
                      .validate())
```

Currently, the Python Connector supports the following Rule types:
1. List of Values (Strings _only_)
2. Boolean Check
3. User-defined Functions (must evaluate to a Boolean)


### Boolean Check
Validate that a column expression evaluates to True.
```python
# Ensure that the temperature is a valid reading
valid_temp_rule = Rule("valid_temperature", F.col("temperature") > -100.0)
```

### List of Values (LOVs)
Validate that a Column only contains values present in a List of Strings.

```python
# Create a List of Strings (LOS)
building_sites = ["SiteA", "SiteB", "SiteC"]

# Build a Rule that validates that a column only contains values from LOS
building_name_rule = Rule("Building_LOV_Rule",
                          column=F.col("site_name"),
                          valid_strings=building_sites)
```

### User-Defined Functions (UDFs)
UDFs are great when you need to add custom business logic for validating dataset quality.
You can use any User-Defined Function that evaluates to a Boolean with the DataFrame Rules Engine.

```python
# Create a UDF to validate date entry
def valid_date_udf(ts_column):
    return ts_column.isNotNull() & F.year(ts_column).isNotNull() & F.month(ts_column).isNotNull()

# Create a Rule that uses the UDF to validate data
valid_date_rule = Rule("valid_date_reading", valid_date_udf(F.col("reading_date")))
```
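
### Putting It Together
A minimal end-to-end sketch combining the rule types above, assuming `Rule` and `RuleSet` are loaded from the wrapper and `df` is an existing PySpark DataFrame (column names and literal values are illustrative; report retrieval mirrors the quickstart notebook under `/python/examples`):

```python
import pyspark.sql.functions as F

# Assemble a RuleSet from an existing PySpark DataFrame
rule_set = RuleSet(df)

# Boolean check
rule_set.add(Rule("valid_temperature", F.col("temperature") > -100.0))

# List of Values check
rule_set.add(Rule("Building_LOV_Rule",
                  column=F.col("site_name"),
                  valid_strings=["SiteA", "SiteB", "SiteC"]))

# Retrieve the validation reports
summary_report = rule_set.get_summary_report()
complete_report = rule_set.get_complete_report()
```
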
Binary file added python/dist/dataframe_rules_engine-0.0.1.tar.gz
60 changes: 60 additions & 0 deletions python/examples/01-Generate Mock Purchase Transactions.py
@@ -0,0 +1,60 @@
# Databricks notebook source
catalog_name = "REPLACE_ME"
schema_name = "REPLACE_ME"

# COMMAND ----------

import random
import datetime

def generate_sample_data():
    """Generates mock transaction data that randomly adds bad data"""

    # randomly generate bad (null) data
    if bool(random.getrandbits(1)):
        appl_id = None
        acct_no = None
        event_ts = None
        cstone_last_updatetm = None
    else:
        appl_id = random.randint(1000000, 9999999)
        acct_no = random.randint(1000000000000000, 9999999999999999)
        event_ts = datetime.datetime.now()
        cstone_last_updatetm = datetime.datetime.now()

    # randomly generate an MCC description
    categories = ["dining", "transportation", "merchandise", "hotels", "airfare", "grocery stores/supermarkets/bakeries"]
    category = random.choice(categories)

    # randomly generate a transaction price
    price = round(random.uniform(1.99, 9999.99), 2)

    data = [
        (appl_id, acct_no, event_ts, category, price, cstone_last_updatetm)
    ]
    df = spark.createDataFrame(
        data,
        "appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp")
    return df

# COMMAND ----------

spark.sql(f"create schema if not exists {catalog_name}.{schema_name}")

# COMMAND ----------

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.purchase_transactions_bronze
(appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# COMMAND ----------

df = generate_sample_data()
df.write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")

# COMMAND ----------


152 changes: 152 additions & 0 deletions python/examples/02-Apply Purchase Transaction Rules.py
@@ -0,0 +1,152 @@
# Databricks notebook source
# MAGIC %run ./PythonWrapper

# COMMAND ----------

# MAGIC %md
# MAGIC # Ingest new Data

# COMMAND ----------

import datetime

starting_time = datetime.datetime.now() - datetime.timedelta(minutes=5)

catalog_name = "REPLACE_ME"
schema_name = "REPLACE_ME"

# COMMAND ----------

# Read table changes from 5 mins ago
df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", starting_time) \
    .table(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")
purchase_transactions_df = df.select("appl_id", "acct_no", "event_ts", "category", "price", "cstone_last_updatetm") \
    .where("_change_type='insert'")
purchase_transactions_df.display()

# COMMAND ----------

# MAGIC %md
# MAGIC # Define Rules using Builder Pattern

# COMMAND ----------

# MAGIC %md
# MAGIC ## Sample Rules
# MAGIC
# MAGIC From a DQ rule point of view, we would be looking at following scenarios:
# MAGIC
# MAGIC - **event_ts**: Should have a timestamp for every day (timestamp format doesn’t matter)
# MAGIC - **cstone_last_updatetm**: Should have a timestamp for every day
# MAGIC - **acct_no**: No null values for this column
# MAGIC - **appl_id**: No null values for this column
# MAGIC - **Changes in string length** - for all columns
# MAGIC

# COMMAND ----------

import pyspark.sql.functions as F

# First, begin by defining your RuleSet by passing in your input DataFrame
myRuleSet = RuleSet(purchase_transactions_df)

# Rule 1 - define a Rule that validates that the `acct_no` is never null
acct_num_rule = Rule("valid_acct_no_rule", F.col("acct_no").isNotNull())
myRuleSet.add(acct_num_rule)

# Rule 2 - add a Rule that validates that the `appl_id` is never null
appl_id_rule = Rule("valid_appl_id", F.col("appl_id").isNotNull())
myRuleSet.add(appl_id_rule)

# COMMAND ----------

# Rules can even be used in conjunction with User-Defined Functions
def valid_timestamp(ts_column):
    return ts_column.isNotNull() & F.year(ts_column).isNotNull() & F.month(ts_column).isNotNull()

# COMMAND ----------

# Rule 3 - enforce a valid `event_ts` timestamp
valid_event_ts_rule = Rule("valid_event_ts_rule", valid_timestamp(F.col("event_ts")))
myRuleSet.add(valid_event_ts_rule)

# Rule 4 - enforce a valid `cstone_last_updatetm` timestamp
valid_cstone_last_updatetm_rule = Rule("valid_cstone_last_updatetm_rule", valid_timestamp(F.col("cstone_last_updatetm")))
myRuleSet.add(valid_cstone_last_updatetm_rule)

# COMMAND ----------

# Rule 5 - validate string lengths
import pyspark.sql.functions as F
import datetime

starting_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=5)
ending_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=1)

# Read all inserted records from the change feed (versions 0 through 10)
df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .option("endingVersion", 10) \
    .table(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")
df_category = df.select("category").where("_change_type='insert'").agg(F.mean(F.length(F.col("category"))).alias("avg_category_len"))
avg_category_len = df_category.collect()[0]['avg_category_len']
print(avg_category_len)

# COMMAND ----------

def valid_category_len(category_column, avg_category_str_len):
    return F.length(category_column) <= avg_category_str_len

# Rule 5 - validate `category` string lengths
valid_str_length_rule = Rule("valid_category_str_length_rule", valid_category_len(F.col("category"), avg_category_len))
myRuleSet.add(valid_str_length_rule)

# COMMAND ----------

# MAGIC %md
# MAGIC # Validate Rows

# COMMAND ----------

from pyspark.sql import DataFrame

# Finally, generate the summary and complete validation reports from the RuleSet
summaryReport = myRuleSet.get_summary_report()
completeReport = myRuleSet.get_complete_report()

# Display the summary validation report
display(summaryReport)

# COMMAND ----------

# Display the complete validation report
display(completeReport)

# COMMAND ----------

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.purchase_transactions_validated
(appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp, failed_rules array<string>)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# COMMAND ----------

import pyspark.sql.functions as F
import pyspark.sql.types as T

if summaryReport.count() > 0:
    summaryReport.write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_validated")
else:
    # no failed rules: append the original records with an empty `failed_rules` array
    string_array_type = T.ArrayType(T.StringType())
    purchase_transactions_df \
        .withColumn("failed_rules", F.array().cast(string_array_type)) \
        .write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_validated")

# COMMAND ----------

