diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index d49f987f9f5..bb0d665d983 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -33,6 +33,8 @@ */ public class PartitionConsumptionState { private static final int MAX_INCREMENTAL_PUSH_ENTRY_NUM = 50; + private static final String PREVIOUSLY_READY_TO_SERVE = "previouslyReadyToServe"; + private static final String TRUE = "true"; private final String replicaId; private final int partition; @@ -599,6 +601,19 @@ public void setSkipKafkaMessage(boolean skipKafkaMessage) { this.skipKafkaMessage = skipKafkaMessage; } + /** + * This persists to the offsetRecord associated to this partitionConsumptionState that the ready to serve check has + * passed. This will be persisted to disk once the offsetRecord is checkpointed, and subsequent restarts will + * consult this information when determining if the node should come online or not to serve traffic + */ + public void recordReadyToServeInOffsetRecord() { + offsetRecord.setPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE, TRUE); + } + + public boolean getReadyToServeInOffsetRecord() { + return offsetRecord.getPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE).equals(TRUE); + } + /** * This immutable class holds a association between a key and value and the source offset of the consumed message. * The value could be either as received in kafka ConsumerRecord or it could be a write computed value. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 1e91f8b31e7..1e544f62f32 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2031,7 +2031,8 @@ private void checkConsumptionStateWhenStart( offsetLag, previousOffsetLag, offsetLagThreshold); - if (offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold) { + if (newPartitionConsumptionState.getReadyToServeInOffsetRecord() + && (offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold)) { newPartitionConsumptionState.lagHasCaughtUp(); reportCompleted(newPartitionConsumptionState, true); isCompletedReport = true; @@ -4089,6 +4090,7 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() { } unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, partition)); } + partitionConsumptionState.recordReadyToServeInOffsetRecord(); } else { ingestionNotificationDispatcher.reportProgress(partitionConsumptionState); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 7031b27e5d8..bfad3f2d80e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -629,147 +629,162 @@ private long getOffset(Future produceResultFuture) } private void runTest(Set partitions, Runnable assertions, AAConfig aaConfig) throws Exception { - runTest(partitions, () -> {}, assertions, aaConfig); + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(partitions, assertions, aaConfig); + runTest(config); } - private void runTest( - Set partitions, - Runnable assertions, - AAConfig aaConfig, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { - runTest(partitions, () -> {}, assertions, aaConfig, recordTransformerFunction); - } + public static class StoreIngestionTaskTestConfig { + private final AAConfig aaConfig; + private final Set partitions; + private final Runnable assertions; + private PollStrategy pollStrategy = new RandomPollStrategy(); + private Runnable beforeStartingConsumption = () -> {}; + private Optional hybridStoreConfig = Optional.empty(); + private boolean incrementalPushEnabled = false; + private boolean chunkingEnabled = false; + private boolean rmdChunkingEnabled = false; + private Optional diskUsageForTest = Optional.empty(); + private Map extraServerProperties = new HashMap<>(); + private Consumer storeVersionConfigOverride = storeVersionConfigOverride -> {}; + private DaVinciRecordTransformerFunctionalInterface recordTransformerFunction = null; + private OffsetRecord offsetRecord = null; + + public StoreIngestionTaskTestConfig(Set partitions, Runnable assertions, AAConfig aaConfig) { + this.partitions = partitions; + this.assertions = assertions; + this.aaConfig = aaConfig; + } - private void runTest( - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - AAConfig aaConfig, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { - runTest( - new RandomPollStrategy(), - partitions, - beforeStartingConsumption, - assertions, - this.hybridStoreConfig, - false, - Optional.empty(), - aaConfig, - Collections.emptyMap(), - storeVersionConfigOverride -> {}, - recordTransformerFunction); - } + public boolean isChunkingEnabled() { + return chunkingEnabled; + } - private void runTest( - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - AAConfig aaConfig) throws Exception { - runTest( - new RandomPollStrategy(), - partitions, - beforeStartingConsumption, - assertions, - this.hybridStoreConfig, - false, - Optional.empty(), - aaConfig, - Collections.emptyMap(), - storeVersionConfigOverride -> {}, - null); - } + public StoreIngestionTaskTestConfig setChunkingEnabled(boolean chunkingEnabled) { + this.chunkingEnabled = chunkingEnabled; + return this; + } - private void runTest( - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - AAConfig aaConfig, - Consumer storeVersionConfigOverride) throws Exception { - runTest( - new RandomPollStrategy(), - partitions, - beforeStartingConsumption, - assertions, - this.hybridStoreConfig, - false, - Optional.empty(), - aaConfig, - Collections.emptyMap(), - storeVersionConfigOverride, - null); - } + public Set getPartitions() { + return partitions; + } - private void runTest( - PollStrategy pollStrategy, - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - AAConfig aaConfig, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { - runTest( - pollStrategy, - partitions, - beforeStartingConsumption, - assertions, - this.hybridStoreConfig, - false, - Optional.empty(), - aaConfig, - Collections.emptyMap(), - storeVersionConfigOverride -> {}, - recordTransformerFunction); - } + public Runnable getAssertions() { + return assertions; + } - private void runTest( - PollStrategy pollStrategy, - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - Optional hybridStoreConfig, - boolean incrementalPushEnabled, - Optional diskUsageForTest, - AAConfig aaConfig, - Map extraServerProperties) throws Exception { - runTest( - pollStrategy, - partitions, - beforeStartingConsumption, - assertions, - hybridStoreConfig, - incrementalPushEnabled, - diskUsageForTest, - aaConfig, - extraServerProperties, - storeVersionConfigOverride -> {}, - null); + public PollStrategy getPollStrategy() { + return pollStrategy; + } + + public StoreIngestionTaskTestConfig setPollStrategy(PollStrategy pollStrategy) { + this.pollStrategy = pollStrategy; + return this; + } + + public Runnable getBeforeStartingConsumption() { + return beforeStartingConsumption; + } + + public StoreIngestionTaskTestConfig setBeforeStartingConsumption(Runnable beforeStartingConsumption) { + this.beforeStartingConsumption = beforeStartingConsumption; + return this; + } + + public Optional getHybridStoreConfig() { + return hybridStoreConfig; + } + + public StoreIngestionTaskTestConfig setHybridStoreConfig(Optional hybridStoreConfig) { + this.hybridStoreConfig = hybridStoreConfig; + return this; + } + + public boolean isIncrementalPushEnabled() { + return incrementalPushEnabled; + } + + public StoreIngestionTaskTestConfig setIncrementalPushEnabled(boolean incrementalPushEnabled) { + this.incrementalPushEnabled = incrementalPushEnabled; + return this; + } + + public boolean isRmdChunkingEnabled() { + return rmdChunkingEnabled; + } + + public StoreIngestionTaskTestConfig setRmdChunkingEnabled(boolean rmdChunkingEnabled) { + this.rmdChunkingEnabled = rmdChunkingEnabled; + return this; + } + + public Optional getDiskUsageForTest() { + return diskUsageForTest; + } + + public StoreIngestionTaskTestConfig setDiskUsageForTest(Optional diskUsageForTest) { + this.diskUsageForTest = diskUsageForTest; + return this; + } + + public AAConfig getAaConfig() { + return aaConfig; + } + + public Map getExtraServerProperties() { + return extraServerProperties; + } + + public StoreIngestionTaskTestConfig setExtraServerProperties(Map extraServerProperties) { + this.extraServerProperties = extraServerProperties; + return this; + } + + public Consumer getStoreVersionConfigOverride() { + return storeVersionConfigOverride; + } + + public StoreIngestionTaskTestConfig setStoreVersionConfigOverride( + Consumer storeVersionConfigOverride) { + this.storeVersionConfigOverride = storeVersionConfigOverride; + return this; + } + + public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() { + return recordTransformerFunction; + } + + public StoreIngestionTaskTestConfig setRecordTransformerFunction( + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { + this.recordTransformerFunction = recordTransformerFunction; + return this; + } + + public OffsetRecord getOffsetRecord() { + return offsetRecord; + } + + public StoreIngestionTaskTestConfig setOffsetRecord(OffsetRecord offsetRecord) { + this.offsetRecord = offsetRecord; + return this; + } } - private void runTest( - PollStrategy pollStrategy, - Set partitions, - Runnable beforeStartingConsumption, - Runnable assertions, - Optional hybridStoreConfig, - boolean incrementalPushEnabled, - Optional diskUsageForTest, - AAConfig aaConfig, - Map extraServerProperties, - Consumer storeVersionConfigOverride, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { + private void runTest(StoreIngestionTaskTestConfig config) throws Exception { runTest( - pollStrategy, - partitions, - beforeStartingConsumption, - assertions, - hybridStoreConfig, - incrementalPushEnabled, - false, - false, - diskUsageForTest, - aaConfig, - extraServerProperties, - storeVersionConfigOverride, - recordTransformerFunction); + config.getPollStrategy(), + config.getPartitions(), + config.getBeforeStartingConsumption(), + config.getAssertions(), + config.getHybridStoreConfig(), + config.isIncrementalPushEnabled(), + config.isChunkingEnabled(), + config.isRmdChunkingEnabled(), + config.getDiskUsageForTest(), + config.getAaConfig(), + config.getExtraServerProperties(), + config.getStoreVersionConfigOverride(), + config.getRecordTransformerFunction(), + config.getOffsetRecord()); } /** @@ -787,6 +802,7 @@ private void runTest( * @param aaConfig, the flag to turn on ActiveActiveReplication for SIT * @param extraServerProperties, the extra config for server * @param storeVersionConfigOverride, the override for store version config + * @param offsetRecord, an override for what offsetRecord should be returned when building PCS in storeIngestionTask * @throws Exception */ private void runTest( @@ -802,7 +818,8 @@ private void runTest( AAConfig aaConfig, Map extraServerProperties, Consumer storeVersionConfigOverride, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) throws Exception { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, + OffsetRecord offsetRecord) throws Exception { int partitionCount = PARTITION_COUNT; VenicePartitioner partitioner = getVenicePartitioner(); // Only get base venice partitioner @@ -830,7 +847,8 @@ private void runTest( diskUsageForTest, extraServerProperties, false, - recordTransformerFunction).build(); + recordTransformerFunction, + offsetRecord).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -964,7 +982,8 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( Optional diskUsageForTest, Map extraServerProperties, Boolean isLiveConfigEnabled, - DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { + DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, + OffsetRecord optionalOffsetRecord) { if (recordTransformerFunction != null) { doReturn(mockAbstractStorageEngine).when(mockStorageEngineRepository).getLocalStorageEngine(topic); @@ -1004,8 +1023,9 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( AvroProtocolDefinition.PARTITION_STATE.getSerializer(); if (mockStorageMetadataService.getClass() != InMemoryStorageMetadataService.class) { for (int partition: partitions) { - doReturn(new OffsetRecord(partitionStateSerializer)).when(mockStorageMetadataService) - .getLastOffset(topic, partition); + OffsetRecord record = + optionalOffsetRecord != null ? optionalOffsetRecord : new OffsetRecord(partitionStateSerializer); + doReturn(record).when(mockStorageMetadataService).getLastOffset(topic, partition); } } offsetManager = new DeepCopyStorageMetadataService(mockStorageMetadataService); @@ -1337,7 +1357,7 @@ public void testVeniceMessagesProcessing(AAConfig aaConfig) throws Exception { PollStrategy pollStrategy = new CompositePollStrategy(pollStrategies); - runTest(pollStrategy, Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify it retrieves the offset from the OffSet Manager verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)).getLastOffset(topic, PARTITION_FOO); verifyPutAndDelete(aaConfig, true); @@ -1348,7 +1368,9 @@ public void testVeniceMessagesProcessing(AAConfig aaConfig) throws Exception { verify(mockVersionedStorageIngestionStats, timeout(TEST_TIMEOUT_MS).atLeast(3)) .recordConsumedRecordEndToEndProcessingLatency(any(), eq(1), anyDouble(), anyLong()); - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); // verify the shared consumer should be detached when the ingestion task is closed. verify(aggKafkaConsumerService).unsubscribeAll(pubSubTopic); @@ -1379,7 +1401,7 @@ public void testRecordLevelMetricForCurrentVersion(boolean enableRecordLevelMetr isCurrentVersion = () -> true; - runTest(new RandomPollStrategy(), Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { verify(mockAbstractStorageEngine, timeout(TEST_TIMEOUT_MS)) .put(PARTITION_FOO, putKeyFoo2, ByteBuffer.wrap(ValueRecord.create(SCHEMA_ID, putValue).serialize())); // Verify host-level metrics @@ -1390,7 +1412,9 @@ public void testRecordLevelMetricForCurrentVersion(boolean enableRecordLevelMetr } verify(mockStoreIngestionStats, times(3)).recordTotalRecordsConsumed(); - }, Optional.of(hybridStoreConfig), false, Optional.empty(), AA_OFF, extraProps); + }, AA_OFF); + config.setHybridStoreConfig(Optional.of(hybridStoreConfig)).setExtraServerProperties(extraProps); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -1423,7 +1447,7 @@ public void testMissingMessagesForTopicWithLogCompactionEnabled(AAConfig aaConfi pollDeliveryOrder.add(getTopicPartitionOffsetPair(putMetadata3)); PollStrategy pollStrategy = new ArbitraryOrderingPollStrategy(pollDeliveryOrder); - runTest(pollStrategy, Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify it retrieves the offset from the OffSet Manager verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)).getLastOffset(topic, PARTITION_FOO); @@ -1438,7 +1462,9 @@ public void testMissingMessagesForTopicWithLogCompactionEnabled(AAConfig aaConfi OffsetRecord expectedOffsetRecordForLastMessage = getOffsetRecord(putMetadata4.getOffset()); verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)) .put(topic, PARTITION_FOO, expectedOffsetRecordForLastMessage); - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -1477,7 +1503,7 @@ public void testVeniceMessagesProcessingWithTemporarilyNotAvailableSchemaId(AACo .thenReturn(false, false, true); doReturn(true).when(mockSchemaRepo).hasValueSchema(storeNameWithoutVersionInfo, EXISTING_SCHEMA_ID); - runTest(Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify it retrieves the offset from the OffSet Manager verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)).getLastOffset(topic, PARTITION_FOO); @@ -1497,6 +1523,7 @@ public void testVeniceMessagesProcessingWithTemporarilyNotAvailableSchemaId(AACo OffsetRecord expected = getOffsetRecord(existingSchemaOffset); verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)).put(topic, PARTITION_FOO, expected); }, aaConfig); + runTest(config); } /** @@ -1532,12 +1559,15 @@ public void testVeniceMessagesProcessingWithNonExistingSchemaId(AAConfig aaConfi public void testReportStartWhenRestarting(AAConfig aaConfig) throws Exception { localVeniceWriter.broadcastStartOfPush(new HashMap<>()); final long STARTING_OFFSET = 2; - runTest(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + StoreIngestionTaskTestConfig config = + new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + // Verify STARTED is NOT reported when offset is 0 + verify(mockLogNotifier, never()).started(topic, PARTITION_BAR); + }, aaConfig); + config.setBeforeStartingConsumption(() -> { doReturn(getOffsetRecord(STARTING_OFFSET)).when(mockStorageMetadataService).getLastOffset(anyString(), anyInt()); - }, () -> { - // Verify STARTED is NOT reported when offset is 0 - verify(mockLogNotifier, never()).started(topic, PARTITION_BAR); - }, aaConfig); + }); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -1592,13 +1622,7 @@ public void testReadyToServePartition(AAConfig aaConfig) throws Exception { localVeniceWriter.broadcastStartOfPush(new HashMap<>()); localVeniceWriter.broadcastEndOfPush(new HashMap<>()); - runTest(Utils.setOf(PARTITION_FOO), () -> { - Store mockStore = mock(Store.class); - doReturn(true).when(mockStore).isHybrid(); - doReturn(new VersionImpl("storeName", 1)).when(mockStore).getVersion(1); - doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); - doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); - }, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { ArgumentCaptor storagePartitionConfigArgumentCaptor = ArgumentCaptor.forClass(StoragePartitionConfig.class); TestUtils.waitForNonDeterministicAssertion( @@ -1616,6 +1640,15 @@ public void testReadyToServePartition(AAConfig aaConfig) throws Exception { assertFalse(storagePartitionConfigParam.isReadWriteLeaderForDefaultCF()); assertFalse(storagePartitionConfigParam.isReadWriteLeaderForRMDCF()); }, aaConfig); + config.setBeforeStartingConsumption(() -> { + Store mockStore = mock(Store.class); + doReturn(true).when(mockStore).isHybrid(); + doReturn(new VersionImpl("storeName", 1)).when(mockStore).getVersion(1); + doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); + doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); + }); + + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -1659,6 +1692,41 @@ public void testReadyToServePartitionWriteOnly(AAConfig aaConfig) throws Excepti }, aaConfig); } + @Test(dataProvider = "aaConfigProvider") + public void testReadyToServePartitionValidateIngestionSuccessWithPriorState(AAConfig aaConfig) throws Exception { + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.broadcastEndOfPush(new HashMap<>()); + Store mockStore = mock(Store.class); + doReturn(mockStore).when(mockMetadataRepo).getStore(storeNameWithoutVersionInfo); + doReturn(true).when(mockStore).isHybrid(); + doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); + mockAbstractStorageEngine.addStoragePartition(PARTITION_FOO); + AbstractStoragePartition mockPartition = mock(AbstractStoragePartition.class); + doReturn(mockPartition).when(mockAbstractStorageEngine).getPartitionOrThrow(PARTITION_FOO); + doReturn(true).when(mockPartition).validateBatchIngestion(); + new StoragePartitionConfig(topic, PARTITION_FOO); + + StoreIngestionTaskTestConfig testConfig = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + verify(mockAbstractStorageEngine, never()).adjustStoragePartition(eq(PARTITION_FOO), eq(PREPARE_FOR_READ), any()); + }, aaConfig); + + testConfig.setHybridStoreConfig( + Optional.of( + new HybridStoreConfigImpl( + 1L, + 1L, + 1L, + DataReplicationPolicy.AGGREGATE, + BufferReplayPolicy.REWIND_FROM_SOP))); + final InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer); + offsetRecord.endOfPushReceived(2L); + offsetRecord.setPreviousStatusesEntry("previouslyReadyToServe", "true"); + testConfig.setOffsetRecord(offsetRecord); + runTest(testConfig); + } + @Test(dataProvider = "aaConfigProvider") public void testResetPartition(AAConfig aaConfig) throws Exception { localVeniceWriter.broadcastStartOfPush(new HashMap<>()); @@ -1714,21 +1782,25 @@ public void testDetectionOfMissingRecord(AAConfig aaConfig) throws Exception { new RandomPollStrategy(), Utils.setOf(new PubSubTopicPartitionOffset(barTopicPartition, barOffsetToSkip))); - runTest(pollStrategy, Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> {}, () -> { - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) - .endOfPushReceived(topic, PARTITION_FOO, fooLastOffset); - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( - eq(topic), - eq(PARTITION_BAR), - argThat(new NonEmptyStringMatcher()), - argThat(new ExceptionClassMatcher(MissingDataException.class))); - - // After we verified that completed() and error() are called, the rest should be guaranteed to be finished, so no - // need for timeouts - - verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); - verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR); - }, aaConfig, null); + StoreIngestionTaskTestConfig config = + new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) + .endOfPushReceived(topic, PARTITION_FOO, fooLastOffset); + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( + eq(topic), + eq(PARTITION_BAR), + argThat(new NonEmptyStringMatcher()), + argThat(new ExceptionClassMatcher(MissingDataException.class))); + + // After we verified that completed() and error() are called, the rest should be guaranteed to be finished, so + // no + // need for timeouts + + verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); + verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); } /** @@ -1747,20 +1819,23 @@ public void testSkippingOfDuplicateRecord(AAConfig aaConfig) throws Exception { Utils.mutableSetOf( new PubSubTopicPartitionOffset(new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_BAR), barOffsetToDupe))); - runTest(pollStrategy, Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> {}, () -> { - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) - .endOfPushReceived(topic, PARTITION_FOO, fooLastOffset); - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) - .endOfPushReceived(topic, PARTITION_BAR, barOffsetToDupe); - verify(mockLogNotifier, after(TEST_TIMEOUT_MS).never()) - .endOfPushReceived(topic, PARTITION_BAR, barOffsetToDupe + 1); - - // After we verified that completed() is called, the rest should be guaranteed to be finished, so no need for - // timeouts - - verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); - verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR); - }, aaConfig, null); + StoreIngestionTaskTestConfig config = + new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) + .endOfPushReceived(topic, PARTITION_FOO, fooLastOffset); + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()) + .endOfPushReceived(topic, PARTITION_BAR, barOffsetToDupe); + verify(mockLogNotifier, after(TEST_TIMEOUT_MS).never()) + .endOfPushReceived(topic, PARTITION_BAR, barOffsetToDupe + 1); + + // After we verified that completed() is called, the rest should be guaranteed to be finished, so no need for + // timeouts + + verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); + verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -1769,12 +1844,14 @@ public void testThrottling(AAConfig aaConfig) throws Exception { localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID); localVeniceWriter.delete(deleteKeyFoo, null); - runTest(new RandomPollStrategy(1), Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // START_OF_SEGMENT, START_OF_PUSH, PUT, DELETE verify(mockIngestionThrottler, timeout(TEST_TIMEOUT_MS).times(4)) .maybeThrottleRecordRate(ConsumerPoolType.REGULAR_POOL, 1); verify(mockIngestionThrottler, timeout(TEST_TIMEOUT_MS).times(4)).maybeThrottleBandwidth(anyInt()); - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(new RandomPollStrategy(1)); + runTest(config); } /** @@ -1955,13 +2032,15 @@ public void testDIVErrorMessagesNotFailFastAfterEOP(AAConfig aaConfig) throws Ex LOGGER.info("lastOffsetBeforeEOP: {}, lastOffset: {}", lastOffsetBeforeEOP, lastOffset); - runTest(pollStrategy, Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { for (Object[] args: mockNotifierError) { Assert.assertFalse( args[0].equals(topic) && args[1].equals(PARTITION_FOO) && ((String) args[2]).length() > 0 && args[3] instanceof FatalDataValidationException); } - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); } /** @@ -2009,14 +2088,15 @@ public void testCorruptMessagesFailFast(AAConfig aaConfig) throws Exception { public void testSubscribeCompletedPartition(AAConfig aaConfig) throws Exception { final int offset = 100; localVeniceWriter.broadcastStartOfPush(new HashMap<>()); - runTest( - Utils.setOf(PARTITION_FOO), + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).completed(topic, PARTITION_FOO, offset, "STANDBY"); + }, aaConfig); + config.setBeforeStartingConsumption( () -> doReturn(getOffsetRecord(offset, true)).when(mockStorageMetadataService) - .getLastOffset(topic, PARTITION_FOO), - () -> { - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).completed(topic, PARTITION_FOO, offset, "STANDBY"); - }, - aaConfig); + .getLastOffset(topic, PARTITION_FOO)); + + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2027,14 +2107,7 @@ public void testSubscribeCompletedPartitionUnsubscribe(AAConfig aaConfig) throws Map extraServerProperties = new HashMap<>(); extraServerProperties.put(SERVER_UNSUB_AFTER_BATCHPUSH, true); - runTest(new RandomPollStrategy(), Utils.setOf(PARTITION_FOO), () -> { - Store mockStore = mock(Store.class); - doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); - doReturn(1).when(mockStore).getCurrentVersion(); - doReturn(new VersionImpl("storeName", 1, Version.numberBasedDummyPushId(1))).when(mockStore).getVersion(1); - doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); - doReturn(getOffsetRecord(offset, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); - }, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { verify(mockLogNotifier, timeout(LONG_TEST_TIMEOUT)).completed(topic, PARTITION_FOO, offset, "STANDBY"); verify(aggKafkaConsumerService, timeout(LONG_TEST_TIMEOUT)) .batchUnsubscribeConsumerFor(pubSubTopic, Collections.singleton(fooTopicPartition)); @@ -2042,7 +2115,16 @@ public void testSubscribeCompletedPartitionUnsubscribe(AAConfig aaConfig) throws verify(mockLocalKafkaConsumer, timeout(LONG_TEST_TIMEOUT)) .batchUnsubscribe(Collections.singleton(fooTopicPartition)); verify(mockLocalKafkaConsumer, never()).unSubscribe(barTopicPartition); - }, this.hybridStoreConfig, false, Optional.empty(), aaConfig, extraServerProperties); + }, aaConfig); + config.setBeforeStartingConsumption(() -> { + Store mockStore = mock(Store.class); + doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); + doReturn(1).when(mockStore).getCurrentVersion(); + doReturn(new VersionImpl("storeName", 1, Version.numberBasedDummyPushId(1))).when(mockStore).getVersion(1); + doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); + doReturn(getOffsetRecord(offset, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); + }).setHybridStoreConfig(this.hybridStoreConfig).setExtraServerProperties(extraServerProperties); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2050,7 +2132,11 @@ public void testCompleteCalledWhenUnsubscribeAfterBatchPushDisabled(AAConfig aaC final int offset = 10; localVeniceWriter.broadcastStartOfPush(new HashMap<>()); - runTest(Utils.setOf(PARTITION_FOO), () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig( + Utils.setOf(PARTITION_FOO), + () -> verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).completed(topic, PARTITION_FOO, offset, "STANDBY"), + aaConfig); + config.setBeforeStartingConsumption(() -> { Store mockStore = mock(Store.class); storeIngestionTaskUnderTest.unSubscribePartition(fooTopicPartition); doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); @@ -2059,9 +2145,9 @@ public void testCompleteCalledWhenUnsubscribeAfterBatchPushDisabled(AAConfig aaC .getVersion(1); doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); doReturn(getOffsetRecord(offset, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); - }, - () -> verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).completed(topic, PARTITION_FOO, offset, "STANDBY"), - aaConfig); + }); + + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2090,32 +2176,37 @@ public void testKillConsumption(AAConfig aaConfig) throws Exception { }); try { - runTest(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + + StoreIngestionTaskTestConfig config = + new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_FOO); + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_BAR); + + // Start of push has already been consumed. Stop consumption + storeIngestionTaskUnderTest.kill(); + // task should report an error to notifier that it's killed. + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( + eq(topic), + eq(PARTITION_FOO), + argThat(new NonEmptyStringMatcher()), + argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class))); + verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( + eq(topic), + eq(PARTITION_BAR), + argThat(new NonEmptyStringMatcher()), + argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class))); + + waitForNonDeterministicCompletion( + TEST_TIMEOUT_MS, + TimeUnit.MILLISECONDS, + () -> storeIngestionTaskUnderTest.isRunning() == false); + }, aaConfig); + config.setBeforeStartingConsumption(() -> { localVeniceWriter.broadcastStartOfPush(new HashMap<>()); writingThread.start(); - }, () -> { - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_FOO); - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_BAR); - - // Start of push has already been consumed. Stop consumption - storeIngestionTaskUnderTest.kill(); - // task should report an error to notifier that it's killed. - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( - eq(topic), - eq(PARTITION_FOO), - argThat(new NonEmptyStringMatcher()), - argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class))); - verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).error( - eq(topic), - eq(PARTITION_BAR), - argThat(new NonEmptyStringMatcher()), - argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class))); - - waitForNonDeterministicCompletion( - TEST_TIMEOUT_MS, - TimeUnit.MILLISECONDS, - () -> storeIngestionTaskUnderTest.isRunning() == false); - }, aaConfig); + }); + + runTest(config); } finally { TestUtils.shutdownThread(writingThread); } @@ -2123,14 +2214,8 @@ public void testKillConsumption(AAConfig aaConfig) throws Exception { @Test(dataProvider = "aaConfigProvider") public void testKillActionPriority(AAConfig aaConfig) throws Exception { - runTest(Utils.setOf(PARTITION_FOO), () -> { - localVeniceWriter.broadcastStartOfPush(new HashMap<>()); - localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID); - // Add a reset consumer action - storeIngestionTaskUnderTest.resetPartitionConsumptionOffset(fooTopicPartition); - // Add a kill consumer action in higher priority than subscribe and reset. - storeIngestionTaskUnderTest.kill(); - }, () -> { + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // verify subscribe has not been processed. Because consumption task should process kill action at first verify(mockStorageMetadataService, after(TEST_TIMEOUT_MS).never()).getLastOffset(topic, PARTITION_FOO); /** @@ -2150,6 +2235,16 @@ public void testKillActionPriority(AAConfig aaConfig) throws Exception { TimeUnit.MILLISECONDS, () -> storeIngestionTaskUnderTest.isRunning() == false); }, aaConfig); + config.setBeforeStartingConsumption(() -> { + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID); + // Add a reset consumer action + storeIngestionTaskUnderTest.resetPartitionConsumptionOffset(fooTopicPartition); + // Add a kill consumer action in higher priority than subscribe and reset. + storeIngestionTaskUnderTest.kill(); + }); + + runTest(config); } private byte[] getNumberedKey(int number) { @@ -2221,7 +2316,7 @@ public void testDataValidationCheckPointing(SortedInput sortedInput, AAConfig aa } }); - runTest(pollStrategy, relevantPartitions, () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(relevantPartitions, () -> { // Verify that all partitions reported success. maxOffsetPerPartition.entrySet() .stream() @@ -2282,7 +2377,9 @@ public void testDataValidationCheckPointing(SortedInput sortedInput, AAConfig aa PartitionConsumptionState pcs = storeIngestionTaskUnderTest.getPartitionConsumptionState(partition); Assert.assertTrue(pcs.getLatestProcessedUpstreamRTOffsetMap().isEmpty()); }); - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(pollStrategy); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2305,14 +2402,16 @@ public void testKillAfterPartitionIsCompleted(AAConfig aaConfig) throws Exceptio public void testNeverReportProgressBeforeStart(AAConfig aaConfig) throws Exception { localVeniceWriter.broadcastStartOfPush(new HashMap<>()); // Read one message for each poll. - runTest(new RandomPollStrategy(1), Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { verify(mockLogNotifier, after(TEST_TIMEOUT_MS).never()).progress(topic, PARTITION_FOO, 0); verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeastOnce()).started(topic, PARTITION_FOO); // The current behavior is only to sync offset/report progress after processing a pre-configured amount // of messages in bytes, since control message is being counted as 0 bytes (no data persisted in disk), // then no progress will be reported during start, but only for processed messages. verify(mockLogNotifier, after(TEST_TIMEOUT_MS).never()).progress(any(), anyInt(), anyInt()); - }, aaConfig, null); + }, aaConfig); + config.setPollStrategy(new RandomPollStrategy(1)); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2330,23 +2429,20 @@ public void testOffsetPersistent(AAConfig aaConfig) throws Exception { * Persist for every control message except START_OF_SEGMENT and END_OF_SEGMENT: * START_OF_PUSH, END_OF_PUSH, START_OF_BUFFER_REPLAY */ - runTest( - new RandomPollStrategy(), + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig( Utils.setOf(PARTITION_FOO), - () -> {}, () -> verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS).times(2)) .put(eq(topic), eq(PARTITION_FOO), any()), + aaConfig); + config.setHybridStoreConfig( Optional.of( new HybridStoreConfigImpl( 100, 100, HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD, DataReplicationPolicy.NON_AGGREGATE, - BufferReplayPolicy.REWIND_FROM_EOP)), - false, - Optional.empty(), - aaConfig, - Collections.emptyMap()); + BufferReplayPolicy.REWIND_FROM_EOP))); + runTest(config); } finally { databaseSyncBytesIntervalForTransactionalMode = 1; } @@ -2443,29 +2539,7 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws return messageCountPerPartition[partitionNumber]; }); - runTest(ALL_PARTITIONS, () -> { - localVeniceWriter.broadcastStartOfPush(Collections.emptyMap()); - for (int partition: ALL_PARTITIONS) { - // Taking into account both the initial SOS and the SOP - messageCountPerPartition[partition] += 2; - } - for (int i = 0; i < MESSAGES_BEFORE_EOP; i++) { - try { - CompletableFuture future = - localVeniceWriter.put(getNumberedKey(i), getNumberedValue(i), SCHEMA_ID); - PubSubProduceResult result = future.get(); - int partition = result.getPartition(); - messageCountPerPartition[partition]++; - } catch (InterruptedException | ExecutionException e) { - throw new VeniceException(e); - } - } - localVeniceWriter.broadcastEndOfPush(Collections.emptyMap()); - for (int partition: ALL_PARTITIONS) { - messageCountPerPartition[partition]++; - } - - }, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(ALL_PARTITIONS, () -> { verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS).atLeast(ALL_PARTITIONS.size())).started(eq(topic), anyInt()); verify(mockLogNotifier, never()).completed(anyString(), anyInt(), anyLong()); @@ -2512,6 +2586,31 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws .completed(anyString(), anyInt(), anyLong(), anyString()); }, aaConfig); + config.setBeforeStartingConsumption(() -> { + localVeniceWriter.broadcastStartOfPush(Collections.emptyMap()); + for (int partition: ALL_PARTITIONS) { + // Taking into account both the initial SOS and the SOP + messageCountPerPartition[partition] += 2; + } + for (int i = 0; i < MESSAGES_BEFORE_EOP; i++) { + try { + CompletableFuture future = + localVeniceWriter.put(getNumberedKey(i), getNumberedValue(i), SCHEMA_ID); + PubSubProduceResult result = future.get(); + int partition = result.getPartition(); + messageCountPerPartition[partition]++; + } catch (InterruptedException | ExecutionException e) { + throw new VeniceException(e); + } + } + localVeniceWriter.broadcastEndOfPush(Collections.emptyMap()); + for (int partition: ALL_PARTITIONS) { + messageCountPerPartition[partition]++; + } + + }); + + runTest(config); } /** @@ -2530,10 +2629,8 @@ public void testStoreIngestionTaskRespectsDiskUsage(AAConfig aaConfig) throws Ex doReturn("mock disk full disk usage").when(diskFullUsage).getDiskStatus(); doReturn(true).when(mockSchemaRepo).hasValueSchema(storeNameWithoutVersionInfo, EXISTING_SCHEMA_ID); - runTest( - new RandomPollStrategy(), + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig( Utils.setOf(PARTITION_FOO), - () -> {}, () -> waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { // If the partition already got EndOfPushReceived, then all errors will be suppressed and not reported. // The speed for a partition to get EOP is non-deterministic, adds the if check here to make this test not @@ -2542,7 +2639,7 @@ public void testStoreIngestionTaskRespectsDiskUsage(AAConfig aaConfig) throws Ex Assert.assertFalse(mockNotifierError.isEmpty(), "Disk Usage should have triggered an ingestion error"); String errorMessages = mockNotifierError.stream() .map(o -> ((Exception) o[3]).getMessage()) // elements in object array are 0:store name (String), 1: - // partition (int), 2: message (String), 3: cause (Exception) + // partition (int), 2: message (String), 3: cause (Exception) .collect(Collectors.joining()); Assert.assertTrue( errorMessages.contains("Disk is full"), @@ -2551,11 +2648,9 @@ public void testStoreIngestionTaskRespectsDiskUsage(AAConfig aaConfig) throws Ex LOGGER.info("EOP was received, and therefore this test cannot perform its assertions."); } }), - Optional.empty(), - false, - Optional.of(diskFullUsage), - aaConfig, - Collections.emptyMap()); + aaConfig); + config.setDiskUsageForTest(Optional.of(diskFullUsage)); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2579,7 +2674,7 @@ public void testIncrementalPush(AAConfig aaConfig) throws Exception { DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); - runTest(new RandomPollStrategy(), Utils.setOf(PARTITION_FOO), () -> {}, () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { // sync the offset when receiving EndOfPush verify(mockStorageMetadataService).put(eq(topic), eq(PARTITION_FOO), eq(getOffsetRecord(fooOffset + 1, true))); @@ -2596,7 +2691,9 @@ public void testIncrementalPush(AAConfig aaConfig) throws Exception { verify(mockLogNotifier, atLeastOnce()) .endOfIncrementalPushReceived(topic, PARTITION_FOO, fooNewOffset, version); }); - }, Optional.of(hybridStoreConfig), true, Optional.empty(), aaConfig, Collections.emptyMap()); + }, aaConfig); + config.setHybridStoreConfig(Optional.of(hybridStoreConfig)).setIncrementalPushEnabled(true); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2606,8 +2703,25 @@ public void testSchemaCacheWarming(AAConfig aaConfig) throws Exception { long fooOffset = getOffset(localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID)); localVeniceWriter.broadcastEndOfPush(new HashMap<>()); SchemaEntry schemaEntry = new SchemaEntry(1, STRING_SCHEMA); + // Records order are: StartOfSeg, StartOfPush, data, EndOfPush, EndOfSeg - runTest(new RandomPollStrategy(), Utils.setOf(PARTITION_FOO), () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig( + Utils.setOf(PARTITION_FOO), + () -> waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); + // since notifier reporting happens before offset update, it actually reports previous offsets + verify(mockLogNotifier, atLeastOnce()).endOfPushReceived(topic, PARTITION_FOO, fooOffset); + // Since the completion report will be async, the completed offset could be `END_OF_PUSH` or `END_OF_SEGMENT` + // for + // batch push job. + verify(mockLogNotifier).completed( + eq(topic), + eq(PARTITION_FOO), + longThat(completionOffset -> (completionOffset == fooOffset + 1) || (completionOffset == fooOffset + 2)), + eq("STANDBY")); + }), + aaConfig); + config.setBeforeStartingConsumption(() -> { Store mockStore = mock(Store.class); doReturn(storeNameWithoutVersionInfo).when(mockStore).getName(); doReturn(true).when(mockStore).isReadComputationEnabled(); @@ -2615,23 +2729,8 @@ public void testSchemaCacheWarming(AAConfig aaConfig) throws Exception { doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo); doReturn(schemaEntry).when(mockSchemaRepo).getValueSchema(anyString(), anyInt()); doReturn(new VersionImpl("storeName", 1)).when(mockStore).getVersion(1); - }, () -> waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { - verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO); - // since notifier reporting happens before offset update, it actually reports previous offsets - verify(mockLogNotifier, atLeastOnce()).endOfPushReceived(topic, PARTITION_FOO, fooOffset); - // Since the completion report will be async, the completed offset could be `END_OF_PUSH` or `END_OF_SEGMENT` for - // batch push job. - verify(mockLogNotifier).completed( - eq(topic), - eq(PARTITION_FOO), - longThat(completionOffset -> (completionOffset == fooOffset + 1) || (completionOffset == fooOffset + 2)), - eq("STANDBY")); - }), - Optional.empty(), - false, - Optional.empty(), - aaConfig, - Collections.singletonMap(SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP, 1)); + }).setExtraServerProperties(Collections.singletonMap(SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP, 1)); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -2797,6 +2896,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte Optional.empty(), extraServerProperties, true, + null, null).build(); Properties kafkaProps = new Properties(); @@ -2937,6 +3037,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica Optional.empty(), extraServerProperties, false, + null, null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); @@ -3158,6 +3259,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Optional.empty(), new HashMap<>(), false, + null, null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); @@ -3305,6 +3407,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( Optional.empty(), serverProperties, false, + null, null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); TopicManager mockTopicManagerRemoteKafka = mock(TopicManager.class); @@ -3466,6 +3569,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node Optional.empty(), new HashMap<>(), false, + null, null).setIsDaVinciClient(nodeType == DA_VINCI).setAggKafkaConsumerService(aggKafkaConsumerService).build(); Properties kafkaProps = new Properties(); @@ -3564,6 +3668,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { Optional.empty(), new HashMap<>(), false, + null, null).setIsDaVinciClient(nodeType == DA_VINCI).build(); Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); @@ -3912,55 +4017,59 @@ public void testResubscribeAfterRoleChange() throws Exception { doReturn(mockTopicManagerRemoteKafka).when(mockTopicManagerRepository) .getTopicManager(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer()); - runTest(localPollStrategy, Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> {}, () -> { - doReturn(vtWriter).when(mockWriterFactory).createVeniceWriter(any(VeniceWriterOptions.class)); - verify(mockLogNotifier, never()).completed(anyString(), anyInt(), anyLong()); - List kafkaBootstrapServers = new ArrayList<>(); - kafkaBootstrapServers.add(inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); - kafkaBootstrapServers.add(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer()); - - // Verify ingestion of Venice version topic batchMessagesNum messages - verify(mockAbstractStorageEngine, timeout(10000).times(batchMessagesNum)) - .put(eq(PARTITION_BAR), any(), (ByteBuffer) any()); - - vtWriter.broadcastEndOfPush(new HashMap<>()); - vtWriter.broadcastTopicSwitch( - kafkaBootstrapServers, - Version.composeRealTimeTopic(storeNameWithoutVersionInfo), - System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10), - new HashMap<>()); - storeIngestionTaskUnderTest.promoteToLeader( - fooTopicPartition, - new LeaderFollowerPartitionStateModel.LeaderSessionIdChecker(1, new AtomicLong(1))); - - // Both Colo RT ingestion, avoid DCR collision intentionally. Each rt will be produced batchMessagesNum messages. - produceRecordsUsingSpecificWriter(localRtWriter, 0, batchMessagesNum, this::getNumberedKey); - produceRecordsUsingSpecificWriter(remoteRtWriter, batchMessagesNum, batchMessagesNum, this::getNumberedKey); - - verify(mockAbstractStorageEngine, timeout(10000).times(batchMessagesNum * 2)) - .putWithReplicationMetadata(eq(PARTITION_FOO), any(), any(), any()); - try { - verify(storeIngestionTaskUnderTest, times(totalResubscriptionTriggered)).resubscribeForAllPartitions(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)).unSubscribe(eq(fooTopicPartition)); - verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)).unSubscribe(eq(barTopicPartition)); - PubSubTopicPartition fooRtTopicPartition = new PubSubTopicPartitionImpl(realTimeTopic, PARTITION_FOO); - verify(mockLocalKafkaConsumer, atLeast(totalLocalRtResubscriptionTriggered)).unSubscribe(fooRtTopicPartition); - verify(mockRemoteKafkaConsumer, atLeast(totalRemoteRtResubscriptionTriggered)).unSubscribe(fooRtTopicPartition); - verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)) - .subscribe(eq(fooTopicPartition), anyLong()); - verify(mockLocalKafkaConsumer, atLeast(totalLocalRtResubscriptionTriggered)) - .subscribe(eq(fooRtTopicPartition), anyLong()); - verify(mockRemoteKafkaConsumer, atLeast(totalRemoteRtResubscriptionTriggered)) - .subscribe(eq(fooRtTopicPartition), anyLong()); - }, - Optional.of(hybridStoreConfig), - false, - Optional.empty(), - AA_ON, - Collections.singletonMap(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 3L)); + StoreIngestionTaskTestConfig config = + new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { + doReturn(vtWriter).when(mockWriterFactory).createVeniceWriter(any(VeniceWriterOptions.class)); + verify(mockLogNotifier, never()).completed(anyString(), anyInt(), anyLong()); + List kafkaBootstrapServers = new ArrayList<>(); + kafkaBootstrapServers.add(inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); + kafkaBootstrapServers.add(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer()); + + // Verify ingestion of Venice version topic batchMessagesNum messages + verify(mockAbstractStorageEngine, timeout(10000).times(batchMessagesNum)) + .put(eq(PARTITION_BAR), any(), (ByteBuffer) any()); + + vtWriter.broadcastEndOfPush(new HashMap<>()); + vtWriter.broadcastTopicSwitch( + kafkaBootstrapServers, + Version.composeRealTimeTopic(storeNameWithoutVersionInfo), + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10), + new HashMap<>()); + storeIngestionTaskUnderTest.promoteToLeader( + fooTopicPartition, + new LeaderFollowerPartitionStateModel.LeaderSessionIdChecker(1, new AtomicLong(1))); + + // Both Colo RT ingestion, avoid DCR collision intentionally. Each rt will be produced batchMessagesNum + // messages. + produceRecordsUsingSpecificWriter(localRtWriter, 0, batchMessagesNum, this::getNumberedKey); + produceRecordsUsingSpecificWriter(remoteRtWriter, batchMessagesNum, batchMessagesNum, this::getNumberedKey); + + verify(mockAbstractStorageEngine, timeout(10000).times(batchMessagesNum * 2)) + .putWithReplicationMetadata(eq(PARTITION_FOO), any(), any(), any()); + try { + verify(storeIngestionTaskUnderTest, times(totalResubscriptionTriggered)).resubscribeForAllPartitions(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)) + .unSubscribe(eq(fooTopicPartition)); + verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)) + .unSubscribe(eq(barTopicPartition)); + PubSubTopicPartition fooRtTopicPartition = new PubSubTopicPartitionImpl(realTimeTopic, PARTITION_FOO); + verify(mockLocalKafkaConsumer, atLeast(totalLocalRtResubscriptionTriggered)).unSubscribe(fooRtTopicPartition); + verify(mockRemoteKafkaConsumer, atLeast(totalRemoteRtResubscriptionTriggered)) + .unSubscribe(fooRtTopicPartition); + verify(mockLocalKafkaConsumer, atLeast(totalLocalVtResubscriptionTriggered)) + .subscribe(eq(fooTopicPartition), anyLong()); + verify(mockLocalKafkaConsumer, atLeast(totalLocalRtResubscriptionTriggered)) + .subscribe(eq(fooRtTopicPartition), anyLong()); + verify(mockRemoteKafkaConsumer, atLeast(totalRemoteRtResubscriptionTriggered)) + .subscribe(eq(fooRtTopicPartition), anyLong()); + }, AA_ON); + config.setPollStrategy(localPollStrategy) + .setHybridStoreConfig(Optional.of(hybridStoreConfig)) + .setExtraServerProperties(Collections.singletonMap(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 3L)); + runTest(config); } @Test(dataProvider = "aaConfigProvider") @@ -3973,14 +4082,17 @@ public void testWrappedInterruptExceptionDuringGracefulShutdown(AAConfig aaConfi DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)); VeniceException veniceException = new VeniceException("Wrapped interruptedException", new InterruptedException()); - runTest(Utils.setOf(PARTITION_FOO), () -> { - doReturn(getOffsetRecord(1, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); - doThrow(veniceException).when(aggKafkaConsumerService).unsubscribeConsumerFor(eq(pubSubTopic), any()); - }, () -> { + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).restarted(eq(topic), eq(PARTITION_FOO), anyLong()); storeIngestionTaskUnderTest.close(); verify(aggKafkaConsumerService, timeout(TEST_TIMEOUT_MS)).unsubscribeConsumerFor(eq(pubSubTopic), any()); }, aaConfig); + config.setBeforeStartingConsumption(() -> { + doReturn(getOffsetRecord(1, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); + doThrow(veniceException).when(aggKafkaConsumerService).unsubscribeConsumerFor(eq(pubSubTopic), any()); + }); + runTest(config); Assert.assertEquals(mockNotifierError.size(), 0); } @@ -4005,9 +4117,8 @@ public void testOffsetSyncBeforeGracefulShutDown(AAConfig aaConfig) throws Excep HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)); - runTest(Utils.setOf(PARTITION_FOO), () -> { - doReturn(getOffsetRecord(0, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); - }, () -> { + + StoreIngestionTaskTestConfig testConfig = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify it retrieves the offset from the OffSet Manager verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS)).getLastOffset(topic, PARTITION_FOO); @@ -4044,10 +4155,15 @@ public void testOffsetSyncBeforeGracefulShutDown(AAConfig aaConfig) throws Excep // Verify that the underlying storage engine sync function is invoked. verify(mockAbstractStorageEngine, timeout(TEST_TIMEOUT_MS).times(1)).sync(eq(PARTITION_FOO)); - }, aaConfig, configOverride -> { + }, aaConfig); + + testConfig.setHybridStoreConfig(this.hybridStoreConfig).setBeforeStartingConsumption(() -> { + doReturn(getOffsetRecord(0, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO); + }).setStoreVersionConfigOverride(configOverride -> { // set very high threshold so offsetRecord isn't be synced during regular consumption doReturn(100_000L).when(configOverride).getDatabaseSyncBytesIntervalForTransactionalMode(); }); + runTest(testConfig); Assert.assertEquals(mockNotifierError.size(), 0); } @@ -4159,10 +4275,20 @@ public void testShouldPersistRecord() throws Exception { return partitionConsumptionState; }; - runTest(new RandomPollStrategy(), Collections.singleton(PARTITION_FOO), () -> {}, () -> { - PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateSupplier.get(); - assertFalse(storeIngestionTaskUnderTest.shouldPersistRecord(pubSubMessage, partitionConsumptionState)); - }, this.hybridStoreConfig, false, Optional.empty(), AA_OFF, serverProperties); + StoreIngestionTaskTestConfig testConfig = + new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { + PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateSupplier.get(); + assertFalse(storeIngestionTaskUnderTest.shouldPersistRecord(pubSubMessage, partitionConsumptionState)); + }, AA_OFF); + + testConfig.setHybridStoreConfig(this.hybridStoreConfig).setExtraServerProperties(serverProperties); + + runTest(testConfig); + + // runTest(new RandomPollStrategy(), Collections.singleton(PARTITION_FOO), () -> {}, () -> { + // PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateSupplier.get(); + // assertFalse(storeIngestionTaskUnderTest.shouldPersistRecord(pubSubMessage, partitionConsumptionState)); + // }, this.hybridStoreConfig, false, Optional.empty(), AA_OFF, serverProperties); runTest(Collections.singleton(PARTITION_FOO), () -> { PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateSupplier.get(); @@ -4268,6 +4394,7 @@ public void testBatchOnlyStoreDataRecovery() { Optional.empty(), Collections.emptyMap(), true, + null, null).build(); doReturn(Version.parseStoreFromVersionTopic(topic)).when(store).getName(); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( @@ -4571,7 +4698,7 @@ public void testStoreIngestionRecordTransformer(AAConfig aaConfig) throws Except when(valueSchemaEntry.getSchema()).thenReturn(valueSchema); when(mockSchemaRepo.getValueSchema(eq(storeNameWithoutVersionInfo), anyInt())).thenReturn(valueSchemaEntry); - runTest(Collections.singleton(PARTITION_FOO), () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { TestUtils.waitForNonDeterministicAssertion( 5, TimeUnit.SECONDS, @@ -4588,7 +4715,9 @@ public void testStoreIngestionRecordTransformer(AAConfig aaConfig) throws Except } catch (InterruptedException e) { throw new VeniceException(e); } - }, aaConfig, (storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); + }, aaConfig); + config.setRecordTransformerFunction((storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); + runTest(config); // Transformer error should never be recorded verify(mockVersionedStorageIngestionStats, never()) @@ -4631,7 +4760,7 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E when(valueSchemaEntry.getSchema()).thenReturn(valueSchema); when(mockSchemaRepo.getValueSchema(eq(storeNameWithoutVersionInfo), anyInt())).thenReturn(valueSchemaEntry); - runTest(Collections.singleton(PARTITION_FOO), () -> { + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { TestUtils.waitForNonDeterministicAssertion( 5, TimeUnit.SECONDS, @@ -4651,7 +4780,9 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E // Verify transformer error was recorded verify(mockVersionedStorageIngestionStats, timeout(1000)) .recordTransformerError(eq(storeNameWithoutVersionInfo), anyInt(), anyDouble(), anyLong()); - }, aaConfig, (storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); + }, aaConfig); + config.setRecordTransformerFunction((storeVersion) -> new TestStringRecordTransformer(storeVersion, true)); + runTest(config); } public enum RmdState { @@ -4688,76 +4819,76 @@ public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId, Rm ChunkingTestUtils.createChunkValueManifestRecord(putKeyFoo, messages.get(0), numChunks, tp); messages.add(manifestMessage); - runTest(new RandomPollStrategy(), Collections.singleton(PARTITION_FOO), () -> {}, () -> { - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> assertTrue(storeIngestionTaskUnderTest.hasAnySubscription())); + StoreIngestionTaskTestConfig testConfig = + new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> assertTrue(storeIngestionTaskUnderTest.hasAnySubscription())); + + for (PubSubMessage message: messages) { + try { + Put put = (Put) message.getValue().getPayloadUnion(); + if (put.schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { + put.schemaId = testSchemaId; // set manifest schemaId to testSchemaId to see if metrics are still + // recorded + switch (rmdState) { + case NON_CHUNKED: + put.replicationMetadataPayload = ByteBuffer.allocate(rmdSize + ByteUtils.SIZE_OF_INT); + put.replicationMetadataPayload.position(ByteUtils.SIZE_OF_INT); // for getIntHeaderFromByteBuffer() + break; + case CHUNKED: + put.replicationMetadataPayload = ChunkingTestUtils.createReplicationMetadataPayload(rmdSize); + break; + default: + put.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER; + break; + } + } + LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class); + when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT); + when(leaderProducedRecordContext.getValueUnion()).thenReturn(put); + when(leaderProducedRecordContext.getKeyBytes()).thenReturn(putKeyFoo); + + storeIngestionTaskUnderTest.produceToStoreBufferService( + message, + leaderProducedRecordContext, + PARTITION_FOO, + localKafkaConsumerService.kafkaUrl, + System.nanoTime(), + System.currentTimeMillis()); + } catch (InterruptedException e) { + throw new VeniceException(e); + } + } - for (PubSubMessage message: messages) { - try { - Put put = (Put) message.getValue().getPayloadUnion(); - if (put.schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { - put.schemaId = testSchemaId; // set manifest schemaId to testSchemaId to see if metrics are still recorded - switch (rmdState) { - case NON_CHUNKED: - put.replicationMetadataPayload = ByteBuffer.allocate(rmdSize + ByteUtils.SIZE_OF_INT); - put.replicationMetadataPayload.position(ByteUtils.SIZE_OF_INT); // for getIntHeaderFromByteBuffer() - break; - case CHUNKED: - put.replicationMetadataPayload = ChunkingTestUtils.createReplicationMetadataPayload(rmdSize); - break; - default: - put.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER; - break; + // Verify that the assembled record metrics are only recorded if schemaId=-20 which indicates a manifest + HostLevelIngestionStats stats = storeIngestionTaskUnderTest.hostLevelIngestionStats; + if (testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { + ArgumentCaptor sizeCaptor = ArgumentCaptor.forClass(long.class); + verify(stats, timeout(1000).times(1)).recordAssembledRecordSize(sizeCaptor.capture(), anyLong()); + assertEquals(sizeCaptor.getValue().longValue(), expectedRecordSize); + verify(stats, timeout(1000).times(1)).recordAssembledRecordSizeRatio(anyDouble(), anyLong()); + + if (rmdState != RmdState.NO_RMD) { + verify(stats, timeout(1000).times(1)).recordAssembledRmdSize(sizeCaptor.capture(), anyLong()); + assertEquals(sizeCaptor.getValue().longValue(), rmdSize); + } else { + verify(stats, times(0)).recordAssembledRmdSize(anyLong(), anyLong()); } + } else { + verify(stats, times(0)).recordAssembledRmdSize(anyLong(), anyLong()); + verify(stats, times(0)).recordAssembledRecordSize(anyLong(), anyLong()); + verify(stats, times(0)).recordAssembledRecordSizeRatio(anyDouble(), anyLong()); } - LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class); - when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT); - when(leaderProducedRecordContext.getValueUnion()).thenReturn(put); - when(leaderProducedRecordContext.getKeyBytes()).thenReturn(putKeyFoo); + }, aaConfig); - storeIngestionTaskUnderTest.produceToStoreBufferService( - message, - leaderProducedRecordContext, - PARTITION_FOO, - localKafkaConsumerService.kafkaUrl, - System.nanoTime(), - System.currentTimeMillis()); - } catch (InterruptedException e) { - throw new VeniceException(e); - } - } + testConfig.setPollStrategy(new RandomPollStrategy()) + .setHybridStoreConfig(hybridStoreConfig) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(rmdState == RmdState.CHUNKED); - // Verify that the assembled record metrics are only recorded if schemaId=-20 which indicates a manifest - HostLevelIngestionStats stats = storeIngestionTaskUnderTest.hostLevelIngestionStats; - if (testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { - ArgumentCaptor sizeCaptor = ArgumentCaptor.forClass(long.class); - verify(stats, timeout(1000).times(1)).recordAssembledRecordSize(sizeCaptor.capture(), anyLong()); - assertEquals(sizeCaptor.getValue().longValue(), expectedRecordSize); - verify(stats, timeout(1000).times(1)).recordAssembledRecordSizeRatio(anyDouble(), anyLong()); - - if (rmdState != RmdState.NO_RMD) { - verify(stats, timeout(1000).times(1)).recordAssembledRmdSize(sizeCaptor.capture(), anyLong()); - assertEquals(sizeCaptor.getValue().longValue(), rmdSize); - } else { - verify(stats, times(0)).recordAssembledRmdSize(anyLong(), anyLong()); - } - } else { - verify(stats, times(0)).recordAssembledRmdSize(anyLong(), anyLong()); - verify(stats, times(0)).recordAssembledRecordSize(anyLong(), anyLong()); - verify(stats, times(0)).recordAssembledRecordSizeRatio(anyDouble(), anyLong()); - } - }, - hybridStoreConfig, - false, - true, - rmdState == RmdState.CHUNKED, - Optional.empty(), - aaConfig, - Collections.emptyMap(), - storeVersionConfigOverride -> {}, - null); + runTest(testConfig); } @Test diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java index e973cc5c2a0..7e925e5a1ef 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java @@ -38,6 +38,7 @@ public class OffsetRecord { public static final long DEFAULT_OFFSET_LAG = -1; public static final String NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY = ""; // A place holder key private static final String PARTITION_STATE_STRING = "PartitionState"; + private static final String NULL_STRING = "null"; private final PartitionState partitionState; private final InternalAvroSpecificSerializer serializer; @@ -85,6 +86,14 @@ public long getLocalVersionTopicOffset() { return this.partitionState.offset; } + public void setPreviousStatusesEntry(String key, String value) { + partitionState.getPreviousStatuses().put(key, value); + } + + public String getPreviousStatusesEntry(String key) { + return partitionState.getPreviousStatuses().getOrDefault(key, NULL_STRING).toString(); + } + public void setCheckpointLocalVersionTopicOffset(long offset) { this.partitionState.offset = offset; }