Skip to content

Commit

Permalink
[server] Add option to disable incremental push status update writes …
Browse files Browse the repository at this point in the history
…to Zookeeper or PS3

Currently, incremental push status updates are written to both Zookeeper and
PS3. As part of a phased approach to eventually discontinue Zookeeper updates,
this PR introduces a configuration option allowing selective control over where
these updates are written, enabling flexibility to disable updates to either
Zookeeper, PS3, or both.
  • Loading branch information
sushantmane authored Oct 9, 2024
1 parent fe01543 commit 53adf35
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_CONCURRENT_STREAMS;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_FRAME_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_HEADER_LIST_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_CHECKPOINT_DURING_GRACEFUL_SHUTDOWN_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT;
Expand Down Expand Up @@ -507,6 +508,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int metaStoreWriterCloseConcurrency;

private final boolean batchReportEOIPEnabled;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;
private final long ingestionHeartbeatIntervalMs;
private final boolean leaderCompleteStateCheckInFollowerEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
Expand Down Expand Up @@ -834,7 +836,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));
batchReportEOIPEnabled =
serverProperties.getBoolean(SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED, false);

incrementalPushStatusWriteMode =
extractIncPushStatusWriteMode(serverProperties.getString(SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE, "DUAL"));
stuckConsumerRepairEnabled = serverProperties.getBoolean(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, true);
stuckConsumerRepairIntervalSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, 60);
stuckConsumerDetectionRepairThresholdSecond =
Expand Down Expand Up @@ -1478,6 +1481,30 @@ public boolean getBatchReportEOIPEnabled() {
return batchReportEOIPEnabled;
}

public enum IncrementalPushStatusWriteMode {
/** Write incremental push status to Zookeeper only */
ZOOKEEPER_ONLY,

/** Write incremental push status to push status system store only */
PUSH_STATUS_SYSTEM_STORE_ONLY,

/** Write incremental push status to both Zookeeper and push status system store */
DUAL
}

public IncrementalPushStatusWriteMode extractIncPushStatusWriteMode(String mode) {
try {
return IncrementalPushStatusWriteMode.valueOf(mode);
} catch (IllegalArgumentException e) {
LOGGER.error("Invalid incremental push status write mode: {}. Defaulting to DUAL", mode);
return IncrementalPushStatusWriteMode.DUAL;
}
}

public IncrementalPushStatusWriteMode getIncrementalPushStatusWriteMode() {
return incrementalPushStatusWriteMode;
}

public boolean isLeaderCompleteStateCheckInFollowerEnabled() {
return leaderCompleteStateCheckInFollowerEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ private void asyncStart() {
partitionPushStatusAccessor,
statusStoreWriter,
helixReadOnlyStoreRepository,
instance.getNodeId());
instance.getNodeId(),
veniceServerConfig.getIncrementalPushStatusWriteMode());

ingestionBackend.getStoreIngestionService().addIngestionNotifier(pushStatusNotifier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.TOPIC_SWITCH_RECEIVED;

import com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode;
import com.linkedin.venice.common.PushStatusStoreUtils;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -39,18 +40,21 @@ public class PushStatusNotifier implements VeniceNotifier {
private final PushStatusStoreWriter pushStatusStoreWriter;
private final ReadOnlyStoreRepository storeRepository;
private final String instanceId;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;

public PushStatusNotifier(
OfflinePushAccessor offlinePushAccessor,
HelixPartitionStatusAccessor helixPartitionStatusAccessor,
PushStatusStoreWriter pushStatusStoreWriter,
ReadOnlyStoreRepository storeRepository,
String instanceId) {
String instanceId,
IncrementalPushStatusWriteMode incrementalPushStatusWriteMode) {
this.offLinePushAccessor = offlinePushAccessor;
this.helixPartitionStatusAccessor = helixPartitionStatusAccessor;
this.pushStatusStoreWriter = pushStatusStoreWriter;
this.storeRepository = storeRepository;
this.instanceId = instanceId;
this.incrementalPushStatusWriteMode = incrementalPushStatusWriteMode;
}

@Override
Expand Down Expand Up @@ -116,16 +120,28 @@ public void dataRecoveryCompleted(String kafkaTopic, int partitionId, long offse

@Override
public void startOfIncrementalPushReceived(String topic, int partitionId, long offset, String message) {
offLinePushAccessor
.updateReplicaStatus(topic, partitionId, instanceId, START_OF_INCREMENTAL_PUSH_RECEIVED, offset, message);
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, START_OF_INCREMENTAL_PUSH_RECEIVED);
updateIncrementalPushStatus(topic, partitionId, offset, message, START_OF_INCREMENTAL_PUSH_RECEIVED);
}

@Override
public void endOfIncrementalPushReceived(String topic, int partitionId, long offset, String message) {
offLinePushAccessor
.updateReplicaStatus(topic, partitionId, instanceId, END_OF_INCREMENTAL_PUSH_RECEIVED, offset, message);
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, END_OF_INCREMENTAL_PUSH_RECEIVED);
updateIncrementalPushStatus(topic, partitionId, offset, message, END_OF_INCREMENTAL_PUSH_RECEIVED);
}

private void updateIncrementalPushStatus(
String topic,
int partitionId,
long offset,
String message,
ExecutionStatus status) {
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
offLinePushAccessor.updateReplicaStatus(topic, partitionId, instanceId, status, offset, message);
}
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, status);
}
}

