diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala index a3d93217..81a1c06f 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala @@ -54,7 +54,7 @@ object Dispatcher { startPaused: Boolean = false, consumeInParallel: Boolean = false, maxParallelism: Int = 1, - updateBatch: Chunk[Record] => UIO[Unit] = _ => ZIO.unit, + updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit, currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] = _ => ZIO.succeed(Map.empty) )(implicit trace: Trace): UIO[Dispatcher[R]] = @@ -73,7 +73,7 @@ object Dispatcher { override def submitBatch(records: Records): URIO[R with Env, SubmitResult] = for { - _ <- report(SubmittingRecordBatch(group, clientId, records.size, consumerAttributes)) + _ <- report(SubmittingRecordBatch(group, clientId, records.size, records, consumerAttributes)) allSamePartition = records.map(r => RecordTopicPartition(r)).distinct.size == 1 submitResult <- if (allSamePartition) { val partition = RecordTopicPartition(records.head) @@ -240,7 +240,7 @@ object Dispatcher { consumerAttributes: Map[String, String], consumeInParallel: Boolean, maxParallelism: Int, - updateBatch: Chunk[Record] => UIO[Unit] = _ => ZIO.unit, + updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit, currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] )(implicit trace: Trace): URIO[R with Env, Worker] = for { queue <- Queue.dropping[Record](capacity) @@ -355,7 +355,7 @@ object Dispatcher { partition: TopicPartition, consumerAttributes: Map[String, String], maxParallelism: Int, - updateBatch: Chunk[Record] => UIO[Unit], + updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit], currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] )(implicit trace: Trace): ZIO[R with GreyhoundMetrics, Any, Boolean] = internalState.update(s => s.cleared).commit *> @@ -391,8 +391,8 @@ object Dispatcher { clientId: ClientId, partition: TopicPartition, consumerAttributes: Map[ClientId, ClientId], - maxParallelism: RuntimeFlags, - updateBatch: Chunk[Record] => UIO[Unit], + maxParallelism: Int, + updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit], currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] ): ZIO[R with GreyhoundMetrics, Throwable, Boolean] = for { @@ -511,8 +511,13 @@ object DispatcherMetric { case class SubmittingRecord[K, V](group: Group, clientId: ClientId, record: ConsumerRecord[K, V], attributes: Map[String, String]) extends DispatcherMetric - case class SubmittingRecordBatch[K, V](group: Group, clientId: ClientId, numRecords: Int, attributes: Map[String, String]) - extends DispatcherMetric + case class SubmittingRecordBatch[K, V]( + group: Group, + clientId: ClientId, + numRecords: Int, + records: Records, + attributes: Map[String, String] + ) extends DispatcherMetric case class HandlingRecord[K, V]( group: Group, diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index 0708c19a..fd291da0 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -43,7 +43,7 @@ object EventLoop { offsetsAndGaps <- OffsetsAndGaps.make handle = if (config.consumePartitionInParallel) { cr: Record => handler.handle(cr) } else handler.andThen(offsets.update).handle(_) - updateBatch = { records: Chunk[Record] => offsetsAndGaps.update(records) } + updateBatch = { records: Chunk[Record] => report(HandledBatch(records)) *> offsetsAndGaps.update(records) } currentGaps = { partitions: Set[TopicPartition] => currentGapsForPartitions(partitions, clientId)(consumer) } _ <- report(CreatingDispatcher(clientId, group, consumerAttributes, config.startPaused)) dispatcher <- Dispatcher.make( @@ -278,7 +278,9 @@ object EventLoop { report(PartitionThrottled(partition, partitionToRecords._2.map(_.offset).min, consumer.config.consumerAttributes)).as(acc) else dispatcher.submitBatch(partitionToRecords._2.toSeq).flatMap { - case SubmitResult.Submitted => ZIO.succeed(acc) + case SubmitResult.Submitted => + report(SubmittedBatch(partitionToRecords._2.size, partitionToRecords._1, partitionToRecords._2.map(_.offset))) *> + ZIO.succeed(acc) case RejectedBatch(firstRejected) => report(HighWatermarkReached(partition, firstRejected.offset, consumer.config.consumerAttributes)) *> consumer.pause(firstRejected).fold(_ => acc, _ => acc + partition) @@ -292,7 +294,12 @@ object EventLoop { private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] = offsetsAndGaps.getCommittableAndClear.flatMap { committable => val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable) - consumer.commitWithMetadata(offsetsAndMetadataToCommit).catchAll { _ => offsetsAndGaps.setCommittable(committable) } + consumer + .commitWithMetadata(offsetsAndMetadataToCommit) + .tap(_ => report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit))) + .catchAll { t => + report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable) + } } private def commitOffsetsOnRebalance( @@ -396,6 +403,8 @@ object EventLoopMetric { attributes: Map[String, String] = Map.empty ) extends EventLoopMetric + case class SubmittedBatch(numSubmitted: Int, partition: TopicPartition, offsets: Iterable[Offset]) extends EventLoopMetric + case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric case class FailedToFetchCommittedGaps(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) @@ -410,6 +419,13 @@ object EventLoopMetric { case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric case class AwaitingPartitionsAssignment(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric + + case class CommittedOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends EventLoopMetric + + case class FailedToCommitOffsetsAndMetadata(t: Throwable, offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) + extends EventLoopMetric + + case class HandledBatch(records: Records) extends EventLoopMetric } sealed trait EventLoopState