Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use.latest.version override is not working #18

Open
ekerstens opened this issue Sep 29, 2022 · 2 comments
Open

use.latest.version override is not working #18

ekerstens opened this issue Sep 29, 2022 · 2 comments

Comments

@ekerstens
Copy link

ekerstens commented Sep 29, 2022

This library seems to be ignoring the use.latest.version`. Using this library to serialize some data, I was getting an error like the following

Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema>

Schema not found; error code: 40403

After investigating the original schema, I discovered that some string fields were incorrectly defined as:

         "type":{  
            "type":"string",
            "avro.java.string":"String"
         },

There is an issue in avro which can malform registered String types into the above union type. I stumbled upon this issue after getting the following error trying to use this library to serialize some data to a topic using a schema which was malformed in this way. The error I got was as follows and only after investigating I discovered that my schema which got registered in my schema registry had the above problem.

I found a suggested workaround using use.latest.version but it was not working. I then tried switching to using Avro.default.toRecord to manually convert my data class into a GenericRecord and then configuring my producer to use KafkaAvroSerializer. This worked. Based on this behavior, I think that the KafkaAvro4kSerializer is not using the latest schema version despite setting use.latest.version.

@ekerstens
Copy link
Author

Full traceback

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:763)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:750)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
	at com.expediagroup.sd.casedomain.enricher.ApplicationKt.main(Application.kt:105)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message with avro4k
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:50)
	at com.github.thake.kafka.avro4k.serializer.KafkaAvro4kSerializer.serialize(KafkaAvro4kSerializer.kt:29)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:921)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:889)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:984)
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:649)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:409)
	at com.expediagroup.sd.casedomain.enricher.Application.runner$lambda-0(Application.kt:65)
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:760)
	... 3 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema>	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:71)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.getSchemaIdWithRetry(AbstractKafkaAvro4kSerDe.kt:82)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.getSchemaId(AbstractKafkaAvro4kSerializer.kt:87)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:38)
	... 12 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:458)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:434)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:316)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:539)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:519)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:83)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:82)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invokeSuspend(AbstractKafkaAvro4kSerDe.kt:67)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
	at com.github.michaelbull.retry.RetryKt$retry$5.invokeSuspend(Retry.kt:52)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
	at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:500)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:65)

@thake
Copy link
Owner

thake commented Oct 10, 2022

@ekerstens, thanks for raising this issue. I didn't have time to look into it yet, but I'll try to do so this week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants