[INLONG-11495][Sort] Add end-to-end test for Kafka connector v1.18 #11554

Merged (1 commit, Dec 2, 2024)

sort-end-to-end-tests-v1.18/pom.xml
@@ -31,8 +31,8 @@
     <properties>
         <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
         <flink.version>1.18.1</flink.version>
-        <elasticsearch.version>6.8.17</elasticsearch.version>
         <flink.shaded.jackson.version>2.15.3-18.0</flink.shaded.jackson.version>
+        <kafka.clients.version>3.7.1</kafka.clients.version>
     </properties>

     <dependencies>
@@ -51,6 +51,10 @@
         <artifactId>postgresql</artifactId>
         <version>${testcontainers.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>kafka</artifactId>
+    </dependency>
     <dependency>
         <groupId>org.postgresql</groupId>
         <artifactId>postgresql</artifactId>
@@ -61,16 +65,11 @@
         <artifactId>elasticsearch</artifactId>
         <version>${testcontainers.version}</version>
     </dependency>
-    <dependency>
-        <groupId>org.elasticsearch.client</groupId>
-        <artifactId>elasticsearch-rest-high-level-client</artifactId>
-        <version>${elasticsearch.version}</version>
-    </dependency>
     <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
     <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>elasticsearch-rest-client</artifactId>
-        <version>${elasticsearch.version}</version>
+        <version>${elasticsearch7.version}</version>
     </dependency>

     <dependency>
@@ -142,6 +141,17 @@
         <version>${flink.version}</version>
         <scope>test</scope>
     </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.clients.version}</version>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+        <groupId>co.elastic.clients</groupId>
+        <artifactId>elasticsearch-java</artifactId>
+        <version>${elasticsearch7.version}</version>
+    </dependency>
 </dependencies>
 <build>
     <plugins>
@@ -158,6 +168,23 @@
                 <type>jar</type>
                 <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
             </artifactItem>
+            <artifactItem>
+                <groupId>org.apache.inlong</groupId>
+                <artifactId>sort-connector-kafka-v1.18</artifactId>
+                <version>${project.version}</version>
+                <destFileName>sort-connector-kafka.jar</destFileName>
+                <type>jar</type>
+                <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+            </artifactItem>
+            <artifactItem>
+                <groupId>org.apache.inlong</groupId>
+                <artifactId>sort-connector-elasticsearch7-v1.18</artifactId>
+                <version>${project.version}</version>
+                <destFileName>sort-connector-elasticsearch7.jar</destFileName>
+                <type>jar</type>
+                <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+            </artifactItem>
+
         </artifactItems>
     </configuration>
     <executions>
@@ -203,6 +230,11 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
             <version>${plugin.surefire.version}</version>
+            <configuration>
+                <systemPropertyVariables>
+                    <log4j.configurationFile>src/test/resources/log4j2-test.properties</log4j.configurationFile>
+                </systemPropertyVariables>
+            </configuration>
         </plugin>
     </plugins>
 </build>
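
Reviewer note: in sum, the pom changes (a) add the Testcontainers kafka module and kafka-clients 3.7.1 so the test can start a broker and produce records, (b) replace the legacy Elasticsearch 6.8 high-level REST client with the elasticsearch-java client, pinning both ES artifacts to ${elasticsearch7.version}, (c) have maven-dependency-plugin copy the kafka and elasticsearch7 connector jars into target/dependencies so the test can submit them with the SQL job, and (d) point surefire at the test log4j2 configuration.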

src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java (new file)
@@ -0,0 +1,248 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
import org.apache.inlong.sort.tests.utils.TestUtils;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 {

    private static final Logger LOG = LoggerFactory.getLogger(Kafka2Elasticsearch7Test.class);
    public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class);
    public static final Logger ELASTICSEARCH_LOGGER = LoggerFactory.getLogger(ElasticsearchContainer.class);

    private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
    private static final Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar");

    private static final int ELASTICSEARCH_DEFAULT_PORT = 9200;

    private static final String FIRST_KAFKA_MESSAGE = "{\"message\":\"Hello From Kafka\"}";
    private static final String SECOND_KAFKA_MESSAGE = "{\"message\":\"Goodbye From ElasticSearch\"}";

    private static final String FIRST_EXPECTED_MESSAGE = "Hello From Kafka";
    private static final String SECOND_EXPECTED_MESSAGE = "Goodbye From ElasticSearch";

    private static final String sqlFile;

    static {
        try {
            sqlFile = Paths
                    .get(Kafka2Elasticsearch7Test.class.getResource("/flinkSql/kafka_to_elasticsearch.sql").toURI())
                    .toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

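    // The cp-kafka 6.2.1 image still bundles ZooKeeper, which the topic-init
    // script below relies on; the "kafka" network alias matches
    // 'properties.bootstrap.servers' = 'kafka:9092' in the Flink SQL file.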
    @ClassRule
    public static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("kafka")
                    .withEmbeddedZookeeper()
                    .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG));

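    // Elasticsearch 7.17.13 matches the elasticsearch7 connector under test;
    // the "elasticsearch" alias matches 'hosts' = 'http://elasticsearch:9200'
    // in the Flink SQL file.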
    @ClassRule
    public static final ElasticsearchContainer ELASTICSEARCH =
            new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.17.13"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("elasticsearch")
                    .withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOGGER));

    @Before
    public void setup() throws IOException {
        waitUntilJobRunning(Duration.ofSeconds(30));
        initializeKafkaTopic("test-topic");
        initializeElasticsearchIndex();
    }

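    // Creates the topic by exec'ing the kafka-topics CLI inside the broker
    // container, using the templated command from /env/kafka_test_kafka_init.txt.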
    private void initializeKafkaTopic(String topic) {
        String fileName = "kafka_test_kafka_init.txt";
        int port = KafkaContainer.ZOOKEEPER_PORT;

        Map<String, Object> properties = new HashMap<>();
        properties.put("TOPIC", topic);
        properties.put("ZOOKEEPER_PORT", port);

        try {
            String createKafkaStatement = getCreateStatement(fileName, properties);
            ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
            LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode());
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private String getCreateStatement(String fileName, Map<String, Object> properties) {
        URL url = Objects.requireNonNull(Kafka2Elasticsearch7Test.class.getResource("/env/" + fileName));

        try {
            Path file = Paths.get(url.toURI());
            return PlaceholderResolver.getDefaultResolver().resolveByMap(
                    new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
                    properties);
        } catch (IOException | URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

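    // The test talks to Elasticsearch from the host, so it uses the mapped
    // port rather than the in-network alias the Flink job uses.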
    private void initializeElasticsearchIndex() throws IOException {
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http"))
                .build();
        RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        ElasticsearchClient client = new ElasticsearchClient(transport);

        client.indices().create(c -> c.index("test-index"));
        LOG.info("Created Elasticsearch index: test-index");
    }

    @AfterClass
    public static void teardown() {
        if (KAFKA != null) {
            KAFKA.stop();
        }
        if (ELASTICSEARCH != null) {
            ELASTICSEARCH.stop();
        }
    }

    @Test
    public void testKafkaToElasticsearch() throws Exception {
        submitSQLJob(sqlFile, kafkaJar, elasticsearchJar);
        waitUntilJobRunning(Duration.ofSeconds(10));

        // Produce messages to Kafka
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig());
        producer.send(
                new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE));
        producer.send(
                new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE));
        // send() is asynchronous; flush and close so both records reach the
        // broker before the Elasticsearch polling window starts
        producer.flush();
        producer.close();

        // Query Elasticsearch to verify data is ingested
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http"))
                .build();
        RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        ElasticsearchClient client = new ElasticsearchClient(transport);

        List<String> messages = new ArrayList<>();
        int maxRetries = 10; // Maximum number of retries (10 seconds)
        int retryCount = 0;

        while (retryCount < maxRetries) {
            co.elastic.clients.elasticsearch.core.SearchRequest searchRequest =
                    new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()
                            .index("test-index")
                            .query(q -> q.matchAll(m -> m))
                            .build();

            co.elastic.clients.elasticsearch.core.SearchResponse<Map> response =
                    client.search(searchRequest, Map.class);

            // Extract `message` fields using the Elasticsearch Java API
            messages = response.hits().hits().stream()
                    .map(hit -> {
                        @SuppressWarnings("unchecked")
                        Map<String, Object> source = hit.source();
                        if (source != null && source.containsKey("message")) {
                            return (String) source.get("message");
                        }
                        return null;
                    })
                    .filter(Objects::nonNull) // Remove null values
                    .collect(Collectors.toList());

            if (!messages.isEmpty()) {
                // Stop polling if data is found
                break;
            }

            // Wait for 1 second before retrying
            Thread.sleep(1000);
            retryCount++;
        }

        if (messages.isEmpty()) {
            throw new AssertionError("Elasticsearch validation failed: No messages found after polling.");
        }

        LOG.info("Extracted messages from Elasticsearch: {}", messages);

        // Create expected messages list
        List<String> expectedMessages = new ArrayList<>();
        expectedMessages.add(FIRST_EXPECTED_MESSAGE);
        expectedMessages.add(SECOND_EXPECTED_MESSAGE);

        // Validate messages against the expected messages
        if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) {
            LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages);
        } else {
            throw new AssertionError(
                    String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages,
                            messages));
        }
    }

    private java.util.Properties getKafkaProducerConfig() {
        java.util.Properties props = new java.util.Properties();
        String bootstrapServers = KAFKA.getBootstrapServers();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
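
Reviewer note: the hand-rolled retry loop in testKafkaToElasticsearch could also be written with Awaitility. A minimal sketch, assuming an org.awaitility:awaitility test dependency (not part of this PR) and reusing the elasticsearch-java client from the test; the class name PollingSketch is hypothetical:

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Map;

import co.elastic.clients.elasticsearch.ElasticsearchClient;

class PollingSketch {

    // Polls test-index once per second for up to 10 seconds and fails if no
    // documents appear; equivalent to the retry loop in the test above.
    static void waitForMessages(ElasticsearchClient client) {
        await().atMost(Duration.ofSeconds(10))
                .pollInterval(Duration.ofSeconds(1))
                .untilAsserted(() -> {
                    int hits = client
                            .search(s -> s.index("test-index").query(q -> q.matchAll(m -> m)), Map.class)
                            .hits().hits().size();
                    if (hits == 0) {
                        throw new AssertionError("no documents in test-index yet");
                    }
                });
    }
}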

src/test/resources/env/kafka_test_kafka_init.txt (new file)
@@ -0,0 +1 @@
kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
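
Reviewer note: kafka-topics lost its --zookeeper flag in Kafka 3.0, so this script works here only because confluentinc/cp-kafka:6.2.1 still ships a pre-3.0 CLI alongside the embedded ZooKeeper; on a newer image the equivalent would be kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092.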

src/test/resources/flinkSql/kafka_to_elasticsearch.sql (new file)
@@ -0,0 +1,24 @@
CREATE TABLE kafka_source (
    `message` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);


CREATE TABLE elasticsearch_sink (
    `message` STRING
) WITH (
    'connector' = 'elasticsearch7-inlong',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'test-index',
    'format' = 'json'
);


INSERT INTO elasticsearch_sink
SELECT * FROM kafka_source;
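
Reviewer note: the job is a straight pipe. The kafka-inlong source reads JSON records such as {"message":"Hello From Kafka"} from test-topic starting at the earliest offset, and the elasticsearch7-inlong sink indexes each row as a document in test-index; the polling assertion in Kafka2Elasticsearch7Test checks for exactly these documents.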

META-INF/services/org.apache.flink.table.factories.Factory (sort-connector-kafka-v1.18, modified)
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
-org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory
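
Reviewer note: this service file is what lets Flink's planner find the InLong factories at runtime: the SQL 'connector' option ('kafka-inlong') is matched against the identifier each discovered factory reports. A minimal sketch of that discovery using plain java.util.ServiceLoader (the class name FactoryDiscoverySketch is hypothetical; Flink's real lookup lives in its FactoryUtil):

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FactoryDiscoverySketch {

    public static void main(String[] args) {
        // Every META-INF/services/org.apache.flink.table.factories.Factory entry
        // on the classpath becomes one Factory instance here; a SQL WITH clause's
        // 'connector' value is matched against factoryIdentifier().
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}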