[server] Introduced a repair service for stuck shared consumer (#757)
* [server] Introduced a repair service for stuck shared consumer

Recently, we noticed that some shared consumers got stuck because they were
blocked by produce calls to non-existing topics, and many other store versions
were affected since the stuck consumer is shared by many other stores.
This PR introduces a way to detect and repair stuck consumers in the above
situation.
At a high level, a thread periodically checks whether consumer#poll is being
invoked regularly, and if not, it scans all the ingestion tasks to see whether
any of them are producing to non-existing topics. For each store ingestion task
that is talking to a non-existing topic, the repair thread will close the stuck
KafkaProducer and kill the corresponding ingestion task.
This PR also keeps re-checking the non-existing topics for a configurable
duration to tolerate transient topic metadata propagation delays.
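
As a rough standalone sketch of the failure mode (not code from this PR; the
broker address, topic name, and producer settings are illustrative): a plain
KafkaProducer blocks inside send() while waiting for metadata of a topic that
does not exist, and in the server anything that synchronizes with that produce
path, including the shared consumer's poll loop, stalls with it.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class StuckProducerSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // With a very large max.block.ms, send() blocks (almost) indefinitely in the
    // metadata fetch when the topic doesn't exist and auto-creation is disabled.
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      // The calling thread is now stuck inside send(); in the server, the threads that
      // feed this produce path back up, and the shared consumer stops polling.
      producer.send(new ProducerRecord<>("non_existing_version_topic_v1", new byte[0], new byte[0])).get();
    }
  }
}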

5 new server configs:

server.stuck.consumer.repair.enabled: default true
server.stuck.consumer.repair.interval.second: default 1 min
server.stuck.consumer.repair.threshold.second: default 5 mins
server.non.existing.topic.ingestion.task.kill.threshold.second: default 15 mins
server.non.existing.topic.check.retry.interval.second: default 1 min
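
A minimal sketch of overriding these knobs (assuming the usual flow of feeding a
Properties-backed VeniceProperties to the server; the values below just restate
the defaults, and the wrapper class name is illustrative), using the ConfigKeys
constants touched in this diff:

import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;

import java.util.Properties;

public final class StuckConsumerRepairConfigSketch {
  public static Properties overrides() {
    Properties props = new Properties();
    props.setProperty(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, "true");
    // How often the repair thread wakes up to check consumer#poll liveness.
    props.setProperty(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, "60");
    // How long a consumer may go without polling before it is treated as stuck;
    // must be >= the repair interval, otherwise VeniceServerConfig throws.
    props.setProperty(SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND, "300");
    // How long a version topic may stay missing before the ingestion task is killed.
    props.setProperty(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, "900");
    // How often the missing topic is re-checked within that window.
    props.setProperty(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, "60");
    return props;
  }
}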


3 new server metrics:

.StuckConsumerRepair--stuck_consumer_found.OccurrenceRate
.StuckConsumerRepair--ingestion_task_repair.OccurrenceRate
.StuckConsumerRepair--repair_failure.OccurrenceRate
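
A hedged sketch of checking these metrics, e.g. from an integration test or a
debug hook (assuming Tehuti's MetricsRepository#metrics() map accessor and
Metric#value(); the helper class and method names here are illustrative, not
part of this PR):

import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;

public final class StuckConsumerRepairMetricsSketch {
  /** Returns true once the service has both detected a stuck consumer and repaired a task. */
  public static boolean repairObserved(MetricsRepository metricsRepository) {
    return rate(metricsRepository, ".StuckConsumerRepair--stuck_consumer_found.OccurrenceRate") > 0
        && rate(metricsRepository, ".StuckConsumerRepair--ingestion_task_repair.OccurrenceRate") > 0;
  }

  private static double rate(MetricsRepository metricsRepository, String name) {
    Metric metric = metricsRepository.metrics().get(name);
    // The metric only exists after the corresponding sensor has recorded at least once.
    return metric == null ? 0 : metric.value();
  }
}
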
gaojieliu authored Nov 29, 2023
1 parent ffa68e8 commit 59bcfe4
Showing 12 changed files with 662 additions and 8 deletions.
@@ -83,6 +83,8 @@
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_IDLE_TIME_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_WORKER_THREADS;
import static com.linkedin.venice.ConfigKeys.SERVER_NODE_CAPACITY_RCU;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP;
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_NO_READ_THRESHOLD_SECONDS;
@@ -110,6 +112,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_STORE_TO_EARLY_TERMINATION_THRESHOLD_MS_MAP;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
@@ -445,6 +450,12 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final long ingestionHeartbeatIntervalMs;

private final boolean stuckConsumerRepairEnabled;
private final int stuckConsumerRepairIntervalSecond;
private final int stuckConsumerDetectionRepairThresholdSecond;
private final int nonExistingTopicIngestionTaskKillThresholdSecond;
private final int nonExistingTopicCheckRetryIntervalSecond;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
}
@@ -731,6 +742,21 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));

stuckConsumerRepairEnabled = serverProperties.getBoolean(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, true);
stuckConsumerRepairIntervalSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, 60);
stuckConsumerDetectionRepairThresholdSecond =
serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND, 5 * 60); // 5 mins
if (stuckConsumerRepairEnabled && stuckConsumerDetectionRepairThresholdSecond < stuckConsumerRepairIntervalSecond) {
throw new VeniceException(
"Config for " + SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND + ": "
+ stuckConsumerDetectionRepairThresholdSecond + " should be equal to or larger than "
+ SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND + ": " + stuckConsumerRepairIntervalSecond);
}
nonExistingTopicIngestionTaskKillThresholdSecond =
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, 15 * 60); // 15 mins
nonExistingTopicCheckRetryIntervalSecond =
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, 60); // 1min
}

long extractIngestionMemoryLimit(
@@ -1284,4 +1310,24 @@ public int getMetaStoreWriterCloseConcurrency() {
public long getIngestionHeartbeatIntervalMs() {
return ingestionHeartbeatIntervalMs;
}

public boolean isStuckConsumerRepairEnabled() {
return stuckConsumerRepairEnabled;
}

public int getStuckConsumerRepairIntervalSecond() {
return stuckConsumerRepairIntervalSecond;
}

public int getStuckConsumerDetectionRepairThresholdSecond() {
return stuckConsumerDetectionRepairThresholdSecond;
}

public int getNonExistingTopicIngestionTaskKillThresholdSecond() {
return nonExistingTopicIngestionTaskKillThresholdSecond;
}

public int getNonExistingTopicCheckRetryIntervalSecond() {
return nonExistingTopicCheckRetryIntervalSecond;
}
}
@@ -4,6 +4,7 @@

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.StuckConsumerRepairStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
@@ -16,15 +17,24 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -56,6 +66,9 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
private final TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier;
private final Function<String, String> kafkaClusterUrlResolver;

private final Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new VeniceConcurrentHashMap<>();
private ScheduledExecutorService stuckConsumerRepairExecutorService;

public AggKafkaConsumerService(
final PubSubConsumerAdapterFactory consumerFactory,
TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier,
@@ -65,7 +78,8 @@ public AggKafkaConsumerService(
KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler,
final MetricsRepository metricsRepository,
TopicExistenceChecker topicExistenceChecker,
final PubSubMessageDeserializer pubSubDeserializer) {
final PubSubMessageDeserializer pubSubDeserializer,
Consumer<String> killIngestionTaskRunnable) {
this.consumerFactory = consumerFactory;
this.readCycleDelayMs = serverConfig.getKafkaReadCycleDelayMs();
this.numOfConsumersPerKafkaCluster = serverConfig.getConsumerPoolSizePerKafkaCluster();
@@ -83,6 +97,26 @@ public AggKafkaConsumerService(
this.pubSubDeserializer = pubSubDeserializer;
this.sslPropertiesSupplier = sslPropertiesSupplier;
this.kafkaClusterUrlResolver = serverConfig.getKafkaClusterUrlResolver();

if (serverConfig.isStuckConsumerRepairEnabled()) {
this.stuckConsumerRepairExecutorService = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory(this.getClass().getName() + "-StuckConsumerRepair"));
int intervalInSeconds = serverConfig.getStuckConsumerRepairIntervalSecond();
this.stuckConsumerRepairExecutorService.scheduleAtFixedRate(
getStuckConsumerDetectionAndRepairRunnable(
kafkaServerToConsumerServiceMap,
versionTopicStoreIngestionTaskMapping,
TimeUnit.SECONDS.toMillis(serverConfig.getStuckConsumerDetectionRepairThresholdSecond()),
TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicIngestionTaskKillThresholdSecond()),
TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicCheckRetryIntervalSecond()),
new StuckConsumerRepairStats(metricsRepository),
killIngestionTaskRunnable),
intervalInSeconds,
intervalInSeconds,
TimeUnit.SECONDS);
LOGGER.info("Started stuck consumer repair service with checking interval: {} seconds", intervalInSeconds);
}

LOGGER.info("Successfully initialized AggKafkaConsumerService");
}

@@ -100,6 +134,106 @@ public void stopInner() throws Exception {
for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
consumerService.stop();
}
if (this.stuckConsumerRepairExecutorService != null) {
this.stuckConsumerRepairExecutorService.shutdownNow();
}
}

protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap,
Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping,
long stuckConsumerRepairThresholdMs,
long nonExistingTopicIngestionTaskKillThresholdMs,
long nonExistingTopicRetryIntervalMs,
StuckConsumerRepairStats stuckConsumerRepairStats,
Consumer<String> killIngestionTaskRunnable) {
return () -> {
/**
* The following logic can be further optimized in the following way:
* 1. If the max delay of previous run is much smaller than the threshold.
* 2. In the next run, the max possible delay will be schedule interval + previous max delay ms, and if it is
* still below the threshold, this function can return directly.
* We are not adopting such optimization right now because:
* 1. Extra state to maintain.
* 2. The schedule interval is supposed to be high.
* 3. The check is cheap when there is no stuck consumer.
*/
boolean scanStoreIngestionTaskToFixStuckConsumer = false;
for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
long maxDelayMs = consumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool();
if (maxDelayMs >= stuckConsumerRepairThresholdMs) {
scanStoreIngestionTaskToFixStuckConsumer = true;
LOGGER.warn("Found some consumer has stuck for {} ms, will start the repairing procedure", maxDelayMs);
break;
}
}
if (!scanStoreIngestionTaskToFixStuckConsumer) {
return;
}
stuckConsumerRepairStats.recordStuckConsumerFound();

/**
* Collect a list of SITs, whose version topic doesn't exist by checking {@link StoreIngestionTask#isProducingVersionTopicHealthy()},
* and this function will continue to check the version topic healthiness for a period of {@link nonExistingTopicIngestionTaskKillThresholdMs}
* to tolerate transient topic discovery issue.
*/
Map<String, StoreIngestionTask> versionTopicIngestionTaskMappingForNonExistingTopic = new HashMap<>();
versionTopicStoreIngestionTaskMapping.forEach((vt, sit) -> {
try {
if (!sit.isProducingVersionTopicHealthy()) {
versionTopicIngestionTaskMappingForNonExistingTopic.put(vt, sit);
LOGGER.warn("The producing version topic:{} is not healthy", vt);
}
} catch (Exception e) {
LOGGER.error("Got exception while checking topic existence for version topic: {}", vt, e);
}
});
int maxAttempts =
(int) Math.ceil((double) nonExistingTopicIngestionTaskKillThresholdMs / nonExistingTopicRetryIntervalMs);
for (int cnt = 0; cnt < maxAttempts; ++cnt) {
Iterator<Map.Entry<String, StoreIngestionTask>> iterator =
versionTopicIngestionTaskMappingForNonExistingTopic.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, StoreIngestionTask> entry = iterator.next();
String versionTopic = entry.getKey();
StoreIngestionTask sit = entry.getValue();
try {
if (sit.isProducingVersionTopicHealthy()) {
/**
* If the version topic becomes available after retries, remove it from the tracking map.
*/
iterator.remove();
LOGGER.info("The producing version topic:{} becomes healthy", versionTopic);
}
} catch (Exception e) {
LOGGER.error("Got exception while checking topic existence for version topic: {}", versionTopic, e);
} finally {
Utils.sleep(nonExistingTopicRetryIntervalMs);
}
}
}

AtomicBoolean repairSomeIngestionTask = new AtomicBoolean(false);
versionTopicIngestionTaskMappingForNonExistingTopic.forEach((vt, sit) -> {
LOGGER.warn(
"The ingestion topics (version topic) are not healthy for "
+ "store version: {}, will kill the ingestion task to try to unblock shared consumer",
vt);
/**
* The following function call will interrupt all the stuck {@link org.apache.kafka.clients.producer.KafkaProducer#send} call
* to non-existing topics.
*/
sit.closeVeniceWriters(false);
killIngestionTaskRunnable.accept(vt);
repairSomeIngestionTask.set(true);
stuckConsumerRepairStats.recordIngestionTaskRepair();
});
if (!repairSomeIngestionTask.get()) {
LOGGER.error(
"Didn't find any suspicious ingestion task, and please contact developers to investigate it further");
stuckConsumerRepairStats.recordRepairFailure();
}
};
}

/**
@@ -235,6 +369,8 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L

consumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, lastOffset, dataReceiver);

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);

return dataReceiver;
}

@@ -260,6 +396,7 @@ public long getLatestOffsetFor(
*/
void unsubscribeAll(PubSubTopic versionTopic) {
kafkaServerToConsumerServiceMap.values().forEach(consumerService -> consumerService.unsubscribeAll(versionTopic));
versionTopicStoreIngestionTaskMapping.remove(versionTopic.getName());
}

void pauseConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
@@ -116,7 +116,7 @@ protected KafkaConsumerService(
: createKafkaConsumerServiceStats(
metricsRepository,
kafkaClusterAlias,
this::getMaxElapsedTimeSinceLastPollInConsumerPool);
this::getMaxElapsedTimeMSSinceLastPollInConsumerPool);
for (int i = 0; i < numOfConsumersPerKafkaCluster; ++i) {
/**
* We need to assign a unique client id across all the storage nodes, otherwise, they will fail into the same throttling bucket.
@@ -324,7 +324,7 @@ private KafkaConsumerServiceStats createKafkaConsumerServiceStats(
getMaxElapsedTimeSinceLastPollInConsumerPool);
}

private long getMaxElapsedTimeSinceLastPollInConsumerPool() {
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {
long maxElapsedTimeSinceLastPollInConsumerPool = -1;
int slowestTaskId = -1;
long elapsedTimeSinceLastPoll;
@@ -448,7 +448,8 @@ public void handleStoreDeleted(Store store) {
kafkaClusterBasedRecordThrottler,
metricsRepository,
new MetadataRepoBasedTopicExistingCheckerImpl(this.getMetadataRepo()),
pubSubDeserializer);
pubSubDeserializer,
(topicName) -> this.killConsumptionTask(topicName));
/**
* After initializing a {@link AggKafkaConsumerService} service, it doesn't contain KafkaConsumerService yet until
* a new Kafka cluster is registered; here we explicitly create KafkaConsumerService for the local Kafka cluster.
@@ -283,7 +283,7 @@ public LeaderFollowerStoreIngestionTask(
}

@Override
protected void closeVeniceWriters(boolean doFlush) {
public void closeVeniceWriters(boolean doFlush) {
if (veniceWriter.isPresent()) {
veniceWriter.get().close(doFlush);
}
@@ -1390,6 +1390,13 @@ public void run() {
LOGGER.info("{} has been killed.", consumerTaskId);
statusReportAdapter.reportKilled(partitionConsumptionStateMap.values(), e);
doFlush = false;
if (isCurrentVersion.getAsBoolean()) {
/**
* Current version can be killed if {@link AggKafkaConsumerService} discovers there are some issues with
* the producing topics, and here will report metrics for such case.
*/
handleIngestionException(e);
}
} catch (VeniceChecksumException e) {
/**
* It's possible to receive checksum verification failure exception here from the above syncOffset() call.
@@ -1521,7 +1528,7 @@ private void internalClose(boolean doFlush) {
LOGGER.info("Store ingestion task for store: {} is closed", kafkaVersionTopic);
}

protected void closeVeniceWriters(boolean doFlush) {
public void closeVeniceWriters(boolean doFlush) {
}

protected void closeVeniceViewWriters() {
@@ -3387,6 +3394,10 @@ public PubSubTopic getVersionTopic() {
return versionTopic;
}

public PubSubTopic getRealtimeTopic() {
return realTimeTopic;
}

public boolean isMetricsEmissionEnabled() {
return emitMetrics.get();
}
@@ -3777,4 +3788,21 @@ protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey, KafkaMessag
protected void maybeSendIngestionHeartbeat() {
// No op, heartbeat is only useful for L/F hybrid stores.
}

/**
* This function is checking the following conditions:
* 1. Whether the version topic exists or not.
*/
public boolean isProducingVersionTopicHealthy() {
if (isDaVinciClient) {
/**
* DaVinci doesn't produce to any topics.
*/
return true;
}
if (!topicManagerRepository.getTopicManager().containsTopic(this.versionTopic)) {
return false;
}
return true;
}
}
