diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java index a55e198b92..d92c88d6c6 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java @@ -49,6 +49,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,6 +67,9 @@ public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 { private static final String FIRST_KAFKA_MESSAGE = "{\"message\":\"value1\"}"; private static final String SECOND_KAFKA_MESSAGE = "{\"message\":\"value2\"}"; + private static final String FIRST_ES_MESSAGE = "{\"message\":\"value3\"}"; + private static final String SECOND_ES_MESSAGE = "{\"message\":\"value4\"}"; + private static final String sqlFile; static { @@ -99,10 +103,6 @@ public void setup() throws IOException { waitUntilJobRunning(Duration.ofSeconds(30)); initializeKafkaTopic("test-topic"); initializeElasticsearchIndex(); - String sqlContent = new String(Files.readAllBytes(Paths.get(sqlFile)), StandardCharsets.UTF_8); - String elasticsearchHost = "http://localhost:" + ELASTICSEARCH.getMappedPort(9200); - sqlContent = sqlContent.replace("http://localhost:9200", elasticsearchHost); - Files.write(Paths.get(sqlFile), sqlContent.getBytes(StandardCharsets.UTF_8)); } private void initializeKafkaTopic(String topic) { @@ -139,6 +139,7 @@ private String getCreateStatement(String fileName, Map propertie } private void initializeElasticsearchIndex() { + log.info(">>>>>>>>>>>>>>>>>>>>> initializeElasticsearchIndex"); try (RestClient restClient = RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) { RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); @@ -182,6 +183,13 @@ public void testKafkaToElasticsearch() throws Exception { RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); ElasticsearchClient client = new ElasticsearchClient(transport); + Map key3Data = Collections.singletonMap("message", FIRST_ES_MESSAGE); + Map key4Data = Collections.singletonMap("message", SECOND_ES_MESSAGE); + + client.index(i -> i.index("test-index").id("key3").document(key3Data)); + client.index(i -> i.index("test-index").id("key4").document(key4Data)); + LOG.info("Inserted key3 and key4 into Elasticsearch."); + // Search Elasticsearch for the ingested data SearchRequest searchRequest = new SearchRequest.Builder().index("test-index").query(q -> q.matchAll(m -> m)).build(); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql index e19df21554..77cdeb8cae 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql @@ -10,13 +10,15 @@ CREATE TABLE kafka_source ( ); -CREATE TABLE file_sink ( +CREATE TABLE elasticsearch_sink ( `message` STRING ) WITH ( - 'connector' = 'filesystem', - 'path' = 'file:///tmp/output_file', + 'connector' = 'elasticsearch7-inlong', + 'hosts' = 'http://elasticsearch:9200', + 'index' = 'test-index', 'format' = 'json' ); -INSERT INTO file_sink + +INSERT INTO elasticsearch_sink SELECT * FROM kafka_source;