Add connection_timeout_ms and reset the timeout counter more often #2388

Open · wants to merge 36 commits into base: master

Changes from 13 commits
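Per the commit titles, this PR adds a connection_timeout_ms setting to BrokerConnection and resets the timeout counter on any connection activity, not only on connection attempts (last_attempt is refactored to last_activity). A minimal usage sketch, assuming the new setting is passed through client config like the existing *_ms options (the parameter name comes from the PR title; the value shown is illustrative, not a documented default):

    from kafka import KafkaConsumer

    # Hypothetical once this PR lands: connection_timeout_ms bounds how long
    # a connection may sit without activity before the attempt is failed.
    # 5000 ms is an illustrative value, not a documented default.
    consumer = KafkaConsumer(
        'my_favorite_topic',
        bootstrap_servers='localhost:9092',
        connection_timeout_ms=5000,
    )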
Commits (36)
85f91c6
Add connection_timeout_ms and reset the timeout counter more often
petterroea Aug 9, 2023
0d6d70e
Refactor last_attempt -> last_activity
petterroea Aug 14, 2023
015268d
Make tests work again
petterroea Aug 14, 2023
6876f68
Add unit tests of new BrokerConnection functionality
petterroea Aug 14, 2023
91581ed
Re-introduce last_attempt to avoid breakage
petterroea Nov 24, 2023
b95e46d
Rename project from kafka-python to kafka-python-ng (#1)
wbarnha Mar 7, 2024
78c74c0
Fix artifact downloads for release
wbarnha Mar 7, 2024
e796019
Fix badge links in README.rst
wbarnha Mar 8, 2024
38e159a
Reconfigure tests to complete in a more timely manner and skip some i…
wbarnha Mar 8, 2024
e762321
Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161)
wbarnha Mar 9, 2024
00750aa
Remove support for EOL'ed versions of Python (#160)
wbarnha Mar 9, 2024
5bd1323
Stop testing Python 3.13 in python-package.yml (#162)
wbarnha Mar 9, 2024
cda8f81
Avoid 100% CPU usage while socket is closed (#156)
wbarnha Mar 9, 2024
c02df08
Fix DescribeConfigsResponse_v1 config_source (#150)
wbarnha Mar 9, 2024
65eacfb
Fix base class of DescribeClientQuotasResponse_v0 (#144)
wbarnha Mar 10, 2024
e0ebe5d
Update license_file to license_files (#131)
wbarnha Mar 10, 2024
26bb3eb
Update some RST documentation syntax (#130)
wbarnha Mar 10, 2024
2e6649c
Merge branch 'master' into master
wbarnha Mar 10, 2024
88763da
Fix crc32c's __main__ for Python 3 (#142)
wbarnha Mar 10, 2024
b1a4c53
Strip trailing dot off hostname. (#133)
wbarnha Mar 10, 2024
18eaa2d
Handle OSError to properly recycle SSL connection, fix infinite loop …
wbarnha Mar 10, 2024
54cbd63
client_async: Allow throwing an exception upon socket error during (#…
wbarnha Mar 10, 2024
eb6fd9b
Log connection errors at ERROR level (#139)
wbarnha Mar 12, 2024
6ad79a4
Support custom SASL mechanisms including AWS MSK (#170)
wbarnha Mar 18, 2024
deeccfa
Update python-package.yml to have 15m as timeout
wbarnha Mar 18, 2024
fcca556
Run pyupgrade on everything. (#171)
wbarnha Mar 18, 2024
a856dc4
Remove all vendoring (#169)
s-t-e-v-e-n-k Mar 19, 2024
2f2ccb1
Support Describe log dirs (#145)
wbarnha Mar 19, 2024
0259502
Update conftest.py to use request.node.originalname instead for legal…
wbarnha Mar 20, 2024
3c124b2
KIP-345 Static membership implementation (#137)
wbarnha Mar 20, 2024
56065da
Use monkeytype to create some semblance of typing (#173)
wbarnha Mar 26, 2024
cbf317b
Add zstd support on legacy record and ensure no variable is referred …
wbarnha Mar 26, 2024
d34ad3c
Merge branch 'master' into master
wbarnha Mar 26, 2024
af1a5f0
Update __init__.py of SASL to catch ImportErrors in case botocore is …
wbarnha Mar 27, 2024
aba153f
Add botocore to extras in setup.py
wbarnha Mar 27, 2024
a9e30b0
Merge branch 'master' into master
wbarnha Apr 3, 2024
34 changes: 21 additions & 13 deletions .github/workflows/python-package.yml
@@ -41,7 +41,7 @@ jobs:
- name: Build artifacts
run: python -m build
- name: Upload built artifacts for testing
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ env.sdist-artifact }}
# NOTE: Exact expected file names are specified here
@@ -66,12 +66,8 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
include:
- python-version: "pypy3.9"
experimental: true
- python-version: "~3.13.0-0"
experimental: true
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -111,15 +107,15 @@ jobs:
KAFKA_VERSION: ${{ env.KAFKA_LATEST }}

test-kafka:
name: Tests for Kafka ${{ matrix.kafka-version }}
name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }})
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
kafka-version:
- "0.8.2.2"
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
@@ -128,6 +124,18 @@ jobs:
- "2.4.0"
- "2.5.0"
- "2.6.0"
python-version: ['3.12']
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
python-version: "3.12"
- kafka-version: '0.8.2.2'
experimental: false
python-version: "3.10"
env:
PYTHON_LATEST: ${{ matrix.python-version }}
continue-on-error: ${{ matrix.experimental }}
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -141,7 +149,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_LATEST }}
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: |
requirements-dev.txt
@@ -187,11 +195,11 @@ jobs:
environment: pypi
if: github.event_name == 'release' && github.event.action == 'created'
steps:
- name: Download the sdist artifact
uses: actions/download-artifact@v3
- name: Download the artifacts
uses: actions/download-artifact@v4
with:
name: artifact
path: dist
name: ${{ env.sdist-artifact }}
path: dist/${{ env.sdist-name }}
- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -413,7 +413,7 @@ Some of the major changes include:
* SASL authentication is working (we think)
* Removed several circular references to improve gc on close()

Thanks to all contributors -- the state of the kafka-python community is strong!
Thanks to all contributors -- the state of the kafka-python-ng community is strong!

Detailed changelog are listed below:

2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ cov-local: build-integration
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
# PyPi homepage (https://pypi.python.org/pypi/kafka-python)
# PyPi homepage (https://pypi.python.org/pypi/kafka-python-ng)
check-readme:
python setup.py check -rms

195 changes: 119 additions & 76 deletions README.rst
@@ -2,27 +2,27 @@ Kafka Python client
------------------------

.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
:target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg
:target: https://pypi.python.org/pypi/kafka-python-ng
.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python.svg
:target: https://pypistats.org/packages/kafka-python
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg
:target: https://pypistats.org/packages/kafka-python-ng
.. image:: https://img.shields.io/pypi/v/kafka-python.svg
:target: https://pypi.org/project/kafka-python
.. image:: https://img.shields.io/pypi/implementation/kafka-python
:target: https://github.com/dpkp/kafka-python/blob/master/setup.py
:target: https://pypi.org/project/kafka-python-ng
.. image:: https://img.shields.io/pypi/implementation/kafka-python-ng
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py



Python client for the Apache Kafka distributed stream processing system.
kafka-python is designed to function much like the official java client, with a
kafka-python-ng is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., consumer iterators).

kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with
kafka-python-ng is best used with newer brokers (0.9+), but is backwards-compatible with
older versions (to 0.8.0). Some features will only be enabled on newer brokers.
For example, fully coordinated consumer groups -- i.e., dynamic partition
assignment to multiple consumers in the same group -- requires use of 0.9+ kafka
@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
achieve something similar by manually assigning different partitions to each
consumer instance with config management tools like chef, ansible, etc. This
approach will work fine, though it does not support rebalancing on failures.
See <https://kafka-python.readthedocs.io/en/master/compatibility.html>

See https://kafka-python.readthedocs.io/en/master/compatibility.html

for more details.

Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

>>> pip install kafka-python

.. code-block:: bash

$ pip install kafka-python-ng



KafkaConsumer
@@ -48,89 +54,123 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
... print (msg)
.. code-block:: python

>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
... print (msg)
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
.. code-block:: python

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
for msg in consumer:
print (msg)

>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
.. code-block:: python

>>> # Get consumer metrics
>>> metrics = consumer.metrics()
# manually assign the partition list for the consumer
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)

.. code-block:: python

# Deserialize msgpack-encoded values
consumer = KafkaConsumer(value_deserializer=msgpack.loads)
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
assert isinstance(msg.value, dict)

.. code-block:: python

# Access record headers. The returned value is a list of tuples
# with str, bytes for key and value
for msg in consumer:
print (msg.headers)

.. code-block:: python

# Get consumer metrics
metrics = consumer.metrics()


KafkaProducer
*************

KafkaProducer is a high-level, asynchronous message producer. The class is
intended to operate as similarly as possible to the official java client.
See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

for more details.

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
... producer.send('foobar', b'some_message_bytes')
.. code-block:: python

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
producer.send('foobar', b'some_message_bytes')

.. code-block:: python

# Block until a single message is sent (or timeout)
future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)

.. code-block:: python

# Block until all pending messages are at least put on the network
# NOTE: This does not guarantee delivery or success! It is really
# only useful if you configure internal batching using linger_ms
producer.flush()

.. code-block:: python

>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
# Use a key for hashed-partitioning
producer.send('foobar', key=b'foo', value=b'bar')

>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
.. code-block:: python

>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
.. code-block:: python

>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
.. code-block:: python

>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
producer.send('foobar', b'msg %d' % i)

>>> # Get producer performance metrics
>>> metrics = producer.metrics()
.. code-block:: python

# Include record headers. The format is list of tuples with string key
# and bytes value.
producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

.. code-block:: python

# Get producer performance metrics
metrics = producer.metrics()


Thread safety
@@ -146,31 +186,34 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports the following compression formats:
kafka-python-ng supports the following compression formats:

- gzip
- LZ4
- Snappy
- Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries.
See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.

See https://kafka-python.readthedocs.io/en/master/install.html for more information.
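
For example, once the corresponding library is installed, any of these codecs can be selected on the producer (a sketch; the broker address and topic are placeholders, and 'zstd' assumes the zstandard package is installed):

.. code-block:: python

    from kafka import KafkaProducer

    # Placeholders: adjust bootstrap_servers and the topic for your cluster.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        compression_type='zstd',  # or 'gzip', 'lz4', 'snappy'
    )
    producer.send('events', b'compressed payload')
    producer.flush()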



Optimized CRC32 Validation
**************************

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure
python implementation for compatibility. To improve performance for high-throughput
applications, kafka-python will use `crc32c` for optimized native code if installed.
See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions.
See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.

See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
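
The optimized path can be checked directly from Python (a sketch, assuming the crc32c package's documented crc32c() function):

.. code-block:: python

    # Assumption: the third-party crc32c package exposes crc32c(data) -> int.
    import crc32c

    print(crc32c.crc32c(b'some message bytes'))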


Protocol
********

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
A secondary goal of kafka-python-ng is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. The protocol support is
leveraged to enable a KafkaClient.check_version() method that probes a kafka
broker and attempts to identify which version it is running (0.8.0 to 2.6+).
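
For example, the version probe can be run from the repl (a sketch; assumes a broker reachable at the placeholder address):

.. code-block:: python

    from kafka import KafkaClient

    # Placeholder address; check_version() probes the broker and returns
    # a version tuple, e.g. (2, 6, 0).
    client = KafkaClient(bootstrap_servers='localhost:9092')
    print(client.check_version())
    client.close()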