From 70edf835ee927bd9fad2034d3db78b6288a40880 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Wed, 18 Oct 2023 12:30:06 -0700 Subject: [PATCH] [test] Fix TestDeleteStoreDeletesRealtimeTopic for correctness (#701) This commit addresses a potential issue with resource management in `TestDeleteStoreDeletesRealtimeTopic`. The previous code prematurely closed the `TopicManagerRepository` within a try-with-resources block, causing early closure of internal topic managers and its associated PubSub clients. To resolve this, we close the `TopicManagerRepository` only when its local topic manager is no longer required. This change ensures proper resource management. --- .../TestDeleteStoreDeletesRealtimeTopic.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestDeleteStoreDeletesRealtimeTopic.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestDeleteStoreDeletesRealtimeTopic.java index 49f65bc64a..ebde83d1fc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestDeleteStoreDeletesRealtimeTopic.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestDeleteStoreDeletesRealtimeTopic.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.utils.IntegrationTestPushUtils.makeStoreHybrid; import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import com.linkedin.venice.client.store.AvroGenericStoreClient; import com.linkedin.venice.client.store.ClientConfig; @@ -15,7 +16,6 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; -import com.linkedin.venice.kafka.TopicManager; import com.linkedin.venice.kafka.TopicManagerRepository; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -42,26 +42,22 @@ public class TestDeleteStoreDeletesRealtimeTopic { private VeniceClusterWrapper venice = null; private AvroGenericStoreClient client = null; private ControllerClient controllerClient = null; - private TopicManager topicManager = null; + private TopicManagerRepository topicManagerRepository = null; private String storeName = null; - private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); @BeforeClass public void setUp() { venice = ServiceFactory.getVeniceCluster(); controllerClient = ControllerClient.constructClusterControllerClient(venice.getClusterName(), venice.getRandomRouterURL()); - - try (TopicManagerRepository topicManagerRepository = IntegrationTestPushUtils.getTopicManagerRepo( + topicManagerRepository = IntegrationTestPushUtils.getTopicManagerRepo( DEFAULT_KAFKA_OPERATION_TIMEOUT_MS, 100, 0l, venice.getPubSubBrokerWrapper(), - pubSubTopicRepository)) { - topicManager = topicManagerRepository.getTopicManager(); - } - + pubSubTopicRepository); storeName = Utils.getUniqueString("hybrid-store"); venice.getNewStore(storeName); makeStoreHybrid(venice, storeName, 100L, 5L); @@ -71,7 +67,7 @@ public void setUp() { @AfterClass public void cleanUp() { - Utils.closeQuietlyWithErrorLogged(topicManager); + Utils.closeQuietlyWithErrorLogged(topicManagerRepository); Utils.closeQuietlyWithErrorLogged(client); Utils.closeQuietlyWithErrorLogged(venice); Utils.closeQuietlyWithErrorLogged(controllerClient); @@ -109,7 +105,7 @@ public void deletingHybridStoreDeletesRealtimeTopic() { // verify realtime topic exists PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); - Assert.assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)); + assertTrue(topicManagerRepository.getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic)); // disable store TestUtils.assertCommand( @@ -130,11 +126,11 @@ public void deletingHybridStoreDeletesRealtimeTopic() { // verify realtime topic does not exist PubSubTopic realTimeTopicName = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); try { - boolean isTruncated = topicManager.isTopicTruncated(realTimeTopicName, 60000); - Assert.assertTrue( + boolean isTruncated = topicManagerRepository.getTopicManager().isTopicTruncated(realTimeTopicName, 60000); + assertTrue( isTruncated, "Real-time buffer topic should be truncated: " + realTimeTopicName + " but retention is set to: " - + topicManager.getTopicRetention(realTimeTopicName) + "."); + + topicManagerRepository.getTopicManager().getTopicRetention(realTimeTopicName) + "."); LOGGER.info("Confirmed truncation of real-time topic: {}", realTimeTopicName); } catch (PubSubTopicDoesNotExistException e) { LOGGER