Skip to content

Commit

Permalink
Add sql and sql-query as export formats.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonharrer committed Mar 14, 2024
1 parent 7a0d631 commit 9276c2c
Show file tree
Hide file tree
Showing 13 changed files with 406 additions and 44 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fixed a bug where the export to YAML always escaped the unicode characters.
- Added export format **protobuf**: `datacontract export --format protobuf`
- Added export format **terraform**: `datacontract export --format terraform`
- Added export format **terraform**: `datacontract export --format terraform` (limitation: only works for AWS S3 right now)
- Added export format **sql**: `datacontract export --format sql`
- Added export format **sql-query**: `datacontract export --format sql-query`
- Added extensive linting on data contracts. `datacontract lint` will now check for a variety of possible errors in the data contract, such as missing descriptions, incorrect references to models or fields, nonsensical constraints, and more.
- Added changelog command: `datacontract changelog` will now generate a changelog based on the changes in the data contract. This will be useful for keeping track of changes in the data contract over time.
- Added Avro IDL export: Generates an Avro IDL file containing records for each model.
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,9 @@ Available export options:
| `avro` | Export to AVRO models | ✅ |
| `protobuf` | Export to Protobuf | ✅ |
| `terraform` | Export to terraform resources | ✅ |
| `sql` | Export to SQL DDL | ✅ |
| `sql-query` | Export to SQL Query | ✅ |
| `pydantic` | Export to pydantic models | TBD |
| `sql` | Export to SQL DDL | TBD |
| Missing something? | Please create an issue on GitHub | TBD |

#### RDF
Expand Down
10 changes: 9 additions & 1 deletion datacontract/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ExportFormat(str, Enum):
protobuf = "protobuf"
terraform = "terraform"
avro_idl = "avro-idl"
sql = "sql"
sql_query = "sql-query"


@app.command()
Expand All @@ -138,14 +140,20 @@ def export(
"to refer to a model, e.g., `orders`, or `all` for all "
"models (default).")] = "all",
rdf_base: Annotated[Optional[str], typer.Option(help="[rdf] The base URI used to generate the RDF graph.", rich_help_panel="RDF Options")] = None,
sql_server_type: Annotated[Optional[str], typer.Option(help="[sql] The server type to determine the sql dialect. By default, it uses 'auto' to automatically detect the sql dialect via the specified servers in the data contract.", rich_help_panel="SQL Options")] = "auto",
location: Annotated[
str, typer.Argument(help="The location (url or path) of the data contract yaml.")] = "datacontract.yaml",
):
"""
Convert data contract to a specific format. Prints to stdout.
"""
# TODO exception handling
result = DataContract(data_contract_file=location, server=server).export(export_format=format, model=model,rdf_base=rdf_base)
result = DataContract(data_contract_file=location, server=server).export(
export_format=format,
model=model,
rdf_base=rdf_base,
sql_server_type=sql_server_type,
)
print(result)


Expand Down
46 changes: 44 additions & 2 deletions datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from datacontract.export.protobuf_converter import to_protobuf
from datacontract.export.rdf_converter import to_rdf_n3
from datacontract.export.sodacl_converter import to_sodacl_yaml
from datacontract.export.sql_converter import to_sql_ddl, to_sql_query
from datacontract.export.terraform_converter import to_terraform
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
Expand All @@ -42,6 +43,23 @@
Run, Check


def _determine_sql_server_type(data_contract, sql_server_type):
if sql_server_type == "auto":
if data_contract.servers is None or len(data_contract.servers) == 0:
raise RuntimeError(f"Export with server_type='auto' requires servers in the data contract.")

server_types = set([server.type for server in data_contract.servers.values()])
if "snowflake" in server_types:
return "snowflake"
elif "postgres" in server_types:
return "postgres"
else:
# default to snowflake dialect
return "snowflake"
else:
return sql_server_type


class DataContract:
def __init__(
self,
Expand Down Expand Up @@ -259,9 +277,9 @@ def get_data_contract_specification(self) -> DataContractSpecification:
inline_definitions=self._inline_definitions,
)

