bump to Kafka2Elasticsearch7Test.java
PeterZh6 committed Nov 14, 2024
1 parent eaea173 commit 6806c7c
Showing 11 changed files with 200 additions and 925 deletions.
pom.xml
@@ -31,7 +31,8 @@
 <properties>
 <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
 <flink.version>1.18.1</flink.version>
-<elasticsearch.version>6.8.17</elasticsearch.version>
+<elasticsearch6.version>6.8.17</elasticsearch6.version>
+<elasticsearch7.version>7.17.24</elasticsearch7.version>
 <flink.shaded.jackson.version>2.15.3-18.0</flink.shaded.jackson.version>
 </properties>

@@ -55,11 +56,6 @@
 <groupId>org.testcontainers</groupId>
 <artifactId>kafka</artifactId>
 </dependency>
-<dependency>
-<groupId>mysql</groupId>
-<artifactId>mysql-connector-java</artifactId>
-<scope>test</scope>
-</dependency>
 <dependency>
 <groupId>org.postgresql</groupId>
 <artifactId>postgresql</artifactId>
@@ -73,13 +69,13 @@
 <dependency>
 <groupId>org.elasticsearch.client</groupId>
 <artifactId>elasticsearch-rest-high-level-client</artifactId>
-<version>${elasticsearch.version}</version>
+<version>${elasticsearch6.version}</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
 <dependency>
 <groupId>org.elasticsearch.client</groupId>
 <artifactId>elasticsearch-rest-client</artifactId>
-<version>${elasticsearch.version}</version>
+<version>${elasticsearch6.version}</version>
 </dependency>

 <dependency>
@@ -151,6 +147,16 @@
 <version>${flink.version}</version>
 <scope>test</scope>
 </dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>co.elastic.clients</groupId>
+<artifactId>elasticsearch-java</artifactId>
+<version>${elasticsearch7.version}</version>
+</dependency>
 </dependencies>
 <build>
 <plugins>
@@ -175,33 +181,14 @@
 <type>jar</type>
 <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 </artifactItem>
-<dependency>
-<groupId>mysql</groupId>
-<artifactId>mysql-connector-java</artifactId>
-<version>${mysql.jdbc.version}</version>
-<destFileName>mysql-driver.jar</destFileName>
-<type>jar</type>
-<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-</dependency>
-<!-- temporary dependency for kafka end2end tests, generated from v1.15 START-->
-<!-- REMOVE THIS WHEN THE CORRESPONDING CONNECTORS FOR 1.18 ARE FINISHED-->
-<artifactItem>
-<groupId>org.apache.inlong</groupId>
-<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
-<version>${project.version}</version>
-<destFileName>sort-connector-mysql-cdc.jar</destFileName>
-<type>jar</type>
-<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-</artifactItem>
 <artifactItem>
 <groupId>org.apache.inlong</groupId>
-<artifactId>sort-connector-starrocks-v1.15</artifactId>
+<artifactId>sort-connector-elasticsearch7-v1.18</artifactId>
 <version>${project.version}</version>
-<destFileName>sort-connector-starrocks.jar</destFileName>
+<destFileName>sort-connector-elasticsearch7.jar</destFileName>
 <type>jar</type>
 <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 </artifactItem>
-<!-- temporary dependency for kafka end2end tests, generated from v1.15 END-->

 </artifactItems>
 </configuration>
Kafka2Elasticsearch7Test.java (new file)
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.TestUtils;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;

public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Kafka2Elasticsearch7Test.class);

private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
private static final Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar");

private static final String sqlFile;

static {
try {
sqlFile = Paths
.get(Kafka2Elasticsearch7Test.class.getResource("/flinkSql/kafka_to_elasticsearch.sql").toURI())
.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(NETWORK)
.withNetworkAliases("kafka");

@ClassRule
public static final ElasticsearchContainer ELASTICSEARCH =
new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.17.24"))
.withExposedPorts(9200)
.withNetwork(NETWORK)
.withNetworkAliases("elasticsearch");

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializeKafkaTopic("test-topic");
initializeElasticsearchIndex();
}

private void initializeKafkaTopic(String topic) {
try {
Container.ExecResult result = KAFKA.execInContainer("kafka-topics", "--create", "--topic", topic,
"--bootstrap-server", "localhost:9093", "--partitions", "1", "--replication-factor", "1");
LOG.info("Kafka topic created: {}", result.getStdout());
} catch (Exception e) {
throw new RuntimeException("Failed to initialize Kafka topic", e);
}
}

private void initializeElasticsearchIndex() {
try (RestClient restClient =
RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// Create Elasticsearch index
client.indices().create(c -> c.index("test-index"));
LOG.info("Created Elasticsearch index: test-index");
} catch (IOException e) {
throw new RuntimeException("Failed to create Elasticsearch index", e);
}
}

@AfterClass
public static void teardown() {
if (KAFKA != null) {
KAFKA.stop();
}
if (ELASTICSEARCH != null) {
ELASTICSEARCH.stop();
}
}

@Test
public void testKafkaToElasticsearch() throws Exception {
submitSQLJob(sqlFile, kafkaJar, elasticsearchJar);
waitUntilJobRunning(Duration.ofSeconds(10));

// Produce messages to Kafka
try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig())) {
producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", "value1"));
producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", "value2"));
}

// Query Elasticsearch to verify data is ingested
try (RestClient restClient =
RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// Search Elasticsearch for the ingested data
SearchRequest searchRequest =
new SearchRequest.Builder().index("test-index").query(q -> q.matchAll(m -> m)).build();
SearchResponse<Object> searchResponse = client.search(searchRequest, Object.class);

List<Hit<Object>> hits = searchResponse.hits().hits();
LOG.info("Elasticsearch response: {}", hits);
} catch (IOException e) {
LOG.error("Failed to query Elasticsearch", e);
}
}

private java.util.Properties getKafkaProducerConfig() {
java.util.Properties props = new java.util.Properties();
// Use the bootstrap address exposed by Testcontainers: the broker's host port is
// mapped to a random free port at runtime, so a hardcoded localhost:9093 would not
// be reachable from the test JVM.
props.put("bootstrap.servers", KAFKA.getBootstrapServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
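For reference, the Flink SQL script the test submits is loaded from the classpath resource /flinkSql/kafka_to_elasticsearch.sql, which is among the 11 files changed by this commit but not shown in this excerpt. Below is only a rough sketch of what such a script could look like, assuming a single raw-format STRING column and generic Flink connector identifiers; the table names, schema, and all connector options are illustrative assumptions, and the real script presumably uses the options understood by the InLong sort connectors packaged as sort-connector-kafka.jar and sort-connector-elasticsearch7.jar.

-- Hypothetical sketch of kafka_to_elasticsearch.sql; all names and options are assumptions.
CREATE TABLE kafka_source (
    `message` STRING
) WITH (
    'connector' = 'kafka',                          -- placeholder; the shipped jar may register a different identifier
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'kafka:9092',  -- "kafka" is the network alias set on the Kafka container
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'raw'
);

CREATE TABLE elasticsearch_sink (
    `message` STRING
) WITH (
    'connector' = 'elasticsearch-7',                -- placeholder; the shipped jar may register a different identifier
    'hosts' = 'http://elasticsearch:9200',          -- "elasticsearch" is the network alias set on the ES container
    'index' = 'test-index'
);

INSERT INTO elasticsearch_sink
SELECT `message` FROM kafka_source;

Note that inside the shared Docker network the job would address the containers by the network aliases configured in the test ("kafka", "elasticsearch"), whereas the JUnit code above reaches them through the host-mapped ports.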