diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala index cb7d7054..17ab8b07 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala @@ -1,26 +1,26 @@ package com.wixpress.dst.greyhound.core.admin -import java.util import com.wixpress.dst.greyhound.core import com.wixpress.dst.greyhound.core.admin.AdminClient.isTopicExistsError +import com.wixpress.dst.greyhound.core.admin.AdminClientMetric.TopicCreateResult.fromExit +import com.wixpress.dst.greyhound.core.admin.AdminClientMetric.{TopicConfigUpdated, TopicCreated, TopicPartitionsIncreased} import com.wixpress.dst.greyhound.core.admin.TopicPropertiesResult.{TopicDoesnExistException, TopicProperties} import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics +import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics._ import com.wixpress.dst.greyhound.core.zioutils.KafkaFutures._ -import com.wixpress.dst.greyhound.core.{CommonGreyhoundConfig, GHThrowable, Group, GroupTopicPartition, Offset, OffsetAndMetadata, Topic, TopicConfig, TopicPartition} +import com.wixpress.dst.greyhound.core._ import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource -import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, ListOffsetsOptions, ListOffsetsResult, NewPartitions, NewTopic, OffsetSpec, TopicDescription, AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig} +import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, NewPartitions, NewTopic, OffsetSpec, AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig} +import org.apache.kafka.common import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.TOPIC import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException} +import zio.ZIO.attemptBlocking import zio.{IO, RIO, Scope, Trace, ZIO} -import GreyhoundMetrics._ -import com.wixpress.dst.greyhound.core.admin.AdminClientMetric.TopicCreateResult.fromExit -import com.wixpress.dst.greyhound.core.admin.AdminClientMetric.{TopicConfigUpdated, TopicCreated, TopicPartitionsIncreased} -import org.apache.kafka.common +import java.util import scala.collection.JavaConverters._ -import zio.ZIO.attemptBlocking trait AdminClient { def shutdown(implicit trace: Trace): RIO[Any, Unit] @@ -56,8 +56,6 @@ trait AdminClient { implicit trace: Trace ): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] -// def groupOffsetsSpecific(requestedTopicPartitions: Map[Group, Set[TopicPartition]])(implicit trace: Trace): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] - def groupState(groups: Set[Group])(implicit trace: Trace): RIO[Any, Map[String, GroupState]] def deleteTopic(topic: Topic)(implicit trace: Trace): RIO[Any, Unit] @@ -68,13 +66,12 @@ trait AdminClient { implicit trace: Trace ): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] + def consumerGroupsOffsets( + groups: Map[Group, Option[Set[TopicPartition]]] + )(implicit trace: Trace): RIO[Any, Map[Group, Map[TopicPartition, OffsetAndMetadata]]] + def increasePartitions(topic: Topic, newCount: Int)(implicit trace: Trace): RIO[Any with GreyhoundMetrics, Unit] - /** - * @param useNonIncrementalAlter - * \- [[org.apache.kafka.clients.admin.AdminClient.incrementalAlterConfigs()]] is not supported by older brokers (< 2.3), so if this is - * true, use the deprecated non incremental alter - */ def updateTopicConfigProperties( topic: Topic, configProperties: Map[String, ConfigPropOp], @@ -273,13 +270,14 @@ object AdminClient { rawOffsets = result.asScala.toMap.mapValues(_.asScala.toMap) offset = rawOffsets.map { case (group, offsets) => - offsets.map { case (tp, offset) => - ( - GroupTopicPartition(group, TopicPartition.fromKafka(tp)), - PartitionOffset(Option(offset).map(_.offset()).getOrElse(-1L)) - ) - } - .filter{case (_, o) => o.offset >= 0} + offsets + .map { case (tp, offset) => + ( + GroupTopicPartition(group, TopicPartition.fromKafka(tp)), + PartitionOffset(Option(offset).map(_.offset()).getOrElse(-1L)) + ) + } + .filter { case (_, o) => o.offset >= 0 } } groupOffsets = offset.foldLeft(Map.empty[GroupTopicPartition, PartitionOffset])((x, y) => x ++ y) } yield groupOffsets @@ -346,6 +344,30 @@ object AdminClient { } yield res.asScala.toMap.map { case (tp, om) => (TopicPartition(tp), OffsetAndMetadata(om)) } } + override def consumerGroupsOffsets( + groups: Map[Group, Option[Set[TopicPartition]]] + )(implicit trace: Trace): RIO[Any, Map[Group, Map[TopicPartition, OffsetAndMetadata]]] = + for { + desc <- attemptBlocking( + client + .listConsumerGroupOffsets( + groups + .mapValues(tps => + new ListConsumerGroupOffsetsSpec().topicPartitions(tps.map(_.map(_.asKafka).toList.asJava).orNull) + ) + .asJava + ) + ) + res <- attemptBlocking(groups.map(g => (g._1, desc.partitionsToOffsetAndMetadata(g._1).get()))) + } yield res.map { case (group, o) => + ( + group, + o.asScala.toSeq + .map(om => (TopicPartition.fromKafka(om._1), OffsetAndMetadata(om._2.offset(), om._2.metadata()))) + .toMap + ) + } + override def increasePartitions(topic: Topic, newCount: Int)( implicit trace: Trace ): RIO[GreyhoundMetrics, Unit] = {