Segmentation fault when KafkaConsumer objects get destroyed #571

Open
plehatron opened this issue Dec 11, 2024 · 0 comments
plehatron commented Dec 11, 2024

Description

A segmentation fault sporadically occurs at the end of a PHP process that creates many KafkaConsumer instances. The consumers are usually closed explicitly by calling the close() method.

We only noticed this bug when running our PHPUnit test cases on the official PHP docker image php:8.3.14 (Debian-based), while the Alpine version php:8.3.14-alpine doesn't manifest this bug. Our test cases create many KafkaConsumer instances, and the segmentation fault occurs only at the end of the running PHP process.

I believe the problem lies in the PHP_METHOD(RdKafka_KafkaConsumer, close) function, which sets intern->rk to NULL but doesn't call rd_kafka_destroy. Later, in kafka_consumer_free, the rd_kafka_destroy call is skipped because intern->rk is already NULL, so the underlying librdkafka handle is never destroyed.

The fix from @ikeberlein's PR #540 resolves this problem for me. After applying the patch, I no longer see segmentation faults.

Here's the reproducer code:

<?php

pcntl_sigprocmask(SIG_BLOCK, [SIGIO]);

$brokerList = 'kafka:9092';
$topicName = 'test-topic';
$consumerGroupId = 'test-consumer';

for ($iteration = 0; $iteration <= 100; $iteration++) {

    echo "Producing messages (iteration {$iteration})...\n";

    $producerConfig = new \RdKafka\Conf();
    $producerConfig->set('message.send.max.retries', '2147483647');
    $producerConfig->set('enable.idempotence', 'true');
    $producerConfig->set('topic.metadata.refresh.sparse', 'true');
    $producerConfig->set('internal.termination.signal', (string) SIGIO);
    $producerConfig->set('metadata.broker.list', $brokerList);
    $producerConfig->set('socket.timeout.ms', '10000');
    $producerConfig->set('compression.type', 'snappy');

    $producer = new \RdKafka\Producer($producerConfig);
    $topic = $producer->newTopic($topicName);
    for ($id = 1; $id <= 1000; $id++) {
        $topic->producev(
            RD_KAFKA_PARTITION_UA,
            0,
            json_encode(['id' => $id, 'value' => random_int(1, 10000)]),
            $id,
            ['value' => time()]
        );
        if ($id % 100 === 0) {
            $producer->poll(1);
            echo "$id messages produced\n";
        }
    }

    while ($producer->getOutQLen() > 0) {
        $producer->poll(1);
    }

    for ($flushRetries = 0; $flushRetries < 3; ++$flushRetries) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            break;
        }
    }

    echo "Consuming messages (iteration {$iteration})...\n";

    $consumerConfig = new \RdKafka\Conf();
    $consumerConfig->set('group.id', $consumerGroupId);
    $consumerConfig->set('metadata.broker.list', $brokerList);
    $consumerConfig->set('auto.offset.reset', 'smallest');
    $consumerConfig->set('enable.auto.commit', 'false');
    $consumerConfig->set('enable.partition.eof', 'true');
    $consumerConfig->set('topic.metadata.refresh.sparse', 'true');
    $consumerConfig->set('socket.timeout.ms', '10000');
    $consumerConfig->set('internal.termination.signal', (string) SIGIO);

    $consumer = new \RdKafka\KafkaConsumer($consumerConfig);
    $consumer->subscribe([$topicName]);
    $messagesConsumed = 0;
    $lastMessage = null;
    while ($rdkafkaMessage = $consumer->consume(10000)) {
        if (RD_KAFKA_RESP_ERR__PARTITION_EOF === $rdkafkaMessage->err) {
            break;
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $rdkafkaMessage->err) {
            throw new \Exception(sprintf('Error: %s', $rdkafkaMessage->errstr()));
        }

        $lastMessage = $rdkafkaMessage;
        $messagesConsumed++;
        if ($messagesConsumed % 100 === 0) {
            echo "$messagesConsumed messages consumed\n";
        }
    }

    if ($lastMessage) {
        $consumer->commit($lastMessage);
    }

    echo sprintf("Consumed total %d messages (iteration %d)\n", $messagesConsumed, $iteration);

    $consumer->unsubscribe();
    $consumer->close();
}

Running it resulted in this output, ending with a segmentation fault:

...
Producing messages (iteration 100)...
100 messages produced
200 messages produced
300 messages produced
400 messages produced
500 messages produced
600 messages produced
700 messages produced
800 messages produced
900 messages produced
1000 messages produced
Consuming messages (iteration 100)...
100 messages consumed
200 messages consumed
300 messages consumed
400 messages consumed
500 messages consumed
600 messages consumed
700 messages consumed
800 messages consumed
900 messages consumed
1000 messages consumed
Consumed total 1000 messages (iteration 100)
Segmentation fault

But I expected this output instead, without a segmentation fault:

...
Producing messages (iteration 100)...
100 messages produced
200 messages produced
300 messages produced
400 messages produced
500 messages produced
600 messages produced
700 messages produced
800 messages produced
900 messages produced
1000 messages produced
Consuming messages (iteration 100)...
100 messages consumed
200 messages consumed
300 messages consumed
400 messages consumed
500 messages consumed
600 messages consumed
700 messages consumed
800 messages consumed
900 messages consumed
1000 messages consumed
Consumed total 1000 messages (iteration 100)

php-rdkafka Version

php-rdkafka 6.0.3-6.0.5

librdkafka Version

librdkafka 2.0.2-2.6.1

PHP Version

PHP 8.3.14

Operating System

No response

Kafka Version

No response
