Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INVALID_TXN_STATE returned from abortTransaction() #556

Open
TimWolla opened this issue Sep 18, 2024 · 1 comment
Open

INVALID_TXN_STATE returned from abortTransaction() #556

TimWolla opened this issue Sep 18, 2024 · 1 comment
Labels

Comments

@TimWolla
Copy link

Description

The following code:

<?php
$conf = new \RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->set('transactional.id', 'dummy');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic('myTopic');

$producer->initTransactions(3000);

for (;;) {
	$producer->beginTransaction();

	for ($i = 0; $i < 5; $i++) {
		$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
	}

	$producer->commitTransaction(-1);

	$producer->beginTransaction();

	for ($i = 0; $i < 5; $i++) {
		$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
	}

	$producer->abortTransaction(-1);
}

Resulted in this output (after roughly 15 seconds):

%1|1726655131.138|TXNERR|rdkafka#producer-1| [thrd:main]: Fatal transaction error: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state (INVALID_TXN_STATE)
%3|1726655131.138|ERROR|rdkafka#producer-1| [thrd:main]: Fatal error: Broker: Producer attempted a transactional operation in an invalid state: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state
PHP Fatal error:  Uncaught RdKafka\KafkaErrorException: INVALID_TXN_STATE in test.php:28
Stack trace:
#0 test.php(28): RdKafka\Producer->abortTransaction(-1)
#1 {main}
  thrown in test.php on line 28

But I expected this output instead:

Some rdkafka logs, but no exception.

My understanding is that I safely use the Rdkafka API, using a timeout of -1 to ensure that the commit or abort settles before I attempt to proceed with a new transaction.

I'm running this against:

docker run -it --rm -p 9092:9092 -e KAFKA_LOG4J_LOGGERS="kafka=DEBUG"  apache/kafka:latest

and the Kafka logs show:

[2024-09-18 10:25:31,024] DEBUG TransactionalId dummy prepare transition from PrepareCommit to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) (kafka.coordinator.transaction.TransactionMetadata)
[2024-09-18 10:25:31,024] DEBUG [TransactionCoordinator id=1] Returning CONCURRENT_TRANSACTIONS error code to client for dummy's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,024] DEBUG [UnifiedLog partition=myTopic-0, dir=/tmp/kraft-combined-logs] First unstable offset updated to None (kafka.log.UnifiedLog)
[2024-09-18 10:25:31,024] DEBUG [Partition myTopic-0 broker=1] High watermark updated from (offset=10397, segment=[0:379449]) to (offset=10398, segment=[0:379527]) (kafka.cluster.Partition)
[2024-09-18 10:25:31,024] DEBUG [ReplicaManager broker=1] Produce to local log in 0 ms (kafka.server.ReplicaManager)
[2024-09-18 10:25:31,024] DEBUG [Transaction Marker Request Completion Handler 1]: Received WriteTxnMarker response ClientResponse(receivedTimeMs=1726655131024, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=1, clientId=broker-1-txn-marker-sender, correlationId=1732, headerVersion=2), responseBody=WriteTxnMarkersResponseData(markers=[WritableTxnMarkerResult(producerId=0, topics=[WritableTxnMarkerTopicResult(name='myTopic', partitions=[WritableTxnMarkerPartitionResult(partitionIndex=0, errorCode=0)])])])) from node 1 with correlation id 1732 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2024-09-18 10:25:31,024] DEBUG [Transaction Marker Channel Manager 1]: Sending dummy's transaction markers for TransactionMetadata(transactionalId=dummy, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=PrepareCommit, pendingState=Some(CompleteCommit), topicPartitions=HashSet(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) with coordinator epoch 0 succeeded, trying to append complete transaction log now (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2024-09-18 10:25:31,025] DEBUG [Partition __transaction_state-46 broker=1] High watermark updated from (offset=5200, segment=[0:655188]) to (offset=5201, segment=[0:655302]) (kafka.cluster.Partition)
[2024-09-18 10:25:31,025] DEBUG [ReplicaManager broker=1] Produce to local log in 0 ms (kafka.server.ReplicaManager)
[2024-09-18 10:25:31,025] DEBUG TransactionalId dummy complete transition from PrepareCommit to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) (kafka.coordinator.transaction.TransactionMetadata)
[2024-09-18 10:25:31,025] DEBUG [Transaction State Manager 1]: Updating dummy's transaction state to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) with coordinator epoch 0 for dummy succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2024-09-18 10:25:31,025] DEBUG [TransactionCoordinator id=1] Aborting append of ABORT to transaction log with coordinator and returning CONCURRENT_TRANSACTIONS error to client for dummy's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,137] DEBUG [TransactionCoordinator id=1] TransactionalId: dummy's state is CompleteCommit, but received transaction marker result to send: ABORT (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,137] DEBUG [TransactionCoordinator id=1] Aborting append of ABORT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for dummy's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

It appears that Kafka returned a CONCURRENT_TRANSACTIONS error to Rdkafka, which then automatically retried the abort(). The next attempt then resulted in the INVALID_TXN_STATE.

I'm not sure of this is a bug in my test case, php-rdkafka, in librdkafka, or in Kafka itself. Given that I believe that my code is correct, I'm filing it here, because php-rdkafka is what I directly use.

Feel free to file an upstream issue, if you believe that php-rdkafka is doing everything correctly. Let me know if you need any further information.

php-rdkafka Version

6.0.3-1+ubuntu24.04.1+deb.sury.org+2

librdkafka Version

2.3.0-1build2

PHP Version

8.3.11-1+ubuntu24.04.1+deb.sury.org+1

Operating System

Ubuntu 24.04

Kafka Version

3.8.0

@TimWolla TimWolla added the bug label Sep 18, 2024
@arnaud-lb
Copy link
Owner

Thank you!

I was able to reproduce this in C. I've reported it here: confluentinc/librdkafka#4849. I will leave this issue open until confirmation it's not a php-rdkafka issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants