The following code:
$settings = [
    'socket.keepalive.enable' => true,
    'log_level' => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset' => 'earliest',
    'enable.partition.eof' => 'false',
    'enable.auto.commit' => 'false',
    'max.poll.interval.ms' => 300000,
    'session.timeout.ms' => 45000,
    'group.id' => 'test-group',
    'group.instance.id' => uniqid('', true),
    'metadata.broker.list' => 'kafka1:9092,kafka2:9092,kafka3:9092',
];

foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$consumer = new KafkaConsumer($conf);
$consumer->subscribe(['dave-test1']);

while (!$quit) {
    $message = $consumer->consume(60 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "consume: {$message->payload}\n";
            $consumer->commitAsync($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "consume: no more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "consume: timed out\n";
            break;
        default:
            throw new \RuntimeException($message->errstr(), $message->err);
    }
}

try {
    // Is this the best practice to ensure the success of commit offset, such as commitAsync failure ?
    $consumer->commit(null);
} catch (Exception $e) {
    if ($e->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
        echo 'commit fail: ' . $e->getMessage();
    }
}

echo "consumer process end PID {$process->pid}\n";
Resulted in this output:
It is unclear whether all offsets are committed.
But I expected this output instead:
All offsets are committed, even if a commitAsync call fails.
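One possible way to make the final commit more robust is to wrap the synchronous commit in a small retry helper. The sketch below is only an illustration: `commitWithRetry`, its parameters, and its retry policy are hypothetical names introduced here, not part of php-rdkafka. It takes the commit as a callable, so it makes no assumption about the consumer object itself.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-rdkafka): runs a commit callable
 * and retries it when it throws.
 *
 * @param callable $commit         Performs the commit, e.g. a closure around $consumer->commit().
 * @param int[]    $ignorableCodes Exception codes treated as "nothing to commit".
 * @param int      $maxAttempts    Total number of attempts before giving up.
 * @param int      $delayMs        Pause between attempts, in milliseconds.
 *
 * @return bool True if a commit succeeded or failed with an ignorable code, false otherwise.
 */
function commitWithRetry(
    callable $commit,
    array $ignorableCodes = [],
    int $maxAttempts = 3,
    int $delayMs = 0
): bool {
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $commit();
            return true;
        } catch (\Exception $e) {
            // Codes such as "no offset to commit" are not real failures.
            if (in_array($e->getCode(), $ignorableCodes, true)) {
                return true;
            }
            // Wait before the next attempt, but not after the last one.
            if ($attempt < $maxAttempts && $delayMs > 0) {
                usleep($delayMs * 1000);
            }
        }
    }

    return false;
}
```

With the consumer from the code above, this could be called as `commitWithRetry(fn () => $consumer->commit(), [RD_KAFKA_RESP_ERR__NO_OFFSET], 3, 200)` after the loop exits. To see failures of individual `commitAsync` calls, registering a callback with `RdKafka\Conf::setOffsetCommitCb` before creating the consumer may also help; whether that is needed on top of a final synchronous commit depends on the application.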
php-rdkafka Version
6.0.1
librdkafka Version
1.8.2
PHP Version
7.4.33
Operating System
Amazon Linux 2
Kafka Version
2.1.1