Skip to content

Commit

Permalink
usable test
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Nov 28, 2024
1 parent 912f2e9 commit 494f027
Showing 1 changed file with 73 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.inlong.sort.tests.utils.TestUtils;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -139,24 +138,15 @@ private String getCreateStatement(String fileName, Map<String, Object> propertie
}
}

private void initializeElasticsearchIndex() {
try (RestClient restClient =
RestClient.builder(
new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http"))
.build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
try (ElasticsearchClient client = new ElasticsearchClient(transport)) {
// Create Elasticsearch index
client.indices().create(c -> c.index("test-index"));
} catch (ElasticsearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
private void initializeElasticsearchIndex() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http"))
.build();
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

LOG.info("Created Elasticsearch index: test-index");
} catch (IOException e) {
throw new RuntimeException("Failed to create Elasticsearch index", e);
}
client.indices().create(c -> c.index("test-index"));
LOG.info("Created Elasticsearch index: test-index");
}

@AfterClass
Expand All @@ -175,81 +165,75 @@ public void testKafkaToElasticsearch() throws Exception {
waitUntilJobRunning(Duration.ofSeconds(10));

// Produce messages to Kafka
try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig())) {
producer.send(
new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE));
producer.send(
new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE));
}
org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig());
producer.send(
new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE));
producer.send(
new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE));

// Query Elasticsearch to verify data is ingested
try (RestClient restClient = RestClient.builder(
RestClient restClient = RestClient.builder(
new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http"))
.build()) {

RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
try (ElasticsearchClient client = new ElasticsearchClient(transport)) {

List<String> messages = new ArrayList<>();
int maxRetries = 10; // Maximum number of retries (10 seconds)
int retryCount = 0;

while (retryCount < maxRetries) {
co.elastic.clients.elasticsearch.core.SearchRequest searchRequest =
new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()
.index("test-index")
.query(q -> q.matchAll(m -> m))
.build();

co.elastic.clients.elasticsearch.core.SearchResponse<Map> response =
client.search(searchRequest, Map.class);

// Extract `message` fields using Elasticsearch Java API
messages = response.hits().hits().stream()
.map(hit -> {
@SuppressWarnings("unchecked")
Map<String, Object> source = hit.source();
if (source != null && source.containsKey("message")) {
return (String) source.get("message");
}
return null;
})
.filter(Objects::nonNull) // Remove null values
.collect(Collectors.toList());

if (!messages.isEmpty()) {
// Stop polling if data is found
break;
}

// Wait for 1 second before retrying
Thread.sleep(1000);
retryCount++;
}

if (messages.isEmpty()) {
throw new AssertionError("Elasticsearch validation failed: No messages found after polling.");
}

LOG.info("Extracted messages from Elasticsearch: {}", messages);

// Create expected messages list
List<String> expectedMessages = new ArrayList<>();
expectedMessages.add(FIRST_EXPECTED_MESSAGE);
expectedMessages.add(SECOND_EXPECTED_MESSAGE);

// Validate messages against the expected messages
if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) {
LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages);
} else {
throw new AssertionError(
String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages,
messages));
}
.build();
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

List<String> messages = new ArrayList<>();
int maxRetries = 10; // Maximum number of retries (10 seconds)
int retryCount = 0;

while (retryCount < maxRetries) {
co.elastic.clients.elasticsearch.core.SearchRequest searchRequest =
new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()
.index("test-index")
.query(q -> q.matchAll(m -> m))
.build();

co.elastic.clients.elasticsearch.core.SearchResponse<Map> response =
client.search(searchRequest, Map.class);

// Extract `message` fields using Elasticsearch Java API
messages = response.hits().hits().stream()
.map(hit -> {
@SuppressWarnings("unchecked")
Map<String, Object> source = hit.source();
if (source != null && source.containsKey("message")) {
return (String) source.get("message");
}
return null;
})
.filter(Objects::nonNull) // Remove null values
.collect(Collectors.toList());

if (!messages.isEmpty()) {
// Stop polling if data is found
break;
}
} catch (IOException e) {
LOG.error("Failed to query Elasticsearch", e);

// Wait for 1 second before retrying
Thread.sleep(1000);
retryCount++;
}

if (messages.isEmpty()) {
throw new AssertionError("Elasticsearch validation failed: No messages found after polling.");
}

LOG.info("Extracted messages from Elasticsearch: {}", messages);

// Create expected messages list
List<String> expectedMessages = new ArrayList<>();
expectedMessages.add(FIRST_EXPECTED_MESSAGE);
expectedMessages.add(SECOND_EXPECTED_MESSAGE);

// Validate messages against the expected messages
if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) {
LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages);
} else {
throw new AssertionError(
String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages,
messages));
}
}

Expand Down

0 comments on commit 494f027

Please sign in to comment.