diff --git a/.github/workflows/test/build-librdkafka.sh b/.github/workflows/test/build-librdkafka.sh index 99a363b1..7674b7d6 100755 --- a/.github/workflows/test/build-librdkafka.sh +++ b/.github/workflows/test/build-librdkafka.sh @@ -5,7 +5,7 @@ set -ex if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then echo "librdkafka build is not cached" - git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" + git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v2.3.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" cd librdkafka ./configure diff --git a/conf.c b/conf.c index 1701a022..df3472b5 100644 --- a/conf.c +++ b/conf.c @@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */ cbs->offset_commit = NULL; kafka_conf_callback_dtor(cbs->log); cbs->log = NULL; + kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh); + cbs->oauthbearer_token_refresh = NULL; } /* }}} */ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */ @@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil zval_ptr_dtor(&args[3]); } +/* +void rd_kafka_conf_set_oauthbearer_token_refresh_cb( + rd_kafka_conf_t *conf, + void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk, + const char *oauthbearer_config, + void *opaque)) { +}*/ +static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zval args[2]; + + if (!opaque) { + return; + } + + if (!cbs->oauthbearer_token_refresh) { + return; + } + + ZVAL_NULL(&args[0]); + ZVAL_NULL(&args[1]); + + ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); + ZVAL_STRING(&args[1], oauthbearer_config); + + rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); +} + + + /* {{{ proto RdKafka\Conf::__construct() */ PHP_METHOD(RdKafka_Conf, __construct) { @@ -698,6 +734,40 @@ PHP_METHOD(RdKafka_Conf, setLogCb) } /* }}} */ +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback) + Set token refresh callback for OAUTHBEARER sasl */ +PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) +{ + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *conf; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) { + return; + } + + conf = get_kafka_conf_object(getThis()); + if (!conf) { + return; + } + + Z_ADDREF_P(&fci.function_name); + + if (conf->cbs.oauthbearer_token_refresh) { + zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name); + } else { + conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh)); + } + + conf->cbs.oauthbearer_token_refresh->fci = fci; + conf->cbs.oauthbearer_token_refresh->fcc = fcc; + + rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb); +} +/* }}} */ +#endif + /* {{{ proto RdKafka\TopicConf::__construct() */ PHP_METHOD(RdKafka_TopicConf, __construct) { diff --git a/conf.h b/conf.h index 783cdd52..21df4470 100644 --- a/conf.h +++ b/conf.h @@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks { kafka_conf_callback *consume; kafka_conf_callback *offset_commit; kafka_conf_callback *log; + kafka_conf_callback *oauthbearer_token_refresh; } kafka_conf_callbacks; typedef struct _kafka_conf_object { diff --git a/conf.stub.php b/conf.stub.php index 4d1ad3b6..7aa6a62d 100644 --- a/conf.stub.php +++ b/conf.stub.php @@ -44,6 +44,11 @@ public function setOffsetCommitCb(callable $callback): void {} /** @tentative-return-type */ public function setLogCb(callable $callback): void {} + + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB + /** @tentative-return-type */ + public function setOauthbearerTokenRefreshCb(callable $callback): void {} + #endif } class TopicConf diff --git a/conf_arginfo.h b/conf_arginfo.h index f823b3e8..983e0e64 100644 --- a/conf_arginfo.h +++ b/conf_arginfo.h @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#endif + #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct #define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump @@ -54,6 +58,11 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); + +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); +#endif + ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -70,6 +79,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB + ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) + #endif ZEND_FE_END }; diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h index d05e42f2..6120e1cf 100644 --- a/conf_legacy_arginfo.h +++ b/conf_legacy_arginfo.h @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#endif + #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct #define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump @@ -54,6 +58,9 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); +#endif ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -70,6 +77,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB + ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) + #endif ZEND_FE_END }; diff --git a/config.m4 b/config.m4 index 9df6b31b..9a10e91f 100644 --- a/config.m4 +++ b/config.m4 @@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([murmur2 partitioner is not available]) ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[ + AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ]) + ],[ + AC_MSG_WARN([oauthbearer token refresh cb is not available]) + ]) + LDFLAGS="$ORIG_LDFLAGS" CPPFLAGS="$ORIG_CPPFLAGS" diff --git a/package.xml b/package.xml index 368fcb8a..926b8167 100644 --- a/package.xml +++ b/package.xml @@ -114,6 +114,7 @@ + diff --git a/tests/conf_callbacks.phpt b/tests/conf_callbacks.phpt index 1ac26408..fd5e881e 100644 --- a/tests/conf_callbacks.phpt +++ b/tests/conf_callbacks.phpt @@ -2,7 +2,7 @@ RdKafka\Conf --SKIPIF-- = 0x090000 || die("skip librdkafka too old"); +(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old"); --FILE-- setRebalanceCb(function () { }); $dump = $conf->dump(); var_dump(isset($dump["rebalance_cb"])); +echo "Checking if oauthbearer cb exists\n"; +var_dump(method_exists($conf, 'setOauthbearerTokenRefreshCb')); --EXPECT-- Setting consume callback @@ -31,3 +33,5 @@ Setting offset_commit callback bool(true) Setting rebalance callback bool(true) +Checking if oauthbearer cb exists +bool(false) diff --git a/tests/conf_callbacks_rdkafka11.phpt b/tests/conf_callbacks_rdkafka11.phpt new file mode 100644 index 00000000..1e7b72fa --- /dev/null +++ b/tests/conf_callbacks_rdkafka11.phpt @@ -0,0 +1,39 @@ +--TEST-- +RdKafka\Conf +--SKIPIF-- += 0x010100ff || die("skip librdkafka too old"); +--FILE-- +setConsumeCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["consume_cb"])); + +echo "Setting offset_commit callback\n"; +$conf->setOffsetCommitCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["offset_commit_cb"])); + +echo "Setting rebalance callback\n"; +$conf->setRebalanceCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["rebalance_cb"])); + +echo "Setting oauth token bearer callback\n"; +$conf->setOauthbearerTokenRefreshCb(function () {}); +$dump = $conf->dump(); +var_dump(isset($dump["oauthbearer_token_refresh_cb"])); + +--EXPECT-- +Setting consume callback +bool(true) +Setting offset_commit callback +bool(true) +Setting rebalance callback +bool(true) +Setting oauth token bearer callback +bool(true)