From 174bc75316649399b777e4f6c52d66c94e8a2369 Mon Sep 17 00:00:00 2001 From: Radai Rosenblatt Date: Mon, 25 Nov 2019 13:33:38 -0800 Subject: [PATCH] update kafka clients and mario client, switch to shaded mario client (#160) Signed-off-by: Radai Rosenblatt --- build.gradle | 4 ++-- .../consumer/LiKafkaConsumerIntegrationTest.java | 12 +++++++++--- li-apache-kafka-clients/build.gradle | 4 ++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 0a16970..d738686 100644 --- a/build.gradle +++ b/build.gradle @@ -49,8 +49,8 @@ project.ext { url "https://github.com/linkedin/li-apache-kafka-clients" } } - liKafkaVersion = "2.0.0.21" - marioVersion = "0.0.23" + liKafkaVersion = "2.0.0.23" + marioVersion = "0.0.24" } subprojects { diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java index fd63af9..fe7eb72 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java @@ -94,6 +94,11 @@ public Properties overridingProps() { Properties props = new Properties(); props.setProperty(KafkaConfig.NumPartitionsProp(), Integer.toString(NUM_PARTITIONS)); props.setProperty(KafkaConfig.LogRetentionTimeMillisProp(), "" + TimeUnit.DAYS.toMillis(1)); + props.setProperty(KafkaConfig.LogRollTimeMillisProp(), "" + TimeUnit.SECONDS.toMillis(5)); //makes retention kick-in faster + props.setProperty(KafkaConfig.LogDeleteDelayMsProp(), "100"); //makes retention kick-in faster + props.setProperty(KafkaConfig.LogCleanerBackoffMsProp(), "" + TimeUnit.SECONDS.toMillis(1)); //makes retention kick-in faster + props.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), "" + TimeUnit.SECONDS.toMillis(10)); //makes retention kick-in faster + return props; } @@ -1497,15 +1502,16 @@ public void testFallOffStartWithLiClosest() throws Exception { long currentLso = initialLso; consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(initialLso, String.valueOf(consumer.safeOffset(tp))))); //wait for broker to truncate data that we have not read (we never called poll()) - long giveUp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); + long timeout = TimeUnit.MINUTES.toMillis(5); + long giveUp = System.currentTimeMillis() + timeout; while (currentLso == initialLso && System.currentTimeMillis() < giveUp) { - Thread.sleep(5000); + Thread.sleep(1000); currentLso = consumer.beginningOffsets(tpAsCollection).get(tp); produceRecordsWithKafkaProducer(); } if (currentLso == initialLso) { throw new IllegalStateException("nothing was truncated broker-side within timeout. LogStartOffset = " + - currentLso + " remains the same after " + giveUp + "ms."); + currentLso + " remains the same after " + timeout + "ms."); } truncatedStartOffset = currentLso; } diff --git a/li-apache-kafka-clients/build.gradle b/li-apache-kafka-clients/build.gradle index e32f87c..d7b54a2 100644 --- a/li-apache-kafka-clients/build.gradle +++ b/li-apache-kafka-clients/build.gradle @@ -13,10 +13,10 @@ plugins { } dependencies { - compile "org.apache.httpcomponents:httpclient:4.5.7" compile "com.linkedin.kafka:kafka-clients:${rootProject.ext.liKafkaVersion}" - compile "com.linkedin.mario:mario-client:${rootProject.ext.marioVersion}" + compile "com.linkedin.mario:mario-client-all:${rootProject.ext.marioVersion}" compile "com.linkedin.mario:common:${rootProject.ext.marioVersion}" + testCompile "org.mockito:mockito-core:2.24.0" }