Skip to content

Commit

Permalink
improve the performance of composing client metrics in liConsumer (#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Radai Rosenblatt <[email protected]>
  • Loading branch information
radai-rosenblatt authored Dec 6, 2019
1 parent 174bc75 commit 2cbbfdc
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ project.ext {
}
}
liKafkaVersion = "2.0.0.23"
marioVersion = "0.0.24"
marioVersion = "0.0.25"
}

subprojects {
Expand Down
1 change: 0 additions & 1 deletion li-apache-kafka-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ plugins {
dependencies {
compile "com.linkedin.kafka:kafka-clients:${rootProject.ext.liKafkaVersion}"
compile "com.linkedin.mario:mario-client-all:${rootProject.ext.marioVersion}"
compile "com.linkedin.mario:common:${rootProject.ext.marioVersion}"

testCompile "org.mockito:mockito-core:2.24.0"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
/**
* this class allows delegating kafka metrics to an underlying delegate
* kafka client, allowing the delegate to be replaced/recreated without
* "invalidating" metrics maps user code may hold on to.
* "invalidating" metrics maps user code may hold on to. <br >
* it is meant to allow user code like the following to continue to "just work": <br >
*
* KafkaClient client = ... <br >
* Map&lt;MetricName, ? extends Metric&gt; metrics = client.getMetrics(); <br >
* // ... long time later ... <br >
* //do something with metrics map <br >
*
* while still allowing instrumented clients to replace the underlying kafka client
*/
public abstract class MetricsProxy implements Map {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.kafka.clients.largemessage.MessageAssembler;
import com.linkedin.kafka.clients.largemessage.MessageAssemblerImpl;
import com.linkedin.kafka.clients.largemessage.errors.ConsumerRecordsProcessingException;
import com.linkedin.kafka.clients.utils.CompositeMap;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -78,9 +79,7 @@ public class LiKafkaConsumerImpl<K, V> implements LiKafkaConsumer<K, V> {
private final long _autoCommitInterval;
private final LiOffsetResetStrategy _offsetResetStrategy;
private long _lastAutoCommitMs;

private final MetricName _skippedRecordsMetricName;
private final Metric __skippedRecordsMetric;
private final Map<MetricName, Metric> _extraMetrics = new HashMap<>(1);

private ConsumerRecordsProcessResult<K, V> _lastProcessedResult;

Expand Down Expand Up @@ -126,16 +125,16 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
byteArrayDeserializer,
byteArrayDeserializer);
_clientId = LiKafkaClientsUtils.getClientId(_kafkaConsumer);
_skippedRecordsMetricName = new MetricName(
MetricName skippedRecordsMetricName = new MetricName(
"records-skipped",
"lnkd",
"number of records skipped due to deserialization issues",
Collections.singletonMap("client-id", _clientId)
);
__skippedRecordsMetric = new Metric() {
Metric skippedRecordsMetric = new Metric() {
@Override
public MetricName metricName() {
return _skippedRecordsMetricName;
return skippedRecordsMetricName;
}

@Override
Expand All @@ -149,6 +148,8 @@ public Object metricValue() {
}
};

_extraMetrics.put(skippedRecordsMetricName, skippedRecordsMetric);

try {

// Instantiate segment deserializer if needed.
Expand Down Expand Up @@ -724,9 +725,8 @@ public Long committedSafeOffset(TopicPartition tp) {

@Override
public Map<MetricName, ? extends Metric> metrics() {
Map<MetricName, Metric> fromDelegate = new HashMap<>(_kafkaConsumer.metrics());
fromDelegate.put(_skippedRecordsMetricName, __skippedRecordsMetric);
return fromDelegate;
//noinspection unchecked
return new CompositeMap<>((Map<MetricName, Metric>) _kafkaConsumer.metrics(), _extraMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.utils;

import java.util.Collection;
import java.util.Iterator;

/**
* quick and simple unmodifiable implementation of a collection on top of a pair of other collections.
* @param <T> value type
*/
public class CompositeCollection<T> implements Collection<T> {
private final Collection<T> a;
private final Collection<T> b;

public CompositeCollection(Collection<T> a, Collection<T> b) {
if (a == null || b == null) {
throw new IllegalArgumentException("arguments must not be null");
}
this.a = a;
this.b = b;
}

@Override
public int size() {
return a.size() + b.size();
}

@Override
public boolean isEmpty() {
return a.isEmpty() && b.isEmpty();
}

@Override
public boolean contains(Object o) {
return a.contains(o) || b.contains(o);
}

@Override
public Iterator<T> iterator() {
return new CompositeIterator<>(a.iterator(), b.iterator());
}

@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}

@Override
public <U> U[] toArray(U[] a) {
throw new UnsupportedOperationException();
}

@Override
public boolean add(T t) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.utils;

import java.util.Iterator;

/**
* quick and simple unmodifiable implementation of an iterator on top of a pair of other iterators.
* @param <T> value type
*/
public class CompositeIterator<T> implements Iterator<T> {
private final Iterator<T> a;
private final Iterator<T> b;

public CompositeIterator(Iterator<T> a, Iterator<T> b) {
if (a == null || b == null) {
throw new IllegalArgumentException("arguments must not be null");
}
this.a = a;
this.b = b;
}

@Override
public boolean hasNext() {
return a.hasNext() || b.hasNext();
}

@Override
public T next() {
if (a.hasNext()) {
return a.next();
}
return b.next();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.utils;

import java.util.Collection;
import java.util.Map;
import java.util.Set;


/**
* quick and simple unmodifiable implementation of a map on top of a pair of other maps.
* note that this class is meant for a particular use case (composition of kafka client metrics)
* and is written to be fast over perfectly correct
* @param <K> key type
* @param <V> value type
*/
public class CompositeMap<K, V> implements Map<K, V> {
private final Map<K, V> a;
private final Map<K, V> b;

public CompositeMap(Map<K, V> a, Map<K, V> b) {
if (a == null || a.isEmpty() || b == null || b.isEmpty()) {
throw new IllegalArgumentException("arguments must be non empty");
}
this.a = a;
this.b = b;
}

@Override
public int size() {
return a.size() + b.size(); //we assume they're foreign
}

@Override
public boolean isEmpty() {
return a.isEmpty() && b.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return a.containsKey(key) || b.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return a.containsValue(value) || b.containsValue(value);
}

@Override
public V get(Object key) {
//this assumes no map container a null value (and is faster than a containsKey + get)
V v = a.get(key);
if (v != null) {
return v;
}
return b.get(key);
}

@Override
public V put(K key, V value) {
throw new UnsupportedOperationException();
}

@Override
public V remove(Object key) {
throw new UnsupportedOperationException();
}

@Override
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public Set<K> keySet() {
return new CompositeSet<>(a.keySet(), b.keySet());
}

@Override
public Collection<V> values() {
return new CompositeCollection<>(a.values(), b.values());
}

@Override
public Set<Entry<K, V>> entrySet() {
return new CompositeSet<>(a.entrySet(), b.entrySet());
}
}
Loading

0 comments on commit 2cbbfdc

Please sign in to comment.