Skip to content

Commit

Permalink
Merge pull request #73 from qed-/bufferlen
Browse files Browse the repository at this point in the history
Allow specifying payload and key buffer lengths for Produce()
  • Loading branch information
ah- authored Nov 8, 2016
2 parents ef97de4 + 559ab46 commit b4f258d
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
10 changes: 5 additions & 5 deletions src/RdKafka/Internal/LibRdKafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,14 @@ internal static ErrorCode committed(IntPtr rk, IntPtr partitions, IntPtr timeout
internal static ErrorCode position(IntPtr rk, IntPtr partitions)
=> _position(rk, partitions);

private static Func<IntPtr, int, IntPtr, byte[], UIntPtr, byte[], UIntPtr,
private static Func<IntPtr, int, IntPtr, IntPtr, UIntPtr, IntPtr, UIntPtr,
IntPtr, IntPtr> _produce;
internal static IntPtr produce(
IntPtr rkt,
int partition,
IntPtr msgflags,
byte[] payload, UIntPtr len,
byte[] key, UIntPtr keylen,
IntPtr payload, UIntPtr len,
IntPtr key, UIntPtr keylen,
IntPtr msg_opaque)
=> _produce(rkt, partition, msgflags, payload, len, key, keylen, msg_opaque);

Expand Down Expand Up @@ -607,8 +607,8 @@ internal static extern IntPtr rd_kafka_produce(
IntPtr rkt,
int partition,
IntPtr msgflags,
byte[] payload, UIntPtr len,
byte[] key, UIntPtr keylen,
IntPtr payload, UIntPtr len,
IntPtr key, UIntPtr keylen,
IntPtr msg_opaque);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
Expand Down
44 changes: 40 additions & 4 deletions src/RdKafka/Internal/SafeTopicHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,51 @@ protected override bool ReleaseHandle()

internal string GetName() => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle));

internal long Produce(byte[] payload, byte[] key, int partition, IntPtr opaque)
=> (long) LibRdKafka.produce(
internal long Produce(ArraySegment<byte>? payload, ArraySegment<byte>? key, int partition, IntPtr opaque)
{
var pPayload = IntPtr.Zero;
var pKey = IntPtr.Zero;

var gchPayload= default(GCHandle);
var gchKey = default(GCHandle);

var payloadCount = 0;
var keyCount = 0;

if (payload.HasValue)
{
gchPayload = GCHandle.Alloc(payload.Value.Array, GCHandleType.Pinned);
pPayload = GCHandle.ToIntPtr(gchPayload) + payload.Value.Offset;
payloadCount = payload.Value.Count;
}

if (key.HasValue)
{
gchKey = GCHandle.Alloc(key.Value.Array, GCHandleType.Pinned);
pKey = GCHandle.ToIntPtr(gchKey) + key.Value.Offset;
keyCount = key.Value.Count;
}

try
{
return (long) LibRdKafka.produce(
handle,
partition,
(IntPtr) MsgFlags.MSG_F_COPY,
payload, (UIntPtr) (payload?.Length ?? 0),
key, (UIntPtr) (key?.Length ?? 0),
pPayload, (UIntPtr) payloadCount,
pKey, (UIntPtr) keyCount,
opaque);
}
finally
{
if (payload.HasValue)
gchPayload.Free();

if (key.HasValue)
gchKey.Free();
}
}

internal bool PartitionAvailable(int partition) => LibRdKafka.topic_partition_available(handle, partition);
}
}
24 changes: 21 additions & 3 deletions src/RdKafka/Topic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 par
// Passes the TaskCompletionSource to the delivery report callback
// via the msg_opaque pointer
var deliveryCompletionSource = new TaskDeliveryHandler();
Produce(payload, key, partition, deliveryCompletionSource);
Produce(payload, deliveryCompletionSource, key, partition);
return deliveryCompletionSource.Task;
}

Expand All @@ -81,12 +81,30 @@ public Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, Int32 par
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
{
if(deliveryHandler==null)
var payloadSegment = payload == null ? (ArraySegment<byte>?) null : new ArraySegment<byte>(payload);
var keySegment = key == null ? (ArraySegment<byte>?)null : new ArraySegment<byte>(key);
Produce(payloadSegment, deliveryHandler, keySegment, partition);
}

/// <summary>
/// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface.
/// </summary>
/// <param name="payload">Payload to send to Kafka. Can be null.</param>
/// <param name="deliveryHandler">IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs.</param>
/// <param name="key">(Optional) The key associated with <paramref name="payload"/> (or null if no key is specified).</param>
/// <param name="partition">(Optional) The topic partition to which <paramref name="payload"/> will be sent (or -1 if no partition is specified).</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="deliveryHandler"/> is null.</exception>
/// <remarks>Methods of <paramref name="deliveryHandler"/> will be executed in an RdKafka-internal thread and will block other operations - consider this when implementing IDeliveryHandler.
/// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.</remarks>
public void Produce(ArraySegment<byte>? payload, IDeliveryHandler deliveryHandler, ArraySegment<byte>? key = null, Int32 partition = RD_KAFKA_PARTITION_UA)
{
if (deliveryHandler == null)
throw new ArgumentNullException(nameof(deliveryHandler));
Produce(payload, key, partition, deliveryHandler);
}

private void Produce(byte[] payload, byte[] key, Int32 partition, object deliveryHandler)

private void Produce(ArraySegment<byte>? payload, ArraySegment<byte>? key, Int32 partition, object deliveryHandler)
{
var gch = GCHandle.Alloc(deliveryHandler);
var ptr = GCHandle.ToIntPtr(gch);
Expand Down

0 comments on commit b4f258d

Please sign in to comment.