From 53adf352d4bfeceb0a3bb77f8299a1ba34b656a9 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Wed, 9 Oct 2024 10:25:33 -0700 Subject: [PATCH] [server] Add option to disable incremental push status update writes to Zookeeper or PS3 Currently, incremental push status updates are written to both Zookeeper and PS3. As part of a phased approach to eventually discontinue Zookeeper updates, this PR introduces a configuration option allowing selective control over where these updates are written, enabling flexibility to disable updates to either Zookeeper, PS3, or both. --- .../davinci/config/VeniceServerConfig.java | 29 ++- .../helix/HelixParticipationService.java | 3 +- .../davinci/notifier/PushStatusNotifier.java | 54 ++++-- .../notifier/TestPushStatusNotifier.java | 176 +++++++++++++++++- .../java/com/linkedin/venice/ConfigKeys.java | 5 + .../davinci/notifier/LeaderErrorNotifier.java | 9 +- .../venice/endToEnd/PushStatusStoreTest.java | 14 +- .../pushmonitor/AbstractPushMonitor.java | 4 + 8 files changed, 268 insertions(+), 26 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index cbf6b393f3..336ee675c0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -90,6 +90,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_CONCURRENT_STREAMS; import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_FRAME_SIZE; import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_HEADER_LIST_SIZE; +import static com.linkedin.venice.ConfigKeys.SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_CHECKPOINT_DURING_GRACEFUL_SHUTDOWN_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT; @@ -507,6 +508,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int metaStoreWriterCloseConcurrency; private final boolean batchReportEOIPEnabled; + private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode; private final long ingestionHeartbeatIntervalMs; private final boolean leaderCompleteStateCheckInFollowerEnabled; private final long leaderCompleteStateCheckInFollowerValidIntervalMs; @@ -834,7 +836,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map pendingReportIncPushVersionList) { - offLinePushAccessor - .batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList); - // We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP. - for (String incPushVersion: pendingReportIncPushVersionList) { - updateIncrementalPushStatusToPushStatusStore( - topic, - incPushVersion, - partitionId, - END_OF_INCREMENTAL_PUSH_RECEIVED); + if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY + || incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) { + offLinePushAccessor + .batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList); + } + if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY + || incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) { + // We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP. + for (String incPushVersion: pendingReportIncPushVersionList) { + updateIncrementalPushStatusToPushStatusStore( + topic, + incPushVersion, + partitionId, + END_OF_INCREMENTAL_PUSH_RECEIVED); + } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/notifier/TestPushStatusNotifier.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/notifier/TestPushStatusNotifier.java index 016b047ee2..53716a1b96 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/notifier/TestPushStatusNotifier.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/notifier/TestPushStatusNotifier.java @@ -1,26 +1,65 @@ package com.linkedin.davinci.notifier; +import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.DUAL; +import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY; +import static com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY; +import static com.linkedin.venice.pushmonitor.ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED; +import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode; import com.linkedin.venice.helix.HelixPartitionStatusAccessor; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushmonitor.OfflinePushAccessor; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import java.util.ArrayList; +import java.util.List; import org.apache.helix.HelixException; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TestPushStatusNotifier { + public static final String INSTANCE_ID = "instance1"; + private OfflinePushAccessor offlinePushAccessor; + private HelixPartitionStatusAccessor helixPartitionStatusAccessor; + private PushStatusStoreWriter pushStatusStoreWriter; + private ReadOnlyStoreRepository storeRepository; + private PushStatusNotifier notifier; + private static final String STORE_NAME = "test_store"; + private static final int STORE_VERSION = 1; + private static final String TOPIC = "test_store_v1"; + private static final int PARTITION_ID = 1; + private static final long OFFSET = 12345L; + private static final String MESSAGE = "Test Message"; + + @BeforeMethod + public void setUp() { + offlinePushAccessor = mock(OfflinePushAccessor.class); + helixPartitionStatusAccessor = mock(HelixPartitionStatusAccessor.class); + pushStatusStoreWriter = mock(PushStatusStoreWriter.class); + storeRepository = mock(ReadOnlyStoreRepository.class); + Store store = mock(Store.class); + when(store.isDaVinciPushStatusStoreEnabled()).thenReturn(true); + when(storeRepository.getStoreOrThrow(any())).thenReturn(store); + } + @Test public void testCompleteCVUpdate() { - OfflinePushAccessor offlinePushAccessor = mock(OfflinePushAccessor.class); - HelixPartitionStatusAccessor helixPartitionStatusAccessor = mock(HelixPartitionStatusAccessor.class); - PushStatusStoreWriter pushStatusStoreWriter = mock(PushStatusStoreWriter.class); ReadOnlyStoreRepository storeRepository = mock(ReadOnlyStoreRepository.class); String topic = "abc_v1"; String host = "localhost"; @@ -30,7 +69,8 @@ public void testCompleteCVUpdate() { helixPartitionStatusAccessor, pushStatusStoreWriter, storeRepository, - host); + host, + DUAL); statusNotifier.completed(topic, 1, 1, ""); verify(offlinePushAccessor, times(1)).updateReplicaStatus(topic, 1, host, ExecutionStatus.COMPLETED, 1, ""); @@ -50,4 +90,130 @@ public void testCompleteCVUpdate() { statusNotifier.endOfPushReceived(topic, 1, 1, ""); statusNotifier.progress(topic, 1, 1, ""); } + + @DataProvider(name = "pushStatusWriteModes") + public Object[][] pushStatusWriteModes() { + return new Object[][] { { ZOOKEEPER_ONLY, true, false }, { PUSH_STATUS_SYSTEM_STORE_ONLY, false, true }, + { DUAL, true, true } }; + } + + @Test(dataProvider = "pushStatusWriteModes") + public void testStartOfIncrementalPushReceived( + IncrementalPushStatusWriteMode mode, + boolean expectZookeeper, + boolean expectPushStatusStore) { + + notifier = new PushStatusNotifier( + offlinePushAccessor, + helixPartitionStatusAccessor, + pushStatusStoreWriter, + storeRepository, + "instance1", + mode); + + notifier.startOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, MESSAGE); + + if (expectZookeeper) { + verify(offlinePushAccessor, times(1)) + .updateReplicaStatus(TOPIC, PARTITION_ID, INSTANCE_ID, START_OF_INCREMENTAL_PUSH_RECEIVED, OFFSET, MESSAGE); + } else { + verify(offlinePushAccessor, never()) + .updateReplicaStatus(anyString(), anyInt(), anyString(), any(ExecutionStatus.class), anyLong(), anyString()); + } + + if (expectPushStatusStore) { + verify(pushStatusStoreWriter, times(1)).writePushStatus( + eq(STORE_NAME), + eq(STORE_VERSION), + eq(PARTITION_ID), + eq(START_OF_INCREMENTAL_PUSH_RECEIVED), + any(), + any()); + } else { + verify(pushStatusStoreWriter, never()) + .writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any()); + } + } + + @Test(dataProvider = "pushStatusWriteModes") + public void testEndOfIncrementalPushReceived( + IncrementalPushStatusWriteMode mode, + boolean expectZookeeper, + boolean expectPushStatusStore) { + + notifier = new PushStatusNotifier( + offlinePushAccessor, + helixPartitionStatusAccessor, + pushStatusStoreWriter, + storeRepository, + INSTANCE_ID, + mode); + + notifier.endOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, MESSAGE); + + if (expectZookeeper) { + verify(offlinePushAccessor, times(1)) + .updateReplicaStatus(TOPIC, PARTITION_ID, INSTANCE_ID, END_OF_INCREMENTAL_PUSH_RECEIVED, OFFSET, MESSAGE); + } else { + verify(offlinePushAccessor, never()) + .updateReplicaStatus(anyString(), anyInt(), anyString(), any(ExecutionStatus.class), anyLong(), anyString()); + } + + if (expectPushStatusStore) { + verify(pushStatusStoreWriter, times(1)).writePushStatus( + eq(STORE_NAME), + eq(STORE_VERSION), + eq(PARTITION_ID), + eq(END_OF_INCREMENTAL_PUSH_RECEIVED), + any(), + any()); + } else { + verify(pushStatusStoreWriter, never()) + .writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any()); + } + } + + @Test(dataProvider = "pushStatusWriteModes") + public void testBatchEndOfIncrementalPushReceived( + IncrementalPushStatusWriteMode mode, + boolean expectZookeeper, + boolean expectPushStatusStore) { + + notifier = new PushStatusNotifier( + offlinePushAccessor, + helixPartitionStatusAccessor, + pushStatusStoreWriter, + storeRepository, + INSTANCE_ID, + mode); + + List incPushVersions = new ArrayList<>(); + incPushVersions.add("inc_push_version_1"); + incPushVersions.add("inc_push_version_2"); + incPushVersions.add("inc_push_version_3"); + incPushVersions.add("inc_push_version_4"); + + notifier.batchEndOfIncrementalPushReceived(TOPIC, PARTITION_ID, OFFSET, incPushVersions); + + if (expectZookeeper) { + verify(offlinePushAccessor, times(1)) + .batchUpdateReplicaIncPushStatus(TOPIC, PARTITION_ID, INSTANCE_ID, OFFSET, incPushVersions); + } else { + verify(offlinePushAccessor, never()) + .batchUpdateReplicaIncPushStatus(anyString(), anyInt(), anyString(), anyLong(), any()); + } + + if (expectPushStatusStore) { + verify(pushStatusStoreWriter, times(4)).writePushStatus( + eq(STORE_NAME), + eq(STORE_VERSION), + eq(PARTITION_ID), + eq(END_OF_INCREMENTAL_PUSH_RECEIVED), + any(), + any()); + } else { + verify(pushStatusStoreWriter, never()) + .writePushStatus(anyString(), anyInt(), anyInt(), any(ExecutionStatus.class), any()); + } + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 73fb3939cd..5c3eede7c8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -778,6 +778,11 @@ private ConfigKeys() { public static final String SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED = "server.batch.report.end.of.incremental.push.status.enabled"; + /** + * This config dictates where the server should write the end of incremental push status. + */ + public static final String SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE = "server.incremental.push.status.write.mode"; + /** * whether to enable checksum verification in the ingestion path from kafka to database persistency. If enabled it will * keep a running checksum for all and only PUT kafka data message received in the ingestion task and periodically diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/notifier/LeaderErrorNotifier.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/notifier/LeaderErrorNotifier.java index 32d1428ad1..3edee9174e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/notifier/LeaderErrorNotifier.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/notifier/LeaderErrorNotifier.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.common.VeniceSystemStoreUtils.*; import static com.linkedin.venice.pushmonitor.ExecutionStatus.*; +import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.venice.helix.HelixPartitionStatusAccessor; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.pushmonitor.OfflinePushAccessor; @@ -23,7 +24,13 @@ public LeaderErrorNotifier( PushStatusStoreWriter writer, ReadOnlyStoreRepository repository, String instanceId) { - super(accessor, helixPartitionStatusAccessor, writer, repository, instanceId); + super( + accessor, + helixPartitionStatusAccessor, + writer, + repository, + instanceId, + VeniceServerConfig.IncrementalPushStatusWriteMode.DUAL); this.accessor = accessor; this.instanceId = instanceId; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index 1d1d6dbb20..e288ac0df2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH; import static com.linkedin.venice.common.PushStatusStoreUtils.SERVER_INCREMENTAL_PUSH_PREFIX; @@ -22,6 +23,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; +import com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.client.store.AvroGenericStoreClient; import com.linkedin.venice.client.store.ClientConfig; @@ -37,6 +39,7 @@ import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.DaVinciTestContext; import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.meta.Store; @@ -93,10 +96,17 @@ public void setUp() { extraProperties.setProperty(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L)); // all tests in this class will be reading incremental push status from push status store extraProperties.setProperty(USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH, String.valueOf(true)); + extraProperties.setProperty( + SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE, + IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY.toString()); Utils.thisIsLocalhost(); - cluster = ServiceFactory - .getVeniceCluster(1, NUMBER_OF_SERVERS, 1, REPLICATION_FACTOR, 10000, false, false, extraProperties); + cluster = ServiceFactory.getVeniceCluster( + new VeniceClusterCreateOptions.Builder().numberOfServers(NUMBER_OF_SERVERS) + .replicationFactor(REPLICATION_FACTOR) + .partitionSize(10000) + .extraProperties(extraProperties) + .build()); controllerClient = cluster.getControllerClient(); d2Client = D2TestUtils.getAndStartD2Client(cluster.getZk().getAddress()); reader = new PushStatusStoreReader( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index f715b2a4b3..d29641aac0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -375,6 +375,10 @@ ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore( PushStatusStoreReader pushStatusStoreReader, int numberOfPartitions, int replicationFactor) { + LOGGER.debug( + "Querying incremental push status from PS3 for storeVersion: {}, incrementalPushVersion: {}", + kafkaTopic, + incrementalPushVersion); String storeName = Version.parseStoreFromKafkaTopicName(kafkaTopic); int storeVersion = Version.parseVersionFromVersionTopicName(kafkaTopic); Map> pushStatusMap =