Skip to content

Commit

Permalink
MINOR: Deflake OptimizedKTableIntegrationTest (apache#12186)
Browse files Browse the repository at this point in the history
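This test has been flaky because unexpected rebalances can occur while it runs.
This change fixes it by detecting such a rebalance (surfaced as an
InvalidStateStoreException when querying the store) and retrying the test
logic within a timeout. Any other failure is wrapped in a NoRetryException
so that it fails the test immediately instead of being retried.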

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored May 20, 2022
1 parent a8e3a25 commit 3f86a18
Showing 1 changed file with 36 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -39,10 +40,11 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand All @@ -51,6 +53,7 @@
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -60,9 +63,12 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(IntegrationTest.class)
public class OptimizedKTableIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
Expand Down Expand Up @@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

final boolean kafkaStreams1WasFirstActive;
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);

// Assert that the current value in store reflects all messages being processed
if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
kafkaStreams1WasFirstActive = true;
} else {
assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
kafkaStreams1WasFirstActive = false;
}

if (kafkaStreams1WasFirstActive) {
kafkaStreams1.close();
} else {
kafkaStreams2.close();
}
final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(() -> {
final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);

try {
// Assert that the current value in store reflects all messages being processed
if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
kafkaStreams1.close();
newActiveStore.set(store2);
} else {
assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
kafkaStreams2.close();
newActiveStore.set(store1);
}
} catch (final InvalidStateStoreException e) {
LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
throw e;
} catch (final Throwable t) {
LOG.error("Caught non-retriable exception in test. Exiting.", t);
throw new NoRetryException(t);
}
});

final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
// Wait for failover
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that after failover we have recovered to the last store write
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
assertThat(newActiveStore.get().get(key), is(equalTo(batch1NumMessages - 1)));
});

final int totalNumMessages = batch1NumMessages + batch2NumMessages;
Expand All @@ -161,7 +173,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {

TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
assertThat(newActiveStore.get().get(key), is(equalTo(totalNumMessages - 1)));
});
}

Expand Down

0 comments on commit 3f86a18

Please sign in to comment.