From 8f9692d53c637724094f7dff525224acd9779a45 Mon Sep 17 00:00:00 2001
From: Ruslan
Date: Tue, 2 Jun 2020 00:36:47 +0600
Subject: [PATCH 1/8] feat AdminClient: support delete_records

Authored-by: Ruslan
Cherrypicked-by: Arsen Kitov
---
 kafka/admin/client.py   | 59 +++++++++++++++++++++++++++++++++++++++--
 kafka/protocol/admin.py | 33 ++++++++++++++++++++++-
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 8eb7504a7..844314d8b 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -20,8 +20,7 @@ from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest
-)
+    DeleteGroupsRequest, DeleteRecordsRequest)
 from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.types import Array
@@ -962,6 +961,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                 .format(version))
         return self._send_request_to_controller(request)
 
+    def delete_records(self, records_to_delete, timeout_ms=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
+            given partitions.
+
+        :return: List of DeleteRecordsResponse
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+        version = self._matching_api_version(MetadataRequest)
+
+        topics = set()
+
+        for topic2partition in records_to_delete:
+            topics.add(topic2partition.topic)
+
+        request = MetadataRequest[version](
+            topics=list(topics),
+            allow_auto_topic_creation=False
+        )
+
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self._wait_for_futures([future])
+        response = future.value
+
+        version = self._matching_api_version(DeleteRecordsRequest)
+
+        PARTITIONS_INFO = 3
+        NAME = 1
+        PARTITION_INDEX = 1
+        LEADER = 2
+
+        partition2leader = dict()
+
+        for topic in response.topics:
+            for partition in topic[PARTITIONS_INFO]:
+                t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
+                partition2leader[t2p] = partition[LEADER]
+
+        responses = []
+
+        for topic2partition in records_to_delete:
+            request = DeleteRecordsRequest[version](
+                topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],
+                timeout_ms=timeout_ms
+            )
+            # Sending separate request for each partition leader
+            future = self._send_request_to_node(partition2leader[topic2partition], request)
+            self._wait_for_futures([future])
+
+            response = future.value
+            responses.append(response)
+
+        return responses
+
     # delete records protocol not yet implemented
     # Note: send the request to the partition leaders
 
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index f9d61e5cd..f2ce96f26 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -239,6 +239,38 @@ class DeleteTopicsRequest_v3(Request):
     ]
 
 
+class DeleteRecordsResponse_v0(Response):
+    API_KEY = 21
+    API_VERSION = 0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('low_watermark', Int64),
+                ('error_code', Int16))))),
+        ('throttle_time_ms', Int32)
+    )
+
+
+class DeleteRecordsRequest_v0(Request):
+    API_KEY = 21
+    API_VERSION = 0
+    RESPONSE_TYPE = DeleteRecordsResponse_v0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('offset', Int64))))),
+        ('timeout_ms', Int32)
+    )
+
+
+DeleteRecordsResponse = [DeleteRecordsResponse_v0]
+DeleteRecordsRequest = [DeleteRecordsRequest_v0]
+
+
 class ListGroupsResponse_v0(Response):
     API_KEY = 16
     API_VERSION = 0
@@ -882,7 +914,6 @@ class CreatePartitionsRequest_v1(Request):
     CreatePartitionsResponse_v0,
     CreatePartitionsResponse_v1,
 ]
-
 class DeleteGroupsResponse_v0(Response):
     API_KEY = 42
     API_VERSION = 0

From 196e3616841308e75456538089e827f5597795f4 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 13:57:01 +0100
Subject: [PATCH 2/8] address review comments

---
 kafka/admin/client.py          |  3 ---
 kafka/protocol/admin.py        |  1 +
 test/conftest.py               |  4 ++--
 test/test_admin_integration.py | 25 +++++++++++++++++++++++++
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 844314d8b..4a1d2a3a5 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1017,9 +1017,6 @@ def delete_records(self, records_to_delete, timeout_ms=None):
 
         return responses
 
-    # delete records protocol not yet implemented
-    # Note: send the request to the partition leaders
-
     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
 
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index f2ce96f26..7c80972ae 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -914,6 +914,7 @@ class CreatePartitionsRequest_v1(Request):
     CreatePartitionsResponse_v0,
     CreatePartitionsResponse_v1,
 ]
+
 class DeleteGroupsResponse_v0(Response):
     API_KEY = 42
     API_VERSION = 0
diff --git a/test/conftest.py b/test/conftest.py
index 3fa0262fd..98b629f4e 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -61,11 +61,11 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     """Return a KafkaConsumer factory fixture"""
     _consumer = [None]
 
-    def factory(**kafka_consumer_params):
+    def factory(topics=(topic,), **kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
         params.setdefault('auto_offset_reset', 'earliest')
-        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
         return _consumer[0]
 
     yield factory
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 06c40a223..e0a81ecc9 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,3 +1,4 @@
+from kafka.structs import TopicPartition
 import pytest
 
 from logging import info
@@ -312,3 +313,27 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group1 not in consumergroups
     assert group2 in consumergroups
     assert group3 not in consumergroups
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic):
+    p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
+
+    for p in (p0, p1, p2):
+        send_messages(range(0, 100), partition=p.partition, topic=p.topic)
+
+    consumer1 = kafka_consumer_factory(group_id=None, topics=())
+    consumer1.assign([p0, p1, p2])
+    for _ in range(300):
+        next(consumer1)
+
+    kafka_admin_client.delete_records({p0: -1, p1: 50})
+
+    consumer2 = kafka_consumer_factory(group_id=None, topics=())
+    consumer2.assign([p0, p1, p2])
+    all_messages = consumer2.poll(max_records=300, timeout_ms=1000)
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure we read everything
+    assert not all_messages.get(p0, [])
+    assert [r.offset for r in all_messages[p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[p2]] == list(range(100))

From e1efa9e452fb30e54f68c6acac6267b9e34fa858 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 16:58:24 +0100
Subject: [PATCH 3/8] fix: do not send unnecessary duplicate requests

---
 kafka/admin/client.py          | 24 +++++++++++-------
 test/test_admin_integration.py | 46 +++++++++++++++++++++++-----------
 2 files changed, 47 insertions(+), 23 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 4a1d2a3a5..0f57f9f58 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -974,8 +974,8 @@ def delete_records(self, records_to_delete, timeout_ms=None):
 
         topics = set()
 
-        for topic2partition in records_to_delete:
-            topics.add(topic2partition.topic)
+        for topic_partition in records_to_delete:
+            topics.add(topic_partition.topic)
 
         request = MetadataRequest[version](
             topics=list(topics),
@@ -994,22 +994,28 @@ def delete_records(self, records_to_delete, timeout_ms=None):
         PARTITION_INDEX = 1
         LEADER = 2
 
-        partition2leader = dict()
-
+        # We want to make as few requests as possible
+        # If a single node serves as a partition leader for multiple partitions (and/or
+        # topics), we can send all of those in a single request.
+        # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}}
+        leader2topic2partitions = defaultdict(lambda: defaultdict(list))
         for topic in response.topics:
             for partition in topic[PARTITIONS_INFO]:
                 t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
-                partition2leader[t2p] = partition[LEADER]
+                if t2p in records_to_delete:
+                    leader2topic2partitions[partition[LEADER]][t2p.topic].append(t2p)
 
         responses = []
 
-        for topic2partition in records_to_delete:
+        for leader, topic2partitions in leader2topic2partitions.items():
             request = DeleteRecordsRequest[version](
-                topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],
+                topics=[
+                    (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
+                    for topic, partitions in topic2partitions.items()
+                ],
                 timeout_ms=timeout_ms
             )
-            # Sending separate request for each partition leader
-            future = self._send_request_to_node(partition2leader[topic2partition], request)
+            future = self._send_request_to_node(leader, request)
             self._wait_for_futures([future])
 
             response = future.value
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index e0a81ecc9..0b1057da1 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -314,26 +314,44 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group2 in consumergroups
     assert group3 not in consumergroups
 
+@pytest.fixture(name="topic2")
+def _topic2(kafka_broker, request):
+    """Same as `topic` fixture, but with a different name if you need two topics."""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
+
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
-def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic):
-    p0 = TopicPartition(topic, 0)
-    p1 = TopicPartition(topic, 1)
-    p2 = TopicPartition(topic, 2)
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2):
+    t0p0 = TopicPartition(topic, 0)
+    t0p1 = TopicPartition(topic, 1)
+    t0p2 = TopicPartition(topic, 2)
+    t1p0 = TopicPartition(topic2, 0)
+    t1p1 = TopicPartition(topic2, 1)
+    t1p2 = TopicPartition(topic2, 2)
+
+    partitions = (t0p0, t0p1, t0p2, t1p0, t1p1, t1p2)
 
-    for p in (p0, p1, p2):
+    for p in partitions:
         send_messages(range(0, 100), partition=p.partition, topic=p.topic)
 
     consumer1 = kafka_consumer_factory(group_id=None, topics=())
-    consumer1.assign([p0, p1, p2])
-    for _ in range(300):
+    consumer1.assign(partitions)
+    for _ in range(600):
         next(consumer1)
 
-    kafka_admin_client.delete_records({p0: -1, p1: 50})
+    kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
 
     consumer2 = kafka_consumer_factory(group_id=None, topics=())
-    consumer2.assign([p0, p1, p2])
-    all_messages = consumer2.poll(max_records=300, timeout_ms=1000)
-    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure we read everything
-    assert not all_messages.get(p0, [])
-    assert [r.offset for r in all_messages[p1]] == list(range(50, 100))
-    assert [r.offset for r in all_messages[p2]] == list(range(100))
+    consumer2.assign(partitions)
+    all_messages = consumer2.poll(max_records=600, timeout_ms=2000)
+    assert sum(len(x) for x in all_messages.values()) == 600 - 100 - 50 - 40 - 30
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure there are no delayed messages
+
+    assert not all_messages.get(t0p0, [])
+    assert [r.offset for r in all_messages[t0p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[t0p2]] == list(range(100))
+
+    assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
+    assert [r.offset for r in all_messages[t1p1]] == list(range(100))
+    assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))

From c7e8ba9cce54feffc10f11d76ea3d875daf422b7 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 18:29:30 +0100
Subject: [PATCH 4/8] tests & cleanup

---
 kafka/admin/client.py          | 84 +++++++++++++++++++++++++++++-----
 test/test_admin_integration.py | 20 +++++++-
 2 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 0f57f9f58..cc2b043c1 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -14,7 +14,7 @@ from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
 import kafka.errors as Errors
 from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
     UnrecognizedBrokerVersion, IllegalArgumentError)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
@@ -961,20 +961,24 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                 .format(version))
         return self._send_request_to_controller(request)
 
-    def delete_records(self, records_to_delete, timeout_ms=None):
-        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+    def _get_leader_for_partitions(self, partitions, timeout_ms=None):
+        """Finds ID of the leader node for every given topic partition.
- :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the - given partitions. + Will raise UnknownTopicOrPartitionError if for some partition no leader can be found. - :return: List of DeleteRecordsResponse + :param partitions: ``[TopicPartition]``: partitions for which to find leaders. + :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from + config. + + :return: Dictionary with ``{leader_id -> {partitions}}`` """ timeout_ms = self._validate_timeout(timeout_ms) version = self._matching_api_version(MetadataRequest) + partitions = set(partitions) topics = set() - for topic_partition in records_to_delete: + for topic_partition in partitions: topics.add(topic_partition.topic) request = MetadataRequest[version]( @@ -998,21 +1002,60 @@ def delete_records(self, records_to_delete, timeout_ms=None): # If a single node serves as a partition leader for multiple partitions (and/or # topics), we can send all of those in a single request. # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}} - leader2topic2partitions = defaultdict(lambda: defaultdict(list)) + leader2partitions = defaultdict(list) + valid_partitions = set() for topic in response.topics: for partition in topic[PARTITIONS_INFO]: t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX]) - if t2p in records_to_delete: - leader2topic2partitions[partition[LEADER]][t2p.topic].append(t2p) + if t2p in partitions: + leader2partitions[partition[LEADER]].append(t2p) + valid_partitions.add(t2p) + + if len(partitions) != len(valid_partitions): + unknown = set(partitions) - valid_partitions + raise UnknownTopicOrPartitionError( + "The following partitions are not known: %s" % ", " + .join(str(x) for x in unknown) + ) + + return leader2partitions + + def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): + """Delete records whose offset is smaller than the given offset of the corresponding partition. + + Note: if partition + + :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from + config. + :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to + this node. No check is performed verifying that this is indeed the leader for all + listed partitions, use with caution. 
+ :return: List of DeleteRecordsResponse + """ + timeout_ms = self._validate_timeout(timeout_ms) responses = [] + version = self._matching_api_version(DeleteRecordsRequest) + + if partition_leader_id is None: + leader2partitions = self._get_leader_for_partitions( + set(records_to_delete), timeout_ms + ) + else: + leader2partitions = {partition_leader_id: set(records_to_delete)} + + for leader, partitions in leader2partitions.items(): + topic2partitions = defaultdict(list) + for partition in partitions: + topic2partitions[partition.topic].append(partition) - for leader, topic2partitions in leader2topic2partitions.items(): request = DeleteRecordsRequest[version]( topics=[ (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) for topic, partitions in topic2partitions.items() - ], + ], timeout_ms=timeout_ms ) future = self._send_request_to_node(leader, request) @@ -1021,6 +1064,23 @@ def delete_records(self, records_to_delete, timeout_ms=None): response = future.value responses.append(response) + partition2error = {} + for response in responses: + for topic in getattr(response, 'topics', ()): + for partition in getattr(topic, 'partitions', ()): + if getattr(partition, 'error_code', 0) != 0: + tp = TopicPartition(topic, partition['partition_index']) + partition2error[tp] =partition['error_code'] + + if partition2error: + if len(partition2error) == 1: + raise Errors.for_code(partition2error[0])() + else: + raise Errors.BrokerResponseError( + "The following errors occured when trying to delete records: " + ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items()) + ) + return responses # create delegation token protocol not yet implemented diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 0b1057da1..2de0be55b 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -8,7 +8,9 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) -from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError) +from kafka.errors import ( + KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, + GroupIdNotFoundError, UnknownTopicOrPartitionError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -355,3 +357,19 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100)) assert [r.offset for r in all_messages[t1p1]] == list(range(100)) assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100)) + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0") +def test_delete_records_with_errors(kafka_admin_client, topic): + sleep(1) # sometimes the topic is not created yet...? 
+    p0 = TopicPartition(topic, 0)
+    # verify that topic has been created
+    kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
+
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+
+

From d93e7fb9b4ce00a4dc5fd10f7fed55f5c65a29de Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 18:42:07 +0100
Subject: [PATCH 5/8] fmt

---
 kafka/admin/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index cc2b043c1..423c371f1 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1070,7 +1070,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
                 for partition in getattr(topic, 'partitions', ()):
                     if getattr(partition, 'error_code', 0) != 0:
                         tp = TopicPartition(topic, partition['partition_index'])
-                        partition2error[tp] =partition['error_code']
+                        partition2error[tp] = partition['error_code']
 
         if partition2error:
             if len(partition2error) == 1:

From f9afb4e31106caab1d4681d8c3f27076005607d8 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Mon, 20 Mar 2023 20:19:29 +0100
Subject: [PATCH 6/8] typo: typo in docs

---
 kafka/admin/client.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 423c371f1..b6b310702 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1023,8 +1023,6 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
     def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
         """Delete records whose offset is smaller than the given offset of the corresponding partition.
 
-        Note: if partition
-
         :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
             given partitions.
         :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from

From d944b36b09e99019d77f76d739e6ceaffec36b55 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Tue, 21 Mar 2023 08:21:37 +0100
Subject: [PATCH 7/8] better response structure, better tests, reuse more code,
 fix protocol

---
 kafka/admin/client.py          | 79 +++++++++++++++++++++++-----------
 kafka/protocol/admin.py        |  2 +-
 test/test_admin_integration.py | 20 ++++++---
 3 files changed, 51 insertions(+), 50 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index b6b310702..fb5097aa3 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -973,49 +973,26 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
         :return: Dictionary with ``{leader_id -> {partitions}}``
         """
         timeout_ms = self._validate_timeout(timeout_ms)
-        version = self._matching_api_version(MetadataRequest)
         partitions = set(partitions)
-        topics = set()
-
-        for topic_partition in partitions:
-            topics.add(topic_partition.topic)
-
-        request = MetadataRequest[version](
-            topics=list(topics),
-            allow_auto_topic_creation=False
-        )
-
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
-
-        self._wait_for_futures([future])
-        response = future.value
-
-        version = self._matching_api_version(DeleteRecordsRequest)
+        topics = set(tp.topic for tp in partitions)
 
-        PARTITIONS_INFO = 3
-        NAME = 1
-        PARTITION_INDEX = 1
-        LEADER = 2
+        response = self._get_cluster_metadata(topics=topics).to_object()
 
         leader2partitions = defaultdict(list)
         valid_partitions = set()
-        for topic in response.topics:
-            for partition in topic[PARTITIONS_INFO]:
-                t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
+        for topic in response.get("topics", ()):
+            for partition in topic.get("partitions", ()):
+                t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
                 if t2p in partitions:
-                    leader2partitions[partition[LEADER]].append(t2p)
+                    leader2partitions[partition["leader"]].append(t2p)
                 valid_partitions.add(t2p)
 
         if len(partitions) != len(valid_partitions):
             unknown = set(partitions) - valid_partitions
             raise UnknownTopicOrPartitionError(
-                "The following partitions are not known: %s" % ", "
-                .join(str(x) for x in unknown)
+                "The following partitions are not known: %s"
+                % ", ".join(str(x) for x in unknown)
             )
 
         return leader2partitions
 
     def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
         """Delete records whose offset is smaller than the given offset of the corresponding partition.
 
         :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
             given partitions.
         :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
             config.
         :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
             this node. No check is performed verifying that this is indeed the leader for all
-            listed partitions, use with caution.
+            listed partitions: use with caution.
 
-        :return: List of DeleteRecordsResponse
+        :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
+            See DeleteRecordsResponse for possible fields. error_code for all partitions is
+            guaranteed to be zero, otherwise an exception is raised.
""" timeout_ms = self._validate_timeout(timeout_ms) responses = [] version = self._matching_api_version(DeleteRecordsRequest) + # We want to make as few requests as possible + # If a single node serves as a partition leader for multiple partitions (and/or + # topics), we can send all of those in a single request. + # For that we store {leader -> {partitions for leader}}, and do 1 request per leader if partition_leader_id is None: leader2partitions = self._get_leader_for_partitions( set(records_to_delete), timeout_ms @@ -1059,27 +1042,35 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id future = self._send_request_to_node(leader, request) self._wait_for_futures([future]) - response = future.value - responses.append(response) + responses.append(future.value.to_object()) + partition2result = {} partition2error = {} for response in responses: - for topic in getattr(response, 'topics', ()): - for partition in getattr(topic, 'partitions', ()): - if getattr(partition, 'error_code', 0) != 0: - tp = TopicPartition(topic, partition['partition_index']) - partition2error[tp] = partition['error_code'] + for topic in response["topics"]: + for partition in topic["partitions"]: + tp = TopicPartition(topic["name"], partition["partition_index"]) + partition2result[tp] = partition + if partition["error_code"] != 0: + partition2error[tp] = partition["error_code"] if partition2error: if len(partition2error) == 1: - raise Errors.for_code(partition2error[0])() + key, error = next(iter(partition2error.items())) + raise Errors.for_code(error)( + "Error deleting records from topic %s partition %s" % (key.topic, key.partition) + ) else: raise Errors.BrokerResponseError( - "The following errors occured when trying to delete records: " - ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items()) + "The following errors occured when trying to delete records: " + + ", ".join( + "%s(partition=%d): %s" % + (partition.topic, partition.partition, Errors.for_code(error).__name__) + for partition, error in partition2error.items() + ) ) - return responses + return partition2result # create delegation token protocol not yet implemented # Note: send the request to the least_loaded_node() diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 7c80972ae..d8d770d53 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -243,13 +243,13 @@ class DeleteRecordsResponse_v0(Response): API_KEY = 21 API_VERSION = 0 SCHEMA = Schema( + ('throttle_time_ms', Int32), ('topics', Array( ('name', String('utf-8')), ('partitions', Array( ('partition_index', Int32), ('low_watermark', Int64), ('error_code', Int16))))), - ('throttle_time_ms', Int32) ) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 2de0be55b..add772427 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -9,8 +9,8 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) from kafka.errors import ( - KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, - GroupIdNotFoundError, UnknownTopicOrPartitionError) + BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, + GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -342,7 +342,11 @@ def 
     for _ in range(600):
         next(consumer1)
 
-    kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
+    assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
+    assert result[t1p0] == {"low_watermark": 40, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 30, "error_code": 0, "partition_index": t1p2.partition}
 
     consumer2 = kafka_consumer_factory(group_id=None, topics=())
     consumer2.assign(partitions)
@@ -364,16 +368,22 @@
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
-def test_delete_records_with_errors(kafka_admin_client, topic):
+def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
     sleep(1) # sometimes the topic is not created yet...?
     p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
     # verify that topic has been created
-    kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
+    send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)
 
     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+    with pytest.raises(OffsetOutOfRangeError):
+        kafka_admin_client.delete_records({p0: 1000})
+    with pytest.raises(BrokerResponseError):
+        kafka_admin_client.delete_records({p0: 1000, p1: 1000})

From 3bac387deea4e0ee202e6932161921d03a80693d Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Fri, 8 Mar 2024 20:54:37 -0500
Subject: [PATCH 8/8] move parenthesis in import to be consistent with the rest
 of the project

---
 kafka/admin/client.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index fb5097aa3..0dda09c60 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -15,12 +15,14 @@ import kafka.errors as Errors
 from kafka.errors import (
     IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
-    UnrecognizedBrokerVersion, IllegalArgumentError)
+    UnrecognizedBrokerVersion, IllegalArgumentError
+)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest, DeleteRecordsRequest)
+    DeleteGroupsRequest, DeleteRecordsRequest
+)
 from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.types import Array
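
A minimal usage sketch of the API added by this series (not itself part of the patches): it assumes a broker reachable at localhost:9092 and an existing topic named "my-topic" with at least two partitions — the bootstrap address and topic name are placeholders.

    from kafka import KafkaAdminClient
    from kafka.structs import TopicPartition

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")  # placeholder address

    # Truncate "my-topic" partition 0 up to offset 50, and partition 1 entirely;
    # -1 means "delete up to the current log end offset", as exercised in the tests above.
    result = admin.delete_records(
        {TopicPartition("my-topic", 0): 50, TopicPartition("my-topic", 1): -1},
        timeout_ms=1000,
    )

    # Each entry mirrors the broker response for that partition, e.g.
    # {"low_watermark": 50, "error_code": 0, "partition_index": 0}
    print(result[TopicPartition("my-topic", 0)])

Because requests are grouped per partition leader, a single call covering many partitions issues one DeleteRecordsRequest per leader node rather than one per partition.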