Skip to content

Commit

Permalink
[greyhound] expose internal kafka producer metrics (#34160)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: ab792e2227f389d5642802944a982f3a8d3587d8
  • Loading branch information
berman7 authored and wix-oss committed Sep 11, 2023
1 parent b90bbfa commit 305c732
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfi
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{Metric, MetricName}
import zio.ZIO.attemptBlocking
import zio._

import scala.collection.JavaConverters._
import zio.ZIO.attemptBlocking
import zio.managed._

trait ProducerR[-R] { self =>
def metrics : UIO[Option[Map[MetricName, Metric]]] = ZIO.none

def produceAsync(record: ProducerRecord[Chunk[Byte], Chunk[Byte]])(
implicit trace: Trace
): ZIO[R, ProducerError, IO[ProducerError, RecordMetadata]]
Expand Down Expand Up @@ -80,6 +82,9 @@ object Producer {
val acquire = ZIO.attemptBlocking(new KafkaProducer(config.properties, serializer, serializer))
ZIO.acquireRelease(acquire)(producer => attemptBlocking(producer.close()).ignore).map { producer =>
new ProducerR[R] {
override def metrics: UIO[Option[Map[MetricName, Metric]]] =
ZIO.succeed(Option(producer.metrics().asScala.toMap))

private def recordFrom(record: ProducerRecord[Chunk[Byte], Chunk[Byte]]) =
new KafkaProducerRecord(
record.topic,
Expand Down Expand Up @@ -153,6 +158,8 @@ object ProducerR {

override def partitionsFor(topic: Topic)(implicit trace: Trace): RIO[Any, Seq[PartitionInfo]] =
producer.partitionsFor(topic).provideEnvironment(env)

override def metrics: UIO[Option[Map[MetricName, Metric]]] = producer.metrics
}
def onShutdown(onShutdown: => UIO[Unit])(implicit trace: Trace): ProducerR[R] = new ProducerR[R] {
override def produceAsync(
Expand All @@ -165,6 +172,8 @@ object ProducerR {
override def attributes: Map[String, String] = producer.attributes

override def partitionsFor(topic: Topic)(implicit trace: Trace) = producer.partitionsFor(topic)

override def metrics: UIO[Option[Map[MetricName, Metric]]] = producer.metrics
}

def tapBoth(onError: (Topic, Cause[ProducerError]) => URIO[R, Unit], onSuccess: RecordMetadata => URIO[R, Unit]) = new ProducerR[R] {
Expand All @@ -186,6 +195,8 @@ object ProducerR {
override def attributes: Map[String, String] = producer.attributes

override def partitionsFor(topic: Topic)(implicit trace: Trace) = producer.partitionsFor(topic)

override def metrics: UIO[Option[Map[MetricName, Metric]]] = producer.metrics
}

def map(f: ProducerRecord[Chunk[Byte], Chunk[Byte]] => ProducerRecord[Chunk[Byte], Chunk[Byte]]) = new ProducerR[R] {
Expand All @@ -199,6 +210,8 @@ object ProducerR {
override def shutdown(implicit trace: Trace): UIO[Unit] = producer.shutdown

override def attributes: Map[String, String] = producer.attributes

override def metrics: UIO[Option[Map[MetricName, Metric]]] = producer.metrics
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import com.wixpress.dst.greyhound.core.PartitionInfo
import com.wixpress.dst.greyhound.core.metrics.{GreyhoundMetric, GreyhoundMetrics}
import com.wixpress.dst.greyhound.core.producer.ProducerMetric._
import zio.{Chunk, IO, RIO, Trace, ULayer, ZIO}
import zio.{Chunk, IO, RIO, Trace, UIO, ULayer, ZIO}
import GreyhoundMetrics._
import org.apache.kafka.common.{Metric, MetricName}

import scala.concurrent.duration.FiniteDuration
import zio.Clock.currentTime

case class ReportingProducer[-R](internal: ProducerR[R], extraAttributes: Map[String, String]) extends ProducerR[GreyhoundMetrics with R] {


override def metrics: UIO[Option[Map[MetricName, Metric]]] = internal.metrics

override def produceAsync(
record: ProducerRecord[Chunk[Byte], Chunk[Byte]]
)(implicit trace: Trace): ZIO[GreyhoundMetrics with R, ProducerError, IO[ProducerError, RecordMetadata]] =
Expand Down

0 comments on commit 305c732

Please sign in to comment.