diff --git a/src/RdKafka/Internal/SafeTopicHandle.cs b/src/RdKafka/Internal/SafeTopicHandle.cs index 6f2ad0d..175e903 100644 --- a/src/RdKafka/Internal/SafeTopicHandle.cs +++ b/src/RdKafka/Internal/SafeTopicHandle.cs @@ -27,15 +27,15 @@ protected override bool ReleaseHandle() internal string GetName() => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle)); - internal long Produce(byte[] payload, byte[] key, int partition, IntPtr opaque) + internal long Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, int partition, IntPtr opaque) => (long) LibRdKafka.produce( handle, partition, (IntPtr) MsgFlags.MSG_F_COPY, - payload, (UIntPtr) (payload?.Length ?? 0), - key, (UIntPtr) (key?.Length ?? 0), + payload, (UIntPtr) payloadCount, + key, (UIntPtr) keyCount, opaque); - + internal bool PartitionAvailable(int partition) => LibRdKafka.topic_partition_available(handle, partition); } } diff --git a/src/RdKafka/Topic.cs b/src/RdKafka/Topic.cs index d3e1f69..0f402e3 100644 --- a/src/RdKafka/Topic.cs +++ b/src/RdKafka/Topic.cs @@ -61,11 +61,16 @@ public void Dispose() public string Name => handle.GetName(); public Task Produce(byte[] payload, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA) + { + return Produce(payload, payload?.Length ?? 0, key, key?.Length ?? 0, partition); + } + + public Task Produce(byte[] payload, int payloadCount, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA) { // Passes the TaskCompletionSource to the delivery report callback // via the msg_opaque pointer var deliveryCompletionSource = new TaskDeliveryHandler(); - Produce(payload, key, partition, deliveryCompletionSource); + Produce(payload, payloadCount, key, keyCount, partition, deliveryCompletionSource); return deliveryCompletionSource.Task; } @@ -81,19 +86,37 @@ public Task Produce(byte[] payload, byte[] key = null, Int32 par /// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations. public void Produce(byte[] payload, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA) { - if(deliveryHandler==null) + Produce(payload, payload?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition); + } + + /// + /// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface. + /// + /// Payload to send to Kafka. Can be null. + /// Number of bytes to use from payload buffer + /// IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs. + /// (Optional) The key associated with (or null if no key is specified). + /// Number of bytes to use from key buffer + /// (Optional) The topic partition to which will be sent (or -1 if no partition is specified). + /// Thrown if is null. + /// Methods of will be executed in an RdKafka-internal thread and will block other operations - consider this when implementing IDeliveryHandler. + /// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations. + public void Produce(byte[] payload, int payloadCount, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA) + { + if (deliveryHandler == null) throw new ArgumentNullException(nameof(deliveryHandler)); - Produce(payload, key, partition, deliveryHandler); + Produce(payload, payloadCount, key, keyCount, partition, deliveryHandler); } - private void Produce(byte[] payload, byte[] key, Int32 partition, object deliveryHandler) + + private void Produce(byte[] payload, int payloadCount, byte[] key, int keyCount, Int32 partition, object deliveryHandler) { var gch = GCHandle.Alloc(deliveryHandler); var ptr = GCHandle.ToIntPtr(gch); while (true) { - if (handle.Produce(payload, key, partition, ptr) == 0) + if (handle.Produce(payload, payloadCount, key, keyCount, partition, ptr) == 0) { // Successfully enqueued produce request break;