From c808091ff1de7c94c4253cb56a34f20a092ac7fa Mon Sep 17 00:00:00 2001 From: Katie Liu Date: Thu, 22 Jun 2023 13:58:36 -0700 Subject: [PATCH] Close SystemConsumer properly --- .../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 7793b562e2..7ef0b62138 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -68,6 +68,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName) + var systemConsumerStarted: Boolean = false val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName) var taskNames: Set[TaskName] = Set[TaskName]() @@ -121,6 +122,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset") systemConsumer.register(checkpointSsp, oldestOffset) systemConsumer.start() + systemConsumerStarted = true } /** @@ -148,6 +150,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, if (stopConsumerAfterFirstRead) { info("Stopping system consumer") systemConsumer.stop() + systemConsumerStarted = false } } else if (!stopConsumerAfterFirstRead) { taskNamesToCheckpoints ++= readCheckpoints() @@ -224,9 +227,10 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, info ("Stopping system producer.") producerRef.get().stop() - if (!stopConsumerAfterFirstRead) { + if (systemConsumerStarted) { info("Stopping system consumer") systemConsumer.stop() + systemConsumerStarted = false } info("CheckpointManager stopped.")