
Commit

Fixed testing BigQuery tables with BOOL fields
Apply ruff format
jochenchrist committed May 26, 2024
1 parent d59518e commit 8d20f60
Showing 28 changed files with 325 additions and 216 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -8,15 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Added support for `sqlserver` (#196)

- Added support for `sqlserver` (#196)
- `datacontract export --format dbml`: Export to [Database Markup Language (DBML)](https://dbml.dbdiagram.io/home/) (#135)
- `datacontract export --format avro`: Now supports a config map on field level for logicalTypes and default values, see [Custom Avro Properties](./README.md#custom-avro-properties)
- `datacontract import --format avro`: Now supports importing logicalType and default definitions from Avro files, see [Custom Avro Properties](./README.md#custom-avro-properties)
- Support `bigqueryType` for testing BigQuery types

### Fixed

- Fixed jsonschema export for models with empty object-typed fields (#218)
- Fixed testing BigQuery tables with BOOL fields

## [0.10.4] - 2024-05-17

6 changes: 3 additions & 3 deletions datacontract/breaking/breaking.py
@@ -256,13 +256,13 @@ def field_breaking_changes(
)
)
continue
if field_definition_field == "items" and old_field.type == 'array' and new_field.type == 'array':

if field_definition_field == "items" and old_field.type == "array" and new_field.type == "array":
results.extend(
field_breaking_changes(
old_field=old_value,
new_field=new_value,
composition=composition + ['items'],
composition=composition + ["items"],
new_path=new_path,
include_severities=include_severities,
)
21 changes: 14 additions & 7 deletions datacontract/cli.py
@@ -2,6 +2,7 @@
from importlib import metadata
from pathlib import Path
from typing import Iterable, Optional
from typing import List

import typer
from click import Context
@@ -10,15 +11,14 @@
from rich.table import Table
from typer.core import TyperGroup
from typing_extensions import Annotated
from typing import List

from datacontract.catalog.catalog import create_index_html, create_data_contract_html
from datacontract.catalog.catalog import create_index_html, \
create_data_contract_html
from datacontract.data_contract import DataContract
from datacontract.init.download_datacontract_file import download_datacontract_file, FileExistsException

from datacontract.init.download_datacontract_file import \
download_datacontract_file, FileExistsException
from datacontract.publish.publish import publish_to_datamesh_manager


console = Console()


@@ -230,10 +230,17 @@ class ImportFormat(str, Enum):
@app.command(name="import")
def import_(
format: Annotated[ImportFormat, typer.Option(help="The format of the source file.")],
source: Annotated[Optional[str], typer.Option(help="The path to the file or Glue Database that should be imported.")] = None,
source: Annotated[
Optional[str], typer.Option(help="The path to the file or Glue Database that should be imported.")
] = None,
bigquery_project: Annotated[Optional[str], typer.Option(help="The bigquery project id.")] = None,
bigquery_dataset: Annotated[Optional[str], typer.Option(help="The bigquery dataset id.")] = None,
bigquery_table: Annotated[Optional[List[str]], typer.Option(help="List of table ids to import from the bigquery API (repeat for multiple table ids, leave empty for all tables in the dataset).")] = None,
bigquery_table: Annotated[
Optional[List[str]],
typer.Option(
help="List of table ids to import from the bigquery API (repeat for multiple table ids, leave empty for all tables in the dataset)."
),
] = None,
):
"""
Create a data contract from the given source location. Prints to stdout.
55 changes: 39 additions & 16 deletions datacontract/data_contract.py
@@ -6,48 +6,56 @@
import yaml
from pyspark.sql import SparkSession

from datacontract.breaking.breaking import models_breaking_changes, quality_breaking_changes
from datacontract.breaking.breaking import models_breaking_changes, \
quality_breaking_changes
from datacontract.engines.datacontract.check_that_datacontract_contains_valid_servers_configuration import (
check_that_datacontract_contains_valid_server_configuration,
)
from datacontract.engines.fastjsonschema.check_jsonschema import check_jsonschema
from datacontract.engines.fastjsonschema.check_jsonschema import \
check_jsonschema
from datacontract.engines.soda.check_soda_execute import check_soda_execute
from datacontract.export.avro_converter import to_avro_schema_json
from datacontract.export.avro_idl_converter import to_avro_idl
from datacontract.export.bigquery_converter import to_bigquery_json
from datacontract.export.dbml_converter import to_dbml_diagram
from datacontract.export.dbt_converter import to_dbt_models_yaml, \
to_dbt_sources_yaml, to_dbt_staging_sql
from datacontract.export.go_converter import to_go_types
from datacontract.export.great_expectations_converter import \
to_great_expectations
from datacontract.export.html_export import to_html
from datacontract.export.jsonschema_converter import to_jsonschema_json
from datacontract.export.odcs_converter import to_odcs_yaml
from datacontract.export.protobuf_converter import to_protobuf
from datacontract.export.pydantic_converter import to_pydantic_model_str
from datacontract.export.go_converter import to_go_types
from datacontract.export.rdf_converter import to_rdf_n3
from datacontract.export.sodacl_converter import to_sodacl_yaml
from datacontract.export.sql_converter import to_sql_ddl, to_sql_query
from datacontract.export.terraform_converter import to_terraform
from datacontract.imports.avro_importer import import_avro
from datacontract.imports.bigquery_importer import import_bigquery_from_api, import_bigquery_from_json
from datacontract.imports.bigquery_importer import import_bigquery_from_api, \
import_bigquery_from_json
from datacontract.imports.glue_importer import import_glue
from datacontract.imports.sql_importer import import_sql
from datacontract.imports.jsonschema_importer import import_jsonschema
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
publish_datamesh_manager
from datacontract.integration.publish_opentelemetry import publish_opentelemetry
from datacontract.lint import resolve
from datacontract.lint.linters.description_linter import DescriptionLinter
from datacontract.lint.linters.example_model_linter import ExampleModelLinter
from datacontract.lint.linters.field_pattern_linter import FieldPatternLinter
from datacontract.lint.linters.field_reference_linter import FieldReferenceLinter
from datacontract.lint.linters.field_reference_linter import \
FieldReferenceLinter
from datacontract.lint.linters.notice_period_linter import NoticePeriodLinter
from datacontract.lint.linters.quality_schema_linter import QualityUsesSchemaLinter
from datacontract.lint.linters.valid_constraints_linter import ValidFieldConstraintsLinter
from datacontract.model.breaking_change import BreakingChanges, BreakingChange, Severity
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.lint.linters.quality_schema_linter import \
QualityUsesSchemaLinter
from datacontract.lint.linters.valid_constraints_linter import \
ValidFieldConstraintsLinter
from datacontract.model.breaking_change import BreakingChanges, BreakingChange, \
Severity
from datacontract.model.data_contract_specification import \
DataContractSpecification, Server
from datacontract.model.exceptions import DataContractException
from datacontract.model.run import Run, Check

@@ -331,9 +339,13 @@ def export(self, export_format, model: str = "all", rdf_base: str = None, sql_se
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
found_server = data_contract.servers.get(self._server)
if found_server is None:
raise RuntimeError(f"Export to {export_format} requires selecting a bigquery server from the data contract.")
if found_server.type != 'bigquery':
raise RuntimeError(f"Export to {export_format} requires selecting a bigquery server from the data contract.")
raise RuntimeError(
f"Export to {export_format} requires selecting a bigquery server from the data contract."
)
if found_server.type != "bigquery":
raise RuntimeError(
f"Export to {export_format} requires selecting a bigquery server from the data contract."
)
return to_bigquery_json(model_name, model_value, found_server)
if export_format == "dbml":
found_server = data_contract.servers.get(self._server)
@@ -392,7 +404,9 @@ def _get_examples_server(self, data_contract, run, tmp_dir):
run.log_info(f"Using {server} for testing the examples")
return server

def _check_models_for_export(self, data_contract: DataContractSpecification, model: str, export_format: str) -> typing.Tuple[str, str]:
def _check_models_for_export(
self, data_contract: DataContractSpecification, model: str, export_format: str
) -> typing.Tuple[str, str]:
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

@@ -415,7 +429,14 @@ def _check_models_for_export(self, data_contract: DataContractSpecification, mod

return model_name, model_value

def import_from_source(self, format: str, source: typing.Optional[str] = None, bigquery_tables: typing.Optional[typing.List[str]] = None, bigquery_project: typing.Optional[str] = None, bigquery_dataset: typing.Optional[str] = None) -> DataContractSpecification:
def import_from_source(
self,
format: str,
source: typing.Optional[str] = None,
bigquery_tables: typing.Optional[typing.List[str]] = None,
bigquery_project: typing.Optional[str] = None,
bigquery_dataset: typing.Optional[str] = None,
) -> DataContractSpecification:
data_contract_specification = DataContract.init()

if format == "sql":
@@ -430,7 +451,9 @@ def import_from_source(self, format: str, source: typing.Optional[str] = None, b
if source is not None:
data_contract_specification = import_bigquery_from_json(data_contract_specification, source)
else:
data_contract_specification = import_bigquery_from_api(data_contract_specification, bigquery_tables, bigquery_project, bigquery_dataset)
data_contract_specification = import_bigquery_from_api(
data_contract_specification, bigquery_tables, bigquery_project, bigquery_dataset
)
else:
print(f"Import format {format} not supported.")

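For illustration, a minimal sketch of the reshaped import_from_source call for a BigQuery import over the API; the project, dataset, and table ids are hypothetical, and it assumes a DataContract instance created with default arguments is sufficient for an import-only call:

from datacontract.data_contract import DataContract

# Hypothetical ids; bigquery_tables=None would import every table in the dataset.
spec = DataContract().import_from_source(
    format="bigquery",
    bigquery_project="my-project",
    bigquery_dataset="my_dataset",
    bigquery_tables=["orders", "customers"],
)
# spec is a DataContractSpecification built from the BigQuery table metadata.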
3 changes: 1 addition & 2 deletions datacontract/engines/soda/connections/duckdb.py
@@ -87,8 +87,7 @@ def setup_s3_connection(con, server):
s3_endpoint = server.endpointUrl.removeprefix("http://").removeprefix("https://")
if server.endpointUrl.startswith("http://"):
use_ssl = "false"
url_style = 'path'

url_style = "path"

if s3_access_key_id is not None:
con.sql(f"""
12 changes: 7 additions & 5 deletions datacontract/engines/soda/connections/sqlserver.py
@@ -1,8 +1,10 @@
import os

import yaml

from datacontract.model.data_contract_specification import Server


def to_sqlserver_soda_configuration(server: Server) -> str:
"""Serialize server config to soda configuration.
@@ -19,21 +21,21 @@ def to_sqlserver_soda_configuration(server: Server) -> str:
encrypt: false
trust_server_certificate: false
driver: ODBC Driver 18 for SQL Server
"""
"""
# with service account key, using an external json file
soda_configuration = {
f"data_source {server.type}": {
"type": "sqlserver",
"host": server.host,
"port": str(server.port),
"username": os.getenv("DATACONTRACT_SQLSERVER_USERNAME", ''),
"password": os.getenv("DATACONTRACT_SQLSERVER_PASSWORD", ''),
"username": os.getenv("DATACONTRACT_SQLSERVER_USERNAME", ""),
"password": os.getenv("DATACONTRACT_SQLSERVER_PASSWORD", ""),
"database": server.database,
"schema": server.schema_,
"trusted_connection": os.getenv("DATACONTRACT_SQLSERVER_TRUSTED_CONNECTION", False),
"trust_server_certificate": os.getenv("DATACONTRACT_SQLSERVER_TRUST_SERVER_CERTIFICATE", False),
"encrypt": os.getenv("DATACONTRACT_SQLSERVER_ENCRYPTED_CONNECTION", True),
"driver": server.driver
"encrypt": os.getenv("DATACONTRACT_SQLSERVER_ENCRYPTED_CONNECTION", True),
"driver": server.driver,
}
}

3 changes: 2 additions & 1 deletion datacontract/export/avro_converter.py
@@ -83,6 +83,7 @@ def to_avro_type(field: Field, field_name: str) -> str | dict:
else:
return "bytes"


def to_avro_logical_type(type: str) -> str:
if type in ["timestamp", "timestamp_tz"]:
return "timestamp-millis"
@@ -91,4 +92,4 @@ def to_avro_logical_type(type: str) -> str:
elif type in ["date"]:
return "date"
else:
return ""
return ""
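As a quick illustration of the logical-type mapping above (a sketch, assuming the import path matches the file shown in this diff):

from datacontract.export.avro_converter import to_avro_logical_type

# Timestamp types map to Avro's millisecond logical type, dates stay "date",
# and types without a logical type yield an empty string.
assert to_avro_logical_type("timestamp") == "timestamp-millis"
assert to_avro_logical_type("date") == "date"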
33 changes: 17 additions & 16 deletions datacontract/export/bigquery_converter.py
@@ -5,24 +5,21 @@
from datacontract.model.data_contract_specification import Model, Field, Server
from datacontract.model.exceptions import DataContractException


def to_bigquery_json(model_name: str, model_value: Model, server: Server) -> str:
bigquery_table = to_bigquery_schema(model_name, model_value, server)
return json.dumps(bigquery_table, indent=2)


def to_bigquery_schema(model_name: str, model_value: Model, server: Server) -> dict:
return {
"kind": "bigquery#table",
"tableReference": {
"datasetId": server.dataset,
"projectId": server.project,
"tableId": model_name
},
"tableReference": {"datasetId": server.dataset, "projectId": server.project, "tableId": model_name},
"description": model_value.description,
"schema": {
"fields": to_fields_array(model_value.fields)
}
"schema": {"fields": to_fields_array(model_value.fields)},
}


def to_fields_array(fields: Dict[str, Field]) -> List[Dict[str, Field]]:
bq_fields = []
for field_name, field in fields.items():
@@ -32,24 +29,25 @@ def to_fields_array(fields: Dict[str, Field]) -> List[Dict[str, Field]]:


def to_field(field_name: str, field: Field) -> dict:

bq_type = map_type_to_bigquery(field.type, field_name)
bq_field = {
"name": field_name,
"type": bq_type,
"mode": "REQUIRED" if field.required else "NULLABLE",
"description": field.description
"description": field.description,
}

# handle arrays
if field.type == 'array':
bq_field["mode"] = 'REPEATED'
if field.items.type == 'object':
if field.type == "array":
bq_field["mode"] = "REPEATED"
if field.items.type == "object":
# in case the array type is a complex object, we want to copy all its fields
bq_field["fields"] = to_fields_array(field.items.fields)
else:
# otherwise we make up a structure that gets us a single field of the specified type
bq_field["fields"] = to_fields_array({ f"{field_name}_1": Field(type=field.items.type, required=False, description="")})
bq_field["fields"] = to_fields_array(
{f"{field_name}_1": Field(type=field.items.type, required=False, description="")}
)
# all of these can carry other fields
elif bq_type.lower() in ["record", "struct"]:
bq_field["fields"] = to_fields_array(field.fields)
@@ -65,6 +63,7 @@

return bq_field


def map_type_to_bigquery(type_str: str, field_name: str) -> str:
logger = logging.getLogger(__name__)
if type_str.lower() in ["string", "varchar", "text"]:
@@ -78,7 +77,7 @@ def map_type_to_bigquery(type_str: str, field_name: str) -> str:
elif type_str == "float":
return "FLOAT"
elif type_str == "boolean":
return "BOOLEAN"
return "BOOL"
elif type_str.lower() in ["timestamp", "timestamp_tz"]:
return "TIMESTAMP"
elif type_str == "date":
@@ -94,7 +93,9 @@ def map_type_to_bigquery(type_str: str, field_name: str) -> str:
elif type_str == "struct":
return "STRUCT"
elif type_str == "null":
logger.info(f"Can't properly map {field_name} to bigquery Schema, as 'null' is not supported as a type. Mapping it to STRING.")
logger.info(
f"Can't properly map {field_name} to bigquery Schema, as 'null' is not supported as a type. Mapping it to STRING."
)
return "STRING"
else:
raise DataContractException(
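To make the BOOL fix above concrete, a small sketch assuming the import paths match the files shown in this commit: boolean contract fields now map to BigQuery's BOOL type, and required fields keep mode REQUIRED.

from datacontract.export.bigquery_converter import map_type_to_bigquery, to_field
from datacontract.model.data_contract_specification import Field

# The boolean mapping changed from BOOLEAN to BOOL in this commit.
assert map_type_to_bigquery("boolean", "is_active") == "BOOL"

# Hypothetical field: required booleans become mode REQUIRED, optional ones NULLABLE.
bq_field = to_field("is_active", Field(type="boolean", required=True, description="Active flag"))
assert bq_field["type"] == "BOOL" and bq_field["mode"] == "REQUIRED"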