Skip to content

Commit

Permalink
Merge pull request #34 from ElifBayrakdar/feature/issue-14
Browse files Browse the repository at this point in the history
#14 Get Kafka Producer Configs from Configuration
  • Loading branch information
MehmetFiratKomurcu authored Feb 2, 2024
2 parents 551520a + f1d19ad commit ae7d973
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
name: Build & push Docker image
with:
image: kafka-retry-job
tags: 1.11.0, latest
tags: 1.12.0, latest
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

variables:
VERSION: "1.11.0"
VERSION: "1.12.0"
DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION
DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH

Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## [1.12.0](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-01)

**Closed issues:**

- [\#14](https://github.com/Trendyol/kafka-retry-job/issues/14) Get Kafka Producer Configs from Configuration

**Merged pull requests:**

- Pull Request for the issues #14 [\#34](https://github.com/Trendyol/kafka-retry-job/pull/34) ([elifbayrakdar](https://github.com/elifbayrakdar))


## [1.11.0](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-01)

**Closed issues:**
Expand Down
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ Here is the explanation of environment variables. Please note that the config pr
- RetryTopicNameInHeader: Retry topic name that will be presented in the header of each message to transfer them corresponding retry topic
- MessageConsumeLimitPerTopicPartition: Limit the total number of messages that can be consumed for a topic partition
- EnableAutoCommit: Enable/disable auto commit config
- EnableAutoOffsetStore: Enable/disable auto offset store config
- EnableAutoOffsetStore: Enable/disable auto offset store config
- ProducerEnableIdempotence: EnableIdempotence property of Confluent Kafka ProducerConfig
- ProducerAcks: Acks property of Confluent Kafka ProducerConfig
- ProducerBatchSize: BatchSize property of Confluent Kafka ProducerConfig
- ProducerClientId: ClientId property of Confluent Kafka ProducerConfig
- ProducerLingerMs: LingerMs property of Confluent Kafka ProducerConfig
- ProducerMessageTimeoutMs: MessageTimeoutMs property of Confluent Kafka ProducerConfig
- ProducerRequestTimeoutMs: RequestTimeoutMs property of Confluent Kafka ProducerConfig
- ProducerMessageMaxBytes: MessageMaxBytes property of Confluent Kafka ProducerConfig

## Getting Started

Expand Down
18 changes: 18 additions & 0 deletions src/Helpers/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Confluent.Kafka;

namespace KafkaRetry.Job.Helpers;

public static class Constants
{
public static class ProducerConfigDefaults
{
public const bool EnableIdempotence = true;
public const Acks Acks = Confluent.Kafka.Acks.Leader;
public const int BatchSize = 1000000;
public const string ClientId = "rdkafka";
public const double LingerMs = 5;
public const int MessageTimeoutMs = 300000;
public const int RequestTimeoutMs = 30000;
public const int MessageMaxBytes = 1000000;
}
}
93 changes: 59 additions & 34 deletions src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,72 @@
using KafkaRetry.Job.Helpers;
using Microsoft.Extensions.Configuration;

namespace KafkaRetry.Job.Services.Implementations
namespace KafkaRetry.Job.Services.Implementations;

public class ConfigurationService
{
public class ConfigurationService
private readonly IConfiguration _configuration;

public ConfigurationService(IConfiguration configuration)
{
private readonly IConfiguration _configuration;
_configuration = configuration;
}

public ConfigurationService(IConfiguration configuration)
{
_configuration = configuration;
}
public string BootstrapServers => GetValueOrThrowInvalidConfigException("BootstrapServers");
public string TopicRegex => GetValueOrThrowInvalidConfigException("TopicRegex");
public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix");
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");

public string BootstrapServers => GetValueOrThrowInvalidConfigException("BootstrapServers");
public string TopicRegex => GetValueOrThrowInvalidConfigException("TopicRegex");
public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix");
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");
public long MessageConsumeLimitPerTopicPartition => GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;
public bool EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
public string GroupId => GetValueOrThrowInvalidConfigException("GroupId");
public string SaslUsername => GetValue<string>("SaslUsername");
public string SaslPassword => GetValue<string>("SaslPassword");
public string SslCaLocation => GetValue<string>("SslCaLocation");
public SaslMechanism? SaslMechanism => GetValue<SaslMechanism?>("SaslMechanism");
public string SslKeystorePassword => GetValue<string>("SslKeystorePassword");
public SecurityProtocol? SecurityProtocol => GetValue<SecurityProtocol?>("SecurityProtocol");

private string GetValueOrThrowInvalidConfigException(string configName)
{
var configValue = _configuration.GetValue<string>(configName);
if (string.IsNullOrEmpty(configValue))
{
throw new InvalidConfigException($"{configName} {ErrorMessages.ConfigCanNotBeNullOrEmpty}");
}
public long MessageConsumeLimitPerTopicPartition =>
GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;

return configValue;
}
public bool EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
public string GroupId => GetValueOrThrowInvalidConfigException("GroupId");
public string SaslUsername => GetValue<string>("SaslUsername");
public string SaslPassword => GetValue<string>("SaslPassword");
public string SslCaLocation => GetValue<string>("SslCaLocation");
public SaslMechanism? SaslMechanism => GetValue<SaslMechanism?>("SaslMechanism");
public string SslKeystorePassword => GetValue<string>("SslKeystorePassword");
public SecurityProtocol? SecurityProtocol => GetValue<SecurityProtocol?>("SecurityProtocol");

public bool EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence") ??
Constants.ProducerConfigDefaults.EnableIdempotence;

public Acks? Acks => GetValue<Acks?>("ProducerAcks") ?? Constants.ProducerConfigDefaults.Acks;

public int BatchSize =>
GetValue<int?>("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize;

private T GetValue<T>(string configName)
public string ClientId =>
GetValue<string>("ProducerClientId") ?? Constants.ProducerConfigDefaults.ClientId;

public double LingerMs =>
GetValue<double?>("ProducerLingerMs") ?? Constants.ProducerConfigDefaults.LingerMs;

public int MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs") ??
Constants.ProducerConfigDefaults.MessageTimeoutMs;

public int RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs") ??
Constants.ProducerConfigDefaults.RequestTimeoutMs;

public int MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes") ??
Constants.ProducerConfigDefaults.MessageMaxBytes;

private string GetValueOrThrowInvalidConfigException(string configName)
{
var configValue = _configuration.GetValue<string>(configName);
if (string.IsNullOrEmpty(configValue))
{
return _configuration.GetValue<T>(configName);
throw new InvalidConfigException($"{configName} {ErrorMessages.ConfigCanNotBeNullOrEmpty}");
}

return configValue;
}

private T GetValue<T>(string configName)
{
return _configuration.GetValue<T>(configName);
}
}
172 changes: 89 additions & 83 deletions src/Services/Implementations/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,110 @@
using Confluent.Kafka;
using KafkaRetry.Job.Services.Interfaces;

namespace KafkaRetry.Job.Services.Implementations
namespace KafkaRetry.Job.Services.Implementations;

public class KafkaService : IKafkaService
{
public class KafkaService : IKafkaService
{
private readonly ConfigurationService _configuration;
private readonly ConfigurationService _configuration;

public KafkaService(ConfigurationService configuration)
{
_configuration = configuration;
}
public KafkaService(ConfigurationService configuration)
{
_configuration = configuration;
}

public IConsumer<string, string> BuildKafkaConsumer()
{
var bootstrapServers = _configuration.BootstrapServers;
var groupId = _configuration.GroupId;
var consumerConfig = CreateConsumerConfig(bootstrapServers, groupId);
var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);
public IConsumer<string, string> BuildKafkaConsumer()
{
var bootstrapServers = _configuration.BootstrapServers;
var groupId = _configuration.GroupId;
var consumerConfig = CreateConsumerConfig(bootstrapServers, groupId);
var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);

return consumerBuilder.Build();
}
return consumerBuilder.Build();
}

public IProducer<string, string> BuildKafkaProducer()
{
var bootstrapServers = _configuration.BootstrapServers;
var producerConfig = CreateProducerConfig(bootstrapServers);
var producerBuilder = new ProducerBuilder<string, string>(producerConfig);
public IProducer<string, string> BuildKafkaProducer()
{
var bootstrapServers = _configuration.BootstrapServers;
var producerConfig = CreateProducerConfig(bootstrapServers);
var producerBuilder = new ProducerBuilder<string, string>(producerConfig);

return producerBuilder.Build();
}
return producerBuilder.Build();
}

public IAdminClient BuildAdminClient()
{
var bootstrapServers = _configuration.BootstrapServers;
var adminClientConfig = CreateAdminClientConfig(bootstrapServers);
var adminClientBuilder = new AdminClientBuilder(adminClientConfig);
public IAdminClient BuildAdminClient()
{
var bootstrapServers = _configuration.BootstrapServers;
var adminClientConfig = CreateAdminClientConfig(bootstrapServers);
var adminClientBuilder = new AdminClientBuilder(adminClientConfig);

return adminClientBuilder.Build();
}
return adminClientBuilder.Build();
}

private AdminClientConfig CreateAdminClientConfig(string bootstrapServers)
private AdminClientConfig CreateAdminClientConfig(string bootstrapServers)
{
return new AdminClientConfig
{
return new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
};
}
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty
};
}

