diff --git a/debussy_concert/core/entities/bigquery_table.py b/debussy_concert/core/entities/bigquery_table.py
new file mode 100644
index 0000000..690d3fb
--- /dev/null
+++ b/debussy_concert/core/entities/bigquery_table.py
@@ -0,0 +1,155 @@
+from dataclasses import dataclass, asdict, field as dataclass_field
+from typing import List, Optional
+
+from debussy_concert.core.entities.table import Partitioning, Table, TableField, TableSchema
+
+
+@dataclass
+class BigQueryPolicyTags:
+    names: List[str]
+
+
+@dataclass
+class BigQueryTableField:
+    """
+    ref: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables?hl=pt-br#TableFieldSchema
+    {
+      "name": string,
+      "description": string,
+      "type": string,
+      "mode": string,
+      "fields": [
+        {
+          object (TableFieldSchema)
+        }
+      ],
+      "policyTags": {
+        "names": [
+          string
+        ]
+      },
+      "maxLength": string,
+      "precision": string,
+      "scale": string,
+      "collation": string
+    }
+    """
+
+    name: str
+    description: Optional[str]
+    type: str
+    mode: Optional[str] = "NULLABLE"
+    fields: Optional[List["BigQueryTableField"]] = None
+    policy_tags: Optional[BigQueryPolicyTags] = dataclass_field(
+        default_factory=lambda: BigQueryPolicyTags([])
+    )
+
+    def __post_init__(self):
+        # these should be upper case
+        self.type = self.type.upper()
+        if self.mode is not None:
+            self.mode = self.mode.upper()
+
+    @classmethod
+    def load_from_internal_table_field(cls, table_field: TableField):
+        """
+        Build a BigQueryTableField from an internal TableField
+        """
+        fields_key = table_field.extra_options.get("fields")
+        fields = None
+        if fields_key:
+            fields = []
+            for inner_fields in fields_key:
+                bq_field = cls.load_from_internal_table_field(
+                    TableField(**inner_fields)
+                )
+                fields.append(bq_field)
+        policy_tags = table_field.column_tags or []
+        bq_policy_tags = BigQueryPolicyTags(names=policy_tags)
+        field_schema = cls(
+            name=table_field.name,
+            description=table_field.description,
+            type=table_field.data_type,
+            mode=table_field.constraint,
+            fields=fields,
+            policy_tags=bq_policy_tags
+        )
+        return field_schema
+
+    def get_field_schema(self):
+        schema = asdict(self)
+        return schema
+
+
+@dataclass
+class BigQueryTimePartitioning:
+    """
+    NOTE: an implementation of this may exist in the google.cloud.bigquery SDK, but I could not find it
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timepartitioning
+    {
+      "type": string,
+      "expirationMs": string,
+      "field": string,
+      "requirePartitionFilter": boolean  # deprecated
+    }
+    """
+
+    type: str
+    expiration_ms: Optional[str] = None
+    field: Optional[str] = None
+
+    def __post_init__(self):
+        type_ = self.type.upper()
+        if type_ not in ("DAY", "HOUR", "MONTH", "YEAR"):
+            raise ValueError(f"Invalid type: {type_}")
+        self.type = type_
+
+    def to_dict(self) -> dict:
+        ret = {
+            "type": self.type,
+            "expirationMs": self.expiration_ms,
+            "field": self.field,
+        }
+        return ret
+
+    @classmethod
+    def load_from_partitioning(cls, partitioning: Partitioning):
+        return cls(type=partitioning.granularity, field=partitioning.field)
+
+
+def data_partitioning_factory(data_partitioning: Partitioning):
+    partitioning_type = data_partitioning.type.lower()
+    mapping = {"time": BigQueryTimePartitioning}
+    output_cls = mapping.get(partitioning_type)
+    if output_cls is None:
+        raise TypeError(f"Format `{partitioning_type}` is not supported")
+    return output_cls.load_from_partitioning(data_partitioning)
+
+
+class BigQueryTableSchema(TableSchema):
+    @classmethod
+    def load_from_table_schema(cls, table_schema: TableSchema):
+        fields = []
+        for table_field in table_schema.fields:
+            field = BigQueryTableField.load_from_internal_table_field(
+                table_field
+            )
+            fields.append(field)
+        return cls(fields=fields)
+
+
+@dataclass
+class BigQueryTable:
+    schema: BigQueryTableSchema
+    partitioning: Optional[BigQueryTimePartitioning]
+
+    @classmethod
+    def load_from_table(cls, table: Table):
+        schema = BigQueryTableSchema.load_from_table_schema(table.schema)
+        partitioning = None
+        if table.partitioning:
+            partitioning = data_partitioning_factory(table.partitioning)
+        return cls(schema=schema, partitioning=partitioning)
+
+    def as_dict(self):
+        return asdict(self)
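Example (outside the diff): a minimal sketch of how the new BigQuery entities compose. It assumes only the classes added in this file; the field values are illustrative.

from debussy_concert.core.entities.table import TableField
from debussy_concert.core.entities.bigquery_table import (
    BigQueryTableField,
    BigQueryTimePartitioning,
)

# A technology-agnostic field converted into its BigQuery representation.
field = TableField(name="user_id", data_type="string",
                   constraint="required", description="id of the user")
bq_field = BigQueryTableField.load_from_internal_table_field(field)
print(bq_field.get_field_schema())
# {'name': 'user_id', 'description': 'id of the user', 'type': 'STRING',
#  'mode': 'REQUIRED', 'fields': None, 'policy_tags': {'names': []}}

# Granularity is normalized to upper case; anything outside
# DAY/HOUR/MONTH/YEAR raises ValueError in __post_init__.
partitioning = BigQueryTimePartitioning(type="day", field="created_at")
print(partitioning.to_dict())
# {'type': 'DAY', 'expirationMs': None, 'field': 'created_at'}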
diff --git a/debussy_concert/core/entities/table.py b/debussy_concert/core/entities/table.py
index 4c77119..a2881c0 100644
--- a/debussy_concert/core/entities/table.py
+++ b/debussy_concert/core/entities/table.py
@@ -13,6 +13,7 @@ def __init__(
         constraint: Optional[str] = None,
         description: Optional[str] = None,
         column_tags: Optional[List] = None,
+        is_metadata: Optional[bool] = False,
         **extra_options,
     ) -> None:
         self.name = name
@@ -20,131 +21,24 @@
         self.constraint = constraint
         self.description = description
         self.column_tags = column_tags
+        self.is_metadata = is_metadata
         self.extra_options = extra_options or {}
 
-    def get_field_schema(self):
-        raise NotImplementedError
-
-
-@dataclass
-class BigQueryPolicyTags:
-    names: List[str]
-
-
-@dataclass
-class BigQueryTableField:
-    """
-    ref: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables?hl=pt-br#TableFieldSchema
-    {
-      "name": string,
-      "description": string,
-      "type": string,
-      "mode": string,
-      "fields": [
-        {
-          object (TableFieldSchema)
-        }
-      ],
-      "policyTags": {
-        "names": [
-          string
-        ]
-      },
-      "maxLength": string,
-      "precision": string,
-      "scale": string,
-      "collation": string
-    }
-    """
-
-    name: str
-    description: str
-    type: str
-    mode: str = "NULLABLE"
-    fields: Optional[List["BigQueryTableField"]] = None
-    policy_tags: Optional[BigQueryPolicyTags] = dataclass_field(
-        default=BigQueryPolicyTags([])
-    )
-
-    def __post_init__(self):
-        # those should be upper case
-        self.type = self.type.upper()
-        if self.mode is not None:
-            self.mode = self.mode.upper()
-
-    @classmethod
-    def load_from_internal_table_field_interface_dict(cls, field_dict):
-        """
-        Load data into BigQueryTableField class using TableField interface
-        """
-        fields_key = field_dict.get("fields")
-        fields = None
-        if fields_key:
-            fields = []
-            for inner_fields in fields_key:
-                bq_field = cls.load_from_internal_table_field_interface_dict(
-                    inner_fields
-                )
-                fields.append(bq_field)
-        policy_tags = field_dict.get("tags", [])
-        bq_policy_tags = BigQueryPolicyTags(names=policy_tags)
-        field_schema = cls(
-            name=field_dict["name"],
-            description=field_dict.get("description"),
-            type=field_dict["data_type"],
-            mode=field_dict.get("constraint"),
-            fields=fields,
-            policy_tags=bq_policy_tags,
-        )
-        return field_schema
+    def __repr__(self):
+        return str(self.__dict__)
 
     def get_field_schema(self):
-        schema = asdict(self)
-        return schema
+        raise NotImplementedError(
+            "This is a generic table field and must be converted to a specific technology schema")
 
 
+@dataclass
 class Partitioning:
+    type: str
     granularity: str
     field: str
 
 
-@dataclass
-class BigQueryTimePartitioning:
-    """
-    NOTE: might exist an implementation for this in the google.cloud.bigquery sdk, i could not find it
-    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timepartitioning
-    {
-      "type": string,
-      "expirationMs": string,
-      "field": string,
-      "requirePartitionFilter": boolean  # deprecated
-    }
-    """
-
-    type: str
-    expiration_ms: Optional[str] = None
-    field: Optional[str] = None
-
-    def __post_init__(self):
-        type_ = self.type.upper()
-        if type_ not in ("DAY", "HOUR", "MONTH", "YEAR"):
-            raise ValueError(f"Invalid type: {type}")
-        self.type = type_
-
-    def to_dict(self) -> dict:
-        ret = {
-            "type": self.type,
-            "expirationMs": self.expiration_ms,
-            "field": self.field,
-        }
-        return ret
-
-    @classmethod
-    def load_from_internal_partitioning_interface_dict(cls, data_dict):
-        del data_dict["type"]
-        return cls(type=data_dict["granularity"], field=data_dict["field"])
-
-
 @dataclass
 class TableSchema:
     fields: List[TableField]
@@ -161,40 +54,10 @@ def as_dict(self):
         return asdict(self)
 
 
-def data_partitioning_factory(data_partitioning):
-    partitioning_type = data_partitioning["type"].lower()
-    mapping = {"time": BigQueryTimePartitioning}
-    output_cls = mapping.get(partitioning_type)
-    if output_cls is None:
-        raise TypeError(f"Format `{partitioning_type}` is not supported")
-    return output_cls.load_from_internal_partitioning_interface_dict(data_partitioning)
-
-
-class BigQueryTableSchema(TableSchema):
-    @classmethod
-    def load_from_dict(cls, table_dict):
-        fields = []
-        for field_dict in table_dict["fields"]:
-            field = BigQueryTableField.load_from_internal_table_field_interface_dict(
-                field_dict
-            )
-            fields.append(field)
-        return cls(fields=fields)
-
-
 @dataclass
-class BigQueryTable:
-    schema: BigQueryTableSchema
-    partitioning: BigQueryTimePartitioning
-
-    @classmethod
-    def load_from_dict(cls, table_dict):
-        print(table_dict)
-        schema = BigQueryTableSchema.load_from_dict(table_dict)
-        partitioning = None
-        if partitioning_dict := table_dict.get("partitioning"):
-            partitioning = data_partitioning_factory(partitioning_dict)
-        return cls(schema=schema, partitioning=partitioning)
+class Table:
+    schema: TableSchema
+    partitioning: Optional[Partitioning]
 
     @classmethod
     def load_from_file(cls, file_path: str):
@@ -202,5 +65,10 @@ def load_from_file(cls, file_path: str):
         table_dict = yaml_load(file)
         return cls.load_from_dict(table_dict)
 
-    def as_dict(self):
-        return asdict(self)
+    @classmethod
+    def load_from_dict(cls, table_dict):
+        schema = TableSchema.load_from_dict(table_dict)
+        partitioning = None
+        if partitioning_dict := table_dict.get("partitioning"):
+            partitioning = Partitioning(**partitioning_dict)
+        return cls(schema=schema, partitioning=partitioning)
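Example (outside the diff): a hedged sketch of the new generic entry point. It assumes TableSchema.load_from_dict, which sits outside the hunks shown, builds TableField objects from dicts, and it relies on the @dataclass decorator added to Partitioning above so that Partitioning(**partitioning_dict) works.

from debussy_concert.core.entities.table import Table

# Illustrative definition; a YAML file via Table.load_from_file works the same way.
table_dict = {
    "fields": [
        {"name": "user_id", "data_type": "STRING", "constraint": "REQUIRED"},
        {"name": "_ts", "data_type": "TIMESTAMP", "is_metadata": True},
    ],
    "partitioning": {"type": "time", "granularity": "DAY", "field": "_ts"},
}
table = Table.load_from_dict(table_dict)  # no BigQuery types involved yet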
"requirePartitionFilter": boolean # deprecated - } - """ - - type: str - expiration_ms: Optional[str] = None - field: Optional[str] = None - - def __post_init__(self): - type_ = self.type.upper() - if type_ not in ("DAY", "HOUR", "MONTH", "YEAR"): - raise ValueError(f"Invalid type: {type}") - self.type = type_ - - def to_dict(self) -> dict: - ret = { - "type": self.type, - "expirationMs": self.expiration_ms, - "field": self.field, - } - return ret - - @classmethod - def load_from_internal_partitioning_interface_dict(cls, data_dict): - del data_dict["type"] - return cls(type=data_dict["granularity"], field=data_dict["field"]) - - @dataclass class TableSchema: fields: List[TableField] @@ -161,40 +54,10 @@ def as_dict(self): return asdict(self) -def data_partitioning_factory(data_partitioning): - partitioning_type = data_partitioning["type"].lower() - mapping = {"time": BigQueryTimePartitioning} - output_cls = mapping.get(partitioning_type) - if output_cls is None: - raise TypeError(f"Format `{partitioning_type}` is not supported") - return output_cls.load_from_internal_partitioning_interface_dict(data_partitioning) - - -class BigQueryTableSchema(TableSchema): - @classmethod - def load_from_dict(cls, table_dict): - fields = [] - for field_dict in table_dict["fields"]: - field = BigQueryTableField.load_from_internal_table_field_interface_dict( - field_dict - ) - fields.append(field) - return cls(fields=fields) - - @dataclass -class BigQueryTable: - schema: BigQueryTableSchema - partitioning: BigQueryTimePartitioning - - @classmethod - def load_from_dict(cls, table_dict): - print(table_dict) - schema = BigQueryTableSchema.load_from_dict(table_dict) - partitioning = None - if partitioning_dict := table_dict.get("partitioning"): - partitioning = data_partitioning_factory(partitioning_dict) - return cls(schema=schema, partitioning=partitioning) +class Table: + schema: TableSchema + partitioning: Partitioning @classmethod def load_from_file(cls, file_path: str): @@ -202,5 +65,10 @@ def load_from_file(cls, file_path: str): table_dict = yaml_load(file) return cls.load_from_dict(table_dict) - def as_dict(self): - return asdict(self) + @classmethod + def load_from_dict(cls, table_dict): + schema = TableSchema.load_from_dict(table_dict) + partitioning = None + if partitioning_dict := table_dict.get("partitioning"): + partitioning = Partitioning(**partitioning_dict) + return cls(schema=schema, partitioning=partitioning) diff --git a/debussy_concert/core/service/lakehouse/google_cloud.py b/debussy_concert/core/service/lakehouse/google_cloud.py index cd155f0..59efa6e 100644 --- a/debussy_concert/core/service/lakehouse/google_cloud.py +++ b/debussy_concert/core/service/lakehouse/google_cloud.py @@ -1,22 +1,52 @@ -from debussy_concert.core.entities.table import BigQueryTable +from debussy_concert.core.entities.table import Partitioning, Table, TableField +from debussy_concert.core.entities.bigquery_table import BigQueryTable, BigQueryTableField, BigQueryTimePartitioning class GoogleCloudLakeHouseService: @staticmethod - def get_table_schema(table: BigQueryTable): + def convert_table(table: Table) -> BigQueryTable: + table = BigQueryTable.load_from_table(table) + return table + + @staticmethod + def convert_partitioning(partitioning: Partitioning) -> BigQueryTimePartitioning: + partitioning = BigQueryTimePartitioning.load_from_partitioning(partitioning) + return partitioning + + @staticmethod + def convert_table_field(table: TableField) -> BigQueryTableField: + bq_field = 
diff --git a/debussy_concert/pipeline/data_ingestion/config/movement_parameters/time_partitioned.py b/debussy_concert/pipeline/data_ingestion/config/movement_parameters/time_partitioned.py
index 3054a78..52ead58 100644
--- a/debussy_concert/pipeline/data_ingestion/config/movement_parameters/time_partitioned.py
+++ b/debussy_concert/pipeline/data_ingestion/config/movement_parameters/time_partitioned.py
@@ -1,7 +1,7 @@
-from typing import Optional
+from typing import Optional, Union
 from dataclasses import dataclass
 from debussy_concert.core.config.movement_parameters.base import MovementParametersBase
-from debussy_concert.core.entities.table import BigQueryTable
+from debussy_concert.core.entities.table import Table
 
 
 @dataclass(frozen=True)
@@ -14,7 +14,7 @@
 class TimePartitionedDataIngestionMovementParameters(MovementParametersBase):
     extract_connection_id: str
     data_partitioning: BigQueryDataPartitioning
-    raw_table_definition: str or BigQueryTable
+    raw_table_definition: Union[str, Table]
 
     def __post_init__(self):
         if not isinstance(self.data_partitioning, BigQueryDataPartitioning):
@@ -27,11 +27,11 @@
 
     def load_raw_table_definition_attr(self, raw_table_definition):
         if isinstance(raw_table_definition, str):
-            raw_table_definition = BigQueryTable.load_from_file(
+            raw_table_definition = Table.load_from_file(
                 self.raw_table_definition
             )
         elif isinstance(raw_table_definition, dict):
-            raw_table_definition = Table.load_from_dict(
+            raw_table_definition = Table.load_from_dict(
                 self.raw_table_definition
             )
         else:
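Example (outside the diff): a hypothetical free-function restatement of the dispatch in load_raw_table_definition_attr, to make the two accepted shapes of raw_table_definition explicit; resolve_raw_table_definition is not part of the codebase.

from debussy_concert.core.entities.table import Table

def resolve_raw_table_definition(raw_table_definition):
    # Mirrors load_raw_table_definition_attr: a str is a YAML file path,
    # a dict is an inline table definition.
    if isinstance(raw_table_definition, str):
        return Table.load_from_file(raw_table_definition)
    if isinstance(raw_table_definition, dict):
        return Table.load_from_dict(raw_table_definition)
    raise TypeError(f"Unsupported raw_table_definition: {type(raw_table_definition)!r}")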
diff --git a/tests/services/test_lakehouse.py b/tests/services/test_lakehouse.py
index 00efdfb..61596fa 100644
--- a/tests/services/test_lakehouse.py
+++ b/tests/services/test_lakehouse.py
@@ -1,4 +1,4 @@
-from debussy_concert.core.entities.table import BigQueryTable
+from debussy_concert.core.entities.table import Table
 from debussy_concert.core.service.lakehouse.google_cloud import (
     GoogleCloudLakeHouseService,
 )
@@ -29,9 +29,16 @@ def get_example_table():
             {"name": "work", "data_type": "STRING"},
         ],
     },
+    {
+        "name": "_metadata",
+        "description": "metadata column",
+        "data_type": "STRING",
+        "constraint": "REQUIRED",
+        "is_metadata": True
+    },
 ]
 }
-    return BigQueryTable.load_from_dict(table_dict)
+    return Table.load_from_dict(table_dict)
 
 
 def test_get_table_schema():
@@ -78,7 +85,65 @@ def test_get_table_schema():
             },
         ],
     },
+    {
+        "name": "_metadata",
+        "description": "metadata column",
+        "type": "STRING",
+        "mode": "REQUIRED",
+        "policy_tags": {"names": []},
+        "fields": None,
+    }
 ]
     schema = GoogleCloudLakeHouseService.get_table_schema(table)
     # print(schema)
     assert schema == expected_schema
+
+
+def test_get_table_schema_without_metadata_columns():
+    table = get_example_table()
+    expected_schema = [
+        {
+            "name": "user_id",
+            "description": "id of the user",
+            "type": "STRING",
+            "mode": "REQUIRED",
+            "policy_tags": {"names": []},  # defaults to empty list
+            "fields": None,  # defaults to None
+        },
+        {
+            "name": "user_email",
+            "description": "email of the user",
+            "type": "STRING",
+            "mode": "REQUIRED",
+            "policy_tags": {"names": []},
+            "fields": None,
+        },
+        {
+            "name": "user_addresses",
+            "description": "home and work addresses of the user",
+            "type": "RECORD",
+            "mode": "REPEATED",
+            "policy_tags": {"names": []},
+            "fields": [
+                {
+                    "name": "home",
+                    "description": None,  # defaults to None
+                    "type": "STRING",
+                    "mode": None,  # defaults to None
+                    "fields": None,
+                    "policy_tags": {"names": []},
+                },
+                {
+                    "name": "work",
+                    "description": None,
+                    "type": "STRING",
+                    "mode": None,
+                    "fields": None,
+                    "policy_tags": {"names": []},
+                },
+            ],
+        }
+    ]
+    schema = GoogleCloudLakeHouseService.get_table_schema_without_metadata_columns(table)
+    # print(schema)
+    assert schema == expected_schema
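Example (outside the diff): a quick REPL-style check of the metadata filter, reusing the test helper above; it assumes the tests package is importable from the working directory.

from tests.services.test_lakehouse import get_example_table
from debussy_concert.core.service.lakehouse.google_cloud import GoogleCloudLakeHouseService

table = get_example_table()
full = [f["name"] for f in GoogleCloudLakeHouseService.get_table_schema(table)]
lean = [f["name"] for f in GoogleCloudLakeHouseService.get_table_schema_without_metadata_columns(table)]
print(full)  # ['user_id', 'user_email', 'user_addresses', '_metadata']
print(lean)  # ['user_id', 'user_email', 'user_addresses']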