diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java index ed2eddce7a..f016dc829f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java @@ -81,11 +81,17 @@ public Collection, VeniceChangeCoordinate>> poll @Override public CompletableFuture seekToTimestamps(Map timestamps) { + if (timestamps.isEmpty()) { + return CompletableFuture.completedFuture(null); + } return internalSeekToTimestamps(timestamps, ""); } @Override public CompletableFuture subscribe(Set partitions) { + if (partitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } if (!versionSwapThreadScheduled.get()) { // schedule the version swap thread and set up the callback listener this.storeRepository.registerStoreDataChangedListener(versionSwapListener); @@ -101,11 +107,17 @@ public CompletableFuture subscribe(Set partitions) { @Override public CompletableFuture seekToTail(Set partitions) { + if (partitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } return internalSeekToTail(partitions, ""); } @Override public CompletableFuture seekToEndOfPush(Set partitions) { + if (partitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } return CompletableFuture.supplyAsync(() -> { synchronized (internalSeekConsumer) { try { @@ -113,6 +125,7 @@ public CompletableFuture seekToEndOfPush(Set partitions) { // approach // we'd like to do is instead add the offset of the EOP message in the VT, and then just seek to that offset. // We'll do that in a future patch. + internalSeekConsumer.get().unsubscribeAll(); internalSeekConsumer.get().subscribe(partitions).get(); // We need to get the internal consumer as we have to intercept the control messages that we would normally diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 97b6ba9d6e..30ce573959 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -239,7 +239,7 @@ protected CompletableFuture internalSubscribe(Set partitions, Pub synchronized (pubSubConsumer) { Set topicPartitionSet = getTopicAssignment(); - for (PubSubTopicPartition topicPartition: pubSubConsumer.getAssignment()) { + for (PubSubTopicPartition topicPartition: topicPartitionSet) { if (partitions.contains(topicPartition.getPartitionNumber())) { pubSubConsumer.unSubscribe(topicPartition); } @@ -549,6 +549,9 @@ private List getPartitionListToSubscribe( @Override public void unsubscribe(Set partitions) { + if (partitions.isEmpty()) { + return; + } synchronized (pubSubConsumer) { Set topicPartitionSet = getTopicAssignment(); Set topicPartitionsToUnsub = new HashSet<>(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java index d8b3ba0516..6f8d24fb95 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java @@ -56,6 +56,10 @@ public void handleStoreChanged(Store store) { } } + if (partitions.isEmpty()) { + return; + } + LOGGER.info( "New Version detected! Seeking consumer to version: " + currentVersion + " in consumer: " + consumerName); this.consumer.seekToEndOfPush(partitions).get(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index e28a9f414f..c29da42aaa 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -229,6 +229,11 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE Set partitionSet = new HashSet<>(); partitionSet.add(0); + + // try and do it with an empty set, this should complete immediately + veniceChangelogConsumer.seekToEndOfPush(Collections.emptySet()).get(); + Mockito.verifyNoInteractions(mockInternalSeekConsumer); + veniceChangelogConsumer.seekToEndOfPush(partitionSet).get(); Mockito.verify(mockInternalSeekConsumer).subscribe(partitionSet);