Skip to content

Commit

Permalink
introduce interfaces fr delegating clients, use them in instrumented …
Browse files Browse the repository at this point in the history
…clients (#136)
  • Loading branch information
radai-rosenblatt authored Aug 27, 2019
1 parent 4bddf07 commit a5da5c0
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.Consumer;


public interface DelegatingConsumer<K, V> extends Consumer<K, V> {
Consumer<K, V> getDelegate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* @param <K>
* @param <V>
*/
public class LiKafkaInstrumentedConsumerImpl<K, V> implements Consumer<K, V>, EventHandler {
public class LiKafkaInstrumentedConsumerImpl<K, V> implements DelegatingConsumer<K, V>, EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(LiKafkaInstrumentedConsumerImpl.class);

private final long initialConnectionTimeoutMs = TimeUnit.SECONDS.toMillis(30);
Expand Down Expand Up @@ -154,8 +154,8 @@ public void configChangeRequested(UUID commandId, Map<String, String> configDiff
//TODO - respond to command UUID
}

//package-private FOR TESTING
Consumer<K, V> getDelegate() {
@Override
public Consumer<K, V> getDelegate() {
return delegate;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.producer;

import org.apache.kafka.clients.producer.Producer;


public interface DelegatingProducer<K, V> extends Producer<K, V> {
Producer<K, V> getDelegate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* @param <K>
* @param <V>
*/
public class LiKafkaInstrumentedProducerImpl<K, V> implements Producer<K, V>, EventHandler {
public class LiKafkaInstrumentedProducerImpl<K, V> implements DelegatingProducer<K, V>, EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(LiKafkaInstrumentedProducerImpl.class);
private static final String BOUNDED_FLUSH_THREAD_PREFIX = "Bounded-Flush-Thread-";

Expand Down Expand Up @@ -156,8 +156,8 @@ public void configChangeRequested(UUID commandId, Map<String, String> configDiff
//TODO - respond to command UUID
}

//package-private FOR TESTING
Producer<K, V> getDelegate() {
@Override
public Producer<K, V> getDelegate() {
return delegate;
}

Expand Down

0 comments on commit a5da5c0

Please sign in to comment.