Merge branch 'master' into propagate-publish-errors
wbarnha authored Mar 28, 2024
2 parents c3cbf95 + aba153f commit 7b425cb
Showing 63 changed files with 849 additions and 3,144 deletions.
3 changes: 0 additions & 3 deletions .covrc

This file was deleted.

5 changes: 5 additions & 0 deletions CHANGES.md
@@ -1,3 +1,8 @@
+# 2.0.3 (under development)
+
+Consumer
+* KIP-345: Implement static membership support
+
# 2.0.2 (Sep 29, 2020)

Consumer
2 changes: 1 addition & 1 deletion Makefile
@@ -29,7 +29,7 @@ test-local: build-integration
cov-local: build-integration
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
-		--cov-config=.covrc --cov-report html $(FLAGS) kafka test
+		--cov-report html $(FLAGS) kafka test
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
6 changes: 5 additions & 1 deletion README.rst
@@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:

.. code-block:: python
+    # join a consumer group for dynamic partition assignment and offset commits
    from kafka import KafkaConsumer
-    consumer = KafkaConsumer('my_favorite_topic')
+    consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+    # or as a static member with a fixed group member name
+    # consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
+    #                          group_instance_id='consumer-1', leave_group_on_close=False)
    for msg in consumer:
        print (msg)
2 changes: 0 additions & 2 deletions benchmarks/consumer_performance.py
@@ -10,8 +10,6 @@
import threading
import traceback

-from kafka.vendor.six.moves import range

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

2 changes: 0 additions & 2 deletions benchmarks/producer_performance.py
@@ -9,8 +9,6 @@
import threading
import traceback

-from kafka.vendor.six.moves import range

from kafka import KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

25 changes: 9 additions & 16 deletions benchmarks/varint_speed.py
@@ -1,7 +1,5 @@
#!/usr/bin/env python
-from __future__ import print_function
import pyperf
-from kafka.vendor import six


test_data = [
@@ -67,6 +65,10 @@
BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))


+def int2byte(i):
+    return bytes((i,))


def _assert_valid_enc(enc_func):
for encoded, decoded in test_data:
assert enc_func(decoded) == encoded, decoded
@@ -116,7 +118,7 @@ def encode_varint_1(num):
_assert_valid_enc(encode_varint_1)


-def encode_varint_2(value, int2byte=six.int2byte):
+def encode_varint_2(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

bits = value & 0x7f
@@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
assert res == encoded


-def encode_varint_4(value, int2byte=six.int2byte):
+def encode_varint_4(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

if value <= 0x7f: # 1 byte
@@ -301,22 +303,13 @@ def size_of_varint_2(value):
_assert_valid_size(size_of_varint_2)


-if six.PY3:
-    def _read_byte(memview, pos):
-        """ Read a byte from memoryview as an integer
-        Raises:
-            IndexError: if position is out of bounds
-        """
-        return memview[pos]
-else:
-    def _read_byte(memview, pos):
-        """ Read a byte from memoryview as an integer
+def _read_byte(memview, pos):
+    """ Read a byte from memoryview as an integer
    Raises:
        IndexError: if position is out of bounds
    """
-        return ord(memview[pos])
+    return memview[pos]


def decode_varint_1(buffer, pos=0):
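For reference, a quick sanity check of the new int2byte helper (a standalone sketch assuming the bytes((i,)) fix above; the asserts are not part of the commit):

    def int2byte(i):
        # bytes((i,)) packs one integer 0-255 into a single-byte bytes object,
        # replacing six.int2byte; bytes(i) would instead create i zero bytes.
        return bytes((i,))

    assert int2byte(0x41) == b'A'
    assert int2byte(0xff) == b'\xff'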
7 changes: 7 additions & 0 deletions docs/changelog.rst
@@ -1,6 +1,13 @@
Changelog
=========

+2.2.0
+####################
+
+Consumer
+--------
+* KIP-345: Implement static membership support
+

2.0.2 (Sep 29, 2020)
####################
12 changes: 12 additions & 0 deletions docs/usage.rst
@@ -47,6 +47,18 @@ KafkaConsumer
group_id='my-group',
bootstrap_servers='my.server.com')
+    # Use multiple static consumers w/ 2.3.0 kafka brokers
+    consumer1 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              group_instance_id='process-1',
+                              leave_group_on_close=False,
+                              bootstrap_servers='my.server.com')
+    consumer2 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              group_instance_id='process-2',
+                              leave_group_on_close=False,
+                              bootstrap_servers='my.server.com')
There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
9 changes: 2 additions & 7 deletions kafka/admin/acl_resource.py
@@ -1,11 +1,6 @@
-from kafka.errors import IllegalArgumentError
+from enum import IntEnum

-# enum in stdlib as of py3.4
-try:
-    from enum import IntEnum  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor.enum34 import IntEnum
+from kafka.errors import IllegalArgumentError


class ResourceType(IntEnum):
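IntEnum has shipped in the stdlib since Python 3.4, which is what makes the vendored enum34 backport removable; a minimal illustration of the property the ACL code relies on (illustrative member values, not copied from this file):

    from enum import IntEnum

    class ResourceType(IntEnum):
        ANY = 1
        TOPIC = 2

    # IntEnum members compare and serialize as plain ints, as the wire
    # protocol expects.
    assert ResourceType.TOPIC == 2
    assert int(ResourceType.ANY) == 1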
19 changes: 17 additions & 2 deletions kafka/admin/client.py
@@ -4,7 +4,6 @@
import socket

from . import ConfigResourceType
-from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
@@ -18,7 +17,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest
+    DeleteGroupsRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
@@ -1343,3 +1342,19 @@ def _wait_for_futures(self, futures):

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type

+    def describe_log_dirs(self):
+        """Send a DescribeLogDirsRequest request to a broker.
+
+        :return: The DescribeLogDirsResponse from the broker (the future's value).
+        """
+        version = self._matching_api_version(DescribeLogDirsRequest)
+        if version <= 1:
+            request = DescribeLogDirsRequest[version]()
+            future = self._send_request_to_node(self._client.least_loaded_node(), request)
+            self._wait_for_futures([future])
+        else:
+            raise NotImplementedError(
+                "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
+                .format(version))
+        return future.value
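One way the new method might be exercised (a sketch, assuming a reachable broker at localhost:9092):

    from kafka import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    # Queries the least-loaded node and returns the decoded
    # DescribeLogDirsResponse, or raises NotImplementedError if only
    # versions newer than v1 are supported.
    response = admin.describe_log_dirs()
    print(response)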
7 changes: 1 addition & 6 deletions kafka/admin/config_resource.py
@@ -1,9 +1,4 @@
-# enum in stdlib as of py3.4
-try:
-    from enum import IntEnum  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor.enum34 import IntEnum
+from enum import IntEnum


class ConfigResourceType(IntEnum):
13 changes: 1 addition & 12 deletions kafka/client_async.py
@@ -2,20 +2,12 @@
import copy
import logging
import random
+import selectors
import socket
import threading
import time
import weakref

-# selectors in stdlib as of py3.4
-try:
-    import selectors  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor import selectors34 as selectors
-
-from kafka.vendor import six

from kafka.cluster import ClusterMetadata
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from kafka import errors as Errors
@@ -25,9 +17,6 @@
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.metadata import MetadataRequest
from kafka.util import Dict, WeakMethod
-# Although this looks unused, it actually monkey-patches socket.socketpair()
-# and should be left in as long as we're using socket.socketpair() in this file
-from kafka.vendor import socketpair
from kafka.version import __version__

log = logging.getLogger('kafka.client')
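The selectors module has been in the stdlib since Python 3.4, so the selectors34 backport and the socketpair monkey-patch are no longer needed on supported Pythons; a standalone demonstration (not from the commit):

    import selectors
    import socket

    sel = selectors.DefaultSelector()
    r, w = socket.socketpair()  # in the stdlib on all platforms since Python 3.5
    sel.register(r, selectors.EVENT_READ)
    w.send(b'x')
    # The read end reports readiness once data is in flight.
    assert sel.select(timeout=1.0)
    sel.close()
    r.close()
    w.close()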
2 changes: 0 additions & 2 deletions kafka/cluster.py
@@ -4,8 +4,6 @@
import threading
import time

-from kafka.vendor import six

from kafka import errors as Errors
from kafka.conn import collect_hosts
from kafka.future import Future
3 changes: 0 additions & 3 deletions kafka/codec.py
@@ -3,9 +3,6 @@
import platform
import struct

-from kafka.vendor import six
-from kafka.vendor.six.moves import range

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
14 changes: 1 addition & 13 deletions kafka/conn.py
@@ -1,21 +1,13 @@
import copy
import errno
import logging
+import selectors
from random import shuffle, uniform

-# selectors in stdlib as of py3.4
-try:
-    import selectors  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor import selectors34 as selectors

import socket
import threading
import time

-from kafka.vendor import six

from kafka import sasl
import kafka.errors as Errors
from kafka.future import Future
@@ -565,8 +557,6 @@ def _send_bytes(self, data):
except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
-    if six.PY2 and e.errno == errno.EWOULDBLOCK:
-        break
raise
except BlockingIOError:
break
@@ -863,8 +853,6 @@ def _recv(self):
except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
-    if six.PY2 and e.errno == errno.EWOULDBLOCK:
-        break
log.exception('%s: Error receiving network data'
' closing socket', self)
err = Errors.KafkaConnectionError(e)
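The deleted six.PY2 branches special-cased EWOULDBLOCK, which Python 3 raises as BlockingIOError and which the existing except clause already handles; a standalone illustration (not part of the commit):

    import errno
    import socket

    r, w = socket.socketpair()
    r.setblocking(False)
    try:
        r.recv(1)  # nothing buffered on a non-blocking socket
    except BlockingIOError as e:
        # Python 3 maps EWOULDBLOCK/EAGAIN to BlockingIOError, so no
        # separate errno check is needed.
        assert e.errno in (errno.EWOULDBLOCK, errno.EAGAIN)
    r.close()
    w.close()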
2 changes: 0 additions & 2 deletions kafka/consumer/fetcher.py
@@ -5,8 +5,6 @@
import sys
import time

-from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
14 changes: 11 additions & 3 deletions kafka/consumer/group.py
@@ -5,8 +5,6 @@

from kafka.errors import KafkaConfigurationError, UnsupportedVersionError

-from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
@@ -54,6 +52,12 @@ class KafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
+    group_instance_id (str): the unique identifier to distinguish
+        each client instance. If set and leave_group_on_close is
+        False, consumer group rebalancing won't be triggered until
+        session_timeout_ms is met. Requires 2.3.0+ brokers.
+    leave_group_on_close (bool or None): whether to leave a consumer
+        group or not on consumer shutdown.
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable): Any callable that takes a
@@ -243,6 +247,7 @@ class KafkaConsumer:
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances
+    coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances
Note:
Configuration parameters are described in more detail at
@@ -252,6 +257,8 @@ class KafkaConsumer:
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'group_id': None,
+    'group_instance_id': '',
+    'leave_group_on_close': None,
'key_deserializer': None,
'value_deserializer': None,
'fetch_max_wait_ms': 500,
@@ -306,6 +313,7 @@ class KafkaConsumer:
'sasl_oauth_token_provider': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'kafka_client': KafkaClient,
+    'coordinator': ConsumerCoordinator,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

@@ -381,7 +389,7 @@ def __init__(self, *topics, **configs):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, **self.config)
-    self._coordinator = ConsumerCoordinator(
+    self._coordinator = self.config['coordinator'](
self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
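The new 'coordinator' config key makes the coordinator class injectable; a sketch of how it might be used (LoggingCoordinator is illustrative, and a reachable broker is assumed):

    from kafka import KafkaConsumer
    from kafka.coordinator.consumer import ConsumerCoordinator

    class LoggingCoordinator(ConsumerCoordinator):
        """Illustrative subclass; could add logging around rebalances."""
        pass

    # KafkaConsumer now instantiates self.config['coordinator'] rather than
    # a hard-coded ConsumerCoordinator.
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers='localhost:9092',
                             coordinator=LoggingCoordinator)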
2 changes: 0 additions & 2 deletions kafka/consumer/subscription_state.py
@@ -2,8 +2,6 @@
import logging
import re

-from kafka.vendor import six

from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/abstract.py
@@ -12,7 +12,7 @@ class AbstractPartitionAssignor(object):
partition counts which are always needed in assignors).
"""

-    @abc.abstractproperty
+    @abc.abstractmethod
    def name(self):
        """.name should be a string identifying the assignor"""
        pass
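abc.abstractproperty has been deprecated since Python 3.3; the bare @abc.abstractmethod above keeps the abstract marker, while the fully equivalent modern spelling stacks @property on top (an illustrative sketch, not from the commit):

    import abc

    class AbstractAssignor(abc.ABC):
        @property
        @abc.abstractmethod
        def name(self):
            """.name should be a string identifying the assignor"""

    class ExampleAssignor(AbstractAssignor):
        @property
        def name(self):
            return 'example'

    assert ExampleAssignor().name == 'example'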