Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dead-letter-policy support to SCSt Pulsar Binder #373

Closed
onobc opened this issue Mar 16, 2023 · 14 comments
Closed

Add dead-letter-policy support to SCSt Pulsar Binder #373

onobc opened this issue Mar 16, 2023 · 14 comments
Assignees
Milestone

Comments

@onobc
Copy link
Collaborator

onobc commented Mar 16, 2023

No description provided.

@onobc onobc added this to the 1.0.0 milestone Mar 16, 2023
@noorkhan-92
Copy link

pulsar:
    client:
      service-url: pulsar://localhost:6650
  cloud:
    function:
      definition: notificationListener
    stream:
      bindings:
        notificationListener-in-0:
          destination: notification
          consumer:
            use-native-decoding: true

      pulsar:
        bindings:
          notificationListener-in-0:
            consumer:
              negative-ack-redelivery-delay: 1s
              dead-letter-policy:
                dead-letter-topic: notification-dlq
                max-redeliver-count: 5
              schema-type: JSON
              message-type: global.din.notification.data.dto.BrokerMessage

In this configuration the DLP doesn't apply. But also I tried the following configuration and it also doesn't apply the DLP.

pulsar:
    client:
      service-url: pulsar://localhost:6650
    consumer:
        negative-ack-redelivery-delay: 1s
        dead-letter-policy:
             dead-letter-topic: notification-dlq
             max-redeliver-count: 5
  cloud:
    function:
      definition: notificationListener
    stream:
      bindings:
        notificationListener-in-0:
          destination: notification
          consumer:
            use-native-decoding: true

      pulsar:
        bindings:
          notificationListener-in-0:
              schema-type: JSON
              message-type: global.din.notification.data.dto.BrokerMessage

@sobychacko
Copy link
Collaborator

@noorkhan-92 You use the native Pulsar DLT feature, which only works with a shared subscription type. By default, the binder uses an exclusive subscription. By changing the configuration below, the DLT works for me.

spring:
  cloud:
    stream:
       bindings:
         notificationListener-in-0:
            destination: notification
            consumer:
              use-native-decoding: true

      pulsar:
        bindings:
          notificationListener-in-0:
                consumer:
                  subscription-type: Shared
                  negative-ack-redelivery-delay: 1s
                  dead-letter-policy:
                    dead-letter-topic: notification-dlq
                    max-redeliver-count: 5
              schema-type: JSON
              message-type: global.din.notification.data.dto.BrokerMessage

We are still working through the general DLT feature for the binder across all the subscription types. In the meantime, could you confirm native DLT works for you, after making the above config changes?

Thanks!

@noorkhan-92
Copy link

@sobychacko Yes subscription-type: Shared worked for me. I think also it should be highlighted in this spring docs.
spring for apache pulsar

@noorkhan-92
Copy link

@sobychacko Shared applying dead-letter-policy, but then it redeliver the message even if it is successfully consumed.

@sobychacko
Copy link
Collaborator

Can you elaborate on that? I didn't fully understand - that sounds like a bug.

@noorkhan-92
Copy link

noorkhan-92 commented Mar 21, 2023

Sure, on consumer side when the message is consumed the broker redeliver the message again and again until the max-redeliver-count reached. When I debug I have seen that on returning from the following method it goes to some error in the SimpleInstantiationStrategy.java class line no 139 from the dependency (spring-pulsar-spring-cloud-stream-binding) I am using from this project.

@Bean
    fun notificationListener(): Consumer<Message<BrokerMessage>> = Consumer {
            message ->
        when (message.payload.notificationType) {
            NotificationType.SMS -> smsChain.sendMessage(message.payload.notification)
            NotificationType.WHATSAPP -> whatsappChain.sendMessage(message.payload.notification)
            NotificationType.EMAIL -> emailChain.sendMessage(message.payload.notification)
            NotificationType.PUSH_NOTIFICATION -> TODO()
        }
    }

@sobychacko
Copy link
Collaborator

So, you see that the delivery count works as expected, and after the count is exhausted, it is sent to the DLT, correct? But then you see that some error occurs after that?

@noorkhan-92
Copy link

Yes the deliver count work as expected and also send message to DLT on exhaustion. There is no error.
But I mean with happy flow when the message is consumed successfully then the broker should delete the message. While here it doesn't delete instead redeliver the message. And as per what I have seen this is because the consumer doesn't acknowledge instead go to some error as I have explained in my previous comment.

@sobychacko
Copy link
Collaborator

Ok, thanks! We will investigate this.

@sobychacko
Copy link
Collaborator

sobychacko commented Mar 21, 2023

@noorkhan-92 I cannot reproduce the scenario, so I need more information about the error that you are running into. For reference, here is a working binder sample. Read this comment before running the sample. As you can see here, when the consumer receives the message, it throws an exception which is handled by the framework and then negatively acknowledged. Then Pulsar will redeliver the message as many times as the redelivery counts in the DLT policy. Once that is exhausted, it is no longer redelivering the message but rather re-routed to the DLT topic. A PulsarListener method is provided at the end of the class that is a consumer of the DLT topic. Could you give us a similar Java sample that can reproduce the error you are seeing?

@onobc onobc modified the milestones: 1.0.0, 0.2.0 Mar 22, 2023
@noorkhan-92
Copy link

@sobychacko Yes on throwing exception it redeliver the message which is fine. But when the message is successfully consumed without throwing any exception then still the broker redeliver the message redeliver-count times. As you can see in the following testListener method which consume the message successfully but still the broker redeliver the message.
dlq-test
message is produced in test method via StreamBridge.

@sobychacko
Copy link
Collaborator

sobychacko commented Mar 22, 2023

Thanks for that. That was a bug in which the subscription type was not properly propagated down to the container from the binder. Addressed now through this commit.

@onobc onobc modified the milestones: 0.2.0, 1.0.0 Mar 24, 2023
@onobc
Copy link
Collaborator Author

onobc commented Sep 27, 2023

@sobychacko do you mind transferring this issue over the SCSt Pulsar Binder project?

@onobc
Copy link
Collaborator Author

onobc commented Oct 17, 2023

Closing in favor of spring-cloud/spring-cloud-stream#2830

@onobc onobc closed this as completed Oct 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants