Skip to content

Commit

Permalink
[INLONG-11537][Sort] Optimize the session key generation of TubeMQ So…
Browse files Browse the repository at this point in the history
…urce (#11538)
  • Loading branch information
vernedeng authored Nov 26, 2024
1 parent ace3362 commit bcace69
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
private static final String TUBE_OFFSET_STATE = "tube-offset-state";

private static final String UNDERSCORE = "_";

/**
* The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
*/
Expand Down Expand Up @@ -221,7 +223,10 @@ public void open(Configuration parameters) throws Exception {
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
String attemptNumber = String.valueOf(getRuntimeContext().getAttemptNumber());
String startSessionKey = sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber);
LOG.info("start to init tube mq consumer, session key={}", startSessionKey);
messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, currentOffsets);

running = true;
}
Expand Down

0 comments on commit bcace69

Please sign in to comment.