From 17d539df9e30c708c68a5b5bf153f29515dc9171 Mon Sep 17 00:00:00 2001 From: Didier Alberto Tabares Higuita Date: Thu, 21 May 2020 19:21:19 -0500 Subject: [PATCH] Migrates to rest high level client of elasticsearch and adds kafka consumer factory configuration --- docker-compose-snapshots-mysql-binlog.yml | 5 +- .../OrderHistoryViewBackendConfiguration.java | 6 +- .../build.gradle | 5 +- .../OrderHistoryTestSearchConfiguration.java | 50 +++++++++----- .../SupportingIndicesConfiguration.java | 39 +++++++++++ .../backend/TextViewService.java | 68 +++++++++++-------- .../src/main/resources/application.properties | 6 +- 7 files changed, 126 insertions(+), 53 deletions(-) create mode 100644 order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/SupportingIndicesConfiguration.java diff --git a/docker-compose-snapshots-mysql-binlog.yml b/docker-compose-snapshots-mysql-binlog.yml index 7128db62..932731ab 100755 --- a/docker-compose-snapshots-mysql-binlog.yml +++ b/docker-compose-snapshots-mysql-binlog.yml @@ -12,10 +12,11 @@ services: EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 ELASTICSEARCH_HOST: elasticsearch - ELASTICSEARCH_PORT: 9300 + ELASTICSEARCH_PORT: 9200 + ELASTICSEARCH_SCHEME: http elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:5.6.3 + image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0 environment: - http.host=0.0.0.0 - network.publish_host=0.0.0.0 diff --git a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java index 7ef78c0e..e80a8708 100644 --- a/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java +++ b/order-history-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistory/backend/OrderHistoryViewBackendConfiguration.java @@ -1,9 +1,10 @@ package io.eventuate.examples.tram.ordersandcustomers.orderhistory.backend; -import io.eventuate.tram.spring.consumer.common.TramNoopDuplicateMessageDetectorConfiguration; -import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; +import io.eventuate.messaging.kafka.spring.consumer.KafkaConsumerFactoryConfiguration; import io.eventuate.tram.events.subscriber.DomainEventDispatcher; import io.eventuate.tram.events.subscriber.DomainEventDispatcherFactory; +import io.eventuate.tram.spring.consumer.common.TramNoopDuplicateMessageDetectorConfiguration; +import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; import io.eventuate.tram.spring.events.subscriber.TramEventSubscriberConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -11,6 +12,7 @@ @Configuration @Import({OrderHistoryViewMongoConfiguration.class, + KafkaConsumerFactoryConfiguration.class, EventuateTramKafkaMessageConsumerConfiguration.class, TramNoopDuplicateMessageDetectorConfiguration.class, TramEventSubscriberConfiguration.class}) diff --git a/order-history-text-search-backend/build.gradle b/order-history-text-search-backend/build.gradle index 91ecfae0..b562c548 100755 --- a/order-history-text-search-backend/build.gradle +++ b/order-history-text-search-backend/build.gradle @@ -2,6 +2,7 @@ apply plugin: "io.spring.dependency-management" dependencies { compile project (":order-history-text-search-common") - compile "org.elasticsearch.client:transport:6.8.7" - compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-kafka:$eventuateTramVersion" + compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.7.0" + compile "com.fasterxml.jackson.core:jackson-databind:2.2.3" + compile "io.eventuate.tram.elasticsearch:eventuate-tram-spring-jdbc-kafka-elasticsearch:$eventuateTramVersion" } diff --git a/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/OrderHistoryTestSearchConfiguration.java b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/OrderHistoryTestSearchConfiguration.java index dcd331e8..90581c0f 100644 --- a/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/OrderHistoryTestSearchConfiguration.java +++ b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/OrderHistoryTestSearchConfiguration.java @@ -2,24 +2,32 @@ import io.eventuate.examples.tram.ordersandcustomers.CustomerTextView; import io.eventuate.examples.tram.ordersandcustomers.OrderTextView; -import io.eventuate.tram.consumer.common.NoopDuplicateMessageDetector; -import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchIndexDuplicateMessageDetector; +import io.eventuate.tram.consumer.kafka.elasticsearch.ElasticsearchKafkaConsumerFactorySpringConfiguration; +import io.eventuate.tram.consumer.kafka.elasticsearch.EventuateKafkaConsumerElasticsearchSpringConfigurationPropertiesConfiguration; import io.eventuate.tram.events.subscriber.DomainEventDispatcher; import io.eventuate.tram.events.subscriber.DomainEventDispatcherFactory; +import io.eventuate.tram.spring.consumer.elasticsearch.ElasticsearchConsumerSpringConfigurationPropertiesConfiguration; +import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; import io.eventuate.tram.spring.events.subscriber.TramEventSubscriberConfiguration; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import java.net.InetAddress; - @Configuration -@Import({EventuateTramKafkaMessageConsumerConfiguration.class, TramEventSubscriberConfiguration.class, NoopDuplicateMessageDetector.class}) +@Import({ + ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.class, + EventuateKafkaConsumerElasticsearchSpringConfigurationPropertiesConfiguration.class, + ElasticsearchKafkaConsumerFactorySpringConfiguration.class, + EventuateTramKafkaMessageConsumerConfiguration.class, + TramEventSubscriberConfiguration.class, + ElasticsearchIndexDuplicateMessageDetector.class, + SupportingIndicesConfiguration.class +}) public class OrderHistoryTestSearchConfiguration { @Value("${elasticsearch.host}") @@ -28,6 +36,9 @@ public class OrderHistoryTestSearchConfiguration { @Value("${elasticsearch.port}") private int elasticSearchPort; + @Value("${elasticsearch.scheme}") + private String elasticSearchScheme; + @Bean public CustomerSnapshotEventConsumer customerSnapshotEventConsumer() { return new CustomerSnapshotEventConsumer(); @@ -49,18 +60,25 @@ public DomainEventDispatcher orderDomainEventDispatcher(OrderSnapshotEventConsum } @Bean - public TransportClient elasticSearchClient() throws Exception { - return new PreBuiltTransportClient(Settings.builder().put("client.transport.ignore_cluster_name", true).build()) - .addTransportAddress(new TransportAddress(InetAddress.getByName(elasticSearchHost), elasticSearchPort)); + public RestHighLevelClient highLevelElasticSearchClient() { + return new RestHighLevelClient( + RestClient.builder( + new HttpHost( + elasticSearchHost, + elasticSearchPort, + elasticSearchScheme + ) + ) + ); } @Bean("customerTextViewService") - public TextViewService customerTextViewService(TransportClient transportClient) { - return new TextViewService<>(transportClient, CustomerTextView.class, CustomerTextView.TYPE, CustomerTextView.INDEX); + public TextViewService customerTextViewService(RestHighLevelClient restHighLevelClient) { + return new TextViewService<>(restHighLevelClient, CustomerTextView.class, CustomerTextView.TYPE, CustomerTextView.INDEX); } @Bean("orderTextViewService") - public TextViewService orderTextViewService(TransportClient transportClient) { - return new TextViewService<>(transportClient, OrderTextView.class, OrderTextView.TYPE, OrderTextView.INDEX); + public TextViewService orderTextViewService(RestHighLevelClient restHighLevelClient) { + return new TextViewService<>(restHighLevelClient, OrderTextView.class, OrderTextView.TYPE, OrderTextView.INDEX); } } diff --git a/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/SupportingIndicesConfiguration.java b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/SupportingIndicesConfiguration.java new file mode 100644 index 00000000..0f6339f2 --- /dev/null +++ b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/SupportingIndicesConfiguration.java @@ -0,0 +1,39 @@ +package io.eventuate.examples.tram.ordersandcustomers.orderhistorytextsearch.backend; + +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchConsumerConfigurationProperties; +import io.eventuate.tram.consumer.kafka.elasticsearch.ElasticsearchOffsetStorageConfigurationProperties; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; + +import java.io.IOException; + +@Configuration +public class SupportingIndicesConfiguration { + + @Autowired + private RestHighLevelClient elasticsearchClient; + + @Autowired + private ElasticsearchConsumerConfigurationProperties elasticsearchConsumerSpringConfigurationProperties; + + @Autowired + private ElasticsearchOffsetStorageConfigurationProperties eventuateKafkaConsumerElasticsearchSpringConfigurationProperties; + + @EventListener(ApplicationReadyEvent.class) + void createSupportingIndices() throws IOException { + createIndexIfNotExists(elasticsearchConsumerSpringConfigurationProperties.getReceivedMessagesIndexName()); + createIndexIfNotExists(eventuateKafkaConsumerElasticsearchSpringConfigurationProperties.getOffsetStorageIndexName()); + } + + private void createIndexIfNotExists(String offsetStorageIndexName) throws IOException { + if (!elasticsearchClient.indices().exists(new GetIndexRequest(offsetStorageIndexName), RequestOptions.DEFAULT)) { + elasticsearchClient.indices().create(new CreateIndexRequest(offsetStorageIndexName), RequestOptions.DEFAULT); + } + } +} diff --git a/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/TextViewService.java b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/TextViewService.java index d065f127..7c44a1be 100644 --- a/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/TextViewService.java +++ b/order-history-text-search-backend/src/main/java/io/eventuate/examples/tram/ordersandcustomers/orderhistorytextsearch/backend/TextViewService.java @@ -1,14 +1,18 @@ package io.eventuate.examples.tram.ordersandcustomers.orderhistorytextsearch.backend; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.eventuate.examples.tram.ordersandcustomers.TextView; -import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.ArrayList; @@ -18,56 +22,60 @@ public class TextViewService { private ObjectMapper objectMapper = new ObjectMapper(); - private TransportClient transportClient; + private RestHighLevelClient restHighLevelClient; private Class textViewClass; private String type; private String index; - public TextViewService(TransportClient transportClient, Class textViewClass, String type, String index) { - this.transportClient = transportClient; + public TextViewService(RestHighLevelClient restHighLevelClient, Class textViewClass, String type, String index) { + this.restHighLevelClient = restHighLevelClient; this.textViewClass = textViewClass; this.type = type; this.index = index; } public List search(String value) { + try { + if (!restHighLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT)) { + return Collections.emptyList(); + } - if (!transportClient.admin().indices().prepareExists(index).execute().actionGet().isExists()) { - return Collections.emptyList(); - } - - SearchResponse response = transportClient.prepareSearch(index) - .setTypes(type) - .setQuery(QueryBuilders.termQuery("_all", value)) - .get(); + SearchResponse response = restHighLevelClient.search(new SearchRequest() + .indices(index) + .types(type) + .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("_all", value))), + RequestOptions.DEFAULT); - List result = new ArrayList<>(); + List result = new ArrayList<>(); - for (SearchHit searchHit : response.getHits()) { - try { + for (SearchHit searchHit : response.getHits()) { result.add(objectMapper.readValue(searchHit.getSourceAsString(), textViewClass)); } - catch (IOException e) { - throw new RuntimeException(e); - } + return result; + } catch (IOException e) { + throw new RuntimeException(e); } - - return result; } public void index(TextView textView) { try { - IndexResponse ir = transportClient - .prepareIndex(index, type, textView.getId()) - .setSource(objectMapper.writeValueAsString(textView), XContentType.JSON) - .get(); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); + restHighLevelClient.index( + new IndexRequest() + .index(index) + .type(type) + .id(textView.getId()) + .source(objectMapper.writeValueAsString(textView), XContentType.JSON), + RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException(e); } } public void remove(String id) { - transportClient.prepareDelete(index, type, id).get(); + try { + restHighLevelClient.delete(new DeleteRequest().index(index).type(type).id(id), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/order-history-text-search-service/src/main/resources/application.properties b/order-history-text-search-service/src/main/resources/application.properties index e4526dd2..a16bfdcc 100644 --- a/order-history-text-search-service/src/main/resources/application.properties +++ b/order-history-text-search-service/src/main/resources/application.properties @@ -1,4 +1,8 @@ eventuatelocal.kafka.bootstrap.servers=localhost:9092 eventuatelocal.zookeeper.connection.string=localhost:2181 + elasticsearch.host=localhost -elasticsearch.port=9300 \ No newline at end of file +elasticsearch.port=9200 +elasticsearch.scheme=http + +logging.level.io.eventuate.tram.consumer.kafka.elasticsearch=DEBUG \ No newline at end of file