From bcace693b1e53aac32b4f5d69539fa064905069e Mon Sep 17 00:00:00 2001 From: vernedeng Date: Tue, 26 Nov 2024 10:21:54 +0800 Subject: [PATCH] [INLONG-11537][Sort] Optimize the session key generation of TubeMQ Source (#11538) --- .../org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index abd69f8ecb7..4cbd285f175 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer extends RichParallelSourceFunction private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class); private static final String TUBE_OFFSET_STATE = "tube-offset-state"; + private static final String UNDERSCORE = "_"; + /** * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715. */ @@ -221,7 +223,10 @@ public void open(Configuration parameters) throws Exception { messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, streamIdSet); String jobId = getRuntimeContext().getJobId().toString(); - messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets); + String attemptNumber = String.valueOf(getRuntimeContext().getAttemptNumber()); + String startSessionKey = sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber); + LOG.info("start to init tube mq consumer, session key={}", startSessionKey); + messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, currentOffsets); running = true; }