private ProducerConfig CreateProducerConfig(string bootstrapServers)
private ProducerConfig CreateProducerConfig(string bootstrapServers)
{
return new ProducerConfig
{
return new ProducerConfig
{
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableIdempotence = true
};
}
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableIdempotence = _configuration.EnableIdempotence,
Acks = _configuration.Acks,
BatchSize = _configuration.BatchSize,
ClientId = _configuration.ClientId,
LingerMs = _configuration.LingerMs,
MessageTimeoutMs = _configuration.MessageTimeoutMs,
RequestTimeoutMs = _configuration.RequestTimeoutMs,
MessageMaxBytes = _configuration.MessageMaxBytes
};
}

private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId)
private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId)
{
return new ConsumerConfig
{
return new ConsumerConfig
BootstrapServers = bootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = groupId,
EnableAutoCommit = _configuration.EnableAutoCommit,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore
};
}

public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy()
{
return _configuration.EnableAutoCommit ?
(assignedConsumer, result) =>
{
BootstrapServers = bootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = groupId,
EnableAutoCommit = _configuration.EnableAutoCommit,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore,
assignedConsumer.StoreOffset(result);
} :
(assignedConsumer, result) =>
{
assignedConsumer.StoreOffset(result);
assignedConsumer.Commit();
};
}

public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy()
{
return _configuration.EnableAutoCommit ?
(assignedConsumer, result) =>
{
assignedConsumer.StoreOffset(result);
} :
(assignedConsumer, result) =>
{
assignedConsumer.StoreOffset(result);
assignedConsumer.Commit();
};
}
}
}

0 comments on commit ae7d973

Please sign in to comment.