Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error types should be modeled more strongly #241

Open
jypma opened this issue Nov 16, 2020 · 3 comments
Open

Error types should be modeled more strongly #241

jypma opened this issue Nov 16, 2020 · 3 comments
Milestone

Comments

@jypma
Copy link
Contributor

jypma commented Nov 16, 2020

Right now, all zio-kafka effect types have Throwable as their error type, requiring users to look into the kafka API itself to find out what exceptions to expect.

An argument could be made to introduce concrete error types for the expected error scenarios, and use those instead of Throwable. This can have some clear advantages:

  • We can show the difference between a stream that silently reconnects, vs. a (not implemented yet) stream that fails when kafka's connection goes down (as requested in Close consumer when it cannot connect to the broker #175 ).

  • We can model deserialization errors into this type, if desired, as an alternative to the current .asTry construct.

In fact, in its current state, which exceptions can be expected in that Throwable?

@svroonland
Copy link
Collaborator

I like this idea a lot, at the very least as an experiment to see how feasible this is and what would be a good approach. I've been considering doing the same for zio-kinesis.

Taking def partitionedStream as a starting point, which can fail with any Throwable, we'd have to look into the call hierarchy to see which exceptions we can expect. Methods that are called on KafkaConsumer as part of that hierarchy are:

  • poll
  • resume
  • pause
  • assignment
  • seek

For pause, resume, assignment we can refineOrDie on the IllegalStateException, this is already done in some places using ZIO.effectTotal.

Most of the exceptions can come from poll:

    /**     
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
     *             partitions is undefined or out of range and no offset reset policy has been configured
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs, your rebalance callback thrown exceptions,
     *             or any new error cases in future versions)
     * @throws java.lang.IllegalArgumentException if the timeout value is negative
     * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
     *             partitions to consume from
     * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the consumer attempts to fetch stable offsets
     *             when the broker doesn't support this feature
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
     */

Some of these we should (hopefully) be able to get rid of as zio-kafka programming errors using refineOrDie, because they should not happen the way we have programmed zio-kafka, like:

  • InterruptionException
  • IllegalStateException
  • WakeupException

You would get a fiber failure in such a case. But that still leaves like 20 of them which would need to be modeled into a custom error type or otherwise eliminated, including KafkaException which has 123 subclasses..(!).

I see three options:

  1. Leave error type as Throwable (as is know)
  2. Create an ADT including every possible of these exceptions.
  • Lot of work
  • Some of them may never be thrown from our call hierarchy
  • There still needs to be a catch-all error like UnknownException
  1. Pick out some of them which we explicitly want the client to include in the error ADT.
  • Looking at the 123 subclasses, there's so many things that can go wrong when consuming from kafka.
  • The selection may be very subjective

Besides poll, commit errors are not passed to the error channel of partitionedStream but have to be handled by the caller of CommittableRecord.offset#commit. For committing using commitAsync we can expect a org.apache.kafka.common.errors.RebalanceInProgressException if the commit failed because it is in the middle of a rebalance. In such cases commit could be retried after the rebalance is completed with the {@link KafkaConsumer#poll(Duration)} call.. This is something we might want to handle internally by retrying anyway (@iravid?).

Looking forward to your feedback @jypma @iravid .

@jypma
Copy link
Contributor Author

jypma commented Nov 17, 2020

I like the approach of gradually building an ADT, including a catch-all-like type like OtherError(x: KafkaException).

That way, we can start with a few known error-cases, and incrementally extend the error types with actual real-life cases that do occur. Subjectivity should be less of a factor that way, since the ADT hierarchy should be able to mirror the exception hierarchy.

We could model the OtherError type deprecated from the get-go, saying that we'd prefer a PR for a new type, rather than users relying on OtherError. Perhaps even UnmodeledError would be a more apt name?

We'd only have to push the IllegalArgumentException, IllegalStateException and friends into some more-specific types, or get rid of them (e.g. "if the offset is negative", that probably should be caught a lot earlier).

@iravid
Copy link
Member

iravid commented Mar 31, 2021

Big 👍 on this!

svroonland added a commit that referenced this issue Nov 3, 2024
Implements #241

Build still fails because test and bench projects have not been migrated yet, first want to gather some feedback on this change.
@svroonland svroonland added this to the 3.0.0 milestone Nov 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants