Skip to content

Commit

Permalink
Adds elasticsearch duplicate message detector
Browse files Browse the repository at this point in the history
  • Loading branch information
Didier Alberto Tabares Higuita committed Apr 27, 2020
1 parent 6817bb1 commit a72e3ff
Show file tree
Hide file tree
Showing 29 changed files with 969 additions and 0 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
.gradle
build/
*.idea/
*.iml
*.log
out
.classpath
.project
.settings
bin
15 changes: 15 additions & 0 deletions build-and-test-all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#! /bin/bash

set -e

. ./set-env.sh

docker-compose down -v

docker-compose up --build -d

sleep 10

./gradlew $GRADLE_OPTIONS cleanTest build $GRADLE_TASK_OPTIONS

docker-compose down -v
109 changes: 109 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
buildscript {
repositories {
mavenLocal()
jcenter()
//maven { url deployUrl }
maven { url "https://dl.bintray.com/eventuateio-oss/eventuate-maven-release" }
}
dependencies {
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.7.3'
classpath 'io.eventuate.plugins.gradle:eventuate-plugins-gradle-versions:0.1.0.RELEASE'
}
}

allprojects {
group = "io.eventuate.tram.elasticsearch"
}

apply plugin: "io.eventuate.plugins.gradle.UpgradeVersions"
apply plugin: WaitForCdcPlugin

allprojects {
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'com.jfrog.bintray'

sourceCompatibility = 1.8
targetCompatibility = 1.8

bintray {
publish = true
user = System.getenv('BINTRAY_USER')
key = System.getenv('BINTRAY_KEY')
configurations = ['archives']
pkg {
repo = "eventuate-maven-$bintrayRepoType"
name = 'eventuate-tram'
licenses = ['Apache-2.0']
vcsUrl = 'https://github.com/eventuate-tram/eventuate-tram-elasticsearch'
}
}


repositories {
mavenLocal()
mavenCentral()
jcenter()
maven { url "https://repo.spring.io/milestone" }
eventuateMavenRepoUrl.split(',').each { repoUrl -> maven { url repoUrl } }
}

dependencies {
compile "javax.annotation:javax.annotation-api:1.3.2"
testCompile "junit:junit:4.12"
testCompile "org.mockito:mockito-core:2.23.4"
}

configurations {
deployerJars
}

dependencies {
deployerJars 'org.springframework.build:aws-maven:5.0.0.RELEASE'
}


uploadArchives {
repositories {
mavenDeployer {
configuration = configurations.deployerJars
repository(url: deployUrl) {
authentication(userName: System.getenv('S3_REPO_AWS_ACCESS_KEY'), password: System.getenv('S3_REPO_AWS_SECRET_ACCESS_KEY'))
}
pom.project {
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
}
}
}
}
}


gradle.projectsEvaluated {
task aggregateJavaDocs(type: Javadoc) {
description = 'Aggregated Javadoc API documentation of all subprojects.'
group = JavaBasePlugin.DOCUMENTATION_GROUP
dependsOn subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc

source subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc.source
destinationDir file("$buildDir/docs/javadoc")
classpath = files(subprojects.findAll { subproject -> subproject.plugins.hasPlugin(PublicModulePlugin) }.javadoc.classpath)
}

task("aggregateJavaDocsJar", type: org.gradle.api.tasks.bundling.Jar, dependsOn: project.aggregateJavaDocs) {
classifier = 'javadoc'
from 'build/docs/javadoc'
}

artifacts {
archives project.aggregateJavaDocsJar
}

}

11 changes: 11 additions & 0 deletions buildSrc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
test.enabled=false

repositories {
mavenCentral()
jcenter()
}

dependencies {
compile "com.jayway.restassured:rest-assured:2.9.0"
}

18 changes: 18 additions & 0 deletions buildSrc/src/main/groovy/PrivateModulePlugin.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import org.gradle.api.Plugin
import org.gradle.api.Project

class PrivateModulePlugin implements Plugin<Project> {
void apply(Project project) {

project.task("sourcesJar", type: org.gradle.api.tasks.bundling.Jar) {
classifier = 'sources'
from project.sourceSets.main.allSource
// manifest = defaultManifest()
}

project.artifacts {
archives project.jar
archives project.sourcesJar
}
}
}
27 changes: 27 additions & 0 deletions buildSrc/src/main/groovy/PublicModulePlugin.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import org.gradle.api.Plugin
import org.gradle.api.Project

