Skip to content

Commit

Permalink
fix connection refused
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Nov 16, 2024
1 parent 8ca0805 commit 888f7b8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -66,6 +67,9 @@ public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 {
private static final String FIRST_KAFKA_MESSAGE = "{\"message\":\"value1\"}";
private static final String SECOND_KAFKA_MESSAGE = "{\"message\":\"value2\"}";

private static final String FIRST_ES_MESSAGE = "{\"message\":\"value3\"}";
private static final String SECOND_ES_MESSAGE = "{\"message\":\"value4\"}";

private static final String sqlFile;

static {
Expand Down Expand Up @@ -99,10 +103,6 @@ public void setup() throws IOException {
waitUntilJobRunning(Duration.ofSeconds(30));
initializeKafkaTopic("test-topic");
initializeElasticsearchIndex();
String sqlContent = new String(Files.readAllBytes(Paths.get(sqlFile)), StandardCharsets.UTF_8);
String elasticsearchHost = "http://localhost:" + ELASTICSEARCH.getMappedPort(9200);
sqlContent = sqlContent.replace("http://localhost:9200", elasticsearchHost);
Files.write(Paths.get(sqlFile), sqlContent.getBytes(StandardCharsets.UTF_8));
}

private void initializeKafkaTopic(String topic) {
Expand Down Expand Up @@ -139,6 +139,7 @@ private String getCreateStatement(String fileName, Map<String, Object> propertie
}

private void initializeElasticsearchIndex() {
log.info(">>>>>>>>>>>>>>>>>>>>> initializeElasticsearchIndex");
try (RestClient restClient =
RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
Expand Down Expand Up @@ -182,6 +183,13 @@ public void testKafkaToElasticsearch() throws Exception {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

Map<String, String> key3Data = Collections.singletonMap("message", FIRST_ES_MESSAGE);
Map<String, String> key4Data = Collections.singletonMap("message", SECOND_ES_MESSAGE);

client.index(i -> i.index("test-index").id("key3").document(key3Data));
client.index(i -> i.index("test-index").id("key4").document(key4Data));
LOG.info("Inserted key3 and key4 into Elasticsearch.");

// Search Elasticsearch for the ingested data
SearchRequest searchRequest =
new SearchRequest.Builder().index("test-index").query(q -> q.matchAll(m -> m)).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ CREATE TABLE kafka_source (
);


CREATE TABLE file_sink (
CREATE TABLE elasticsearch_sink (
`message` STRING
) WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/output_file',
'connector' = 'elasticsearch7-inlong',
'hosts' = 'http://elasticsearch:9200',
'index' = 'test-index',
'format' = 'json'
);

INSERT INTO file_sink

INSERT INTO elasticsearch_sink
SELECT * FROM kafka_source;

0 comments on commit 888f7b8

Please sign in to comment.