Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getControllerId #554

Merged
merged 5 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jobs:
tests:
name: 'Tests'
strategy:
fail-fast: false
matrix:
include:
# Latest librdkafka 2.x with memcheck
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/bin/sh

docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.13-2.6.0
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
docker pull wurstmeister/zookeeper:latest
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a fixed version here, otherwise the CI may break independently of changes made in this repos

Copy link
Contributor Author

@qkdreyer qkdreyer Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the thing is wurstmeister/zookeeper only exists with 3.4.6 and latest tags, and 3.4.6 tag seems broken for 2 months

docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null

echo "Waiting for Kafka to be ready"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ modules
package.xml
rdkafka-*.tgz
run-tests.php
gen_stub.php
tests/*/*.diff
tests/*/*.exp
tests/*/*.log
Expand Down
55 changes: 42 additions & 13 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0)
#endif
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2)
#endif
ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1)
#endif
ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()

Expand All @@ -32,8 +48,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
Expand All @@ -42,11 +64,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0)
ZEND_END_ARG_INFO()


ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
Expand All @@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);


static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
Expand All @@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
#endif
ZEND_FE_END
};


static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
#endif
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
Expand Down
29 changes: 18 additions & 11 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
Expand Down Expand Up @@ -32,8 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
Expand All @@ -46,7 +48,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
ZEND_ARG_INFO(0, partitioner)
ZEND_END_ARG_INFO()


ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
Expand All @@ -58,13 +59,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);


static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
Expand All @@ -77,17 +77,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
#endif
ZEND_FE_END
};


static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
#endif
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
Expand Down
10 changes: 10 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([purge is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[
#if RD_KAFKA_VERSION >= 0x010000ff
AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ])
#else
AC_MSG_WARN([controllerid is broken on 0.11.x])
#endif
],[
AC_MSG_WARN([controllerid is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_init_transactions],[
AC_DEFINE(HAS_RD_KAFKA_TRANSACTIONS,1,[ ])
SOURCES="$SOURCES kafka_error_exception.c"
Expand Down
24 changes: 23 additions & 1 deletion kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close)
}
/* }}} */

/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms)
/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
{
Expand Down Expand Up @@ -581,6 +581,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
}
/* }}} */

#ifdef HAS_RD_KAFKA_CONTROLLERID
Copy link
Owner

@arnaud-lb arnaud-lb Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rd_kafka_controllerid appears to be broken in librdkafka 0.11.x. I suggest not compiling this method on this version:

Suggested change
#ifdef HAS_RD_KAFKA_CONTROLLERID
#if defined(HAS_RD_KAFKA_CONTROLLERID) && RD_KAFKA_VERSION >= 0x010000ff

Then update the test SKIPIF section to check if the method exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've improved HAS_RD_KAFKA_CONTROLLERID the definition instead

php-rdkafka/config.m4

Lines 74 to 82 in 015b7a2

AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[
#if RD_KAFKA_VERSION >= 0x010000ff
AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ])
#else
AC_MSG_WARN([controllerid is broken on 0.11.x])
#endif
],[
AC_MSG_WARN([controllerid is not available])
])

/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms)
Returns the current ControllerId (controller broker id) as reported in broker metadata */
PHP_METHOD(RdKafka_KafkaConsumer, getControllerId)
{
object_intern *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
}
/* }}} */
#endif

/* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic)
Returns a RdKafka\KafkaConsumerTopic object */
PHP_METHOD(RdKafka_KafkaConsumer, newTopic)
Expand Down
5 changes: 5 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public function unsubscribe(): void {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {}

#ifdef HAS_RD_KAFKA_CONTROLLERID
/** @tentative-return-type */
public function getControllerId(int $timeout_ms): int {}
#endif

/** @tentative-return-type */
public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {}

Expand Down
Loading
Loading