Skip to content

Commit

Permalink
[greyhound] parallel consumer - compression and encoding for gaps lim…
Browse files Browse the repository at this point in the history
…it (#35423)

GitOrigin-RevId: 10b761cb8fcd926178aebf586e99ed1a8829c734
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 59fad3a commit b67f67e
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package(default_visibility = ["//visibility:public"])

# visibility is extended to allow packaging a jar to deploy to maven central
sources(["//core:__subpackages__"])

scala_library(
name = "compression",
srcs = [
":sources",
],
deps = [
"@org_apache_commons_commons_compress",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.wixpress.dst.greyhound.core.compression

import org.apache.commons.compress.utils.IOUtils

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.util.Try

object GzipCompression {
def compress(input: Array[Byte]): Array[Byte] = {
val bos = new ByteArrayOutputStream(input.length)
val gzip = new GZIPOutputStream(bos)
gzip.write(input)
gzip.close()
val compressed = bos.toByteArray
bos.close()
compressed
}

def decompress(compressed: Array[Byte]): Option[Array[Byte]] = {
val byteStream = new ByteArrayInputStream(compressed)
Try(new GZIPInputStream(byteStream))
.flatMap(gzipStream =>
Try {
val result = IOUtils.toByteArray(gzipStream)
gzipStream.close()
byteStream.close()
result
}.recover {
case e: Throwable =>
Try(gzipStream.close())
Try(byteStream.close())
e.printStackTrace()
throw e
}
)
.toOption
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ scala_library(
"@dev_zio_zio_stacktracer_2_12",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/admin",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/compression",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/domain",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.wixpress.dst.greyhound.core.consumer

import com.wixpress.dst.greyhound.core.compression.GzipCompression
import com.wixpress.dst.greyhound.core.consumer.Gap.GAP_SEPARATOR
import com.wixpress.dst.greyhound.core.consumer.OffsetAndGaps.{GAPS_STRING_SEPARATOR, LAST_HANDLED_OFFSET_SEPARATOR}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordTopicPartition}
import com.wixpress.dst.greyhound.core.{Offset, OffsetAndMetadata, TopicPartition}
import zio._

import java.util.Base64

trait OffsetsAndGaps {
def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]

Expand Down Expand Up @@ -104,11 +107,15 @@ object OffsetsAndGaps {
}

def toOffsetsAndMetadata(offsetsAndGaps: Map[TopicPartition, OffsetAndGaps]): Map[TopicPartition, OffsetAndMetadata] =
offsetsAndGaps.mapValues(offsetAndGaps =>
OffsetAndMetadata(offsetAndGaps.offset, offsetAndGaps.gapsString)
) // todo: add encoding and compression to plain gaps string

def parseGapsString(offsetAndGapsString: String): Option[OffsetAndGaps] = {
offsetsAndGaps.mapValues(offsetAndGaps => OffsetAndMetadata(offsetAndGaps.offset, offsetAndGaps.gapsString))

def parseGapsString(rawOffsetAndGapsString: String): Option[OffsetAndGaps] = {
val offsetAndGapsString =
if (rawOffsetAndGapsString.nonEmpty)
new String(
GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty)
)
else ""
val lastHandledOffsetSeparatorIndex = offsetAndGapsString.indexOf(LAST_HANDLED_OFFSET_SEPARATOR)
if (lastHandledOffsetSeparatorIndex < 0)
None
Expand Down Expand Up @@ -152,9 +159,8 @@ case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean =
def markCommitted: OffsetAndGaps = copy(committable = false)

def gapsString: String = {
if (gaps.isEmpty) ""
else
s"${offset.toString}${LAST_HANDLED_OFFSET_SEPARATOR}${gaps.sortBy(_.start).mkString(GAPS_STRING_SEPARATOR)}"
val plainGapsString = s"${offset.toString}${LAST_HANDLED_OFFSET_SEPARATOR}${gaps.sortBy(_.start).mkString(GAPS_STRING_SEPARATOR)}"
Base64.getEncoder.encodeToString(GzipCompression.compress(plainGapsString.getBytes()))
}
}

Expand All @@ -167,5 +173,5 @@ object OffsetAndGaps {
def apply(offset: Offset, committable: Boolean): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap], committable)

def gapsSize(gaps: Map[TopicPartition, OffsetAndGaps]): Int =
gaps.values.flatMap(_.gaps).map(_.size.toInt).sum
gaps.values.map(_.gaps.size).sum
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package(default_visibility = ["//visibility:public"])

sources()

specs2_unit_test(
name = "compression",
srcs = [
":sources",
],
deps = [
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/compression",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.wixpress.dst.greyhound.core.compression

import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv

class GzipCompressionTest extends BaseTestNoEnv {
"GZIPCompressor" should {
"return None for bad input" in {
GzipCompression.decompress("not a gzip".toCharArray.map(_.toByte)) must beNone
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
} yield current must havePairs(partition0 -> OffsetAndGaps(102L, Seq()), partition1 -> OffsetAndGaps(204L, Seq(Gap(201L, 202L))))
}

"parse gaps from string" in {
val gaps = Seq(s"10${LAST_HANDLED_OFFSET_SEPARATOR}0${GAP_SEPARATOR}1", s"10${LAST_HANDLED_OFFSET_SEPARATOR}", "")
val expected = Seq(Some(OffsetAndGaps(10, Seq(Gap(0, 1)))), Some(OffsetAndGaps(10, Seq())), None)
gaps.map(OffsetsAndGaps.parseGapsString).must(beEqualTo(expected))
"parse offsets and gaps correctly" in {
val offsetsAndGaps = Seq(OffsetAndGaps(10, Seq(Gap(0, 1))), OffsetAndGaps(10, Seq()))
val gapsStringsToTest = offsetsAndGaps.map(_.gapsString) ++ Seq("") // use gapsString method to get compressed and encoded strings
val expected = Seq(Some(OffsetAndGaps(10, Seq(Gap(0, 1)))), Some(OffsetAndGaps(10, Seq())), None)
gapsStringsToTest.map(OffsetsAndGaps.parseGapsString).must(beEqualTo(expected))
}

}

object OffsetGapsTest {
val topic = "some-topic"
val topic = "some-topic"
val topicPartition = TopicPartition(topic, 0)
}

0 comments on commit b67f67e

Please sign in to comment.