Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #1856

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
TopicCollection, IsolationLevel,
ConsumerGroupType, ElectionType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
ConfigEntry, ConfigSource, ConfigType, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
Expand Down Expand Up @@ -103,12 +103,13 @@ def example_create_partitions(a, topics):


def print_config(config, depth):
print('%40s = %-50s [%s,is:read-only=%r,default=%r,sensitive=%r,synonym=%r,synonyms=%s]' %
print('%40s = %-50s [%s,is:read-only=%r,default=%r,sensitive=%r,synonym=%r,synonyms=%s,type=%s, documentation=%s]' %
((' ' * depth) + config.name, config.value, ConfigSource(config.source),
config.is_read_only, config.is_default,
config.is_sensitive, config.is_synonym,
["%s:%s" % (x.name, ConfigSource(x.source))
for x in iter(config.synonyms.values())]))
for x in iter(config.synonyms.values())],
ConfigType(config.type), config.documentation))


def example_describe_configs(a, args):
Expand Down
1 change: 1 addition & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Unused imports are keeped to be accessible using this public module
from ._config import (ConfigSource, # noqa: F401
ConfigType,
ConfigEntry,
ConfigResource,
AlterConfigOpType)
Expand Down
26 changes: 26 additions & 0 deletions src/confluent_kafka/admin/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
from .. import cimpl as _cimpl
from ._resource import ResourceType
from .._util import ConversionUtil


class AlterConfigOpType(Enum):
Expand Down Expand Up @@ -58,6 +59,25 @@ class ConfigSource(Enum):
DEFAULT_CONFIG = _cimpl.CONFIG_SOURCE_DEFAULT_CONFIG #: Default


class ConfigType(Enum):
"""
Enumerates the different types of configuration properties.
Used by ConfigEntry to specify the
type of configuration properties returned by `describe_configs()`.
"""
UNKNOWN = _cimpl.CONFIG_TYPE_UNKNOWN #: Unknown
BOOLEAN = _cimpl.CONFIG_TYPE_BOOLEAN #: Boolean
STRING = _cimpl.CONFIG_TYPE_STRING #: String
INT = _cimpl.CONFIG_TYPE_INT #: Integer
SHORT = _cimpl.CONFIG_TYPE_SHORT #: Short
LONG = _cimpl.CONFIG_TYPE_LONG #: Long
DOUBLE = _cimpl.CONFIG_TYPE_DOUBLE #: Double
LIST = _cimpl.CONFIG_TYPE_LIST #: List
CLASS = _cimpl.CONFIG_TYPE_CLASS #: Class
PASSWORD = _cimpl.CONFIG_TYPE_PASSWORD #: Password
CNT = _cimpl.CONFIG_TYPE_CNT #: Count


