FWIW, I've used this library and built schema support separately. The gist of getting this running is:
1. Extract Schema ID
Each Avro message serialised in the Confluent format starts with a 5-byte header, which includes the schema ID:
import io
import struct

def deserialise(data):
    message_bytes = io.BytesIO(data)
    # The serialised Avro payload has 5 leading bytes:
    #   0.   Magic byte. Confluent serialisation format; currently always 0
    #   1-4. Schema ID as returned by the Schema Registry (big-endian signed int)
    # See https://stackoverflow.com/questions/44407780
    message_bytes.seek(1)
    schema_id = struct.unpack("!i", message_bytes.read(4))[0]
    # The remaining bytes are the Avro-encoded record body
    return schema_id, message_bytes
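As a quick sanity check, you can pack a header by hand and run it through the function above (the schema ID and body here are hypothetical values, just to illustrate the wire format):

```python
import io
import struct

def deserialise(data):
    message_bytes = io.BytesIO(data)
    message_bytes.seek(1)  # skip the magic byte (always 0)
    schema_id = struct.unpack("!i", message_bytes.read(4))[0]
    return schema_id, message_bytes

# Hypothetical message: magic byte 0, schema ID 42, then the Avro body
payload = b"\x00" + struct.pack("!i", 42) + b"avro-body"
schema_id, rest = deserialise(payload)
print(schema_id)    # 42
print(rest.read())  # b'avro-body'
```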
2. Fetch Schema
Once the schema ID is known, the schema itself can be fetched from the Schema Registry and parsed using the requests, fastavro and json libraries.
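A minimal sketch of that fetch, assuming a Schema Registry reachable at `registry_url` (the `/schemas/ids/{id}` path is the registry's standard REST endpoint; the function names here are my own). I've used stdlib `urllib` to keep it dependency-free; `requests.get(url).json()` is the equivalent:

```python
import json
import urllib.request

def schema_url(registry_url, schema_id):
    # Schema Registry REST endpoint for looking up a schema by its ID
    return f"{registry_url}/schemas/ids/{schema_id}"

def fetch_schema(registry_url, schema_id):
    # The registry responds with {"schema": "<Avro schema as a JSON string>"}
    with urllib.request.urlopen(schema_url(registry_url, schema_id)) as resp:
        body = json.load(resp)
    schema = json.loads(body["schema"])
    # The resulting dict can then be handed to fastavro, e.g.:
    #   parsed = fastavro.parse_schema(schema)
    #   record = fastavro.schemaless_reader(message_bytes, parsed)
    return schema
```

In practice you'd cache schemas by ID rather than hitting the registry on every message.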
It would be great to be able to dynamically load schemas from the Kafka Schema Registry when producing / consuming records from Kafka.
At the moment you have to hard-code everything, which can easily create compatibility issues between systems inadvertently using outdated schemas.