diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 5cb8a73e4b628..a7a6e915d99ca 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -774,8 +774,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp) ) - case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined => - // consumers exist in the group => + case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined && is(Stable) => + // consumers exist in the group and group is stable => // - if the group is aware of the subscribed topics and retention period had passed since the // the last commit timestamp, expire the offset. offset with pending offset commit are not // expired diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 275b7f62351e5..eb0748d537c2d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Time, MockTime} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test} @@ -259,6 +259,48 @@ class GroupMetadataTest { assertFalse(group.supportsProtocols(protocolType, Set("range", "foo"))) } + @Test + def testOffsetRemovalDuringTransitionFromEmptyToNonEmpty(): Unit = { + val topic = "foo" + val partition = new TopicPartition(topic, 0) + val time = new MockTime() + group = new GroupMetadata("groupId", Empty, time) + + // Rebalance once in order to commit offsets + val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs, + sessionTimeoutMs, protocolType, List(("range", ConsumerProtocol.serializeSubscription(new Subscription(List("foo").asJava)).array()))) + group.transitionTo(PreparingRebalance) + group.add(member) + group.initNextGeneration() + assertEquals(Some(Set("foo")), group.getSubscribedTopics) + + val offset = offsetAndMetadata(offset = 37, timestamp = time.milliseconds()) + val commitRecordOffset = 3 + + group.prepareOffsetCommit(Map(partition -> offset)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset)) + + val offsetRetentionMs = 50000L + time.sleep(offsetRetentionMs + 1) + + // Rebalance again so that the group becomes empty + group.transitionTo(PreparingRebalance) + group.remove(memberId) + group.initNextGeneration() + + // The group is empty, but we should not expire the offset because the state was just changed + assertEquals(Empty, group.currentState) + assertEquals(Map.empty, group.removeExpiredOffsets(time.milliseconds(), offsetRetentionMs)) + + // Start a new rebalance to add the member back. The offset should not be expired + // while the rebalance is in progress. + group.transitionTo(PreparingRebalance) + group.add(member) + assertEquals(Map.empty, group.removeExpiredOffsets(time.milliseconds(), offsetRetentionMs)) + } + @Test def testSubscribedTopics(): Unit = { // not able to compute it for a newly created group @@ -718,8 +760,8 @@ class GroupMetadataTest { assertTrue(group.is(targetState)) } - private def offsetAndMetadata(offset: Long): OffsetAndMetadata = { - OffsetAndMetadata(offset, "", Time.SYSTEM.milliseconds()) + private def offsetAndMetadata(offset: Long, timestamp: Long = Time.SYSTEM.milliseconds()): OffsetAndMetadata = { + OffsetAndMetadata(offset, "", timestamp) } }