Issue/110 import from bigquery (datacontract#194)
* Extend Fields with necessary properties

Precision and scale are possible on BigQuery fields and should be mappable.

* Add an importer for BigQuery JSON

Add the necessary importer for BigQuery JSON and wire it up in the CLI

* Add tests

* Extend the README

* Update the remaining parts of the README
jpraetorius authored May 14, 2024
1 parent 443553a commit 88e4a6e
Showing 8 changed files with 436 additions and 5 deletions.
15 changes: 10 additions & 5 deletions README.md
@@ -115,7 +115,7 @@ $ datacontract test --examples datacontract.yaml
# export data contract as html (other formats: avro, dbt, dbt-sources, dbt-staging-sql, jsonschema, odcs, rdf, sql, sodacl, terraform, ...)
$ datacontract export --format html datacontract.yaml > datacontract.html

-# import avro (other formats: sql, ...)
+# import avro (other formats: sql, glue, bigquery, ...)
$ datacontract import --format avro --source avro_schema.avsc

# find differences between two data contracts
@@ -652,9 +652,10 @@ data products, find the true domain owner of a field attribute)
Create a data contract from the given source location. Prints to stdout.
╭─ Options ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│ * --format [sql|avro|glue] The format of the source file. [default: None] [required] │
-│ * --source TEXT The path to the file or Glue Database that should be imported. [default: None] [required] │
-│ --help Show this message and exit. │
+│ * --format [sql|avro|glue|bigquery] The format of the source file. │
+│ [default: None] [required] │
+│ * --source TEXT The path to the file or Glue Database that should be imported. [default: None] [required] │
+│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```

@@ -663,6 +664,10 @@ Example:
# Example import from SQL DDL
datacontract import --format sql --source my_ddl.sql
```
```bash
# Example import from BigQuery JSON
datacontract import --format bigquery --source my_bigquery_table.json
```
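The source file is the full table resource JSON as returned by BigQuery's `tables.get` API. Assuming the `bq` CLI is available, such a file can be produced with `bq show --format=prettyjson <project>:<dataset>.<table> > my_bigquery_table.json`.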

Available import options:

@@ -673,7 +678,7 @@ Available import options:
| `glue` | Import from AWS Glue DataCatalog | ✅ |
| `protobuf` | Import from Protobuf schemas | TBD |
| `jsonschema` | Import from JSON Schemas | TBD |
-| `bigquery` | Import from BigQuery Schemas | TBD |
+| `bigquery` | Import from BigQuery Schemas | ✅ |
| `dbt` | Import from dbt models | TBD |
| `odcs` | Import from Open Data Contract Standard (ODCS) | TBD |
| Missing something? | Please create an issue on GitHub | TBD |
1 change: 1 addition & 0 deletions datacontract/cli.py
@@ -213,6 +213,7 @@ class ImportFormat(str, Enum):
    sql = "sql"
    avro = "avro"
    glue = "glue"
+    bigquery = "bigquery"


@app.command(name="import")
3 changes: 3 additions & 0 deletions datacontract/data_contract.py
@@ -30,6 +30,7 @@
from datacontract.export.sql_converter import to_sql_ddl, to_sql_query
from datacontract.export.terraform_converter import to_terraform
from datacontract.imports.avro_importer import import_avro
+from datacontract.imports.bigquery_importer import import_bigquery
from datacontract.imports.glue_importer import import_glue
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
@@ -493,6 +494,8 @@ def import_from_source(self, format: str, source: str) -> DataContractSpecification:
            data_contract_specification = import_avro(data_contract_specification, source)
        elif format == "glue":
            data_contract_specification = import_glue(data_contract_specification, source)
+        elif format == "bigquery":
+            data_contract_specification = import_bigquery(data_contract_specification, source)
        else:
            print(f"Import format {format} not supported.")

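For reference, a minimal sketch of calling this import dispatch programmatically (construction of `DataContract` with defaults is assumed; the source path is hypothetical):

```python
from datacontract.data_contract import DataContract

# Import a BigQuery table definition and list the resulting models.
spec = DataContract().import_from_source("bigquery", "my_bigquery_table.json")
print(list(spec.models.keys()))
```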
122 changes: 122 additions & 0 deletions datacontract/imports/bigquery_importer.py
@@ -0,0 +1,122 @@
import json

from datacontract.model.data_contract_specification import \
    DataContractSpecification, Model, Field
from datacontract.model.exceptions import DataContractException


def import_bigquery(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
    if data_contract_specification.models is None:
        data_contract_specification.models = {}

    try:
        with open(source, "r") as file:
            bigquery_schema = json.loads(file.read())
    except json.JSONDecodeError as e:
        raise DataContractException(
            type="schema",
            name="Parse bigquery schema",
            reason=f"Failed to parse bigquery schema from {source}",
            engine="datacontract",
            original_exception=e,
        )

    fields = import_table_fields(bigquery_schema["schema"]["fields"])

    # Judging by actual export data, tableId appears to always be set,
    # while friendlyName isn't; it is unclear what causes friendlyName to be set.
    table_id = bigquery_schema["tableReference"]["tableId"]

    data_contract_specification.models[table_id] = Model(
        fields=fields,
        type="table"
    )

    # Copy the description, if it exists
    if bigquery_schema.get("description") is not None:
        data_contract_specification.models[table_id].description = bigquery_schema["description"]

    # Set the title from friendlyName, if it exists
    if bigquery_schema.get("friendlyName") is not None:
        data_contract_specification.models[table_id].title = bigquery_schema["friendlyName"]

    return data_contract_specification


def import_table_fields(table_fields):
    imported_fields = {}
    for field in table_fields:
        field_name = field["name"]
        imported_fields[field_name] = Field()
        # mode and description may be absent; mode defaults to NULLABLE
        imported_fields[field_name].required = field.get("mode", "NULLABLE") == "REQUIRED"
        imported_fields[field_name].description = field.get("description")

        if field["type"] == "RECORD":
            imported_fields[field_name].type = "object"
            imported_fields[field_name].fields = import_table_fields(field["fields"])
        elif field["type"] == "STRUCT":
            imported_fields[field_name].type = "struct"
            imported_fields[field_name].fields = import_table_fields(field["fields"])
        elif field["type"] == "RANGE":
            # A RANGE holds a span of date/datetime/timestamp values,
            # so we map it to an array of the element type.
            imported_fields[field_name].type = "array"
            imported_fields[field_name].items = Field(type=map_type_from_bigquery(field["rangeElementType"]["type"]))
        else:  # primitive type
            imported_fields[field_name].type = map_type_from_bigquery(field["type"])

        if field["type"] == "STRING":
            # In BigQuery both STRING and BYTES have maxLength, but in the
            # Data Contract specification it is only valid for strings.
            if field.get("maxLength") is not None:
                imported_fields[field_name].maxLength = int(field["maxLength"])

        if field["type"] == "NUMERIC" or field["type"] == "BIGNUMERIC":
            if field.get("precision") is not None:
                imported_fields[field_name].precision = int(field["precision"])

            if field.get("scale") is not None:
                imported_fields[field_name].scale = int(field["scale"])

    return imported_fields


def map_type_from_bigquery(bigquery_type_str: str):
    if bigquery_type_str == "STRING":
        return "string"
    elif bigquery_type_str == "BYTES":
        return "bytes"
    elif bigquery_type_str == "INTEGER":
        return "int"
    elif bigquery_type_str == "INT64":
        return "bigint"
    elif bigquery_type_str == "FLOAT":
        return "float"
    elif bigquery_type_str == "FLOAT64":
        return "double"
    elif bigquery_type_str == "BOOLEAN" or bigquery_type_str == "BOOL":
        return "boolean"
    elif bigquery_type_str == "TIMESTAMP":
        return "timestamp"
    elif bigquery_type_str == "DATE":
        return "date"
    elif bigquery_type_str == "TIME":
        return "timestamp_ntz"
    elif bigquery_type_str == "DATETIME":
        return "timestamp"
    elif bigquery_type_str == "NUMERIC":
        return "numeric"
    elif bigquery_type_str == "BIGNUMERIC":
        return "double"
    elif bigquery_type_str == "GEOGRAPHY":
        return "object"
    elif bigquery_type_str == "JSON":
        return "object"
    else:
        raise DataContractException(
            type="schema",
            result="failed",
            name="Map bigquery type to data contract type",
            reason=f"Unsupported type {bigquery_type_str} in bigquery json definition.",
            engine="datacontract",
        )
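To illustrate the recursive field mapping above, a small sketch with hypothetical input: a `RECORD` becomes an `object` with nested fields, and `REQUIRED` mode maps to `required=True`.

```python
from datacontract.imports.bigquery_importer import import_table_fields

# Hypothetical BigQuery field list: a nullable RECORD with one required STRING subfield.
fields = import_table_fields([
    {
        "name": "address",
        "type": "RECORD",
        "mode": "NULLABLE",
        "description": "A nested record",
        "fields": [
            {"name": "city", "type": "STRING", "mode": "REQUIRED", "description": "City name"},
        ],
    }
])

assert fields["address"].type == "object"
assert fields["address"].fields["city"].required is True
```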
2 changes: 2 additions & 0 deletions datacontract/model/data_contract_specification.py
@@ -84,6 +84,8 @@ class Field(pyd.BaseModel):
    tags: List[str] = []
    fields: Dict[str, "Field"] = {}
    items: "Field" = None
+    precision: int = None
+    scale: int = None


class Model(pyd.BaseModel):
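A minimal sketch of the new properties in use (values are illustrative):

```python
from datacontract.model.data_contract_specification import Field

# A numeric field imported from BigQuery with precision 5 and scale 3.
price = Field(type="numeric", precision=5, scale=3)
assert (price.precision, price.scale) == (5, 3)
```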
165 changes: 165 additions & 0 deletions tests/fixtures/bigquery/import/complete_table_schema.json
@@ -0,0 +1,165 @@
{
  "creationTime": "1715608399201",
  "description": "This is a test table that contains all the possible field types for testing",
  "etag": "vv0Ksh3XakMcCTFmhM0FOA==",
  "expirationTime": "1720792399201",
  "id": "bigquery-test-423213:test_dataset.BQ Example Table",
  "kind": "bigquery#table",
  "labels": {
    "label_1": "value_1",
    "label_2": "value_2",
    "label_3": ""
  },
  "lastModifiedTime": "1715610311747",
  "location": "europe-west3",
  "numActiveLogicalBytes": "0",
  "numBytes": "0",
  "numLongTermBytes": "0",
  "numLongTermLogicalBytes": "0",
  "numRows": "0",
  "numTotalLogicalBytes": "0",
  "schema": {
    "fields": [
      {
        "description": "A simple String field",
        "mode": "NULLABLE",
        "name": "String_field",
        "type": "STRING"
      },
      {
        "description": "A required String field",
        "mode": "REQUIRED",
        "name": "Nonnullable_String_field",
        "type": "STRING"
      },
      {
        "description": "A required String field with a maximum length",
        "maxLength": "42",
        "mode": "REQUIRED",
        "name": "Maxlength_string_field",
        "type": "STRING"
      },
      {
        "description": "A_nullable_bytes_field",
        "mode": "NULLABLE",
        "name": "Bytes field",
        "type": "BYTES"
      },
      {
        "description": "A bytes field with maxLength (which doesn't translate into datacontracts)",
        "maxLength": "42",
        "mode": "NULLABLE",
        "name": "Bytes_field_with_maxlength",
        "type": "BYTES"
      },
      {
        "description": "An Integer field",
        "mode": "NULLABLE",
        "name": "Int_field",
        "type": "INTEGER"
      },
      {
        "description": "A float field",
        "mode": "NULLABLE",
        "name": "Float_field",
        "type": "FLOAT"
      },
      {
        "description": "A boolean field",
        "mode": "NULLABLE",
        "name": "Boolean_field",
        "type": "BOOLEAN"
      },
      {
        "description": "A Timestamp field",
        "mode": "NULLABLE",
        "name": "Timestamp_field",
        "type": "TIMESTAMP"
      },
      {
        "description": "A Date field",
        "mode": "NULLABLE",
        "name": "Date_field",
        "type": "DATE"
      },
      {
        "description": "A time field",
        "mode": "NULLABLE",
        "name": "Time_Field",
        "type": "TIME"
      },
      {
        "description": "A Datetime field",
        "mode": "NULLABLE",
        "name": "Datetime_Field",
        "type": "DATETIME"
      },
      {
        "description": "A Numeric field with precision 5 and scale 3",
        "mode": "NULLABLE",
        "name": "Numeric_Field",
        "precision": "5",
        "roundingMode": "ROUND_HALF_EVEN",
        "scale": "3",
        "type": "NUMERIC"
      },
      {
        "description": "A bignumeric field with precision 8 and scale 4",
        "mode": "NULLABLE",
        "name": "Bignumeric_field",
        "precision": "8",
        "roundingMode": "ROUND_HALF_AWAY_FROM_ZERO",
        "scale": "4",
        "type": "BIGNUMERIC"
      },
      {
        "description": "A record field with two subfields",
        "fields": [
          {
            "description": "subfield 1 of type string",
            "mode": "NULLABLE",
            "name": "subfield_1",
            "type": "STRING"
          },
          {
            "description": "Subfield 2 of type integer",
            "mode": "NULLABLE",
            "name": "subfield_2",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "Record_field",
        "type": "RECORD"
      },
      {
        "description": "a datetime range",
        "mode": "NULLABLE",
        "name": "Range_field",
        "rangeElementType": {
          "type": "DATETIME"
        },
        "type": "RANGE"
      },
      {
        "description": "a geography field",
        "mode": "NULLABLE",
        "name": "Geography_Field",
        "type": "GEOGRAPHY"
      },
      {
        "description": "a json field",
        "mode": "NULLABLE",
        "name": "JSON_Field",
        "type": "JSON"
      }
    ]
  },
  "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/bigquery-test-423213/datasets/test_dataset/tables/BQ Example Table",
  "tableReference": {
    "datasetId": "test_dataset",
    "projectId": "bigquery-test-423213",
    "tableId": "BQ_Example_Table"
  },
  "type": "TABLE"
}
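For orientation, a sketch of what the importer yields for this fixture (assuming it runs from the repository root and that `DataContractSpecification` can be constructed with defaults):

```python
from datacontract.imports.bigquery_importer import import_bigquery
from datacontract.model.data_contract_specification import DataContractSpecification

spec = import_bigquery(
    DataContractSpecification(),
    "tests/fixtures/bigquery/import/complete_table_schema.json",
)
model = spec.models["BQ_Example_Table"]

assert model.description.startswith("This is a test table")
assert model.fields["Maxlength_string_field"].maxLength == 42
assert model.fields["Numeric_Field"].precision == 5
assert model.fields["Bignumeric_field"].scale == 4
```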
