Skip to content

Commit

Permalink
Support Logical Types in Avro Export (datacontract#199)
Browse files Browse the repository at this point in the history
* Support logical Types in Avro Export

- Map Data Contract date-type fields to Avro logical types
- date: `int/date`
- timestamp, timestamp_tz: `long/timestamp-millis`
- timestamp_ntz: `long/local-timestamp-millis`

* Update CHANGELOG
  • Loading branch information
jpraetorius authored May 15, 2024
1 parent 63ddf0f commit 15e3e40
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 54 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- datacontract catalog: Search form
- `datacontract import --format bigquery`: Import from BigQuery format
- `datacontract export --format bigquery`: Export to BigQuery format
- `datacontract import --format bigquery`: Import from BigQuery format (#110)
- `datacontract export --format bigquery`: Export to BigQuery format (#111)
- `datacontract export --format avro`: Now supports [Avro logical types](https://avro.apache.org/docs/1.11.1/specification/#logical-types) to better model date types. `date`, `timestamp`/`timestamp-tz` and `timestamp-ntz` are now mapped to the appropriate logical types. (#141)
- `datacontract publish`: Publish the data contract to the Data Mesh Manager

## [0.10.3] - 2024-05-05
Expand Down
18 changes: 16 additions & 2 deletions datacontract/export/avro_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def to_avro_field(field, field_name):
if field.description is not None:
avro_field["doc"] = field.description
avro_field["type"] = to_avro_type(field, field_name)
# add logical type definitions for any of the date type fields
if field.type in ["timestamp", "timestamp_tz", "timestamp_ntz", "date"]:
avro_field["logicalType"] = to_avro_logical_type(field.type)

return avro_field


Expand All @@ -54,9 +58,9 @@ def to_avro_type(field: Field, field_name: str) -> str | dict:
elif field.type in ["boolean"]:
return "boolean"
elif field.type in ["timestamp", "timestamp_tz"]:
return "string"
return "long"
elif field.type in ["timestamp_ntz"]:
return "string"
return "long"
elif field.type in ["date"]:
return "int"
elif field.type in ["time"]:
Expand All @@ -72,3 +76,13 @@ def to_avro_type(field: Field, field_name: str) -> str | dict:
return "null"
else:
return "bytes"

def to_avro_logical_type(type: str) -> str:
    """Return the Avro logical type name for a data contract field type.

    Maps date/time field types onto the logical types defined by the Avro
    specification; any type without a logical-type counterpart yields an
    empty string (callers are expected to guard against that case).
    """
    logical_types = {
        "timestamp": "timestamp-millis",
        "timestamp_tz": "timestamp-millis",
        "timestamp_ntz": "local-timestamp-millis",
        "date": "date",
    }
    return logical_types.get(type, "")
46 changes: 46 additions & 0 deletions tests/fixtures/avro/export/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
dataContractSpecification: 0.9.2
id: orders
info:
title: Orders
version: 0.0.1
description: Order messages as generated by Confluent Datagen Source Adapter
servers:
production:
type: kafka
host: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092
topic: orders.avro.v1
format: avro
models:
orders:
type: table
description: My Model
namespace: com.example.checkout
fields:
orderdate:
type: date
description: My Field
order_timestamp:
type: timestamp
delivery_timestamp:
type: timestamp_ntz
orderid:
type: int
itemid:
type: string
orderunits:
type: double
address:
type: object
fields:
city:
type: string
state:
type: string
zipcode:
type: long
quality:
type: SodaCL
specification:
checks for orders:
- row_count >= 5000

57 changes: 57 additions & 0 deletions tests/fixtures/avro/export/orders_with_datefields.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"type": "record",
"name": "orders",
"doc": "My Model",
"namespace": "com.example.checkout",
"fields": [
{
"name": "orderdate",
"doc": "My Field",
"type": "int",
"logicalType": "date"
},
{
"name": "order_timestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "delivery_timestamp",
"type": "long",
"logicalType": "local-timestamp-millis"
},
{
"name": "orderid",
"type": "int"
},
{
"name": "itemid",
"type": "string"
},
{
"name": "orderunits",
"type": "double"
},
{
"name": "address",
"type": {
"type": "record",
"name": "address",
"fields": [
{
"name": "city",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "zipcode",
"type": "long"
}
]
}
}
]
}
54 changes: 4 additions & 50 deletions tests/test_export_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,14 @@

def test_cli():
runner = CliRunner()
result = runner.invoke(app, ["export", "./fixtures/kafka-avro-remote/datacontract.yaml", "--format", "avro"])
result = runner.invoke(app, ["export", "./fixtures/avro/export/datacontract.yaml", "--format", "avro"])
assert result.exit_code == 0


def test_to_avro_schema():
data_contract = DataContractSpecification.from_file("fixtures/kafka-avro-remote/datacontract.yaml")
expected_avro_schema = """
{
"fields": [
{
"name": "ordertime",
"doc": "My Field",
"type": "long"
},
{
"name": "orderid",
"type": "int"
},
{
"name": "itemid",
"type": "string"
},
{
"name": "orderunits",
"type": "double"
},
{
"name": "address",
"type": {
"fields": [
{
"name": "city",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "zipcode",
"type": "long"
}
],
"name": "address",
"type": "record"
}
}
],
"name": "orders",
"namespace": "com.example.checkout",
"doc": "My Model",
"type": "record"
}
"""
data_contract = DataContractSpecification.from_file("fixtures/avro/export/datacontract.yaml")
with open("fixtures/avro/export/orders_with_datefields.avsc") as file:
expected_avro_schema = file.read()

model_name, model = next(iter(data_contract.models.items()))
result = to_avro_schema_json(model_name, model)
Expand Down

0 comments on commit 15e3e40

Please sign in to comment.