Producer API Design #10

Closed
mhowlett opened this issue Dec 2, 2016 · 10 comments

Comments

@mhowlett
Contributor

mhowlett commented Dec 2, 2016

Creating an issue to track discussion around the Producer API design.

@edenhill raised the point that in some scenarios, the order in which DeliveryReports are returned is important and the user of the library will want access to this information.

Here's how things work currently:

  1. librdkafka's conf_set_dr_msg_cb is set to point to the DeliveryReportCallbackImpl method in Producer to handle delivery report events.
  2. Producer.ProduceAsync is called to produce a message.
  3. This creates a TaskCompletionSource, a reference to which is passed to the librdkafka produce method via the opaque parameter. librdkafka's produce method doesn't block.
  4. The Producer class maintains a thread which calls the librdkafka poll method periodically.
  5. The DeliveryReportCallbackImpl gets called (on the poll thread managed by Producer) for every produce call that completes.
  6. The reference to the TaskCompletionSource is re-constructed and the Task associated with the produce call completed.
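The flow in the steps above can be sketched roughly as follows. This is only an illustration, not the actual Producer code: `FakeNativeQueue` is a hypothetical stand-in for librdkafka, `OpaqueProducer` and the string payload are assumptions, and `GCHandle` is used here as one plausible way to pass a managed reference through an opaque pointer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the native library: it stores the opaque pointer
// and hands it back from Poll(), the way a delivery report callback would.
public sealed class FakeNativeQueue
{
    private readonly ConcurrentQueue<IntPtr> _pending = new ConcurrentQueue<IntPtr>();

    public void Produce(IntPtr opaque) => _pending.Enqueue(opaque); // does not block

    // Invokes the callback once per queued message and returns how many were served.
    public int Poll(Action<IntPtr> deliveryReportCallback)
    {
        int served = 0;
        while (_pending.TryDequeue(out IntPtr opaque))
        {
            deliveryReportCallback(opaque);
            served++;
        }
        return served;
    }
}

public sealed class OpaqueProducer
{
    private readonly FakeNativeQueue _native = new FakeNativeQueue();

    public Task<string> ProduceAsync(string value)
    {
        // The message value rides along as the task's AsyncState.
        var tcs = new TaskCompletionSource<string>(value);
        GCHandle handle = GCHandle.Alloc(tcs); // keeps tcs alive while "native" code holds it
        _native.Produce(GCHandle.ToIntPtr(handle));
        return tcs.Task;
    }

    public int Poll() => _native.Poll(DeliveryReportCallback);

    private static void DeliveryReportCallback(IntPtr opaque)
    {
        // Re-construct the TaskCompletionSource reference from the opaque pointer.
        GCHandle handle = GCHandle.FromIntPtr(opaque);
        var tcs = (TaskCompletionSource<string>)handle.Target;
        handle.Free();
        tcs.SetResult("delivered: " + (string)tcs.Task.AsyncState);
    }
}

public static class OpaqueDemo
{
    public static void Main()
    {
        var producer = new OpaqueProducer();
        using (var stop = new CancellationTokenSource())
        {
            // Poll thread managed on behalf of the producer (step 4).
            var pollThread = new Thread(() =>
            {
                while (!stop.IsCancellationRequested) { producer.Poll(); Thread.Sleep(10); }
            }) { IsBackground = true };
            pollThread.Start();

            Task<string> report = producer.ProduceAsync("hello");
            Console.WriteLine(report.Result); // blocks until the poll thread completes the task
            stop.Cancel();
            pollThread.Join();
        }
    }
}
```

Because every `SetResult` call happens on the poll thread, the tasks complete in delivery order; the open question in this issue is how the caller can observe that order.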

The user of the library can maintain a list of Tasks (this may be inconvenient in streaming scenarios?!?) and use methods like Task.WaitAll, Task.WaitAny, Task.WhenAny to get the DeliveryReports. But I'm pretty sure there is no straightforward way to be guaranteed to get at these in the order they were completed. For example, WaitAny will return the first Task in the supplied list that has completed. But what if two or more tasks in the list have completed, and they completed out of order? But assume this doesn't happen: you call WaitAny and handle the DeliveryReport. Then you need to remove the associated task from the list (slow!) and call WaitAny again. In that time, it's very likely that more than one other task has completed (potentially out of order).

It seems there is a solution: Jon Skeet documents how he made an OrderByCompletion method in his blog: https://codeblog.jonskeet.uk/category/eduasync/. But this approach has a lot of overhead: for one thing, it needs yet another array equal in size to the original list of Tasks. I don't see this as a viable solution.
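For reference, the general idea behind an order-by-completion helper can be sketched as below. This is not the code from the blog, just a minimal version of the technique, and `TaskOrdering` is a made-up name. It shows where the extra array comes from: one `TaskCompletionSource` slot per input task, filled in whatever order the inputs finish.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskOrdering
{
    // Returns tasks that complete in the order in which the inputs complete.
    public static Task<T>[] OrderByCompletion<T>(IEnumerable<Task<T>> source)
    {
        Task<T>[] inputs = source.ToArray();

        // The extra array: one slot per input task.
        var slots = new TaskCompletionSource<T>[inputs.Length];
        for (int i = 0; i < slots.Length; i++)
        {
            slots[i] = new TaskCompletionSource<T>();
        }

        int next = -1; // index of the most recently filled slot
        foreach (Task<T> input in inputs)
        {
            input.ContinueWith(done =>
            {
                // Each finishing input claims the next free slot.
                TaskCompletionSource<T> slot = slots[Interlocked.Increment(ref next)];
                if (done.IsFaulted) slot.TrySetException(done.Exception.InnerExceptions);
                else if (done.IsCanceled) slot.TrySetCanceled();
                else slot.TrySetResult(done.Result);
            }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }

        return slots.Select(s => s.Task).ToArray();
    }
}

public static class OrderingDemo
{
    public static void Main()
    {
        var a = new TaskCompletionSource<string>();
        var b = new TaskCompletionSource<string>();
        var c = new TaskCompletionSource<string>();
        Task<string>[] ordered = TaskOrdering.OrderByCompletion(new[] { a.Task, b.Task, c.Task });

        // Complete the inputs in a different order from the list order.
        c.SetResult("c");
        a.SetResult("a");
        b.SetResult("b");

        foreach (Task<string> task in ordered)
        {
            Console.WriteLine(task.Result);
        }
    }
}
```

The ordering relies on the continuations running synchronously on the completing thread, which `ExecuteSynchronously` requests but does not strictly guarantee.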

Here are the alternative solutions that I can see:

  1. Maintain a sequence number in Producer. When the delivery report callback is called, increment this sequence number and include it in the delivery report. This solution might work ok if the user of the library is calling Task.WaitAll for example. Once all tasks are done, they can iterate through the list and check for out of order calls. Still, that'd be annoying.

  2. Have a variant of the ProduceAsync method that takes a callback parameter. This gets called on the Producer's poll thread and blocks it. rdkafka-dotnet has this as a relatively recent addition I think (do we now understand why?!?). It's not especially idiomatic.

  3. Have an OnDeliveryReport event on Producer which gets raised (on the poll thread) whenever there is a DeliveryReport callback. It blocks the poll thread. This is less flexible than option 2 but more idiomatic, and I can't see why the flexibility to add different callbacks to different produce calls would be a strong requirement in practice. It will be less efficient than option 2. I think this is a nicer API than option 2, because it requires no effort by the user unless they want to use it.

  4. Expose a .Poll method on Producer and don't have a separate thread managed by Producer doing the polling. Possibly give the user a choice between this and having the producer manage the thread (similar to one of the options for Consumer), i.e. also have a .Start() method. The Start method would start a background thread, not block.

We could provide multiple options without affecting the API surface area very much. We could include a sequence number in all delivery reports, have an OnDeliveryReport event, and have both .Poll and .Start, with the choice between them (poll or start) possibly set by a constructor argument that defaults to having a background thread.
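To make the combination concrete, here is a rough sketch of what options 1, 3 and 4 together might look like. All names (`EventProducer`, `SketchDeliveryReport`, the `manualPoll` argument) are hypothetical, and the in-memory queue stands in for librdkafka; it is a shape for discussion rather than a proposed implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class SketchDeliveryReport
{
    public SketchDeliveryReport(string value, long sequence) { Value = value; Sequence = sequence; }
    public string Value { get; }
    public long Sequence { get; } // option 1: order in which the report was delivered
}

public sealed class EventProducer : IDisposable
{
    private readonly ConcurrentQueue<string> _completed = new ConcurrentQueue<string>();
    private readonly CancellationTokenSource _stop = new CancellationTokenSource();
    private readonly Thread _pollThread; // stays null in manual-poll mode
    private long _sequence;

    // Option 3: raised on whichever thread calls Poll, and blocks it.
    public event Action<SketchDeliveryReport> OnDeliveryReport;

    // Option 4: a constructor argument selects manual polling; the default is a background thread.
    public EventProducer(bool manualPoll = false)
    {
        if (manualPoll) return;
        _pollThread = new Thread(() =>
        {
            while (!_stop.IsCancellationRequested) { Poll(); Thread.Sleep(10); }
        }) { IsBackground = true };
        _pollThread.Start();
    }

    // Stand-in for a produce call that the broker acknowledges immediately.
    public void Produce(string value) => _completed.Enqueue(value);

    // Serves pending delivery reports. In background mode only the poll thread should call this.
    public int Poll()
    {
        int served = 0;
        while (_completed.TryDequeue(out string value))
        {
            OnDeliveryReport?.Invoke(new SketchDeliveryReport(value, ++_sequence));
            served++;
        }
        return served;
    }

    public void Dispose()
    {
        _stop.Cancel();
        _pollThread?.Join();
        _stop.Dispose();
    }
}

public static class EventDemo
{
    public static void Main()
    {
        using (var producer = new EventProducer(manualPoll: true))
        {
            producer.OnDeliveryReport += r => Console.WriteLine($"{r.Sequence}: {r.Value}");
            producer.Produce("a");
            producer.Produce("b");
            producer.Poll(); // handlers run here, on the caller's thread
        }
    }
}
```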

@edenhill
Contributor

edenhill commented Dec 2, 2016

My prioritized vote is:

  • 2 + 4
  • 2 + 3 + 4
  • 2
  • 3+4

(see how there is no mention of 1?!)

Having per-msg delivery reports is the most flexible design (from the application's perspective). Overhead should be negligible, since per-msg state needs to be maintained between produce() and dr_cb() by the client internals anyway; this just stores an additional, optional callable along with the other info.

The python client has 2+3+4 (per-msg on_delivery, global on_delivery, poll() call).

Is .Start() something idiomatic? I don't like it; the other clients "start" automatically in the constructor.

@mhowlett
Contributor Author

mhowlett commented Dec 2, 2016

yep, I don't like 1 either.

And no, I don't think Start is idiomatic - I was just thinking of it as a way to choose background thread vs polling mode... but by the last paragraph I'd pretty much decided that choice should be in the constructor only. So, yep, I'm in agreement here.

There is no extra overhead from per-msg callbacks I think (if there is only one of them), and this will be more efficient than having an OnDeliveryReport event (+ error event).

I don't think many people will use .Poll in practice, but I like having a single-threaded option. Also, it's a natural place in the docs to document what's going on with threads.

So I think I'm in agreement with your #1 prioritized vote.

@ewencp

ewencp commented Dec 2, 2016

Can't you get this just by using that Task's ContinueWith functionality? Am I missing something about that behavior that would cause it to invoke them out of order?

@mhowlett
Contributor Author

mhowlett commented Dec 2, 2016

With ContinueWith, the code'd look something like:

    public static void myFunc(Task<DeliveryReport> r)
    {
        // do whatever here
    }

    ... 
    while (true)
    {
        ...
        Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, key, val);
        deliveryReport.ContinueWith(myFunc);
        ...
    }

myFunc is run asynchronously, at a time and on a thread determined by a task scheduler. The default task scheduler draws from 10 or something threads and can do fancy stuff (like work stealing, inlining, etc., and I don't know how fancy the queuing gets). Anyway, I don't trust it to maintain the order of Task completion -> call of myFunc. There have got to be race conditions in there somewhere as well. Also, I haven't looked into the mechanism by which the task scheduler finds out about completed tasks, and whether ordering is maintained there (I'm guessing it probably is, or can be, but I don't know the details). Also, I don't really like that synchronization will be required in myFunc.

Something that might be possible is to write a custom scheduler that only runs tasks on one extra thread, queuing them in a simple manner in the meantime. I don't know the details around how much extra work this is, but I think it'd be less straightforward than the other ideas - and less transparent to the user what's going on.
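A custom scheduler along those lines might look roughly like the sketch below. `SingleThreadTaskScheduler` is a made-up name and this is only one way to do it: continuations are queued FIFO and run on a single dedicated thread, so if all tasks are completed from one thread (as they would be from the poll thread), the continuations should run in completion order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    // BlockingCollection wraps a ConcurrentQueue by default, so tasks are served FIFO.
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly Thread _worker;

    public SingleThreadTaskScheduler()
    {
        _worker = new Thread(() =>
        {
            foreach (Task task in _queue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }) { IsBackground = true };
        _worker.Start();
    }

    public override int MaximumConcurrencyLevel => 1;

    protected override void QueueTask(Task task) => _queue.Add(task);

    // Never run inline on the caller's thread; that could reorder the work.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();

    public void Dispose()
    {
        _queue.CompleteAdding(); // the worker drains what is left, then exits
        _worker.Join();
        _queue.Dispose();
    }
}

public static class SchedulerDemo
{
    // Completes `count` tasks in sequence and records the order in which continuations ran.
    public static List<int> RunInOrder(int count)
    {
        var seen = new List<int>();
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            var continuations = new List<Task>();
            for (int i = 0; i < count; i++)
            {
                var tcs = new TaskCompletionSource<int>();
                continuations.Add(tcs.Task.ContinueWith(
                    t => seen.Add(t.Result),
                    CancellationToken.None, TaskContinuationOptions.None, scheduler));
                tcs.SetResult(i); // queues the continuation on the custom scheduler
            }
            Task.WaitAll(continuations.ToArray());
        }
        return seen;
    }

    public static void Main() => Console.WriteLine(string.Join(",", RunInOrder(5)));
}
```

No synchronization is needed inside the continuation here because only the worker thread ever runs it, but the user has to remember to pass the scheduler to every `ContinueWith` call, which is the transparency concern raised above.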

Note that we're talking about adding an additional ProduceAsync method which takes an IDeliveryHandler instance (with SetResult and SetException methods). The Task returning ProduceAsync methods will still be there, so if there is an alternative using them it'll still be possible. On the other hand, we do want to reduce the API surface area if possible.
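As a sketch of how the two variants could relate: the interface below has the SetResult and SetException methods mentioned above, but the parameter types, `HandlerProducer`, and the in-memory queue are all assumptions made for illustration. It shows the Task-returning method layered on top of the handler-taking one via a small adapter.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Parameter types are assumptions; a string stands in for the real report type.
public interface IDeliveryHandler
{
    void SetResult(string report);
    void SetException(Exception error);
}

// Adapter that turns handler callbacks into a Task.
public sealed class TaskDeliveryHandler : IDeliveryHandler
{
    private readonly TaskCompletionSource<string> _tcs = new TaskCompletionSource<string>();
    public Task<string> Completion => _tcs.Task;
    public void SetResult(string report) => _tcs.SetResult(report);
    public void SetException(Exception error) => _tcs.SetException(error);
}

// A handler that records reports in the order it is called.
public sealed class OrderedLogHandler : IDeliveryHandler
{
    public List<string> Log { get; } = new List<string>();
    public void SetResult(string report) => Log.Add("ok: " + report);
    public void SetException(Exception error) => Log.Add("failed: " + error.Message);
}

// Hypothetical producer; not thread-safe, single-threaded use only in this sketch.
public sealed class HandlerProducer
{
    private readonly Queue<(string Value, IDeliveryHandler Handler)> _inFlight =
        new Queue<(string Value, IDeliveryHandler Handler)>();

    // Handler variant: returns void, the handler is called from Poll.
    public void ProduceAsync(string value, IDeliveryHandler handler) =>
        _inFlight.Enqueue((value, handler));

    // Task variant, implemented in terms of the handler variant.
    public Task<string> ProduceAsync(string value)
    {
        var handler = new TaskDeliveryHandler();
        ProduceAsync(value, handler);
        return handler.Completion;
    }

    // Stand-in for delivery: acknowledges everything in produce order.
    public void Poll()
    {
        while (_inFlight.Count > 0)
        {
            var (value, handler) = _inFlight.Dequeue();
            handler.SetResult(value);
        }
    }
}

public static class HandlerDemo
{
    public static void Main()
    {
        var producer = new HandlerProducer();
        var log = new OrderedLogHandler();
        producer.ProduceAsync("first", log);
        Task<string> second = producer.ProduceAsync("second");
        producer.ProduceAsync("third", log);
        producer.Poll();
        Console.WriteLine(string.Join("; ", log.Log));
        Console.WriteLine(second.Result);
    }
}
```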

@mhowlett
Contributor Author

@ewencp @edenhill

Thoughts on ProduceAsync method signatures on the non-serializing producer:

Currently I have:

public Task<MessageInfo> ProduceAsync(string topic, byte[] key, byte[] val, int? partition = null, bool blockIfQueueFull = true)
public Task<MessageInfo> ProduceAsync(string topic, ArraySegment<byte>? key, ArraySegment<byte>? val, int? partition = null, bool blockIfQueueFull = true)
public Task<MessageInfo> ProduceAsync(string topic, Message message, int? partition = null, bool blockIfQueueFull = true)
// and variants that return void + take IDeliveryHandler instead.

Here, Message encapsulates what is being sent:

        public ArraySegment<byte>? Value { get; set; }
        public ArraySegment<byte>? Key { get; set; }
        public DateTime? Timestamp { get; set; }

Two unfortunate things:

  1. ArraySegment is a struct - allowing for null values means making it nullable, which is slightly awkward and brings the heap into play.
  2. To allow copying of data to be avoided where a subset of an existing array holds the desired message and timestamps are to be specified, ArraySegment? is required in the Message class. However, that seems a bit awkward - not as encapsulated as it should be (ideally i'd prefer byte[]'s here).

These points are leading me (fairly strongly) towards favoring the following:

public Task<MessageInfo> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, DateTime? timestamp = null, int? partition = null, bool blockIfQueueFull = true)
public Task<MessageInfo> ProduceAsync(string topic, byte[] key, byte[] val, DateTime? timestamp = null, int? partition = null, bool blockIfQueueFull = true)
// and variants that return void + take IDeliveryHandler instead.

which is consistent with other .NET APIs, and is simpler than what is currently there (a sign that it's the best option). These two methods will extend easily enough to allow headers and transactions.
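A minimal sketch of how the two favored overloads could relate is below. It is synchronous and drops the timestamp, partition and blockIfQueueFull arguments for brevity; `TripletProducer` and the returned tuple are assumptions, with the tuple standing in for whatever the real produce call would hand to librdkafka. The short overload forwards to the triplet overload, and a null array means a null key or value.

```csharp
using System;

public sealed class TripletProducer
{
    // Triplet variant: a sub-range of an existing array is used without copying.
    // Returns the payload sizes it would pass on (-1 means null).
    public (int KeyLength, int ValLength) Produce(
        string topic,
        byte[] key, int keyOffset, int keyLength,
        byte[] val, int valOffset, int valLength)
    {
        if (topic == null) throw new ArgumentNullException(nameof(topic));
        CheckRange(key, keyOffset, keyLength, nameof(key));
        CheckRange(val, valOffset, valLength, nameof(val));
        return (key == null ? -1 : keyLength, val == null ? -1 : valLength);
    }

    // Convenience variant: whole arrays, forwarded to the triplet variant.
    public (int KeyLength, int ValLength) Produce(string topic, byte[] key, byte[] val) =>
        Produce(topic, key, 0, key?.Length ?? 0, val, 0, val?.Length ?? 0);

    private static void CheckRange(byte[] data, int offset, int length, string name)
    {
        if (data == null) return; // null key or value is allowed
        if (offset < 0 || length < 0 || offset > data.Length - length)
        {
            throw new ArgumentOutOfRangeException(name);
        }
    }
}

public static class TripletDemo
{
    public static void Main()
    {
        var producer = new TripletProducer();
        byte[] buffer = { 1, 2, 3, 4, 5, 6 };

        // Null key, value taken from the middle of an existing buffer.
        var sizes = producer.Produce("topic", null, 0, 0, buffer, 2, 3);
        Console.WriteLine($"{sizes.KeyLength}, {sizes.ValLength}");
    }
}
```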

@edenhill
Contributor

edenhill commented Dec 21, 2016

The second of your favored functions looks good, but the first one, which takes byte[],Offset,Length, looks a bit verbose, and I understand that ArraySegment provides an abstraction for it.
But you are saying it requires a heap allocation. If that's the case, we need to measure the real performance implications of using ArraySegment vs byte triplets to see if it even matters enough to leave it out.

@mhowlett
Contributor Author

mhowlett commented Dec 22, 2016

ArraySegment requires the byte array it wraps to be defined, so in order to allow for null values, which we want, it needs to be wrapped by Nullable (heap allocated), for which the ? is shorthand notation. I'm not happy about the verbosity either, but:

  1. this variant probably isn't going to be greatly used - it's there to provide a maximally performant option for people with values in subarrays (not uncommon, eg StringBuilder) who want to avoid copying the subarray into a new byte[]. People wanting this option will be prioritising performance and will appreciate a maximally performant interface I think (even if there isn't much in it).
  2. ArraySegment? just seems awkward to me.
  3. it's not inconsistent with other parts of the .NET framework API (though to be fair what I'm thinking of predates ArraySegment).

@edenhill
Contributor

Is the underlying array actually copied to the ArraySegment? It seems to me ArraySegment just wraps the array with an additional offset and count. But there seems to be a heap allocation for the ArraySegment struct itself. Again: the cost needs to be measured so we can make an educated decision. We probably want to compare the following matrix: [triplets, ArraySegment] * [(null key, null value), (null key, value), (key, value)]

rdkafka-dotnet initially got a PR for the triplet approach which was then changed to ArraySegment, refs:
https://github.com/ah-/rdkafka-dotnet/pull/73
https://github.com/ah-/rdkafka-dotnet/pull/79
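A possible starting point for that measurement is sketched below. `SegmentFacts` is a made-up helper, and the sketch assumes a runtime that provides `GC.GetAllocatedBytesForCurrentThread`. It checks two properties of the types involved (whether the nullable form is still a value type, and whether a default `ArraySegment<byte>` wraps a null array) and reports the bytes allocated while building nullable segments in a loop. It is not a benchmark: JIT behaviour, boxing, and storing the value in a class field can all change the numbers, so a real comparison would need a proper harness.

```csharp
using System;

public static class SegmentFacts
{
    // Nullable<T> is itself declared as a struct.
    public static bool NullableSegmentIsValueType() =>
        typeof(ArraySegment<byte>?).IsValueType;

    // The default value of the struct has no backing array.
    public static bool DefaultSegmentHasNullArray() =>
        default(ArraySegment<byte>).Array == null;

    // Bytes allocated on the current thread while creating `count` nullable segments.
    // `total` is returned only so the loop body is not trivially dead code.
    public static long AllocatedBytes(byte[] buffer, int count, out int total)
    {
        total = 0;
        long before = GC.GetAllocatedBytesForCurrentThread();
        for (int i = 0; i < count; i++)
        {
            ArraySegment<byte>? segment = new ArraySegment<byte>(buffer, 0, buffer.Length);
            total += segment.Value.Count;
        }
        return GC.GetAllocatedBytesForCurrentThread() - before;
    }
}

public static class SegmentDemo
{
    public static void Main()
    {
        Console.WriteLine("nullable segment is a value type: " + SegmentFacts.NullableSegmentIsValueType());
        Console.WriteLine("default segment has null array: " + SegmentFacts.DefaultSegmentHasNullArray());

        byte[] buffer = new byte[16];
        long bytes = SegmentFacts.AllocatedBytes(buffer, 100000, out int total);
        Console.WriteLine("allocated bytes: " + bytes + " (checksum " + total + ")");
    }
}
```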

@mhowlett
Contributor Author

A heap alloc is required for the ArraySegment only because we need it to be nullable. Yes, it's just a simple wrapper around an array with offset and length and I'd be 100% completely in favour of it if it wasn't for the need to make it nullable.

Perf: I doubt the performance difference is going to be significant (I'll need to move to Windows land to get a memory profiler to check it out properly). However, I still prefer the expanded interface as a matter of style (perf interface for perf freaks, and no one else is going to care). I know that is controversial...

Didn't know about those PRs to rdkafka-dotnet. They never made it in... though it does look like there is no good reason for that.

@mhowlett
Contributor Author

In the upcoming commit (that makes the above changes), I'll use the ArraySegment? alternative because this is the minimal change from the version I currently have. However, I'm still of the opinion that this should be changed.

Another argument for the longer method signature is that I think people will rarely have an ArraySegment ready already - they'll have to create one for this purpose. So it'd be a more long-winded, difficult-to-use interface (note: my intuition on how frequently people have an ArraySegment handy already could be wrong, I'll need to research more).

mapr-devops pushed a commit to mapr/confluent-kafka-dotnet that referenced this issue Nov 6, 2024