Skip to content

Commit

Permalink
Allow specifying payload and key buffer lengths for Produce()
Browse files Browse the repository at this point in the history
  • Loading branch information
David Chapman authored and David Chapman committed Nov 4, 2016
1 parent ef97de4 commit 2db622c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
8 changes: 4 additions & 4 deletions src/RdKafka/Internal/SafeTopicHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ protected override bool ReleaseHandle()

internal string GetName() => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle));

internal long Produce(byte[] payload, byte[] key, int partition, IntPtr opaque)
internal long Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, int partition, IntPtr opaque)
=> (long) LibRdKafka.produce(
handle,
partition,
(IntPtr) MsgFlags.MSG_F_COPY,
payload, (UIntPtr) (payload?.Length ?? 0),
key, (UIntPtr) (key?.Length ?? 0),
payload, (UIntPtr) payloadCount,
key, (UIntPtr) keyCount,
opaque);

internal bool PartitionAvailable(int partition) => LibRdKafka.topic_partition_available(handle, partition);
}
}
33 changes: 28 additions & 5 deletions src/RdKafka/Topic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ public void Dispose()
public string Name => handle.GetName();

public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
{
return Produce(payload, payload?.Length ?? 0, key, key?.Length ?? 0, partition);
}

public Task<DeliveryReport> Produce(byte[] payload, int payloadCount, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA)
{
// Passes the TaskCompletionSource to the delivery report callback
// via the msg_opaque pointer
var deliveryCompletionSource = new TaskDeliveryHandler();
Produce(payload, key, partition, deliveryCompletionSource);
Produce(payload, payloadCount, key, keyCount, partition, deliveryCompletionSource);
return deliveryCompletionSource.Task;
}

Expand All @@ -81,19 +86,37 @@ public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 par
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
{
if(deliveryHandler==null)
Produce(payload, payload?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition);
}

/// <summary>
/// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface.
/// </summary>
/// <param name="payload">Payload to send to Kafka. Can be null.</param>
/// <param name="payloadCount">Number of bytes to use from payload buffer</param>
/// <param name="deliveryHandler">IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs.</param>
/// <param name="key">(Optional) The key associated with <paramref name="payload"/> (or null if no key is specified).</param>
/// <param name="keyCount">Number of bytes to use from key buffer</param>
/// <param name="partition">(Optional) The topic partition to which <paramref name="payload"/> will be sent (or -1 if no partition is specified).</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="deliveryHandler"/> is null.</exception>
/// <remarks>Methods of <paramref name="deliveryHandler"/> will be executed in an RdKafka-internal thread and will block other operations - consider this when implementing IDeliveryHandler.
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(byte[] payload, int payloadCount, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA)
{
if (deliveryHandler == null)
throw new ArgumentNullException(nameof(deliveryHandler));
Produce(payload, key, partition, deliveryHandler);
Produce(payload, payloadCount, key, keyCount, partition, deliveryHandler);
}

private void Produce(byte[] payload, byte[] key, Int32 partition, object deliveryHandler)

private void Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, Int32 partition, object deliveryHandler)
{
var gch = GCHandle.Alloc(deliveryHandler);
var ptr = GCHandle.ToIntPtr(gch);

while (true)
{
if (handle.Produce(payload, key, partition, ptr) == 0)
if (handle.Produce(payload, payloadCount, key, keyCount, partition, ptr) == 0)
{
// Successfully enqueued produce request
break;
Expand Down

0 comments on commit 2db622c

Please sign in to comment.