diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e22753 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.gradle +build/ +*.idea/ +*.iml +*.log +out +.classpath +.project +.settings +bin diff --git a/build-and-test-all.sh b/build-and-test-all.sh new file mode 100644 index 0000000..ff813fc --- /dev/null +++ b/build-and-test-all.sh @@ -0,0 +1,15 @@ +#! /bin/bash + +set -e + +. ./set-env.sh + +docker-compose down -v + +docker-compose up --build -d + +sleep 10 + +./gradlew $GRADLE_OPTIONS cleanTest build $GRADLE_TASK_OPTIONS + +docker-compose down -v \ No newline at end of file diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..be3a78e --- /dev/null +++ b/build.gradle @@ -0,0 +1,107 @@ +buildscript { + repositories { + jcenter() + //maven { url deployUrl } + maven { url "https://dl.bintray.com/eventuateio-oss/eventuate-maven-release" } + } + dependencies { + classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.7.3' + classpath 'io.eventuate.plugins.gradle:eventuate-plugins-gradle-versions:0.1.0.RELEASE' + } +} + +allprojects { + group = "io.eventuate.tram.elasticsearch" +} + +apply plugin: "io.eventuate.plugins.gradle.UpgradeVersions" +apply plugin: WaitForCdcPlugin + +allprojects { + apply plugin: 'java' + apply plugin: 'maven' + apply plugin: 'com.jfrog.bintray' + + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + + bintray { + publish = true + user = System.getenv('BINTRAY_USER') + key = System.getenv('BINTRAY_KEY') + configurations = ['archives'] + pkg { + repo = "eventuate-maven-$bintrayRepoType" + name = 'eventuate-tram' + licenses = ['Apache-2.0'] + vcsUrl = 'https://github.com/eventuate-tram/eventuate-tram-elasticsearch' + } + } + + + repositories { + mavenCentral() + jcenter() + maven { url "https://repo.spring.io/milestone" } + eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } } + } + + dependencies { + compile "javax.annotation:javax.annotation-api:1.3.2" + testCompile "junit:junit:4.12" + testCompile "org.mockito:mockito-core:2.23.4" + } + + configurations { + deployerJars + } + + dependencies { + deployerJars 'org.springframework.build:aws-maven:5.0.0.RELEASE' + } + + + uploadArchives { + repositories { + mavenDeployer { + configuration = configurations.deployerJars + repository(url: deployUrl) { + authentication(userName: System.getenv('S3_REPO_AWS_ACCESS_KEY'), password: System.getenv('S3_REPO_AWS_SECRET_ACCESS_KEY')) + } + pom.project { + licenses { + license { + name 'The Apache Software License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + distribution 'repo' + } + } + } + } + } + } +} + + +gradle.projectsEvaluated { + task aggregateJavaDocs(type: Javadoc) { + description = 'Aggregated Javadoc API documentation of all subprojects.' + group = JavaBasePlugin.DOCUMENTATION_GROUP + dependsOn subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc + + source subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc.source + destinationDir file("$buildDir/docs/javadoc") + classpath = files(subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc.classpath) + } + + task("aggregateJavaDocsJar", type: org.gradle.api.tasks.bundling.Jar, dependsOn: project.aggregateJavaDocs) { + classifier = 'javadoc' + from 'build/docs/javadoc' + } + + artifacts { + archives project.aggregateJavaDocsJar + } + +} + diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle new file mode 100644 index 0000000..26e0dea --- /dev/null +++ b/buildSrc/build.gradle @@ -0,0 +1,11 @@ +test.enabled=false + +repositories { + mavenCentral() + jcenter() +} + +dependencies { + compile "com.jayway.restassured:rest-assured:2.9.0" +} + diff --git a/buildSrc/src/main/groovy/PrivateModulePlugin.groovy b/buildSrc/src/main/groovy/PrivateModulePlugin.groovy new file mode 100644 index 0000000..fe03754 --- /dev/null +++ b/buildSrc/src/main/groovy/PrivateModulePlugin.groovy @@ -0,0 +1,18 @@ +import org.gradle.api.Plugin +import org.gradle.api.Project + +class PrivateModulePlugin implements Plugin { + void apply(Project project) { + + project.task("sourcesJar", type: org.gradle.api.tasks.bundling.Jar) { + classifier = 'sources' + from project.sourceSets.main.allSource +// manifest = defaultManifest() + } + + project.artifacts { + archives project.jar + archives project.sourcesJar + } + } +} \ No newline at end of file diff --git a/buildSrc/src/main/groovy/PublicModulePlugin.groovy b/buildSrc/src/main/groovy/PublicModulePlugin.groovy new file mode 100644 index 0000000..4ff77a3 --- /dev/null +++ b/buildSrc/src/main/groovy/PublicModulePlugin.groovy @@ -0,0 +1,27 @@ +import org.gradle.api.Plugin +import org.gradle.api.Project + +class PublicModulePlugin implements Plugin { + void apply(Project project) { + + project.ext.genjavadoc = true + + project.task("javadocJar", type: org.gradle.api.tasks.bundling.Jar, dependsOn: project.javadoc) { + classifier = 'javadoc' + from 'build/docs/javadoc' +// manifest = defaultManifest() + } + + project.task("sourcesJar", type: org.gradle.api.tasks.bundling.Jar) { + classifier = 'sources' + from project.sourceSets.main.allSource +// manifest = defaultManifest() + } + + project.artifacts { + archives project.jar + archives project.javadocJar + archives project.sourcesJar + } + } +} \ No newline at end of file diff --git a/buildSrc/src/main/groovy/WaitForCdc.java b/buildSrc/src/main/groovy/WaitForCdc.java new file mode 100644 index 0000000..d3529f3 --- /dev/null +++ b/buildSrc/src/main/groovy/WaitForCdc.java @@ -0,0 +1,57 @@ +import org.gradle.api.DefaultTask; +import org.gradle.api.tasks.TaskAction; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import com.jayway.restassured.RestAssured; + +public class WaitForCdc extends DefaultTask { + + private String getenv(String name, String defaultValue) { + return Optional.ofNullable(System.getenv(name)).orElse(defaultValue); + } + + @TaskAction + public void waitForCdc() { + + String hostName = getenv("DOCKER_HOST_IP", "localhost"); + + getLogger().info("Connected to CDC on hostname={}", hostName); + + long deadline = System.currentTimeMillis() + 1000 * 60; + + while (System.currentTimeMillis() <= deadline) { + + try { + + String detail = RestAssured.given(). + when(). + get(String.format("http://%s:8099/actuator/health", hostName)). + then(). + statusCode(200) + .extract(). + path("details.binlogEntryReaderHealthCheck.details[\"detail-1\"]"); + + if (detail != null && detail.length() > 0 && !detail.endsWith("is not the leader")) { + getLogger().info("CDC is up"); + return; + } + + getLogger().info("CDC is not ready. Detail={}", detail); + } catch (Exception | AssertionError e) { + getLogger().error("Got error connecting to CDC {}", e.getMessage()); + } + + + try { + TimeUnit.SECONDS.sleep(4); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + throw new RuntimeException("CDC failed to start"); + + } +} \ No newline at end of file diff --git a/buildSrc/src/main/groovy/WaitForCdcPlugin.java b/buildSrc/src/main/groovy/WaitForCdcPlugin.java new file mode 100644 index 0000000..70214af --- /dev/null +++ b/buildSrc/src/main/groovy/WaitForCdcPlugin.java @@ -0,0 +1,10 @@ +import org.gradle.api.Plugin; +import org.gradle.api.Project; + +public class WaitForCdcPlugin implements Plugin { + @Override + public void apply(Project project) { + project.getTasks().create("waitForCdc", WaitForCdc.class); + } +} + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2b4ac8a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,28 @@ +version: '3' +services: + zookeeper: + image: confluentinc/cp-zookeeper:5.2.4 + ports: + - 2181:2181 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + kafka: + image: "confluentinc/cp-kafka:5.2.4" + ports: + - 9092:9092 + depends_on: + - zookeeper + environment: + KAFKA_LISTENERS: LC://kafka:29092,LX://kafka:9092 + KAFKA_ADVERTISED_LISTENERS: LC://kafka:29092,LX://${DOCKER_HOST_IP:-localhost}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LC:PLAINTEXT,LX:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LC + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000 + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0 + environment: + - "discovery.type=single-node" + ports: + - 9200:9200 \ No newline at end of file diff --git a/eventuate-tram-consumer-elasticsearch/build.gradle b/eventuate-tram-consumer-elasticsearch/build.gradle new file mode 100644 index 0000000..b7af1e2 --- /dev/null +++ b/eventuate-tram-consumer-elasticsearch/build.gradle @@ -0,0 +1,9 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile "io.eventuate.tram.core:eventuate-tram-consumer-common:$eventuateTramCoreVersion" + + compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.6.2" + compile "com.fasterxml.jackson.core:jackson-databind:2.2.3" + +} \ No newline at end of file diff --git a/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchConsumerConfigurationProperties.java b/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchConsumerConfigurationProperties.java new file mode 100644 index 0000000..d805257 --- /dev/null +++ b/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchConsumerConfigurationProperties.java @@ -0,0 +1,52 @@ +package io.eventuate.tram.consumer.elasticsearch; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class ElasticsearchConsumerConfigurationProperties { + + private static final String DEFAULT_RECEIVED_MESSAGES_INDEX_NAME = "received-messages"; + private static final String DEFAULT_RECEIVED_MESSAGES_TYPE_NAME = "_doc"; + + private Map properties = new HashMap<>(); + + private String receivedMessagesIndexName; + private String receivedMessagesTypeName; + + public String getReceivedMessagesIndexName() { + return Optional.ofNullable(receivedMessagesIndexName).orElse(DEFAULT_RECEIVED_MESSAGES_INDEX_NAME); + } + + public void setReceivedMessagesIndexName(String receivedMessagesIndexName) { + this.receivedMessagesIndexName = receivedMessagesIndexName; + } + + public String getReceivedMessagesTypeName() { + return Optional.ofNullable(receivedMessagesTypeName).orElse(DEFAULT_RECEIVED_MESSAGES_TYPE_NAME); + } + + public void setReceivedMessagesTypeName(String receivedMessagesTypeName) { + this.receivedMessagesTypeName = receivedMessagesTypeName; + } + + + public ElasticsearchConsumerConfigurationProperties() { + } + + public ElasticsearchConsumerConfigurationProperties(Map properties) { + this.properties = properties; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public static ElasticsearchConsumerConfigurationProperties empty() { + return new ElasticsearchConsumerConfigurationProperties(); + } +} diff --git a/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchIndexDuplicateMessageDetector.java b/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchIndexDuplicateMessageDetector.java new file mode 100644 index 0000000..cce498b --- /dev/null +++ b/eventuate-tram-consumer-elasticsearch/src/main/java/io/eventuate/tram/consumer/elasticsearch/ElasticsearchIndexDuplicateMessageDetector.java @@ -0,0 +1,76 @@ +package io.eventuate.tram.consumer.elasticsearch; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.eventuate.tram.consumer.common.DuplicateMessageDetector; +import io.eventuate.tram.consumer.common.SubscriberIdAndMessage; + +public class ElasticsearchIndexDuplicateMessageDetector implements DuplicateMessageDetector { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private final RestHighLevelClient client; + private final ElasticsearchConsumerConfigurationProperties properties; + + public ElasticsearchIndexDuplicateMessageDetector(RestHighLevelClient client, ElasticsearchConsumerConfigurationProperties properties) { + this.client = client; + this.properties = properties; + } + + @Override + public boolean isDuplicate(String consumerId, String messageId) { + try { + String id = id(consumerId, messageId); + String receivedMessagesIndexName = properties.getReceivedMessagesIndexName(); + String receivedMessagesTypeName = properties.getReceivedMessagesTypeName(); + GetRequest request = new GetRequest(receivedMessagesIndexName) + .type(receivedMessagesTypeName) + .id(id) + .routing(messageId); + GetResponse response = client.get(request, RequestOptions.DEFAULT); + return response.isExists(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void doWithMessage(SubscriberIdAndMessage subscriberIdAndMessage, Runnable callback) { + String subscriberId = subscriberIdAndMessage.getSubscriberId(); + String messageId = subscriberIdAndMessage.getMessage().getId(); + if (isDuplicate(subscriberId, messageId)) { + logger.info("Message duplicate: consumerId = {}, messageId = {}", subscriberId, messageId); + } else { + callback.run(); + saveReceivedMessage(subscriberId, messageId); + } + } + + private void saveReceivedMessage(String subscriberId, String messageId) { + try { + String id = id(subscriberId, messageId); + String receivedMessagesIndexName = properties.getReceivedMessagesIndexName(); + String receivedMessagesTypeName = properties.getReceivedMessagesTypeName(); + IndexRequest indexRequest = new IndexRequest(receivedMessagesIndexName) + .type(receivedMessagesTypeName) + .id(id) + .source(Collections.emptyMap()) + .routing(messageId); + client.index(indexRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static String id(String consumerId, String messageId) { + return String.format("%s-%s", consumerId, messageId); + } +} diff --git a/eventuate-tram-micronaut-consumer-elasticsearch/build.gradle b/eventuate-tram-micronaut-consumer-elasticsearch/build.gradle new file mode 100644 index 0000000..906d70e --- /dev/null +++ b/eventuate-tram-micronaut-consumer-elasticsearch/build.gradle @@ -0,0 +1,30 @@ +plugins { + id "io.spring.dependency-management" version "1.0.6.RELEASE" +} + +apply plugin: PublicModulePlugin + + + + +dependencyManagement { + imports { + mavenBom "io.micronaut:micronaut-bom:$micronautVersion" + } +} + +dependencies { + compile project(":eventuate-tram-consumer-elasticsearch") + + compile "io.eventuate.tram.core:eventuate-tram-consumer-common:$eventuateTramCoreVersion" + compile "io.eventuate.tram.core:eventuate-tram-micronaut-consumer-common:$eventuateTramCoreVersion" + compile "io.eventuate.tram.core:eventuate-tram-micronaut-messaging:$eventuateTramCoreVersion" + compile "io.eventuate.messaging.kafka:eventuate-messaging-kafka-micronaut-consumer:$eventuateMessagingKafkaVersion" + + annotationProcessor "io.micronaut:micronaut-inject-java" + annotationProcessor "io.micronaut:micronaut-validation" + annotationProcessor "io.micronaut.configuration:micronaut-openapi" + compile "io.micronaut:micronaut-inject" + compile "io.micronaut:micronaut-validation" + compile "io.micronaut:micronaut-runtime" +} diff --git a/eventuate-tram-micronaut-consumer-elasticsearch/src/main/java/io/eventuate/tram/micronaut/consumer/elasticsearch/TramConsumerElasticsearchFactory.java b/eventuate-tram-micronaut-consumer-elasticsearch/src/main/java/io/eventuate/tram/micronaut/consumer/elasticsearch/TramConsumerElasticsearchFactory.java new file mode 100644 index 0000000..a607366 --- /dev/null +++ b/eventuate-tram-micronaut-consumer-elasticsearch/src/main/java/io/eventuate/tram/micronaut/consumer/elasticsearch/TramConsumerElasticsearchFactory.java @@ -0,0 +1,19 @@ +package io.eventuate.tram.micronaut.consumer.elasticsearch; + +import javax.inject.Singleton; +import org.elasticsearch.client.RestHighLevelClient; +import io.eventuate.tram.consumer.common.DuplicateMessageDetector; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchConsumerConfigurationProperties; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchIndexDuplicateMessageDetector; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; + +@Factory +public class TramConsumerElasticsearchFactory { + + @Singleton + @Requires(missingProperty = "transactional.noop.duplicate.message.detector.factory.enabled") + public DuplicateMessageDetector duplicateMessageDetector(RestHighLevelClient client, ElasticsearchConsumerConfigurationProperties properties) { + return new ElasticsearchIndexDuplicateMessageDetector(client, properties); + } +} diff --git a/eventuate-tram-micronaut-jdbc-kafka-elasticsearch/build.gradle b/eventuate-tram-micronaut-jdbc-kafka-elasticsearch/build.gradle new file mode 100644 index 0000000..be56204 --- /dev/null +++ b/eventuate-tram-micronaut-jdbc-kafka-elasticsearch/build.gradle @@ -0,0 +1,8 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile "io.eventuate.messaging.kafka.elasticsearch:eventuate-messaging-kafka-offset-store-elasticsearch-micronaut-consumer:$eventuateMessagingKafkaVersion" + compile "io.eventuate.tram.core:eventuate-tram-micronaut-jdbc-kafka:$eventuateTramCoreVersion" + + compile project(":eventuate-tram-micronaut-consumer-elasticsearch") +} \ No newline at end of file diff --git a/eventuate-tram-spring-consumer-elasticsearch/build.gradle b/eventuate-tram-spring-consumer-elasticsearch/build.gradle new file mode 100644 index 0000000..3eb6299 --- /dev/null +++ b/eventuate-tram-spring-consumer-elasticsearch/build.gradle @@ -0,0 +1,13 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile project(":eventuate-tram-consumer-elasticsearch") + + compile "io.eventuate.tram.core:eventuate-tram-spring-jdbc-kafka:$eventuateTramCoreVersion" + compile "io.eventuate.tram.core:eventuate-tram-spring-producer-jdbc:$eventuateTramCoreVersion" + compile "io.eventuate.messaging.kafka.elasticsearch:eventuate-messaging-kafka-offset-store-elasticsearch-spring-consumer:$eventuateMessagingKafkaVersion" + + compile "org.springframework.boot:spring-boot-starter:$springBootVersion" + + testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion" +} \ No newline at end of file diff --git a/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationProperties.java b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationProperties.java new file mode 100644 index 0000000..983c5ea --- /dev/null +++ b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationProperties.java @@ -0,0 +1,33 @@ +package io.eventuate.tram.spring.consumer.elasticsearch; + +import java.util.HashMap; +import java.util.Map; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("eventuate.local.consumer.elasticsearch") +public class ElasticsearchConsumerSpringConfigurationProperties { + private Map properties = new HashMap<>(); + + private String receivedMessagesIndexName; + private String receivedMessagesTypeName; + + public String getReceivedMessagesIndexName() { + return receivedMessagesIndexName; + } + + public void setReceivedMessagesIndexName(String receivedMessagesIndexName) { + this.receivedMessagesIndexName = receivedMessagesIndexName; + } + + public String getReceivedMessagesTypeName() { + return receivedMessagesTypeName; + } + + public void setReceivedMessagesTypeName(String receivedMessagesTypeName) { + this.receivedMessagesTypeName = receivedMessagesTypeName; + } + + public Map getProperties() { + return properties; + } +} diff --git a/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.java b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.java new file mode 100644 index 0000000..b3d6c35 --- /dev/null +++ b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.java @@ -0,0 +1,17 @@ +package io.eventuate.tram.spring.consumer.elasticsearch; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchConsumerConfigurationProperties; + +@EnableConfigurationProperties(ElasticsearchConsumerSpringConfigurationProperties.class) +public class ElasticsearchConsumerSpringConfigurationPropertiesConfiguration { + + @Bean + ElasticsearchConsumerConfigurationProperties elasticsearchConsumerSpringConfigurationProperties(ElasticsearchConsumerSpringConfigurationProperties elasticsearchConsumerSpringConfigurationProperties) { + ElasticsearchConsumerConfigurationProperties properties = new ElasticsearchConsumerConfigurationProperties(elasticsearchConsumerSpringConfigurationProperties.getProperties()); + properties.setReceivedMessagesIndexName(elasticsearchConsumerSpringConfigurationProperties.getReceivedMessagesIndexName()); + properties.setReceivedMessagesTypeName(elasticsearchConsumerSpringConfigurationProperties.getReceivedMessagesTypeName()); + return properties; + } +} diff --git a/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/TramConsumerElasticsearchAutoConfiguration.java b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/TramConsumerElasticsearchAutoConfiguration.java new file mode 100644 index 0000000..e23ea6c --- /dev/null +++ b/eventuate-tram-spring-consumer-elasticsearch/src/main/java/io/eventuate/tram/spring/consumer/elasticsearch/TramConsumerElasticsearchAutoConfiguration.java @@ -0,0 +1,19 @@ +package io.eventuate.tram.spring.consumer.elasticsearch; + +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import io.eventuate.tram.consumer.common.DuplicateMessageDetector; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchConsumerConfigurationProperties; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchIndexDuplicateMessageDetector; + +@Configuration +@ConditionalOnMissingBean(DuplicateMessageDetector.class) +public class TramConsumerElasticsearchAutoConfiguration { + + @Bean + public DuplicateMessageDetector duplicateMessageDetector(RestHighLevelClient client, ElasticsearchConsumerConfigurationProperties properties) { + return new ElasticsearchIndexDuplicateMessageDetector(client, properties); + } +} diff --git a/eventuate-tram-spring-consumer-elasticsearch/src/test/java/io/eventuate/tram/spring/consumer/elasticsearch/EventuateSpringElasticsearchIndexBasedDuplicateMessageDetectorTest.java b/eventuate-tram-spring-consumer-elasticsearch/src/test/java/io/eventuate/tram/spring/consumer/elasticsearch/EventuateSpringElasticsearchIndexBasedDuplicateMessageDetectorTest.java new file mode 100644 index 0000000..cb10563 --- /dev/null +++ b/eventuate-tram-spring-consumer-elasticsearch/src/test/java/io/eventuate/tram/spring/consumer/elasticsearch/EventuateSpringElasticsearchIndexBasedDuplicateMessageDetectorTest.java @@ -0,0 +1,74 @@ +package io.eventuate.tram.spring.consumer.elasticsearch; + +import java.io.IOException; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.junit4.SpringRunner; +import io.eventuate.tram.consumer.common.DuplicateMessageDetector; +import io.eventuate.tram.consumer.common.SubscriberIdAndMessage; +import io.eventuate.tram.consumer.elasticsearch.ElasticsearchConsumerConfigurationProperties; +import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.messaging.producer.MessageBuilder; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {EventuateSpringElasticsearchIndexBasedDuplicateMessageDetectorTest.DuplicateMessageDetectorTestConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.NONE) +public class EventuateSpringElasticsearchIndexBasedDuplicateMessageDetectorTest { + + @Autowired + private DuplicateMessageDetector duplicateMessageDetector; + + @Autowired + private RestHighLevelClient elasticsearchClient; + + @Autowired + private ElasticsearchConsumerConfigurationProperties properties; + + @Configuration + @EnableAutoConfiguration + @Import({ + TramConsumerElasticsearchAutoConfiguration.class, + ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.class + }) + static public class DuplicateMessageDetectorTestConfiguration { + } + + @Test + public void shouldDetectDuplicate() throws IOException { + + createReceivedMessagesIndexIfNotExists(); + + String consumerId = getClass().getName(); + String messageId = Long.toString(System.currentTimeMillis()); + + assertFalse(duplicateMessageDetector.isDuplicate(consumerId, messageId)); + + duplicateMessageDetector.doWithMessage(new SubscriberIdAndMessage(consumerId, buildMessage(messageId)), () -> {}); + + assertTrue(duplicateMessageDetector.isDuplicate(consumerId, messageId)); + } + + private static Message buildMessage(String messageId) { + return MessageBuilder.withPayload("{}").withHeader(Message.ID, messageId).build(); + } + + private void createReceivedMessagesIndexIfNotExists() throws IOException { + String receivedMessagesIndexName = properties.getReceivedMessagesIndexName(); + boolean exists = elasticsearchClient.indices().exists(new GetIndexRequest(receivedMessagesIndexName), RequestOptions.DEFAULT); + if (!exists) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(receivedMessagesIndexName); + elasticsearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + } + } +} \ No newline at end of file diff --git a/eventuate-tram-spring-jdbc-kafka-elasticsearch/build.gradle b/eventuate-tram-spring-jdbc-kafka-elasticsearch/build.gradle new file mode 100644 index 0000000..c64829f --- /dev/null +++ b/eventuate-tram-spring-jdbc-kafka-elasticsearch/build.gradle @@ -0,0 +1,8 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile project(":eventuate-tram-spring-consumer-elasticsearch") + + testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion" + testCompile "com.h2database:h2:1.3.166" +} \ No newline at end of file diff --git a/eventuate-tram-spring-jdbc-kafka-elasticsearch/src/main/java/io/eventuate/tram/spring/jdbckafka/elasticsearch/TramJdbcKafkaElasticsearchConfiguration.java b/eventuate-tram-spring-jdbc-kafka-elasticsearch/src/main/java/io/eventuate/tram/spring/jdbckafka/elasticsearch/TramJdbcKafkaElasticsearchConfiguration.java new file mode 100644 index 0000000..c45a827 --- /dev/null +++ b/eventuate-tram-spring-jdbc-kafka-elasticsearch/src/main/java/io/eventuate/tram/spring/jdbckafka/elasticsearch/TramJdbcKafkaElasticsearchConfiguration.java @@ -0,0 +1,22 @@ +package io.eventuate.tram.spring.jdbckafka.elasticsearch; + +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import io.eventuate.tram.consumer.kafka.elasticsearch.ElasticsearchKafkaConsumerFactorySpringConfiguration; +import io.eventuate.tram.consumer.kafka.elasticsearch.EventuateKafkaConsumerElasticsearchSpringConfigurationPropertiesConfiguration; +import io.eventuate.tram.spring.consumer.elasticsearch.ElasticsearchConsumerSpringConfigurationPropertiesConfiguration; +import io.eventuate.tram.spring.consumer.elasticsearch.TramConsumerElasticsearchAutoConfiguration; +import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; +import io.eventuate.tram.spring.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration; + +@Configuration +@Import({ + ElasticsearchKafkaConsumerFactorySpringConfiguration.class, + TramConsumerElasticsearchAutoConfiguration.class, + TramMessageProducerJdbcConfiguration.class, + EventuateTramKafkaMessageConsumerConfiguration.class, + ElasticsearchConsumerSpringConfigurationPropertiesConfiguration.class, + EventuateKafkaConsumerElasticsearchSpringConfigurationPropertiesConfiguration.class, +}) +public class TramJdbcKafkaElasticsearchConfiguration { +} diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..1922d3f --- /dev/null +++ b/gradle.properties @@ -0,0 +1,24 @@ +org.gradle.jvmargs=-XX:MaxPermSize=512m + +deployUrl=file:///Users/cer/.m2/testdeploy +bintrayRepoType=defineMe + +eventuateMavenRepoUrl=https://dl.bintray.com/eventuateio-oss/eventuate-maven-release,https://dl.bintray.com/eventuateio-oss/eventuate-maven-rc,https://dl.bintray.com/eventuateio-oss/eventuate-maven-milestone,file:///Users/cer/.m2/testdeploy,https://snapshots.repositories.eventuate.io/repository + +springBootVersion=1.4.7.RELEASE +springBootCdcVersion=2.1.1.RELEASE +micronautVersion=1.2.4 +eventuateUtilVersion=0.4.0.RELEASE + +springCloudContractDependenciesVersion=2.0.0.RELEASE +springDependencyManagementPluginVersion=1.0.3.RELEASE + +springCloudSleuthVersion=2.0.2.RELEASE +dockerComposePluginVersion=0.6.6 +eventuateCdcVersion=0.4.0.RELEASE +eventuateCommonVersion=0.9.0.RELEASE + +eventuateTramCoreVersion=0.25.0-SNAPSHOT +eventuateMessagingKafkaVersion=0.10.0-SNAPSHOT + +version=0.25.0-SNAPSHOT diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..87b738c Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..44e7c4d --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 0000000..af6708f --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..6d57edc --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/set-env.sh b/set-env.sh new file mode 100644 index 0000000..69c45b7 --- /dev/null +++ b/set-env.sh @@ -0,0 +1,15 @@ +if [ -z "$DOCKER_HOST_IP" ] ; then + if [ -z "$DOCKER_HOST" ] ; then + export DOCKER_HOST_IP=`hostname` + else + echo using ${DOCKER_HOST?} + XX=${DOCKER_HOST%\:*} + export DOCKER_HOST_IP=${XX#tcp\:\/\/} + fi +fi + +echo DOCKER_HOST_IP is $DOCKER_HOST_IP + +export EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS=$DOCKER_HOST_IP:9092 +export EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING=$DOCKER_HOST_IP:2181 +export EVENTUATE_EVENT_TRACKER_ITERATIONS=120 \ No newline at end of file diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..779d7ff --- /dev/null +++ b/settings.gradle @@ -0,0 +1,6 @@ +include 'eventuate-tram-consumer-elasticsearch' +include 'eventuate-tram-spring-consumer-elasticsearch' +include 'eventuate-tram-spring-jdbc-kafka-elasticsearch' +include 'eventuate-tram-micronaut-jdbc-kafka-elasticsearch' +include 'eventuate-tram-micronaut-consumer-elasticsearch' +