From 15e3e40d69da58de3fa519e8ce4a2e4049aaaec5 Mon Sep 17 00:00:00 2001
From: Joachim Praetorius
Date: Wed, 15 May 2024 13:41:19 +0200
Subject: [PATCH] Support logical Types in Avro Export (#199)

* Support logical Types in Avro Export

- Map Datacontracts date-type fields to avro logical types
  - date: `int/date`
  - timestamp, timestamp_tz: `long/timestamp-millis`
  - timestamp_ntz: `long/local-timestamp-millis`

* Update CHANGELOG
---
 CHANGELOG.md                                 |  5 +-
 datacontract/export/avro_converter.py        | 18 +++++-
 tests/fixtures/avro/export/datacontract.yaml | 46 +++++++++++++++
 .../avro/export/orders_with_datefields.avsc  | 57 +++++++++++++++++++
 tests/test_export_avro.py                    | 54 ++----------------
 5 files changed, 126 insertions(+), 54 deletions(-)
 create mode 100644 tests/fixtures/avro/export/datacontract.yaml
 create mode 100644 tests/fixtures/avro/export/orders_with_datefields.avsc

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a399e9a0..debd964e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,8 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 - datacontract catalog: Search form
-- `datacontract import --format bigquery`: Import from BigQuery format
-- `datacontract export --format bigquery`: Export to BigQuery format
+- `datacontract import --format bigquery`: Import from BigQuery format (#110)
+- `datacontract export --format bigquery`: Export to BigQuery format (#111)
+- `datacontract export --format avro`: Now supports [Avro logical types](https://avro.apache.org/docs/1.11.1/specification/#logical-types) to better model date types. `date`, `timestamp`/`timestamp-tz` and `timestamp-ntz` are now mapped to the appropriate logical types. (#141)
 - `datacontract publish`: Publish the data contract to the Data Mesh Manager
 
 ## [0.10.3] - 2024-05-05
diff --git a/datacontract/export/avro_converter.py b/datacontract/export/avro_converter.py
index 2adf8f89..ed0072f5 100644
--- a/datacontract/export/avro_converter.py
+++ b/datacontract/export/avro_converter.py
@@ -34,6 +34,10 @@ def to_avro_field(field, field_name):
     if field.description is not None:
         avro_field["doc"] = field.description
     avro_field["type"] = to_avro_type(field, field_name)
+    # add logical type definitions for any of the date type fields
+    if field.type in ["timestamp", "timestamp_tz", "timestamp_ntz", "date"]:
+        avro_field["logicalType"] = to_avro_logical_type(field.type)
+
     return avro_field
 
 
@@ -54,9 +58,9 @@ def to_avro_type(field: Field, field_name: str) -> str | dict:
     elif field.type in ["boolean"]:
         return "boolean"
     elif field.type in ["timestamp", "timestamp_tz"]:
-        return "string"
+        return "long"
     elif field.type in ["timestamp_ntz"]:
-        return "string"
+        return "long"
     elif field.type in ["date"]:
         return "int"
     elif field.type in ["time"]:
@@ -72,3 +76,13 @@ def to_avro_type(field: Field, field_name: str) -> str | dict:
         return "null"
     else:
         return "bytes"
+
+def to_avro_logical_type(type: str) -> str:
+    if type in ["timestamp", "timestamp_tz"]:
+        return "timestamp-millis"
+    elif type in ["timestamp_ntz"]:
+        return "local-timestamp-millis"
+    elif type in ["date"]:
+        return "date"
+    else:
+        return ""
\ No newline at end of file
diff --git a/tests/fixtures/avro/export/datacontract.yaml b/tests/fixtures/avro/export/datacontract.yaml
new file mode 100644
index 00000000..7073eab7
--- /dev/null
+++ b/tests/fixtures/avro/export/datacontract.yaml
@@ -0,0 +1,46 @@
+dataContractSpecification: 0.9.2
+id: orders
+info:
+  title: Orders
+  version: 0.0.1
+  description: Order messages as generated by Confluent Datagen Source Adapter
+servers:
+  production:
+    type: kafka
+    host: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092
+    topic: orders.avro.v1
+    format: avro
+models:
+  orders:
+    type: table
+    description: My Model
+    namespace: com.example.checkout
+    fields:
+      orderdate:
+        type: date
+        description: My Field
+      order_timestamp:
+        type: timestamp
+      delivery_timestamp:
+        type: timestamp_ntz
+      orderid:
+        type: int
+      itemid:
+        type: string
+      orderunits:
+        type: double
+      address:
+        type: object
+        fields:
+          city:
+            type: string
+          state:
+            type: string
+          zipcode:
+            type: long
+quality:
+  type: SodaCL
+  specification:
+    checks for orders:
+      - row_count >= 5000
+
diff --git a/tests/fixtures/avro/export/orders_with_datefields.avsc b/tests/fixtures/avro/export/orders_with_datefields.avsc
new file mode 100644
index 00000000..f1dea35b
--- /dev/null
+++ b/tests/fixtures/avro/export/orders_with_datefields.avsc
@@ -0,0 +1,57 @@
+{
+  "type": "record",
+  "name": "orders",
+  "doc": "My Model",
+  "namespace": "com.example.checkout",
+  "fields": [
+    {
+      "name": "orderdate",
+      "doc": "My Field",
+      "type": "int",
+      "logicalType": "date"
+    },
+    {
+      "name": "order_timestamp",
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    },
+    {
+      "name": "delivery_timestamp",
+      "type": "long",
+      "logicalType": "local-timestamp-millis"
+    },
+    {
+      "name": "orderid",
+      "type": "int"
+    },
+    {
+      "name": "itemid",
+      "type": "string"
+    },
+    {
+      "name": "orderunits",
+      "type": "double"
+    },
+    {
+      "name": "address",
+      "type": {
+        "type": "record",
+        "name": "address",
+        "fields": [
+          {
+            "name": "city",
+            "type": "string"
+          },
+          {
+            "name": "state",
+            "type": "string"
+          },
+          {
+            "name": "zipcode",
+            "type": "long"
+          }
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/tests/test_export_avro.py b/tests/test_export_avro.py
index af1a50e4..76e5da8b 100644
--- a/tests/test_export_avro.py
+++ b/tests/test_export_avro.py
@@ -12,60 +12,14 @@
 
 def test_cli():
     runner = CliRunner()
-    result = runner.invoke(app, ["export", "./fixtures/kafka-avro-remote/datacontract.yaml", "--format", "avro"])
+    result = runner.invoke(app, ["export", "./fixtures/avro/export/datacontract.yaml", "--format", "avro"])
     assert result.exit_code == 0
 
 
 def test_to_avro_schema():
-    data_contract = DataContractSpecification.from_file("fixtures/kafka-avro-remote/datacontract.yaml")
-    expected_avro_schema = """
-{
-  "fields": [
-    {
-      "name": "ordertime",
-      "doc": "My Field",
-      "type": "long"
-    },
-    {
-      "name": "orderid",
-      "type": "int"
-    },
-    {
-      "name": "itemid",
-      "type": "string"
-    },
-    {
-      "name": "orderunits",
-      "type": "double"
-    },
-    {
-      "name": "address",
-      "type": {
-        "fields": [
-          {
-            "name": "city",
-            "type": "string"
-          },
-          {
-            "name": "state",
-            "type": "string"
-          },
-          {
-            "name": "zipcode",
-            "type": "long"
-          }
-        ],
-        "name": "address",
-        "type": "record"
-      }
-    }
-  ],
-  "name": "orders",
-  "namespace": "com.example.checkout",
-  "doc": "My Model",
-  "type": "record"
-}
-"""
+    data_contract = DataContractSpecification.from_file("fixtures/avro/export/datacontract.yaml")
+    with open("fixtures/avro/export/orders_with_datefields.avsc") as file:
+        expected_avro_schema = file.read()
 
     model_name, model = next(iter(data_contract.models.items()))
     result = to_avro_schema_json(model_name, model)
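
For illustration only, not part of the patch: a minimal sketch of how the new logical-type mapping could be exercised against the fixture added above. `to_avro_schema_json`, `DataContractSpecification.from_file`, and the field names come from the diff itself; the import module paths and the repository-root working directory are assumptions based on the surrounding test code.

```python
import json

# Assumed import paths, mirroring what tests/test_export_avro.py appears to use.
from datacontract.export.avro_converter import to_avro_schema_json
from datacontract.model.data_contract_specification import DataContractSpecification

# Load the fixture introduced in this patch (path assumed relative to the repository root).
data_contract = DataContractSpecification.from_file("tests/fixtures/avro/export/datacontract.yaml")
model_name, model = next(iter(data_contract.models.items()))

# Export the first model to an Avro schema and inspect the date-typed fields.
schema = json.loads(to_avro_schema_json(model_name, model))
fields = {f["name"]: f for f in schema["fields"]}

# Expected values per the commit message and the orders_with_datefields.avsc fixture.
assert fields["orderdate"]["type"] == "int"
assert fields["orderdate"]["logicalType"] == "date"
assert fields["order_timestamp"]["type"] == "long"
assert fields["order_timestamp"]["logicalType"] == "timestamp-millis"
assert fields["delivery_timestamp"]["logicalType"] == "local-timestamp-millis"
```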