Skip to content

Commit

Permalink
[server][controller] Close VeniceWriters in MetaStoreWriter concurren…
Browse files Browse the repository at this point in the history
…tly and in bounded time (#710)

This PR addresses the issue in the current `MetaStoreWriter::close` implementation,  
which uses `parallelStream` to close Venice writers. Since no thread pool is specified for  
running `parallelStream`, it uses threads from `ForkJoin.commonPool`. This can lead to  
issues where other parts of the system monopolize the FJ thread pool, causing a lack of  
guaranteed concurrency during the close operation. In other cases, the `MetaStoreWriter`  
itself may monopolize the FJ thread pool, preventing other system components' access to it.  

This PR addresses the issue by eliminating the dependency on the FJ common pool during  
the MetaStore shutdown. It introduces the `VeniceWriter::closeAsync` API to close Venice  
writers asynchronously. This async operation runs in the Venice writer's dedicated elastic  
thread pool, which ensures concurrent and predictable behavior during the close process.
  • Loading branch information
sushantmane authored Oct 24, 2023
1 parent 781f3dd commit 1ff609a
Show file tree
Hide file tree
Showing 14 changed files with 336 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static com.linkedin.venice.ConfigKeys.LOCAL_D2_ZK_HOST;
import static com.linkedin.venice.ConfigKeys.MAX_FUTURE_VERSION_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER;
import static com.linkedin.venice.ConfigKeys.MAX_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.OFFSET_LAG_DELTA_RELAX_FACTOR_FOR_FAST_ONLINE_TRANSITION_IN_RESTART;
import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
Expand Down Expand Up @@ -437,6 +439,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final int ingestionTaskMaxIdleCount;

private final long metaStoreWriterCloseTimeoutInMS;
private final int metaStoreWriterCloseConcurrency;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
}
Expand Down Expand Up @@ -719,6 +724,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
}
routerPrincipalName = serverProperties.getString(ROUTER_PRINCIPAL_NAME, "CN=venice-router");
ingestionTaskMaxIdleCount = serverProperties.getInt(SERVER_INGESTION_TASK_MAX_IDLE_COUNT, 10000);
metaStoreWriterCloseTimeoutInMS = serverProperties.getLong(META_STORE_WRITER_CLOSE_TIMEOUT_MS, 300000L);
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
}

