Skip to content

Commit

Permalink
Add support for confluent python lib (#1042)
Browse files Browse the repository at this point in the history
* Use our own ConsumerRecord

* Replace aiokafka ConsumerRecord with custom one

* Update api docs

* Use _missing from local

* Move ConsumerRecord to separate file

* WIP: Add custom producer using Confluent Kafka

It still behaves like sync. Asyncify?

* Remove unnecessary line

* Add one more config param

* Fix patching in security tests

* Update tests for kafka security

* Add acks param

* WIP: Add consumer

* Add separate ConfluentKafkaBroker and tests for it

* WIP: Integrate confluent async consumer to wrappers

* Use aiokafka's ConsumerRecord and fix mypy issues as much as possible

* Update docstring

* Update security tests

* Update docs

* Revert back old changes

* Use confluent kafka consumer

* WIP: Debug tests

* Catch confluent_kafka's KafkaException also

* Remove using AbstractPartitionAssignor from aiokafka in confluent_broker

* Ad confluent_broker.pyi

* Use confluent_kafka.Message instead of ConsumerRecord in AsyncConfluentConsumer

* Use only necessary config params for AdminClient

* Add docstring at missing places

* refactor: refactor confluent get_data methods

* WIP: Separate kafka and confluent

* WIP: Lint

* Separate out confluent as standalone broker

* Update API docs

* Pass sasl params only if sasl_mechanism is provided

* linting...

* linting...

* WIP: Reformat tests

* Update import

* Fix mypy issues

* WIP: more mypy fixes

* Use warning instead of warn

* Enable confluent security tests

* Add security tests

* Fix mypy issue

* Add tests for router

* Fix ruff linting issues

* Fix some mypy issues

* Fix import issues

* Fix typo and update similar to main changes

* enable CI on the branch

* enable CI on the branch

* fixing pre-commit

* Install confluent dependency in workflows

* Add test-confluent-smoke workflow

* Add 'not confluent' in workflow

* Add 'not confluent' in workflow

* Disable test which is getting stuck randomly

* Commit and close confluent consumer, Enable problematic tests

* Add docs for confluent KafkaBroker

* Install pytest-retry above 3.8 version and update tests

* Revert fastapi test changes

* Bump version to 0.4.0rc0

---------

Co-authored-by: Sternakt <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
Co-authored-by: Davor Runje <[email protected]>
  • Loading branch information
4 people authored Jan 12, 2024
1 parent d391563 commit 5cac95b
Show file tree
Hide file tree
Showing 118 changed files with 7,658 additions and 22 deletions.
1 change: 1 addition & 0 deletions .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dependant
89 changes: 76 additions & 13 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- main
- use-confluent
pull_request:
types: [opened, synchronize]
merge_group:
Expand All @@ -22,7 +23,7 @@ jobs:
run: |
set -ux
python -m pip install --upgrade pip
pip install -e ".[docs,rabbit,kafka,redis,nats,lint]"
pip install -e ".[docs,rabbit,kafka,confluent,redis,nats,lint]"
- name: Run ruff
shell: bash
Expand Down Expand Up @@ -64,7 +65,7 @@ jobs:
key: ${{ runner.os }}-python-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }}-test-v03
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[rabbit,kafka,nats,redis,docs,testing]
run: pip install .[rabbit,kafka,confluent,nats,redis,docs,testing]
- name: Install Pydantic v1
if: matrix.pydantic-version == 'pydantic-v1'
run: pip install "pydantic>=1.10.0,<2.0.0"
Expand All @@ -73,7 +74,7 @@ jobs:
run: pip install --pre "pydantic>=2.0.0b2,<3.0.0"
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not rabbit and not redis)) or (not nats and not kafka and not rabbit and not redis)"
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
Expand All @@ -97,10 +98,10 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,rabbit,redis,docs,testing] orjson
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing] orjson
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m"(slow and (not nats and not kafka and not rabbit and not redis)) or (not nats and not kafka and not rabbit and not redis)"
run: bash scripts/test.sh -m"(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
env:
COVERAGE_FILE: coverage/.coverage.orjson
CONTEXT: orjson
Expand All @@ -124,9 +125,9 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[rabbit,kafka,nats,redis,docs,testing]
run: pip install .[rabbit,kafka,confluent,nats,redis,docs,testing]
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not rabbit and not redis)) or (not nats and not kafka and not rabbit and not redis)"
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"

