Skip to content

Commit

Permalink
[INLONG-11537][Sort] Optimize the session key generation of TubeMQ So…
Browse files Browse the repository at this point in the history
…urce
  • Loading branch information
vernedeng committed Nov 25, 2024
1 parent 82128a3 commit 3645d99
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ public void open(Configuration parameters) throws Exception {
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
String attemptNumber = String.valueOf(getRuntimeContext().getAttemptNumber());
String startSessionKey = sessionKey.concat("_").concat(jobId).concat("_").concat(attemptNumber);
LOG.info("start to init tube mq consumer, session key={}", startSessionKey);
messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, currentOffsets);

running = true;
}
Expand Down

0 comments on commit 3645d99

Please sign in to comment.