Fix build breakage due to spotlessScalaCheck failures on Confluent's streams-scala code #502

Merged: 1 commit, Jan 27, 2024.

The commit reformats Confluent's streams-scala sources into the layout the spotlessScalaCheck task expects: multi-line parameter lists move to scalafmt's dangling-parenthesis style, with the implicit keyword kept at the end of the opening line of its parameter list. The change is formatting-only; no behavior changes.
ImplicitConversions.scala
@@ -80,13 +80,17 @@ object ImplicitConversions {
   implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
     Grouped.`with`[K, V]

-  implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
-                                                               valueSerde: Serde[V],
-                                                               otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+  implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): Joined[K, V, VO] =
     Joined.`with`[K, V, VO]

-  implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
-                                                                     valueSerde: Serde[V]): Materialized[K, V, S] =
+  implicit def materializedFromSerde[K, V, S <: StateStore](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V]
+  ): Materialized[K, V, S] =
     Materialized.`with`[K, V, S]

   implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
@@ -95,8 +99,10 @@ object ImplicitConversions {
   implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] =
     Repartitioned.`with`[K, V]

-  implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
-                                                                   valueSerde: Serde[V],
-                                                                   otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] =
+  implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): StreamJoined[K, V, VO] =
     StreamJoined.`with`[K, V, VO]
 }
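For orientation, these conversions are what let application code skip serde plumbing entirely. Below is a minimal sketch of them in use; the topic names, the String/Long types, and the serialization.Serdes import path are assumptions, not part of this PR:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder = new StreamsBuilder
    // Serdes._ puts Serde[String] and Serde[Long] in implicit scope, so
    // consumedFromSerde supplies the Consumed for both stream and table.
    val clicks: KStream[String, Long] = builder.stream[String, Long]("clicks")
    val limits: KTable[String, Long]  = builder.table[String, Long]("limits")

    // joinedFromKeyValueOtherSerde (reformatted above) derives the
    // Joined[String, Long, Long] this stream-table join needs.
    val overage: KStream[String, Long] =
      clicks.join(limits)((count, limit) => count - limit)

Since the diff is formatting-only, implicit resolution like this is unaffected.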
Serdes.scala
@@ -58,8 +58,10 @@ object Serdes {
     }
   )

-  def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
-                        deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+  def fromFn[T >: Null](
+    serializer: (String, T) => Array[Byte],
+    deserializer: (String, Array[Byte]) => Option[T]
+  ): Serde[T] =
     JSerdes.serdeFrom(
       new Serializer[T] {
         override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
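fromFn builds a Serde from two plain functions; returning Option from the deserializer lets the wrapper map absent payloads to null, hence the T >: Null bound. A sketch with a hypothetical UserId wrapper (not part of this PR), assuming the serialization.Serdes import path of newer releases:

    import java.nio.charset.StandardCharsets.UTF_8
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.serialization.Serdes

    final case class UserId(value: String) // illustrative type

    implicit val userIdSerde: Serde[UserId] = Serdes.fromFn[UserId](
      (_: String, id: UserId) => id.value.getBytes(UTF_8), // topic name unused here
      (_: String, bytes: Array[Byte]) =>
        Option(bytes).map(b => UserId(new String(b, UTF_8))) // None for null payloads
    )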
StreamsBuilder.scala
@@ -120,8 +120,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @see #table(String)
    * @see `org.apache.kafka.streams.StreamsBuilder#table`
    */
-  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
-    implicit consumed: Consumed[K, V]
+  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+    consumed: Consumed[K, V]
   ): KTable[K, V] =
     new KTable(inner.table[K, V](topic, consumed, materialized))

@@ -146,8 +146,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @return a `GlobalKTable` for the specified topic
    * @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
    */
-  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
-    implicit consumed: Consumed[K, V]
+  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+    consumed: Consumed[K, V]
   ): GlobalKTable[K, V] =
     inner.globalTable(topic, consumed, materialized)

@@ -177,10 +177,12 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
     "Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
     "2.7.0"
   )
-  def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
-                           topic: String,
-                           consumed: Consumed[K, V],
-                           stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
+  def addGlobalStore[K, V](
+    storeBuilder: StoreBuilder[_ <: StateStore],
+    topic: String,
+    consumed: Consumed[K, V],
+    stateUpdateSupplier: ProcessorSupplier[K, V]
+  ): StreamsBuilderJ =
     inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)

   /**
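These table/globalTable overloads take the Materialized explicitly while the Consumed still arrives through the implicit list being reformatted. A sketch, with the topic and store names hypothetical:

    import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.{KTable, Materialized}
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder = new StreamsBuilder
    // Materialized names the backing state store explicitly; the
    // Consumed[String, Long] comes from the implicit parameter list.
    val prices: KTable[String, Long] =
      builder.table[String, Long](
        "prices",
        Materialized.as[String, Long, ByteArrayKeyValueStore]("prices-store")
      )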
BranchedKStream.scala
@@ -110,7 +110,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
   def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch())

   private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] =
-    m.asScala.map {
-      case (name, kStreamJ) => (name, new KStream(kStreamJ))
+    m.asScala.map { case (name, kStreamJ) =>
+      (name, new KStream(kStreamJ))
     }.toMap
 }
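noDefaultBranch (like defaultBranch) returns the Scala Map built by toScalaMap, keyed by the Named prefix plus each branch name. A sketch against a hypothetical event stream; the route name and predicates are illustrative:

    import org.apache.kafka.streams.kstream.Named
    import org.apache.kafka.streams.scala.kstream.{Branched, KStream}

    def route(events: KStream[String, Long]): Map[String, KStream[String, Long]] =
      events
        .split(Named.as("split-"))
        .branch((_, v) => v >= 0L, Branched.as("non-negative"))
        .branch((_, v) => v < 0L, Branched.as("negative"))
        .noDefaultBranch() // keys: "split-non-negative", "split-negative"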
CogroupedKStream.scala
@@ -43,8 +43,10 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    * @param aggregator a function that computes a new aggregate result
    * @return a [[CogroupedKStream]]
    */
-  def cogroup[VIn](groupedStream: KGroupedStream[KIn, VIn],
-                   aggregator: (KIn, VIn, VOut) => VOut): CogroupedKStream[KIn, VOut] =
+  def cogroup[VIn](
+    groupedStream: KGroupedStream[KIn, VIn],
+    aggregator: (KIn, VIn, VOut) => VOut
+  ): CogroupedKStream[KIn, VOut] =
     new CogroupedKStream(inner.cogroup(groupedStream.inner, aggregator.asAggregator))

   /**
@@ -58,8 +60,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    * (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => VOut)(
-    implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+  def aggregate(initializer: => VOut)(implicit
+    materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
   ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized))

   /**
@@ -74,8 +76,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    * (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => VOut, named: Named)(
-    implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+  def aggregate(initializer: => VOut, named: Named)(implicit
+    materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
   ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))

   /**
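cogroup threads one aggregate type through several grouped streams, and aggregate then seeds it with an initializer. A sketch with hypothetical deposit/withdrawal streams already grouped by account id:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KTable}
    import org.apache.kafka.streams.scala.serialization.Serdes._

    def balances(
      deposits: KGroupedStream[String, Long],
      withdrawals: KGroupedStream[String, Long]
    ): KTable[String, Long] =
      deposits
        .cogroup((_: String, in: Long, acc: Long) => acc + in)
        .cogroup(withdrawals, (_: String, out: Long, acc: Long) => acc - out)
        .aggregate(0L) // Materialized supplied by materializedFromSerde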
Consumed.scala
@@ -61,8 +61,9 @@ object Consumed {
    * @tparam V value type
    * @return a new instance of [[Consumed]]
    */
-  def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
-                                                                    valueSerde: Serde[V]): ConsumedJ[K, V] =
+  def `with`[K, V](
+    timestampExtractor: TimestampExtractor
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)

   /**
@@ -73,7 +74,8 @@
    * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
    * @return a new instance of [[Consumed]]
    */
-  def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
-                                                                   valueSerde: Serde[V]): ConsumedJ[K, V] =
+  def `with`[K, V](
+    resetPolicy: Topology.AutoOffsetReset
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
 }
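Only the extractor (or reset policy) moves into its own parameter list; both serdes stay implicit. A minimal sketch, assuming String keys and Long values:

    import org.apache.kafka.streams.processor.WallclockTimestampExtractor
    import org.apache.kafka.streams.scala.kstream.Consumed
    import org.apache.kafka.streams.scala.serialization.Serdes._

    // Serde[String] and Serde[Long] are summoned implicitly.
    val consumed: Consumed[String, Long] =
      Consumed.`with`[String, Long](new WallclockTimestampExtractor)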
Joined.scala
@@ -34,9 +34,11 @@ object Joined {
    * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
    * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
    */
-  def `with`[K, V, VO](implicit keySerde: Serde[K],
-                                valueSerde: Serde[V],
-                                otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+  def `with`[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): JoinedJ[K, V, VO] =
     JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)

   /**
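Joined.`with` simply summons all three serdes in one call; a one-line sketch, with the types assumed:

    import org.apache.kafka.streams.scala.kstream.Joined
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val joined: Joined[String, Long, String] = Joined.`with`[String, Long, String]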
KGroupedStream.scala
@@ -111,8 +111,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-             named: Named)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def reduce(reducer: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.reduce(reducer.asReducer, materialized))

   /**
@@ -125,8 +126,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))

@@ -141,8 +142,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))

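The curried shape keeps the initializer, the aggregator, and the implicit Materialized in separate parameter lists, which lets Scala infer VR from the initializer alone. A word-count-style sketch over a hypothetical grouped stream:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KTable}
    import org.apache.kafka.streams.scala.serialization.Serdes._

    // VR is inferred as Long from the 0L initializer.
    def counts(grouped: KGroupedStream[String, String]): KTable[String, Long] =
      grouped.aggregate(0L)((_, _, acc) => acc + 1L)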
KGroupedTable.scala
@@ -76,8 +76,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
-  def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def reduce(adder: (V, V) => V, subtractor: (V, V) => V)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, materialized))

   /**
@@ -92,8 +93,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
-  def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
   ): KTable[K, V] =
     new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, named, materialized))

@@ -109,8 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(
       inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
@@ -129,14 +130,16 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(
-      inner.aggregate((() => initializer).asInitializer,
-                      adder.asAggregator,
-                      subtractor.asAggregator,
-                      named,
-                      materialized)
+      inner.aggregate(
+        (() => initializer).asInitializer,
+        adder.asAggregator,
+        subtractor.asAggregator,
+        named,
+        materialized
+      )
     )
 }
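Table aggregations take both an adder and a subtractor because updating a row first retracts the old value, then applies the new one. A sketch over a hypothetical table grouped by region:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.{KGroupedTable, KTable}
    import org.apache.kafka.streams.scala.serialization.Serdes._

    def regionTotals(byRegion: KGroupedTable[String, Long]): KTable[String, Long] =
      byRegion.reduce(_ + _, _ - _) // adder for new values, subtractor for retracted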