Skip to content

Commit

Permalink
Merge pull request #37 from ElifBayrakdar/feature/issue-36
Browse files Browse the repository at this point in the history
#36 Add Acks to ProducerConfig if exists in configuration as Producer
  • Loading branch information
MehmetFiratKomurcu authored Feb 9, 2024
2 parents ae7d973 + affca85 commit d49097c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
name: Build & push Docker image
with:
image: kafka-retry-job
tags: 1.12.0, latest
tags: 1.12.1, latest
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

variables:
VERSION: "1.12.0"
VERSION: "1.12.1"
DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION
DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH

Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## [1.12.1](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-02)

**Closed issues:**

- [\#36](https://github.com/Trendyol/kafka-retry-job/issues/36) Add Acks to ProducerConfig if exists in configuration as ProducerAcks

**Merged pull requests:**

- Pull Request for the issues #36 [\#37](https://github.com/Trendyol/kafka-retry-job/pull/37) ([elifbayrakdar](https://github.com/elifbayrakdar))


## [1.12.0](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-01)

**Closed issues:**
Expand Down
3 changes: 0 additions & 3 deletions src/Helpers/Constants.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
using Confluent.Kafka;

namespace KafkaRetry.Job.Helpers;

public static class Constants
{
public static class ProducerConfigDefaults
{
public const bool EnableIdempotence = true;
public const Acks Acks = Confluent.Kafka.Acks.Leader;
public const int BatchSize = 1000000;
public const string ClientId = "rdkafka";
public const double LingerMs = 5;
Expand Down
2 changes: 1 addition & 1 deletion src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ConfigurationService(IConfiguration configuration)
public bool EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence") ??
Constants.ProducerConfigDefaults.EnableIdempotence;

public Acks? Acks => GetValue<Acks?>("ProducerAcks") ?? Constants.ProducerConfigDefaults.Acks;
public Acks? Acks => GetValue<Acks?>("ProducerAcks");

public int BatchSize =>
GetValue<int?>("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize;
Expand Down
9 changes: 7 additions & 2 deletions src/Services/Implementations/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private AdminClientConfig CreateAdminClientConfig(string bootstrapServers)

private ProducerConfig CreateProducerConfig(string bootstrapServers)
{
return new ProducerConfig
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
Expand All @@ -67,14 +67,19 @@ private ProducerConfig CreateProducerConfig(string bootstrapServers)
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableIdempotence = _configuration.EnableIdempotence,
Acks = _configuration.Acks,
BatchSize = _configuration.BatchSize,
ClientId = _configuration.ClientId,
LingerMs = _configuration.LingerMs,
MessageTimeoutMs = _configuration.MessageTimeoutMs,
RequestTimeoutMs = _configuration.RequestTimeoutMs,
MessageMaxBytes = _configuration.MessageMaxBytes
};

if (_configuration.Acks is not null)
{
producerConfig.Acks = _configuration.Acks;
}
return producerConfig;
}

private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId)
Expand Down

0 comments on commit d49097c

Please sign in to comment.