[server] [config] Configurable Increased Max Wait After Unsubscribe (#1339)

* Made the `waitAfterUnsubscribe()` timeout during state transitions configurable. 🩰
* Fixed the log message to report the timeout in seconds instead of milliseconds. 🎺
KaiSernLim authored Nov 22, 2024
1 parent 7f553be commit b11b966
Showing 4 changed files with 24 additions and 19 deletions.
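
Before the per-file diffs, a quick illustration of the new knob. This is a minimal, self-contained sketch: the key string comes from the ConfigKeys.java change below and the 30-minute default from VeniceServerConfig.java, but the bare java.util.Properties wiring is illustrative only (Venice reads the value through VeniceProperties.getLong(), as shown in the diff).

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class MaxWaitAfterUnsubscribeExample {
  // Key introduced by this commit (see the ConfigKeys.java diff below).
  static final String SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS = "server.max.wait.after.unsubscribe.ms";

  public static void main(String[] args) {
    Properties serverProps = new Properties();
    // Override the 30-minute default with a 5-minute max wait.
    serverProps.setProperty(SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS, Long.toString(TimeUnit.MINUTES.toMillis(5)));

    // Mirrors the defaulting added to VeniceServerConfig: absent an override, fall back to 30 minutes.
    long maxWaitMs = Long.parseLong(
        serverProps.getProperty(
            SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS,
            Long.toString(TimeUnit.MINUTES.toMillis(30))));
    System.out.println("maxWaitAfterUnsubscribeMs = " + maxWaitMs); // prints 300000
  }
}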
VeniceServerConfig.java
@@ -110,6 +110,7 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
 import static com.linkedin.venice.ConfigKeys.SERVER_LOCAL_CONSUMER_CONFIG_PREFIX;
 import static com.linkedin.venice.ConfigKeys.SERVER_MAX_REQUEST_SIZE;
+import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS;
 import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_FOR_VERSION_INFO_MS_CONFIG;
 import static com.linkedin.venice.ConfigKeys.SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS;
@@ -562,6 +563,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final boolean isGlobalRtDivEnabled;
   private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;
   private final int zstdDictCompressionLevel;
+  private final long maxWaitAfterUnsubscribeMs;
 
   public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
     this(serverProperties, Collections.emptyMap());
@@ -947,6 +949,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
           "Invalid zstd dict compression level: " + zstdDictCompressionLevel + " should be between "
               + Zstd.minCompressionLevel() + " and " + Zstd.maxCompressionLevel());
     }
+    maxWaitAfterUnsubscribeMs =
+        serverProperties.getLong(SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS, TimeUnit.MINUTES.toMillis(30));
   }
 
   long extractIngestionMemoryLimit(
@@ -1720,4 +1724,8 @@ public boolean isNearlineWorkloadProducerThroughputOptimizationEnabled() {
   public int getZstdDictCompressionLevel() {
     return zstdDictCompressionLevel;
   }
+
+  public long getMaxWaitAfterUnsubscribeMs() {
+    return maxWaitAfterUnsubscribeMs;
+  }
 }
SharedKafkaConsumer.java
@@ -1,5 +1,7 @@
 package com.linkedin.davinci.kafka.consumer;
 
+import static com.linkedin.venice.utils.LatencyUtils.getElapsedTimeFromMsToMs;
+
 import com.linkedin.davinci.stats.AggKafkaConsumerServiceStats;
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
@@ -10,7 +12,6 @@
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubUnsubscribedTopicPartitionException;
-import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.utils.SystemTime;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -40,13 +41,9 @@
  * TODO: move this logic inside consumption task, this class does not need to be sub-class of {@link PubSubConsumerAdapter}
  */
 class SharedKafkaConsumer implements PubSubConsumerAdapter {
+  // StoreIngestionTask#consumerUnSubscribeForStateTransition() uses an increased max wait (30 mins by default) for
+  // safety
   public static final long DEFAULT_MAX_WAIT_MS = TimeUnit.SECONDS.toMillis(10);
-  /**
-   * Increase the max wait during state transitions to ensure that it waits for the messages to finish processing. A
-   * poll() indicates that all previous inflight messages under the previous state were processed, so there can't be a
-   * state mismatch. The consumer_records_producing_to_write_buffer_latency metric suggests how long the wait should be.
-   */
-  public static final long STATE_TRANSITION_MAX_WAIT_MS = TimeUnit.MINUTES.toMillis(30);
 
   private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class);
 
@@ -127,8 +124,7 @@ protected synchronized void updateCurrentAssignment(Set<PubSubTopicPartition> ne
     currentAssignmentSize.set(newAssignment.size());
     currentAssignment = Collections.unmodifiableSet(newAssignment);
     assignmentChangeListener.run();
-    stats.recordTotalUpdateCurrentAssignmentLatency(
-        LatencyUtils.getElapsedTimeFromMsToMs(updateCurrentAssignmentStartTime));
+    stats.recordTotalUpdateCurrentAssignmentLatency(getElapsedTimeFromMsToMs(updateCurrentAssignmentStartTime));
   }
 
   @Override
@@ -151,7 +147,7 @@ synchronized void subscribe(
           + " versionTopic: " + versionTopic + ", previousVersionTopic: " + previousVersionTopic
           + ", topicPartitionToSubscribe: " + topicPartitionToSubscribe);
     }
-    stats.recordTotalDelegateSubscribeLatency(LatencyUtils.getElapsedTimeFromMsToMs(delegateSubscribeStartTime));
+    stats.recordTotalDelegateSubscribeLatency(getElapsedTimeFromMsToMs(delegateSubscribeStartTime));
     updateCurrentAssignment(delegate.getAssignment());
   }
 
@@ -228,19 +224,19 @@ protected void waitAfterUnsubscribe(
         final long waitMs = endTimeMs - time.getMilliseconds();
         if (waitMs <= 0) {
           LOGGER.warn(
-              "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} milliseconds",
+              "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds",
               topicPartitions,
-              timeoutMs);
+              TimeUnit.MILLISECONDS.toSeconds(timeoutMs));
           break;
         }
         wait(waitMs);
       }
-      final long elapsedMs = time.getMilliseconds() - startTimeMs;
-      if (elapsedMs > TimeUnit.SECONDS.toMillis(15)) {
+      final long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(getElapsedTimeFromMsToMs(startTimeMs));
+      if (elapsedSeconds > 15) {
         LOGGER.warn(
-            "Wait for poll request after unsubscribe topic partition(s) ({}) took {} milliseconds",
+            "Wait for poll request after unsubscribe topic partition(s) ({}) took {} seconds",
             topicPartitions,
-            elapsedMs);
+            elapsedSeconds);
       }
       // no action to take actually, just return;
     } catch (InterruptedException e) {
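
The method above is a guarded wait: after unsubscribing, the caller blocks until the consumer's next poll() signals that all in-flight messages from the previous state were processed, or until the max wait elapses; this commit also switches both warning logs from milliseconds to seconds. A minimal, self-contained sketch of the pattern follows — the pollHappened flag and onPoll() method are hypothetical simplifications, not the real Venice API.

import java.util.concurrent.TimeUnit;

public class WaitAfterUnsubscribeSketch {
  private boolean pollHappened = false; // hypothetical stand-in for the real "poll observed" signal

  // Simplified analogue of SharedKafkaConsumer#waitAfterUnsubscribe(): wait for the
  // next poll() or give up after timeoutMs, reporting durations in seconds as the commit does.
  public synchronized void waitAfterUnsubscribe(long timeoutMs) {
    final long startTimeMs = System.currentTimeMillis();
    final long endTimeMs = startTimeMs + timeoutMs;
    try {
      while (!pollHappened) {
        final long waitMs = endTimeMs - System.currentTimeMillis();
        if (waitMs <= 0) {
          System.err.printf("Timed out after %d seconds%n", TimeUnit.MILLISECONDS.toSeconds(timeoutMs));
          break;
        }
        wait(waitMs); // released early by notifyAll() in onPoll()
      }
      long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMs);
      if (elapsedSeconds > 15) {
        System.err.printf("Wait took %d seconds%n", elapsedSeconds);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt; long waits must stay interruptible
    }
  }

  // The consumer's poll() path would invoke this to release any waiter.
  public synchronized void onPoll() {
    pollHappened = true;
    notifyAll();
  }
}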
StoreIngestionTask.java
@@ -3470,7 +3470,7 @@ public void consumerUnSubscribeForStateTransition(
     int partitionId = partitionConsumptionState.getPartition();
     PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, partitionId);
     aggKafkaConsumerService
-        .unsubscribeConsumerFor(versionTopic, topicPartition, SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS);
+        .unsubscribeConsumerFor(versionTopic, topicPartition, serverConfig.getMaxWaitAfterUnsubscribeMs());
     LOGGER.info(
         "Consumer unsubscribed to topic-partition: {} for replica: {}. Took {} ms",
         topicPartition,
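
The one-line change above swaps the hard-coded SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS constant for the value exposed by the new VeniceServerConfig getter. Schematically, with hypothetical minimal stand-ins for Venice's own types:

import java.util.concurrent.TimeUnit;

public class UnsubscribeCallSiteSketch {
  // Hypothetical stand-in for the aggregate consumer service's unsubscribe entry point.
  interface ConsumerService {
    void unsubscribeConsumerFor(String versionTopic, String topicPartition, long maxWaitMs);
  }

  // Hypothetical stand-in for the relevant slice of VeniceServerConfig.
  static final class ServerConfig {
    private final long maxWaitAfterUnsubscribeMs;

    ServerConfig(long maxWaitAfterUnsubscribeMs) {
      this.maxWaitAfterUnsubscribeMs = maxWaitAfterUnsubscribeMs;
    }

    long getMaxWaitAfterUnsubscribeMs() {
      return maxWaitAfterUnsubscribeMs;
    }
  }

  public static void main(String[] args) {
    ServerConfig serverConfig = new ServerConfig(TimeUnit.MINUTES.toMillis(30));
    ConsumerService service = (vt, tp, waitMs) ->
        System.out.printf("unsubscribed %s / %s with max wait %d ms%n", vt, tp, waitMs);
    // Before this commit: a fixed 30-minute constant was passed here.
    // After: the configured value flows through instead.
    service.unsubscribeConsumerFor("store_v1", "store_v1-0", serverConfig.getMaxWaitAfterUnsubscribeMs());
  }
}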
ConfigKeys.java
@@ -119,9 +119,10 @@ private ConfigKeys() {
       "pubsub.topic.manager.metadata.fetcher.thread.pool.size";
 
   /**
-   * How long to wait for the next poll request after unsubscribing, indicating that old messages were processed.
+   * During a state transition, it is unsafe to proceed without waiting for all inflight messages to be processed.
+   * This controls how long to wait for inflight messages after unsubscribing from a topic during a state transition.
    */
-  public static final String SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS = "server.wait.after.unsubscribe.timeout.ms";
+  public static final String SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS = "server.max.wait.after.unsubscribe.ms";
 
   // Cluster specific configs for controller
   public static final String CONTROLLER_NAME = "controller.name";
