Skip to content

Commit

Permalink
Support MSG_F_BLOCK in producer
Browse files Browse the repository at this point in the history
  • Loading branch information
ah- committed Nov 9, 2016
1 parent 6f7f461 commit dc74999
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 29 deletions.
7 changes: 4 additions & 3 deletions src/RdKafka/Internal/SafeTopicHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ namespace RdKafka.Internal
enum MsgFlags
{
MSG_F_FREE = 1,
MSG_F_COPY = 2
MSG_F_COPY = 2,
MSG_F_BLOCK = 4
}

internal sealed class SafeTopicHandle : SafeHandleZeroIsInvalid
Expand All @@ -27,11 +28,11 @@ protected override bool ReleaseHandle()

internal string GetName() => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle));

internal long Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, int partition, IntPtr opaque)
internal long Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, int partition, IntPtr opaque, bool blockIfQueueFull)
=> (long) LibRdKafka.produce(
handle,
partition,
(IntPtr) MsgFlags.MSG_F_COPY,
(IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)),
payload, (UIntPtr) payloadCount,
key, (UIntPtr) keyCount,
opaque);
Expand Down
38 changes: 12 additions & 26 deletions src/RdKafka/Topic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ public void Dispose()

public string Name => handle.GetName();

public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
{
return Produce(payload, payload?.Length ?? 0, key, key?.Length ?? 0, partition);
return Produce(payload, payload?.Length ?? 0, key, key?.Length ?? 0, partition, blockIfQueueFull);
}

public Task<DeliveryReport> Produce(byte[] payload, int payloadCount, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA)
public Task<DeliveryReport> Produce(byte[] payload, int payloadCount, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
{
// Passes the TaskCompletionSource to the delivery report callback
// via the msg_opaque pointer
var deliveryCompletionSource = new TaskDeliveryHandler();
Produce(payload, payloadCount, key, keyCount, partition, deliveryCompletionSource);
Produce(payload, payloadCount, key, keyCount, partition, deliveryCompletionSource, blockIfQueueFull);
return deliveryCompletionSource.Task;
}

Expand All @@ -84,9 +84,9 @@ public Task<DeliveryReport> Produce(byte[] payload, int payloadCount, byte[] key
/// <exception cref="ArgumentNullException">Thrown if <paramref name="deliveryHandler"/> is null.</exception>
/// <remarks>Methods of <paramref name="deliveryHandler"/> will be executed in an RdKafka-internal thread and will block other operations - consider this when implementing IDeliveryHandler.
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
{
Produce(payload, payload?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition);
Produce(payload, payload?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition, blockIfQueueFull);
}

/// <summary>
Expand All @@ -101,38 +101,24 @@ public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key
/// <exception cref="ArgumentNullException">Thrown if <paramref name="deliveryHandler"/> is null.</exception>
/// <remarks>Methods of <paramref name="deliveryHandler"/> will be executed in an RdKafka-internal thread and will block other operations - consider this when implementing IDeliveryHandler.
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(byte[] payload, int payloadCount, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA)
public void Produce(byte[] payload, int payloadCount, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
{
if (deliveryHandler == null)
throw new ArgumentNullException(nameof(deliveryHandler));
Produce(payload, payloadCount, key, keyCount, partition, deliveryHandler);
Produce(payload, payloadCount, key, keyCount, partition, deliveryHandler, blockIfQueueFull);
}


private void Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, Int32 partition, object deliveryHandler)
private void Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, Int32 partition, object deliveryHandler, bool blockIfQueueFull)
{
var gch = GCHandle.Alloc(deliveryHandler);
var ptr = GCHandle.ToIntPtr(gch);

while (true)
if (handle.Produce(payload, payloadCount, key, keyCount, partition, ptr, blockIfQueueFull) != 0)
{
if (handle.Produce(payload, payloadCount, key, keyCount, partition, ptr) == 0)
{
// Successfully enqueued produce request
break;
}

var err = LibRdKafka.last_error();
if (err == ErrorCode._QUEUE_FULL)
{
// Wait and retry
Task.Delay(TimeSpan.FromMilliseconds(50)).Wait();
}
else
{
gch.Free();
throw RdKafkaException.FromErr(err, "Could not produce message");
}
gch.Free();
throw RdKafkaException.FromErr(err, "Could not produce message");
}
}

Expand Down

0 comments on commit dc74999

Please sign in to comment.