class ConfigEntry(object):
"""
Represents a configuration property. Returned by describe_configs() for each configuration
Expand All @@ -72,6 +92,8 @@ def __init__(self, name, value,
is_default=False,
is_sensitive=False,
is_synonym=False,
type=ConfigType.UNKNOWN,
documentation=None,
synonyms=[],
incremental_operation=None):
"""
Expand Down Expand Up @@ -100,6 +122,10 @@ def __init__(self, name, value,
"""Indicates whether the configuration property is a synonym for the parent configuration entry."""
self.synonyms = synonyms
"""A list of synonyms (ConfigEntry) and alternate sources for this configuration property."""
self.type = ConversionUtil.convert_to_enum(type, ConfigType)
"""The type of the configuration property."""
self.documentation = documentation
"""The documentation for the configuration property."""
self.incremental_operation = incremental_operation
"""The incremental operation (AlterConfigOpType) to use in incremental_alter_configs."""

Expand Down
8 changes: 8 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3435,6 +3435,7 @@ Admin_c_ConfigEntries_to_py (PyObject *ConfigEntry_type,
PyObject *kwargs, *args;
const rd_kafka_ConfigEntry_t *ent = c_configs[ci];
const rd_kafka_ConfigEntry_t **c_synonyms;
const char *documentation;
PyObject *entry, *synonyms;
size_t synonym_cnt;
const char *val;
Expand Down Expand Up @@ -3472,6 +3473,13 @@ Admin_c_ConfigEntries_to_py (PyObject *ConfigEntry_type,
PyDict_SetItemString(kwargs, "synonyms", synonyms);
Py_DECREF(synonyms);

cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConfigEntry_type(ent));

documentation = rd_kafka_ConfigEntry_documentation(ent);

if (documentation)
cfl_PyDict_SetString(kwargs, "documentation", documentation);

args = PyTuple_New(0);
entry = PyObject_Call(ConfigEntry_type, args, kwargs);
Py_DECREF(args);
Expand Down
16 changes: 16 additions & 0 deletions src/confluent_kafka/src/AdminTypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,21 @@ static void AdminTypes_AddObjectsConfigSource (PyObject *m) {
RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG);
}

static void AdminTypes_AddObjectsConfigType(PyObject *m) {
/* rd_kafka_ConfigType_t */
PyModule_AddIntConstant(m, "CONFIG_TYPE_UNKNOWN", RD_KAFKA_CONFIG_UNKNOWN);
PyModule_AddIntConstant(m, "CONFIG_TYPE_BOOLEAN", RD_KAFKA_CONFIG_BOOLEAN);
PyModule_AddIntConstant(m, "CONFIG_TYPE_STRING", RD_KAFKA_CONFIG_STRING);
PyModule_AddIntConstant(m, "CONFIG_TYPE_INT", RD_KAFKA_CONFIG_INT);
PyModule_AddIntConstant(m, "CONFIG_TYPE_SHORT", RD_KAFKA_CONFIG_SHORT);
PyModule_AddIntConstant(m, "CONFIG_TYPE_LONG", RD_KAFKA_CONFIG_LONG);
PyModule_AddIntConstant(m, "CONFIG_TYPE_DOUBLE", RD_KAFKA_CONFIG_DOUBLE);
PyModule_AddIntConstant(m, "CONFIG_TYPE_LIST", RD_KAFKA_CONFIG_LIST);
PyModule_AddIntConstant(m, "CONFIG_TYPE_CLASS", RD_KAFKA_CONFIG_CLASS);
PyModule_AddIntConstant(m, "CONFIG_TYPE_PASSWORD", RD_KAFKA_CONFIG_PASSWORD);
PyModule_AddIntConstant(m, "CONFIG_TYPE_CNT", RD_KAFKA_CONFIG__CNT);
}


static void AdminTypes_AddObjectsResourceType (PyObject *m) {
/* rd_kafka_ResourceType_t */
Expand Down Expand Up @@ -622,6 +637,7 @@ void AdminTypes_AddObjects (PyObject *m) {
PyModule_AddObject(m, "NewPartitions", (PyObject *)&NewPartitionsType);

AdminTypes_AddObjectsConfigSource(m);
AdminTypes_AddObjectsConfigType(m);
AdminTypes_AddObjectsResourceType(m);
AdminTypes_AddObjectsResourcePatternType(m);
AdminTypes_AddObjectsAclOperation(m);
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/admin/test_incremental_alter_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
ConfigEntry, ResourceType, \
AlterConfigOpType

from tests.common import TestUtils


def assert_expected_config_entries(fs, num_fs, expected):
"""
Expand Down Expand Up @@ -147,3 +149,34 @@ def test_incremental_alter_configs(kafka_cluster):

# Assert expected config entries.
assert_expected_config_entries(fs, 1, expected)

if TestUtils.use_group_protocol_consumer():
group_id = "test-group"

res_group = ConfigResource(
ResourceType.GROUP,
group_id,
incremental_configs=[
ConfigEntry("consumer.session.timeout.ms", "50000",
incremental_operation=AlterConfigOpType.SET)
]
)

expected[res_group] = ['consumer.session.timeout.ms="50000"']

#
# Incrementally alter some configuration values
#
fs = admin_client.incremental_alter_configs([res_group])

assert_operation_succeeded(fs, 1)

time.sleep(1)

#
# Get current group config
#
fs = admin_client.describe_configs([res_group])

# Assert expected config entries.
assert_expected_config_entries(fs, 1, expected)