From 1b3ca24f90c89158352940b2689ef691ade8bd91 Mon Sep 17 00:00:00 2001 From: mkromer-tc <64023901+mkromer-tc@users.noreply.github.com> Date: Tue, 14 Jul 2020 11:06:05 -0400 Subject: [PATCH 1/2] Propagate future.exception --- kafka/producer/kafka.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 9509ab940..768db161c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -686,6 +686,8 @@ def _wait_on_metadata(self, topic, max_wait): self._sender.wakeup() metadata_event.wait(max_wait - elapsed) elapsed = time.time() - begin + if future.exception: + raise future.exception if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( "Failed to update metadata after %.1f secs." % (max_wait,)) From dc5fe87871e0d9303acb0aae9920f9d3f0d50395 Mon Sep 17 00:00:00 2001 From: Matt Kromer Date: Mon, 11 Jan 2021 13:43:36 -0500 Subject: [PATCH 2/2] Use failed topics not set of topics in error message --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2c11eb945..7fb2466b0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -854,7 +854,7 @@ def _parse_fetched_data(self, completed_fetch): elif error_type is Errors.TopicAuthorizationFailedError: log.warning("Not authorized to read from topic %s.", tp.topic) - raise Errors.TopicAuthorizationFailedError(set(tp.topic)) + raise Errors.TopicAuthorizationFailedError(tp.topic) elif error_type is Errors.UnknownError: log.warning("Unknown error fetching data for topic-partition %s", tp) else: