
Commit

[server][dvc][controller][test] Solidify PubSubConsumerAdapter APIs (#730)

The goal of this PR is to enhance the reliability of the PubSubConsumerAdapter APIs by solidifying their contracts. One notable change is the behavior of the subscribe API, which now verifies that the specified topic-partition exists before attempting to subscribe to it (see the sketch below). Integration tests have been added to validate the improved contracts. In addition, this PR fixes minor configuration-extraction issues, removes unnecessary Kafka imports, and deletes unused code that caused confusion.
sushantmane authored Nov 3, 2023
1 parent fa03473 commit 6709c26
Showing 24 changed files with 2,210 additions and 671 deletions.
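To make the hardened subscribe contract concrete, here is a minimal, self-contained Java sketch; every type and name in it is an illustrative stand-in, not Venice's actual PubSubConsumerAdapter API.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the solidified subscribe contract: verify existence
// first, fail fast otherwise. Names and types are hypothetical.
final class SubscribeContractSketch {
  private final Set<String> existingTopicPartitions; // e.g. "storeTopic_v1-0"
  private final Map<String, Long> subscriptions = new ConcurrentHashMap<>();

  SubscribeContractSketch(Set<String> existingTopicPartitions) {
    this.existingTopicPartitions = existingTopicPartitions;
  }

  void subscribe(String topicPartition, long lastReadOffset) {
    // The solidified contract: refuse to subscribe to a topic-partition that
    // does not exist, instead of silently accepting it and polling nothing.
    if (!existingTopicPartitions.contains(topicPartition)) {
      throw new IllegalStateException("Topic-partition does not exist: " + topicPartition);
    }
    subscriptions.put(topicPartition, lastReadOffset);
  }

  public static void main(String[] args) {
    SubscribeContractSketch consumer = new SubscribeContractSketch(Set.of("storeTopic_v1-0"));
    consumer.subscribe("storeTopic_v1-0", 42L); // succeeds: the partition exists
    consumer.subscribe("storeTopic_v1-99", 0L); // throws: the partition is missing
  }
}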
VeniceServerConfig.java

@@ -74,8 +74,6 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT;
 import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
-import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_POLL_RETRY_BACKOFF_MS;
-import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_POLL_RETRY_TIMES;
 import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER;
 import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
@@ -92,6 +90,8 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_BATCH_GET_CHUNK_SIZE;
 import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
+import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;
+import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES;
 import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
 import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS;
@@ -294,9 +294,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {

   private final int kafkaMaxPollRecords;

-  private final int kafkaPollRetryTimes;
+  private final int pubSubConsumerPollRetryTimes;

-  private final int kafkaPollRetryBackoffMs;
+  private final int pubSubConsumerPollRetryBackoffMs;

   /**
    * The number of threads being used to serve compute request.
@@ -520,8 +520,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str

     nodeCapacityInRcu = serverProperties.getLong(SERVER_NODE_CAPACITY_RCU, 100000);
     kafkaMaxPollRecords = serverProperties.getInt(SERVER_KAFKA_MAX_POLL_RECORDS, 100);
-    kafkaPollRetryTimes = serverProperties.getInt(SERVER_KAFKA_POLL_RETRY_TIMES, 100);
-    kafkaPollRetryBackoffMs = serverProperties.getInt(SERVER_KAFKA_POLL_RETRY_BACKOFF_MS, 0);
+    pubSubConsumerPollRetryTimes = serverProperties.getInt(SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES, 100);
+    pubSubConsumerPollRetryBackoffMs = serverProperties.getInt(SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS, 0);
     diskHealthCheckIntervalInMS =
         TimeUnit.SECONDS.toMillis(serverProperties.getLong(SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS, 10));
     diskHealthCheckTimeoutInMs =
@@ -941,12 +941,12 @@ public int getKafkaMaxPollRecords() {
     return kafkaMaxPollRecords;
   }

-  public int getKafkaPollRetryTimes() {
-    return kafkaPollRetryTimes;
+  public int getPubSubConsumerPollRetryTimes() {
+    return pubSubConsumerPollRetryTimes;
   }

-  public int getKafkaPollRetryBackoffMs() {
-    return kafkaPollRetryBackoffMs;
+  public int getPubSubConsumerPollRetryBackoffMs() {
+    return pubSubConsumerPollRetryBackoffMs;
   }

   public long getDiskHealthCheckIntervalInMS() {
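The rename keeps the old defaults: up to 100 poll retries with a 0 ms backoff between attempts. As a reading aid, here is a minimal, self-contained Java sketch of a retry policy driven by these two values; every name in it is illustrative, not Venice's actual adapter code.

import java.util.List;
import java.util.function.Supplier;

// Sketch of a poll-with-retry policy. retryTimes stands in for the value read
// via SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES (default 100) and retryBackoffMs
// for SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS (default 0).
final class PollRetrySketch {
  static <T> List<T> pollWithRetries(Supplier<List<T>> poll, int retryTimes, long retryBackoffMs) {
    RuntimeException lastFailure = null;
    for (int attempt = 0; attempt <= retryTimes; attempt++) {
      try {
        return poll.get(); // stands in for the adapter's poll call
      } catch (RuntimeException retriable) { // stand-in for a retriable pubsub exception
        lastFailure = retriable;
        try {
          Thread.sleep(retryBackoffMs); // a 0 ms backoff means retry immediately
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while retrying poll", e);
        }
      }
    }
    throw lastFailure; // all attempts exhausted; surface the last retriable failure
  }
}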
(file in com.linkedin.davinci.consumer)

@@ -1,6 +1,6 @@
 package com.linkedin.davinci.consumer;

-import static com.linkedin.venice.schema.rmd.RmdConstants.*;
+import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_POS;

 import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
@@ -67,7 +67,6 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -873,13 +872,6 @@ protected void switchToNewTopic(PubSubTopic newTopic, String topicSuffix, Intege
     }
   }

-  protected PubSubTopicPartition getPubSubTopicPartitionFromConsumerRecord(
-      ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
-    return new PubSubTopicPartitionImpl(
-        pubSubTopicRepository.getTopic(consumerRecord.topic()),
-        consumerRecord.partition());
-  }
-
   @Override
   public void close() {
     this.unsubscribeAll();
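The deleted helper above translated a Kafka-specific ConsumerRecord into a PubSubTopicPartition. Under the PubSub abstraction this commit consolidates, the delivered message already carries its origin, so per-record conversion became dead code. A small illustration with hypothetical types (not Venice's actual interfaces):

// Hypothetical stand-ins, for illustration only.
interface SketchTopicPartition {
  String topicName();
  int partitionNumber();
}

interface SketchMessage<K, V> {
  K key();
  V value();
  // The message itself knows which topic-partition it came from, so callers no
  // longer need a Kafka-specific conversion helper.
  SketchTopicPartition topicPartition();
}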

(4 files deleted; contents not shown)

(file containing getCommonKafkaConsumerProperties)

@@ -4,8 +4,6 @@
 import static com.linkedin.venice.ConfigKeys.KAFKA_AUTO_OFFSET_RESET_CONFIG;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_CLIENT_ID_CONFIG;
-import static com.linkedin.venice.ConfigKeys.KAFKA_CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG;
-import static com.linkedin.venice.ConfigKeys.KAFKA_CONSUMER_POLL_RETRY_TIMES_CONFIG;
 import static com.linkedin.venice.ConfigKeys.KAFKA_ENABLE_AUTO_COMMIT_CONFIG;
 import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_MAX_BYTES_CONFIG;
 import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_MAX_WAIT_MS_CONFIG;
@@ -1112,11 +1110,12 @@ private Properties getCommonKafkaConsumerProperties(VeniceServerConfig serverCon
     kafkaConsumerProperties.setProperty(
         KAFKA_MAX_PARTITION_FETCH_BYTES_CONFIG,
         String.valueOf(serverConfig.getKafkaFetchPartitionMaxSizePerSecond()));
-    kafkaConsumerProperties
-        .setProperty(KAFKA_CONSUMER_POLL_RETRY_TIMES_CONFIG, String.valueOf(serverConfig.getKafkaPollRetryTimes()));
     kafkaConsumerProperties.setProperty(
-        KAFKA_CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG,
-        String.valueOf(serverConfig.getKafkaPollRetryBackoffMs()));
+        PubSubConstants.PUBSUB_CONSUMER_POLL_RETRY_TIMES,
+        String.valueOf(serverConfig.getPubSubConsumerPollRetryTimes()));
+    kafkaConsumerProperties.setProperty(
+        PubSubConstants.PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS,
+        String.valueOf(serverConfig.getPubSubConsumerPollRetryBackoffMs()));

     return kafkaConsumerProperties;
   }
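To make the new wiring concrete: the server config reads the server-prefixed keys, and getCommonKafkaConsumerProperties republishes the values under the PubSubConstants keys consumed by the adapter. A self-contained sketch of that hand-off, with assumed literal key strings (the authoritative values live in ConfigKeys and PubSubConstants):

import java.util.Properties;

// Sketch of the config hand-off; the literal key strings are assumptions.
public final class PollRetryConfigFlow {
  public static void main(String[] args) {
    // Step 1: operator-facing server properties (server.-prefixed keys).
    Properties serverProps = new Properties();
    serverProps.setProperty("server.pubsub.consumer.poll.retry.times", "100");
    serverProps.setProperty("server.pubsub.consumer.poll.retry.backoff.ms", "0");

    // Step 2: republish under the adapter-facing keys, mirroring what
    // getCommonKafkaConsumerProperties now does with PubSubConstants.
    Properties consumerProps = new Properties();
    consumerProps.setProperty(
        "pubsub.consumer.poll.retry.times",
        serverProps.getProperty("server.pubsub.consumer.poll.retry.times"));
    consumerProps.setProperty(
        "pubsub.consumer.poll.retry.backoff.ms",
        serverProps.getProperty("server.pubsub.consumer.poll.retry.backoff.ms"));

    System.out.println(consumerProps);
  }
}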
StoreBufferService.java

@@ -28,13 +28,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;


 /**
- * This class is serving as a {@link ConsumerRecord} buffer with an accompanying pool of drainer threads. The drainers
+ * This class is serving as a {@link PubSubMessage} buffer with an accompanying pool of drainer threads. The drainers
  * pull records out of the buffer and delegate the persistence and validation to the appropriate {@link StoreIngestionTask}.
  *
  * High-level idea:
@@ -56,7 +55,7 @@ public class StoreBufferService extends AbstractStoreBufferService {
    */
   private static class QueueNode implements Measurable {
     /**
-     * Considering the overhead of {@link ConsumerRecord} and its internal structures.
+     * Considering the overhead of {@link PubSubMessage} and its internal structures.
      */
     private static final int QUEUE_NODE_OVERHEAD_IN_BYTE = 256;
     private final PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord;
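The StoreBufferService change above only touches javadoc, but the surrounding pattern is worth spelling out: each buffered message is charged a fixed per-node overhead plus its payload size, which is far cheaper than deep-measuring every object graph. A self-contained sketch with a hypothetical Measurable interface and payload type (Venice's real interfaces may differ):

// Hypothetical types illustrating the fixed-overhead memory-accounting pattern.
interface MeasurableSketch {
  int getSize(); // approximate in-memory footprint, in bytes
}

final class QueueNodeSketch implements MeasurableSketch {
  // Flat estimate for object headers, references, and other per-node structures.
  private static final int QUEUE_NODE_OVERHEAD_IN_BYTE = 256;
  private final byte[] payload; // stands in for the buffered PubSubMessage

  QueueNodeSketch(byte[] payload) {
    this.payload = payload;
  }

  @Override
  public int getSize() {
    return QUEUE_NODE_OVERHEAD_IN_BYTE + payload.length;
  }
}

A memory-bounded buffer can then sum getSize() over its queued nodes to decide when producers should block, without walking each message's internals.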
ConfigKeys.java

@@ -1,5 +1,6 @@
 package com.linkedin.venice;

+import com.linkedin.venice.pubsub.PubSubConstants;
 import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerConfig;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
 import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
@@ -51,10 +52,6 @@ private ConfigKeys() {
   public static final String KAFKA_FETCH_MAX_WAIT_MS_CONFIG = ApacheKafkaConsumerConfig.KAFKA_FETCH_MAX_WAIT_MS_CONFIG;
   public static final String KAFKA_MAX_PARTITION_FETCH_BYTES_CONFIG =
       ApacheKafkaConsumerConfig.KAFKA_MAX_PARTITION_FETCH_BYTES_CONFIG;
-  public static final String KAFKA_CONSUMER_POLL_RETRY_TIMES_CONFIG =
-      ApacheKafkaConsumerConfig.KAFKA_CONSUMER_POLL_RETRY_TIMES_CONFIG;
-  public static final String KAFKA_CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG =
-      ApacheKafkaConsumerConfig.KAFKA_CONSUMER_POLL_RETRY_BACKOFF_MS_CONFIG;

   public static final String KAFKA_ADMIN_GET_TOPIC_CONFIG_MAX_RETRY_TIME_SEC =
       "kafka.admin.get.topic.config.max.retry.sec";
@@ -511,15 +508,17 @@ private ConfigKeys() {
   public static final String SERVER_KAFKA_MAX_POLL_RECORDS = "server.kafka.max.poll.records";

   /**
-   * This config is used to control how many times Kafka consumer would retry polling during ingestion
+   * This config is used to control how many times PubSub consumer would retry polling during ingestion
    * when RetriableException happens.
    */
-  public static final String SERVER_KAFKA_POLL_RETRY_TIMES = "server.kafka.poll.retry.times";
+  public static final String SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES =
+      "server." + PubSubConstants.PUBSUB_CONSUMER_POLL_RETRY_TIMES;

   /**
-   * This config is used to control the backoff time between Kafka consumer poll retries.
+   * This config is used to control the backoff time between PubSub consumer poll retries.
    */
-  public static final String SERVER_KAFKA_POLL_RETRY_BACKOFF_MS = "server.kafka.poll.backoff.ms";
+  public static final String SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS =
+      "server." + PubSubConstants.PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;

   /**
    * This config decides the frequency of the disk health check; the disk health check service writes
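One design choice worth noting: the server-level keys are now derived from the shared PubSubConstants values instead of being hand-written literals, which prevents the kind of drift visible in the old pair ("server.kafka.poll.retry.times" vs "server.kafka.poll.backoff.ms", where the second key dropped ".retry"). A sketch of the derivation, with an assumed constant value:

// The PubSubConstants value below is an assumption for illustration.
final class ServerKeyDerivationSketch {
  static final String PUBSUB_CONSUMER_POLL_RETRY_TIMES = "pubsub.consumer.poll.retry.times"; // assumed
  static final String SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES =
      "server." + PUBSUB_CONSUMER_POLL_RETRY_TIMES; // yields "server.pubsub.consumer.poll.retry.times"

  public static void main(String[] args) {
    System.out.println(SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES);
  }
}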
(remaining changed files not shown)