test-windows-latest:
if: github.event.pull_request.draft == false
Expand All @@ -141,9 +142,9 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[rabbit,kafka,nats,redis,docs,testing]
run: pip install .[rabbit,kafka,confluent,nats,redis,docs,testing]
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not rabbit and not redis)) or (not nats and not kafka and not rabbit and not redis)"
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"

test-kafka-real:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -174,7 +175,7 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,rabbit,redis,docs,testing]
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and kafka) or kafka"
Expand Down Expand Up @@ -205,6 +206,65 @@ jobs:
- name: Test
run: bash scripts/test.sh -m "not kafka" tests/brokers/kafka/test_test_client.py

test-confluent-real:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
services:
kafka:
image: bitnami/kafka:3.5.0
ports:
- 9092:9092
env:
KAFKA_ENABLE_KRAFT: "true"
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: "broker,controller"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092"
KAFKA_BROKER_ID: "1"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
ALLOW_PLAINTEXT_LISTENER: "true"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and confluent) or confluent"
env:
COVERAGE_FILE: coverage/.coverage.kafka-py
CONTEXT: kafka-py
- name: Store coverage files
uses: actions/upload-artifact@v3
with:
name: coverage
path: coverage

test-confluent-smoke:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[confluent,test-core]
- name: Test
run: bash scripts/test.sh -m "not confluent" tests/brokers/confluent/test_test_client.py