@Override
Expand All @@ -135,15 +151,21 @@ public void batchEndOfIncrementalPushReceived(
long offset,
List<String> pendingReportIncPushVersionList) {

offLinePushAccessor
.batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList);
// We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP.
for (String incPushVersion: pendingReportIncPushVersionList) {
updateIncrementalPushStatusToPushStatusStore(
topic,
incPushVersion,
partitionId,
END_OF_INCREMENTAL_PUSH_RECEIVED);
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
offLinePushAccessor
.batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList);
}
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
// We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP.
for (String incPushVersion: pendingReportIncPushVersionList) {
updateIncrementalPushStatusToPushStatusStore(
topic,
incPushVersion,
partitionId,
END_OF_INCREMENTAL_PUSH_RECEIVED);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,65 @@
package com.linkedin.davinci.notifier;

import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.DUAL;
import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY;
import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.OfflinePushAccessor;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.helix.HelixException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


public class TestPushStatusNotifier {
public static final String INSTANCE_ID = "instance1";
private OfflinePushAccessor offlinePushAccessor;
private HelixPartitionStatusAccessor helixPartitionStatusAccessor;
private PushStatusStoreWriter pushStatusStoreWriter;
private ReadOnlyStoreRepository storeRepository;
private PushStatusNotifier notifier;
private static final String STORE_NAME = "test_store";
private static final int STORE_VERSION = 1;
private static final String TOPIC = "test_store_v1";
private static final int PARTITION_ID = 1;
private static final long OFFSET = 12345L;
private static final String MESSAGE = "Test Message";

@BeforeMethod
public void setUp() {
offlinePushAccessor = mock(OfflinePushAccessor.class);
helixPartitionStatusAccessor = mock(HelixPartitionStatusAccessor.class);
pushStatusStoreWriter = mock(PushStatusStoreWriter.class);
storeRepository = mock(ReadOnlyStoreRepository.class);
Store store = mock(Store.class);
when(store.isDaVinciPushStatusStoreEnabled()).thenReturn(true);
when(storeRepository.getStoreOrThrow(any())).thenReturn(store);
}

@Test
public void testCompleteCVUpdate() {
OfflinePushAccessor offlinePushAccessor = mock(OfflinePushAccessor.class);
HelixPartitionStatusAccessor helixPartitionStatusAccessor = mock(HelixPartitionStatusAccessor.class);
PushStatusStoreWriter pushStatusStoreWriter = mock(PushStatusStoreWriter.class);
ReadOnlyStoreRepository storeRepository = mock(ReadOnlyStoreRepository.class);
String topic = "abc_v1";
String host = "localhost";
Expand All @@ -30,7 +69,8 @@ public void testCompleteCVUpdate() {
helixPartitionStatusAccessor,
pushStatusStoreWriter,
storeRepository,
host);
host,
DUAL);
statusNotifier.completed(topic, 1, 1, "");
verify(offlinePushAccessor, times(1)).updateReplicaStatus(topic, 1, host, ExecutionStatus.COMPLETED, 1, "");

Expand All @@ -50,4 +90,130 @@ public void testCompleteCVUpdate() {
statusNotifier.endOfPushReceived(topic, 1, 1, "");
statusNotifier.progress(topic, 1, 1, "");
}

@DataProvider(name = "pushStatusWriteModes")
public Object[][] pushStatusWriteModes() {
return new Object[][] { { ZOOKEEPER_ONLY, true, false }, { PUSH_STATUS_SYSTEM_STORE_ONLY, false, true },
{ DUAL, true, true } };
}

@Test(dataProvider = "pushStatusWriteModes")
public void testStartOfIncrementalPushReceived(
IncrementalPushStatusWriteMode mode,
boolean expectZookeeper,
boolean expectPushStatusStore) {

notifier = new PushStatusNotifier(
offlinePushAccessor,
helixPartitionStatusAccessor,
pushStatusStoreWriter,
storeRepository,
"instance1",
mode);

notifier.startOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, MESSAGE);

if (expectZookeeper) {
verify(offlinePushAccessor, times(1))
.updateReplicaStatus(TOPIC, PARTITION_ID, INSTANCE_ID, START_OF_INCREMENTAL_PUSH_RECEIVED, OFFSET, MESSAGE);
} else {
verify(offlinePushAccessor, never())
.updateReplicaStatus(anyString(), anyInt(), anyString(), any(ExecutionStatus.class), anyLong(), anyString());
}

if (expectPushStatusStore) {
verify(pushStatusStoreWriter, times(1)).writePushStatus(
eq(STORE_NAME),
eq(STORE_VERSION),
eq(PARTITION_ID),
eq(START_OF_INCREMENTAL_PUSH_RECEIVED),
any(),
any());
} else {
verify(pushStatusStoreWriter, never())
.writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any());
}
}

