Skip to content

Commit

Permalink
KAFKA-6474: remove KStreamTestDriver (apache#6732)
Browse files Browse the repository at this point in the history
The implementation of KIP-258 broke the state store methods in KStreamTestDriver.
These methods were unused in this project, so the breakage was not detected.
Since this is an internal testing utility, and it was deprecated and partially removed in
favor of TopologyTestDriver, I opted to just complete the removal of the class.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Boyang Chen <[email protected]>, Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored and guozhangwang committed May 19, 2019
1 parent b521703 commit c140f09
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
Expand All @@ -30,24 +31,22 @@
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Rule;
import org.junit.Test;

import java.time.Duration;
import java.util.Properties;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.junit.Assert.assertEquals;

public class KStreamTransformTest {
private final String topicName = "topic";
private static final String TOPIC_NAME = "topic";
private final ConsumerRecordFactory<Integer, Integer> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer(), 0L);
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());

@SuppressWarnings("deprecation")
@Rule
public final org.apache.kafka.test.KStreamTestDriver kstreamDriver = new org.apache.kafka.test.KStreamTestDriver();

@Test
public void testTransform() {
final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -57,7 +56,13 @@ public void testTransform() {
private int total = 0;

@Override
public void init(final ProcessorContext context) {}
public void init(final ProcessorContext context) {
context.schedule(
Duration.ofMillis(1),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> context.forward(-1, (int) timestamp)
);
}

@Override
public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
Expand All @@ -72,27 +77,39 @@ public void close() {}
final int[] expectedKeys = {1, 10, 100, 1000};

final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);

kstreamDriver.setUp(builder);
for (final int expectedKey : expectedKeys) {
kstreamDriver.setTime(expectedKey / 2L);
kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
}

// TODO: un-comment after replaced with TopologyTestDriver
//kstreamDriver.punctuate(2);
//kstreamDriver.punctuate(3);
try (final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
)),
0L)) {
final ConsumerRecordFactory<Integer, Integer> recordFactory =
new ConsumerRecordFactory<>(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());

//assertEquals(6, processor.theCapturedProcessor().processed.size());
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(expectedKey, expectedKey * 10, expectedKey / 2L));
}

//String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
driver.advanceWallClockTime(2);
driver.advanceWallClockTime(1);

final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 5)", "200:1110 (ts: 50)", "2000:11110 (ts: 500)"};
final String[] expected = {
"2:10 (ts: 0)",
"20:110 (ts: 5)",
"200:1110 (ts: 50)",
"2000:11110 (ts: 500)",
"-1:2 (ts: 2)",
"-1:3 (ts: 3)"
};

for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
assertEquals(expected.length, processor.theCapturedProcessor().processed.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
}
}
}

Expand Down Expand Up @@ -125,15 +142,15 @@ public void close() {}
final int[] expectedKeys = {1, 10, 100, 1000};

final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
driver.pipeInput(recordFactory.create(TOPIC_NAME, expectedKey, expectedKey * 10, 0L));
}

// This tick will yield yields the "-1:2" result
// This tick yields the "-1:2" result
driver.advanceWallClockTime(2);
// This tick further advances the clock to 3, which leads to the "-1:3" result
driver.advanceWallClockTime(1);
Expand Down
Loading

0 comments on commit c140f09

Please sign in to comment.