long extractIngestionMemoryLimit(
Expand Down Expand Up @@ -1260,4 +1267,12 @@ public int getIngestionTaskMaxIdleCount() {
public boolean isKMERegistrationFromMessageHeaderEnabled() {
return isKMERegistrationFromMessageHeaderEnabled;
}

public long getMetaStoreWriterCloseTimeoutInMS() {
return metaStoreWriterCloseTimeoutInMS;
}

public int getMetaStoreWriterCloseConcurrency() {
return metaStoreWriterCloseConcurrency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ public KafkaStoreIngestionService(
topicManagerRepository.getTopicManager(),
veniceWriterFactoryForMetaStoreWriter,
zkSharedSchemaRepository.get(),
pubSubTopicRepository);
pubSubTopicRepository,
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
serverConfig.getMetaStoreWriterCloseConcurrency());
this.metaSystemStoreReplicaStatusNotifier = new MetaSystemStoreReplicaStatusNotifier(
serverConfig.getClusterName(),
metaStoreWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ public void testLeaderCanSendValueChunksIntoDrainer()
PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
CompletableFuture mockedFuture = mock(CompletableFuture.class);
when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1);
when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1);
AtomicLong offset = new AtomicLong(0);

ArgumentCaptor<KafkaKey> kafkaKeyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,16 @@ private ConfigKeys() {
public static final String USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH =
"controller.server.incremental.push.use.push.status.store";

/**
* A config to control the maximum time spent on closing the meta store writer.
*/
public static final String META_STORE_WRITER_CLOSE_TIMEOUT_MS = "meta.store.writer.close.timeout.ms";

/**
* A config to control the maximum number of concurrent meta store writer close operations.
*/
public static final String META_STORE_WRITER_CLOSE_CONCURRENCY = "meta.store.writer.close.concurrency";

/**
* A config to control whether VeniceServer will optimize the database for the backup version to
* free up memory resources occupied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@
import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicAuthorizationException;
import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicDoesNotExistException;
import it.unimi.dsi.fastutil.objects.Object2DoubleMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/**
Expand All @@ -26,22 +20,12 @@
* 2. In order delivery (IOD): messages in the same partition should follow the order in which they were sent.
*/
public interface PubSubProducerAdapter {
ExecutorService timeOutExecutor = Executors.newSingleThreadExecutor();

/**
* The support for the following two getNumberOfPartitions APIs will be removed.
*/
@Deprecated
int getNumberOfPartitions(String topic);

@Deprecated
default int getNumberOfPartitions(String topic, int timeout, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
Callable<Integer> task = () -> getNumberOfPartitions(topic);
Future<Integer> future = timeOutExecutor.submit(task);
return future.get(timeout, timeUnit);
}

/**
* Sends a message to a PubSub topic asynchronously and returns a {@link Future} representing the result of the produce operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand All @@ -36,9 +36,13 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -73,22 +77,46 @@ public class MetaStoreWriter implements Closeable {
private int derivedComputeSchemaId = -1;

private final PubSubTopicRepository pubSubTopicRepository;
private final long closeTimeoutMs;
private final int numOfConcurrentVwCloseOps;

public MetaStoreWriter(
TopicManager topicManager,
VeniceWriterFactory writerFactory,
HelixReadOnlyZKSharedSchemaRepository schemaRepo,
PubSubTopicRepository pubSubTopicRepository) {
this.topicManager = topicManager;
this.writerFactory = writerFactory;
PubSubTopicRepository pubSubTopicRepository,
long closeTimeoutMs,
int numOfConcurrentVwCloseOps) {
/**
* TODO: get the write compute schema from the constructor so that this class does not use {@link WriteComputeSchemaConverter}
*/
this.derivedComputeSchema = WriteComputeSchemaConverter.getInstance()
.convertFromValueRecordSchema(
AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema());
this(
topicManager,
writerFactory,
schemaRepo,
WriteComputeSchemaConverter.getInstance()
.convertFromValueRecordSchema(
AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema()),
pubSubTopicRepository,
closeTimeoutMs,
numOfConcurrentVwCloseOps);
}

MetaStoreWriter(
TopicManager topicManager,
VeniceWriterFactory writerFactory,
HelixReadOnlyZKSharedSchemaRepository schemaRepo,
Schema derivedComputeSchema,
PubSubTopicRepository pubSubTopicRepository,
long closeTimeoutMs,
int numOfConcurrentVwCloseOps) {
this.topicManager = topicManager;
this.writerFactory = writerFactory;
this.derivedComputeSchema = derivedComputeSchema;
this.zkSharedSchemaRepository = schemaRepo;
this.pubSubTopicRepository = pubSubTopicRepository;
this.closeTimeoutMs = closeTimeoutMs;
this.numOfConcurrentVwCloseOps = numOfConcurrentVwCloseOps;
}

/**
Expand Down Expand Up @@ -492,17 +520,71 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter,
}
}

/**
* If numOfConcurrentVwCloseOps is set to -1, then all the VeniceWriters will be closed asynchronously and concurrently.
* If numOfConcurrentVwCloseOps is set to a positive number, then the VeniceWriters will be closed with a bounded concurrency until timeout.
* Once timeout is reached, the remaining VeniceWriters will be closed asynchronously and concurrently.
*/
@Override
public void close() throws IOException {
// Close VeniceWrites in parallel to reduce the time to shut down the server.
try (Timer ignore = Timer.run(
elapsedTimeInMs -> LOGGER.info(
"MetaStoreWriter takes {}ms to close {} VeniceWriters in parallel",
elapsedTimeInMs,
metaStoreWriterMap.size()))) {
metaStoreWriterMap.entrySet()
.parallelStream()
.forEach(entry -> closeVeniceWriter(entry.getKey(), entry.getValue(), false));
public synchronized void close() throws IOException {
long startTime = System.currentTimeMillis();
LOGGER.info(
"Closing MetaStoreWriter - numOfVeniceWriters: {} permits: {} timeoutInMs: {}",
metaStoreWriterMap.size(),
numOfConcurrentVwCloseOps,
closeTimeoutMs);
// iterate through the map and close all the VeniceWriters
List<CompletableFuture<VeniceResourceCloseResult>> closeFutures = new ArrayList<>(metaStoreWriterMap.size());
List<VeniceWriter> writersToClose = new ArrayList<>(metaStoreWriterMap.values());
metaStoreWriterMap.clear();
// permit for the VeniceWriters to be closed asynchronously
int permits = numOfConcurrentVwCloseOps == -1 ? writersToClose.size() : numOfConcurrentVwCloseOps;
Semaphore semaphore = new Semaphore(permits);
long deadline = startTime + closeTimeoutMs;
for (VeniceWriter veniceWriter: writersToClose) {
boolean acquired = false;
try {
acquired = semaphore.tryAcquire(Math.max(0, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while acquiring semaphore", e);
}
CompletableFuture<VeniceResourceCloseResult> closeFuture = veniceWriter.closeAsync(true);
closeFutures.add(closeFuture);
if (acquired) {
// release the semaphore when the future is completed
closeFuture.whenComplete((result, throwable) -> semaphore.release());
}
}

// wait for all the VeniceWriters to be closed with a bounded timeout
try {
CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]))
.get(Math.max(1000, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.warn("Caught exception while closing VeniceWriters", e);
}

// collect the close results
EnumMap<VeniceResourceCloseResult, Integer> closeResultMap = new EnumMap<>(VeniceResourceCloseResult.class);
for (CompletableFuture<VeniceResourceCloseResult> future: closeFutures) {
if (!future.isDone()) {
closeResultMap.compute(VeniceResourceCloseResult.UNKNOWN, (key, value) -> value == null ? 1 : value + 1);
continue;
}
// for the completed future, get the close result and increment the counter
try {
closeResultMap.compute(future.get(), (key, value) -> value == null ? 1 : value + 1);
} catch (Exception e) {
LOGGER.warn("Caught exception while getting VeniceResourceCloseResult", e);
closeResultMap.compute(VeniceResourceCloseResult.FAILED, (key, value) -> value == null ? 1 : value + 1);
}
}

LOGGER.info(
"Closed MetaStoreWriter in {} ms - numbOfVeniceWriters: {} veniceWriterCloseResult: {}",
System.currentTimeMillis() - startTime,
writersToClose.size(),
closeResultMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.venice.utils;

public enum VeniceResourceCloseResult {
SUCCESS(0), ALREADY_CLOSED(1), FAILED(2), UNKNOWN(3);

private final int statusCode;

VeniceResourceCloseResult(int statusCode) {
this.statusCode = statusCode;
}

public int getStatusCode() {
return statusCode;
}
}
Loading

0 comments on commit 1ff609a

Please sign in to comment.