Skip to content

Commit

Permalink
[da-vinci] Speed up shutdown (#731)
Browse files Browse the repository at this point in the history
There are two issues with DaVinci shutdown:
1. `VersionBackend#close` is calling `KafkaStoreIngestionService#shutdownStoreIngestionTask`, which
   would wait for up to 10s to complete the shutdown (best effort).
2. Each SIT will shutdown the partition sequentially.

When shutting down a DaVinci instance, the Store Versions hosted will be shut down sequentially and
if there are x number of DaVinci stores, the last store will receive the shutdown signal after 10 * x seconds,
which is too late.
If there are y number of partitions, based on the logs, each partition graceful shutdown would take roughly 2 seconds
for a hybrid store partition,
so it will take 2 * y seconds to gracefully shut down one `SIT`.
For some heavy DaVinci users, they normally use multiple DaVinci stores (5+) stores and each store would use a partition count
at least 20+, and based on the above calculation, it will take several mins to let DaVinci instance fully shutdown.

This code change will shut down all the subscribed Stores concurrently when closing DaVinciBackend and in the meantime, for DaVinci,
`SIT` will try to close the subscribed partitions concurrently as well.

TODO: we can evaluate whether this concurrent shutdown is applicable to Venice Server or not.
To me, it is not necessary mostly since normally, each Venice Server will host a big number of SIT and adding more concurrency in
partition level won't help much since the total number of CPU cores is limited.
  • Loading branch information
gaojieliu authored Nov 3, 2023
1 parent 074c0ac commit fa03473
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
Expand Down Expand Up @@ -423,8 +424,21 @@ public synchronized void close() {
cacheBackend.ifPresent(
objectCacheBackend -> storeRepository
.unregisterStoreDataChangedListener(objectCacheBackend.getCacheInvalidatingStoreChangeListener()));
ExecutorService storeBackendCloseExecutor =
Executors.newCachedThreadPool(new DaemonThreadFactory("DaVinciBackend-StoreBackend-Close"));
for (StoreBackend storeBackend: storeByNameMap.values()) {
storeBackend.close();
/**
* {@link StoreBackend#close()} is time-consuming since the internal {@link VersionBackend#close()} call triggers
* {@link KafkaStoreIngestionService#shutdownStoreIngestionTask}, which can take up to 10s to return.
* So here we use a thread pool to shut down all the subscribed stores concurrently.
*/
storeBackendCloseExecutor.submit(() -> storeBackend.close());
}
storeBackendCloseExecutor.shutdown();
try {
storeBackendCloseExecutor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
currentThread().interrupt();
}
storeByNameMap.clear();
versionByTopicMap.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
RedundantExceptionFilter.getRedundantExceptionFilter();

/**
* Speed up DaVinci shutdown by closing partitions concurrently.
*/
private static final ExecutorService SHUTDOWN_EXECUTOR_FOR_DVC =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

/** storage destination for consumption */
protected final StorageEngineRepository storageEngineRepository;
protected final AbstractStorageEngine storageEngine;
Expand Down Expand Up @@ -1299,6 +1305,7 @@ public void run() {
maybeSendIngestionHeartbeat();
}

List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>(partitionConsumptionStateMap.size());
// If the ingestion task is stopped gracefully (server stops), persist processed offset to disk
for (Map.Entry<Integer, PartitionConsumptionState> entry: partitionConsumptionStateMap.entrySet()) {
/**
Expand All @@ -1314,21 +1321,43 @@ public void run() {
* hasn't been applied yet, when checkpointing happens in current thread.
*/

int partition = entry.getKey();
PartitionConsumptionState partitionConsumptionState = entry.getValue();
consumerUnSubscribeAllTopics(partitionConsumptionState);
Runnable shutdownRunnable = () -> {
int partition = entry.getKey();
PartitionConsumptionState partitionConsumptionState = entry.getValue();
consumerUnSubscribeAllTopics(partitionConsumptionState);

if (ingestionCheckpointDuringGracefulShutdownEnabled) {
try {
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(versionTopic, partitionConsumptionState.getPartition()),
partitionConsumptionState);
} catch (InterruptedException e) {
throw new VeniceException(e);
}

if (ingestionCheckpointDuringGracefulShutdownEnabled) {
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(versionTopic, partitionConsumptionState.getPartition()),
partitionConsumptionState);
this.kafkaDataIntegrityValidator
.updateOffsetRecordForPartition(partition, partitionConsumptionState.getOffsetRecord());
updateOffsetMetadataInOffsetRecord(partitionConsumptionState);
syncOffset(kafkaVersionTopic, partitionConsumptionState);
}
};

this.kafkaDataIntegrityValidator
.updateOffsetRecordForPartition(partition, partitionConsumptionState.getOffsetRecord());
updateOffsetMetadataInOffsetRecord(partitionConsumptionState);
syncOffset(kafkaVersionTopic, partitionConsumptionState);
if (isDaVinciClient) {
shutdownFutures.add(CompletableFuture.runAsync(shutdownRunnable, SHUTDOWN_EXECUTOR_FOR_DVC));
} else {
/**
* TODO: evaluate whether we need to apply concurrent shutdown in Venice Server or not.
*/
shutdownRunnable.run();
}
}
if (isDaVinciClient) {
/**
* DaVinci shutdown shouldn't take that long because of high concurrency, and it is fine to specify a high timeout here
* to avoid infinite wait in case there is some regression.
*/
CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS);
}
} catch (VeniceIngestionTaskKilledException e) {
LOGGER.info("{} has been killed.", consumerTaskId);
statusReportAdapter.reportKilled(partitionConsumptionStateMap.values(), e);
Expand Down Expand Up @@ -1456,9 +1485,11 @@ private void internalClose(boolean doFlush) {
}

close();

synchronized (this) {
notifyAll();
}

LOGGER.info("Store ingestion task for store: {} is closed", kafkaVersionTopic);
}

Expand Down

0 comments on commit fa03473

Please sign in to comment.