diff --git a/kafka/client_async.py b/kafka/client_async.py index caa88cf5e..d6f316687 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -634,6 +634,16 @@ def _poll(self, timeout): self._sensors.select_time.record((end_select - start_select) * 1000000000) for key, events in ready: + if key.fileobj.fileno() < 0: + # if there is no new message, self._selector.select() will take 0.1s. + # However, if the socket is closed, it could return immediately, result in + # high CPU usage because of busy-retry, caused by the loop in + # producer/sender.py run() + # calling producer/sender.py run_once() + # calling lient_async.py poll() + # calling lient_async.py _poll() + time.sleep(0.1) + if key.fileobj is self._wake_r: self._clear_wake_fd() continue