diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 494a73c0aba..141af20a368 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -252,7 +252,7 @@ public DaVinciBackend( .map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository)); ingestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index a43ed1decf1..79b8c9959fb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -145,8 +145,7 @@ public void dropStoragePartitionGracefully( final int waitIntervalInSecond = 1; final int maxRetry = timeoutInSeconds / waitIntervalInSecond; getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true); - // Drops corresponding data partition from storage. - this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine); + getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java index 2456fcfe310..b232b77fbda 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java @@ -695,7 +695,7 @@ private void initializeIsolatedIngestionServer() { // Create KafkaStoreIngestionService storeIngestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index e8d6e8869b7..1ceaa95b653 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -15,6 +15,7 @@ import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; @@ -103,6 +104,7 @@ private static class ReusableObjects { private final ThreadLocal<ReusableObjects> 
threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new); public ActiveActiveStoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -114,6 +116,7 @@ public ActiveActiveStoreIngestionTask( Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java index 398191044d3..8aaca3c07f4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java @@ -4,7 +4,7 @@ * An Enum enumerating all valid types of {@link ConsumerAction}. */ public enum ConsumerActionType { - SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), + SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), DROP_PARTITION(1), /** * KILL action has higher priority than others, so that once KILL action is added to the action queue, * we will process it immediately to avoid doing throw-away works. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index ba3bb10c1a4..5734943356d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -29,8 +29,8 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; -import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.SSLConfig; @@ -130,6 +130,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private static final String GROUP_ID_FORMAT = "%s_%s"; private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class); + private final StorageService storageService; private final VeniceConfigLoader veniceConfigLoader; @@ -190,7 +191,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private final ExecutorService aaWCWorkLoadProcessingThreadPool; public KafkaStoreIngestionService( - StorageEngineRepository storageEngineRepository, + StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, @@ -212,6 +213,7 @@ public KafkaStoreIngestionService( PubSubClientsFactory pubSubClientsFactory, Optional sslFactory, HeartbeatMonitoringService heartbeatMonitoringService) { + this.storageService = storageService; this.cacheBackend = cacheBackend; this.recordTransformerFunction = recordTransformerFunction; this.storageMetadataService = storageMetadataService; @@ -448,7 +450,7 @@ public void 
handleStoreDeleted(Store store) { ingestionTaskFactory = StoreIngestionTaskFactory.builder() .setVeniceWriterFactory(veniceWriterFactory) - .setStorageEngineRepository(storageEngineRepository) + .setStorageEngineRepository(storageService.getStorageEngineRepository()) .setStorageMetadataService(storageMetadataService) .setLeaderFollowerNotifiersQueue(leaderFollowerNotifiers) .setSchemaRepository(schemaRepo) @@ -519,6 +521,7 @@ private StoreIngestionTask createStoreIngestionTask( }; return ingestionTaskFactory.getNewIngestionTask( + storageService, store, version, getKafkaConsumerProperties(veniceStoreVersionConfig), @@ -920,6 +923,30 @@ public void stopConsumptionAndWait( } } + /** + * Drops the corresponding Venice Partition gracefully. + * This should only be called after {@link #stopConsumptionAndWait} has been called + * @param veniceStore Venice Store for the partition. + * @param partitionId Venice partition's id. + */ + public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) { + final String topic = veniceStore.getStoreVersionName(); + + try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { + StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); + if (ingestionTask != null) { + ingestionTask.dropStoragePartitionGracefully( + new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); + } else { + LOGGER.info( + "Ingestion task for Topic {} is null. Dropping partition {} synchronously", + veniceStore.getStoreVersionName(), + partitionId); + this.storageService.dropStorePartition(veniceStore, partitionId, true); + } + } + } + /** * This function will try to kill the ingestion tasks belonging to non-current versions. * And this is mainly being used by memory limiter feature to free up resources when encountering memory diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index cc8501f4408..9e0415d8755 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -23,6 +23,7 @@ import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper; import com.linkedin.davinci.schema.merge.MergeRecordHelper; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -204,6 +205,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { private final Version version; public LeaderFollowerStoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -215,6 +217,7 @@ public LeaderFollowerStoreIngestionTask( Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index cf43fbebb94..1e91f8b31e7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2,6 +2,7 @@ import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG; import static com.linkedin.davinci.ingestion.LagType.TIME_LAG; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.DROP_PARTITION; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE; @@ -34,6 +35,7 @@ import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -189,6 +191,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); /** storage destination for consumption */ + protected final StorageService storageService; protected final StorageEngineRepository storageEngineRepository; protected final AbstractStorageEngine storageEngine; @@ -344,6 +347,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ExecutorService parallelProcessingThreadPool; public StoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -361,6 +365,7 @@ public StoreIngestionTask( this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode(); this.databaseSyncBytesIntervalForDeferredWriteMode = storeConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode(); this.kafkaProps = kafkaConsumerProperties; + this.storageService = storageService; this.storageEngineRepository = builder.getStorageEngineRepository(); this.storageMetadataService = builder.getStorageMetadataService(); this.storeRepository = builder.getMetadataRepo(); @@ -632,6 +637,41 @@ public synchronized CompletableFuture<Void> unSubscribePartition( return consumerAction.getFuture(); } + /** + * Drops a storage partition gracefully. + * This is always a Helix-triggered action. + */ + public void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition) { + int partitionId = topicPartition.getPartitionNumber(); + synchronized (this) { + if (isRunning()) { + LOGGER.info( + "Ingestion task is still running for Topic {}. Dropping partition {} asynchronously", + topicPartition.getTopicName(), + partitionId); + ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true); + consumerActionsQueue.add(consumerAction); + return; + } + } + + LOGGER.info( + "Ingestion task isn't running for Topic {}. Dropping partition {} synchronously", + topicPartition.getTopicName(), + partitionId); + dropPartitionSynchronously(topicPartition); + } + + /** + * Drops a partition synchronously. This is invoked when processing a DROP_PARTITION message. 
+ */ + private void dropPartitionSynchronously(PubSubTopicPartition topicPartition) { + LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition); + int partition = topicPartition.getPartitionNumber(); + this.storageService.dropStorePartition(storeConfig, partition, true); + LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition); + } + public boolean hasAnySubscription() { return !partitionConsumptionStateMap.isEmpty(); } @@ -1732,9 +1772,15 @@ private void reportError( } private void internalClose(boolean doFlush) { + // Set isRunning to false to prevent messages being added after we've already looped through consumerActionsQueue. + // Wrapping in synchronized to prevent a race condition on methods reading the value of isRunning. + synchronized (this) { + getIsRunning().set(false); + } + this.missingSOPCheckExecutor.shutdownNow(); - // Only reset Offset Messages are important, subscribe/unsubscribe will be handled + // Only reset Offset and Drop Partition Messages are important, subscribe/unsubscribe will be handled // on the restart by Helix Controller notifications on the new StoreIngestionTask. try { this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager); @@ -1746,12 +1792,16 @@ private void internalClose(boolean doFlush) { if (opType == ConsumerActionType.RESET_OFFSET) { LOGGER.info("Cleanup Reset OffSet. Replica: {}", replica); storageMetadataService.clearOffset(topic, partition); + } else if (opType == DROP_PARTITION) { + PubSubTopicPartition topicPartition = message.getTopicPartition(); + LOGGER.info("Processing DROP_PARTITION message for {} in internalClose", topicPartition); + dropPartitionSynchronously(topicPartition); } else { LOGGER.info("Cleanup ignoring the Message: {} Replica: {}", message, replica); } } } catch (Exception e) { - LOGGER.error("{} Error while resetting offset.", ingestionTaskName, e); + LOGGER.error("{} Error while handling message in internalClose", ingestionTaskName, e); } // Unsubscribe any topic partitions related to this version topic from the shared consumer. aggKafkaConsumerService.unsubscribeAll(versionTopic); @@ -2137,6 +2187,9 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws LOGGER.info("Kill this consumer task for Topic: {}", topic); // Throw the exception here to break the consumption loop, and then this task is marked as error status. 
throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic); + case DROP_PARTITION: + dropPartitionSynchronously(topicPartition); + break; default: throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName()); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java index 6a48231c3c9..4c1ef625fac 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -43,6 +44,7 @@ private StoreIngestionTaskFactory(Builder builder) { } public StoreIngestionTask getNewIngestionTask( + StorageService storageService, Store store, Version version, Properties kafkaConsumerProperties, @@ -54,6 +56,7 @@ public StoreIngestionTask getNewIngestionTask( DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { if (version.isActiveActiveReplicationEnabled()) { return new ActiveActiveStoreIngestionTask( + storageService, builder, store, version, @@ -66,6 +69,7 @@ public StoreIngestionTask getNewIngestionTask( recordTransformerFunction); } return new LeaderFollowerStoreIngestionTask( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index fa8e79576af..3311fc95c99 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -28,6 +28,7 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -231,6 +232,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID); mockVersion.setHybridStoreConfig(hybridStoreConfig); + StorageService storageService = mock(StorageService.class); Store store = new ZKStore( STORE_NAME, "Felix", @@ -250,6 +252,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { VeniceStoreVersionConfig storeVersionConfig = new VeniceStoreVersionConfig(STORE_NAME + "_v1", new VeniceProperties(kafkaConsumerProperties)); ActiveActiveStoreIngestionTask ingestionTask = new ActiveActiveStoreIngestionTask( + storageService, builder, store, mockVersion, diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 65ca79333e6..0eb6a644138 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -5,12 +5,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -22,6 +24,7 @@ import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageEngineTest; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -52,9 +55,11 @@ import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager; import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; @@ -62,6 +67,8 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.avro.Schema; import org.mockito.Mockito; @@ -72,6 +79,7 @@ @Test public abstract class KafkaStoreIngestionServiceTest { + private StorageService mockStorageService; private StorageEngineRepository mockStorageEngineRepository; private VeniceConfigLoader mockVeniceConfigLoader; private StorageMetadataService storageMetadataService; @@ -88,7 +96,9 @@ public abstract class KafkaStoreIngestionServiceTest { @BeforeClass public void setUp() { + mockStorageService = mock(StorageService.class); mockStorageEngineRepository = mock(StorageEngineRepository.class); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); doReturn(mock(AbstractStorageEngine.class)).when(mockStorageEngineRepository).getLocalStorageEngine(anyString()); storageMetadataService = mock(StorageMetadataService.class); mockClusterInfoProvider = mock(ClusterInfoProvider.class); @@ -149,7 +159,7 @@ private void setupMockConfig() { @Test public void testDisableMetricsEmission() { kafkaStoreIngestionService = new KafkaStoreIngestionService( - mockStorageEngineRepository, + mockStorageService, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -233,7 +243,7 @@ public void 
testGetIngestingTopicsNotWithOnlineVersion() { // Without starting the ingestion service test getIngestingTopicsWithVersionStatusNotOnline would return the correct // topics under different scenarios. kafkaStoreIngestionService = new KafkaStoreIngestionService( - mockStorageEngineRepository, + mockStorageService, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -321,7 +331,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { @Test public void testCloseStoreIngestionTask() { kafkaStoreIngestionService = new KafkaStoreIngestionService( - mockStorageEngineRepository, + mockStorageService, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -386,7 +396,7 @@ public void testCloseStoreIngestionTask() { @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngestion) { kafkaStoreIngestionService = new KafkaStoreIngestionService( - mockStorageEngineRepository, + mockStorageService, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -489,4 +499,62 @@ public void testHasCurrentVersionBootstrapping() { assertTrue(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsCurrentBootstrappingTask)); } + + @Test + public void testDropStoragePartitionGracefully() throws NoSuchFieldException, IllegalAccessException { + kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + String topicName = "test-store_v1"; + int partitionId = 0; + VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB); + VeniceStoreVersionConfig config = new VeniceStoreVersionConfig(topicName, veniceProperties); + doCallRealMethod().when(kafkaStoreIngestionService).dropStoragePartitionGracefully(config, partitionId); + + Field topicLockManagerField = kafkaStoreIngestionService.getClass().getDeclaredField("topicLockManager"); + topicLockManagerField.setAccessible(true); + topicLockManagerField.set(kafkaStoreIngestionService, new ResourceAutoClosableLockManager<>(ReentrantLock::new)); + + NavigableMap topicNameToIngestionTaskMap = mock(NavigableMap.class); + Field topicNameToIngestionTaskMapField = + kafkaStoreIngestionService.getClass().getDeclaredField("topicNameToIngestionTaskMap"); + topicNameToIngestionTaskMapField.setAccessible(true); + topicNameToIngestionTaskMapField.set(kafkaStoreIngestionService, topicNameToIngestionTaskMap); + + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Field pubSubTopicRepositoryField = kafkaStoreIngestionService.getClass().getDeclaredField("pubSubTopicRepository"); + pubSubTopicRepositoryField.setAccessible(true); + pubSubTopicRepositoryField.set(kafkaStoreIngestionService, pubSubTopicRepository); + + StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); + + PriorityBlockingQueue consumerActionsQueue = mock(PriorityBlockingQueue.class); + Field consumerActionsQueueField = StoreIngestionTask.class.getDeclaredField("consumerActionsQueue"); + consumerActionsQueueField.setAccessible(true); + consumerActionsQueueField.set(storeIngestionTask, consumerActionsQueue); + + when(topicNameToIngestionTaskMap.get(topicName)).thenReturn(storeIngestionTask); + doCallRealMethod().when(storeIngestionTask).dropStoragePartitionGracefully(any()); + + PubSubTopic pubSubTopic = mock(PubSubTopic.class); + when(pubSubTopicRepository.getTopic(topicName)).thenReturn(pubSubTopic); + + StorageService 
storageService = mock(StorageService.class); + Field storageServiceField = StoreIngestionTask.class.getDeclaredField("storageService"); + storageServiceField.setAccessible(true); + storageServiceField.set(storeIngestionTask, storageService); + + Field storeConfigField = StoreIngestionTask.class.getDeclaredField("storeConfig"); + storeConfigField.setAccessible(true); + storeConfigField.set(storeIngestionTask, config); + + // Verify that when the ingestion task is running, it drops the store partition asynchronously + when(storeIngestionTask.isRunning()).thenReturn(true); + kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId); + verify(storeIngestionTask).dropStoragePartitionGracefully(any()); + verify(consumerActionsQueue).add(any()); + + // Verify that when the ingestion task isn't running, it drops the store partition synchronously + when(storeIngestionTask.isRunning()).thenReturn(false); + kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId); + verify(storageService).dropStorePartition(config, partitionId, true); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java index 966ea0906ee..9042bc28487 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java @@ -12,6 +12,7 @@ import com.linkedin.davinci.stats.AggHostLevelIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.venice.exceptions.VeniceTimeoutException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; @@ -55,6 +56,7 @@ public void testPushTimeoutForLeaderFollowerStores() { .setHostLevelIngestionStats(mockAggStoreIngestionStats) .setPubSubTopicRepository(pubSubTopicRepository); + StorageService storageService = mock(StorageService.class); Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); Version version = mockStore.getVersion(versionNumber); @@ -66,6 +68,7 @@ public void testPushTimeoutForLeaderFollowerStores() { doReturn(versionTopic).when(mockVeniceStoreVersionConfig).getStoreVersionName(); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, @@ -113,6 +116,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { .setHostLevelIngestionStats(mockAggStoreIngestionStats) .setPubSubTopicRepository(pubSubTopicRepository); + StorageService storageService = mock(StorageService.class); Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); Version version = mockStore.getVersion(versionNumber); @@ -139,6 +143,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { doReturn(mockOffsetRecord).when(mockStorageMetadataService).getLastOffset(eq(versionTopic), eq(0)); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 53e5b750893..7031b27e5d8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -96,6 +96,7 @@ import com.linkedin.davinci.stats.KafkaConsumerServiceStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; @@ -818,6 +819,7 @@ private void runTest( true, aaConfig, storeVersionConfigOverride); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigsUnderTest.store; Version version = storeAndVersionConfigsUnderTest.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigsUnderTest.storeVersionConfig; @@ -835,6 +837,7 @@ private void runTest( storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2780,6 +2783,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte false, false, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2799,6 +2803,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2916,6 +2921,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica false, true, aaConfig); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2947,6 +2953,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3140,6 +3147,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3163,6 +3171,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3279,6 +3288,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( false, true, aaConfig); + StorageService storageService = 
mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3308,6 +3318,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3444,6 +3455,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3460,6 +3472,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3539,6 +3552,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); MockStoreVersionConfigs storeAndVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3555,6 +3569,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3598,6 +3613,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a doReturn(VersionStatus.STARTED).when(mockVersion).getStatus(); ReadOnlyStoreRepository mockReadOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); doReturn(mockStore).when(mockReadOnlyStoreRepository).getStoreOrThrow(eq(storeName)); @@ -3627,6 +3643,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -3732,6 +3749,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn("localhost").when(version).getPushStreamSourceAddress(); Store store = mock(Store.class); + StorageService storageService = mock(StorageService.class); doReturn(version).when(store).getVersion(eq(1)); String versionTopicName = "testStore_v1"; @@ -3740,6 +3758,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = spy( new LeaderFollowerStoreIngestionTask( + 
storageService, builder, store, version, @@ -4235,6 +4254,7 @@ public void testBatchOnlyStoreDataRecovery() { DataRecoveryVersionConfig dataRecoveryVersionConfig = new DataRecoveryVersionConfigImpl("dc-0", false, 1); doReturn(dataRecoveryVersionConfig).when(version).getDataRecoveryVersionConfig(); + StorageService storageService = mock(StorageService.class); Store store = mock(Store.class); doReturn(version).when(store).getVersion(eq(1)); @@ -4251,6 +4271,7 @@ public void testBatchOnlyStoreDataRecovery() { null).build(); doReturn(Version.parseStoreFromVersionTopic(topic)).when(store).getName(); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, store, version, new Properties(), @@ -4310,6 +4331,7 @@ public void testMaybeSendIngestionHeartbeat( NodeType nodeType, HybridConfig hybridConfig) { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4366,6 +4388,7 @@ public void testMaybeSendIngestionHeartbeat( .build(); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -4399,6 +4422,7 @@ public void testMaybeSendIngestionHeartbeat( @Test public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4453,6 +4477,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter .build(); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 83576ac7ea1..cd0800b742c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -73,6 +73,7 @@ public class VeniceServerTest { + static final long TOTAL_TIMEOUT_FOR_LONG_TEST_MS = 70 * Time.MS_PER_SECOND; private static final Logger LOGGER = LogManager.getLogger(VeniceServerTest.class); @Test @@ -372,4 +373,72 @@ public void testStartServerWithSystemSchemaInitialization() { }); } } + + @Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS) + public void testDropStorePartitionAsynchronously() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + 
Assert.assertTrue(server.getVeniceServer().isStarted()); + final StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + // Add servers to trigger a rebalance, which will redistribute and drop partitions for the current participant + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // Partitions should have been dropped asynchronously due to rebalancing + Assert.assertTrue(storageService.getStorageEngine(storeVersionName).getPartitionIds().size() < 3); + }); + } + } + + @Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS) + public void testDropStorePartitionSynchronously() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + Assert.assertTrue(server.getVeniceServer().isStarted()); + final StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + cluster.useControllerClient(controllerClient -> { + controllerClient.disableAndDeleteStore(storeName); + }); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // All partitions should have been dropped synchronously + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 0); + }); + } + } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 18f6f50507f..9df4ce8d610 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -370,7 +370,7 @@ private List createServices() { // create and add KafkaSimpleConsumerService this.kafkaStoreIngestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, veniceConfigLoader, storageMetadataService, 
new StaticClusterInfoProvider(Collections.singleton(clusterConfig.getClusterName())),
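
Notes on the mechanics above, with short sketches; none of the following blocks are part of the patch itself.

The ConsumerActionType change gives DROP_PARTITION the same priority (1) as the other routine actions, so it drains in FIFO order behind whatever is already queued, while KILL still jumps ahead. Below is a minimal sketch of that ordering; the Action record and KILL's numeric priority (2) are illustrative assumptions, since the diff only states that routine actions are 1 and that KILL is higher.

```java
import java.util.concurrent.PriorityBlockingQueue;

/**
 * Illustrative sketch of consumer action ordering; Action is a stand-in for
 * ConsumerAction, and the numeric priority of KILL is an assumption.
 */
public class ActionQueueSketch {
  record Action(String type, int priority, long seqNum) implements Comparable<Action> {
    @Override
    public int compareTo(Action other) {
      int byPriority = Integer.compare(other.priority, this.priority); // higher priority drains first
      return byPriority != 0 ? byPriority : Long.compare(this.seqNum, other.seqNum); // then FIFO
    }
  }

  public static void main(String[] args) {
    PriorityBlockingQueue<Action> queue = new PriorityBlockingQueue<>();
    queue.add(new Action("SUBSCRIBE", 1, 1));
    queue.add(new Action("DROP_PARTITION", 1, 2));
    queue.add(new Action("KILL", 2, 3)); // enqueued last, drained first
    while (!queue.isEmpty()) {
      System.out.println(queue.poll().type()); // prints KILL, SUBSCRIBE, DROP_PARTITION
    }
  }
}
```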
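StoreIngestionTask.dropStoragePartitionGracefully() and internalClose() form a check-then-act handoff: a drop request enqueues a DROP_PARTITION action only while the task is running, and internalClose() flips isRunning to false under the same monitor before draining the queue, so a request can never land in a queue that nobody will read. Here is a self-contained sketch of that pattern, using simplified stand-in types (an Integer partition id instead of ConsumerAction):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of the enqueue-or-execute handoff from this patch; these are
 * stand-in types, not the Venice classes.
 */
final class DropHandoffSketch {
  private final AtomicBoolean isRunning = new AtomicBoolean(true);
  private final Queue<Integer> actionQueue = new ConcurrentLinkedQueue<>();

  /** Mirrors dropStoragePartitionGracefully: enqueue while running, else drop inline. */
  void requestDrop(int partition) {
    synchronized (this) {
      if (isRunning.get()) {
        actionQueue.add(partition); // the consumer loop will process it in order
        return;
      }
    }
    dropPartition(partition); // task already stopped: nobody will drain the queue
  }

  /** Mirrors internalClose: flip the flag under the same lock, then drain. */
  void close() {
    synchronized (this) {
      isRunning.set(false); // from here on, requestDrop always takes the synchronous path
    }
    Integer partition;
    while ((partition = actionQueue.poll()) != null) {
      dropPartition(partition); // anything enqueued before the flip still gets handled
    }
  }

  private void dropPartition(int partition) {
    System.out.println("Dropped partition " + partition);
  }
}
```

Callers such as DefaultIngestionBackend.dropStoragePartitionGracefully() reach this path only after stopConsumptionAndWait() returns, so the partition is already quiesced whichever branch executes.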
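testDropStoragePartitionGracefully injects private fields into mocks through repeated Field lookups. A small helper would keep each injection to one line; this is a suggested utility, not existing Venice test code:

```java
import java.lang.reflect.Field;

/** Hypothetical test helper; collapses the repeated reflection blocks above. */
final class ReflectionTestUtils {
  private ReflectionTestUtils() {
  }

  static void setField(Object target, Class<?> declaringClass, String fieldName, Object value)
      throws NoSuchFieldException, IllegalAccessException {
    Field field = declaringClass.getDeclaredField(fieldName);
    field.setAccessible(true); // required before writing a private field
    field.set(target, value);
  }
}
```

Each injection then reads ReflectionTestUtils.setField(storeIngestionTask, StoreIngestionTask.class, "storageService", storageService), and likewise for topicLockManager, topicNameToIngestionTaskMap, and pubSubTopicRepository.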
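Both integration tests lean on TestUtils.waitForNonDeterministicAssertion because Helix redistributes and drops partitions asynchronously, so the assertions only hold eventually. A stand-in sketch of that retry loop follows, assuming a simple timeout/interval signature; the real Venice utility's API may differ:

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch of a poll-until-deadline assertion loop; not the Venice TestUtils API. */
final class PollingAssert {
  static void awaitAssertion(Duration timeout, Duration interval, Runnable assertion) throws InterruptedException {
    Instant deadline = Instant.now().plus(timeout);
    while (true) {
      try {
        assertion.run(); // e.g. asserting getPartitionIds().size() < 3
        return; // assertion passed
      } catch (AssertionError e) {
        if (Instant.now().isAfter(deadline)) {
          throw e; // give up and surface the last failure
        }
        Thread.sleep(interval.toMillis()); // let the rebalance/drop make progress
      }
    }
  }
}
```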