From e5e8cea9df148494197ddf9879ff83efe813c0b0 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Thu, 29 Feb 2024 09:27:04 +0300 Subject: [PATCH 1/6] #15 fix integration tests --- tests/Containers/KafkaContainer.cs | 10 ++++------ tests/Containers/ZookeeperContainer.cs | 9 ++++----- tests/kafka.retry.job.tests.csproj | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/tests/Containers/KafkaContainer.cs b/tests/Containers/KafkaContainer.cs index 0c430aa..261c84c 100644 --- a/tests/Containers/KafkaContainer.cs +++ b/tests/Containers/KafkaContainer.cs @@ -1,17 +1,15 @@ using System; using System.IO; using System.Threading.Tasks; -using DotNet.Testcontainers.Containers.Builders; -using DotNet.Testcontainers.Containers.Modules; -using DotNet.Testcontainers.Containers.OutputConsumers; -using DotNet.Testcontainers.Containers.WaitStrategies; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; namespace KafkaRetry.Job.Tests.Containers { public class KafkaContainer { private const int Port = 9092; - private readonly TestcontainersContainer _container; + private readonly IContainer _container; private readonly Stream _outStream = new MemoryStream(); private readonly Stream _errorStream = new MemoryStream(); @@ -24,7 +22,7 @@ public KafkaContainer(string zookeeperAddress) dockerHost = "unix:/var/run/docker.sock"; } - _container = new TestcontainersBuilder() + _container = new ContainerBuilder() .WithDockerEndpoint(dockerHost) .WithImage("confluentinc/cp-kafka:6.0.1") .WithExposedPort(Port) diff --git a/tests/Containers/ZookeeperContainer.cs b/tests/Containers/ZookeeperContainer.cs index 3768f39..ddae5df 100644 --- a/tests/Containers/ZookeeperContainer.cs +++ b/tests/Containers/ZookeeperContainer.cs @@ -1,8 +1,7 @@ using System; using System.Threading.Tasks; -using DotNet.Testcontainers.Containers.Builders; -using DotNet.Testcontainers.Containers.Modules; -using DotNet.Testcontainers.Containers.WaitStrategies; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; using DotNet.Testcontainers.Images; namespace KafkaRetry.Job.Tests.Containers @@ -10,7 +9,7 @@ namespace KafkaRetry.Job.Tests.Containers public class ZookeeperContainer { private const int Port = 2181; - private readonly TestcontainersContainer _container; + private readonly IContainer _container; public string Address => $"{_container.IpAddress}:{Port}"; @@ -22,7 +21,7 @@ public ZookeeperContainer() dockerHost = "unix:/var/run/docker.sock"; } - _container = new TestcontainersBuilder() + _container = new ContainerBuilder() .WithDockerEndpoint(dockerHost) .WithImage(new DockerImage("zookeeper")) .WithExposedPort(Port) diff --git a/tests/kafka.retry.job.tests.csproj b/tests/kafka.retry.job.tests.csproj index 3e05128..450fd94 100644 --- a/tests/kafka.retry.job.tests.csproj +++ b/tests/kafka.retry.job.tests.csproj @@ -14,7 +14,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + From 6c5e968ad988575f5983aeb8797c6359f8e00887 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Thu, 29 Feb 2024 11:13:55 +0300 Subject: [PATCH 2/6] #35 read configuration using Builder Pattern --- .../KafkaConfigs/ClientConfigBuilder.cs | 66 +++++++++++++ .../KafkaConfigs/ConsumerConfigBuilder.cs | 30 ++++++ .../KafkaConfigs/ProducerConfigBuilder.cs | 36 +++++++ src/Services/Implementations/KafkaService.cs | 94 ++++++++++--------- 4 files changed, 181 insertions(+), 45 deletions(-) create mode 100644 src/Helpers/KafkaConfigs/ClientConfigBuilder.cs create mode 100644 src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs create mode 100644 src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs diff --git a/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs new file mode 100644 index 0000000..d748e35 --- /dev/null +++ b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs @@ -0,0 +1,66 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ClientConfigBuilder +{ + public static ClientConfig WithBootstrapServers(this ClientConfig config, string bootstrapServers) + { + config.BootstrapServers = bootstrapServers; + return config; + } + + public static ClientConfig WithSaslUsername(this ClientConfig config, string username) + { + config.SaslUsername = username; + return config; + } + + public static ClientConfig WithSaslPassword(this ClientConfig config, string password) + { + config.SaslPassword = password; + return config; + } + + public static ClientConfig WithSslCaLocation(this ClientConfig config, string sslCaLocation) + { + config.SslCaLocation = sslCaLocation; + return config; + } + + public static ClientConfig WithSaslMechanism(this ClientConfig config, SaslMechanism? saslMechanism) + { + config.SaslMechanism = saslMechanism; + return config; + } + + public static ClientConfig WithSecurityProtocol(this ClientConfig config, SecurityProtocol? securityProtocol) + { + config.SecurityProtocol = securityProtocol; + return config; + } + + public static ClientConfig WithSslKeystorePassword(this ClientConfig config, string sslKeystorePassword) + { + config.SslKeystorePassword = sslKeystorePassword; + return config; + } + + public static ClientConfig WithClientId(this ClientConfig config, string clientId) + { + config.ClientId = clientId; + return config; + } + + public static ClientConfig WithMessageMaxBytes(this ClientConfig config, int? messageMaxBytes) + { + config.MessageMaxBytes = messageMaxBytes; + return config; + } + + public static ClientConfig WithAcks(this ClientConfig config, Acks? acks) + { + config.Acks = acks; + return config; + } +} \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs new file mode 100644 index 0000000..27b16f3 --- /dev/null +++ b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs @@ -0,0 +1,30 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ConsumerConfigBuilder +{ + public static ConsumerConfig WithAutoOffsetReset(this ConsumerConfig config, AutoOffsetReset? autoOffsetReset) + { + config.AutoOffsetReset = autoOffsetReset; + return config; + } + + public static ConsumerConfig WithGroupId(this ConsumerConfig config, string groupId) + { + config.GroupId = groupId; + return config; + } + + public static ConsumerConfig WithEnableAutoCommit(this ConsumerConfig config, bool? autoCommit) + { + config.EnableAutoCommit = autoCommit; + return config; + } + + public static ConsumerConfig WithEnableAutoOffsetStore(this ConsumerConfig config, bool? autoOffsetStore) + { + config.EnableAutoOffsetStore = autoOffsetStore; + return config; + } +} \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs new file mode 100644 index 0000000..115fca1 --- /dev/null +++ b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs @@ -0,0 +1,36 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ProducerConfigBuilder +{ + public static ProducerConfig WithEnableIdempotence(this ProducerConfig config, bool? idempotence) + { + config.EnableIdempotence = idempotence; + return config; + } + + public static ProducerConfig WithBatchSize(this ProducerConfig config, int? batchSize) + { + config.BatchSize = batchSize; + return config; + } + + public static ProducerConfig WithLingerMs(this ProducerConfig config, double? lingerMs) + { + config.LingerMs = lingerMs; + return config; + } + + public static ProducerConfig WithMessageTimeoutMs(this ProducerConfig config, int? messageTimeoutMs) + { + config.MessageTimeoutMs = messageTimeoutMs; + return config; + } + + public static ProducerConfig WithRequestTimeoutMs(this ProducerConfig config, int? requestTimeoutMs) + { + config.RequestTimeoutMs = requestTimeoutMs; + return config; + } +} \ No newline at end of file diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 67df77c..14cc884 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -1,5 +1,6 @@ using System; using Confluent.Kafka; +using KafkaRetry.Job.Helpers.KafkaConfigs; using KafkaRetry.Job.Services.Interfaces; namespace KafkaRetry.Job.Services.Implementations; @@ -41,63 +42,66 @@ public IAdminClient BuildAdminClient() return adminClientBuilder.Build(); } - private AdminClientConfig CreateAdminClientConfig(string bootstrapServers) + private ClientConfig CreateClientConfig(string bootstrapServers) { - return new AdminClientConfig + ClientConfig clientConfig = new ClientConfig() + .WithBootstrapServers(bootstrapServers) + .WithClientId(_configuration.ClientId) + .WithMessageMaxBytes(_configuration.MessageMaxBytes); + + if (_configuration.SaslMechanism is not null) + { + clientConfig = clientConfig + .WithSaslUsername(_configuration.SaslUsername) + .WithSaslPassword(_configuration.SaslPassword) + .WithSslCaLocation(_configuration.SslCaLocation) + .WithSaslMechanism(_configuration.SaslMechanism) + .WithSecurityProtocol(_configuration.SecurityProtocol) + .WithSslKeystorePassword(_configuration.SslKeystorePassword); + } + + if (_configuration.Acks is not null) { - BootstrapServers = bootstrapServers, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty - }; + clientConfig = clientConfig + .WithAcks(_configuration.Acks); + } + + return clientConfig; + } + + private AdminClientConfig CreateAdminClientConfig(string bootstrapServers) + { + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + return new AdminClientConfig(clientConfig); } private ProducerConfig CreateProducerConfig(string bootstrapServers) { - var producerConfig = new ProducerConfig - { - BootstrapServers = bootstrapServers, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty, - EnableIdempotence = _configuration.EnableIdempotence, - BatchSize = _configuration.BatchSize, - ClientId = _configuration.ClientId, - LingerMs = _configuration.LingerMs, - MessageTimeoutMs = _configuration.MessageTimeoutMs, - RequestTimeoutMs = _configuration.RequestTimeoutMs, - MessageMaxBytes = _configuration.MessageMaxBytes - }; + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + ProducerConfig producerConfig = new ProducerConfig(clientConfig); - if (_configuration.Acks is not null) - { - producerConfig.Acks = _configuration.Acks; - } + producerConfig = producerConfig + .WithEnableIdempotence(_configuration.EnableIdempotence) + .WithBatchSize(_configuration.BatchSize) + .WithLingerMs(_configuration.LingerMs) + .WithMessageTimeoutMs(_configuration.MessageTimeoutMs) + .WithRequestTimeoutMs(_configuration.RequestTimeoutMs); + return producerConfig; } private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId) { - return new ConsumerConfig - { - BootstrapServers = bootstrapServers, - AutoOffsetReset = AutoOffsetReset.Earliest, - GroupId = groupId, - EnableAutoCommit = _configuration.EnableAutoCommit, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty, - EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore - }; + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig); + + consumerConfig = consumerConfig + .WithGroupId(groupId) + .WithAutoOffsetReset(AutoOffsetReset.Earliest) + .WithEnableAutoCommit(_configuration.EnableAutoCommit) + .WithEnableAutoOffsetStore(_configuration.EnableAutoOffsetStore); + + return consumerConfig; } public Action, ConsumeResult> GetConsumerCommitStrategy() From b0ba054d569acc0904da6bf34efa16eb58dd6ce6 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Thu, 29 Feb 2024 11:30:00 +0300 Subject: [PATCH 3/6] #35 update changelog --- CHANGELOG.MD | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 8504857..30b0132 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,5 +1,16 @@ # Changelog +## [1.12.2](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) + +**Closed issues:** + +- [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests +- [\#35](https://github.com/Trendyol/kafka-retry-job/issues/35) Use Builder pattern to read configurations + +**Merged pull requests:** + +- Pull Request for the issues #15 #35 ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) + ## [1.12.1](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-02) **Closed issues:** From da020b214ed4f7ebcaeabea3237fb365ecb5d414 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Thu, 29 Feb 2024 11:30:58 +0300 Subject: [PATCH 4/6] #35 update changelog --- CHANGELOG.MD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 30b0132..aa9ba71 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -9,7 +9,7 @@ **Merged pull requests:** -- Pull Request for the issues #15 #35 ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) +- Pull Request for the issues #15 [\#35](https://github.com/Trendyol/kafka-retry-job/pull/38) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) ## [1.12.1](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-02) From 1d52589d4df3fdf35fbaca1040db2a29d14801a4 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 12 Mar 2024 14:02:52 +0300 Subject: [PATCH 5/6] #35 update builder pattern --- .../KafkaConfigs/ClientConfigBuilder.cs | 50 +++++++++++++++---- .../KafkaConfigs/ConsumerConfigBuilder.cs | 20 ++++++-- .../KafkaConfigs/ProducerConfigBuilder.cs | 20 ++++++-- .../Implementations/ConfigurationService.cs | 38 ++++---------- src/Services/Implementations/KafkaService.cs | 28 ++++------- 5 files changed, 92 insertions(+), 64 deletions(-) diff --git a/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs index d748e35..64b1223 100644 --- a/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs +++ b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs @@ -6,61 +6,91 @@ public static class ClientConfigBuilder { public static ClientConfig WithBootstrapServers(this ClientConfig config, string bootstrapServers) { - config.BootstrapServers = bootstrapServers; + if (!string.IsNullOrWhiteSpace(bootstrapServers)) + { + config.BootstrapServers = bootstrapServers; + } return config; } public static ClientConfig WithSaslUsername(this ClientConfig config, string username) { - config.SaslUsername = username; + if (!string.IsNullOrWhiteSpace(username)) + { + config.SaslUsername = username; + } return config; } public static ClientConfig WithSaslPassword(this ClientConfig config, string password) { - config.SaslPassword = password; + if (!string.IsNullOrWhiteSpace(password)) + { + config.SaslPassword = password; + } return config; } public static ClientConfig WithSslCaLocation(this ClientConfig config, string sslCaLocation) { - config.SslCaLocation = sslCaLocation; + if (!string.IsNullOrWhiteSpace(sslCaLocation)) + { + config.SslCaLocation = sslCaLocation; + } return config; } public static ClientConfig WithSaslMechanism(this ClientConfig config, SaslMechanism? saslMechanism) { - config.SaslMechanism = saslMechanism; + if (saslMechanism is not null) + { + config.SaslMechanism = saslMechanism; + } return config; } public static ClientConfig WithSecurityProtocol(this ClientConfig config, SecurityProtocol? securityProtocol) { - config.SecurityProtocol = securityProtocol; + if (securityProtocol is not null) + { + config.SecurityProtocol = securityProtocol; + } return config; } public static ClientConfig WithSslKeystorePassword(this ClientConfig config, string sslKeystorePassword) { - config.SslKeystorePassword = sslKeystorePassword; + if (!string.IsNullOrWhiteSpace(sslKeystorePassword)) + { + config.SslKeystorePassword = sslKeystorePassword; + } return config; } public static ClientConfig WithClientId(this ClientConfig config, string clientId) { - config.ClientId = clientId; + if (!string.IsNullOrWhiteSpace(clientId)) + { + config.ClientId = clientId; + } return config; } public static ClientConfig WithMessageMaxBytes(this ClientConfig config, int? messageMaxBytes) { - config.MessageMaxBytes = messageMaxBytes; + if (messageMaxBytes is not null) + { + config.MessageMaxBytes = messageMaxBytes; + } return config; } public static ClientConfig WithAcks(this ClientConfig config, Acks? acks) { - config.Acks = acks; + if (acks is not null) + { + config.Acks = acks; + } return config; } } \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs index 27b16f3..6bc12a4 100644 --- a/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs +++ b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs @@ -6,25 +6,37 @@ public static class ConsumerConfigBuilder { public static ConsumerConfig WithAutoOffsetReset(this ConsumerConfig config, AutoOffsetReset? autoOffsetReset) { - config.AutoOffsetReset = autoOffsetReset; + if (autoOffsetReset is not null) + { + config.AutoOffsetReset = autoOffsetReset; + } return config; } public static ConsumerConfig WithGroupId(this ConsumerConfig config, string groupId) { - config.GroupId = groupId; + if (!string.IsNullOrWhiteSpace(groupId)) + { + config.GroupId = groupId; + } return config; } public static ConsumerConfig WithEnableAutoCommit(this ConsumerConfig config, bool? autoCommit) { - config.EnableAutoCommit = autoCommit; + if (autoCommit is not null) + { + config.EnableAutoCommit = autoCommit; + } return config; } public static ConsumerConfig WithEnableAutoOffsetStore(this ConsumerConfig config, bool? autoOffsetStore) { - config.EnableAutoOffsetStore = autoOffsetStore; + if (autoOffsetStore is not null) + { + config.EnableAutoOffsetStore = autoOffsetStore; + } return config; } } \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs index 115fca1..c977f8d 100644 --- a/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs +++ b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs @@ -6,25 +6,37 @@ public static class ProducerConfigBuilder { public static ProducerConfig WithEnableIdempotence(this ProducerConfig config, bool? idempotence) { - config.EnableIdempotence = idempotence; + if (idempotence is not null) + { + config.EnableIdempotence = idempotence; + } return config; } public static ProducerConfig WithBatchSize(this ProducerConfig config, int? batchSize) { - config.BatchSize = batchSize; + if (batchSize is not null) + { + config.BatchSize = batchSize; + } return config; } public static ProducerConfig WithLingerMs(this ProducerConfig config, double? lingerMs) { - config.LingerMs = lingerMs; + if (lingerMs is not null) + { + config.LingerMs = lingerMs; + } return config; } public static ProducerConfig WithMessageTimeoutMs(this ProducerConfig config, int? messageTimeoutMs) { - config.MessageTimeoutMs = messageTimeoutMs; + if (messageTimeoutMs is not null) + { + config.MessageTimeoutMs = messageTimeoutMs; + } return config; } diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 4ddeaf7..713dc5e 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -21,11 +21,10 @@ public ConfigurationService(IConfiguration configuration) public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); - public long MessageConsumeLimitPerTopicPartition => - GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; + public long MessageConsumeLimitPerTopicPartition => GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; - public bool EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; - public bool EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; + public bool? EnableAutoCommit => GetValue("EnableAutoCommit"); + public bool? EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore"); public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); @@ -33,30 +32,15 @@ public ConfigurationService(IConfiguration configuration) public SaslMechanism? SaslMechanism => GetValue("SaslMechanism"); public string SslKeystorePassword => GetValue("SslKeystorePassword"); public SecurityProtocol? SecurityProtocol => GetValue("SecurityProtocol"); - - public bool EnableIdempotence => GetValue("ProducerEnableIdempotence") ?? - Constants.ProducerConfigDefaults.EnableIdempotence; - + public bool? EnableIdempotence => GetValue("ProducerEnableIdempotence"); public Acks? Acks => GetValue("ProducerAcks"); - - public int BatchSize => - GetValue("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize; - - public string ClientId => - GetValue("ProducerClientId") ?? Constants.ProducerConfigDefaults.ClientId; - - public double LingerMs => - GetValue("ProducerLingerMs") ?? Constants.ProducerConfigDefaults.LingerMs; - - public int MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs") ?? - Constants.ProducerConfigDefaults.MessageTimeoutMs; - - public int RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs") ?? - Constants.ProducerConfigDefaults.RequestTimeoutMs; - - public int MessageMaxBytes => GetValue("ProducerMessageMaxBytes") ?? - Constants.ProducerConfigDefaults.MessageMaxBytes; - + public int? BatchSize => GetValue("ProducerBatchSize"); + public string ClientId => GetValue("ProducerClientId"); + public double? LingerMs => GetValue("ProducerLingerMs"); + public int? MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs"); + public int? RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs"); + public int? MessageMaxBytes => GetValue("ProducerMessageMaxBytes"); + private string GetValueOrThrowInvalidConfigException(string configName) { var configValue = _configuration.GetValue(configName); diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 14cc884..9560275 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -47,25 +47,15 @@ private ClientConfig CreateClientConfig(string bootstrapServers) ClientConfig clientConfig = new ClientConfig() .WithBootstrapServers(bootstrapServers) .WithClientId(_configuration.ClientId) - .WithMessageMaxBytes(_configuration.MessageMaxBytes); + .WithMessageMaxBytes(_configuration.MessageMaxBytes) + .WithSaslUsername(_configuration.SaslUsername) + .WithSaslPassword(_configuration.SaslPassword) + .WithSslCaLocation(_configuration.SslCaLocation) + .WithSaslMechanism(_configuration.SaslMechanism) + .WithSecurityProtocol(_configuration.SecurityProtocol) + .WithSslKeystorePassword(_configuration.SslKeystorePassword) + .WithAcks(_configuration.Acks); - if (_configuration.SaslMechanism is not null) - { - clientConfig = clientConfig - .WithSaslUsername(_configuration.SaslUsername) - .WithSaslPassword(_configuration.SaslPassword) - .WithSslCaLocation(_configuration.SslCaLocation) - .WithSaslMechanism(_configuration.SaslMechanism) - .WithSecurityProtocol(_configuration.SecurityProtocol) - .WithSslKeystorePassword(_configuration.SslKeystorePassword); - } - - if (_configuration.Acks is not null) - { - clientConfig = clientConfig - .WithAcks(_configuration.Acks); - } - return clientConfig; } @@ -106,7 +96,7 @@ private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string grou public Action, ConsumeResult> GetConsumerCommitStrategy() { - return _configuration.EnableAutoCommit ? + return _configuration.EnableAutoCommit is true ? (assignedConsumer, result) => { assignedConsumer.StoreOffset(result); From b8d92aa0c3dc2e4be7f0f85df8c0086ee9c11156 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Thu, 21 Mar 2024 11:14:31 +0300 Subject: [PATCH 6/6] #35 update the version --- .github/workflows/publish.yml | 2 +- .gitlab-ci.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c3172d7..5e21781 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: name: Build & push Docker image with: image: kafka-retry-job - tags: 1.12.2, latest + tags: 1.12.3, latest registry: ghcr.io username: ${{ secrets.GHCR_USERNAME }} password: ${{ secrets.GHCR_TOKEN }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d192d1e..2a3db68 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ variables: - VERSION: "1.12.2" + VERSION: "1.12.3" DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH