[controller] Fix mismatch between hybrid version partition count and real-time partition count #1338

Open · wants to merge 2 commits into main
@@ -846,6 +846,7 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
private boolean isLocalVersionTopicPartitionFullyConsumed(PartitionConsumptionState pcs) {
long localVTOff = pcs.getLatestProcessedLocalVersionTopicOffset();
long localVTEndOffset = getTopicPartitionEndOffSet(localKafkaServer, versionTopic, pcs.getPartition());

if (localVTEndOffset == StatsErrorCode.LAG_MEASUREMENT_FAILURE.code) {
return false;
}
@@ -12,6 +12,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.pubsub.PubSubConstants.UNKNOWN_LATEST_OFFSET;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static com.linkedin.venice.utils.Utils.getReplicaId;
import static java.util.Comparator.comparingInt;
@@ -2319,16 +2320,17 @@ private void reportStoreVersionTopicOffsetRewindMetrics(PartitionConsumptionStat
* written to, the end offset is 0.
*/
protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTopic, int partition) {
-long offsetFromConsumer = aggKafkaConsumerService
-    .getLatestOffsetBasedOnMetrics(kafkaUrl, versionTopic, new PubSubTopicPartitionImpl(pubSubTopic, partition));
+PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopic, partition);
+long offsetFromConsumer =
+    aggKafkaConsumerService.getLatestOffsetBasedOnMetrics(kafkaUrl, versionTopic, topicPartition);
if (offsetFromConsumer >= 0) {
return offsetFromConsumer;
}
try {
return RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition);
-if (offset == -1) {
-  throw new VeniceException("Found latest offset -1");
+if (offset == UNKNOWN_LATEST_OFFSET) {
+  throw new VeniceException("Latest offset is unknown. Check if the topic: " + topicPartition + " exists.");
}
return offset;
},
@@ -3076,6 +3078,13 @@ private boolean processControlMessage(
processEndOfIncrementalPush(controlMessage, partitionConsumptionState);
break;
case TOPIC_SWITCH:
TopicSwitch topicSwitch = (TopicSwitch) controlMessage.controlMessageUnion;
LOGGER.info(
"Received {} control message. Replica: {}, Offset: {} NewSource: {}",
type.name(),
partitionConsumptionState.getReplicaId(),
offset,
topicSwitch.getSourceKafkaServers());
checkReadyToServeAfterProcess =
processTopicSwitch(controlMessage, partition, offset, partitionConsumptionState);
break;
@@ -0,0 +1,178 @@
package com.linkedin.venice.controllerapi;

import com.linkedin.venice.meta.Version.PushType;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;


public class RequestTopicForPushRequest {
private final String clusterName;
private final String storeName;
private final PushType pushType;
private final String pushJobId;

private boolean sendStartOfPush = false;
private boolean sorted = false; // an inefficient but safe default
private boolean isWriteComputeEnabled = false;
private boolean separateRealTimeTopicEnabled = false;
private long rewindTimeInSecondsOverride = -1L;
private boolean deferVersionSwap = false;
private String targetedRegions = null;
private int repushSourceVersion = -1;
private Set<String> partitioners = Collections.emptySet();
private String compressionDictionary = null;
private X509Certificate certificateInRequest = null;
private String sourceGridFabric = null;
private String emergencySourceRegion = null;

public RequestTopicForPushRequest(String clusterName, String storeName, PushType pushType, String pushJobId) {
if (clusterName == null || clusterName.isEmpty()) {
throw new IllegalArgumentException("clusterName is required");
}
if (storeName == null || storeName.isEmpty()) {
throw new IllegalArgumentException("storeName is required");
}
if (pushType == null) {
throw new IllegalArgumentException("pushType is required");
}

if (pushJobId == null || pushJobId.isEmpty()) {
throw new IllegalArgumentException("pushJobId is required");
}

this.clusterName = clusterName;
this.storeName = storeName;
this.pushType = pushType;
this.pushJobId = pushJobId;
}

public String getClusterName() {
return clusterName;
}

public String getStoreName() {
return storeName;
}

public PushType getPushType() {
return pushType;
}

public String getPushJobId() {
return pushJobId;
}

public boolean isSendStartOfPush() {
return sendStartOfPush;
}

public boolean isSorted() {
return sorted;
}

public boolean isWriteComputeEnabled() {
return isWriteComputeEnabled;
}

public String getSourceGridFabric() {
return sourceGridFabric;
}

public long getRewindTimeInSecondsOverride() {
return rewindTimeInSecondsOverride;
}

public boolean isDeferVersionSwap() {
return deferVersionSwap;
}

public String getTargetedRegions() {
return targetedRegions;
}

public int getRepushSourceVersion() {
return repushSourceVersion;
}

public Set<String> getPartitioners() {
return partitioners;
}

public String getCompressionDictionary() {
return compressionDictionary;
}

public X509Certificate getCertificateInRequest() {
return certificateInRequest;
}

public String getEmergencySourceRegion() {
return emergencySourceRegion;
}

public void setSendStartOfPush(boolean sendStartOfPush) {
this.sendStartOfPush = sendStartOfPush;
}

public void setSorted(boolean sorted) {
this.sorted = sorted;
}

public void setWriteComputeEnabled(boolean writeComputeEnabled) {
isWriteComputeEnabled = writeComputeEnabled;
}

public void setSourceGridFabric(String sourceGridFabric) {
this.sourceGridFabric = sourceGridFabric;
}

public void setRewindTimeInSecondsOverride(long rewindTimeInSecondsOverride) {
this.rewindTimeInSecondsOverride = rewindTimeInSecondsOverride;
}

public void setDeferVersionSwap(boolean deferVersionSwap) {
this.deferVersionSwap = deferVersionSwap;
}

public void setTargetedRegions(String targetedRegions) {
this.targetedRegions = targetedRegions;
}

public void setRepushSourceVersion(int repushSourceVersion) {
this.repushSourceVersion = repushSourceVersion;
}

public void setPartitioners(String commaSeparatedPartitioners) {
if (commaSeparatedPartitioners == null || commaSeparatedPartitioners.isEmpty()) {
return;
}
setPartitioners(new HashSet<>(Arrays.asList(commaSeparatedPartitioners.split(","))));
}

public void setPartitioners(Set<String> partitioners) {
this.partitioners = partitioners;
}

public void setCompressionDictionary(String compressionDictionary) {
this.compressionDictionary = compressionDictionary;
}

public void setCertificateInRequest(X509Certificate certificateInRequest) {
this.certificateInRequest = certificateInRequest;
}

public void setEmergencySourceRegion(String emergencySourceRegion) {
this.emergencySourceRegion = emergencySourceRegion;
}

public boolean isSeparateRealTimeTopicEnabled() {
return separateRealTimeTopicEnabled;
}

public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.separateRealTimeTopicEnabled = separateRealTimeTopicEnabled;
}
}
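
A minimal usage sketch of the new request object, for reviewers. The cluster, store, push job ID, and partitioner names below are made-up examples, not values from this PR:

```java
// The constructor rejects null or empty required fields up front.
RequestTopicForPushRequest request =
    new RequestTopicForPushRequest("venice-cluster0", "test-store", PushType.BATCH, "push-job-123");

// Optional knobs keep their documented defaults unless explicitly set.
request.setSorted(true); // input data is already sorted by key
request.setRewindTimeInSecondsOverride(3600L);
request.setPartitioners("com.linkedin.venice.partitioner.DefaultVenicePartitioner");

// Missing required fields fail fast, e.g. an empty pushJobId:
// new RequestTopicForPushRequest("venice-cluster0", "test-store", PushType.BATCH, "")
//   throws IllegalArgumentException("pushJobId is required")
```
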
@@ -10,10 +10,9 @@
import com.linkedin.venice.systemstore.schemas.StoreVersion;
import com.linkedin.venice.views.VeniceView;
import java.time.Duration;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;


/**
@@ -41,11 +40,21 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
*/
enum PushType {
BATCH(0), // Batch jobs will create a new version topic and write to it in a batch manner.
-STREAM_REPROCESSING(1), // reprocessing jobs will create a new version topic and a reprocessing topic.
+STREAM_REPROCESSING(1), // Reprocessing jobs will create a new version topic and a reprocessing topic.
STREAM(2), // Stream jobs will write to a buffer or RT topic.
INCREMENTAL(3); // Incremental jobs will re-use an existing version topic and write on top of it.

private final int value;
private static final Map<Integer, PushType> VALUE_TO_TYPE_MAP = new HashMap<>(4);
private static final Map<String, PushType> NAME_TO_TYPE_MAP = new HashMap<>(4);

// Static initializer for map population
static {
for (PushType type: PushType.values()) {
VALUE_TO_TYPE_MAP.put(type.value, type);
NAME_TO_TYPE_MAP.put(type.name(), type);
}
}

PushType(int value) {
this.value = value;
@@ -68,15 +77,41 @@ public boolean isStreamReprocessing() {
}

public boolean isBatchOrStreamReprocessing() {
-return isBatch() || isStreamReprocessing();
+return this == BATCH || this == STREAM_REPROCESSING;
}

/**
* Retrieve the PushType based on its integer value.
*
* @param value the integer value of the PushType
* @return the corresponding PushType
* @throws VeniceException if the value is invalid
*/
public static PushType valueOf(int value) {
-Optional<PushType> pushType = Arrays.stream(values()).filter(p -> p.value == value).findFirst();
-if (!pushType.isPresent()) {
+PushType pushType = VALUE_TO_TYPE_MAP.get(value);
+if (pushType == null) {
throw new VeniceException("Invalid push type with int value: " + value);
}
-return pushType.get();
+return pushType;
}

/**
* Extracts the PushType from its string name.
*
* @param pushTypeString the string representation of the PushType
* @return the corresponding PushType
* @throws IllegalArgumentException if the string is invalid
*/
public static PushType extractPushType(String pushTypeString) {
PushType pushType = NAME_TO_TYPE_MAP.get(pushTypeString);
if (pushType == null) {
throw new IllegalArgumentException(
String.format(
"%s is an invalid push type. Valid push types are: %s",
pushTypeString,
String.join(", ", NAME_TO_TYPE_MAP.keySet())));
}
return pushType;
}
}
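
To illustrate the new map-based lookups, a minimal sketch; the inputs below are arbitrary examples rather than values from this PR:

```java
// Integer lookup now goes through VALUE_TO_TYPE_MAP instead of streaming over values().
Version.PushType stream = Version.PushType.valueOf(2);               // STREAM
Version.PushType incremental = Version.PushType.valueOf(3);          // INCREMENTAL

// Name lookup uses NAME_TO_TYPE_MAP and reports the valid names on failure.
Version.PushType batch = Version.PushType.extractPushType("BATCH");  // BATCH

try {
  Version.PushType.valueOf(42); // unknown integer value
} catch (VeniceException e) {
  // "Invalid push type with int value: 42"
}

try {
  Version.PushType.extractPushType("NEARLINE"); // not a valid name
} catch (IllegalArgumentException e) {
  // message lists all valid push type names
}
```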

@@ -65,6 +65,7 @@ public class PubSubConstants {
* Default value of sleep interval for polling topic deletion status from ZK.
*/
public static final int PUBSUB_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS_DEFAULT_VALUE = 2 * Time.MS_PER_SECOND;
public static final long UNKNOWN_LATEST_OFFSET = -12345;
Reviewer comment (Contributor): is there any significance to this number, or would any negative number work?

Reviewer comment (Contributor): nit: long literals should end in L

private static final Duration PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE_DEFAULT = Duration.ofMinutes(1);
private static Duration PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE =
@@ -272,6 +272,17 @@ public boolean updateTopicRetention(
return false;
}

public boolean updateTopicRetentionWithRetries(PubSubTopic topicName, long expectedRetentionInMs) {
PubSubTopicConfiguration topicConfiguration = getCachedTopicConfig(topicName);
return RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> updateTopicRetention(topicName, expectedRetentionInMs, topicConfiguration),
5,
Duration.ofMillis(200),
Duration.ofSeconds(1),
Duration.ofMillis(2 * topicManagerContext.getPubSubOperationTimeoutMs()),
CREATE_TOPIC_RETRIABLE_EXCEPTIONS);
}

public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted) {
updateTopicCompactionPolicy(topic, expectedLogCompacted, -1, Optional.empty());
}
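
A rough sketch of how the new retry wrapper might be called, for instance when updating retention on a hybrid store's real-time topic. The topic name, retention value, and the topicManager/pubSubTopicRepository handles here are assumptions for illustration, not code from this PR:

```java
// Retry the retention update with exponential backoff instead of failing on a transient pub-sub error.
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic("test-store_rt");
long retentionMs = 5L * 24 * 60 * 60 * 1000; // 5 days
boolean updated = topicManager.updateTopicRetentionWithRetries(realTimeTopic, retentionMs);
if (!updated) {
  // false likely means the topic already had the expected retention (see updateTopicRetention above).
}
```
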
@@ -392,7 +392,7 @@ long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition)
if (cachedValue == null) {
cachedValue = latestOffsetCache.get(pubSubTopicPartition);
if (cachedValue == null) {
-return -1;
+return PubSubConstants.UNKNOWN_LATEST_OFFSET;
}
}
return cachedValue.getValue();
@@ -309,6 +309,12 @@ public static long parseLongFromString(String value, String fieldName) {
any string that is not equal to 'true'; we validate the string ourselves.
*/
public static boolean parseBooleanFromString(String value, String fieldName) {
if (value == null) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
fieldName + " must be a boolean, but value is null",
ErrorType.BAD_REQUEST);
}
if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
return Boolean.parseBoolean(value);
} else {
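
For context, a small sketch of the stricter parsing behavior; the field name "sorted" is an arbitrary example and the rest of the method is truncated above:

```java
boolean sorted = parseBooleanFromString("True", "sorted"); // case-insensitive -> true

// null is now rejected with an explicit 400 instead of a NullPointerException
// from value.equalsIgnoreCase(...):
parseBooleanFromString(null, "sorted");
// -> VeniceHttpException: "sorted must be a boolean, but value is null"

// Any other string still falls through to the existing else branch, which rejects it as well.
parseBooleanFromString("yes", "sorted");
```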