class PublicModulePlugin implements Plugin<Project> {
void apply(Project project) {

project.ext.genjavadoc = true

project.task("javadocJar", type: org.gradle.api.tasks.bundling.Jar, dependsOn: project.javadoc) {
classifier = 'javadoc'
from 'build/docs/javadoc'
// manifest = defaultManifest()
}

project.task("sourcesJar", type: org.gradle.api.tasks.bundling.Jar) {
classifier = 'sources'
from project.sourceSets.main.allSource
// manifest = defaultManifest()
}

project.artifacts {
archives project.jar
archives project.javadocJar
archives project.sourcesJar
}
}
}
57 changes: 57 additions & 0 deletions buildSrc/src/main/groovy/WaitForCdc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.jayway.restassured.RestAssured;

public class WaitForCdc extends DefaultTask {

private String getenv(String name, String defaultValue) {
return Optional.ofNullable(System.getenv(name)).orElse(defaultValue);
}

@TaskAction
public void waitForCdc() {

String hostName = getenv("DOCKER_HOST_IP", "localhost");

getLogger().info("Connected to CDC on hostname={}", hostName);

long deadline = System.currentTimeMillis() + 1000 * 60;

while (System.currentTimeMillis() <= deadline) {

try {

String detail = RestAssured.given().
when().
get(String.format("http://%s:8099/actuator/health", hostName)).
then().
statusCode(200)
.extract().
path("details.binlogEntryReaderHealthCheck.details[\"detail-1\"]");

if (detail != null && detail.length() > 0 && !detail.endsWith("is not the leader")) {
getLogger().info("CDC is up");
return;
}

getLogger().info("CDC is not ready. Detail={}", detail);
} catch (Exception | AssertionError e) {
getLogger().error("Got error connecting to CDC {}", e.getMessage());
}


try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

throw new RuntimeException("CDC failed to start");

}
}
10 changes: 10 additions & 0 deletions buildSrc/src/main/groovy/WaitForCdcPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import org.gradle.api.Plugin;
import org.gradle.api.Project;

public class WaitForCdcPlugin implements Plugin<Project> {
@Override
public void apply(Project project) {
project.getTasks().create("waitForCdc", WaitForCdc.class);
}
}

26 changes: 26 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
zookeeper:
image: eventuateio/eventuate-zookeeper:0.4.0.RELEASE
ports:
- 2181:2181
- 2888:2888
- 3888:3888

kafka:
image: eventuateio/eventuate-kafka:0.4.0.RELEASE
ports:
- 9092:9092
links:
- zookeeper
environment:
- ADVERTISED_HOST_NAME=${DOCKER_HOST_IP}
- KAFKA_HEAP_OPTS=-Xmx320m -Xms320m
- ZOOKEEPER_SERVERS=zookeeper:2181
- ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000

elasticsearch:
image: elasticsearch:7.6.2
ports:
- 9200:9200
- 9300:9300
environment:
- discovery.type=single-node
9 changes: 9 additions & 0 deletions eventuate-tram-consumer-elasticsearch/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apply plugin: PublicModulePlugin

dependencies {
compile "io.eventuate.tram.core:eventuate-tram-consumer-common:$eventuateTramCoreVersion"

compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.6.2"
compile "com.fasterxml.jackson.core:jackson-databind:2.2.3"

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.eventuate.tram.consumer.elasticsearch;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ElasticsearchConsumerConfigurationProperties {

private static final String DEFAULT_RECEIVED_MESSAGES_INDEX_NAME = "received-messages";
private static final String DEFAULT_RECEIVED_MESSAGES_TYPE_NAME = "_doc";

private Map<String, String> properties = new HashMap<>();

private String receivedMessagesIndexName;
private String receivedMessagesTypeName;

public String getReceivedMessagesIndexName() {
return Optional.ofNullable(receivedMessagesIndexName).orElse(DEFAULT_RECEIVED_MESSAGES_INDEX_NAME);
}

public void setReceivedMessagesIndexName(String receivedMessagesIndexName) {
this.receivedMessagesIndexName = receivedMessagesIndexName;
}

public String getReceivedMessagesTypeName() {
return Optional.ofNullable(receivedMessagesTypeName).orElse(DEFAULT_RECEIVED_MESSAGES_TYPE_NAME);
}

public void setReceivedMessagesTypeName(String receivedMessagesTypeName) {
this.receivedMessagesTypeName = receivedMessagesTypeName;
}


public ElasticsearchConsumerConfigurationProperties() {
}

public ElasticsearchConsumerConfigurationProperties(Map<String, String> properties) {
this.properties = properties;
}

public Map<String, String> getProperties() {
return properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

public static ElasticsearchConsumerConfigurationProperties empty() {
return new ElasticsearchConsumerConfigurationProperties();
}
}
Loading

0 comments on commit a72e3ff

Please sign in to comment.