Issue/110 part2 read from bigtable api (datacontract#197)
* Add ability to query the BigQuery API to `import`

- allow querying directly from a BigQuery project instead of having to export individual JSON files (see the usage sketch below)

* Add a test stub for the new import

only a stub unfortunately, because I've not been able to get the mocks set up right :/

* PR Comment fixes

- Rename parameters
- Fetch available table names from the API, if none given
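
For reference, a minimal sketch of how the new import path can be driven programmatically (project, dataset and table names are placeholders; the CLI flags in the comment are assumed from Typer's default option naming):

```python
from datacontract.data_contract import DataContract

# Import directly from the BigQuery API instead of an exported JSON schema.
# Presumed CLI equivalent:
#   datacontract import --format bigquery \
#       --bigquery-project my-project \
#       --bigquery-dataset my_dataset \
#       --bigquery-table orders --bigquery-table customers
result = DataContract().import_from_source(
    format="bigquery",
    source=None,                              # no JSON export; query the API instead
    bigquery_tables=["orders", "customers"],  # None imports every table in the dataset
    bigquery_project="my-project",            # placeholder project id
    bigquery_dataset="my_dataset",            # placeholder dataset id
)
print(result.to_yaml())
```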
jpraetorius authored May 14, 2024
1 parent 3c3704b commit a6be213
Showing 6 changed files with 208 additions and 91 deletions.
135 changes: 75 additions & 60 deletions README.md

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions datacontract/cli.py
@@ -10,6 +10,7 @@
from rich.table import Table
from typer.core import TyperGroup
from typing_extensions import Annotated
from typing import List

from datacontract.catalog.catalog import create_index_html, create_data_contract_html
from datacontract.data_contract import DataContract
@@ -223,12 +224,15 @@ class ImportFormat(str, Enum):
@app.command(name="import")
def import_(
format: Annotated[ImportFormat, typer.Option(help="The format of the source file.")],
source: Annotated[str, typer.Option(help="The path to the file or Glue Database that should be imported.")],
source: Annotated[Optional[str], typer.Option(help="The path to the file or Glue Database that should be imported.")] = None,
bigquery_project: Annotated[Optional[str], typer.Option(help="The bigquery project id.")] = None,
bigquery_dataset: Annotated[Optional[str], typer.Option(help="The bigquery dataset id.")] = None,
bigquery_table: Annotated[Optional[List[str]], typer.Option(help="List of table ids to import from the bigquery API (repeat for multiple table ids, leave empty for all tables in the dataset).")] = None,
):
"""
Create a data contract from the given source location. Prints to stdout.
"""
result = DataContract().import_from_source(format, source)
result = DataContract().import_from_source(format, source, bigquery_table, bigquery_project, bigquery_dataset)
console.print(result.to_yaml())


9 changes: 6 additions & 3 deletions datacontract/data_contract.py
@@ -30,7 +30,7 @@
from datacontract.export.sql_converter import to_sql_ddl, to_sql_query
from datacontract.export.terraform_converter import to_terraform
from datacontract.imports.avro_importer import import_avro
from datacontract.imports.bigquery_importer import import_bigquery
from datacontract.imports.bigquery_importer import import_bigquery_from_api, import_bigquery_from_json
from datacontract.imports.glue_importer import import_glue
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
@@ -485,7 +485,7 @@ def _get_examples_server(self, data_contract, run, tmp_dir):
run.log_info(f"Using {server} for testing the examples")
return server

def import_from_source(self, format: str, source: str) -> DataContractSpecification:
def import_from_source(self, format: str, source: typing.Optional[str] = None, bigquery_tables: typing.Optional[typing.List[str]] = None, bigquery_project: typing.Optional[str] = None, bigquery_dataset: typing.Optional[str] = None) -> DataContractSpecification:
data_contract_specification = DataContract.init()

if format == "sql":
@@ -495,7 +495,10 @@ def import_from_source(self, format: str, source: str) -> DataContractSpecification:
elif format == "glue":
data_contract_specification = import_glue(data_contract_specification, source)
elif format == "bigquery":
data_contract_specification = import_bigquery(data_contract_specification, source)
if source is not None:
data_contract_specification = import_bigquery_from_json(data_contract_specification, source)
else:
data_contract_specification = import_bigquery_from_api(data_contract_specification, bigquery_tables, bigquery_project, bigquery_dataset)
else:
print(f"Import format {format} not supported.")

94 changes: 69 additions & 25 deletions datacontract/imports/bigquery_importer.py
@@ -1,14 +1,14 @@
import json

from typing import List

from datacontract.model.data_contract_specification import \
DataContractSpecification, Model, Field
from datacontract.model.exceptions import DataContractException

from google.cloud import bigquery

def import_bigquery(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
if data_contract_specification.models is None:
data_contract_specification.models = {}

def import_bigquery_from_json(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
try:
with open(source, "r") as file:
bigquery_schema = json.loads(file.read())
@@ -20,13 +20,58 @@ def import_bigquery(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
engine="datacontract",
original_exception=e,
)
return convert_bigquery_schema(data_contract_specification, bigquery_schema)

def import_bigquery_from_api(data_contract_specification: DataContractSpecification, bigquery_tables: List[str], bigquery_project: str, bigquery_dataset: str) -> DataContractSpecification:
client = bigquery.Client(project=bigquery_project)

if bigquery_tables is None:
bigquery_tables = fetch_table_names(client, bigquery_dataset)

for table in bigquery_tables:
try:
api_table = client.get_table("{}.{}.{}".format(bigquery_project, bigquery_dataset, table))

except ValueError as e:
raise DataContractException(
type="schema",
result="failed",
name="Invalid table name for bigquery API",
reason=f"Tablename {table} is invalid for the bigquery API",
original_exception=e,
engine="datacontract",
)

if api_table is None:
raise DataContractException(
type="request",
result="failed",
name="Query bigtable Schema from API",
reason=f"Table {table} bnot found on bigtable schema Project {bigquery_project}, dataset {bigquery_dataset}.",
engine="datacontract",
)

convert_bigquery_schema(data_contract_specification, api_table.to_api_repr())

return data_contract_specification

# pprint.pp(bigquery_schema)
fields = import_table_fields(bigquery_schema["schema"]["fields"])
def fetch_table_names(client: bigquery.Client, dataset: str) -> List[str]:
table_names = []
api_tables = client.list_tables(dataset)
for api_table in api_tables:
table_names.append(api_table.table_id)

return table_names

def convert_bigquery_schema(data_contract_specification: DataContractSpecification, bigquery_schema: dict) -> DataContractSpecification:
if data_contract_specification.models is None:
data_contract_specification.models = {}

fields = import_table_fields(bigquery_schema.get("schema").get("fields"))

# Looking at actual export data, I guess this is always set and friendlyName isn't, though I couldn't say
# what exactly leads to friendlyName being set
table_id = bigquery_schema["tableReference"]["tableId"]
table_id = bigquery_schema.get("tableReference").get("tableId")

data_contract_specification.models[table_id] = Model(
fields=fields,
@@ -35,49 +80,48 @@ def import_bigquery(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:

# Copy the description, if it exists
if bigquery_schema.get("description") is not None:
data_contract_specification.models[table_id].description = bigquery_schema["description"]
data_contract_specification.models[table_id].description = bigquery_schema.get("description")

# Set the title from friendlyName if it exists
if bigquery_schema.get("friendlyName") is not None:
data_contract_specification.models[table_id].title = bigquery_schema["friendlyName"]
data_contract_specification.models[table_id].title = bigquery_schema.get("friendlyName")

return data_contract_specification


def import_table_fields(table_fields):
imported_fields = {}
for field in table_fields:
field_name = field["name"]
field_name = field.get("name")
imported_fields[field_name] = Field()
imported_fields[field_name].required = field["mode"] == "REQUIRED"
imported_fields[field_name].description = field["description"]
imported_fields[field_name].required = field.get("mode") == "REQUIRED"
imported_fields[field_name].description = field.get("description")

if field["type"] == "RECORD":
if field.get("type") == "RECORD":
imported_fields[field_name].type = "object"
imported_fields[field_name].fields = import_table_fields(field["fields"])
elif field["type"] == "STRUCT":
imported_fields[field_name].fields = import_table_fields(field.get("fields"))
elif field.get("type") == "STRUCT":
imported_fields[field_name].type = "struct"
imported_fields[field_name].fields = import_table_fields(field["fields"])
elif field["type"] == "RANGE":
imported_fields[field_name].fields = import_table_fields(field.get("fields"))
elif field.get("type") == "RANGE":
# This is a range of date/datetime/timestamp values,
# so we map it to an array
imported_fields[field_name].type = "array"
imported_fields[field_name].items = Field(type = map_type_from_bigquery(field["rangeElementType"]["type"]))
imported_fields[field_name].items = Field(type = map_type_from_bigquery(field["rangeElementType"].get("type")))
else: # primitive type
imported_fields[field_name].type = map_type_from_bigquery(field["type"])
imported_fields[field_name].type = map_type_from_bigquery(field.get("type"))

if field["type"] == "STRING":
if field.get("type") == "STRING":
# in bigquery both string and bytes have maxLength but in the datacontracts
# spec it is only valid for strings
if field.get("maxLength") is not None:
imported_fields[field_name].maxLength = int(field["maxLength"])
imported_fields[field_name].maxLength = int(field.get("maxLength"))

if field["type"] == "NUMERIC" or field["type"] == "BIGNUMERIC":
if field.get("type") == "NUMERIC" or field.get("type") == "BIGNUMERIC":
if field.get("precision") is not None:
imported_fields[field_name].precision = int(field["precision"])
imported_fields[field_name].precision = int(field.get("precision"))

if field.get("scale") is not None:
imported_fields[field_name].scale = int(field["scale"])
imported_fields[field_name].scale = int(field.get("scale"))

return imported_fields

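A quick illustration of the field mapping above, with made-up field definitions (the concrete primitive type names depend on `map_type_from_bigquery`, which is not part of this diff):

```python
from datacontract.imports.bigquery_importer import import_table_fields

# Placeholder BigQuery field list, same shape as schema["fields"] in a table
# export or in Table.to_api_repr().
fields = [
    {"name": "order_id", "type": "STRING", "mode": "REQUIRED",
     "maxLength": "36", "description": "Primary key"},
    {"name": "amount", "type": "NUMERIC", "mode": "NULLABLE",
     "precision": "10", "scale": "2", "description": None},
    {"name": "valid_range", "type": "RANGE", "mode": "NULLABLE",
     "rangeElementType": {"type": "DATETIME"}, "description": None},
]

imported = import_table_fields(fields)
# Per the code above: order_id becomes a required string with maxLength 36,
# amount carries precision/scale, and the RANGE column is mapped to an array
# whose items are typed from rangeElementType.
print({name: field.type for name, field in imported.items()})
```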
1 change: 1 addition & 0 deletions datacontract/model/data_contract_specification.py
@@ -92,6 +92,7 @@ class Model(pyd.BaseModel):
description: str = None
type: str = None
namespace: str = None
title: str = None
fields: Dict[str, Field] = {}


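The new `title` attribute is what the BigQuery importer fills from `friendlyName`; an illustrative model (values are placeholders) would look roughly like this:

```python
from datacontract.model.data_contract_specification import Field, Model

# Illustrative only: mirrors what convert_bigquery_schema produces when the
# table has both a description and a friendlyName set.
model = Model(
    title="Orders",                                 # taken from friendlyName
    description="Hypothetical table description",   # taken from description
    fields={"order_id": Field(type="string", required=True, maxLength=36)},
)
print(model.title)
```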
52 changes: 51 additions & 1 deletion tests/test_import_bigquery.py
@@ -1,7 +1,8 @@
import logging

import yaml

from typer.testing import CliRunner
from unittest.mock import patch

from datacontract.cli import app
from datacontract.data_contract import DataContract
@@ -32,3 +33,52 @@ def test_import_bigquery_schema():
expected = file.read()
assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected)
assert DataContract(data_contract_str=expected).lint(enabled_linters="none").has_passed()

@patch('google.cloud.bigquery.Client.get_table')
def test_import_from_api(mock_client):
# Set up mocks
# mock_table = Mock()
# mock_table.to_api_repr.return_value = {
# 'kind': 'bigquery#table',
# 'etag': 'K9aCQ39hav0KNzG9cq3p7g==',
# 'id': 'bigquery-test-423213:dataset_test.table_test',
# 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/bigquery-test-423213/datasets/dataset_test/tables/table_test',
# 'tableReference': {'projectId': 'bigquery-test-423213',
# 'datasetId': 'dataset_test',
# 'tableId': 'table_test'},
# 'description': 'Test table description',
# 'labels': {'label_1': 'value_1'},
# 'schema': {'fields': [{'name': 'field_one',
# 'type': 'STRING',
# 'mode': 'REQUIRED',
# 'maxLength': '25'},
# {'name': 'field_two',
# 'type': 'INTEGER',
# 'mode': 'REQUIRED'},
# {'name': 'field_three',
# 'type': 'RANGE',
# 'mode': 'NULLABLE',
# 'rangeElementType': {'type': 'DATETIME'}}]},
# 'numBytes': '0',
# 'numLongTermBytes': '0',
# 'numRows': '0',
# 'creationTime': '1715626217137',
# 'expirationTime': '1720810217137',
# 'lastModifiedTime': '1715626262846',
# 'type': 'TABLE',
# 'location': 'europe-west6',
# 'numTotalLogicalBytes': '0',
# 'numActiveLogicalBytes': '0',
# 'numLongTermLogicalBytes': '0'}

# mock_client.response_value = mock_table

# Call the API Import
# result = DataContract().import_from_source(format="bigquery", source=None, bigquery_tables=["Test_One"],
#                                             bigquery_project="project_id", bigquery_dataset="dataset_id")
# print("Result:\n", result)
# TODO: This really should have a proper test, but I've not been able to set the mocks up
# correctly – maybe there's some help to be had?
# Anyway, the serialized dict above is a real response as captured from the Bigquery API and should
# be sufficient to check the behavior of this way of importing things
assert True is True
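
On the TODO above: one possible way to wire the mock (untested sketch). It patches the `Client` class as seen by the importer module, rather than only `get_table`, because `import_bigquery_from_api` also constructs `bigquery.Client(project=...)`, which would otherwise try to resolve real credentials:

```python
from unittest.mock import MagicMock, patch

from datacontract.data_contract import DataContract


@patch("datacontract.imports.bigquery_importer.bigquery.Client")
def test_import_from_api_sketch(mock_client_cls):
    # Trimmed version of the captured API response above; the full dict could
    # be pasted here instead.
    captured_table = {
        "tableReference": {"projectId": "bigquery-test-423213",
                           "datasetId": "dataset_test",
                           "tableId": "table_test"},
        "description": "Test table description",
        "schema": {"fields": [{"name": "field_one", "type": "STRING",
                               "mode": "REQUIRED", "maxLength": "25",
                               "description": None}]},
    }

    mock_table = MagicMock()
    mock_table.to_api_repr.return_value = captured_table
    mock_client_cls.return_value.get_table.return_value = mock_table

    result = DataContract().import_from_source(
        format="bigquery",
        source=None,
        bigquery_tables=["table_test"],
        bigquery_project="bigquery-test-423213",
        bigquery_dataset="dataset_test",
    )
    assert "table_test" in result.to_yaml()
```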
