diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py index 0ce4f58b..04d2f24f 100644 --- a/datacontract/engines/soda/check_soda_execute.py +++ b/datacontract/engines/soda/check_soda_execute.py @@ -93,6 +93,7 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve # Don't check types for csv format, as they are hard to detect server_type = server.type check_types = server.format != "json" and server.format != "csv" and server.format != "avro" + sodacl_yaml_str = to_sodacl_yaml(data_contract, server_type, check_types) # print("sodacl_yaml_str:\n" + sodacl_yaml_str) scan.add_sodacl_yaml_str(sodacl_yaml_str) diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py index b819790b..b0951f60 100644 --- a/datacontract/engines/soda/connections/duckdb.py +++ b/datacontract/engines/soda/connections/duckdb.py @@ -2,6 +2,7 @@ import os import duckdb +from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type def get_duckdb_connection(data_contract, server): @@ -12,7 +13,7 @@ def get_duckdb_connection(data_contract, server): if server.type == "s3": path = server.location setup_s3_connection(con, server) - for model_name in data_contract.models: + for model_name, model in data_contract.models.items(): model_path = path if "{model}" in model_path: model_path = model_path.format(model=model_name) @@ -32,12 +33,24 @@ def get_duckdb_connection(data_contract, server): CREATE VIEW "{model_name}" AS SELECT * FROM read_parquet('{model_path}', hive_partitioning=1); """) elif server.format == "csv": - con.sql(f""" - CREATE VIEW "{model_name}" AS SELECT * FROM read_csv_auto('{model_path}', hive_partitioning=1); - """) + columns = to_csv_types(model) + if columns is None: + con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);""") + else: + con.sql(f"""CREATE VIEW "{model_name}" AS 
SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});""") return con +def to_csv_types(model) -> dict: + if model is None: + return None + columns = {} + # ['SQLNULL', 'BOOLEAN', 'BIGINT', 'DOUBLE', 'TIME', 'DATE', 'TIMESTAMP', 'VARCHAR'] + for field_name, field in model.fields.items(): + columns[field_name] = convert_to_duckdb_csv_type(field) + return columns + + def setup_s3_connection(con, server): s3_region = os.getenv("DATACONTRACT_S3_REGION") s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") diff --git a/datacontract/export/csv_type_converter.py b/datacontract/export/csv_type_converter.py new file mode 100644 index 00000000..cc6f1d0b --- /dev/null +++ b/datacontract/export/csv_type_converter.py @@ -0,0 +1,38 @@ + +# https://duckdb.org/docs/data/csv/overview.html +# ['SQLNULL', 'BOOLEAN', 'BIGINT', 'DOUBLE', 'TIME', 'DATE', 'TIMESTAMP', 'VARCHAR'] +def convert_to_duckdb_csv_type(field) -> None | str: + type = field.type + if type is None: + return "VARCHAR" + if type.lower() in ["string", "varchar", "text"]: + return "VARCHAR" + if type.lower() in ["timestamp", "timestamp_tz"]: + return "TIMESTAMP" + if type.lower() in ["timestamp_ntz"]: + return "TIMESTAMP" + if type.lower() in ["date"]: + return "DATE" + if type.lower() in ["time"]: + return "TIME" + if type.lower() in ["number", "decimal", "numeric"]: + # precision and scale not supported by data contract + return "VARCHAR" + if type.lower() in ["float", "double"]: + return "DOUBLE" + if type.lower() in ["integer", "int", "long", "bigint"]: + return "BIGINT" + if type.lower() in ["boolean"]: + return "BOOLEAN" + if type.lower() in ["object", "record", "struct"]: + # not supported in CSV + return "VARCHAR" + if type.lower() in ["bytes"]: + # not supported in CSV + return "VARCHAR" + if type.lower() in ["array"]: + return "VARCHAR" + if type.lower() in ["null"]: + return "SQLNULL" + return "VARCHAR" + diff --git a/datacontract/export/sodacl_converter.py 
b/datacontract/export/sodacl_converter.py index 356bd1a5..5c4b61ac 100644 --- a/datacontract/export/sodacl_converter.py +++ b/datacontract/export/sodacl_converter.py @@ -5,7 +5,8 @@ DataContractSpecification -def to_sodacl_yaml(data_contract_spec: DataContractSpecification, server_type: str = None, check_types: bool = True) -> str: +def to_sodacl_yaml(data_contract_spec: DataContractSpecification, server_type: str = None, + check_types: bool = True) -> str: try: sodacl = {} for model_key, model_value in data_contract_spec.models.items(): @@ -33,6 +34,26 @@ def to_checks(model_key, model_value, server_type: str, check_types: bool): checks.append(check_field_required(field_name, quote_field_name)) if field.unique: checks.append(check_field_unique(field_name, quote_field_name)) + if field.minLength is not None: + checks.append(check_field_min_length(field_name, field.minLength)) + if field.maxLength is not None: + checks.append(check_field_max_length(field_name, field.maxLength)) + if field.minimum is not None: + checks.append(check_field_minimum(field_name, field.minimum)) + if field.maximum is not None: + checks.append(check_field_maximum(field_name, field.maximum)) + if field.exclusiveMinimum is not None: + checks.append(check_field_minimum(field_name, field.exclusiveMinimum)) + checks.append(check_field_not_equal(field_name, field.exclusiveMinimum)) + if field.exclusiveMaximum is not None: + checks.append(check_field_maximum(field_name, field.exclusiveMaximum)) + checks.append(check_field_not_equal(field_name, field.exclusiveMaximum)) + if field.pattern is not None: + checks.append(check_field_regex(field_name, field.pattern)) + if field.enum is not None and len(field.enum) > 0: + checks.append(check_field_enum(field_name, field.enum)) + # TODO references: str = None + # TODO format return f"checks for {model_key}", checks @@ -74,6 +95,82 @@ def check_field_unique(field_name, quote_field_name: bool = False): } +def check_field_min_length(field_name, min_length, 
quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} has a min length of {min_length}",
+            "valid min length": min_length
+        }
+    }
+
+def check_field_max_length(field_name, max_length, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} has a max length of {max_length}",
+            "valid max length": max_length
+        }
+    }
+
+
+def check_field_minimum(field_name, minimum, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} has a minimum of {minimum}",
+            "valid min": minimum
+        }
+    }
+
+
+def check_field_maximum(field_name, maximum, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} has a maximum of {maximum}",
+            "valid max": maximum
+        }
+    }
+
+def check_field_not_equal(field_name, value, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} is not equal to {value}",
+            "invalid values": [value]
+        }
+    }
+
+
+def check_field_enum(field_name, enum, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} only contains enum values {enum}",
+            "valid values": enum
+        }
+    }
+
+
+def check_field_regex(field_name, pattern, quote_field_name: bool = False):
+    if quote_field_name:
+        field_name = f"\"{field_name}\""
+    return {
+        f"invalid_count({field_name}) = 0": {
+            "name": f"Check that field {field_name} matches regex pattern 
{pattern}", + "valid regex": pattern + } + } + + + def add_quality_checks(sodacl, data_contract_spec): if data_contract_spec.quality is None: return diff --git a/datacontract/templates/datacontract.html b/datacontract/templates/datacontract.html index 8147a70e..884da143 100644 --- a/datacontract/templates/datacontract.html +++ b/datacontract/templates/datacontract.html @@ -4,7 +4,7 @@ Data Contract - + diff --git a/tests/fixtures/examples/datacontract_formats_valid.yaml b/tests/fixtures/examples/datacontract_formats_valid.yaml new file mode 100644 index 00000000..54410384 --- /dev/null +++ b/tests/fixtures/examples/datacontract_formats_valid.yaml @@ -0,0 +1,42 @@ +dataContractSpecification: 0.9.2 +id: "123" +info: + title: "Test" + version: 1.0.0 + owner: my-domain-team +models: + sample_model: + description: Sample Model + type: table + fields: + id: + type: text + required: true + primary: true + unique: true + title: ID + description: A unique identifier + minLength: 4 + maxLength: 5 + pattern: ^\d+ + enum: + - "1234" + - "22345" + model_date: + type: text + required: true + unique: false + title: Model Date + description: Model date + field_c: + type: integer + exclusiveMaximum: 22346 + +examples: + - type: csv + description: Sample Model Example + model: sample_model + data: |- + id,model_date,field_c + "1234","2023-09-09",2 + "22345","2023-09-09",22345 \ No newline at end of file diff --git a/tests/test_test_examples_formats_valid.py b/tests/test_test_examples_formats_valid.py new file mode 100644 index 00000000..a2378bbd --- /dev/null +++ b/tests/test_test_examples_formats_valid.py @@ -0,0 +1,17 @@ +import logging + +from typer.testing import CliRunner + +from datacontract.data_contract import DataContract + +runner = CliRunner() + +logging.basicConfig(level=logging.DEBUG, force=True) + + +def test_formats(): + data_contract = DataContract(data_contract_file="fixtures/examples/datacontract_formats_valid.yaml", examples=True) + run = data_contract.test() + 
print(run) + print(run.result) + assert run.result == "passed"