Skip to content

Commit

Permalink
[dvc][common][server][test]leader complete status part 2: DVC and sta…
Browse files Browse the repository at this point in the history
…ndby completes based on the consumed leader complete status (#775)

Part 2 on top of PR 741
* [Deadlock issue from part 1]: In part 1, we made a change that caused the leader replicas to send a heartbeat with leader completed header to VT when they report COMPLETED. This was done by the drainer thread using the same producer that originally produced the message that made the leader to be marked COMPLETED, which created a circular loop: [producer -> producer buffer -> VT -> callback -> drainer queue -> producer]. This led to a deadlock situation when both the producer buffer and the drainer queue were full, and the first message in the drainer queue made the leader report COMPLETED. This resulted in the drainer being blocked on the producer buffer being full, and the producer being blocked on the drainer queue being full. To avoid this, we removed the part that sent the heartbeat to VT directly, and instead we set the LFSIT#lastSendIngestionHeartbeatTimestamp to 0 such that maybeSendIngestionHeartbeat() force sends the HB to RT whenever a leader partition is reported completed apart from the interval based one.
* Also added another change wrt amplification factor: Leader partition reads HB from RT and propagates the heartbeat to all sub partitions' VT
* In maybeSendIngestionHeartbeat(): Don't send heartbeats to non-existing RT topics when amplification factor is enabled, since doing so results in a stuck consumption task.
* Followers read the header added to the heartbeat and update their state in PartitionConsumptionState, which is then used when checking whether the lag is acceptable, for both offset-based lag and time-based lag, and for both DaVinci and STANDBY replicas.
* Found some issue with stores without AA: In non-AA stores, consumer task is unable to read HB messages from RT topic (though a test consumer can still read it), leading to standby replicas waiting for leader completion state header indefinitely, so disabling this feature for non-AA stores until that issue is resolved.

* New configs:
"server.leader.complete.state.check.in.follower.enabled" => to enable this feature. Disabled by default and should be enabled only after Venice tag 0.4.154 is fully rolled out which supports sending HBs.
"server.leader.complete.state.check.in.follower.valid.interval.ms" => configure the time interval within which the HB should be considered valid. Default 5mins.
  • Loading branch information
m-nagarajan authored Dec 8, 2023
1 parent ceff23f commit 8541dc4
Show file tree
Hide file tree
Showing 24 changed files with 1,491 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEAN_UP_INTERVAL_IN_MINUTES;
import static com.linkedin.venice.ConfigKeys.SERVER_LOCAL_CONSUMER_CONFIG_PREFIX;
Expand Down Expand Up @@ -449,7 +451,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int metaStoreWriterCloseConcurrency;

private final long ingestionHeartbeatIntervalMs;

// Feature flag for the leader-complete-state check in followers. Assigned in the constructor from
// SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED; defaults to false.
private final boolean leaderCompleteStateCheckInFollowerEnabled;
// Time window (ms) used by the leader-complete-state check in followers. Assigned in the constructor from
// SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; defaults to 5 minutes.
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
private final boolean stuckConsumerRepairEnabled;
private final int stuckConsumerRepairIntervalSecond;
private final int stuckConsumerDetectionRepairThresholdSecond;
Expand Down Expand Up @@ -757,6 +760,10 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, 15 * 60); // 15 mins
nonExistingTopicCheckRetryIntervalSecond =
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, 60); // 1min
leaderCompleteStateCheckInFollowerEnabled =
serverProperties.getBoolean(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED, false);
leaderCompleteStateCheckInFollowerValidIntervalMs = serverProperties
.getLong(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, TimeUnit.MINUTES.toMillis(5));
}