def export(self, export_format, model: str = "all", rdf_base: str = None) -> str:
def export(self, export_format, model: str = "all", rdf_base: str = None, sql_server_type: str = "auto") -> str:
data_contract = resolve.resolve_data_contract(self._data_contract_file, self._data_contract_str,
self._data_contract)
self._data_contract, inline_definitions=True)
if export_format == "jsonschema":
if data_contract.models is None:
raise RuntimeError( f"Export to {export_format} requires models in the data contract.")
Expand Down Expand Up @@ -335,6 +353,30 @@ def export(self, export_format, model: str = "all", rdf_base: str = None) -> str
return to_avro_idl(data_contract)
if export_format == "terraform":
return to_terraform(data_contract)
if export_format == "sql":
server_type = _determine_sql_server_type(data_contract, sql_server_type)
return to_sql_ddl(data_contract, server_type=server_type)
if export_format == "sql-query":
if data_contract.models is None:
raise RuntimeError(f"Export to {export_format} requires models in the data contract.")

server_type = _determine_sql_server_type(data_contract, sql_server_type)

model_names = list(data_contract.models.keys())

if model == "all":
if len(data_contract.models.items()) != 1:
raise RuntimeError(f"Export to {export_format} is model specific. Specify the model via --model $MODEL_NAME. Available models: {model_names}")

model_name, model_value = next(iter(data_contract.models.items()))
return to_sql_query(data_contract, model_name, model_value, server_type)
else:
model_name = model
model_value = data_contract.models.get(model_name)
if model_value is None:
raise RuntimeError(f"Model {model_name} not found in the data contract. Available models: {model_names}")

return to_sql_query(data_contract, model_name, model_value, server_type)
else:
print(f"Export format {export_format} not supported.")
return ""
Expand Down
41 changes: 3 additions & 38 deletions datacontract/export/dbt_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import yaml

from datacontract.export.sql_type_converter import convert_to_sql_type
from datacontract.model.data_contract_specification import \
DataContractSpecification, Model, Field


# snowflake data types:
# https://docs.snowflake.com/en/sql-reference/data-types.html



