Skip to content

Commit

Permalink
feat: add support for oauthbearer_set_token/set_token_failure
Browse files Browse the repository at this point in the history
  • Loading branch information
cb-freddysart committed Jan 10, 2024
1 parent bcd5004 commit ce6d3e9
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 15 deletions.
2 changes: 1 addition & 1 deletion conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
Expand Down
2 changes: 1 addition & 1 deletion conf.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function setOffsetCommitCb(callable $callback): void {}
/** @tentative-return-type */
public function setLogCb(callable $callback): void {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
#endif
Expand Down
6 changes: 3 additions & 3 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

Expand All @@ -59,7 +59,7 @@ ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

Expand All @@ -79,7 +79,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
Expand Down
6 changes: 3 additions & 3 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

Expand All @@ -58,7 +58,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
Expand All @@ -77,7 +77,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
Expand Down
6 changes: 3 additions & 3 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
],[
AC_MSG_WARN([oauthbearer token refresh cb is not available])
AC_MSG_WARN([oauthbearer support is not available])
])

LDFLAGS="$ORIG_LDFLAGS"
Expand Down
132 changes: 132 additions & 0 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,138 @@ PHP_METHOD(RdKafka, setLogLevel)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
* Set SASL/OAUTHBEARER token and metadata
*
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
* this method to be invoked upon success, via
* $kafka->oauthbearerSetToken(). The extension keys must not include the
* reserved key "`auth`", and all extension keys and values must conform to the
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
*
* key = 1*(ALPHA)
* value = *(VCHAR / SP / HTAB / CR / LF )
*/
PHP_METHOD(RdKafka, oauthbearerSetToken)
{
kafka_object *intern;
char *token_value;
size_t token_value_len;
zend_long lifetime_ms;
char *principal_name;
size_t principal_len;
zval *extensions_arg = NULL;

char errstr[512];
rd_kafka_resp_err_t ret = 0;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|a", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_arg) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

errstr[0] = '\0';

int extension_size;
char **extensions = NULL;

if (extensions_arg != NULL) {
extension_size = zend_hash_num_elements(Z_ARRVAL_P(extensions_arg)) * 2;
extensions = safe_emalloc((extension_size * 2), sizeof(char *), 0);

int pos = 0;
zend_ulong num_key;
zend_string *extension_key_str;
zval *extension_zval;
ZEND_HASH_FOREACH_KEY_VAL(Z_ARRVAL_P(extensions_arg), num_key, extension_key_str, extension_zval) {
zend_string *tmp_extension_val_str;
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);

extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));

zend_tmp_string_release(tmp_extension_val_str);
} ZEND_HASH_FOREACH_END();
}

/* rd_kafka_oauthbearer_set_token(rd_kafka_t *rk,
const char *token_value,
int64_t md_lifetime_ms,
const char *md_principal_name,
const char **extensions,
size_t extension_size,
char *errstr,
size_t errstr_size */
ret = rd_kafka_oauthbearer_set_token(
intern->rk,
token_value,
lifetime_ms,
principal_name,
extensions,
extension_size,
errstr,
sizeof(errstr));

if (extensions != NULL) {
efree(extensions);
}

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
}
}
/* }}} */

/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
The SASL/OAUTHBEARER token refresh callback or event handler should cause
this method to be invoked upon failure, via
rd_kafka_oauthbearer_set_token_failure().
*/
PHP_METHOD(RdKafka, oauthbearerSetTokenFailure)
{
kafka_object *intern;
const char *errstr;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
}
}
/* }}} */
#endif

/* {{{ proto RdKafka\Topic RdKafka::newTopic(string $topic)
Returns an RdKafka\Topic object */
PHP_METHOD(RdKafka, newTopic)
Expand Down
8 changes: 8 additions & 0 deletions rdkafka.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public function pausePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void;

/** @tentative-return-type */
public function oauthbearerSetTokenFailure(string $error): void;
#endif
}
}

Expand Down
Loading

0 comments on commit ce6d3e9

Please sign in to comment.