[server][dvc] Drop partitions asynchronously (#1310)
When a storage node transitions from LEADER -> STANDBY -> OFFLINE -> DROPPED, a race condition can occur: the DROPPED state transition may be executed synchronously before the earlier transitions have been processed (LEADER -> STANDBY and STANDBY -> OFFLINE are executed asynchronously). The store partition is then deleted prematurely, so when the LEADER -> STANDBY message is eventually processed, it triggers a PersistenceFailureException because the storage partition no longer exists.

The fix is to drop the store partition asynchronously by adding a DROP_PARTITION message to the consumerActionsQueue while the ingestion task is still running. If the task is not running (which can happen if it was killed), the store partition is dropped synchronously instead. Additionally, if the ingestion task is killed after a DROP_PARTITION message has been enqueued, the message is processed in StoreIngestionTask::internalClose.
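
For orientation, here is a condensed sketch of the new drop path, assembled from the hunks below (signatures are abridged, and the enclosing class of the first method is not named in this diff):

// Caller side (second hunk below): after stopping consumption, delegate the drop
// to the ingestion service instead of dropping the storage partition directly.
public void dropStoragePartitionGracefully(VeniceStoreVersionConfig storeConfig, int partition /* , ... */) {
  getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true);
  getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition);
}

// StoreIngestionTask: enqueue a DROP_PARTITION action while the task is running;
// otherwise drop synchronously. If the task is killed after the action is enqueued,
// internalClose drains the queue and performs the drop there.
public void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition) {
  synchronized (this) {
    if (isRunning()) {
      consumerActionsQueue.add(new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true));
      return;
    }
  }
  dropPartitionSynchronously(topicPartition);
}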
kvargha authored Nov 20, 2024
1 parent 5542657 commit e23c375
Showing 15 changed files with 274 additions and 15 deletions.
@@ -252,7 +252,7 @@ public DaVinciBackend(
.map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository));

ingestionService = new KafkaStoreIngestionService(
storageService.getStorageEngineRepository(),
storageService,
configLoader,
storageMetadataService,
clusterInfoProvider,
@@ -145,8 +145,7 @@ public void dropStoragePartitionGracefully(
final int waitIntervalInSecond = 1;
final int maxRetry = timeoutInSeconds / waitIntervalInSecond;
getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true);
// Drops corresponding data partition from storage.
this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine);
getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition);
}

@Override
@@ -695,7 +695,7 @@ private void initializeIsolatedIngestionServer() {

// Create KafkaStoreIngestionService
storeIngestionService = new KafkaStoreIngestionService(
storageService.getStorageEngineRepository(),
storageService,
configLoader,
storageMetadataService,
clusterInfoProvider,
@@ -15,6 +15,7 @@
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
@@ -103,6 +104,7 @@ private static class ReusableObjects {
private final ThreadLocal<ReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new);

public ActiveActiveStoreIngestionTask(
StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -114,6 +116,7 @@ public ActiveActiveStoreIngestionTask(
Optional<ObjectCacheBackend> cacheBackend,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
super(
storageService,
builder,
store,
version,
@@ -4,7 +4,7 @@
* An Enum enumerating all valid types of {@link ConsumerAction}.
*/
public enum ConsumerActionType {
SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1),
SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), DROP_PARTITION(1),
/**
* KILL action has higher priority than others, so that once KILL action is added to the action queue,
* we will process it immediately to avoid doing throw-away work.
@@ -29,8 +29,8 @@
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.SSLConfig;
@@ -130,6 +130,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private static final String GROUP_ID_FORMAT = "%s_%s";

private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class);
private final StorageService storageService;

private final VeniceConfigLoader veniceConfigLoader;

@@ -190,7 +191,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private final ExecutorService aaWCWorkLoadProcessingThreadPool;

public KafkaStoreIngestionService(
StorageEngineRepository storageEngineRepository,
StorageService storageService,
VeniceConfigLoader veniceConfigLoader,
StorageMetadataService storageMetadataService,
ClusterInfoProvider clusterInfoProvider,
@@ -212,6 +213,7 @@ public KafkaStoreIngestionService(
PubSubClientsFactory pubSubClientsFactory,
Optional<SSLFactory> sslFactory,
HeartbeatMonitoringService heartbeatMonitoringService) {
this.storageService = storageService;
this.cacheBackend = cacheBackend;
this.recordTransformerFunction = recordTransformerFunction;
this.storageMetadataService = storageMetadataService;
@@ -448,7 +450,7 @@ public void handleStoreDeleted(Store store) {

ingestionTaskFactory = StoreIngestionTaskFactory.builder()
.setVeniceWriterFactory(veniceWriterFactory)
.setStorageEngineRepository(storageEngineRepository)
.setStorageEngineRepository(storageService.getStorageEngineRepository())
.setStorageMetadataService(storageMetadataService)
.setLeaderFollowerNotifiersQueue(leaderFollowerNotifiers)
.setSchemaRepository(schemaRepo)
@@ -519,6 +521,7 @@ private StoreIngestionTask createStoreIngestionTask(
};

return ingestionTaskFactory.getNewIngestionTask(
storageService,
store,
version,
getKafkaConsumerProperties(veniceStoreVersionConfig),
@@ -920,6 +923,30 @@ public void stopConsumptionAndWait(
}
}

/**
* Drops the corresponding Venice partition gracefully.
* This should only be called after {@link #stopConsumptionAndWait} has been called.
* @param veniceStore Venice Store for the partition.
* @param partitionId Venice partition's id.
*/
public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) {
final String topic = veniceStore.getStoreVersionName();

try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) {
StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic);
if (ingestionTask != null) {
ingestionTask.dropStoragePartitionGracefully(
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId));
} else {
LOGGER.info(
"Ingestion task for Topic {} is null. Dropping partition {} synchronously",
veniceStore.getStoreVersionName(),
partitionId);
this.storageService.dropStorePartition(veniceStore, partitionId, true);
}
}
}

/**
* This function will try to kill the ingestion tasks belonging to non-current versions.
* And this is mainly being used by memory limiter feature to free up resources when encountering memory
@@ -23,6 +23,7 @@
import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
import com.linkedin.davinci.schema.merge.MergeRecordHelper;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -204,6 +205,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
private final Version version;

public LeaderFollowerStoreIngestionTask(
StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -215,6 +217,7 @@ public LeaderFollowerStoreIngestionTask(
Optional<ObjectCacheBackend> cacheBackend,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
super(
storageService,
builder,
store,
version,
@@ -2,6 +2,7 @@

import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG;
import static com.linkedin.davinci.ingestion.LagType.TIME_LAG;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.DROP_PARTITION;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE;
@@ -34,6 +35,7 @@
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
@@ -189,6 +191,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

/** storage destination for consumption */
protected final StorageService storageService;
protected final StorageEngineRepository storageEngineRepository;
protected final AbstractStorageEngine storageEngine;

@@ -344,6 +347,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final ExecutorService parallelProcessingThreadPool;

public StoreIngestionTask(
StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -361,6 +365,7 @@ public StoreIngestionTask(
this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode();
this.databaseSyncBytesIntervalForDeferredWriteMode = storeConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode();
this.kafkaProps = kafkaConsumerProperties;
this.storageService = storageService;
this.storageEngineRepository = builder.getStorageEngineRepository();
this.storageMetadataService = builder.getStorageMetadataService();
this.storeRepository = builder.getMetadataRepo();
@@ -632,6 +637,41 @@ public synchronized CompletableFuture<Void> unSubscribePartition(
return consumerAction.getFuture();
}

/**
* Drops a storage partition gracefully.
* This is always a Helix triggered action.
*/
public void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition) {
int partitionId = topicPartition.getPartitionNumber();
synchronized (this) {
if (isRunning()) {
LOGGER.info(
"Ingestion task is still running for Topic {}. Dropping partition {} asynchronously",
topicPartition.getTopicName(),
partitionId);
ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true);
consumerActionsQueue.add(consumerAction);
return;
}
}

LOGGER.info(
"Ingestion task isn't running for Topic {}. Dropping partition {} synchronously",
topicPartition.getTopicName(),
partitionId);
dropPartitionSynchronously(topicPartition);
}

/**
* Drops a partition synchronously. This is invoked when processing a DROP_PARTITION message.
*/
private void dropPartitionSynchronously(PubSubTopicPartition topicPartition) {
LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition);
int partition = topicPartition.getPartitionNumber();
this.storageService.dropStorePartition(storeConfig, partition, true);
LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition);
}

public boolean hasAnySubscription() {
return !partitionConsumptionStateMap.isEmpty();
}
@@ -1732,9 +1772,15 @@ private void reportError(
}

private void internalClose(boolean doFlush) {
// Set isRunning to false to prevent new messages from being added after we've already looped through consumerActionsQueue.
// Wrapped in a synchronized block to prevent a race with methods that read the value of isRunning.
synchronized (this) {
getIsRunning().set(false);
}

this.missingSOPCheckExecutor.shutdownNow();

// Only reset Offset Messages are important, subscribe/unsubscribe will be handled
// Only reset Offset and Drop Partition Messages are important, subscribe/unsubscribe will be handled
// on the restart by Helix Controller notifications on the new StoreIngestionTask.
try {
this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
Expand All @@ -1746,12 +1792,16 @@ private void internalClose(boolean doFlush) {
if (opType == ConsumerActionType.RESET_OFFSET) {
LOGGER.info("Cleanup Reset OffSet. Replica: {}", replica);
storageMetadataService.clearOffset(topic, partition);
} else if (opType == DROP_PARTITION) {
PubSubTopicPartition topicPartition = message.getTopicPartition();
LOGGER.info("Processing DROP_PARTITION message for {} in internalClose", topicPartition);
dropPartitionSynchronously(topicPartition);
} else {
LOGGER.info("Cleanup ignoring the Message: {} Replica: {}", message, replica);
}
}
} catch (Exception e) {
LOGGER.error("{} Error while resetting offset.", ingestionTaskName, e);
LOGGER.error("{} Error while handling message in internalClose", ingestionTaskName, e);
}
// Unsubscribe any topic partitions related to this version topic from the shared consumer.
aggKafkaConsumerService.unsubscribeAll(versionTopic);
@@ -2137,6 +2187,9 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws
LOGGER.info("Kill this consumer task for Topic: {}", topic);
// Throw the exception here to break the consumption loop, and then this task is marked as error status.
throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic);
case DROP_PARTITION:
dropPartitionSynchronously(topicPartition);
break;
default:
throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName());
}
@@ -11,6 +11,7 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
@@ -43,6 +44,7 @@ private StoreIngestionTaskFactory(Builder builder) {
}

public StoreIngestionTask getNewIngestionTask(
StorageService storageService,
Store store,
Version version,
Properties kafkaConsumerProperties,
@@ -54,6 +56,7 @@ public StoreIngestionTask getNewIngestionTask(
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
if (version.isActiveActiveReplicationEnabled()) {
return new ActiveActiveStoreIngestionTask(
storageService,
builder,
store,
version,
@@ -66,6 +69,7 @@
recordTransformerFunction);
}
return new LeaderFollowerStoreIngestionTask(
storageService,
builder,
store,
version,
@@ -28,6 +28,7 @@
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -231,6 +232,7 @@ public void testisReadyToServeAnnouncedWithRTLag() {
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
mockVersion.setHybridStoreConfig(hybridStoreConfig);

StorageService storageService = mock(StorageService.class);
Store store = new ZKStore(
STORE_NAME,
"Felix",
@@ -250,6 +252,7 @@
VeniceStoreVersionConfig storeVersionConfig =
new VeniceStoreVersionConfig(STORE_NAME + "_v1", new VeniceProperties(kafkaConsumerProperties));
ActiveActiveStoreIngestionTask ingestionTask = new ActiveActiveStoreIngestionTask(
storageService,
builder,
store,
mockVersion,