feat: added preliminary support for reading delta lake tables from S3 (datacontract#151)

* feat: added preliminary support for reading delta lake tables from S3

* refactor: switched to arrow dataset and removed custom schema
chgl authored Apr 23, 2024
1 parent 050e810 commit 8cc3d94
Showing 9 changed files with 149 additions and 4 deletions.
18 changes: 16 additions & 2 deletions README.md
@@ -286,16 +286,18 @@ Supported formats:
- parquet
- json
- csv
- delta
- iceberg (coming soon)
- delta (coming soon)

Feel free to create an [issue](https://github.com/datacontract/datacontract-cli/issues) if you need support for an additional type or format.

### S3

Data Contract CLI can test data stored in S3 buckets or on any S3-compatible endpoint in various formats.

#### Example
#### Examples

##### JSON

datacontract.yaml
```yaml
@@ -308,6 +310,18 @@ servers:
    delimiter: new_line # new_line, array, or none
```
##### Delta Tables
datacontract.yaml
```yaml
servers:
  production:
    type: s3
    endpointUrl: https://minio.example.com # not needed with AWS S3
    location: s3://bucket-name/path/table.delta # path to the Delta table folder containing parquet data files and the _delta_log
    format: delta
```
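
Access credentials for both examples come from the environment variables documented below. As a quick end-to-end check, here is a minimal sketch using the project's Python API (the file name `datacontract.yaml` is an assumption):

```python
from datacontract.data_contract import DataContract

# Assumes DATACONTRACT_S3_ACCESS_KEY_ID / DATACONTRACT_S3_SECRET_ACCESS_KEY are set
# and a datacontract.yaml like the example above sits in the working directory.
data_contract = DataContract(data_contract_file="datacontract.yaml")
run = data_contract.test()
print(run.result)  # "passed" when all checks succeed
```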
#### Environment Variables
| Environment Variable | Example | Description |
2 changes: 1 addition & 1 deletion datacontract/engines/soda/check_soda_execute.py
@@ -25,7 +25,7 @@ def check_soda_execute(
scan = Scan()

if server.type in ["s3", "azure", "local"]:
    if server.format in ["json", "parquet", "csv"]:
    if server.format in ["json", "parquet", "csv", "delta"]:
        con = get_duckdb_connection(data_contract, server)
        scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
        scan.set_data_source_name(server.type)
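For context, this is the only wiring the Soda engine needs: formats DuckDB can read (now including delta) are exposed through an in-memory DuckDB connection that the scan then queries. A minimal sketch, assuming a view named `orders` has already been registered (see duckdb.py below):

```python
import duckdb
from soda.scan import Scan

con = duckdb.connect(database=":memory:")
# ... register the Delta table as a view named "orders" (see duckdb.py below) ...

# Point the Soda scan at the DuckDB connection, as in check_soda_execute above.
scan = Scan()
scan.add_duckdb_connection(duckdb_connection=con, data_source_name="s3")
scan.set_data_source_name("s3")
```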
17 changes: 17 additions & 0 deletions datacontract/engines/soda/connections/duckdb.py
@@ -4,6 +4,8 @@
import duckdb
from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type

from deltalake import DeltaTable


def get_duckdb_connection(data_contract, server):
con = duckdb.connect(database=":memory:")
@@ -45,6 +47,21 @@ def get_duckdb_connection(data_contract, server):
    con.sql(
        f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
    )
elif server.format == "delta":
    if server.type == "azure":
        raise NotImplementedError("Support for Delta Tables on Azure Storage is not implemented yet")

    # Credentials and region come from the DATACONTRACT_S3_* environment variables;
    # AWS_ALLOW_HTTP permits plain-HTTP endpoints such as a local MinIO instance.
    storage_options = {
        "AWS_ENDPOINT_URL": server.endpointUrl,
        "AWS_ACCESS_KEY_ID": os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY"),
        "AWS_REGION": os.getenv("DATACONTRACT_S3_REGION", "us-east-1"),
        "AWS_ALLOW_HTTP": "True" if server.endpointUrl.startswith("http://") else "False",
    }

    # Expose the Delta table to DuckDB as a PyArrow dataset; queries scan it lazily.
    delta_table_arrow = DeltaTable(model_path, storage_options=storage_options).to_pyarrow_dataset()

    con.register(model_name, delta_table_arrow)
return con


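The pattern the new branch relies on, shown in isolation: deltalake opens the table, `to_pyarrow_dataset()` exposes it as a PyArrow dataset, and DuckDB registers that dataset as a queryable view, which is why no hand-written schema is needed. A minimal sketch with a hypothetical local path:

```python
import duckdb
from deltalake import DeltaTable

con = duckdb.connect(database=":memory:")

# Hypothetical local table; for S3, pass storage_options as in the diff above.
dataset = DeltaTable("data/orders.delta").to_pyarrow_dataset()

# DuckDB treats the registered dataset like a view and scans it lazily.
con.register("orders", dataset)
print(con.sql('SELECT count(*) FROM "orders"').fetchall())
```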
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -38,6 +38,7 @@ dependencies = [
"avro==1.11.3",
"opentelemetry-exporter-otlp-proto-grpc~=1.16.0",
"opentelemetry-exporter-otlp-proto-http~=1.16.0",
"deltalake~=0.17.0"
]

[project.optional-dependencies]
@@ -69,4 +70,4 @@ build-backend = "setuptools.build_meta"
addopts="-n 8"

[tool.ruff]
line-length = 120
line-length = 120
Binary file not shown.
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"metaData":{"id":"89311f21-9efa-47cd-b7ed-9e1440e5df04","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_timestamp\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_total\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1713886932161,"configuration":{}}}
{"add":{"path":"0-66aaa7ef-36e3-4985-9359-72874e273705-0.parquet","partitionValues":{},"size":1481,"modificationTime":1713886932161,"dataChange":true,"stats":"{\"numRecords\": 8, \"minValues\": {\"order_id\": \"1001\", \"order_timestamp\": \"2024-01-01T10:00:00\", \"order_total\": 2000}, \"maxValues\": {\"order_id\": \"1008\", \"order_timestamp\": \"2024-01-02T11:30:00\", \"order_total\": 12000}, \"nullCount\": {\"order_id\": 0, \"order_timestamp\": 0, \"order_total\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1713886932161,"operation":"CREATE TABLE","operationParameters":{"metadata":"{\"configuration\":{},\"createdTime\":1713886932161,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"89311f21-9efa-47cd-b7ed-9e1440e5df04\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"order_id\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_timestamp\\\",\\\"type\\\":\\\"timestamp_ntz\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_total\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","protocol":"{\"minReaderVersion\":3,\"minWriterVersion\":7,\"readerFeatures\":[\"timestampNtz\"],\"writerFeatures\":[\"timestampNtz\"]}","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}}
22 changes: 22 additions & 0 deletions tests/fixtures/s3-delta/datacontract.yaml
@@ -0,0 +1,22 @@
dataContractSpecification: 0.9.3
id: s3-delta-orders
info:
  title: S3 Delta Table Test
  version: 0.0.1
  owner: my-domain-team
servers:
  orders/s3:
    type: s3
    endpointUrl: __S3_ENDPOINT_URL__
    location: s3://test-bucket/fixtures/s3-delta/data/orders.delta
    format: delta
    dataProductId: orders
    outputPortId: s3
models:
  orders:
    type: table
    fields:
      order_id:
        type: varchar
        unique: true
        required: true
31 changes: 31 additions & 0 deletions tests/fixtures/s3-delta/helper/create_delta_files.py
@@ -0,0 +1,31 @@
import os

import pandas as pd
from deltalake.writer import write_deltalake

# Ensure the required directory exists
output_dir = "../data"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Sample data for Orders table
orders_data = {
    "order_id": ["1001", "1002", "1003", "1004", "1005", "1006", "1007", "1008"],
    "order_timestamp": [
        "2024-01-01T10:00:00.000Z",
        "2024-01-01T11:30:00.000Z",
        "2024-01-01T12:45:00.000Z",
        "2024-01-02T08:20:00.000Z",
        "2024-01-02T09:15:00.000Z",
        "2024-01-02T10:05:00.000Z",
        "2024-01-02T10:45:00.000Z",
        "2024-01-02T11:30:00.000Z",
    ],
    "order_total": [5000, 7500, 3000, 2000, 6500, 12000, 4500, 8000],
}

orders_df = pd.DataFrame(orders_data)
orders_df["order_timestamp"] = pd.to_datetime(orders_df["order_timestamp"], format="%Y-%m-%dT%H:%M:%S.%fZ")

# Write to Delta table files
write_deltalake(os.path.join(output_dir, "orders.delta"), orders_df)
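
A quick way to verify the fixture round-trips, assuming the helper above has been run from its own directory (`DeltaTable` comes from the same deltalake package):

```python
from deltalake import DeltaTable

# Load the table written above and inspect its schema and rows.
dt = DeltaTable("../data/orders.delta")
print(dt.schema())
print(dt.to_pandas().head())
```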
56 changes: 56 additions & 0 deletions tests/test_test_s3_delta.py
@@ -0,0 +1,56 @@
import logging
import os
import glob

import pytest
from testcontainers.minio import MinioContainer

from datacontract.data_contract import DataContract

logging.basicConfig(level=logging.DEBUG, force=True)

datacontract = "fixtures/s3-delta/datacontract.yaml"
file_name = "fixtures/s3-delta/data/orders.delta"
bucket_name = "test-bucket"
s3_access_key = "test-access"
s3_secret_access_key = "test-secret"


@pytest.fixture(scope="session")
def minio_container():
    with MinioContainer(
        image="quay.io/minio/minio",
        access_key=s3_access_key,
        secret_key=s3_secret_access_key,
    ) as minio_container:
        yield minio_container


def test_test_s3_delta(minio_container, monkeypatch):
    monkeypatch.setenv("DATACONTRACT_S3_ACCESS_KEY_ID", s3_access_key)
    monkeypatch.setenv("DATACONTRACT_S3_SECRET_ACCESS_KEY", s3_secret_access_key)
    data_contract_str = _prepare_s3_files(minio_container)
    data_contract = DataContract(data_contract_str=data_contract_str)

    run = data_contract.test()

    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)


def _prepare_s3_files(minio_container):
    s3_endpoint_url = f"http://{minio_container.get_container_host_ip()}:{minio_container.get_exposed_port(9000)}"
    minio_client = minio_container.get_client()
    minio_client.make_bucket(bucket_name)

    # Upload every file of the Delta table, keeping the local relative path as the
    # object key so it matches the contract's s3://test-bucket/fixtures/... location.
    rel_paths = glob.glob(file_name + "/**", recursive=True)
    for local_file in rel_paths:
        remote_path = local_file
        if os.path.isfile(local_file):
            minio_client.fput_object(bucket_name, remote_path, local_file)

    with open(datacontract) as data_contract_file:
        data_contract_str = data_contract_file.read()
        data_contract_str = data_contract_str.replace("__S3_ENDPOINT_URL__", s3_endpoint_url)
    return data_contract_str
