Skip to content

Commit

Permalink
[da-vinci] Add a logline when waitAfterUnsubscribe timed out (#806)
Browse files Browse the repository at this point in the history
waitAfterUnsubscribe was introduced to make a best effort in moving records from the SharedConsumer into
the drainer queue for unsubscribe and it has a timeout value. This change adds a warning log to track
if any timed out event can actually happen.
  • Loading branch information
lluwm authored Dec 21, 2023
1 parent afffe7f commit 1a5b4ec
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -40,6 +40,7 @@
*/
class SharedKafkaConsumer implements PubSubConsumerAdapter {
private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class);
private long nextPollTimeOutSeconds = 10;

protected final PubSubConsumerAdapter delegate;

Expand Down Expand Up @@ -155,7 +156,7 @@ public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition)
this.delegate.unSubscribe(pubSubTopicPartition);
subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, pubSubTopicPartition);
return 1;
return Collections.singleton(pubSubTopicPartition);
});
}

Expand All @@ -167,42 +168,47 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, pubSubTopicPartition);
}
return pubSubTopicPartitionSet.size();
return pubSubTopicPartitionSet;
});
}

/**
* This function encapsulates the logging, bookkeeping and required waiting period surrounding the action of
* unsubscribing some partition(s).
*
* @param action which performs the unsubscription and returns the number of partitions which were unsubscribed
* @param supplier which performs the unsubscription and returns a set of partitions which were unsubscribed
*/
protected synchronized void unSubscribeAction(IntSupplier action) {
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier) {
long currentPollTimes = pollTimes;
long startTime = System.currentTimeMillis();
int numberOfUnsubbedPartitions = action.getAsInt();
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long elapsedTime = System.currentTimeMillis() - startTime;

LOGGER.info(
"Shared consumer {} unsubscribed {} partition(s) in {} ms.",
"Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms",
this.getClass().getSimpleName(),
numberOfUnsubbedPartitions,
topicPartitions.size(),
topicPartitions,
elapsedTime);
updateCurrentAssignment(delegate.getAssignment());
waitAfterUnsubscribe(currentPollTimes);
waitAfterUnsubscribe(currentPollTimes, topicPartitions);
}

protected void waitAfterUnsubscribe(long currentPollTimes) {
protected void waitAfterUnsubscribe(long currentPollTimes, Set<PubSubTopicPartition> topicPartitions) {
currentPollTimes++;
waitingForPoll.set(true);
// Wait for the next poll or maximum 10 seconds. Interestingly wait api does not provide any indication if wait
// returned
// due to timeout. So an explicit time check is necessary.
long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + (10 * Time.MS_PER_SECOND);
long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + nextPollTimeOutSeconds * Time.MS_PER_SECOND;
try {
while (currentPollTimes > pollTimes) {
long waitMs = timeoutMs - (time.getNanoseconds() / Time.NS_PER_MS);
if (waitMs <= 0) {
LOGGER.warn(
"Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds",
topicPartitions,
nextPollTimeOutSeconds);
break;
}
wait(waitMs);
Expand All @@ -214,6 +220,16 @@ protected void waitAfterUnsubscribe(long currentPollTimes) {
}
}

// Only for testing.
void setNextPollTimeoutSeconds(long nextPollTimeOutSeconds) {
this.nextPollTimeOutSeconds = nextPollTimeOutSeconds;
}

// Only for testing.
long getPollTimes() {
return pollTimes;
}

@Override
public synchronized void resetOffset(PubSubTopicPartition pubSubTopicPartition)
throws PubSubUnsubscribedTopicPartitionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.SystemTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -31,6 +34,9 @@ public class SharedKafkaConsumerTest {
protected KafkaConsumerServiceStats consumerServiceStats;

protected PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private SharedKafkaConsumer sharedKafkaConsumer;
private Set<PubSubTopicPartition> topicPartitions;
private PubSubConsumerAdapter consumerAdapter;

@BeforeMethod
public void setUp() {
Expand Down Expand Up @@ -64,4 +70,35 @@ public void testSubscriptionEmptyPoll() {
sharedConsumer.poll(1000);
verify(consumer, times(1)).poll(1000);
}

private void setUpSharedConsumer() {
consumerAdapter = mock(PubSubConsumerAdapter.class);
KafkaConsumerServiceStats stats = mock(KafkaConsumerServiceStats.class);
Runnable assignmentChangeListener = mock(Runnable.class);
SharedKafkaConsumer.UnsubscriptionListener unsubscriptionListener =
mock(SharedKafkaConsumer.UnsubscriptionListener.class);

sharedKafkaConsumer = new SharedKafkaConsumer(
consumerAdapter,
stats,
assignmentChangeListener,
unsubscriptionListener,
new SystemTime());
topicPartitions = new HashSet<>();
topicPartitions.add(mock(PubSubTopicPartition.class));
}

@Test
public void testWaitAfterUnsubscribe() {
setUpSharedConsumer();
Supplier<Set<PubSubTopicPartition>> supplier = () -> topicPartitions;

long poolTimesBeforeUnsubscribe = sharedKafkaConsumer.getPollTimes();
sharedKafkaConsumer.setNextPollTimeoutSeconds(1);
sharedKafkaConsumer.unSubscribeAction(supplier);

// This is to test that if the poll time is not incremented when the consumer is unsubscribed the correct log can
// be found in the logs.
Assert.assertEquals(poolTimesBeforeUnsubscribe, sharedKafkaConsumer.getPollTimes());
}
}

0 comments on commit 1a5b4ec

Please sign in to comment.