Skip to content

Commit

Permalink
Feature dagster checks (#133)
Browse files Browse the repository at this point in the history
* Formatted and refactored

* Added dagster features
  • Loading branch information
canimus authored Dec 29, 2023
1 parent 7418e78 commit e8b20a2
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 44 deletions.
10 changes: 5 additions & 5 deletions cuallee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,23 +257,23 @@ def is_unique(self, column: str, pct: float = 1.0):
"""Validation for unique values in column"""
Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
return self

def is_primary_key(self, column: str, pct: float = 1.0):
    """Validation for primary-key semantics; alias of `is_unique` (registers the same "is_unique" rule)"""
    Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
    return self


def are_unique(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0):
    """Validation for joint uniqueness across a group of columns"""
    uniqueness = Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct)
    uniqueness >> self._rule
    return self

def is_composite_key(
    self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0
):
    """Validation for a composite key; alias of `are_unique` (registers the same "are_unique" rule)"""
    key_rule = Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct)
    key_rule >> self._rule
    return self


def is_greater_than(self, column: str, value: float, pct: float = 1.0):
"""Validation for numeric greater than value"""
Expand Down
27 changes: 27 additions & 0 deletions cuallee/dagster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dagster import asset_check, AssetCheckResult
from cuallee import Check
import pandas as pd


def make_dagster_checks(check: Check, asset: str, data: pd.DataFrame):
    """Build one Dagster asset check per validation result of *check*.

    Args:
        check: cuallee ``Check`` whose rules are evaluated against ``data``.
        asset: name of the Dagster asset the checks attach to.
        data: the dataframe to validate.

    Returns:
        A list of ``@asset_check``-decorated functions, one per result row.
    """
    results = check.validate(data)

    def _make_check(item):
        # Bind `item` per iteration. Closing over the loop variable directly
        # (the previous form) is a late-binding bug: Dagster executes the
        # check functions long after the loop has finished, so every check
        # would report the status/metadata of the *last* result row.
        @asset_check(name=f"{item.rule}", asset=asset)
        def _check():
            return AssetCheckResult(
                passed=(item.status == "PASS"),
                metadata={
                    "level": item.level,
                    "rows": int(item.rows),
                    "column": item.column,
                    "value": str(item.value),
                    "violations": int(item.violations),
                    "pass_rate": item.pass_rate,
                },
            )

        return _check

    # NOTE(review): checks are named after the rule name only; two rules with
    # the same name on one asset would collide — confirm rule names are unique.
    return [_make_check(item) for item in results.itertuples()]
132 changes: 95 additions & 37 deletions cuallee/polars_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ def _result(series: pl.Series) -> int:

@staticmethod
def _value(dataframe: pl.DataFrame):
    """Return the single scalar held in a 1x1 result frame."""
    # to_dict(as_series=False) yields {column_name: [values]}; take the
    # first value of the first column.
    columns = dataframe.to_dict(as_series=False)
    first_column = list(columns.values())[0]
    return first_column[0]

def is_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Validate not null"""
return Compute._result(
def is_between(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate values fall inside the closed [low, high] interval in rule.value"""
    low, high = rule.value
    inside = pl.col(rule.column).is_between(low, high, closed="both").cast(pl.Int8)
    return Compute._result(dataframe.select(inside).sum())