def to_dbt_models_yaml(data_contract_spec: DataContractSpecification):
Expand Down Expand Up @@ -132,7 +132,7 @@ def _to_columns(fields: Dict[str, Field], supports_constraints: bool, supports_d

def _to_column(field: Field, supports_constraints: bool, supports_datatype: bool) -> dict:
column = {}
dbt_type = _convert_type_to_snowflake(field.type)
dbt_type = convert_to_sql_type(field, "snowflake")
if dbt_type is not None:
if supports_datatype:
column["data_type"] = dbt_type
Expand Down Expand Up @@ -204,38 +204,3 @@ def _to_column(field: Field, supports_constraints: bool, supports_datatype: bool
# TODO: all constraints
return column


def _convert_type_to_snowflake(type) -> None | str:
# currently optimized for snowflake
# LEARNING: data contract has no direct support for CHAR,CHARACTER
# LEARNING: data contract has no support for "date-time", "datetime", "time"
# LEARNING: No precision and scale support in data contract
# LEARNING: no support for any
# GEOGRAPHY and GEOMETRY are not supported by the mapping
if type is None:
return None
if type.lower() in ["string", "varchar", "text"]:
return type.upper() # STRING, TEXT, VARCHAR are all the same in snowflake
if type.lower() in ["timestamp", "timestamp_tz"]:
return "TIMESTAMP_TZ"
if type.lower() in ["timestamp_ntz"]:
return "TIMESTAMP_NTZ"
if type.lower() in ["date"]:
return "DATE"
if type.lower() in ["time"]:
return "TIME"
if type.lower() in ["number", "decimal", "numeric"]:
return "NUMBER" # precision and scale not supported by data contract
if type.lower() in ["float", "double"]:
return "FLOAT"
if type.lower() in ["integer", "int", "long", "bigint"]:
return "NUMBER" # always NUMBER(38,0)
if type.lower() in ["boolean"]:
return "BOOLEAN"
if type.lower() in ["object", "record", "struct"]:
return "OBJECT"
if type.lower() in ["bytes"]:
return "BINARY"
if type.lower() in ["array"]:
return "ARRAY"
return None
82 changes: 82 additions & 0 deletions datacontract/export/sql_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datacontract.export.sql_type_converter import convert_to_sql_type
from datacontract.model.data_contract_specification import \
DataContractSpecification, Model


def to_sql_query(data_contract_spec: DataContractSpecification, model_name: str, model_value: Model, server_type: str = "snowflake") -> str:
    """Render a SELECT query for a single model, prefixed with comment headers.

    Returns an empty string when the spec is missing or declares no models.
    """
    if data_contract_spec is None:
        return ""
    models = data_contract_spec.models
    if models is None or len(models) == 0:
        return ""

    header = (
        f"-- Data Contract: {data_contract_spec.id}\n"
        f"-- SQL Dialect: {server_type}\n"
    )
    return header + _to_sql_query(model_name, model_value, server_type)


def _to_sql_query(model_name, model_value, server_type) -> str:
columns = []
for field_name, field in model_value.fields.items():
# TODO escape SQL reserved key words, probably dependent on server type
columns.append(field_name)

result = "select\n"
current_column_index = 1
number_of_columns = len(columns)
for column in columns:
result += f" {column}"
if current_column_index < number_of_columns:
result += ","
result += "\n"
current_column_index += 1
result += f"from {model_name}\n"
return result


def to_sql_ddl(data_contract_spec: DataContractSpecification, server_type: str = "snowflake") -> str:
    """Render CREATE TABLE statements for every model, with comment headers.

    The declared servers are scanned in order: the requested server_type is
    kept if a server of that type is seen first; otherwise the first
    snowflake/postgres server determines the dialect.

    Returns an empty string when the spec is missing or declares no models.
    """
    if data_contract_spec is None:
        return ""
    if data_contract_spec.models is None or len(data_contract_spec.models) == 0:
        return ""

    # Fix: a contract without any servers previously raised an AttributeError
    # here; the caller-provided server_type is used as-is in that case.
    if data_contract_spec.servers:
        for server in data_contract_spec.servers.values():
            if server.type == server_type:
                break
            if server.type in ("snowflake", "postgres"):
                server_type = server.type
                break

    result = f"-- Data Contract: {data_contract_spec.id}\n"
    result += f"-- SQL Dialect: {server_type}\n"
    for model_name, model in data_contract_spec.models.items():
        result += _to_sql_table(model_name, model, server_type)

    return result.strip()


def _to_sql_table(model_name, model, server_type="snowflake"):
    """Render a single CREATE TABLE statement for one model.

    Adds ``not null`` for required fields and ``primary key`` for primary
    fields; the column type comes from convert_to_sql_type for the dialect.
    """
    column_lines = []
    for field_name, field in model.fields.items():
        # renamed from `type` to avoid shadowing the builtin
        sql_type = convert_to_sql_type(field, server_type)
        line = f"  {field_name} {sql_type}"
        if field.required:
            line += " not null"
        if field.primary:
            line += " primary key"
        column_lines.append(line)
    columns = ",\n".join(column_lines)
    if columns:
        columns += "\n"
    return f"CREATE TABLE {model_name} (\n{columns});\n"


91 changes: 91 additions & 0 deletions datacontract/export/sql_type_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from datacontract.model.data_contract_specification import Field


def convert_to_sql_type(field: "Field", server_type: str) -> str:
    """Map a data-contract field to a SQL type for the given dialect.

    Dispatches to the snowflake/postgres converters; for any other dialect
    the field's declared logical type is returned as-is.
    """
    if server_type == "snowflake":
        return convert_to_snowflake(field)
    if server_type == "postgres":
        return convert_type_to_postgres(field)
    # Bug fix: previously returned str(type) — the repr of the *builtin*
    # `type` class ("<class 'type'>") — instead of the field's declared type.
    return str(field.type)

# snowflake data types:
# https://docs.snowflake.com/en/sql-reference/data-types.html
def convert_to_snowflake(field) -> None | str:
type = field.type
# currently optimized for snowflake
# LEARNING: data contract has no direct support for CHAR,CHARACTER
# LEARNING: data contract has no support for "date-time", "datetime", "time"
# LEARNING: No precision and scale support in data contract
# LEARNING: no support for any
# GEOGRAPHY and GEOMETRY are not supported by the mapping
if type is None:
return None
if type.lower() in ["string", "varchar", "text"]:
return type.upper() # STRING, TEXT, VARCHAR are all the same in snowflake
if type.lower() in ["timestamp", "timestamp_tz"]:
return "TIMESTAMP_TZ"
if type.lower() in ["timestamp_ntz"]:
return "TIMESTAMP_NTZ"
if type.lower() in ["date"]:
return "DATE"
if type.lower() in ["time"]:
return "TIME"
if type.lower() in ["number", "decimal", "numeric"]:
# precision and scale not supported by data contract
return "NUMBER"
if type.lower() in ["float", "double"]:
return "FLOAT"
if type.lower() in ["integer", "int", "long", "bigint"]:
return "NUMBER" # always NUMBER(38,0)
if type.lower() in ["boolean"]:
return "BOOLEAN"
if type.lower() in ["object", "record", "struct"]:
return "OBJECT"
if type.lower() in ["bytes"]:
return "BINARY"
if type.lower() in ["array"]:
return "ARRAY"
return None



# https://www.postgresql.org/docs/current/datatype.html
# Using the name whenever possible
def convert_type_to_postgres(field : Field) -> None | str:
type = field.type
if type is None:
return None
if type.lower() in ["string", "varchar", "text"]:
if field.format == "uuid":
return "uuid"
return "text" # STRING does not exist, TEXT and VARCHAR are all the same in postrges
if type.lower() in ["timestamp", "timestamp_tz"]:
return "timestamptz"
if type.lower() in ["timestamp_ntz"]:
return "timestamp"
if type.lower() in ["date"]:
return "date"
if type.lower() in ["time"]:
return "time"
if type.lower() in ["number", "decimal", "numeric"]:
# precision and scale not supported by data contract
if type.lower() == "number":
return "numeric"
return type.lower()
if type.lower() in ["float"]:
return "real"
if type.lower() in ["double"]:
return "double precision"
if type.lower() in ["integer", "int", "bigint"]:
return type.lower()
if type.lower() in ["long"]:
return "bigint"
if type.lower() in ["boolean"]:
return "boolean"
if type.lower() in ["object", "record", "struct"]:
return "jsonb"
if type.lower() in ["bytes"]:
return "bytea"
if type.lower() in ["array"]:
return convert_to_sql_type(field.items, "postgres") + "[]"
return None
1 change: 0 additions & 1 deletion datacontract/model/data_contract_specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class Field(pyd.BaseModel):
ref_obj: Definition = pyd.Field(default=None, exclude=True)
type: str = None
format: str = None
primary: bool = None
required: bool = None
primary: bool = None
unique: bool = None
Expand Down
19 changes: 19 additions & 0 deletions tests/examples/postgres-export/data/data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Test fixture for the postgres export examples: a small table with one
-- primary-key varchar column, a not-null int, and a timestamp, plus ten
-- sample rows.

-- Create the table
CREATE TABLE public.my_table (
field_one VARCHAR(10) primary key,
field_two INT not null,
field_three TIMESTAMP
);

-- Insert the data
INSERT INTO public.my_table (field_one, field_two, field_three) VALUES
('CX-263-DU', 50, '2023-06-16 13:12:56'),
('IK-894-MN', 47, '2023-10-08 22:40:57'),
('ER-399-JY', 22, '2023-05-16 01:08:22'),
('MT-939-FH', 63, '2023-03-15 05:15:21'),
('LV-849-MI', 33, '2023-09-08 20:08:43'),
('VS-079-OH', 85, '2023-04-15 00:50:32'),
('DN-297-XY', 79, '2023-11-08 12:55:42'),
('ZE-172-FP', 14, '2023-12-03 18:38:38'),
('ID-840-EG', 89, '2023-10-02 17:17:58'),
('FK-230-KZ', 64, '2023-11-27 15:21:48');
Loading

0 comments on commit 9276c2c

Please sign in to comment.