From cddb7e3b99233d773336a9aaff792c59e85a3f61 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Thu, 27 Jun 2024 10:58:10 +0800 Subject: [PATCH] [INLONG-10522][SDK] SortSDK support assgin subscription --- .../inlong/sdk/sort/api/SortClientConfig.java | 16 ++++++++++++++++ .../fetcher/kafka/KafkaMultiTopicsFetcher.java | 2 +- .../fetcher/kafka/KafkaSingleTopicFetcher.java | 2 +- .../fetcher/pulsar/PulsarMultiTopicsFetcher.java | 2 +- .../fetcher/pulsar/PulsarSingleTopicFetcher.java | 2 +- .../fetcher/tube/TubeSingleTopicFetcher.java | 2 +- 6 files changed, 21 insertions(+), 5 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java index b9776265b47..a354b1aced4 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java @@ -35,6 +35,7 @@ public class SortClientConfig implements Serializable { private static final long serialVersionUID = -7531960714809683830L; private final String sortTaskId; + private final String subscription; private final String sortClusterName; private InLongTopicChangeListener assignmentsListener; private ReadCallback callback; @@ -83,11 +84,22 @@ public SortClientConfig( InLongTopicChangeListener assignmentsListener, ConsumeStrategy consumeStrategy, String localIp) { + this(sortTaskId, sortClusterName, assignmentsListener, consumeStrategy, localIp, sortTaskId); + } + + public SortClientConfig( + String sortTaskId, + String sortClusterName, + InLongTopicChangeListener assignmentsListener, + ConsumeStrategy consumeStrategy, + String localIp, + String subscription) { this.sortTaskId = sortTaskId; this.sortClusterName = sortClusterName; this.assignmentsListener = assignmentsListener; this.consumeStrategy = consumeStrategy; this.localIp = localIp; + this.subscription = subscription; } public boolean isStopConsume() { @@ -102,6 +114,10 @@ public String getSortTaskId() { return sortTaskId; } + public String getSubscription() { + return subscription; + } + public String getSortClusterName() { return sortClusterName; } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java index b5cb6b121de..a24c2703890 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java @@ -106,7 +106,7 @@ public boolean init() { private KafkaConsumer createKafkaConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSubscription()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java index e88d0a93009..1c081a64ef9 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java @@ -160,7 +160,7 @@ public List getTopics() { private void createKafkaConsumer(String bootstrapServers) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSubscription()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java index bbe53d44c00..a346591e18b 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java @@ -165,7 +165,7 @@ private Consumer createConsumer(Collection newTopics) { .collect(Collectors.toList()); Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topics(topicNames) - .subscriptionName(context.getConfig().getSortTaskId()) + .subscriptionName(context.getConfig().getSubscription()) .subscriptionType(SubscriptionType.Shared) .startMessageIdInclusive() .subscriptionInitialPosition(position) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java index 90d981c9126..91a4dcf5f17 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java @@ -151,7 +151,7 @@ private boolean createConsumer(PulsarClient client) { consumer = client.newConsumer(Schema.BYTES) .topic(topic.getTopic()) - .subscriptionName(context.getConfig().getSortTaskId()) + .subscriptionName(context.getConfig().getSubscription()) .subscriptionType(SubscriptionType.Shared) .startMessageIdInclusive() .subscriptionInitialPosition(position) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java index d5792d849b4..ae30d1c3d2c 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java @@ -71,7 +71,7 @@ public boolean init() { TubeClientConfig tubeClientConfig = tubeConsumerCreator.getTubeClientConfig(); try { ConsumerConfig consumerConfig = new ConsumerConfig(tubeClientConfig.getMasterInfo(), - context.getConfig().getSortTaskId()); + context.getConfig().getSubscription()); messageConsumer = tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig); if (messageConsumer != null) {