@Test(dataProvider = "pushStatusWriteModes")
public void testEndOfIncrementalPushReceived(
IncrementalPushStatusWriteMode mode,
boolean expectZookeeper,
boolean expectPushStatusStore) {

notifier = new PushStatusNotifier(
offlinePushAccessor,
helixPartitionStatusAccessor,
pushStatusStoreWriter,
storeRepository,
INSTANCE_ID,
mode);

notifier.endOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, MESSAGE);

if (expectZookeeper) {
verify(offlinePushAccessor, times(1))
.updateReplicaStatus(TOPIC, PARTITION_ID, INSTANCE_ID, END_OF_INCREMENTAL_PUSH_RECEIVED, OFFSET, MESSAGE);
} else {
verify(offlinePushAccessor, never())
.updateReplicaStatus(anyString(), anyInt(), anyString(), any(ExecutionStatus.class), anyLong(), anyString());
}

if (expectPushStatusStore) {
verify(pushStatusStoreWriter, times(1)).writePushStatus(
eq(STORE_NAME),
eq(STORE_VERSION),
eq(PARTITION_ID),
eq(END_OF_INCREMENTAL_PUSH_RECEIVED),
any(),
any());
} else {
verify(pushStatusStoreWriter, never())
.writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any());
}
}

@Test(dataProvider = "pushStatusWriteModes")
public void testBatchEndOfIncrementalPushReceived(
IncrementalPushStatusWriteMode mode,
boolean expectZookeeper,
boolean expectPushStatusStore) {

notifier = new PushStatusNotifier(
offlinePushAccessor,
helixPartitionStatusAccessor,
pushStatusStoreWriter,
storeRepository,
INSTANCE_ID,
mode);

List<String> incPushVersions = new ArrayList<>();
incPushVersions.add("inc_push_version_1");
incPushVersions.add("inc_push_version_2");
incPushVersions.add("inc_push_version_3");
incPushVersions.add("inc_push_version_4");

notifier.batchEndOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, incPushVersions);

if (expectZookeeper) {
verify(offlinePushAccessor, times(1))
.batchUpdateReplicaIncPushStatus(TOPIC, PARTITION_ID, INSTANCE_ID, OFFSET, incPushVersions);
} else {
verify(offlinePushAccessor, never())
.batchUpdateReplicaIncPushStatus(anyString(), anyInt(), anyString(), anyLong(), any());
}

if (expectPushStatusStore) {
verify(pushStatusStoreWriter, times(4)).writePushStatus(
eq(STORE_NAME),
eq(STORE_VERSION),
eq(PARTITION_ID),
eq(END_OF_INCREMENTAL_PUSH_RECEIVED),
any(),
any());
} else {
verify(pushStatusStoreWriter, never())
.writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,11 @@ private ConfigKeys() {
public static final String SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED =
"server.batch.report.end.of.incremental.push.status.enabled";

/**
* This config dictates where the server should write the end of incremental push status.
*/
public static final String SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE = "server.incremental.push.status.write.mode";

/**
* whether to enable checksum verification in the ingestion path from kafka to database persistency. If enabled it will
* keep a running checksum for all and only PUT kafka data message received in the ingestion task and periodically
Expand Down
Loading

0 comments on commit 53adf35

Please sign in to comment.