Skip to content

Commit

Permalink
Add getControllerId (#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
qkdreyer authored Sep 2, 2024
1 parent 4b085d2 commit 554305b
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 69 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jobs:
tests:
name: 'Tests'
strategy:
fail-fast: false
matrix:
include:
# Latest librdkafka 2.x with memcheck
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/bin/sh

docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.13-2.6.0
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
docker pull wurstmeister/zookeeper:latest
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null

echo "Waiting for Kafka to be ready"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ modules
package.xml
rdkafka-*.tgz
run-tests.php
gen_stub.php
tests/*/*.diff
tests/*/*.exp
tests/*/*.log
Expand Down
55 changes: 42 additions & 13 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0)
#endif
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2)
#endif
ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1)
#endif
ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()

Expand All @@ -32,8 +48,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
Expand All @@ -42,11 +64,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0)
ZEND_END_ARG_INFO()


ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
Expand All @@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);


static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
Expand All @@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
#endif
ZEND_FE_END
};


static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
#endif
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
Expand Down
29 changes: 18 additions & 11 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
Expand Down Expand Up @@ -32,8 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
Expand All @@ -46,7 +48,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
ZEND_ARG_INFO(0, partitioner)
ZEND_END_ARG_INFO()


ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
Expand All @@ -58,13 +59,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);


static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
Expand All @@ -77,17 +77,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
#endif
ZEND_FE_END
};


static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
#endif
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
Expand Down
10 changes: 10 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([purge is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[
#if RD_KAFKA_VERSION >= 0x010000ff
AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ])
#else
AC_MSG_WARN([controllerid is broken on 0.11.x])
#endif
],[
AC_MSG_WARN([controllerid is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_init_transactions],[
AC_DEFINE(HAS_RD_KAFKA_TRANSACTIONS,1,[ ])
SOURCES="$SOURCES kafka_error_exception.c"
Expand Down
24 changes: 23 additions & 1 deletion kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close)
}
/* }}} */

/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms)
/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
{
Expand Down Expand Up @@ -581,6 +581,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
}
/* }}} */

#ifdef HAS_RD_KAFKA_CONTROLLERID
/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms)
Returns the current ControllerId (controller broker id) as reported in broker metadata */
PHP_METHOD(RdKafka_KafkaConsumer, getControllerId)
{
object_intern *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
}
/* }}} */
#endif

/* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic)
Returns a RdKafka\KafkaConsumerTopic object */
PHP_METHOD(RdKafka_KafkaConsumer, newTopic)
Expand Down
5 changes: 5 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public function unsubscribe(): void {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {}

#ifdef HAS_RD_KAFKA_CONTROLLERID
/** @tentative-return-type */
public function getControllerId(int $timeout_ms): int {}
#endif

/** @tentative-return-type */
public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {}

Expand Down
Loading

0 comments on commit 554305b

Please sign in to comment.