Skip to content

Commit

Permalink
[changelog] Clean up consumer state on changelog client (#1398)
Browse files Browse the repository at this point in the history
* [chanelog] Clean up consumer state on changelog client

And other protections like subscribing to an empty set of partitions.
  • Loading branch information
ZacAttack authored Dec 17, 2024
1 parent ed335b8 commit 6771f7d
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll

@Override
public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps) {
if (timestamps.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return internalSeekToTimestamps(timestamps, "");
}

@Override
public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
if (!versionSwapThreadScheduled.get()) {
// schedule the version swap thread and set up the callback listener
this.storeRepository.registerStoreDataChangedListener(versionSwapListener);
Expand All @@ -101,18 +107,25 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {

@Override
public CompletableFuture<Void> seekToTail(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return internalSeekToTail(partitions, "");
}

@Override
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.supplyAsync(() -> {
synchronized (internalSeekConsumer) {
try {
// TODO: This implementation basically just scans the version topic until it finds the EOP message. The
// approach
// we'd like to do is instead add the offset of the EOP message in the VT, and then just seek to that offset.
// We'll do that in a future patch.
internalSeekConsumer.get().unsubscribeAll();
internalSeekConsumer.get().subscribe(partitions).get();

// We need to get the internal consumer as we have to intercept the control messages that we would normally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, Pub

synchronized (pubSubConsumer) {
Set<PubSubTopicPartition> topicPartitionSet = getTopicAssignment();
for (PubSubTopicPartition topicPartition: pubSubConsumer.getAssignment()) {
for (PubSubTopicPartition topicPartition: topicPartitionSet) {
if (partitions.contains(topicPartition.getPartitionNumber())) {
pubSubConsumer.unSubscribe(topicPartition);
}
Expand Down Expand Up @@ -549,6 +549,9 @@ private List<PubSubTopicPartition> getPartitionListToSubscribe(

@Override
public void unsubscribe(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return;
}
synchronized (pubSubConsumer) {
Set<PubSubTopicPartition> topicPartitionSet = getTopicAssignment();
Set<PubSubTopicPartition> topicPartitionsToUnsub = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public void handleStoreChanged(Store store) {
}
}

if (partitions.isEmpty()) {
return;
}

LOGGER.info(
"New Version detected! Seeking consumer to version: " + currentVersion + " in consumer: " + consumerName);
this.consumer.seekToEndOfPush(partitions).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE

Set<Integer> partitionSet = new HashSet<>();
partitionSet.add(0);

// try and do it with an empty set, this should complete immediately
veniceChangelogConsumer.seekToEndOfPush(Collections.emptySet()).get();
Mockito.verifyNoInteractions(mockInternalSeekConsumer);

veniceChangelogConsumer.seekToEndOfPush(partitionSet).get();

Mockito.verify(mockInternalSeekConsumer).subscribe(partitionSet);
Expand Down

0 comments on commit 6771f7d

Please sign in to comment.