diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java index 40eaa0f0f5..4b1e005588 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java @@ -22,7 +22,6 @@ import org.apache.inlong.sort.tests.utils.TestUtils; import co.elastic.clients.elasticsearch.ElasticsearchClient; -import co.elastic.clients.elasticsearch._types.ElasticsearchException; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.HttpHost; @@ -139,24 +138,15 @@ private String getCreateStatement(String fileName, Map propertie } } - private void initializeElasticsearchIndex() { - try (RestClient restClient = - RestClient.builder( - new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http")) - .build()) { - RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); - try (ElasticsearchClient client = new ElasticsearchClient(transport)) { - // Create Elasticsearch index - client.indices().create(c -> c.index("test-index")); - } catch (ElasticsearchException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + private void initializeElasticsearchIndex() throws IOException { + RestClient restClient = RestClient.builder( + new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http")) + .build(); + RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + ElasticsearchClient client = new ElasticsearchClient(transport); - LOG.info("Created Elasticsearch index: test-index"); - } catch (IOException e) { - throw new RuntimeException("Failed to create Elasticsearch index", e); - } + client.indices().create(c -> c.index("test-index")); + LOG.info("Created Elasticsearch index: test-index"); } @AfterClass @@ -175,81 +165,75 @@ public void testKafkaToElasticsearch() throws Exception { waitUntilJobRunning(Duration.ofSeconds(10)); // Produce messages to Kafka - try (org.apache.kafka.clients.producer.KafkaProducer producer = - new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig())) { - producer.send( - new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE)); - producer.send( - new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE)); - } + org.apache.kafka.clients.producer.KafkaProducer producer = + new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig()); + producer.send( + new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE)); + producer.send( + new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE)); // Query Elasticsearch to verify data is ingested - try (RestClient restClient = RestClient.builder( + RestClient restClient = RestClient.builder( new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")) - .build()) { - - RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); - try (ElasticsearchClient client = new ElasticsearchClient(transport)) { - - List messages = new ArrayList<>(); - int maxRetries = 10; // Maximum number of retries (10 seconds) - int retryCount = 0; - - while (retryCount < maxRetries) { - co.elastic.clients.elasticsearch.core.SearchRequest searchRequest = - new co.elastic.clients.elasticsearch.core.SearchRequest.Builder() - .index("test-index") - .query(q -> q.matchAll(m -> m)) - .build(); - - co.elastic.clients.elasticsearch.core.SearchResponse response = - client.search(searchRequest, Map.class); - - // Extract `message` fields using Elasticsearch Java API - messages = response.hits().hits().stream() - .map(hit -> { - @SuppressWarnings("unchecked") - Map source = hit.source(); - if (source != null && source.containsKey("message")) { - return (String) source.get("message"); - } - return null; - }) - .filter(Objects::nonNull) // Remove null values - .collect(Collectors.toList()); - - if (!messages.isEmpty()) { - // Stop polling if data is found - break; - } - - // Wait for 1 second before retrying - Thread.sleep(1000); - retryCount++; - } - - if (messages.isEmpty()) { - throw new AssertionError("Elasticsearch validation failed: No messages found after polling."); - } - - LOG.info("Extracted messages from Elasticsearch: {}", messages); - - // Create expected messages list - List expectedMessages = new ArrayList<>(); - expectedMessages.add(FIRST_EXPECTED_MESSAGE); - expectedMessages.add(SECOND_EXPECTED_MESSAGE); - - // Validate messages against the expected messages - if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) { - LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages); - } else { - throw new AssertionError( - String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages, - messages)); - } + .build(); + RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + ElasticsearchClient client = new ElasticsearchClient(transport); + + List messages = new ArrayList<>(); + int maxRetries = 10; // Maximum number of retries (10 seconds) + int retryCount = 0; + + while (retryCount < maxRetries) { + co.elastic.clients.elasticsearch.core.SearchRequest searchRequest = + new co.elastic.clients.elasticsearch.core.SearchRequest.Builder() + .index("test-index") + .query(q -> q.matchAll(m -> m)) + .build(); + + co.elastic.clients.elasticsearch.core.SearchResponse response = + client.search(searchRequest, Map.class); + + // Extract `message` fields using Elasticsearch Java API + messages = response.hits().hits().stream() + .map(hit -> { + @SuppressWarnings("unchecked") + Map source = hit.source(); + if (source != null && source.containsKey("message")) { + return (String) source.get("message"); + } + return null; + }) + .filter(Objects::nonNull) // Remove null values + .collect(Collectors.toList()); + + if (!messages.isEmpty()) { + // Stop polling if data is found + break; } - } catch (IOException e) { - LOG.error("Failed to query Elasticsearch", e); + + // Wait for 1 second before retrying + Thread.sleep(1000); + retryCount++; + } + + if (messages.isEmpty()) { + throw new AssertionError("Elasticsearch validation failed: No messages found after polling."); + } + + LOG.info("Extracted messages from Elasticsearch: {}", messages); + + // Create expected messages list + List expectedMessages = new ArrayList<>(); + expectedMessages.add(FIRST_EXPECTED_MESSAGE); + expectedMessages.add(SECOND_EXPECTED_MESSAGE); + + // Validate messages against the expected messages + if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) { + LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages); + } else { + throw new AssertionError( + String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages, + messages)); } }