test-rabbit-real:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
Expand All @@ -223,7 +283,7 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,rabbit,redis,docs,testing]
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and rabbit) or rabbit"
Expand Down Expand Up @@ -272,7 +332,7 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,rabbit,redis,docs,testing]
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and nats) or nats"
Expand Down Expand Up @@ -321,7 +381,7 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[nats,kafka,rabbit,redis,docs,testing]
run: pip install .[nats,kafka,confluent,rabbit,redis,docs,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and redis) or redis"
Expand Down Expand Up @@ -357,6 +417,7 @@ jobs:
needs:
- test
- test-kafka-real
- test-confluent-real
- test-rabbit-real
- test-nats-real
- test-redis-real
Expand Down Expand Up @@ -402,6 +463,8 @@ jobs:
- test-windows-latest
- test-kafka-real
- test-kafka-smoke
- test-confluent-real
- test-confluent-smoke
- test-rabbit-real
- test-rabbit-smoke
- test-nats-real
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ repos:
rev: v2.2.6
hooks:
- id: codespell
args: [--ignore-words=.codespell-whitelist.txt]
exclude: |
(?x)^(
docs/overrides/home.html
Expand Down
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 485,
"line_number": 502,
"is_secret": false
}
],
Expand Down Expand Up @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-01-06T14:20:54Z"
"generated_at": "2024-01-10T10:22:36Z"
}
60 changes: 60 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ search:
- [Acknowledgement](kafka/ack.md)
- [Message Information](kafka/message.md)
- [Security Configuration](kafka/security.md)
- [Confluent](confluent/index.md)
- [Subscription](confluent/Subscriber/index.md)
- [Batch Subscriber](confluent/Subscriber/batch_subscriber.md)
- [Publishing](confluent/Publisher/index.md)
- [Batch Publishing](confluent/Publisher/batch_publisher.md)
- [Publish With Key](confluent/Publisher/using_a_key.md)
- [Acknowledgement](confluent/ack.md)
- [Message Information](confluent/message.md)
- [Security Configuration](confluent/security.md)
- [RabbitMQ](rabbit/index.md)
- [Subscription](rabbit/examples/index.md)
- [Direct](rabbit/examples/direct.md)
Expand Down Expand Up @@ -314,6 +323,57 @@ search:
- parser
- [parse_cli_args](api/faststream/cli/utils/parser/parse_cli_args.md)
- [remove_prefix](api/faststream/cli/utils/parser/remove_prefix.md)
- confluent
- [KafkaBroker](api/faststream/confluent/KafkaBroker.md)
- [KafkaRoute](api/faststream/confluent/KafkaRoute.md)
- [KafkaRouter](api/faststream/confluent/KafkaRouter.md)
- [TestApp](api/faststream/confluent/TestApp.md)
- [TestKafkaBroker](api/faststream/confluent/TestKafkaBroker.md)
- asyncapi
- [Handler](api/faststream/confluent/asyncapi/Handler.md)
- [Publisher](api/faststream/confluent/asyncapi/Publisher.md)
- broker
- [KafkaBroker](api/faststream/confluent/broker/KafkaBroker.md)
- client
- [AsyncConfluentConsumer](api/faststream/confluent/client/AsyncConfluentConsumer.md)
- [AsyncConfluentProducer](api/faststream/confluent/client/AsyncConfluentProducer.md)
- [BatchBuilder](api/faststream/confluent/client/BatchBuilder.md)
- [MsgToSend](api/faststream/confluent/client/MsgToSend.md)
- [TopicPartition](api/faststream/confluent/client/TopicPartition.md)
- [check_msg_error](api/faststream/confluent/client/check_msg_error.md)
- [create_topics](api/faststream/confluent/client/create_topics.md)
- handler
- [LogicHandler](api/faststream/confluent/handler/LogicHandler.md)
- message
- [ConsumerProtocol](api/faststream/confluent/message/ConsumerProtocol.md)
- [FakeConsumer](api/faststream/confluent/message/FakeConsumer.md)
- [KafkaMessage](api/faststream/confluent/message/KafkaMessage.md)
- parser
- [AsyncConfluentParser](api/faststream/confluent/parser/AsyncConfluentParser.md)
- producer
- [AsyncConfluentFastProducer](api/faststream/confluent/producer/AsyncConfluentFastProducer.md)
- publisher
- [LogicPublisher](api/faststream/confluent/publisher/LogicPublisher.md)
- router
- [KafkaRouter](api/faststream/confluent/router/KafkaRouter.md)
- security
- [parse_security](api/faststream/confluent/security/parse_security.md)
- shared
- logging
- [KafkaLoggingMixin](api/faststream/confluent/shared/logging/KafkaLoggingMixin.md)
- publisher
- [ABCPublisher](api/faststream/confluent/shared/publisher/ABCPublisher.md)
- router
- [BrokerRouter](api/faststream/confluent/shared/router/BrokerRouter.md)
- [KafkaRoute](api/faststream/confluent/shared/router/KafkaRoute.md)
- [KafkaRouter](api/faststream/confluent/shared/router/KafkaRouter.md)
- schemas
- [ConsumerConnectionParams](api/faststream/confluent/shared/schemas/ConsumerConnectionParams.md)
- test
- [FakeProducer](api/faststream/confluent/test/FakeProducer.md)
- [MockConfluentMessage](api/faststream/confluent/test/MockConfluentMessage.md)
- [TestKafkaBroker](api/faststream/confluent/test/TestKafkaBroker.md)
- [build_message](api/faststream/confluent/test/build_message.md)
- constants
- [ContentTypes](api/faststream/constants/ContentTypes.md)
- exceptions
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/KafkaBroker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.KafkaBroker
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/KafkaRoute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.router.BrokerRoute
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/KafkaRouter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.KafkaRouter
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/TestApp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.test.TestApp
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/TestKafkaBroker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.TestKafkaBroker
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/asyncapi/Handler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.asyncapi.Handler
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/asyncapi/Publisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.asyncapi.Publisher
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/broker/KafkaBroker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.broker.KafkaBroker
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.client.AsyncConfluentConsumer
Loading

0 comments on commit 5cac95b

Please sign in to comment.