Skip to content

Commit

Permalink
resolve #1645 | refactor stop method not to be called twice when stop…
Browse files Browse the repository at this point in the history
…ping the subscription
  • Loading branch information
pkasperowicz committed Oct 26, 2023
1 parent 7ec469d commit f3e814b
Showing 1 changed file with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,7 @@ public void run() {
} finally {
logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName());
refreshHealthcheck();
try {
stop();
} catch (Exception exceptionWhileStopping) {
logger.error("An error occurred while stopping consumer process of subscription {}",
getSubscriptionName(), exceptionWhileStopping);
} finally {
onConsumerStopped.accept(getSubscriptionName());
Thread.currentThread().setName("consumer-released-thread");
}
stop();
}
}

Expand Down Expand Up @@ -115,7 +107,6 @@ private void process(Signal signal) {
break;
case STOP:
logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
this.running = false;
stop();
break;
case RETRANSMIT:
Expand Down Expand Up @@ -154,12 +145,24 @@ private void start(Signal signal) {
}

private void stop() {
long startTime = clock.millis();
logger.info("Stopping consumer for subscription {}", getSubscriptionName());
if (!running) {
return;
}
this.running = false;
try {
long startTime = clock.millis();
logger.info("Stopping consumer for subscription {}", getSubscriptionName());

consumer.tearDown();
consumer.tearDown();

logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
} catch (Exception exceptionWhileStopping) {
logger.error("An error occurred while stopping consumer process of subscription {}",
getSubscriptionName(), exceptionWhileStopping);
} finally {
onConsumerStopped.accept(getSubscriptionName());
Thread.currentThread().setName("consumer-released-thread");
}
}

private void retransmit(Signal signal) {
Expand Down

0 comments on commit f3e814b

Please sign in to comment.