Skip to content

Commit

Permalink
feat: add metrics for Redis sink (#39)
Browse files Browse the repository at this point in the history
* feat: add metrics for Redis sink

* feat: add metrics for Redis sink

* feat: add metrics for Redis sink

* feat: add metrics for Redis sink

* feat: add metrics for Redis sink

* tests: add tests for redis sink metrics

* tests: add tests for redis sink metrics

* tests: add tests for redis sink metrics

* tests: add tests for redis sink metrics

* docs: add docs for Redis sink metrics
  • Loading branch information
sumitaich1998 authored Dec 21, 2023
1 parent 081274e commit faba3ac
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.8.1'
version '0.8.2'

repositories {
mavenLocal()
Expand Down
15 changes: 15 additions & 0 deletions docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Sinks can have their own metrics, and they will be emmited while using sink conn

* [Bigquery Sink](metrics.md#bigquery-sink)
* [Bigtable Sink](metrics.md#bigtable-sink)
* [Redis Sink](metrics.md#redis-sink)


## Bigquery Sink

Expand Down Expand Up @@ -36,4 +38,17 @@ Time taken for bigtable insert/update operation performed

Total numbers of error occurred on bigtable insert/update operation

## Redis Sink

### `Redis Success Response Total`

Total number of successful records pushed to the Redis server

### `Redis No Response Total`

Total number of records which could not be pushed to the Redis server due to broken connection,client timeout,etc.

### `Redis Connection Retry Total`

Total number of attempts to recreate the connection to Redis server from the Jedis client.

24 changes: 24 additions & 0 deletions src/main/java/com/gotocompany/depot/metrics/RedisSinkMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.gotocompany.depot.metrics;

import com.gotocompany.depot.config.SinkConfig;

public class RedisSinkMetrics extends SinkMetrics {

public static final String REDIS_SINK_PREFIX = "redis_";

public RedisSinkMetrics(SinkConfig config) {
super(config);
}

public String getRedisSuccessResponseTotalMetric() {
return getApplicationPrefix() + SINK_PREFIX + REDIS_SINK_PREFIX + "success_response_total";
}

public String getRedisNoResponseTotalMetric() {
return getApplicationPrefix() + SINK_PREFIX + REDIS_SINK_PREFIX + "no_response_total";
}

public String getRedisConnectionRetryTotalMetric() {
return getApplicationPrefix() + SINK_PREFIX + REDIS_SINK_PREFIX + "connection_retry_total";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.exception.ConfigurationException;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.RedisSinkMetrics;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse;
import com.gotocompany.depot.redis.record.RedisRecord;
Expand Down Expand Up @@ -33,6 +34,7 @@ public class RedisStandaloneClient implements RedisClient {
private Jedis jedis;
private final int connectionMaxRetries;
private final long connectionRetryBackoffMs;
private final RedisSinkMetrics redisSinkMetrics;

public RedisStandaloneClient(Instrumentation instrumentation, RedisSinkConfig config) {
this.instrumentation = instrumentation;
Expand All @@ -41,6 +43,7 @@ public RedisStandaloneClient(Instrumentation instrumentation, RedisSinkConfig co
this.redisTTL = RedisTTLFactory.getTTl(config);
this.defaultJedisClientConfig = RedisSinkUtils.getJedisConfig(config);
this.hostAndPort = getHostPort(config);
this.redisSinkMetrics = new RedisSinkMetrics(config);
}

private static HostAndPort getHostPort(RedisSinkConfig config) {
Expand All @@ -67,11 +70,13 @@ public List<RedisResponse> send(List<RedisRecord> records) {
while (retryCount >= 0) {
try {
redisResponseList = sendInternal(records);
instrumentSuccess(redisResponseList);
break;
} catch (RuntimeException e) {

e.printStackTrace();
if (retryCount == 0) {
instrumentFailure(records);
throw e;
}
instrumentation.logInfo("Backing off for " + connectionRetryBackoffMs + " milliseconds.");
Expand All @@ -81,6 +86,7 @@ public List<RedisResponse> send(List<RedisRecord> records) {
interruptedException.printStackTrace();
}
instrumentation.logInfo("Attempting to recreate Redis client. Retry attempt count : " + (connectionMaxRetries - retryCount + 1));
instrumentConnectionRetry();
this.init();

}
Expand Down Expand Up @@ -112,4 +118,18 @@ public void init() {
instrumentation.logInfo("Initialising Jedis client: Host: {} Port: {}", hostAndPort.getHost(), hostAndPort.getPort());
jedis = new Jedis(hostAndPort, defaultJedisClientConfig);
}

private void instrumentSuccess(List<RedisResponse> redisResponseList) {
long successCount = redisResponseList.size();
instrumentation.captureCount(redisSinkMetrics.getRedisSuccessResponseTotalMetric(), successCount);
}

private void instrumentFailure(List<RedisRecord> validRecords) {
long failureCount = validRecords.size();
instrumentation.captureCount(redisSinkMetrics.getRedisNoResponseTotalMetric(), failureCount);
}

private void instrumentConnectionRetry() {
instrumentation.captureCount(redisSinkMetrics.getRedisConnectionRetryTotalMetric(), 1L);
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
package com.gotocompany.depot.redis.client;

import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.RedisSinkMetrics;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse;
import com.gotocompany.depot.redis.record.RedisRecord;
import com.gotocompany.depot.redis.ttl.RedisTtl;
import org.aeonbits.owner.ConfigFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

import static org.mockito.Mockito.*;


@RunWith(MockitoJUnitRunner.class)
public class RedisStandaloneClientTest {
Expand All @@ -34,18 +41,27 @@ public class RedisStandaloneClientTest {
private HostAndPort hostAndPort;


private RedisSinkMetrics redisSinkMetrics;

@Before
public void setUp() {
System.setProperty("SINK_METRICS_APPLICATION_PREFIX", "xyz_");
RedisSinkConfig sinkConfig = ConfigFactory.create(RedisSinkConfig.class, System.getProperties());
redisSinkMetrics = new RedisSinkMetrics(sinkConfig);
}

@Test
public void shouldCloseTheClient() throws IOException {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000, redisSinkMetrics);
redisClient.close();

Mockito.verify(instrumentation, Mockito.times(1)).logInfo("Closing Jedis client");
Mockito.verify(jedis, Mockito.times(1)).close();
verify(instrumentation, times(1)).logInfo("Closing Jedis client");
verify(jedis, times(1)).close();
}

@Test
public void shouldSendRecordsToJedis() {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000, redisSinkMetrics);
Pipeline pipeline = Mockito.mock(Pipeline.class);
Response response = Mockito.mock(Response.class);
Mockito.when(jedis.pipelined()).thenReturn(pipeline);
Expand Down Expand Up @@ -75,13 +91,97 @@ public void shouldSendRecordsToJedis() {
}
);
List<RedisResponse> actualResponses = redisClient.send(redisRecords);
Mockito.verify(pipeline, Mockito.times(1)).multi();
Mockito.verify(pipeline, Mockito.times(1)).sync();
Mockito.verify(instrumentation, Mockito.times(1)).logDebug("jedis responses: {}", ob);
verify(pipeline, times(1)).multi();
verify(pipeline, times(1)).sync();
verify(instrumentation, times(1)).logDebug("jedis responses: {}", ob);
IntStream.range(0, actualResponses.size()).forEach(
index -> {
Assert.assertEquals(responses.get(index), actualResponses.get(index));
}
);
}

@Test
public void shouldInstrumentSuccess() {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000, redisSinkMetrics);
Pipeline pipeline = Mockito.mock(Pipeline.class);
Response response = Mockito.mock(Response.class);
Mockito.when(jedis.pipelined()).thenReturn(pipeline);
Mockito.when(pipeline.exec()).thenReturn(response);
Object ob = new Object();
Mockito.when(response.get()).thenReturn(ob);
List<RedisRecord> redisRecords = new ArrayList<RedisRecord>() {{
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
}};
List<RedisStandaloneResponse> responses = new ArrayList<RedisStandaloneResponse>() {{
add(Mockito.mock(RedisStandaloneResponse.class));
add(Mockito.mock(RedisStandaloneResponse.class));
add(Mockito.mock(RedisStandaloneResponse.class));
add(Mockito.mock(RedisStandaloneResponse.class));
add(Mockito.mock(RedisStandaloneResponse.class));
add(Mockito.mock(RedisStandaloneResponse.class));
}};
IntStream.range(0, redisRecords.size()).forEach(
index -> {
Mockito.when(redisRecords.get(index).send(pipeline, redisTTL)).thenReturn(responses.get(index));
Mockito.when(responses.get(index).process()).thenReturn(responses.get(index));
}
);
redisClient.send(redisRecords);
verify(instrumentation, times(1)).captureCount("xyz_sink_redis_success_response_total", 6L);

}

@Test
public void shouldInstrumentFailure() {

RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000, redisSinkMetrics);
Pipeline pipeline = Mockito.mock(Pipeline.class);
Mockito.when(jedis.pipelined()).thenReturn(pipeline);
List<RedisRecord> redisRecords = new ArrayList<RedisRecord>() {{
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
}};

IntStream.range(0, redisRecords.size()).forEach(
index -> Mockito.when(redisRecords.get(index).send(pipeline, redisTTL)).thenThrow(JedisConnectionException.class)
);
try {
redisClient.send(redisRecords);
} catch (JedisConnectionException ignored) {
}
verify(instrumentation, times(1)).captureCount("xyz_sink_redis_no_response_total", 6L);

}

@Test
public void shouldInstrumentConnectionRetry() {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 1, 2000, redisSinkMetrics);
List<RedisRecord> redisRecords = new ArrayList<RedisRecord>() {{
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
add(Mockito.mock(RedisRecord.class));
}};

try {
redisClient.send(redisRecords);
} catch (Exception ignored) {
}
verify(instrumentation, times(1)).captureCount("xyz_sink_redis_connection_retry_total", 1L);


}

}

0 comments on commit faba3ac

Please sign in to comment.