Skip to content

Commit

Permalink
Less verbose delivery handler and decreased producer queue (#48)
Browse files Browse the repository at this point in the history
Converted the delivery handler to no longer return the original key, value etc to reduce memory footprint.

Reduced the max queued message size to 101 MB from 1GB when none specified.
  • Loading branch information
peter-quix authored Apr 23, 2024
1 parent ddffa39 commit ae0010b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.7.1.0"
informal_version = "0.7.1.0-dev1"
nuget_version = "0.7.1.0-dev1"
informal_version = "0.7.1.0-dev2"
nuget_version = "0.7.1.0-dev2"


def updatecsproj(projfilepath):
Expand Down
11 changes: 8 additions & 3 deletions src/QuixStreams.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ private void ConfigureProduceDelegate()
private ProducerConfig GetKafkaProducerConfig(ProducerConfiguration producerConfiguration)
{
var config = producerConfiguration.ToProducerConfig();
if (string.IsNullOrWhiteSpace(config.DeliveryReportFields))
{
config.DeliveryReportFields = "none";
}

config.Debug = config.Debug;
if (!string.IsNullOrWhiteSpace(config.Debug))
{
Expand Down Expand Up @@ -492,19 +497,19 @@ private Task SendInternal(KafkaMessage message, ProducerDelegate handler, Cance
return Task.FromCanceled<DeliveryResult<byte[], byte[]>>(cancellationToken);
}

var taskSource = new TaskCompletionSource<DeliveryResult<byte[], byte[]>>(TaskCreationOptions.RunContinuationsAsynchronously);
var taskSource = new TaskCompletionSource<TopicPartitionOffset>(TaskCreationOptions.RunContinuationsAsynchronously);

void DeliveryHandler(DeliveryReport<byte[], byte[]> report)
{
if (report.Error?.IsError == true)
{
this.logger.LogTrace("[{0}] {1} {2}", this.configId, report.Error.Code, report.Error.Reason);
var wrappedError = new Error(report.Error.Code, $"[{this.configId}] {report.Error.Reason}", report.Error.IsFatal);
taskSource.SetException(new ProduceException<byte[], byte[]>(wrappedError, report));
taskSource.SetException(new KafkaException(wrappedError));
return;
}

taskSource.SetResult(report);
taskSource.SetResult(report.TopicPartitionOffset);
}

var success = false;
Expand Down
7 changes: 6 additions & 1 deletion src/QuixStreams.Kafka/ProducerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ProducerConfiguration(string brokerList, IDictionary<string, string> prod

/// <summary>
/// Maximum total message size sum allowed on the queue. This property has higher priority than <see cref="QueueBufferingMaxMessages" />
/// default: 1048576
/// default: 103424 (101MB)
/// </summary>
public int? QueueBufferingMaxKbytes { get; set; }

Expand Down Expand Up @@ -114,6 +114,11 @@ internal ProducerConfig ToProducerConfig()
producerProperties["log_level"] = "0";
}

if (!producerProperties.ContainsKey("queue.buffering.max.kbytes") && !QueueBufferingMaxKbytes.HasValue)
{
QueueBufferingMaxKbytes = 103424;
}

var config = new ProducerConfig(producerProperties);
config.BootstrapServers ??= BrokerList;
config.QueueBufferingMaxKbytes ??= QueueBufferingMaxKbytes;
Expand Down

0 comments on commit ae0010b

Please sign in to comment.