Skip to content
Andreas Heider edited this page Apr 24, 2016 · 2 revisions

See also https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md and https://github.com/edenhill/librdkafka/wiki.

Something strange is going on, how do I find out what RdKafka is doing?

Set the debug config option to get detailed log information about internals. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for all supported values.

var config = new Config();
config["debug"] = "all";
using (Producer producer = new Producer(config, brokerList))

Why am I getting low throughput when awaiting Produce()?

This is caused by roundtrip latency. If you wait for every message to be delivered and acknowledged before sending the next one, throughput will be low.

The trick is to not immediately await each send individually but to have many produce requests in flight simultaneously. librdkafka will then batch up your messages and send them in bulk for you.

See SimpleProducer/Program.cs for an example of how to use e.g. ContinueWith to avoid the roundtrip latency.

How can I control where a consumer starts?

When using a balanced consumer with Subscribe you can override the committed offsets in the OnPartitionsAssigned event, by modifying the default partition assignment before passing it to Assign.

By modifying the Offset you can start from a specific offset, the beginning/end of the partition or the last committed offset.

For example, to start from the beginning:

consumer.OnPartitionsAssigned += (obj, partitions) => {
    Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
    var fromBeginning = partitions.Select(p => new TopicPartitionOffset(p.Topic, p.Partition, Offset.Beginning)).ToList();
    Console.WriteLine($"Updated assignment: [{string.Join(", ", fromBeginning)}]");
    consumer.Assign(fromBeginning);
};

How can I detect that all brokers are down?

Subscribe to the OnError event on the Consumer/Producer, which will fire ErrorCode._ALL_BROKERS_DOWN.

Clone this wiki locally