Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: initial kafka integration for whylogs java client #20

Draft
wants to merge 7 commits into
base: mainline
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions java/kafka-avro/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

## Environment Setup

Clone the whylogs-examples repo from GitHub and work in the `java/kafka-avro/` subdirectory.

Use docker-compose to bring up a basic Kafka cluster and control dashboard.
```
% docker-compose up -d
Creating network "kafka-avro_default" with the default driver
Creating zookeeper ... done
Creating kafka-tools ... done
Creating broker ... done
Creating schema-registry ... done
Creating control-center ... done
```

Visit http://127.0.0.1:9021 to see the Kafka health dashboard. From the dashboard you can assess the health of the Kafka cluster and see any active topics.

## Schema Definition

These java examples are built using `gradle`.
If you already have your Java environment set up, you should not need to install anything more to build these examples.

```
./gradlew build
```

## If you get an error...

`Could not initialize class org.codehaus.groovy.reflection.ReflectionCache`

Try upgrading your gradle installation.
```
gradle wrapper --gradle-version 6.3
```
I upgraded from 6.1 to 6.3. IntelliJ warns that Gradle 6.3 is incompatible with
amazon-corretto-15.jdk, but I have not seen any ill effects yet.

## Run the demos

There are two separate demos that carry out the duties of a Kafka Producer and Consumer.
The Producer and Consumer demos can be run sequentially in the same window because the events written by the Producer are persisted in Kafka.

```
% gradle producer

> Task :producer
opening lending_club_1000.csv
Sent event...
Sent event...
Sent event...
Sent event...
```

```
% gradle consumer

> Task :consumer
Read 500 records
Read 444 records
Read 56 records
Read 0 records
Received 1000 events
Writing profile to profile_2021.bin
```




60 changes: 60 additions & 0 deletions java/kafka-avro/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Build plugins: core Java support, the application plugin, and the
// third-party Avro plugin (pinned to 1.0.0) configured in the `avro` block.
plugins {
    id("java")
    id("application")
    id("com.github.davidmc24.gradle.plugin.avro").version("1.0.0")
}

// Coordinates of this example project.
group = "com.whylogs.example"
version = "1.0-SNAPSHOT"

// Main class registered with the application plugin.
application.mainClassName = "com.whylogs.examples.ConsumerDemo"

// Artifact repositories, consulted in the order listed.
repositories {
    mavenCentral()
    // Confluent's own Maven repository (presumably needed for the
    // io.confluent serializer dependency declared below).
    maven {
        url = uri("https://packages.confluent.io/maven/")
    }
}

// Compile/runtime libraries used by the producer and consumer demos.
dependencies {
    // whylogs profiling library
    implementation(group = "ai.whylabs", name = "whylogs-core", version = "0.0.2b3")
    // CSV parsing
    implementation(group = "org.apache.commons", name = "commons-csv", version = "1.8")
    // Avro runtime
    implementation(group = "org.apache.avro", name = "avro", version = "1.8.2")
    // Kafka client and Confluent Avro (de)serializer
    implementation(group = "org.apache.kafka", name = "kafka-clients", version = "2.7.0")
    implementation(group = "io.confluent", name = "kafka-avro-serializer", version = "6.1.0")
    // Date/time handling
    implementation(group = "joda-time", name = "joda-time", version = "2.10.10")
}

// Settings for the Avro class-generation plugin.
avro {
    // Generated source text
    outputCharacterEncoding.set("UTF-8")
    templateDirectory.set(null as String?)

    // Type mapping
    stringType.set("String")
    isEnableDecimalLogicalType.set(true)

    // Field visibility of generated classes
    fieldVisibility.set("PUBLIC_DEPRECATED")

    // Accessor generation: setters and Optional-returning getters are all disabled
    isCreateSetters.set(false)
    isCreateOptionalGetters.set(false)
    isGettersReturnOptional.set(false)
    isOptionalGettersForNullableFieldsOnly.set(false)
}

