From e8b20a2e1e1e1846730b6fe7970a1feb1b449705 Mon Sep 17 00:00:00 2001 From: Herminio Vazquez Date: Fri, 29 Dec 2023 14:49:04 +0100 Subject: [PATCH] Feature dagster checks (#133) * Formated and refactored * Added dagster features --- cuallee/__init__.py | 10 +-- cuallee/dagster/__init__.py | 27 +++++++ cuallee/polars_validation.py | 132 +++++++++++++++++++++++++---------- pyproject.toml | 6 +- setup.cfg | 2 +- 5 files changed, 133 insertions(+), 44 deletions(-) create mode 100644 cuallee/dagster/__init__.py diff --git a/cuallee/__init__.py b/cuallee/__init__.py index e13e67d4..555b2a2e 100644 --- a/cuallee/__init__.py +++ b/cuallee/__init__.py @@ -257,23 +257,23 @@ def is_unique(self, column: str, pct: float = 1.0): """Validation for unique values in column""" Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self - + def is_primary_key(self, column: str, pct: float = 1.0): """Validation for unique values in column""" Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self - def are_unique(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0): """Validation for unique values in a group of columns""" Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self - - def is_composite_key(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0): + + def is_composite_key( + self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0 + ): """Validation for unique values in a group of columns""" Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self - def is_greater_than(self, column: str, value: float, pct: float = 1.0): """Validation for numeric greater than value""" diff --git a/cuallee/dagster/__init__.py b/cuallee/dagster/__init__.py new file mode 100644 index 00000000..761d651f --- /dev/null +++ b/cuallee/dagster/__init__.py @@ -0,0 +1,27 @@ +from dagster import asset_check, AssetCheckResult +from cuallee import Check +import pandas as pd + + +def make_dagster_checks(check: Check, asset: str, data: pd.DataFrame): + rules = check.rules + results = check.validate(data) + checks = [] + for item in results.itertuples(): + + @asset_check(name=f"{item.rule}", asset=asset) + def _check(): + return AssetCheckResult( + passed=(item.status == "PASS"), + metadata={ + "level" : item.level, + "rows": int(item.rows), + "column": item.column, + "value": str(item.value), + "violations": int(item.violations), + "pass_rate": item.pass_rate + }, + ) + + checks.append(_check) + return checks diff --git a/cuallee/polars_validation.py b/cuallee/polars_validation.py index de563ba4..233036c5 100644 --- a/cuallee/polars_validation.py +++ b/cuallee/polars_validation.py @@ -20,8 +20,14 @@ def _result(series: pl.Series) -> int: @staticmethod def _value(dataframe: pl.DataFrame): - return compose(first, first, list, operator.methodcaller("values"), operator.methodcaller("to_dict", as_series=False))(dataframe) - + return compose( + first, + first, + list, + operator.methodcaller("values"), + operator.methodcaller("to_dict", as_series=False), + )(dataframe) + def is_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: """Validate not null""" return Compute._result( @@ -145,9 +151,7 @@ def is_between(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: low, high = rule.value return Compute._result( dataframe.select( - pl.col(rule.column) - .is_between(low, high, closed="both") - .cast(pl.Int8) + pl.col(rule.column).is_between(low, high, closed="both").cast(pl.Int8) ).sum() ) @@ -197,14 +201,19 @@ def has_correlation(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, in def satisfies(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: ctx = pl.SQLContext(cuallee=dataframe) - return ctx.execute( - ''' + return ( + ctx.execute( + """ SELECT ({}) as total FROM cuallee - '''.format(rule.value), - eager=True, - ).cast(pl.Int8).sum() - + """.format( + rule.value + ), + eager=True, + ) + .cast(pl.Int8) + .sum() + ) def has_entropy(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: def entropy(labels): @@ -223,43 +232,59 @@ def entropy(labels): return -np.sum(probs * np.log(probs)) / np.log(n_classes) - return entropy(dataframe.select(pl.col(rule.column)).to_series()) == float(rule.value) + return entropy(dataframe.select(pl.col(rule.column)).to_series()) == float( + rule.value + ) def is_on_weekday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return ( - dataframe.select(pl.col(rule.column).dt.weekday().is_between(1, 5).cast(pl.Int8)).sum() - ) + return dataframe.select( + pl.col(rule.column).dt.weekday().is_between(1, 5).cast(pl.Int8) + ).sum() def is_on_weekend(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return ( - dataframe.select(pl.col(rule.column).dt.weekday().is_between(6, 7).cast(pl.Int8)).sum() - ) + return dataframe.select( + pl.col(rule.column).dt.weekday().is_between(6, 7).cast(pl.Int8) + ).sum() def is_on_monday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(1).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(1).cast(pl.Int8) + ).sum() def is_on_tuesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(2).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(2).cast(pl.Int8) + ).sum() def is_on_wednesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(3).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(3).cast(pl.Int8) + ).sum() def is_on_thursday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(4).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(4).cast(pl.Int8) + ).sum() def is_on_friday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(5).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(5).cast(pl.Int8) + ).sum() def is_on_saturday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(6).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(6).cast(pl.Int8) + ).sum() def is_on_sunday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return dataframe.select(pl.col(rule.column).dt.weekday().eq(7).cast(pl.Int8)).sum() + return dataframe.select( + pl.col(rule.column).dt.weekday().eq(7).cast(pl.Int8) + ).sum() def is_on_schedule(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: - return ( - dataframe.select(pl.col(rule.column).dt.hour().is_between(*rule.value).cast(pl.Int8)).sum() - ) + return dataframe.select( + pl.col(rule.column).dt.hour().is_between(*rule.value).cast(pl.Int8) + ).sum() def is_daily(self, rule: Rule, dataframe: pl.DataFrame) -> complex: if rule.value is None: @@ -267,12 +292,21 @@ def is_daily(self, rule: Rule, dataframe: pl.DataFrame) -> complex: else: day_mask = rule.value - lower = self._value(dataframe.select(pl.col(rule.column)).min()) upper = self._value(dataframe.select(pl.col(rule.column)).max()) - sequence = pl.DataFrame({"ts" : pl.date_range(start=lower, end=upper, interval="1d", eager=True)}) - sequence = sequence.filter(pl.col("ts").dt.weekday().is_in(day_mask)).to_series().to_list() - delivery = dataframe.filter(pl.col(rule.column).dt.weekday().is_in(day_mask)).to_series().to_list() + sequence = pl.DataFrame( + {"ts": pl.date_range(start=lower, end=upper, interval="1d", eager=True)} + ) + sequence = ( + sequence.filter(pl.col("ts").dt.weekday().is_in(day_mask)) + .to_series() + .to_list() + ) + delivery = ( + dataframe.filter(pl.col(rule.column).dt.weekday().is_in(day_mask)) + .to_series() + .to_list() + ) # No difference between sequence of daily as a complex number return complex(len(dataframe), len(set(sequence).difference(delivery))) @@ -281,9 +315,21 @@ def is_inside_interquartile_range( self, rule: Rule, dataframe: pl.DataFrame ) -> Union[bool, complex]: min_q, max_q = rule.value - lower = self._value(dataframe.select(pl.col(rule.column).quantile(min_q, interpolation="linear"))) - upper = self._value(dataframe.select(pl.col(rule.column).quantile(max_q, interpolation="linear"))) - return dataframe.select(pl.col(rule.column).is_between(lower, upper)).cast(pl.Int8).sum() + lower = self._value( + dataframe.select( + pl.col(rule.column).quantile(min_q, interpolation="linear") + ) + ) + upper = self._value( + dataframe.select( + pl.col(rule.column).quantile(max_q, interpolation="linear") + ) + ) + return ( + dataframe.select(pl.col(rule.column).is_between(lower, upper)) + .cast(pl.Int8) + .sum() + ) def has_workflow(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]: """Compliance with adjacency matrix""" @@ -292,9 +338,21 @@ def workflow(dataframe): group, event, order = rule.column groups = dataframe.partition_by(group) interactions = [] - _d = compose(list, operator.methodcaller("values"), operator.methodcaller("to_dict", as_series=False)) + _d = compose( + list, + operator.methodcaller("values"), + operator.methodcaller("to_dict", as_series=False), + ) for g in groups: - pairs = list(zip(*_d(g.select(pl.col(event), pl.col(event).shift(-1).alias("target"))))) + pairs = list( + zip( + *_d( + g.select( + pl.col(event), pl.col(event).shift(-1).alias("target") + ) + ) + ) + ) if result := set(pairs).difference(rule.value): for t in result: interactions.append(t) diff --git a/pyproject.toml b/pyproject.toml index 6ece9ca6..b608bca3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cuallee" -version = "0.6.0" +version = "0.6.1" authors = [ { name="Herminio Vazquez", email="canimus@gmail.com"}, { name="Virginie Grosboillot", email="vestalisvirginis@gmail.com" } @@ -27,6 +27,10 @@ dependencies = [ ] [project.optional-dependencies] +dev = [ + "black==23.9.1", + "ruff==0.0.290" +] pyspark = [ "pyspark>=3.4.0" ] diff --git a/setup.cfg b/setup.cfg index 12f1fbd1..2758685a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] name = cuallee -version = 0.6.0 +version = 0.6.1 [options] packages = find: \ No newline at end of file