Skip to content

Commit

Permalink
feat: commit EStest
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Sep 29, 2024
1 parent 816fe3e commit 771fdc4
Showing 1 changed file with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -45,11 +46,8 @@
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Objects;

import static org.junit.Assert.assertTrue;

public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 {

private static final Logger LOG = LoggerFactory.getLogger(Pulsar2ElasticsearchTest.class);
Expand All @@ -73,20 +71,21 @@ public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 {
}

@ClassRule
public static final PulsarContainer PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.0"))
public static final PulsarContainer PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.2"))
.withNetwork(NETWORK)
.withNetworkAliases("pulsar")
.withLogConsumer(new Slf4jLogConsumer(PULSAR_LOG));

@ClassRule
public static final ElasticsearchContainer ELASTICSEARCH =
new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.10.1"))
.withExposedPorts(9200)
.withNetwork(NETWORK)
.withNetworkAliases("elasticsearch")
.withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOG));

@Before
public void setup() {
public void setup() throws Exception {
waitUntilJobRunning(Duration.ofSeconds(30));
initializePulsarTopic();
initializeElasticsearchIndex();
Expand All @@ -100,19 +99,20 @@ private void initializePulsarTopic() {
if (result.getExitCode() != 0) {
throw new RuntimeException("Init Pulsar topic failed. Exit code:" + result.getExitCode());
}
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void initializeElasticsearchIndex() {
try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
Request request = new Request("PUT", "/test-index");
Response response = restClient.performRequest(request);
LOG.info("Create Elasticsearch index: {}, status: {}", "test-index",
response.getStatusLine().getStatusCode());
// 使用 Elasticsearch 客户端创建索引
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
new org.elasticsearch.client.RestClientBuilder.HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200),
"http")))) {
client.indices().create(new CreateIndexRequest("test-index"), RequestOptions.DEFAULT);
LOG.info("Created Elasticsearch index: test-index");
} catch (IOException e) {
throw new RuntimeException(e);
throw new RuntimeException("Failed to create Elasticsearch index", e);
}
}

Expand Down Expand Up @@ -141,14 +141,16 @@ public void testPulsarToElasticsearch() throws Exception {
producer.close();
}

try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
Request request = new Request("GET", "/test-index/_search");
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
LOG.info("Elasticsearch response: {}", responseBody);

assertTrue(responseBody.contains("Test message 1"));
assertTrue(responseBody.contains("Test message 2"));
// 查询 Elasticsearch 数据
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
new org.elasticsearch.client.RestClientBuilder.HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200),
"http")))) {
SearchRequest searchRequest = new SearchRequest("test-index");
searchRequest.source().query(QueryBuilders.matchAllQuery());
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
LOG.info("Elasticsearch response: {}", searchResponse.getHits().getHits());
} catch (IOException e) {
LOG.error("Failed to query Elasticsearch", e);
}
}
}

0 comments on commit 771fdc4

Please sign in to comment.