From 33b0cf9b332b654e28495389e4ff373c8c411808 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Wed, 11 Sep 2024 16:22:12 +0700 Subject: [PATCH] Add more test --- .../dagger/core/source/config/StreamConfigTest.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java index 8c343f8cc..23527ea52 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java @@ -249,8 +249,18 @@ public void shouldParseMultipleAdditionalConsumerConfigs() { @Test(expected = IllegalArgumentException.class) public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefix() { + String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"; when(configuration.getString(INPUT_STREAMS, "")) - .thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"); + .thenReturn(streamConfig); + + StreamConfig.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefixMissingUnderscore() { + String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"SOURCE_KAFKACONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"; + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(streamConfig); StreamConfig.parse(configuration); }