Skip to content

Commit

Permalink
KAFKA-15764: Missing Tests for Transactions (apache#14702)
Browse files Browse the repository at this point in the history
I ran this test 40 times without KAFKA-15653 with and without compression enabled.
With compression it failed 39/40 times and without it passed 40/40 times.

With the KAFKA-15653 and compression it passed 40/40 times locally

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
jolshan authored Dec 15, 2023
1 parent 3bd8ec1 commit ed7ad6d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
34 changes: 32 additions & 2 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -820,6 +821,33 @@ class TransactionsTest extends IntegrationTestHarness {
assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTransactionsWithCompression(quorum: String): Unit = {
val numRecords = 50
val numProducersWithCompression = 5
val numTransactions = 40
val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()

for (i <- 0 until numProducersWithCompression) {
transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
}
createTopic("topic", 100, brokerCount, topicConfig())
transactionalCompressionProducers.foreach(_.initTransactions())

for (i <- 0 until numTransactions) {
transactionalCompressionProducers.foreach(_.beginTransaction())

for (i <- 0 until numRecords) {
transactionalCompressionProducers.foreach(producer =>
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus("topic", null, i.toString, producer.toString, willBeCommitted = true),
new ErrorLoggingCallback("topic", i.toString.getBytes(StandardCharsets.UTF_8), producer.toString.getBytes(StandardCharsets.UTF_8), true))
)
}
transactionalCompressionProducers.foreach(_.commitTransaction())
}
}

private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
for (i <- start until end) {
Expand Down Expand Up @@ -852,14 +880,16 @@ class TransactionsTest extends IntegrationTestHarness {
transactionTimeoutMs: Long = 60000,
maxBlockMs: Long = 60000,
deliveryTimeoutMs: Int = 120000,
requestTimeoutMs: Int = 30000): KafkaProducer[Array[Byte], Array[Byte]] = {
requestTimeoutMs: Int = 30000,
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
val producer = TestUtils.createTransactionalProducer(
transactionalId,
brokers,
transactionTimeoutMs = transactionTimeoutMs,
maxBlockMs = maxBlockMs,
deliveryTimeoutMs = deliveryTimeoutMs,
requestTimeoutMs = requestTimeoutMs
requestTimeoutMs = requestTimeoutMs,
compressionType = compressionType
)
transactionalProducers += producer
producer
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,8 @@ object TestUtils extends Logging {
maxBlockMs: Long = 60000,
deliveryTimeoutMs: Int = 120000,
requestTimeoutMs: Int = 30000,
maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = {
maxInFlight: Int = 5,
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
props.put(ProducerConfig.ACKS_CONFIG, "all")
Expand All @@ -1908,6 +1909,7 @@ object TestUtils extends Logging {
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlight.toString)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
}

Expand Down

0 comments on commit ed7ad6d

Please sign in to comment.