Support BigQuery JSON as export format (datacontract#198)
* Support BigQuery JSON as export format

- add a new exporter for BigQuery JSON files
- hook it up to the CLI
- simplify the model-selection check during export

* Use the correct type for the 'schema' field contents

* Mention the BigQuery export in the CHANGELOG
jpraetorius authored May 14, 2024
1 parent a6be213 commit fb343fe
Showing 9 changed files with 689 additions and 149 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- datacontract catalog: Search form
- `datacontract import --format bigquery`: Import from BigQuery format
- `datacontract export --format bigquery`: Export to BigQuery format
- `datacontract publish`: Publish the data contract to the Data Mesh Manager

## [0.10.3] - 2024-05-05
75 changes: 40 additions & 35 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datacontract/cli.py
@@ -158,6 +158,7 @@ class ExportFormat(str, Enum):
sql = "sql"
sql_query = "sql-query"
html = "html"
bigquery = "bigquery"


@app.command()
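For illustration, a minimal sketch (not part of the commit) of why adding the enum member is all the CLI wiring needs: because `ExportFormat` subclasses `str`, typer can resolve the string passed to `--format` straight to the new member. Only the members visible in this hunk are restated below.

```python
# Sketch only: restates the members shown in this hunk, not the full enum.
from enum import Enum

class ExportFormat(str, Enum):
    sql = "sql"
    sql_query = "sql-query"
    html = "html"
    bigquery = "bigquery"

# Roughly what the CLI layer does with the value given to --format:
assert ExportFormat("bigquery") is ExportFormat.bigquery
```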
155 changes: 42 additions & 113 deletions datacontract/data_contract.py
@@ -16,6 +16,7 @@
from datacontract.engines.soda.check_soda_execute import check_soda_execute
from datacontract.export.avro_converter import to_avro_schema_json
from datacontract.export.avro_idl_converter import to_avro_idl
from datacontract.export.bigquery_converter import to_bigquery_json
from datacontract.export.dbt_converter import to_dbt_models_yaml, \
to_dbt_sources_yaml, to_dbt_staging_sql
from datacontract.export.great_expectations_converter import \
@@ -290,86 +291,26 @@ def export(self, export_format, model: str = "all", rdf_base: str = None, sql_se
inline_quality=True,
)
if export_format == "jsonschema":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
return to_jsonschema_json(model_name, model_value)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. Available models: {model_names}"
)

return to_jsonschema_json(model_name, model_value)
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
return to_jsonschema_json(model_name, model_value)
if export_format == "sodacl":
return to_sodacl_yaml(data_contract)
if export_format == "dbt":
return to_dbt_models_yaml(data_contract)
if export_format == "dbt-sources":
return to_dbt_sources_yaml(data_contract, self._server)
if export_format == "dbt-staging-sql":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
return to_dbt_staging_sql(data_contract, model_name, model_value)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. Available models: {model_names}"
)

return to_dbt_staging_sql(data_contract, model_name, model_value)
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
return to_dbt_staging_sql(data_contract, model_name, model_value)
if export_format == "odcs":
return to_odcs_yaml(data_contract)
if export_format == "rdf":
return to_rdf_n3(data_contract, rdf_base)
if export_format == "protobuf":
return to_protobuf(data_contract)
if export_format == "avro":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
return to_avro_schema_json(model_name, model_value)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. Available models: {model_names}"
)

return to_avro_schema_json(model_name, model_value)
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
return to_avro_schema_json(model_name, model_value)
if export_format == "avro-idl":
return to_avro_idl(data_contract)
if export_format == "terraform":
@@ -378,59 +319,24 @@
server_type = self._determine_sql_server_type(data_contract, sql_server_type)
return to_sql_ddl(data_contract, server_type=server_type)
if export_format == "sql-query":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
server_type = self._determine_sql_server_type(data_contract, sql_server_type)

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
return to_sql_query(data_contract, model_name, model_value, server_type)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. Available models: {model_names}"
)

return to_sql_query(data_contract, model_name, model_value, server_type)

return to_sql_query(data_contract, model_name, model_value, server_type)
if export_format == "great-expectations":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model "
f"$MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
return to_great_expectations(data_contract, model_name)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. " f"Available models: {model_names}"
)

return to_great_expectations(data_contract, model_name)
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
return to_great_expectations(data_contract, model_name)
if export_format == "pydantic-model":
return to_pydantic_model_str(data_contract)
if export_format == "html":
return to_html(data_contract)
if export_format == "bigquery":
model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
found_server = data_contract.servers.get(self._server)
if found_server is None:
raise RuntimeError(f"Export to {export_format} requires selecting a bigquery server from the data contract.")
if found_server.type != 'bigquery':
raise RuntimeError(f"Export to {export_format} requires the selected server to be of type 'bigquery', but server '{self._server}' has type '{found_server.type}'.")
return to_bigquery_json(model_name, model_value, found_server)
else:
print(f"Export format {export_format} not supported.")
return ""
@@ -484,6 +390,29 @@ def _get_examples_server(self, data_contract, run, tmp_dir):
)
run.log_info(f"Using {server} for testing the examples")
return server

def _check_models_for_export(self, data_contract: DataContractSpecification, model: str, export_format: str) -> typing.Tuple[str, str]:
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(
f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}"
)

model_name, model_value = next(iter(data_contract.models.items()))
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(
f"Model {model_name} not found in the data contract. Available models: {model_names}"
)

return model_name, model_value

def import_from_source(self, format: str, source: typing.Optional[str] = None, bigquery_tables: typing.Optional[typing.List[str]] = None, bigquery_project: typing.Optional[str] = None, bigquery_dataset: typing.Optional[str] = None) -> DataContractSpecification:
data_contract_specification = DataContract.init()
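For context, a hedged usage sketch of the refactored export path from Python; the constructor keywords and option names are inferred from the signatures visible in this diff and may differ from the released API. On the command line this corresponds to `datacontract export --format bigquery`, as advertised in the CHANGELOG entry above.

```python
# Hypothetical usage of the export path touched by this commit.
from datacontract.data_contract import DataContract

dc = DataContract(
    data_contract_file="datacontract.yaml",  # assumed file name
    server="production",  # must reference a server with type: bigquery
)

# export() now delegates model selection to _check_models_for_export():
# with exactly one model in the contract, model="all" works; otherwise
# the model name must be given explicitly.
bigquery_table_json = dc.export(export_format="bigquery", model="orders")
print(bigquery_table_json)
```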
109 changes: 109 additions & 0 deletions datacontract/export/bigquery_converter.py
@@ -0,0 +1,109 @@
import json
import logging
from typing import Dict, List

from datacontract.model.data_contract_specification import Model, Field, Server
from datacontract.model.exceptions import DataContractException

logging.basicConfig(level=logging.INFO, force=True)

def to_bigquery_json(model_name: str, model_value: Model, server: Server) -> str:
bigquery_table = to_bigquery_schema(model_name, model_value, server)
return json.dumps(bigquery_table, indent=2)

def to_bigquery_schema(model_name: str, model_value: Model, server: Server) -> dict:
return {
"kind": "bigquery#table",
"tableReference": {
"datasetId": server.dataset,
"projectId": server.project,
"tableId": model_name
},
"description": model_value.description,
"schema": {
"fields": to_fields_array(model_value.fields)
}
}

def to_fields_array(fields: Dict[str, Field]) -> List[dict]:
bq_fields = []
for field_name, field in fields.items():
bq_fields.append(to_field(field_name, field))

return bq_fields


def to_field(field_name: str, field: Field) -> dict:

bq_type = map_type_to_bigquery(field.type, field_name)
bq_field = {
"name": field_name,
"type": bq_type,
"mode": "REQUIRED" if field.required else "NULLABLE",
"description": field.description
}

# handle arrays
if field.type == 'array':
bq_field["mode"] = 'REPEATED'
if field.items.type == 'object':
# in case the array type is a complex object, we want to copy all its fields
bq_field["fields"] = to_fields_array(field.items.fields)
else:
# otherwise we make up a structure that gets us a single field of the specified type
bq_field["fields"] = to_fields_array({ f"{field_name}_1": Field(type=field.items.type, required=False, description="")})
# record and struct types carry nested fields
elif bq_type.lower() in ["record", "struct"]:
bq_field["fields"] = to_fields_array(field.fields)

# strings can have a maxlength
if bq_type.lower() == "string":
bq_field["maxLength"] = field.maxLength

# number types have precision and scale
if bq_type.lower() in ["numeric", "bignumeric"]:
bq_field["precision"] = field.precision
bq_field["scale"] = field.scale

return bq_field

def map_type_to_bigquery(type_str: str, field_name: str) -> str:
logger = logging.getLogger(__name__)
if type_str.lower() in ["string", "varchar", "text"]:
return "STRING"
elif type_str == "bytes":
return "BYTES"
elif type_str.lower() in ["int", "integer"]:
return "INTEGER"
elif type_str.lower() in ["long", "bigint"]:
return "INT64"
elif type_str == "float":
return "FLOAT"
elif type_str == "boolean":
return "BOOLEAN"
elif type_str.lower() in ["timestamp", "timestamp_tz"]:
return "TIMESTAMP"
elif type_str == "date":
return "DATE"
elif type_str == "timestamp_ntz":
return "TIME"
elif type_str.lower() in ["number", "decimal", "numeric"]:
return "NUMERIC"
elif type_str == "double":
return "BIGNUMERIC"
elif type_str.lower() in ["object", "record", "array"]:
return "RECORD"
elif type_str == "struct":
return "STRUCT"
elif type_str == "null":
logger.info(f"Can't properly map {field_name} to bigquery Schema, as 'null' is not supported as a type. Mapping it to STRING.")
return "STRING"
else:
raise DataContractException(
type="schema",
result="failed",
name="Map datacontract type to bigquery data type",
reason=f"Unsupported type {type_str} in data contract definition.",
engine="datacontract",
)
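To show the converter in isolation, here is a hedged sketch of calling it directly; the `Model`, `Field`, and `Server` keyword arguments mirror the attributes the converter reads above (`fields`, `description`, `type`, `required`, `project`, `dataset`), and any other attributes of those classes are left at their defaults.

```python
# Hypothetical, minimal invocation of the converter added in this commit.
from datacontract.export.bigquery_converter import to_bigquery_json
from datacontract.model.data_contract_specification import Field, Model, Server

model = Model(
    description="Orders placed in the webshop",
    fields={
        "order_id": Field(type="string", required=True, description="Primary key"),
        "amount": Field(type="decimal", required=False, description="Order total"),
    },
)
server = Server(type="bigquery", project="my-project", dataset="my_dataset")

print(to_bigquery_json("orders", model, server))
# Expected shape (abridged): a "bigquery#table" resource whose schema.fields
# lists order_id as a REQUIRED STRING and amount as a NULLABLE NUMERIC.
```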
