From 88e4a6ef1502cd2e2d54dc1c5ca6e47ac63daca4 Mon Sep 17 00:00:00 2001
From: Joachim Praetorius
Date: Tue, 14 May 2024 17:01:56 +0200
Subject: [PATCH] Issue/110 import from bigquery (#194)

* Extend Fields with necessary properties

precision and scale are possible and should be mappable

* Add an importer for BigQuery JSON

Add the necessary importer for the BigQuery JSON format and wire it up in the CLI

* Add tests

* Extend the README

* Change all the bits of the README

---
 README.md                                     |  15 +-
 datacontract/cli.py                           |   1 +
 datacontract/data_contract.py                 |   3 +
 datacontract/imports/bigquery_importer.py     | 122 +++++++++++++
 .../model/data_contract_specification.py      |   2 +
 .../import/complete_table_schema.json         | 165 ++++++++++++++++++
 .../bigquery/import/datacontract.yaml         |  99 +++++++++++
 tests/test_import_bigquery.py                 |  34 ++++
 8 files changed, 436 insertions(+), 5 deletions(-)
 create mode 100644 datacontract/imports/bigquery_importer.py
 create mode 100644 tests/fixtures/bigquery/import/complete_table_schema.json
 create mode 100644 tests/fixtures/bigquery/import/datacontract.yaml
 create mode 100644 tests/test_import_bigquery.py

diff --git a/README.md b/README.md
index 1271d61e..608d31e7 100644
--- a/README.md
+++ b/README.md
@@ -115,7 +115,7 @@ $ datacontract test --examples datacontract.yaml
 # export data contract as html (other formats: avro, dbt, dbt-sources, dbt-staging-sql, jsonschema, odcs, rdf, sql, sodacl, terraform, ...)
 $ datacontract export --format html datacontract.yaml > datacontract.html
 
-# import avro (other formats: sql, ...)
+# import avro (other formats: sql, glue, bigquery, ...)
 $ datacontract import --format avro --source avro_schema.avsc
 
 # find differences between two data contracts
@@ -652,9 +652,10 @@ data products, find the true domain owner of a field attribute)
 Create a data contract from the given source location. Prints to stdout.
 
 ╭─ Options ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│ * --format   [sql|avro|glue]             The format of the source file. [default: None] [required]                               │
-│ * --source   TEXT                        The path to the file or Glue Database that should be imported. [default: None] [required] │
-│   --help                                 Show this message and exit.                                                             │
+│ * --format   [sql|avro|glue|bigquery]    The format of the source file.                                                          │
+│                                          [default: None] [required]                                                              │
+│ * --source   TEXT                        The path to the file or Glue Database that should be imported. [default: None] [required] │
+│   --help                                 Show this message and exit.                                                             │
 ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
 ```
 
@@ -663,6 +664,10 @@ Example:
 # Example import from SQL DDL
 datacontract import --format sql --source my_ddl.sql
 ```
+```bash
+# Example import from BigQuery JSON
+datacontract import --format bigquery --source my_bigquery_table.json
+```
 
 Available import options:
 
@@ -673,7 +678,7 @@ Available import options:
 | `glue`       | Import from AWS Glue DataCatalog               | ✅  |
 | `protobuf`   | Import from Protobuf schemas                   | TBD |
 | `jsonschema` | Import from JSON Schemas                       | TBD |
-| `bigquery`   | Import from BigQuery Schemas                   | TBD |
+| `bigquery`   | Import from BigQuery Schemas                   | ✅  |
 | `dbt`        | Import from dbt models                         | TBD |
 | `odcs`       | Import from Open Data Contract Standard (ODCS) | TBD |
 | Missing something? | Please create an issue on GitHub | TBD |
diff --git a/datacontract/cli.py b/datacontract/cli.py
index 36eebd97..0228dbb2 100644
--- a/datacontract/cli.py
+++ b/datacontract/cli.py
@@ -213,6 +213,7 @@ class ImportFormat(str, Enum):
     sql = "sql"
     avro = "avro"
     glue = "glue"
+    bigquery = "bigquery"
 
 
 @app.command(name="import")
diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py
index 46493274..e3012e26 100644
--- a/datacontract/data_contract.py
+++ b/datacontract/data_contract.py
@@ -30,6 +30,7 @@
 from datacontract.export.sql_converter import to_sql_ddl, to_sql_query
 from datacontract.export.terraform_converter import to_terraform
 from datacontract.imports.avro_importer import import_avro
+from datacontract.imports.bigquery_importer import import_bigquery
 from datacontract.imports.glue_importer import import_glue
 from datacontract.imports.sql_importer import import_sql
 from datacontract.integration.publish_datamesh_manager import \
@@ -493,6 +494,8 @@ def import_from_source(self, format: str, source: str) -> DataContractSpecificat
             data_contract_specification = import_avro(data_contract_specification, source)
         elif format == "glue":
             data_contract_specification = import_glue(data_contract_specification, source)
+        elif format == "bigquery":
+            data_contract_specification = import_bigquery(data_contract_specification, source)
         else:
             print(f"Import format {format} not supported.")
 
diff --git a/datacontract/imports/bigquery_importer.py b/datacontract/imports/bigquery_importer.py
new file mode 100644
index 00000000..536a3d1b
--- /dev/null
+++ b/datacontract/imports/bigquery_importer.py
@@ -0,0 +1,122 @@
+import json
+
+from datacontract.model.data_contract_specification import \
+    DataContractSpecification, Model, Field
+from datacontract.model.exceptions import DataContractException
+
+
+def import_bigquery(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
+    if data_contract_specification.models is None:
+        data_contract_specification.models = {}
+
+    try:
+        with open(source, "r") as file:
+            bigquery_schema = json.loads(file.read())
+    except json.JSONDecodeError as e:
+        raise DataContractException(
+            type="schema",
+            name="Parse bigquery schema",
+            reason=f"Failed to parse BigQuery schema from {source}",
+            engine="datacontract",
+            original_exception=e,
+        )
+
+    fields = import_table_fields(bigquery_schema["schema"]["fields"])
+
+    # Judging from actual export data, tableId always seems to be set, whereas
+    # friendlyName may be absent; it is unclear what causes friendlyName to be populated.
+    table_id = bigquery_schema["tableReference"]["tableId"]
+
+    data_contract_specification.models[table_id] = Model(
+        fields=fields,
+        type="table"
+    )
+
+    # Copy the description, if it exists
+    if bigquery_schema.get("description") is not None:
+        data_contract_specification.models[table_id].description = bigquery_schema["description"]
+
+    # Set the title from friendlyName, if it exists
+    if bigquery_schema.get("friendlyName") is not None:
+        data_contract_specification.models[table_id].title = bigquery_schema["friendlyName"]
+
+    return data_contract_specification
+
+
+def import_table_fields(table_fields):
+    imported_fields = {}
+    for field in table_fields:
+        field_name = field["name"]
+        imported_fields[field_name] = Field()
+        imported_fields[field_name].required = field.get("mode") == "REQUIRED"
+        imported_fields[field_name].description = field.get("description")
+
+        if field["type"] == "RECORD":
+            imported_fields[field_name].type = "object"
"object" + imported_fields[field_name].fields = import_table_fields(field["fields"]) + elif field["type"] == "STRUCT": + imported_fields[field_name].type = "struct" + imported_fields[field_name].fields = import_table_fields(field["fields"]) + elif field["type"] == "RANGE": + # This is a range of date/datetime/timestamp but multiple values + # So we map it to an array + imported_fields[field_name].type = "array" + imported_fields[field_name].items = Field(type = map_type_from_bigquery(field["rangeElementType"]["type"])) + else: # primitive type + imported_fields[field_name].type = map_type_from_bigquery(field["type"]) + + if field["type"] == "STRING": + # in bigquery both string and bytes have maxLength but in the datacontracts + # spec it is only valid for strings + if field.get("maxLength") is not None: + imported_fields[field_name].maxLength = int(field["maxLength"]) + + if field["type"] == "NUMERIC" or field["type"] == "BIGNUMERIC": + if field.get("precision") is not None: + imported_fields[field_name].precision = int(field["precision"]) + + if field.get("scale") is not None: + imported_fields[field_name].scale = int(field["scale"]) + + return imported_fields + +def map_type_from_bigquery(bigquery_type_str: str): + if bigquery_type_str == "STRING": + return "string" + elif bigquery_type_str == "BYTES": + return "bytes" + elif bigquery_type_str == "INTEGER": + return "int" + elif bigquery_type_str == "INT64": + return "bigint" + elif bigquery_type_str == "FLOAT": + return "float" + elif bigquery_type_str == "FLOAT64": + return "double" + elif bigquery_type_str == "BOOLEAN" or bigquery_type_str == "BOOL": + return "boolean" + elif bigquery_type_str == "TIMESTAMP": + return "timestamp" + elif bigquery_type_str == "DATE": + return "date" + elif bigquery_type_str == "TIME": + return "timestamp_ntz" + elif bigquery_type_str == "DATETIME": + return "timestamp" + elif bigquery_type_str == "NUMERIC": + return "numeric" + elif bigquery_type_str == "BIGNUMERIC": + return "double" + elif bigquery_type_str == "GEOGRAPHY": + return "object" + elif bigquery_type_str == "JSON": + return "object" + else: + raise DataContractException( + type="schema", + result="failed", + name="Map bigquery type to data contract type", + reason=f"Unsupported type {bigquery_type_str} in bigquery json definition.", + engine="datacontract", + ) diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index f725bbdf..2f52e33e 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -84,6 +84,8 @@ class Field(pyd.BaseModel): tags: List[str] = [] fields: Dict[str, "Field"] = {} items: "Field" = None + precision: int = None + scale: int = None class Model(pyd.BaseModel): diff --git a/tests/fixtures/bigquery/import/complete_table_schema.json b/tests/fixtures/bigquery/import/complete_table_schema.json new file mode 100644 index 00000000..669f176a --- /dev/null +++ b/tests/fixtures/bigquery/import/complete_table_schema.json @@ -0,0 +1,165 @@ +{ + "creationTime": "1715608399201", + "description": "This is a test table that contains all the possible field types for testing", + "etag": "vv0Ksh3XakMcCTFmhM0FOA==", + "expirationTime": "1720792399201", + "id": "bigquery-test-423213:test_dataset.BQ Example Table", + "kind": "bigquery#table", + "labels": { + "label_1": "value_1", + "label_2": "value_2", + "label_3": "" + }, + "lastModifiedTime": "1715610311747", + "location": "europe-west3", + 
"numActiveLogicalBytes": "0", + "numBytes": "0", + "numLongTermBytes": "0", + "numLongTermLogicalBytes": "0", + "numRows": "0", + "numTotalLogicalBytes": "0", + "schema": { + "fields": [ + { + "description": "A simple String field", + "mode": "NULLABLE", + "name": "String_field", + "type": "STRING" + }, + { + "description": "A required String field", + "mode": "REQUIRED", + "name": "Nonnullable_String_field", + "type": "STRING" + }, + { + "description": "A required String field with a maximum length", + "maxLength": "42", + "mode": "REQUIRED", + "name": "Maxlength_string_field", + "type": "STRING" + }, + { + "description": "A_nullable_bytes_field", + "mode": "NULLABLE", + "name": "Bytes field", + "type": "BYTES" + }, + { + "description": "An bytes field with maxlength (which doesn't translate into datacontracts)", + "maxLength": "42", + "mode": "NULLABLE", + "name": "Bytes_field_with_maxlength", + "type": "BYTES" + }, + { + "description": "An Integer field", + "mode": "NULLABLE", + "name": "Int_field", + "type": "INTEGER" + }, + { + "description": "A float field", + "mode": "NULLABLE", + "name": "Float_field", + "type": "FLOAT" + }, + { + "description": "A boolean field", + "mode": "NULLABLE", + "name": "Boolean_field", + "type": "BOOLEAN" + }, + { + "description": "A Timestamp field", + "mode": "NULLABLE", + "name": "Timestamp_field", + "type": "TIMESTAMP" + }, + { + "description": "A Date field", + "mode": "NULLABLE", + "name": "Date_field", + "type": "DATE" + }, + { + "description": "A time field", + "mode": "NULLABLE", + "name": "Time_Field", + "type": "TIME" + }, + { + "description": "A Datetime field", + "mode": "NULLABLE", + "name": "Datetime_Field", + "type": "DATETIME" + }, + { + "description": "A Numeric field with precision 5 and scale 3", + "mode": "NULLABLE", + "name": "Numeric_Field", + "precision": "5", + "roundingMode": "ROUND_HALF_EVEN", + "scale": "3", + "type": "NUMERIC" + }, + { + "description": "A bignumeric field with precision 8 and sclae 4", + "mode": "NULLABLE", + "name": "Bignumeric_field", + "precision": "8", + "roundingMode": "ROUND_HALF_AWAY_FROM_ZERO", + "scale": "4", + "type": "BIGNUMERIC" + }, + { + "description": "A record field with two subfields", + "fields": [ + { + "description": "subfield 1 of type string", + "mode": "NULLABLE", + "name": "subfield_1", + "type": "STRING" + }, + { + "description": "Subfield 2 of type integer", + "mode": "NULLABLE", + "name": "subfield_2", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "Record_field", + "type": "RECORD" + }, + { + "description": "a datetime range", + "mode": "NULLABLE", + "name": "Range_field", + "rangeElementType": { + "type": "DATETIME" + }, + "type": "RANGE" + }, + { + "description": "a geography field", + "mode": "NULLABLE", + "name": "Geography_Field", + "type": "GEOGRAPHY" + }, + { + "description": "a json field", + "mode": "NULLABLE", + "name": "JSON_Field", + "type": "JSON" + } + ] + }, + "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/bigquery-test-423213/datasets/test_dataset/tables/BQ Example Table", + "tableReference": { + "datasetId": "test_dataset", + "projectId": "bigquery-test-423213", + "tableId": "BQ_Example_Table" + }, + "type": "TABLE" +} \ No newline at end of file diff --git a/tests/fixtures/bigquery/import/datacontract.yaml b/tests/fixtures/bigquery/import/datacontract.yaml new file mode 100644 index 00000000..a3b638ae --- /dev/null +++ b/tests/fixtures/bigquery/import/datacontract.yaml @@ -0,0 +1,99 @@ +dataContractSpecification: 0.9.3 +id: 
+info:
+  title: My Data Contract
+  version: 0.0.1
+models:
+  BQ_Example_Table:
+    description: This is a test table that contains all the possible field types for
+      testing
+    type: table
+    fields:
+      String_field:
+        type: string
+        required: false
+        description: A simple String field
+      Nonnullable_String_field:
+        type: string
+        required: true
+        description: A required String field
+      Maxlength_string_field:
+        type: string
+        required: true
+        description: A required String field with a maximum length
+        maxLength: 42
+      Bytes field:
+        type: bytes
+        required: false
+        description: A nullable bytes field
+      Bytes_field_with_maxlength:
+        type: bytes
+        required: false
+        description: A bytes field with maxLength (which doesn't translate into Data Contracts)
+      Int_field:
+        type: int
+        required: false
+        description: An Integer field
+      Float_field:
+        type: float
+        required: false
+        description: A float field
+      Boolean_field:
+        type: boolean
+        required: false
+        description: A boolean field
+      Timestamp_field:
+        type: timestamp
+        required: false
+        description: A Timestamp field
+      Date_field:
+        type: date
+        required: false
+        description: A Date field
+      Time_Field:
+        type: timestamp_ntz
+        required: false
+        description: A time field
+      Datetime_Field:
+        type: timestamp
+        required: false
+        description: A Datetime field
+      Numeric_Field:
+        type: numeric
+        required: false
+        description: A Numeric field with precision 5 and scale 3
+        precision: 5
+        scale: 3
+      Bignumeric_field:
+        type: double
+        required: false
+        description: A bignumeric field with precision 8 and scale 4
+        precision: 8
+        scale: 4
+      Record_field:
+        type: object
+        required: false
+        description: A record field with two subfields
+        fields:
+          subfield_1:
+            type: string
+            required: false
+            description: subfield 1 of type string
+          subfield_2:
+            type: int
+            required: false
+            description: Subfield 2 of type integer
+      Range_field:
+        type: array
+        required: false
+        description: a datetime range
+        items:
+          type: timestamp
+      Geography_Field:
+        type: object
+        required: false
+        description: a geography field
+      JSON_Field:
+        type: object
+        required: false
+        description: a json field
\ No newline at end of file
diff --git a/tests/test_import_bigquery.py b/tests/test_import_bigquery.py
new file mode 100644
index 00000000..251ceec6
--- /dev/null
+++ b/tests/test_import_bigquery.py
@@ -0,0 +1,34 @@
+import logging
+
+import yaml
+from typer.testing import CliRunner
+
+from datacontract.cli import app
+from datacontract.data_contract import DataContract
+
+logging.basicConfig(level=logging.DEBUG, force=True)
+
+
+def test_cli():
+    runner = CliRunner()
+    result = runner.invoke(
+        app,
+        [
+            "import",
+            "--format",
+            "bigquery",
+            "--source",
+            "fixtures/bigquery/import/complete_table_schema.json",
+        ],
+    )
+    assert result.exit_code == 0
+
+
+def test_import_bigquery_schema():
+    result = DataContract().import_from_source("bigquery", "fixtures/bigquery/import/complete_table_schema.json")
+
+    print("Result:\n", result.to_yaml())
+    with open("fixtures/bigquery/import/datacontract.yaml") as file:
+        expected = file.read()
+    assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected)
+    assert DataContract(data_contract_str=expected).lint(enabled_linters="none").has_passed()
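
---

For reviewers who want to try the importer on a real table, a minimal sketch of the Python flow this patch wires up (the project, dataset, table, and file names below are placeholders, not part of the patch):

```python
# Minimal usage sketch for the importer added in this patch.
# Assumes the JSON file is a BigQuery table resource export, e.g. produced with:
#   bq show --format=prettyjson my_project:my_dataset.my_table > my_bigquery_table.json
from datacontract.data_contract import DataContract

# import_from_source dispatches to import_bigquery for format "bigquery"
spec = DataContract().import_from_source("bigquery", "my_bigquery_table.json")
print(spec.to_yaml())
```

This is the same path the CLI takes for `datacontract import --format bigquery --source my_bigquery_table.json`.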