From c140f09406b119d86d43bba6248d17fe0120a4dd Mon Sep 17 00:00:00 2001 From: John Roesler Date: Sat, 18 May 2019 21:18:14 -0500 Subject: [PATCH] KAFKA-6474: remove KStreamTestDriver (#6732) The implementation of KIP-258 broke the state store methods in KStreamTestDriver. These methods were unused in this project, so the breakage was not detected. Since this is an internal testing utility, and it was deprecated and partially removed in favor of TopologyTestDriver, I opted to just complete the removal of the class. Reviewers: A. Sophie Blee-Goldman , Boyang Chen , Bill Bejeck , Matthias J. Sax , Guozhang Wang --- .../internals/KStreamTransformTest.java | 67 +-- .../internals/KTableAggregateTest.java | 411 ++++++------------ .../apache/kafka/test/KStreamTestDriver.java | 277 ------------ .../kafka/streams/TopologyTestDriver.java | 1 - 4 files changed, 186 insertions(+), 570 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index 8f87d409a9dc9..fcf6aea7e9c63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; @@ -30,24 +31,22 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.StreamsTestUtils; -import org.junit.Rule; import org.junit.Test; import java.time.Duration; import java.util.Properties; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.junit.Assert.assertEquals; public class KStreamTransformTest { - private final String topicName = "topic"; + private static final String TOPIC_NAME = "topic"; private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer(), 0L); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()); - @SuppressWarnings("deprecation") - @Rule - public final org.apache.kafka.test.KStreamTestDriver kstreamDriver = new org.apache.kafka.test.KStreamTestDriver(); - @Test public void testTransform() { final StreamsBuilder builder = new StreamsBuilder(); @@ -57,7 +56,13 @@ public void testTransform() { private int total = 0; @Override - public void init(final ProcessorContext context) {} + public void init(final ProcessorContext context) { + context.schedule( + Duration.ofMillis(1), + PunctuationType.WALL_CLOCK_TIME, + timestamp -> context.forward(-1, (int) timestamp) + ); + } @Override public KeyValue transform(final Number key, final Number value) { @@ -72,27 +77,39 @@ public void close() {} final int[] expectedKeys = {1, 10, 100, 1000}; final MockProcessorSupplier processor = new MockProcessorSupplier<>(); - final KStream stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); + final KStream stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())); stream.transform(transformerSupplier).process(processor); - kstreamDriver.setUp(builder); - for (final int expectedKey : expectedKeys) { - kstreamDriver.setTime(expectedKey / 2L); - kstreamDriver.process(topicName, expectedKey, expectedKey * 10); - } - - // TODO: un-comment after replaced with TopologyTestDriver - //kstreamDriver.punctuate(2); - //kstreamDriver.punctuate(3); + try (final TopologyTestDriver driver = new TopologyTestDriver( + builder.build(), + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test") + )), + 0L)) { + final ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer()); - //assertEquals(6, processor.theCapturedProcessor().processed.size()); + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(expectedKey, expectedKey * 10, expectedKey / 2L)); + } - //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; + driver.advanceWallClockTime(2); + driver.advanceWallClockTime(1); - final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 5)", "200:1110 (ts: 50)", "2000:11110 (ts: 500)"}; + final String[] expected = { + "2:10 (ts: 0)", + "20:110 (ts: 5)", + "200:1110 (ts: 50)", + "2000:11110 (ts: 500)", + "-1:2 (ts: 2)", + "-1:3 (ts: 3)" + }; - for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i)); + assertEquals(expected.length, processor.theCapturedProcessor().processed.size()); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i)); + } } } @@ -125,15 +142,15 @@ public void close() {} final int[] expectedKeys = {1, 10, 100, 1000}; final MockProcessorSupplier processor = new MockProcessorSupplier<>(); - final KStream stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); + final KStream stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())); stream.transform(transformerSupplier).process(processor); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { for (final int expectedKey : expectedKeys) { - driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L)); + driver.pipeInput(recordFactory.create(TOPIC_NAME, expectedKey, expectedKey * 10, 0L)); } - // This tick will yield yields the "-1:2" result + // This tick yields the "-1:2" result driver.advanceWallClockTime(2); // This tick further advances the clock to 3, which leads to the "-1:3" result driver.advanceWallClockTime(1); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 6144051b5af1d..b704e13c3250a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -18,52 +18,38 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.junit.Assert.assertEquals; -@SuppressWarnings("deprecation") public class KTableAggregateTest { private final Serde stringSerde = Serdes.String(); private final Consumed consumed = Consumed.with(stringSerde, stringSerde); - private final Grouped stringSerialzied = Grouped.with(stringSerde, stringSerde); + private final Grouped stringSerialized = Grouped.with(stringSerde, stringSerde); private final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - private File stateDir = null; - - @Rule - public EmbeddedKafkaCluster cluster = null; - @Rule - public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver(); - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } - @Test public void testAggBasic() { final StreamsBuilder builder = new StreamsBuilder(); @@ -73,7 +59,7 @@ public void testAggBasic() { final KTable table2 = table1 .groupBy( MockMapper.noOpKeyValueMapper(), - stringSerialzied) + stringSerialized) .aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -83,76 +69,43 @@ public void testAggBasic() { table2.toStream().process(supplier); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.flushState(); - driver.setTime(15L); - driver.process(topic1, "B", "2"); - driver.flushState(); - driver.setTime(20L); - driver.process(topic1, "A", "3"); - driver.flushState(); - driver.setTime(18L); - driver.process(topic1, "B", "4"); - driver.flushState(); - driver.setTime(5L); - driver.process(topic1, "C", "5"); - driver.flushState(); - driver.setTime(25L); - driver.process(topic1, "D", "6"); - driver.flushState(); - driver.setTime(15L); - driver.process(topic1, "B", "7"); - driver.flushState(); - driver.setTime(10L); - driver.process(topic1, "C", "8"); - driver.flushState(); - - assertEquals( - asList( - "A:0+1 (ts: 10)", - "B:0+2 (ts: 15)", - "A:0+1-1+3 (ts: 20)", - "B:0+2-2+4 (ts: 18)", - "C:0+5 (ts: 5)", - "D:0+6 (ts: 25)", - "B:0+2-2+4-4+7 (ts: 18)", - "C:0+5-5+8 (ts: 10)"), - supplier.theCapturedProcessor().processed); - } - - - @Test - public void testAggCoalesced() { - final StreamsBuilder builder = new StreamsBuilder(); - final String topic1 = "topic1"; - - final KTable table1 = builder.table(topic1, consumed); - final KTable table2 = table1 - .groupBy( - MockMapper.noOpKeyValueMapper(), - stringSerialzied) - .aggregate(MockInitializer.STRING_INIT, - MockAggregator.TOSTRING_ADDER, - MockAggregator.TOSTRING_REMOVER, - Materialized.>as("topic1-Canonized") - .withValueSerde(stringSerde)); - - table2.toStream().process(supplier); - - driver.setUp(builder, stateDir); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.setTime(20L); - driver.process(topic1, "A", "3"); - driver.setTime(15L); - driver.process(topic1, "A", "4"); - driver.flushState(); - - assertEquals(Collections.singletonList("A:0+4 (ts: 15)"), supplier.theCapturedProcessor().processed); + try ( + final TopologyTestDriver driver = new TopologyTestDriver( + builder.build(), + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) + )), + 0L)) { + final ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L); + + driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L)); + driver.pipeInput(recordFactory.create(topic1, "B", "2", 15L)); + driver.pipeInput(recordFactory.create(topic1, "A", "3", 20L)); + driver.pipeInput(recordFactory.create(topic1, "B", "4", 18L)); + driver.pipeInput(recordFactory.create(topic1, "C", "5", 5L)); + driver.pipeInput(recordFactory.create(topic1, "D", "6", 25L)); + driver.pipeInput(recordFactory.create(topic1, "B", "7", 15L)); + driver.pipeInput(recordFactory.create(topic1, "C", "8", 10L)); + + assertEquals( + asList( + "A:0+1 (ts: 10)", + "B:0+2 (ts: 15)", + "A:0+1-1 (ts: 20)", + "A:0+1-1+3 (ts: 20)", + "B:0+2-2 (ts: 18)", + "B:0+2-2+4 (ts: 18)", + "C:0+5 (ts: 5)", + "D:0+6 (ts: 25)", + "B:0+2-2+4-4 (ts: 18)", + "B:0+2-2+4-4+7 (ts: 18)", + "C:0+5-5 (ts: 10)", + "C:0+5-5+8 (ts: 10)"), + supplier.theCapturedProcessor().processed); + } } @Test @@ -173,7 +126,7 @@ public void testAggRepartition() { return KeyValue.pair(value, value); } }, - stringSerialzied) + stringSerialized) .aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -183,78 +136,74 @@ public void testAggRepartition() { table2.toStream().process(supplier); - driver.setUp(builder, stateDir); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.flushState(); - driver.setTime(15L); - driver.process(topic1, "A", null); - driver.flushState(); - driver.setTime(12L); - driver.process(topic1, "A", "1"); - driver.flushState(); - driver.setTime(20L); - driver.process(topic1, "B", "2"); - driver.flushState(); - driver.setTime(25L); - driver.process(topic1, "null", "3"); - driver.flushState(); - driver.setTime(23L); - driver.process(topic1, "B", "4"); - driver.flushState(); - driver.setTime(24L); - driver.process(topic1, "NULL", "5"); - driver.flushState(); - driver.setTime(22L); - driver.process(topic1, "B", "7"); - driver.flushState(); - - assertEquals( - asList( - "1:0+1 (ts: 10)", - "1:0+1-1 (ts: 15)", - "1:0+1-1+1 (ts: 15)", - "2:0+2 (ts: 20)", - //noop - "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)", - //noop - "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"), - supplier.theCapturedProcessor().processed); + try ( + final TopologyTestDriver driver = new TopologyTestDriver( + builder.build(), + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) + )), + 0L)) { + final ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L); + + driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L)); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L)); + driver.pipeInput(recordFactory.create(topic1, "A", "1", 12L)); + driver.pipeInput(recordFactory.create(topic1, "B", "2", 20L)); + driver.pipeInput(recordFactory.create(topic1, "null", "3", 25L)); + driver.pipeInput(recordFactory.create(topic1, "B", "4", 23L)); + driver.pipeInput(recordFactory.create(topic1, "NULL", "5", 24L)); + driver.pipeInput(recordFactory.create(topic1, "B", "7", 22L)); + + assertEquals( + asList( + "1:0+1 (ts: 10)", + "1:0+1-1 (ts: 15)", + "1:0+1-1+1 (ts: 15)", + "2:0+2 (ts: 20)", + //noop + "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)", + //noop + "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"), + supplier.theCapturedProcessor().processed); + } } - private void testCountHelper(final StreamsBuilder builder, - final String input, - final MockProcessorSupplier supplier) { - driver.setUp(builder, stateDir); - - driver.setTime(10L); - driver.process(input, "A", "green"); - driver.flushState(); - driver.setTime(9L); - driver.process(input, "B", "green"); - driver.flushState(); - driver.setTime(12L); - driver.process(input, "A", "blue"); - driver.flushState(); - driver.setTime(15L); - driver.process(input, "C", "yellow"); - driver.flushState(); - driver.setTime(11L); - driver.process(input, "D", "green"); - driver.flushState(); - driver.flushState(); - - assertEquals( - asList( - "green:1 (ts: 10)", - "green:2 (ts: 10)", - "green:1 (ts: 12)", "blue:1 (ts: 12)", - "yellow:1 (ts: 15)", - "green:2 (ts: 12)"), - supplier.theCapturedProcessor().processed); + private static void testCountHelper(final StreamsBuilder builder, + final String input, + final MockProcessorSupplier supplier) { + try ( + final TopologyTestDriver driver = new TopologyTestDriver( + builder.build(), + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) + )), + 0L)) { + final ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L); + + driver.pipeInput(recordFactory.create(input, "A", "green", 10L)); + driver.pipeInput(recordFactory.create(input, "B", "green", 9L)); + driver.pipeInput(recordFactory.create(input, "A", "blue", 12L)); + driver.pipeInput(recordFactory.create(input, "C", "yellow", 15L)); + driver.pipeInput(recordFactory.create(input, "D", "green", 11L)); + + assertEquals( + asList( + "green:1 (ts: 10)", + "green:2 (ts: 10)", + "green:1 (ts: 12)", "blue:1 (ts: 12)", + "yellow:1 (ts: 15)", + "green:2 (ts: 12)"), + supplier.theCapturedProcessor().processed); + } } + @Test public void testCount() { final StreamsBuilder builder = new StreamsBuilder(); @@ -262,7 +211,7 @@ public void testCount() { builder .table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) + .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized) .count(Materialized.as("count")) .toStream() .process(supplier); @@ -277,7 +226,7 @@ public void testCountWithInternalStore() { builder .table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) + .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized) .count() .toStream() .process(supplier); @@ -285,43 +234,6 @@ public void testCountWithInternalStore() { testCountHelper(builder, input, supplier); } - @Test - public void testCountCoalesced() { - final StreamsBuilder builder = new StreamsBuilder(); - final String input = "count-test-input"; - final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - - builder - .table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) - .count(Materialized.as("count")) - .toStream() - .process(supplier); - - driver.setUp(builder, stateDir); - - final MockProcessor proc = supplier.theCapturedProcessor(); - - driver.setTime(10L); - driver.process(input, "A", "green"); - driver.setTime(8L); - driver.process(input, "B", "green"); - driver.setTime(9L); - driver.process(input, "A", "blue"); - driver.setTime(10L); - driver.process(input, "C", "yellow"); - driver.setTime(15L); - driver.process(input, "D", "green"); - driver.flushState(); - - assertEquals( - asList( - "blue:1 (ts: 9)", - "yellow:1 (ts: 10)", - "green:2 (ts: 15)"), - proc.processed); - } - @Test public void testRemoveOldBeforeAddNew() { final StreamsBuilder builder = new StreamsBuilder(); @@ -334,7 +246,7 @@ public void testRemoveOldBeforeAddNew() { (key, value) -> KeyValue.pair( String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1))), - stringSerialzied) + stringSerialized) .aggregate( () -> "", (aggKey, value, aggregate) -> aggregate + value, @@ -344,70 +256,35 @@ public void testRemoveOldBeforeAddNew() { .toStream() .process(supplier); - driver.setUp(builder, stateDir); - - final MockProcessor proc = supplier.theCapturedProcessor(); - - driver.setTime(10L); - driver.process(input, "11", "A"); - driver.flushState(); - driver.setTime(8L); - driver.process(input, "12", "B"); - driver.flushState(); - driver.setTime(12L); - driver.process(input, "11", null); - driver.flushState(); - driver.setTime(6L); - driver.process(input, "12", "C"); - driver.flushState(); - - assertEquals( - asList( - "1:1 (ts: 10)", - "1:12 (ts: 10)", - "1:2 (ts: 12)", - "1:2 (ts: 12)"), - proc.processed); - } - - @Test - public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() { - final String tableOne = "tableOne"; - final String tableTwo = "tableTwo"; - final StreamsBuilder builder = new StreamsBuilder(); - final String reduceTopic = "TestDriver-reducer-store-repartition"; - final Map reduceResults = new HashMap<>(); - - final KTable one = builder.table(tableOne, consumed); - final KTable two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String())); - - final KTable reduce = two - .groupBy( - (key, value) -> new KeyValue<>(value, key), - Grouped.with(Serdes.String(), Serdes.Long())) - .reduce( - (value1, value2) -> value1 + value2, - (value1, value2) -> value1 - value2, - Materialized.as("reducer-store")); - - reduce.toStream().foreach(reduceResults::put); - - one.leftJoin(reduce, (value1, value2) -> value1 + ":" + value2) - .mapValues(value -> value); - - driver.setUp(builder, stateDir, 111); - driver.process(reduceTopic, "1", new Change<>(1L, null)); - driver.process("tableOne", "2", "2"); - // this should trigger eviction on the reducer-store topic - driver.process(reduceTopic, "2", new Change<>(2L, null)); - // this wont as it is the same value - driver.process(reduceTopic, "2", new Change<>(2L, null)); - assertEquals(Long.valueOf(2L), reduceResults.get("2")); - - // this will trigger eviction on the tableOne topic - // that in turn will cause an eviction on reducer-topic. It will flush - // key 2 as it is the only dirty entry in the cache - driver.process("tableOne", "1", "5"); - assertEquals(Long.valueOf(4L), reduceResults.get("2")); + try ( + final TopologyTestDriver driver = new TopologyTestDriver( + builder.build(), + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) + )), + 0L)) { + final ConsumerRecordFactory recordFactory = + new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L); + + final MockProcessor proc = supplier.theCapturedProcessor(); + + driver.pipeInput(recordFactory.create(input, "11", "A", 10L)); + driver.pipeInput(recordFactory.create(input, "12", "B", 8L)); + driver.pipeInput(recordFactory.create(input, "11", (String) null, 12L)); + driver.pipeInput(recordFactory.create(input, "12", "C", 6L)); + + assertEquals( + asList( + "1:1 (ts: 10)", + "1:12 (ts: 10)", + "1:2 (ts: 12)", + "1: (ts: 12)", + "1:2 (ts: 12)" + ), + proc.processed + ); + } } } diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java deleted file mode 100644 index b83936b8df512..0000000000000 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.test; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TopologyWrapper; -import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; -import org.apache.kafka.streams.state.internals.ThreadCache; -import org.junit.rules.ExternalResource; - -import java.io.File; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - -/** - * KStreamTestDriver - * - * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead - */ -@Deprecated -public class KStreamTestDriver extends ExternalResource { - - private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L; - - private ProcessorTopology topology; - private InternalMockProcessorContext context; - private ProcessorTopology globalTopology; - private final LogContext logContext = new LogContext("testCache "); - - public void setUp(final StreamsBuilder builder) { - setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); - } - - public void setUp(final StreamsBuilder builder, final File stateDir) { - setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray()); - } - - public void setUp(final StreamsBuilder builder, final File stateDir, final long cacheSize) { - setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize); - } - - public void setUp(final StreamsBuilder builder, - final File stateDir, - final Serde keySerde, - final Serde valSerde) { - setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES); - } - - public void setUp(final StreamsBuilder builder, - final File stateDir, - final Serde keySerde, - final Serde valSerde, - final long cacheSize) { - final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); - - internalTopologyBuilder.setApplicationId("TestDriver"); - topology = internalTopologyBuilder.build(null); - globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); - - final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null)); - - // init global topology first as it will add stores to the - // store map that are required for joins etc. - if (globalTopology != null) { - initTopology(globalTopology, globalTopology.globalStateStores()); - } - initTopology(topology, topology.stateStores()); - } - - @Override - protected void after() { - if (topology != null) { - close(); - } - } - - private void initTopology(final ProcessorTopology topology, final List stores) { - for (final StateStore store : stores) { - try { - store.init(context, store); - } catch (final RuntimeException e) { - new RuntimeException("Fatal exception initializing store.", e).printStackTrace(); - throw e; - } - } - - for (final ProcessorNode node : topology.processors()) { - context.setCurrentNode(node); - try { - node.init(context); - } finally { - context.setCurrentNode(null); - } - } - } - - public ProcessorTopology topology() { - return topology; - } - - public ProcessorContext context() { - return context; - } - - public void process(final String topicName, final Object key, final Object value) { - final ProcessorNode prevNode = context.currentNode(); - final ProcessorNode currNode = sourceNodeByTopicName(topicName); - - if (currNode != null) { - context.setRecordContext(createRecordContext(topicName, context.timestamp())); - context.setCurrentNode(currNode); - try { - context.forward(key, value); - } finally { - context.setCurrentNode(prevNode); - } - } - } - - private ProcessorNode sourceNodeByTopicName(final String topicName) { - ProcessorNode topicNode = topology.source(topicName); - if (topicNode == null) { - for (final String sourceTopic : topology.sourceTopics()) { - if (Pattern.compile(sourceTopic).matcher(topicName).matches()) { - return topology.source(sourceTopic); - } - } - if (globalTopology != null) { - topicNode = globalTopology.source(topicName); - } - } - - return topicNode; - } - - public void setTime(final long timestamp) { - context.setTime(timestamp); - } - - public void close() { - // close all processors - for (final ProcessorNode node : topology.processors()) { - context.setCurrentNode(node); - try { - node.close(); - } finally { - context.setCurrentNode(null); - } - } - - closeState(); - } - - public Set allProcessorNames() { - final Set names = new HashSet<>(); - - final List nodes = topology.processors(); - - for (final ProcessorNode node : nodes) { - names.add(node.name()); - } - - return names; - } - - public ProcessorNode processor(final String name) { - final List nodes = topology.processors(); - - for (final ProcessorNode node : nodes) { - if (node.name().equals(name)) { - return node; - } - } - - return null; - } - - public Map allStateStores() { - return context.allStateStores(); - } - - public void flushState() { - for (final StateStore stateStore : context.allStateStores().values()) { - stateStore.flush(); - } - } - - private void closeState() { - // we need to first flush all stores before trying to close any one - // of them since the flushing could cause eviction and hence tries to access other stores - flushState(); - - for (final StateStore stateStore : context.allStateStores().values()) { - stateStore.close(); - } - } - - private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) { - return new ProcessorRecordContext(timestamp, -1, -1, topicName, null); - } - - private class MockRecordCollector extends RecordCollectorImpl { - MockRecordCollector() { - super("KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); - } - - @Override - public void send(final String topic, - final K key, - final V value, - final Headers headers, - final Long timestamp, - final Serializer keySerializer, - final Serializer valueSerializer, - final StreamPartitioner partitioner) { - // The serialization is skipped. - if (sourceNodeByTopicName(topic) != null) { - process(topic, key, value); - } - } - - @Override - public void send(final String topic, - final K key, - final V value, - final Headers headers, - final Integer partition, - final Long timestamp, - final Serializer keySerializer, - final Serializer valueSerializer) { - // The serialization is skipped. - if (sourceNodeByTopicName(topic) != null) { - process(topic, key, value); - } - } - - @Override - public void flush() {} - - @Override - public void close() {} - } -} diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 23dbf3026a7e3..38da0d8250907 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -587,7 +587,6 @@ public ProducerRecord readOutput(final String topic, * @see #getTimestampedWindowStore(String) * @see #getSessionStore(String) */ - @SuppressWarnings("WeakerAccess") public Map getAllStateStores() { final Map allStores = new HashMap<>(); for (final String storeName : internalTopologyBuilder.allStateStoreName()) {