// `gradle consumer`: builds the project, then launches ConsumerDemo in a
// separate `java` process using the main source set's runtime classpath.
//
// Registered lazily so the runtime classpath is only resolved when this task
// is actually needed, rather than during configuration of every build.
tasks.register<Exec>("consumer") {
    description = "Run the whylogs consumer class with ExecTask"
    dependsOn("build")
    commandLine(
        "java",
        "-classpath", sourceSets["main"].runtimeClasspath.asPath,
        "com.whylogs.examples.ConsumerDemo"
    )
}


// Compiler settings applied to every Java compilation task.
tasks.withType<JavaCompile>().configureEach {
    // Report uses of deprecated APIs.
    options.isDeprecation = true
    // Extra javac arguments: enable unchecked-operation lint warnings.
    options.compilerArgs = listOf("-Xlint:unchecked")
}

// `gradle producer`: builds the project, then launches ProducerDemo in a
// separate `java` process using the main source set's runtime classpath.
//
// Registered lazily so the runtime classpath is only resolved when this task
// is actually needed, rather than during configuration of every build.
tasks.register<Exec>("producer") {
    description = "Run the whylogs producer class with ExecTask"
    dependsOn("build")
    commandLine(
        "java",
        "-classpath", sourceSets["main"].runtimeClasspath.asPath,
        "com.whylogs.examples.ProducerDemo"
    )
}

76 changes: 76 additions & 0 deletions java/kafka-avro/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@

version: "3"

# Single-broker Confluent Platform 5.4.0 stack for local development:
# ZooKeeper, one Kafka broker, Schema Registry, Control Center and an idle
# tools container.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Two advertised listeners: broker:29092 for other containers on the
      # compose network, localhost:9092 for clients on the host (published above).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Replication factors of 1 because there is only a single broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  # Container that just idles (tail -f /dev/null) on the host network;
  # presumably kept around so Kafka CLI tools can be run via `docker exec`.
  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka-tools
    container_name: kafka-tools
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

  # Web dashboard, published on host port 9021.
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
89 changes: 89 additions & 0 deletions java/kafka-avro/gradlew.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem

@rem Suppress command echoing unless the DEBUG environment variable is set.
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################

@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal

@rem DIRNAME is the drive and directory of this script (%~dp0); fall back to
@rem the current directory if that expansion is empty.
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
@rem APP_BASE_NAME is this script's file name without extension (%~n0).
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Resolve any "." and ".." in APP_HOME to make it shorter.
@rem (%%~fi expands the loop variable to a fully qualified path.)
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"

@rem Find java.exe
@rem If JAVA_HOME is defined it takes precedence; otherwise rely on the PATH.
if defined JAVA_HOME goto findJavaFromJavaHome

@rem No JAVA_HOME: probe for a java.exe on the PATH by running "java -version"
@rem with all output discarded, and continue only if it exits with 0.
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:findJavaFromJavaHome
@rem Strip any double quotes from JAVA_HOME before building the executable path.
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

@rem Proceed only if the resolved java.exe actually exists.
if exist "%JAVA_EXE%" goto execute

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:execute
@rem Setup the command line

@rem The classpath consists solely of the wrapper jar under APP_HOME.
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar


@rem Execute Gradle
@rem JVM options are passed in the order DEFAULT_JVM_OPTS, JAVA_OPTS, GRADLE_OPTS;
@rem all arguments given to this script (%*) are forwarded to GradleWrapperMain.
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*

:end
@rem End local scope for the variables with windows NT shell
@rem On success jump past the failure handler; otherwise fall through into :fail.
if "%ERRORLEVEL%"=="0" goto mainEnd

:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1

:mainEnd
@rem Close the local variable scope opened by setlocal at the top of the script.
if "%OS%"=="Windows_NT" endlocal

:omega
9 changes: 9 additions & 0 deletions java/kafka-avro/settings.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Repositories searched, in the order listed, when resolving build plugins.
pluginManagement {
repositories {
gradlePluginPortal()
mavenCentral()
}
}

// Name of the root project for this build.
rootProject.name = 'kafka-avro'

Loading