Scope flush workaround in confluent_kafka_wrapper to only the
affected versions (librdkafka 1.8.x). See
confluentinc/librdkafka#3633
Tyrel M. McQueen committed Apr 28, 2022
1 parent f3b6b7c commit ef23f6d
Showing 2 changed files with 37 additions and 10 deletions.
5 changes: 5 additions & 0 deletions kafkacrypto/__init__.py
@@ -13,6 +13,11 @@
import confluent_kafka
from kafkacrypto.confluent_kafka_wrapper import KafkaConsumer,KafkaProducer,TopicPartition,TopicPartitionOffset,OffsetAndMetadata
warnings.warn("Using confluent_kafka: {}, librdkafka: {}".format(str(confluent_kafka.version()), str(confluent_kafka.libversion())),category=RuntimeWarning)
# enable custom flush workaround for affected versions of librdkafka: 1.8.x
# See https://github.com/edenhill/librdkafka/issues/3633
if confluent_kafka.libversion()[1] >= 0x01080000 and confluent_kafka.libversion()[1] < 0x01090000:
KafkaProducer.enable_flush_workaround = True
warnings.warn(" Enabling flush() workaround for librdkafka 1.8.x", category=RuntimeWarning)
except ImportError:
# fallback to kafka-python
warnings.warn("No confluent_kafka package found. Falling back to kafka-python. It is highly, recommended that you install confluent_kafka and librdkafka for better performance, especially with large messages.",category=RuntimeWarning)
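The version gate above depends on the packed-integer form of the librdkafka version: confluent_kafka.libversion() returns a (version_string, version_int) tuple, and the integer encodes the version as 0xMMmmrrpp (major, minor, revision, prerelease), so librdkafka 1.8.x is exactly the half-open range [0x01080000, 0x01090000). A minimal standalone equivalent of the check:

import confluent_kafka

# libversion() returns (version_string, version_int); the int packs the
# librdkafka version as 0xMMmmrrpp (major, minor, revision, prerelease).
verstr, verint = confluent_kafka.libversion()

# librdkafka 1.8.x occupies the half-open range [0x01080000, 0x01090000)
if 0x01080000 <= verint < 0x01090000:
    print("librdkafka {} is affected; the flush() workaround applies".format(verstr))
else:
    print("librdkafka {} is unaffected; native flush() is safe".format(verstr))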
42 changes: 32 additions & 10 deletions kafkacrypto/confluent_kafka_wrapper.py
@@ -2,6 +2,7 @@
import inspect
import logging
from time import time
from threading import Lock
from confluent_kafka import Producer, Consumer, TopicPartition as TopicPartitionOffset, OFFSET_BEGINNING, OFFSET_END, TIMESTAMP_NOT_AVAILABLE, KafkaException, KafkaError
from kafka.future import Future
from kafkacrypto.exceptions import KafkaCryptoWrapperError
@@ -37,7 +38,7 @@ def base_callback(self, err, msg):
elif msg is None:
self._producer._log.debug("Callback failed (None msg).")
super().failure(KafkaException(KafkaError.UNKNOWN,'null msg'))
-    elif msg.error() != None:
+    elif msg.error() is not None:
self._producer._log.debug("Callback failed (non-None msg.error).")
super().failure(KafkaException(msg.error()))
else:
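The single changed line in this hunk replaces an equality test against None with an identity test. PEP 8 recommends `is not None` because != dispatches to __ne__, which the compared object can override, while `is` compares object identity and cannot be intercepted. A small illustration of the difference:

class Misleading:
    # an object whose __ne__ claims equality with everything
    def __ne__(self, other):
        return False

obj = Misleading()
print(obj != None)      # False -- the overridden __ne__ makes obj look like None
print(obj is not None)  # True  -- identity comparison cannot be overridden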
@@ -325,6 +326,9 @@ class KafkaProducer(Producer):
'sasl_kerberos_domain_name',
'sasl_oauth_token_provider',
]

enable_flush_workaround = False

def __init__(self, **configs):
self._log = logging.getLogger(__name__)
self.raw_config = configs
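The enable_flush_workaround class attribute added above is a feature flag: the constructor (see the next hunk) consults it to bind either _flush_native or _flush_workaround to the instance's flush attribute, so callers continue to call producer.flush() unchanged. A minimal sketch of this dispatch pattern, using hypothetical names rather than the wrapper's real class:

class Producer:
    # class-level flag, set once at import time from the library version
    enable_flush_workaround = False

    def __init__(self):
        # the instance attribute shadows any method of the same name, so
        # later lookups of self.flush find the implementation chosen here
        if Producer.enable_flush_workaround:
            self.flush = self._flush_workaround
        else:
            self.flush = self._flush_native

    def _flush_native(self):
        return "native flush"

    def _flush_workaround(self):
        return "poll-based flush"

Producer.enable_flush_workaround = True  # as done for librdkafka 1.8.x
print(Producer().flush())                # poll-based flush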
@@ -363,15 +367,29 @@ def __init__(self, **configs):
for oldk,newk in self.CONFIG_MAP.items():
if newk in self.cf_config.keys():
self.config[oldk] = self.cf_config[newk]
self.messages_processed = 0
self.messages_processed_lock = Lock()
super().__init__(self.cf_config)
if KafkaProducer.enable_flush_workaround:
self.flush = self._flush_workaround
else:
self.flush = self._flush_native

def close(self):
# confluent-kafka has no concept of a "close" operation,
# so this flushes queues only
self.flush()
pass

-  def flush(self, timeout="default", timeout_jiffy=0.1):
+  def _flush_native(self, timeout="default", timeout_jiffy=0.1):
if timeout is None:
# confluent_kafka uses -1 for infinite timeout
timeout = -1
elif timeout == "default":
timeout = self.config['produce_timeout']
return super().flush(timeout)

def _flush_workaround(self, timeout="default", timeout_jiffy=0.1):
# librdkafka 1.8.0 changed the behavior of flush() to ignore linger_ms and
# immediately attempt to send messages. Unfortunately, that change added
a call to rd_kafka_all_brokers_wakeup, which seems to cause a hang of
@@ -381,10 +399,6 @@ def flush(self, timeout="default", timeout_jiffy=0.1):
# This eventually results in nonsensical exceptions being thrown. We
# fix it here by implementing flush as successive polling directly.
#
# It is highly recommended this be used with a timeout, since otherwise
# with high message production rates and other poll-callers winning the
# race for calling callbacks, this will never complete.
#
# timeout = None is infinite timeout
# timeout = "default" (or any non-number) should never be passed by callers,
# but is used here to indicate use of configured default.
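The timeout conventions described in the comment can be restated as a tiny helper (hypothetical, for illustration only): None maps to confluent_kafka's -1 infinite timeout, the "default" sentinel maps to the configured produce_timeout, and any number passes through unchanged.

def normalize_timeout(timeout, configured_default):
    if timeout is None:
        return -1                  # confluent_kafka's "wait forever"
    if timeout == "default":
        return configured_default  # fall back to configured produce_timeout
    return timeout

assert normalize_timeout(None, 5.0) == -1
assert normalize_timeout("default", 5.0) == 5.0
assert normalize_timeout(2.5, 5.0) == 2.5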
@@ -394,19 +408,23 @@
timeout = -1
elif timeout == "default":
timeout = self.config['produce_timeout']
left = len(self)
with self.messages_processed_lock:
left = len(self)
final_processed = self.messages_processed+left
initial = left
self._log.debug("Entering Producer flush with timeout=%s and left=%s.", str(timeout), str(left))
if timeout<0:
while left > 0:
con = self.poll(timeout_jiffy)
initial -= con
left = min([len(self),initial])
with self.messages_processed_lock:
left = min([len(self),initial,max([0,final_processed-self.messages_processed])])
self._log.debug("Producer flush poll cycle complete, left=%s.", str(left))
elif left > 0:
con = self.poll(timeout)
initial -= con
left = min([len(self),initial])
with self.messages_processed_lock:
left = min([len(self),initial,max([0,final_processed-self.messages_processed])])
self._log.debug("Producer flush single poll cycle complete, left=%s.", str(left))
if max([left,0]) != 0:
self._log.info("Producer flush complete, but messages left=%s greater than zero.", str(max([left,0])))
@@ -421,7 +439,11 @@ def poll(self, timeout=0):
if timeout is None:
# confluent_kafka uses -1 for infinite timeout
timeout = -1
-    return super().poll(timeout)
+    rv = super().poll(timeout)
+    if rv > 0:
+      with self.messages_processed_lock:
+        self.messages_processed += rv
+    return rv

def send(self, topic, value=None, key=None, headers=None, partition=0, timestamp_ms=None):
self._log.debug("Executing Producer send to topic=%s, with value=%s, key=%s, headers=%s, partition=%s, timestamp_ms=%s.", str(topic), str(value), str(key), str(headers), str(partition), str(timestamp_ms))