Support azure data lake
jochenchrist committed Apr 23, 2024
1 parent f1a0014 commit f976772
Showing 8 changed files with 204 additions and 89 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added test for Azure Blob Storage and Azure Data Lake Storage (#146)


## [0.10.1] - 2024-04-19

### Fixed
202 changes: 117 additions & 85 deletions README.md
@@ -255,40 +255,41 @@ Commands
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```

Data Contract CLI can connect to data sources and run schema and quality tests to verify that the data contract is valid.
Data Contract CLI connects to a data source and runs schema and quality tests to verify that the data contract is valid.

```bash
$ datacontract test --server production datacontract.yaml
```

To connect to the databases the `server` block in the datacontract.yaml is used to set up the connection. In addition, credentials, such as username and passwords, may be defined with environment variables.
To connect to the databases, the `server` block in the datacontract.yaml is used to set up the connection.
In addition, credentials, such as usernames and passwords, may be defined with environment variables.

The application uses different engines based on the server `type`.
Internally, it connects with DuckDB, Spark, or a native connection and executes the most tests with soda-core and fastjsonschema.
Credentials are read from the environment variables.
Internally, it connects with DuckDB, Spark, or a native connection and executes most tests with _soda-core_ and _fastjsonschema_.

Credentials are provided with environment variables.
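
For example, a minimal sketch of a test run against the Postgres server described later in this README (variable names and values are the placeholders from that section, not real credentials):

```bash
# Provide credentials via environment variables, then run the contract tests
# against the "postgres" server defined in datacontract.yaml.
export DATACONTRACT_POSTGRES_USERNAME=postgres
export DATACONTRACT_POSTGRES_PASSWORD=mysecretpassword
datacontract test --server postgres datacontract.yaml
```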

Supported server types:

| Type         | Format     | Status                                                                           |
|--------------|------------|----------------------------------------------------------------------------------|
| `s3`         | `parquet`  | ✅                                                                               |
| `s3`         | `json`     | ✅                                                                               |
| `s3`         | `csv`      | ✅                                                                               |
| `s3`         | `delta`    | Coming soon ([#24](https://github.com/datacontract/datacontract-cli/issues/24)) |
| `s3`         | `iceberg`  | Coming soon                                                                      |
| `postgres`   | n/a        | ✅                                                                               |
| `snowflake`  | n/a        | ✅                                                                               |
| `bigquery`   | n/a        | ✅                                                                               |
| `redshift`   | n/a        | Coming soon                                                                      |
| `databricks` | n/a        | ✅                                                                               |
| `kafka`      | `json`     | ✅                                                                               |
| `kafka`      | `avro`     | Coming soon                                                                      |
| `kafka`      | `protobuf` | Coming soon                                                                      |
| `local`      | `parquet`  | ✅                                                                               |
| `local`      | `json`     | ✅                                                                               |
| `local`      | `csv`      | ✅                                                                               |

Feel free to create an issue, if you need support for an additional type.
- [s3](#S3)
- [bigquery](#bigquery)
- [azure](#azure)
- [databricks](#databricks)
- [databricks (programmatic)](#databricks-programmatic)
- [snowflake](#snowflake)
- [kafka](#kafka)
- [postgres](#postgres)
- [local](#local)

Supported formats:

- parquet
- json
- csv
- iceberg (coming soon)
- delta (coming soon)

Feel free to create an [issue](https://github.com/datacontract/datacontract-cli/issues) if you need support for additional types and formats.

### S3

@@ -316,97 +317,62 @@ servers:
| `DATACONTRACT_S3_SECRET_ACCESS_KEY` | `93S7LRrJcqLaaaa/XXXXXXXXXXXXX` | AWS Secret Access Key |


### Postgres

Data Contract CLI can test data in Postgres or Postgres-compliant databases (e.g., RisingWave).

#### Example

datacontract.yaml
```yaml
servers:
postgres:
type: postgres
host: localhost
port: 5432
database: postgres
schema: public
models:
my_table_1: # corresponds to a table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
```

#### Environment Variables

| Environment Variable | Example | Description |
|----------------------------------|--------------------|-------------|
| `DATACONTRACT_POSTGRES_USERNAME` | `postgres` | Username |
| `DATACONTRACT_POSTGRES_PASSWORD` | `mysecretpassword` | Password |

### BigQuery

### Snowflake
We support authentication to BigQuery using a Service Account Key. The Service Account used should include the following roles:
* BigQuery Job User
* BigQuery Data Viewer

Data Contract CLI can test data in Snowflake.

#### Example

datacontract.yaml
```yaml
servers:
snowflake:
type: snowflake
account: abcdefg-xn12345
database: ORDER_DB
schema: ORDERS_PII_V2
production:
type: bigquery
project: datameshexample-product
dataset: datacontract_cli_test_dataset
models:
my_table_1: # corresponds to a table
datacontract_cli_test_table: # corresponds to a BigQuery table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
fields: ...
```

#### Environment Variables

| Environment Variable | Example | Description |
|------------------------------------|--------------------|-----------------------------------------------------|
| `DATACONTRACT_SNOWFLAKE_USERNAME` | `datacontract` | Username |
| `DATACONTRACT_SNOWFLAKE_PASSWORD` | `mysecretpassword` | Password |
| `DATACONTRACT_SNOWFLAKE_ROLE` | `DATAVALIDATION` | The snowflake role to use. |
| `DATACONTRACT_SNOWFLAKE_WAREHOUSE` | `COMPUTE_WH` | The Snowflake Warehouse to use executing the tests. |
| Environment Variable | Example | Description |
|----------------------------------------------|---------------------------|---------------------------------------------------------|
| `DATACONTRACT_BIGQUERY_ACCOUNT_INFO_JSON_PATH` | `~/service-access-key.json` | Service Access key as saved on key creation by BigQuery |


### BigQuery

We support authentication to BigQuery using a Service Account Key. The Service Account used should include the following roles:
* BigQuery Job User
* BigQuery Data Viewer
### Azure

Data Contract CLI can test data that is stored in Azure Blob Storage or Azure Data Lake Storage Gen2 (ADLS) in various formats.

#### Example

datacontract.yaml
```yaml
servers:
production:
type: bigquery
project: datameshexample-product
dataset: datacontract_cli_test_dataset
models:
datacontract_cli_test_table: # corresponds to a BigQuery table
type: table
fields: ...
type: azure
location: abfss://datameshdatabricksdemo.dfs.core.windows.net/dataproducts/inventory_events/*.parquet
format: parquet
```

#### Environment Variables

| Environment Variable | Example | Description |
|----------------------------------------------|---------------------------|---------------------------------------------------------|
| `DATACONTRACT_BIGQUERY_ACCOUNT_INFO_JSON_PATH` | `~/service-access-key.json` | Service Access key as saved on key creation by BigQuery |
Authentication works with an Azure Service Principal (SPN), i.e., an App Registration with a client secret.

| Environment Variable | Example | Description |
|-----------------------------------|-------------------------------|------------------------------------------------------|
| `DATACONTRACT_AZURE_TENANT_ID` | `79f5b80f-10ff-40b9-9d1f-774b42d605fc` | The Azure Tenant ID |
| `DATACONTRACT_AZURE_CLIENT_ID` | `3cf7ce49-e2e9-4cbc-a922-4328d4a58622` | The ApplicationID / ClientID of the app registration |
| `DATACONTRACT_AZURE_CLIENT_SECRET` | `yZK8Q~GWO1MMXXXXXXXXXXXXX` | The Client Secret value |
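
For illustration, a test run against the `production` server from the example above could look like this (the values are the placeholder credentials from the table, not real secrets):

```bash
# Provide the Service Principal credentials, then run the contract tests
# against the "production" Azure server defined in datacontract.yaml.
export DATACONTRACT_AZURE_TENANT_ID=79f5b80f-10ff-40b9-9d1f-774b42d605fc
export DATACONTRACT_AZURE_CLIENT_ID=3cf7ce49-e2e9-4cbc-a922-4328d4a58622
export DATACONTRACT_AZURE_CLIENT_SECRET=yZK8Q~GWO1MMXXXXXXXXXXXXX
datacontract test --server production datacontract.yaml
```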



### Databricks
@@ -477,6 +443,41 @@ run = data_contract.test()
run.result
```


### Snowflake

Data Contract CLI can test data in Snowflake.

#### Example

datacontract.yaml
```yaml
servers:
snowflake:
type: snowflake
account: abcdefg-xn12345
database: ORDER_DB
schema: ORDERS_PII_V2
models:
my_table_1: # corresponds to a table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
```

#### Environment Variables

| Environment Variable | Example | Description |
|------------------------------------|--------------------|-----------------------------------------------------|
| `DATACONTRACT_SNOWFLAKE_USERNAME` | `datacontract` | Username |
| `DATACONTRACT_SNOWFLAKE_PASSWORD` | `mysecretpassword` | Password |
| `DATACONTRACT_SNOWFLAKE_ROLE`      | `DATAVALIDATION`   | The Snowflake role to use.                               |
| `DATACONTRACT_SNOWFLAKE_WAREHOUSE` | `COMPUTE_WH`       | The Snowflake warehouse to use when executing the tests. |



### Kafka

Kafka support is currently considered experimental.
@@ -501,6 +502,37 @@ servers:
| `DATACONTRACT_KAFKA_SASL_PASSWORD` | `xxx` | The SASL password (secret). |


### Postgres

Data Contract CLI can test data in Postgres or Postgres-compliant databases (e.g., RisingWave).

#### Example

datacontract.yaml
```yaml
servers:
postgres:
type: postgres
host: localhost
port: 5432
database: postgres
schema: public
models:
my_table_1: # corresponds to a table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
```

#### Environment Variables

| Environment Variable | Example | Description |
|----------------------------------|--------------------|-------------|
| `DATACONTRACT_POSTGRES_USERNAME` | `postgres` | Username |
| `DATACONTRACT_POSTGRES_PASSWORD` | `mysecretpassword` | Password |



### export

2 changes: 1 addition & 1 deletion datacontract/engines/soda/check_soda_execute.py
@@ -30,7 +30,7 @@ def check_soda_execute(
run.log_info("Running engine soda-core")
scan = Scan()

if server.type == "s3" or server.type == "local":
if server.type in ["s3", "azure", "local"]:
if server.format in ["json", "parquet", "csv"]:
con = get_duckdb_connection(data_contract, server)
scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
32 changes: 31 additions & 1 deletion datacontract/engines/soda/connections/duckdb.py
@@ -12,7 +12,10 @@ def get_duckdb_connection(data_contract, server):
path = server.path
if server.type == "s3":
path = server.location
setup_s3_connection(con, server)
setup_s3_connection(con, server)
if server.type == "azure":
path = server.location
setup_azure_connection(con, server)
for model_name, model in data_contract.models.items():
model_path = path
if "{model}" in model_path:
@@ -74,3 +77,30 @@ def setup_s3_connection(con, server):
SET s3_access_key_id = '{s3_access_key_id}';
SET s3_secret_access_key = '{s3_secret_access_key}';
""")


def setup_azure_connection(con, server):
tenant_id = os.getenv("DATACONTRACT_AZURE_TENANT_ID")
client_id = os.getenv("DATACONTRACT_AZURE_CLIENT_ID")
client_secret = os.getenv("DATACONTRACT_AZURE_CLIENT_SECRET")

if tenant_id is None:
raise ValueError("Error: Environment variable DATACONTRACT_AZURE_TENANT_ID is not set")
if client_id is None:
raise ValueError("Error: Environment variable DATACONTRACT_AZURE_CLIENT_ID is not set")
if client_secret is None:
raise ValueError("Error: Environment variable DATACONTRACT_AZURE_CLIENT_SECRET is not set")

con.install_extension("azure")
con.load_extension("azure")

con.sql(f"""
CREATE SECRET azure_spn (
TYPE AZURE,
PROVIDER SERVICE_PRINCIPAL,
TENANT_ID '{tenant_id}',
CLIENT_ID '{client_id}',
CLIENT_SECRET '{client_secret}'
);
""")

23 changes: 23 additions & 0 deletions tests/fixtures/azure-parquet-remote/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
dataContractSpecification: 0.9.2
id: orders-unit-test
info:
title: Orders Unit Test
version: 1.0.0
servers:
production:
type: azure
location: abfss://datameshdatabricksdemo.dfs.core.windows.net/dataproducts/inventory_events/*.parquet
format: parquet
models:
orders:
fields:
updated_at:
type: varchar
available:
type: varchar # for historic reasons
location:
type: varchar
minLength: 2
maxLength: 2
sku:
type: varchar
1 change: 0 additions & 1 deletion tests/fixtures/export/rdf/datacontract-complex.yaml
@@ -12,7 +12,6 @@ info:
servers:
production:
type: "s3"
endpointUrl: __S3_ENDPOINT_URL__
location: "s3://multiple-bucket/fixtures/s3-json-multiple-models/data/{model}/*.json"
format: "json"
delimiter: "new_line"
1 change: 0 additions & 1 deletion tests/test_export_rdf.py
@@ -156,7 +156,6 @@ def test_to_rdf_complex():
\"\"\" ] .
<production> a dc1:Server ;
dcx:endpointUrl "__S3_ENDPOINT_URL__" ;
dc1:delimiter "new_line" ;
dc1:format "json" ;
dc1:location "s3://multiple-bucket/fixtures/s3-json-multiple-models/data/{model}/*.json" ;