long extractIngestionMemoryLimit(
Expand Down Expand Up @@ -1311,6 +1318,14 @@ public long getIngestionHeartbeatIntervalMs() {
return ingestionHeartbeatIntervalMs;
}

/**
 * @return the value of {@code leaderCompleteStateCheckInFollowerEnabled}, i.e. whether the
 *         leader-complete-state check in followers is turned on for this server.
 */
public boolean isLeaderCompleteStateCheckInFollowerEnabled() {
  return this.leaderCompleteStateCheckInFollowerEnabled;
}

/**
 * @return the value of {@code leaderCompleteStateCheckInFollowerValidIntervalMs}, in milliseconds.
 */
public long getLeaderCompleteStateCheckInFollowerValidIntervalMs() {
  return this.leaderCompleteStateCheckInFollowerValidIntervalMs;
}

/**
 * @return the value of {@code stuckConsumerRepairEnabled}.
 */
public boolean isStuckConsumerRepairEnabled() {
  return this.stuckConsumerRepairEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER;
import static com.linkedin.venice.writer.LeaderCompleteState.LEADER_COMPLETE_STATE_UNKNOWN;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
Expand Down Expand Up @@ -1267,6 +1268,67 @@ public boolean isTransientRecordBufferUsed() {
return true;
}

/**
 * Checks whether the lag is acceptable for hybrid stores and optionally logs the outcome.
 * <p>
 * The lag is acceptable only if {@code offsetLag <= offsetThreshold}. If that holds and the instance is a
 * standby or DaVinciClient (i.e. {@code isHybridFollower(pcs)} is true), the result is further gated: <br>
 * 1. if the feature is disabled, the lag stays acceptable <br>
 * 2. else if the first HB SOS has not been received yet, the lag is not acceptable <br>
 * 3. else if the leader complete state is {@code LEADER_COMPLETE_STATE_UNKNOWN}, the lag stays acceptable <br>
 * 4. else the leader must be completed and <br>
 * 5. the last leader complete state update must be within the configured time interval
 *
 * @param pcs state of the partition being checked
 * @param offsetLag the measured lag; logged as an offset lag or a time lag depending on {@code isOffsetBasedLag}
 * @param offsetThreshold the maximum lag that is still considered acceptable (inclusive)
 * @param shouldLogLag whether to emit an info log describing the decision
 * @param isOffsetBasedLag selects the "Offset" or "Time" label in the log line
 * @param latestConsumedProducerTimestamp only used in the log line, and only when the lag is not offset based
 * @return {@code true} if the lag is acceptable after applying the checks above
 */
@Override
protected boolean checkAndLogIfLagIsAcceptableForHybridStore(
    PartitionConsumptionState pcs,
    long offsetLag,
    long offsetThreshold,
    boolean shouldLogLag,
    boolean isOffsetBasedLag,
    long latestConsumedProducerTimestamp) {
  // Keep the raw numeric comparison separately: the final verdict may be vetoed by the leader complete state,
  // but the comparator printed in the log must still describe the actual numbers.
  final boolean isLagWithinThreshold = offsetLag <= offsetThreshold;
  boolean isLagAcceptable = isLagWithinThreshold;

  if (isLagAcceptable && isHybridFollower(pcs)) {
    if (!getServerConfig().isLeaderCompleteStateCheckInFollowerEnabled()) {
      isLagAcceptable = true;
    } else if (!pcs.isFirstHeartBeatSOSReceived()) {
      // wait for the first HB to know if the leader supports sending LeaderCompleteState or not
      isLagAcceptable = false;
    } else if (pcs.getLeaderCompleteState().equals(LEADER_COMPLETE_STATE_UNKNOWN)) {
      // the leader doesn't support LeaderCompleteState
      isLagAcceptable = true;
    } else {
      // check if the leader is completed and the last update time was within the configured time
      isLagAcceptable = pcs.isLeaderCompleted()
          && ((System.currentTimeMillis() - pcs.getLastLeaderCompleteStateUpdateInMs()) <= getServerConfig()
              .getLeaderCompleteStateCheckInFollowerValidIntervalMs());
    }
  }

  if (shouldLogLag) {
    String lagLogFooter;
    if (isHybridFollower(pcs)) {
      lagLogFooter = ". Leader Complete State: {" + pcs.getLeaderCompleteState().toString()
          + "}, Last update In Ms: {" + pcs.getLastLeaderCompleteStateUpdateInMs() + "}.";
    } else {
      lagLogFooter = "";
    }
    // Argument order follows the format string: "<task id> [<Offset|Time> lag] partition ...".
    LOGGER.info(
        "{} [{} lag] partition {} is {}lagging. {}Lag: [{}] {} Threshold [{}]{}",
        consumerTaskId,
        isOffsetBasedLag ? "Offset" : "Time",
        pcs.getPartition(),
        (isLagAcceptable ? "not " : ""),
        (isOffsetBasedLag ? "" : "The latest producer timestamp is " + latestConsumedProducerTimestamp + ". "),
        offsetLag,
        (isLagWithinThreshold ? "<=" : ">"),
        offsetThreshold,
        lagLogFooter);
  }

  return isLagAcceptable;
}

@Override
public long getRegionHybridOffsetLag(int regionId) {
StoreVersionState svs = storageEngine.getStoreVersionState();
Expand Down Expand Up @@ -1392,6 +1454,7 @@ protected long measureRTOffsetLagForMultiRegions(
}
}

/** used for metric purposes **/
@Override
public boolean isReadyToServeAnnouncedWithRTLag() {
if (!hybridStoreConfig.isPresent() || partitionConsumptionStateMap.isEmpty()) {
Expand Down
Loading

0 comments on commit 8541dc4

Please sign in to comment.