diff --git a/kafkacrypto/__init__.py b/kafkacrypto/__init__.py
index 08ef9fc..911554d 100644
--- a/kafkacrypto/__init__.py
+++ b/kafkacrypto/__init__.py
@@ -13,6 +13,11 @@
   import confluent_kafka
   from kafkacrypto.confluent_kafka_wrapper import KafkaConsumer,KafkaProducer,TopicPartition,TopicPartitionOffset,OffsetAndMetadata
   warnings.warn("Using confluent_kafka: {}, librdkafka: {}".format(str(confluent_kafka.version()), str(confluent_kafka.libversion())),category=RuntimeWarning)
+  # enable custom flush workaround for affected versions of librdkafka: 1.8.x
+  # See https://github.com/edenhill/librdkafka/issues/3633
+  if confluent_kafka.libversion()[1] >= 0x01080000 and confluent_kafka.libversion()[1] < 0x01090000:
+    KafkaProducer.enable_flush_workaround = True
+    warnings.warn(" Enabling flush() workaround for librdkafka 1.8.x", category=RuntimeWarning)
 except ImportError:
   # fallback to kafka-python
   warnings.warn("No confluent_kafka package found. Falling back to kafka-python. It is highly recommended that you install confluent_kafka and librdkafka for better performance, especially with large messages.",category=RuntimeWarning)
diff --git a/kafkacrypto/confluent_kafka_wrapper.py b/kafkacrypto/confluent_kafka_wrapper.py
index 8a2266c..0acf6d4 100644
--- a/kafkacrypto/confluent_kafka_wrapper.py
+++ b/kafkacrypto/confluent_kafka_wrapper.py
@@ -2,6 +2,7 @@
 import inspect
 import logging
 from time import time
+from threading import Lock
 from confluent_kafka import Producer, Consumer, TopicPartition as TopicPartitionOffset, OFFSET_BEGINNING, OFFSET_END, TIMESTAMP_NOT_AVAILABLE, KafkaException, KafkaError
 from kafka.future import Future
 from kafkacrypto.exceptions import KafkaCryptoWrapperError
@@ -37,7 +38,7 @@ def base_callback(self, err, msg):
     elif msg is None:
       self._producer._log.debug("Callback failed (None msg).")
       super().failure(KafkaException(KafkaError.UNKNOWN,'null msg'))
-    elif msg.error() != None:
+    elif msg.error() is not None:
       self._producer._log.debug("Callback failed (non-None msg.error).")
       super().failure(KafkaException(msg.error()))
     else:
@@ -325,6 +326,9 @@ class KafkaProducer(Producer):
     'sasl_kerberos_domain_name',
     'sasl_oauth_token_provider',
   ]
+
+  enable_flush_workaround = False
+
   def __init__(self, **configs):
     self._log = logging.getLogger(__name__)
     self.raw_config = configs
@@ -363,7 +367,13 @@ def __init__(self, **configs):
     for oldk,newk in self.CONFIG_MAP.items():
       if newk in self.cf_config.keys():
         self.config[oldk] = self.cf_config[newk]
+    self.messages_processed = 0
+    self.messages_processed_lock = Lock()
     super().__init__(self.cf_config)
+    if KafkaProducer.enable_flush_workaround:
+      self.flush = self._flush_workaround
+    else:
+      self.flush = self._flush_native
 
   def close(self):
     # confluent-kafka has no concept of a "close" operation,
@@ -371,7 +381,15 @@ def close(self):
     self.flush()
     pass
 
-  def flush(self, timeout="default", timeout_jiffy=0.1):
+  def _flush_native(self, timeout="default", timeout_jiffy=0.1):
+    if timeout is None:
+      # confluent_kafka uses -1 for infinite timeout
+      timeout = -1
+    elif timeout == "default":
+      timeout = self.config['produce_timeout']
+    return super().flush(timeout)
+
+  def _flush_workaround(self, timeout="default", timeout_jiffy=0.1):
     # librdkafka 1.8.0 changed the behavior of flush() to ignore linger_ms and
     # immediately attempt to send messages. Unfortunately, that change added
     # a call to rd_kafka_all_brokers_wakeup, which seems to cause a hang of
@@ -381,10 +399,6 @@ def flush(self, timeout="default", timeout_jiffy=0.1):
     # This eventually results in nonsensical exceptions being thrown. We
     # fix it here by implementing flush as successive polling directly.
     #
-    # It is highly recommended this be used with a timeout, since otherwise
-    # with high message production rates and other poll-callers winning the
-    # race for calling callbacks, this will never complete.
-    #
     # timeout = None is infinite timeout
     # timeout = "default" (or any non-number) should never be passed by callers,
     # but is used here to indicate use of configured default.
@@ -394,19 +408,23 @@
       timeout = -1
     elif timeout == "default":
       timeout = self.config['produce_timeout']
-    left = len(self)
+    with self.messages_processed_lock:
+      left = len(self)
+      final_processed = self.messages_processed+left
     initial = left
     self._log.debug("Entering Producer flush with timeout=%s and left=%s.", str(timeout), str(left))
     if timeout<0:
       while left > 0:
         con = self.poll(timeout_jiffy)
         initial -= con
-        left = min([len(self),initial])
+        with self.messages_processed_lock:
+          left = min([len(self),initial,max([0,final_processed-self.messages_processed])])
         self._log.debug("Producer flush poll cycle complete, left=%s.", str(left))
     elif left > 0:
       con = self.poll(timeout)
       initial -= con
-      left = min([len(self),initial])
+      with self.messages_processed_lock:
+        left = min([len(self),initial,max([0,final_processed-self.messages_processed])])
       self._log.debug("Producer flush single poll cycle complete, left=%s.", str(left))
     if max([left,0]) != 0:
       self._log.info("Producer flush complete, but messages left=%s greater than zero.", str(max([left,0])))
@@ -421,7 +439,11 @@ def poll(self, timeout=0):
     if timeout is None:
       # confluent_kafka uses -1 for infinite timeout
       timeout = -1
-    return super().poll(timeout)
+    rv = super().poll(timeout)
+    if rv > 0:
+      with self.messages_processed_lock:
+        self.messages_processed += rv
+    return rv
 
   def send(self, topic, value=None, key=None, headers=None, partition=0, timestamp_ms=None):
     self._log.debug("Executing Producer send to topic=%s, with value=%s, key=%s, headers=%s, partition=%s, timestamp_ms=%s.", str(topic), str(value), str(key), str(headers), str(partition), str(timestamp_ms))
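
Reviewer note: the sketch below is a minimal, self-contained restatement of the technique this patch implements, using only stock confluent_kafka. The names CountingProducer, counted_poll, and flush_workaround are illustrative, not part of the patch. It relies on two documented behaviors: Producer.poll() returns the number of events (delivery callbacks) it served, and len(producer) reports the messages still queued. Counting served callbacks under a lock gives the flush replacement a progress measure, which is what the messages_processed bookkeeping above provides.

from threading import Lock
from time import monotonic

from confluent_kafka import Producer

class CountingProducer(Producer):
  # Illustrative subclass: track how many delivery callbacks poll() has
  # served so a flush replacement can measure progress.

  def __init__(self, config):
    self._processed = 0  # delivery callbacks served so far
    self._lock = Lock()  # guards _processed
    super().__init__(config)

  def counted_poll(self, timeout):
    served = super().poll(timeout)  # number of events (callbacks) served
    if served > 0:
      with self._lock:
        self._processed += served
    return served

  def flush_workaround(self, timeout=10.0, timeout_jiffy=0.1):
    # Stop once every message queued at entry is accounted for; len(self)
    # is the number of messages/requests still awaiting delivery.
    with self._lock:
      target = self._processed + len(self)
    deadline = monotonic() + timeout
    while monotonic() < deadline:
      with self._lock:
        accounted = self._processed >= target
      if accounted or len(self) == 0:
        break
      self.counted_poll(timeout_jiffy)
    return len(self)

# Usage sketch (assumes a broker reachable at localhost:9092):
#   p = CountingProducer({'bootstrap.servers': 'localhost:9092'})
#   p.produce('some-topic', b'payload')
#   print('messages left unflushed:', p.flush_workaround(timeout=5.0))

One difference from the sketch: in the patch, poll() itself does the counting, so every poll() caller contributes to the progress measure. That is why the patch can drop the old comment warning that flush may never complete when other poll-callers win the race for callbacks; the deadline remains only as a safety bound.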