diff --git a/README.md b/README.md
index 0a1a2bcc..0d427226 100644
--- a/README.md
+++ b/README.md
@@ -286,8 +286,8 @@ Supported formats:
 - parquet
 - json
 - csv
+- delta
 - iceberg (coming soon)
-- delta (coming soon)

 Feel free to create an [issue](https://github.com/datacontract/datacontract-cli/issues), if you need support for an additional type and formats.

@@ -295,7 +295,9 @@ Feel free to create an [issue](https://github.com/datacontract/datacontract-cli/

 Data Contract CLI can test data that is stored in S3 buckets or any S3-compliant endpoints in various formats.

-#### Example
+#### Examples
+
+##### JSON

 datacontract.yaml
 ```yaml
@@ -308,6 +310,18 @@ servers:
     delimiter: new_line # new_line, array, or none
 ```

+##### Delta Tables
+
+datacontract.yaml
+```yaml
+servers:
+  production:
+    type: s3
+    endpointUrl: https://minio.example.com # not needed with AWS S3
+    location: s3://bucket-name/path/table.delta # path to the Delta table folder containing parquet data files and the _delta_log
+    format: delta
+```
+
 #### Environment Variables

 | Environment Variable | Example | Description |
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index 220fc9b4..d88a04a2 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -25,7 +25,7 @@ def check_soda_execute(
     scan = Scan()

     if server.type in ["s3", "azure", "local"]:
-        if server.format in ["json", "parquet", "csv"]:
+        if server.format in ["json", "parquet", "csv", "delta"]:
             con = get_duckdb_connection(data_contract, server)
             scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
             scan.set_data_source_name(server.type)
diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py
index 81502434..54fa4136 100644
--- a/datacontract/engines/soda/connections/duckdb.py
+++ b/datacontract/engines/soda/connections/duckdb.py
@@ -4,6 +4,8 @@ import duckdb

 from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type

+from deltalake import DeltaTable
+

 def get_duckdb_connection(data_contract, server):
     con = duckdb.connect(database=":memory:")
@@ -45,6 +47,21 @@ def get_duckdb_connection(data_contract, server):
             con.sql(
                 f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
             )
+        elif server.format == "delta":
+            if server.type == "azure":
+                raise NotImplementedError("Support for Delta Tables on Azure Storage is not implemented yet")
+
+            storage_options = {
+                "AWS_ENDPOINT_URL": server.endpointUrl,
+                "AWS_ACCESS_KEY_ID": os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID"),
+                "AWS_SECRET_ACCESS_KEY": os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY"),
+                "AWS_REGION": os.getenv("DATACONTRACT_S3_REGION", "us-east-1"),
+                "AWS_ALLOW_HTTP": "True" if server.endpointUrl.startswith("http://") else "False",
+            }
+
+            delta_table_arrow = DeltaTable(model_path, storage_options=storage_options).to_pyarrow_dataset()
+
+            con.register(model_name, delta_table_arrow)
     return con


diff --git a/pyproject.toml b/pyproject.toml
index cf0ff27b..630a2900 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ dependencies = [
     "avro==1.11.3",
     "opentelemetry-exporter-otlp-proto-grpc~=1.16.0",
     "opentelemetry-exporter-otlp-proto-http~=1.16.0",
+    "deltalake~=0.17.0"
 ]

 [project.optional-dependencies]
@@ -69,4 +70,4 @@ build-backend = "setuptools.build_meta"
 addopts="-n 8"

 [tool.ruff]
-line-length = 120
\ No newline at end of file
+line-length = 120
diff --git a/tests/fixtures/s3-delta/data/orders.delta/0-66aaa7ef-36e3-4985-9359-72874e273705-0.parquet b/tests/fixtures/s3-delta/data/orders.delta/0-66aaa7ef-36e3-4985-9359-72874e273705-0.parquet
new file mode 100644
index 00000000..1babf776
Binary files /dev/null and b/tests/fixtures/s3-delta/data/orders.delta/0-66aaa7ef-36e3-4985-9359-72874e273705-0.parquet differ
diff --git a/tests/fixtures/s3-delta/data/orders.delta/_delta_log/00000000000000000000.json b/tests/fixtures/s3-delta/data/orders.delta/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000..efcab346
--- /dev/null
+++ b/tests/fixtures/s3-delta/data/orders.delta/_delta_log/00000000000000000000.json
@@ -0,0 +1,4 @@
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
+{"metaData":{"id":"89311f21-9efa-47cd-b7ed-9e1440e5df04","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_timestamp\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_total\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1713886932161,"configuration":{}}}
+{"add":{"path":"0-66aaa7ef-36e3-4985-9359-72874e273705-0.parquet","partitionValues":{},"size":1481,"modificationTime":1713886932161,"dataChange":true,"stats":"{\"numRecords\": 8, \"minValues\": {\"order_id\": \"1001\", \"order_timestamp\": \"2024-01-01T10:00:00\", \"order_total\": 2000}, \"maxValues\": {\"order_id\": \"1008\", \"order_timestamp\": \"2024-01-02T11:30:00\", \"order_total\": 12000}, \"nullCount\": {\"order_id\": 0, \"order_timestamp\": 0, \"order_total\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
+{"commitInfo":{"timestamp":1713886932161,"operation":"CREATE TABLE","operationParameters":{"metadata":"{\"configuration\":{},\"createdTime\":1713886932161,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"89311f21-9efa-47cd-b7ed-9e1440e5df04\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"order_id\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_timestamp\\\",\\\"type\\\":\\\"timestamp_ntz\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_total\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","protocol":"{\"minReaderVersion\":3,\"minWriterVersion\":7,\"readerFeatures\":[\"timestampNtz\"],\"writerFeatures\":[\"timestampNtz\"]}","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}}
diff --git a/tests/fixtures/s3-delta/datacontract.yaml b/tests/fixtures/s3-delta/datacontract.yaml
new file mode 100644
index 00000000..90239b8e
--- /dev/null
+++ b/tests/fixtures/s3-delta/datacontract.yaml
@@ -0,0 +1,22 @@
+dataContractSpecification: 0.9.3
+id: s3-delta-orders
+info:
+  title: S3 Delta Table Test
+  version: 0.0.1
+  owner: my-domain-team
+servers:
+  orders/s3:
+    type: s3
+    endpointUrl: __S3_ENDPOINT_URL__
+    location: s3://test-bucket/fixtures/s3-delta/data/orders.delta
+    format: delta
+    dataProductId: orders
+    outputPortId: s3
+models:
+  orders:
+    type: table
+    fields:
+      order_id:
+        type: varchar
+        unique: true
+        required: true
diff --git a/tests/fixtures/s3-delta/helper/create_delta_files.py b/tests/fixtures/s3-delta/helper/create_delta_files.py
new file mode 100644
index 00000000..fdfcef41
--- /dev/null
+++ b/tests/fixtures/s3-delta/helper/create_delta_files.py
@@ -0,0 +1,31 @@
+import os
+
+import pandas as pd
+from deltalake.writer import write_deltalake
+
+# Ensure the required directory exists
+output_dir = "../data"
+if not os.path.exists(output_dir):
+    os.makedirs(output_dir)
+
+# Sample data for Orders table
+orders_data = {
+    "order_id": ["1001", "1002", "1003", "1004", "1005", "1006", "1007", "1008"],
+    "order_timestamp": [
+        "2024-01-01T10:00:00.000Z",
+        "2024-01-01T11:30:00.000Z",
+        "2024-01-01T12:45:00.000Z",
+        "2024-01-02T08:20:00.000Z",
+        "2024-01-02T09:15:00.000Z",
+        "2024-01-02T10:05:00.000Z",
+        "2024-01-02T10:45:00.000Z",
+        "2024-01-02T11:30:00.000Z",
+    ],
+    "order_total": [5000, 7500, 3000, 2000, 6500, 12000, 4500, 8000],
+}
+
+orders_df = pd.DataFrame(orders_data)
+orders_df["order_timestamp"] = pd.to_datetime(orders_df["order_timestamp"], format="%Y-%m-%dT%H:%M:%S.%fZ")
+
+# Write to Delta table files
+write_deltalake(os.path.join(output_dir, "orders.delta"), orders_df)
diff --git a/tests/test_test_s3_delta.py b/tests/test_test_s3_delta.py
new file mode 100644
index 00000000..7b15c4d3
--- /dev/null
+++ b/tests/test_test_s3_delta.py
@@ -0,0 +1,56 @@
+import logging
+import os
+import glob
+
+import pytest
+from testcontainers.minio import MinioContainer
+
+from datacontract.data_contract import DataContract
+
+logging.basicConfig(level=logging.DEBUG, force=True)
+
+datacontract = "fixtures/s3-delta/datacontract.yaml"
+file_name = "fixtures/s3-delta/data/orders.delta"
+bucket_name = "test-bucket"
+s3_access_key = "test-access"
+s3_secret_access_key = "test-secret"
+
+
+@pytest.fixture(scope="session")
+def minio_container():
+    with MinioContainer(
+        image="quay.io/minio/minio",
+        access_key=s3_access_key,
+        secret_key=s3_secret_access_key,
+    ) as minio_container:
+        yield minio_container
+
+
+def test_test_s3_delta(minio_container, monkeypatch):
+    monkeypatch.setenv("DATACONTRACT_S3_ACCESS_KEY_ID", s3_access_key)
+    monkeypatch.setenv("DATACONTRACT_S3_SECRET_ACCESS_KEY", s3_secret_access_key)
+    data_contract_str = _prepare_s3_files(minio_container)
+    data_contract = DataContract(data_contract_str=data_contract_str)
+
+    run = data_contract.test()
+
+    print(run)
+    assert run.result == "passed"
+    assert all(check.result == "passed" for check in run.checks)
+
+
+def _prepare_s3_files(minio_container):
+    s3_endpoint_url = f"http://{minio_container.get_container_host_ip()}:{minio_container.get_exposed_port(9000)}"
+    minio_client = minio_container.get_client()
+    minio_client.make_bucket(bucket_name)
+
+    rel_paths = glob.glob(file_name + "/**", recursive=True)
+    for local_file in rel_paths:
+        remote_path = local_file
+        if os.path.isfile(local_file):
+            minio_client.fput_object(bucket_name, remote_path, local_file)
+
+    with open(datacontract) as data_contract_file:
+        data_contract_str = data_contract_file.read()
+        data_contract_str = data_contract_str.replace("__S3_ENDPOINT_URL__", s3_endpoint_url)
+    return data_contract_str
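
For reviewers who want to exercise the new code path outside the test suite, here is a minimal standalone sketch of the pattern this PR uses in `get_duckdb_connection` (deltalake → PyArrow dataset → `con.register`). The endpoint, bucket, path, and credentials below are illustrative placeholders mirroring the MinIO fixture above, not values required by the CLI.

```python
import duckdb
from deltalake import DeltaTable

# Placeholder credentials/endpoint matching the MinIO test fixture (assumptions, not CLI defaults)
storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_ACCESS_KEY_ID": "test-access",
    "AWS_SECRET_ACCESS_KEY": "test-secret",
    "AWS_REGION": "us-east-1",
    "AWS_ALLOW_HTTP": "True",  # required because the endpoint is plain HTTP
}

# Read the Delta transaction log and expose the data files as a PyArrow dataset
orders = DeltaTable(
    "s3://test-bucket/fixtures/s3-delta/data/orders.delta",
    storage_options=storage_options,
).to_pyarrow_dataset()

# DuckDB can scan a registered PyArrow dataset lazily, much like a view
con = duckdb.connect(database=":memory:")
con.register("orders", orders)
print(con.sql("SELECT count(*) AS order_count, max(order_total) AS max_total FROM orders").fetchall())
```

Registering the PyArrow dataset rather than copying rows lets DuckDB scan the Delta table lazily, so the existing Soda checks run unchanged against the registered name.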