Skip to content

Commit

Permalink
gh RecordConsumer - add visibility to extra properties setup with aut…
Browse files Browse the repository at this point in the history
…o.offset.reset (#35848)

GitOrigin-RevId: 7b878399acd1e25003ccae071e817ed22b2fba21
  • Loading branch information
natansil authored and wix-oss committed Sep 3, 2023
1 parent 975c135 commit b69b93b
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,23 @@ object RecordConsumer {
* concurrent between partitions; order is guaranteed to be maintained within the same partition.
*/
def make[R, E](
config: RecordConsumerConfig,
handler: RecordHandler[R, E, Chunk[Byte], Chunk[Byte]],
createConsumerOverride: Option[ConsumerConfig => RIO[GreyhoundMetrics with Scope, Consumer]] = None
config: RecordConsumerConfig,
handler: RecordHandler[R, E, Chunk[Byte], Chunk[Byte]],
createConsumerOverride: Option[ConsumerConfig => RIO[GreyhoundMetrics with Scope, Consumer]] = None
)(implicit trace: Trace, tag: Tag[Env]): ZIO[R with Env with Scope with GreyhoundMetrics, Throwable, RecordConsumer[R with Env]] =
ZIO
.acquireRelease(
for {
consumerShutdown <- AwaitShutdown.make
_ <- GreyhoundMetrics
.report(CreatingConsumer(config.clientId, config.group, config.bootstrapServers, config.consumerAttributes))
.report(
CreatingConsumer(
config.clientId,
config.group,
config.bootstrapServers,
config.consumerAttributes ++ config.extraProperties
)
)

_ <- validateRetryPolicy(config)
consumerSubscriptionRef <- Ref.make[ConsumerSubscription](config.initialSubscription)
Expand Down

0 comments on commit b69b93b

Please sign in to comment.