From b67f67edf511b0681b055442686d52643d7d6e1e Mon Sep 17 00:00:00 2001 From: Ben Wattelman <82799628+ben-wattelman@users.noreply.github.com> Date: Sun, 25 Jun 2023 18:17:27 +0300 Subject: [PATCH] [greyhound] parallel consumer - compression and encoding for gaps limit (#35423) GitOrigin-RevId: 10b761cb8fcd926178aebf586e99ed1a8829c734 --- .../greyhound/core/compression/BUILD.bazel | 14 +++++++ .../core/compression/GzipCompression.scala | 39 +++++++++++++++++++ .../dst/greyhound/core/consumer/BUILD.bazel | 1 + .../core/consumer/OffsetsAndGaps.scala | 24 +++++++----- .../greyhound/core/compression/BUILD.bazel | 15 +++++++ .../compression/GzipCompressionTest.scala | 11 ++++++ .../core/consumer/OffsetsAndGapsTest.scala | 11 +++--- 7 files changed, 101 insertions(+), 14 deletions(-) create mode 100644 core/src/main/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel create mode 100644 core/src/main/scala/com/wixpress/dst/greyhound/core/compression/GzipCompression.scala create mode 100644 core/src/test/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel create mode 100644 core/src/test/scala/com/wixpress/dst/greyhound/core/compression/GzipCompressionTest.scala diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel b/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel new file mode 100644 index 00000000..dedf42f3 --- /dev/null +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel @@ -0,0 +1,14 @@ +package(default_visibility = ["//visibility:public"]) + +# visibility is extended to allow packaging a jar to deploy to maven central +sources(["//core:__subpackages__"]) + +scala_library( + name = "compression", + srcs = [ + ":sources", + ], + deps = [ + "@org_apache_commons_commons_compress", + ], +) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/GzipCompression.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/GzipCompression.scala new file mode 100644 index 00000000..cc8c1191 --- /dev/null +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/compression/GzipCompression.scala @@ -0,0 +1,39 @@ +package com.wixpress.dst.greyhound.core.compression + +import org.apache.commons.compress.utils.IOUtils + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import scala.util.Try + +object GzipCompression { + def compress(input: Array[Byte]): Array[Byte] = { + val bos = new ByteArrayOutputStream(input.length) + val gzip = new GZIPOutputStream(bos) + gzip.write(input) + gzip.close() + val compressed = bos.toByteArray + bos.close() + compressed + } + + def decompress(compressed: Array[Byte]): Option[Array[Byte]] = { + val byteStream = new ByteArrayInputStream(compressed) + Try(new GZIPInputStream(byteStream)) + .flatMap(gzipStream => + Try { + val result = IOUtils.toByteArray(gzipStream) + gzipStream.close() + byteStream.close() + result + }.recover { + case e: Throwable => + Try(gzipStream.close()) + Try(byteStream.close()) + e.printStackTrace() + throw e + } + ) + .toOption + } +} diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel index 007b00c7..584724fb 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/BUILD.bazel @@ -14,6 +14,7 @@ scala_library( "@dev_zio_zio_stacktracer_2_12", "//core/src/main/scala/com/wixpress/dst/greyhound/core", "//core/src/main/scala/com/wixpress/dst/greyhound/core/admin", + "//core/src/main/scala/com/wixpress/dst/greyhound/core/compression", "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/domain", "//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry", "//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics", diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala index 9255bcbb..d6070e9d 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala @@ -1,11 +1,14 @@ package com.wixpress.dst.greyhound.core.consumer +import com.wixpress.dst.greyhound.core.compression.GzipCompression import com.wixpress.dst.greyhound.core.consumer.Gap.GAP_SEPARATOR import com.wixpress.dst.greyhound.core.consumer.OffsetAndGaps.{GAPS_STRING_SEPARATOR, LAST_HANDLED_OFFSET_SEPARATOR} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordTopicPartition} import com.wixpress.dst.greyhound.core.{Offset, OffsetAndMetadata, TopicPartition} import zio._ +import java.util.Base64 + trait OffsetsAndGaps { def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] @@ -104,11 +107,15 @@ object OffsetsAndGaps { } def toOffsetsAndMetadata(offsetsAndGaps: Map[TopicPartition, OffsetAndGaps]): Map[TopicPartition, OffsetAndMetadata] = - offsetsAndGaps.mapValues(offsetAndGaps => - OffsetAndMetadata(offsetAndGaps.offset, offsetAndGaps.gapsString) - ) // todo: add encoding and compression to plain gaps string - - def parseGapsString(offsetAndGapsString: String): Option[OffsetAndGaps] = { + offsetsAndGaps.mapValues(offsetAndGaps => OffsetAndMetadata(offsetAndGaps.offset, offsetAndGaps.gapsString)) + + def parseGapsString(rawOffsetAndGapsString: String): Option[OffsetAndGaps] = { + val offsetAndGapsString = + if (rawOffsetAndGapsString.nonEmpty) + new String( + GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty) + ) + else "" val lastHandledOffsetSeparatorIndex = offsetAndGapsString.indexOf(LAST_HANDLED_OFFSET_SEPARATOR) if (lastHandledOffsetSeparatorIndex < 0) None @@ -152,9 +159,8 @@ case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean = def markCommitted: OffsetAndGaps = copy(committable = false) def gapsString: String = { - if (gaps.isEmpty) "" - else - s"${offset.toString}${LAST_HANDLED_OFFSET_SEPARATOR}${gaps.sortBy(_.start).mkString(GAPS_STRING_SEPARATOR)}" + val plainGapsString = s"${offset.toString}${LAST_HANDLED_OFFSET_SEPARATOR}${gaps.sortBy(_.start).mkString(GAPS_STRING_SEPARATOR)}" + Base64.getEncoder.encodeToString(GzipCompression.compress(plainGapsString.getBytes())) } } @@ -167,5 +173,5 @@ object OffsetAndGaps { def apply(offset: Offset, committable: Boolean): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap], committable) def gapsSize(gaps: Map[TopicPartition, OffsetAndGaps]): Int = - gaps.values.flatMap(_.gaps).map(_.size.toInt).sum + gaps.values.map(_.gaps.size).sum } diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel b/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel new file mode 100644 index 00000000..5447e093 --- /dev/null +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/BUILD.bazel @@ -0,0 +1,15 @@ +package(default_visibility = ["//visibility:public"]) + +sources() + +specs2_unit_test( + name = "compression", + srcs = [ + ":sources", + ], + deps = [ + "//core/src/main/scala/com/wixpress/dst/greyhound/core", + "//core/src/main/scala/com/wixpress/dst/greyhound/core/compression", + "//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit", + ], +) diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/GzipCompressionTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/GzipCompressionTest.scala new file mode 100644 index 00000000..f8409feb --- /dev/null +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/compression/GzipCompressionTest.scala @@ -0,0 +1,11 @@ +package com.wixpress.dst.greyhound.core.compression + +import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv + +class GzipCompressionTest extends BaseTestNoEnv { + "GZIPCompressor" should { + "return None for bad input" in { + GzipCompression.decompress("not a gzip".toCharArray.map(_.toByte)) must beNone + } + } +} diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala index 2a3837ca..f21c194b 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala @@ -70,15 +70,16 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv { } yield current must havePairs(partition0 -> OffsetAndGaps(102L, Seq()), partition1 -> OffsetAndGaps(204L, Seq(Gap(201L, 202L)))) } - "parse gaps from string" in { - val gaps = Seq(s"10${LAST_HANDLED_OFFSET_SEPARATOR}0${GAP_SEPARATOR}1", s"10${LAST_HANDLED_OFFSET_SEPARATOR}", "") - val expected = Seq(Some(OffsetAndGaps(10, Seq(Gap(0, 1)))), Some(OffsetAndGaps(10, Seq())), None) - gaps.map(OffsetsAndGaps.parseGapsString).must(beEqualTo(expected)) + "parse offsets and gaps correctly" in { + val offsetsAndGaps = Seq(OffsetAndGaps(10, Seq(Gap(0, 1))), OffsetAndGaps(10, Seq())) + val gapsStringsToTest = offsetsAndGaps.map(_.gapsString) ++ Seq("") // use gapsString method to get compressed and encoded strings + val expected = Seq(Some(OffsetAndGaps(10, Seq(Gap(0, 1)))), Some(OffsetAndGaps(10, Seq())), None) + gapsStringsToTest.map(OffsetsAndGaps.parseGapsString).must(beEqualTo(expected)) } } object OffsetGapsTest { - val topic = "some-topic" + val topic = "some-topic" val topicPartition = TopicPartition(topic, 0) }