Skip to content

Commit

Permalink
Added **kwargs pass-through so that api_version can be specified by callers
Browse files Browse the repository at this point in the history
  • Loading branch information
sudev committed Jul 28, 2016
1 parent 9bbb87c commit f318de6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
5 changes: 3 additions & 2 deletions woof/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ def __init__(self,
self.wait_time_before_exit = wait_time_before_exit

if use_zk:
kwargs['api_version'] = '0.8.1'
kwargs['api_version'] = kwargs.get('api_version', '0.8.1')
# ZK autocommit does not seem to work reliably
# TODO
self.async_commit = False

else:
kwargs['api_version'] = CURRENT_PROD_BROKER_VERSION
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)

try:
self.cons = KafkaConsumer(
Expand Down
17 changes: 10 additions & 7 deletions woof/partitioned_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,24 @@ def __init__(self, broker,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, # unused - here for legacy support
retries=3):
retries=3,
**kwargs):

try:
self.async = async
if partitioner is not None:
_partitioner = CustomPartitioner(partitioner)
else:
_partitioner = DefaultPartitioner()

kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
self.prod = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
batch_size=batch_send_every_n,
retries=retries,
api_version=CURRENT_PROD_BROKER_VERSION,
partitioner=_partitioner)
partitioner=_partitioner,
**kwargs)
except Exception as e1:
log.error("[partitionedproducer log] GEN err %s /n", str(e1))
raise
Expand Down Expand Up @@ -104,16 +106,17 @@ class CyclicPartitionedProducer(KafkaProducer):
use send() to send to any topic and distribute keys cyclically in partitions
"""

def __init__(self, broker, async=True, random_start=True):
def __init__(self, broker, async=True, random_start=True, **kwargs):
self.partition_cycles = {}
self.random_start = random_start
self.async = async

kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
super(CyclicPartitionedProducer, self).__init__(
bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
api_version=CURRENT_PROD_BROKER_VERSION)
**kwargs)

def _partition(self, topic, partition, key, value, serialized_key,
serialized_value):
Expand Down
6 changes: 4 additions & 2 deletions woof/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ class FeedProducer():
use send() to send to any topic
"""

def __init__(self, broker, retries=3, async=False):
def __init__(self, broker, retries=3, async=False, **kwargs):
try:
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
self.prod = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
retries=retries,
api_version=CURRENT_PROD_BROKER_VERSION)
**kwargs)
self.async = async
except Exception as e:
log.error("[feedproducer log] Constructor error ERROR %s /n",
Expand Down
10 changes: 7 additions & 3 deletions woof/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@ def __init__(self,
vertical,
host=socket.gethostname(),
async=False,
retries=1):
retries=1,
**kwargs):
self.broker = broker
self.this_host = host
self.vertical = vertical
self.async = async
self.topic = _get_topic_from_vertical(vertical)
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
# thread-safe producer; uses the default murmur2 partitioner
# good for us
self.producer = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
api_version=CURRENT_PROD_BROKER_VERSION,
retries=retries)
retries=retries,
**kwargs)

def New(self,
txn_id,
Expand Down Expand Up @@ -90,6 +93,7 @@ def _send_log(self,
txn_id, msg)
try:
self.producer.send(self.topic, key=txn_id, value=msg)
print self.topic
self.producer.flush()
except KafkaTimeoutError as e:
log.error(
Expand Down

0 comments on commit f318de6

Please sign in to comment.