Expand Down Expand Up @@ -197,14 +201,19 @@ def has_correlation(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, in

def satisfies(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate rows against the arbitrary SQL predicate held in rule.value"""
    # Register the dataframe under the table name `cuallee` for SQL access.
    ctx = pl.SQLContext(cuallee=dataframe)
    query = """
        SELECT ({}) as total
        FROM cuallee
        """.format(
        rule.value
    )
    total = ctx.execute(query, eager=True)
    return total.cast(pl.Int8).sum()
def has_entropy(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
def entropy(labels):
Expand All @@ -223,56 +232,81 @@ def entropy(labels):

return -np.sum(probs * np.log(probs)) / np.log(n_classes)

return entropy(dataframe.select(pl.col(rule.column)).to_series()) == float(rule.value)
return entropy(dataframe.select(pl.col(rule.column)).to_series()) == float(
rule.value
)

def is_on_weekday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall Monday-Friday (ISO weekday 1-5)"""
    in_week = pl.col(rule.column).dt.weekday().is_between(1, 5)
    return dataframe.select(in_week.cast(pl.Int8)).sum()

def is_on_weekend(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall Saturday-Sunday (ISO weekday 6-7)"""
    in_weekend = pl.col(rule.column).dt.weekday().is_between(6, 7)
    return dataframe.select(in_weekend.cast(pl.Int8)).sum()

def is_on_monday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Monday (ISO weekday 1)"""
    matches = pl.col(rule.column).dt.weekday().eq(1).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_tuesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Tuesday (ISO weekday 2)"""
    matches = pl.col(rule.column).dt.weekday().eq(2).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_wednesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Wednesday (ISO weekday 3)"""
    matches = pl.col(rule.column).dt.weekday().eq(3).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_thursday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Thursday (ISO weekday 4)"""
    matches = pl.col(rule.column).dt.weekday().eq(4).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_friday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Friday (ISO weekday 5)"""
    matches = pl.col(rule.column).dt.weekday().eq(5).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_saturday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Saturday (ISO weekday 6)"""
    matches = pl.col(rule.column).dt.weekday().eq(6).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_sunday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate timestamps fall on Sunday (ISO weekday 7)"""
    matches = pl.col(rule.column).dt.weekday().eq(7).cast(pl.Int8)
    return dataframe.select(matches).sum()

def is_on_schedule(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
    """Validate hour-of-day falls inside the inclusive window unpacked from rule.value"""
    in_window = pl.col(rule.column).dt.hour().is_between(*rule.value)
    return dataframe.select(in_window.cast(pl.Int8)).sum()

def is_daily(self, rule: Rule, dataframe: pl.DataFrame) -> complex:
    """Validate no expected delivery dates are missing over the observed range.

    Encodes the outcome as a complex number: real part = row count,
    imaginary part = number of expected dates absent from the data.
    """
    # Default mask is Monday-Friday (ISO weekday 1-5) when no mask is given.
    day_mask = [1, 2, 3, 4, 5] if rule.value is None else rule.value

    lower = self._value(dataframe.select(pl.col(rule.column)).min())
    upper = self._value(dataframe.select(pl.col(rule.column)).max())

    # Full daily calendar spanning the observed min..max, restricted to mask.
    calendar = pl.DataFrame(
        {"ts": pl.date_range(start=lower, end=upper, interval="1d", eager=True)}
    )
    expected = (
        calendar.filter(pl.col("ts").dt.weekday().is_in(day_mask))
        .to_series()
        .to_list()
    )
    observed = (
        dataframe.filter(pl.col(rule.column).dt.weekday().is_in(day_mask))
        .to_series()
        .to_list()
    )

    # No difference between sequence of daily as a complex number
    return complex(len(dataframe), len(set(expected).difference(observed)))
def is_inside_interquartile_range(
    self, rule: Rule, dataframe: pl.DataFrame
) -> Union[bool, complex]:
    """Validate values lie between the (min_q, max_q) quantiles in rule.value"""
    min_q, max_q = rule.value
    lower, upper = (
        self._value(
            dataframe.select(pl.col(rule.column).quantile(q, interpolation="linear"))
        )
        for q in (min_q, max_q)
    )
    inside = dataframe.select(pl.col(rule.column).is_between(lower, upper))
    return inside.cast(pl.Int8).sum()

def has_workflow(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Compliance with adjacency matrix"""
Expand All @@ -292,9 +338,21 @@ def workflow(dataframe):
group, event, order = rule.column
groups = dataframe.partition_by(group)
interactions = []
_d = compose(list, operator.methodcaller("values"), operator.methodcaller("to_dict", as_series=False))
_d = compose(
list,
operator.methodcaller("values"),
operator.methodcaller("to_dict", as_series=False),
)
for g in groups:
pairs = list(zip(*_d(g.select(pl.col(event), pl.col(event).shift(-1).alias("target")))))
pairs = list(
zip(
*_d(
g.select(
pl.col(event), pl.col(event).shift(-1).alias("target")
)
)
)
)
if result := set(pairs).difference(rule.value):
for t in result:
interactions.append(t)
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "cuallee"
version = "0.6.0"
version = "0.6.1"
authors = [
{ name="Herminio Vazquez", email="[email protected]"},
{ name="Virginie Grosboillot", email="[email protected]" }
Expand All @@ -27,6 +27,10 @@ dependencies = [
]

[project.optional-dependencies]
dev = [
"black==23.9.1",
"ruff==0.0.290"
]
pyspark = [
"pyspark>=3.4.0"
]
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[metadata]
name = cuallee
version = 0.6.0
version = 0.6.1
[options]
packages = find:

0 comments on commit e8b20a2

Please sign in to comment.