Skip to content

Commit

Permalink
#35 update builder pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan.kavraz committed Mar 12, 2024
1 parent da020b2 commit 1d52589
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 64 deletions.
50 changes: 40 additions & 10 deletions src/Helpers/KafkaConfigs/ClientConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,91 @@ public static class ClientConfigBuilder
{
public static ClientConfig WithBootstrapServers(this ClientConfig config, string bootstrapServers)
{
config.BootstrapServers = bootstrapServers;
if (!string.IsNullOrWhiteSpace(bootstrapServers))
{
config.BootstrapServers = bootstrapServers;
}
return config;
}

public static ClientConfig WithSaslUsername(this ClientConfig config, string username)
{
config.SaslUsername = username;
if (!string.IsNullOrWhiteSpace(username))
{
config.SaslUsername = username;
}
return config;
}

public static ClientConfig WithSaslPassword(this ClientConfig config, string password)
{
config.SaslPassword = password;
if (!string.IsNullOrWhiteSpace(password))
{
config.SaslPassword = password;
}
return config;
}

public static ClientConfig WithSslCaLocation(this ClientConfig config, string sslCaLocation)
{
config.SslCaLocation = sslCaLocation;
if (!string.IsNullOrWhiteSpace(sslCaLocation))
{
config.SslCaLocation = sslCaLocation;
}
return config;
}

public static ClientConfig WithSaslMechanism(this ClientConfig config, SaslMechanism? saslMechanism)
{
config.SaslMechanism = saslMechanism;
if (saslMechanism is not null)
{
config.SaslMechanism = saslMechanism;
}
return config;
}

public static ClientConfig WithSecurityProtocol(this ClientConfig config, SecurityProtocol? securityProtocol)
{
config.SecurityProtocol = securityProtocol;
if (securityProtocol is not null)
{
config.SecurityProtocol = securityProtocol;
}
return config;
}

public static ClientConfig WithSslKeystorePassword(this ClientConfig config, string sslKeystorePassword)
{
config.SslKeystorePassword = sslKeystorePassword;
if (!string.IsNullOrWhiteSpace(sslKeystorePassword))
{
config.SslKeystorePassword = sslKeystorePassword;
}
return config;
}

public static ClientConfig WithClientId(this ClientConfig config, string clientId)
{
config.ClientId = clientId;
if (!string.IsNullOrWhiteSpace(clientId))
{
config.ClientId = clientId;
}
return config;
}

public static ClientConfig WithMessageMaxBytes(this ClientConfig config, int? messageMaxBytes)
{
config.MessageMaxBytes = messageMaxBytes;
if (messageMaxBytes is not null)
{
config.MessageMaxBytes = messageMaxBytes;
}
return config;
}

public static ClientConfig WithAcks(this ClientConfig config, Acks? acks)
{
config.Acks = acks;
if (acks is not null)
{
config.Acks = acks;
}
return config;
}
}
20 changes: 16 additions & 4 deletions src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,37 @@ public static class ConsumerConfigBuilder
{
public static ConsumerConfig WithAutoOffsetReset(this ConsumerConfig config, AutoOffsetReset? autoOffsetReset)
{
config.AutoOffsetReset = autoOffsetReset;
if (autoOffsetReset is not null)
{
config.AutoOffsetReset = autoOffsetReset;
}
return config;
}

public static ConsumerConfig WithGroupId(this ConsumerConfig config, string groupId)
{
config.GroupId = groupId;
if (!string.IsNullOrWhiteSpace(groupId))
{
config.GroupId = groupId;
}
return config;
}

public static ConsumerConfig WithEnableAutoCommit(this ConsumerConfig config, bool? autoCommit)
{
config.EnableAutoCommit = autoCommit;
if (autoCommit is not null)
{
config.EnableAutoCommit = autoCommit;
}
return config;
}

public static ConsumerConfig WithEnableAutoOffsetStore(this ConsumerConfig config, bool? autoOffsetStore)
{
config.EnableAutoOffsetStore = autoOffsetStore;
if (autoOffsetStore is not null)
{
config.EnableAutoOffsetStore = autoOffsetStore;
}
return config;
}
}
20 changes: 16 additions & 4 deletions src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,37 @@ public static class ProducerConfigBuilder
{
public static ProducerConfig WithEnableIdempotence(this ProducerConfig config, bool? idempotence)
{
config.EnableIdempotence = idempotence;
if (idempotence is not null)
{
config.EnableIdempotence = idempotence;
}
return config;
}

public static ProducerConfig WithBatchSize(this ProducerConfig config, int? batchSize)
{
config.BatchSize = batchSize;
if (batchSize is not null)
{
config.BatchSize = batchSize;
}
return config;
}

public static ProducerConfig WithLingerMs(this ProducerConfig config, double? lingerMs)
{
config.LingerMs = lingerMs;
if (lingerMs is not null)
{
config.LingerMs = lingerMs;
}
return config;
}

public static ProducerConfig WithMessageTimeoutMs(this ProducerConfig config, int? messageTimeoutMs)
{
config.MessageTimeoutMs = messageTimeoutMs;
if (messageTimeoutMs is not null)
{
config.MessageTimeoutMs = messageTimeoutMs;
}
return config;
}

Expand Down
38 changes: 11 additions & 27 deletions src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,26 @@ public ConfigurationService(IConfiguration configuration)
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");

public long MessageConsumeLimitPerTopicPartition =>
GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;
public long MessageConsumeLimitPerTopicPartition => GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;

public bool EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
public bool? EnableAutoCommit => GetValue<bool?>("EnableAutoCommit");
public bool? EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore");
public string GroupId => GetValueOrThrowInvalidConfigException("GroupId");
public string SaslUsername => GetValue<string>("SaslUsername");
public string SaslPassword => GetValue<string>("SaslPassword");
public string SslCaLocation => GetValue<string>("SslCaLocation");
public SaslMechanism? SaslMechanism => GetValue<SaslMechanism?>("SaslMechanism");
public string SslKeystorePassword => GetValue<string>("SslKeystorePassword");
public SecurityProtocol? SecurityProtocol => GetValue<SecurityProtocol?>("SecurityProtocol");

public bool EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence") ??
Constants.ProducerConfigDefaults.EnableIdempotence;

public bool? EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence");
public Acks? Acks => GetValue<Acks?>("ProducerAcks");

public int BatchSize =>
GetValue<int?>("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize;

public string ClientId =>
GetValue<string>("ProducerClientId") ?? Constants.ProducerConfigDefaults.ClientId;

public double LingerMs =>
GetValue<double?>("ProducerLingerMs") ?? Constants.ProducerConfigDefaults.LingerMs;

public int MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs") ??
Constants.ProducerConfigDefaults.MessageTimeoutMs;

public int RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs") ??
Constants.ProducerConfigDefaults.RequestTimeoutMs;

public int MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes") ??
Constants.ProducerConfigDefaults.MessageMaxBytes;

public int? BatchSize => GetValue<int?>("ProducerBatchSize");
public string ClientId => GetValue<string>("ProducerClientId");
public double? LingerMs => GetValue<double?>("ProducerLingerMs");
public int? MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs");
public int? RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs");
public int? MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes");

private string GetValueOrThrowInvalidConfigException(string configName)
{
var configValue = _configuration.GetValue<string>(configName);
Expand Down
28 changes: 9 additions & 19 deletions src/Services/Implementations/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,15 @@ private ClientConfig CreateClientConfig(string bootstrapServers)
ClientConfig clientConfig = new ClientConfig()
.WithBootstrapServers(bootstrapServers)
.WithClientId(_configuration.ClientId)
.WithMessageMaxBytes(_configuration.MessageMaxBytes);
.WithMessageMaxBytes(_configuration.MessageMaxBytes)
.WithSaslUsername(_configuration.SaslUsername)
.WithSaslPassword(_configuration.SaslPassword)
.WithSslCaLocation(_configuration.SslCaLocation)
.WithSaslMechanism(_configuration.SaslMechanism)
.WithSecurityProtocol(_configuration.SecurityProtocol)
.WithSslKeystorePassword(_configuration.SslKeystorePassword)
.WithAcks(_configuration.Acks);

if (_configuration.SaslMechanism is not null)
{
clientConfig = clientConfig
.WithSaslUsername(_configuration.SaslUsername)
.WithSaslPassword(_configuration.SaslPassword)
.WithSslCaLocation(_configuration.SslCaLocation)
.WithSaslMechanism(_configuration.SaslMechanism)
.WithSecurityProtocol(_configuration.SecurityProtocol)
.WithSslKeystorePassword(_configuration.SslKeystorePassword);
}

if (_configuration.Acks is not null)
{
clientConfig = clientConfig
.WithAcks(_configuration.Acks);
}

return clientConfig;
}

Expand Down Expand Up @@ -106,7 +96,7 @@ private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string grou

public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy()
{
return _configuration.EnableAutoCommit ?
return _configuration.EnableAutoCommit is true ?
(assignedConsumer, result) =>
{
assignedConsumer.StoreOffset(result);
Expand Down

0 comments on commit 1d52589

Please sign in to comment.