diff --git a/woof/consumer.py b/woof/consumer.py
index aef8b89..4c1bcfc 100644
--- a/woof/consumer.py
+++ b/woof/consumer.py
@@ -58,13 +58,14 @@ def __init__(self,
         self.wait_time_before_exit = wait_time_before_exit
 
         if use_zk:
-            kwargs['api_version'] = '0.8.1'
+            kwargs['api_version'] = kwargs.get('api_version', '0.8.1')
             # ZK autocommit does not seem to work reliably # TODO
             self.async_commit = False
         else:
-            kwargs['api_version'] = CURRENT_PROD_BROKER_VERSION
+            kwargs['api_version'] = kwargs.get('api_version',
+                                               CURRENT_PROD_BROKER_VERSION)
 
         try:
             self.cons = KafkaConsumer(
diff --git a/woof/partitioned_producer.py b/woof/partitioned_producer.py
index cd3e7aa..4bf3552 100644
--- a/woof/partitioned_producer.py
+++ b/woof/partitioned_producer.py
@@ -27,7 +27,8 @@ def __init__(self, broker,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,  # unused - here for legacy support
-                 retries=3):
+                 retries=3,
+                 **kwargs):
 
         try:
             self.async = async
@@ -35,14 +36,15 @@ def __init__(self, broker,
                 _partitioner = CustomPartitioner(partitioner)
             else:
                 _partitioner = DefaultPartitioner()
-
+            kwargs['api_version'] = kwargs.get('api_version',
+                                               CURRENT_PROD_BROKER_VERSION)
             self.prod = KafkaProducer(bootstrap_servers=broker,
                                       key_serializer=make_kafka_safe,
                                       value_serializer=make_kafka_safe,
                                       batch_size=batch_send_every_n,
                                       retries=retries,
-                                      api_version=CURRENT_PROD_BROKER_VERSION,
-                                      partitioner=_partitioner)
+                                      partitioner=_partitioner,
+                                      **kwargs)
         except Exception as e1:
             log.error("[partitionedproducer log] GEN err %s /n", str(e1))
             raise
@@ -104,16 +106,17 @@ class CyclicPartitionedProducer(KafkaProducer):
     use send() to send to any topic and distribute keys cyclically in partitions
     """
 
-    def __init__(self, broker, async=True, random_start=True):
+    def __init__(self, broker, async=True, random_start=True, **kwargs):
         self.partition_cycles = {}
         self.random_start = random_start
         self.async = async
-
+        kwargs['api_version'] = kwargs.get('api_version',
+                                           CURRENT_PROD_BROKER_VERSION)
         super(CyclicPartitionedProducer, self).__init__(
             bootstrap_servers=broker,
             key_serializer=make_kafka_safe,
             value_serializer=make_kafka_safe,
-            api_version=CURRENT_PROD_BROKER_VERSION)
+            **kwargs)
 
     def _partition(self, topic, partition, key, value, serialized_key,
                    serialized_value):
diff --git a/woof/producer.py b/woof/producer.py
index 7e80bee..62177b2 100644
--- a/woof/producer.py
+++ b/woof/producer.py
@@ -13,13 +13,15 @@ class FeedProducer():
     use send() to send to any topic
     """
 
-    def __init__(self, broker, retries=3, async=False):
+    def __init__(self, broker, retries=3, async=False, **kwargs):
         try:
+            kwargs['api_version'] = kwargs.get('api_version',
+                                               CURRENT_PROD_BROKER_VERSION)
             self.prod = KafkaProducer(bootstrap_servers=broker,
                                       key_serializer=make_kafka_safe,
                                       value_serializer=make_kafka_safe,
                                       retries=retries,
-                                      api_version=CURRENT_PROD_BROKER_VERSION)
+                                      **kwargs)
             self.async = async
         except Exception as e:
             log.error("[feedproducer log] Constructor error ERROR %s /n",
diff --git a/woof/transactions.py b/woof/transactions.py
index a1ce874..d9008b6 100644
--- a/woof/transactions.py
+++ b/woof/transactions.py
@@ -15,19 +15,22 @@ def __init__(self,
                 vertical,
                 host=socket.gethostname(),
                 async=False,
-                 retries=1):
+                 retries=1,
+                 **kwargs):
         self.broker = broker
         self.this_host = host
         self.vertical = vertical
         self.async = async
         self.topic = _get_topic_from_vertical(vertical)
+        kwargs['api_version'] = kwargs.get('api_version',
+                                           CURRENT_PROD_BROKER_VERSION)
         # thread safe producer, uses default murmur2 partiioner by default
         # good for us
         self.producer = KafkaProducer(bootstrap_servers=broker,
                                       key_serializer=make_kafka_safe,
                                       value_serializer=make_kafka_safe,
-                                      api_version=CURRENT_PROD_BROKER_VERSION,
-                                      retries=retries)
+                                      retries=retries,
+                                      **kwargs)
 
     def New(self,
             txn_id,
@@ -90,6 +93,7 @@ def _send_log(self, txn_id, msg)
 
         try:
             self.producer.send(self.topic, key=txn_id, value=msg)
+            print self.topic
             self.producer.flush()
         except KafkaTimeoutError as e:
             log.error(
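
Usage sketch (not part of the patch): the change keeps CURRENT_PROD_BROKER_VERSION (or '0.8.1' on the ZooKeeper consumer path) as the default api_version, but lets callers override it and forward any other kafka-python keyword argument through **kwargs. The broker address and the api_version value below are placeholders, not values taken from this repository.

# Illustrative only; "localhost:9092" and api_version "0.9" are assumptions.
from woof.producer import FeedProducer
from woof.partitioned_producer import CyclicPartitionedProducer

# Default behaviour is unchanged: api_version still falls back to
# CURRENT_PROD_BROKER_VERSION when the caller does not pass one.
feed = FeedProducer("localhost:9092", retries=3)

# Callers talking to an older cluster can now pin api_version explicitly;
# any other KafkaProducer keyword argument (e.g. acks) is forwarded too.
legacy = CyclicPartitionedProducer("localhost:9092",
                                   api_version="0.9",
                                   acks="all")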