Skip to content

Commit

Permalink
feat: add socket and connection timeout configs in redis sink (#38)
Browse files Browse the repository at this point in the history
* feat: add socket and connection timeout configs in redis sink

* chore: version bump to 0.8.1

* fix: remove retry mechanism of temp fix

* tests: fix tests

* feat: make retry mechanism configurable

* docs: add docs for redis client retry configs

* tests: fix tests

* fix: redis client to handle retry mechanism

* fix: redis client to handle retry mechanism

* fix: redis client to handle retry mechanism

* fix: redis client to handle retry mechanism

* fix: redis client to handle retry mechanism

* fix: remove redundant RedisStandaloneClient constructor

* fix: add break after successfull push to redis

* fix: change default timeout values
  • Loading branch information
sumitaich1998 authored Dec 15, 2023
1 parent d3ad67b commit 081274e
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 42 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.8.0'
version '0.8.1'

repositories {
mavenLocal()
Expand Down
33 changes: 33 additions & 0 deletions docs/reference/configuration/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,36 @@ The Redis deployment you are using. At present, we support `Standalone` and `Clu
- Example value: `Standalone`
- Type: `required`
- Default value: `Standalone`

### `SINK_REDIS_SOCKET_TIMEOUT_MS`

The max time in milliseconds that the Redis client will wait for response from the Redis server.

- Example value: `4000`
- Type: `optional`
- Default value: `2000`

### `SINK_REDIS_CONNECTION_TIMEOUT_MS`

The max time in milliseconds that the Redis client will wait for establishing connection to the Redis server.

- Example value: `4000`
- Type: `optional`
- Default value: `2000`

### `SINK_REDIS_CONNECTION_RETRY_BACKOFF_MS`

The constant backoff time in milliseconds between subsequent retries to reestablish the Redis connection

- Example value: `4000`
- Type: `optional`
- Default value: `2000`

### `SINK_REDIS_CONNECTION_MAX_RETRIES`

The max no. of retries to reestablish the connection between Redis client and server.

- Example value: `5`
- Type: `optional`
- Default value: `2`

16 changes: 16 additions & 0 deletions src/main/java/com/gotocompany/depot/config/RedisSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ public interface RedisSinkConfig extends SinkConfig {
@ConverterClass(EmptyStringToNull.class)
String getSinkRedisAuthPassword();

@Key("SINK_REDIS_CONNECTION_TIMEOUT_MS")
@DefaultValue("5000")
int getSinkRedisConnectionTimeoutMs();

@Key("SINK_REDIS_SOCKET_TIMEOUT_MS")
@DefaultValue("10000")
int getSinkRedisSocketTimeoutMs();

@Key("SINK_REDIS_KEY_TEMPLATE")
String getSinkRedisKeyTemplate();

Expand Down Expand Up @@ -64,4 +72,12 @@ public interface RedisSinkConfig extends SinkConfig {
@ConverterClass(JsonToPropertiesConverter.class)
@DefaultValue("")
Properties getSinkRedisHashsetFieldToColumnMapping();

@Key("SINK_REDIS_CONNECTION_MAX_RETRIES")
@DefaultValue("1")
int getSinkRedisConnectionMaxRetries();

@Key("SINK_REDIS_CONNECTION_RETRY_BACKOFF_MS")
@DefaultValue("2000")
long getSinkRedisConnectionRetryBackoffMs();
}
37 changes: 9 additions & 28 deletions src/main/java/com/gotocompany/depot/redis/RedisSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public class RedisSink implements Sink {
private final RedisClient redisClient;
private final RedisParser redisParser;
private final Instrumentation instrumentation;
private static final int CONNECTION_RETRY = 2;
private static final int CONNECTION_RETRY_BACKOFF_MILLIS = 2000;


public RedisSink(RedisClient redisClient, RedisParser redisParser, Instrumentation instrumentation) {
this.redisClient = redisClient;
this.redisParser = redisParser;
this.instrumentation = instrumentation;

}

@Override
Expand All @@ -46,33 +46,14 @@ public SinkResponse pushToSink(List<Message> messages) {
}

private Map<Long, ErrorInfo> send(List<RedisRecord> validRecords) {
List<RedisResponse> responses = null;
RuntimeException exception = null;
int retry = CONNECTION_RETRY;
while (retry > 0) {
try {
responses = redisClient.send(validRecords);
break;
} catch (RuntimeException e) {
exception = e;
e.printStackTrace();
instrumentation.logInfo("Backing off for " + CONNECTION_RETRY_BACKOFF_MILLIS + " milliseconds.");
try {
Thread.sleep(CONNECTION_RETRY_BACKOFF_MILLIS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
instrumentation.logInfo("Attempting to recreate Redis client. Retry attempt count : " + (CONNECTION_RETRY - retry + 1));
redisClient.init();

}
retry--;
}
if (responses == null) {
return RedisSinkUtils.getNonRetryableErrors(validRecords, exception, instrumentation);
} else {
return RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation);
List<RedisResponse> responses;
try {
responses = redisClient.send(validRecords);
} catch (RuntimeException e) {
return RedisSinkUtils.getNonRetryableErrors(validRecords, e, instrumentation);
}
return RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ public class RedisStandaloneClient implements RedisClient {
private final DefaultJedisClientConfig defaultJedisClientConfig;
private final HostAndPort hostAndPort;
private Jedis jedis;

public RedisStandaloneClient(Instrumentation instrumentation, RedisTtl redisTTL, DefaultJedisClientConfig defaultJedisClientConfig, HostAndPort hostAndPort) {
this.instrumentation = instrumentation;
this.redisTTL = redisTTL;
this.defaultJedisClientConfig = defaultJedisClientConfig;
this.hostAndPort = hostAndPort;
}
private final int connectionMaxRetries;
private final long connectionRetryBackoffMs;

public RedisStandaloneClient(Instrumentation instrumentation, RedisSinkConfig config) {
this(instrumentation, RedisTTLFactory.getTTl(config), RedisSinkUtils.getJedisConfig(config), getHostPort(config));
this.instrumentation = instrumentation;
this.connectionMaxRetries = config.getSinkRedisConnectionMaxRetries();
this.connectionRetryBackoffMs = config.getSinkRedisConnectionRetryBackoffMs();
this.redisTTL = RedisTTLFactory.getTTl(config);
this.defaultJedisClientConfig = RedisSinkUtils.getJedisConfig(config);
this.hostAndPort = getHostPort(config);
}

private static HostAndPort getHostPort(RedisSinkConfig config) {
Expand All @@ -62,6 +62,34 @@ private static HostAndPort getHostPort(RedisSinkConfig config) {

@Override
public List<RedisResponse> send(List<RedisRecord> records) {
int retryCount = connectionMaxRetries;
List<RedisResponse> redisResponseList = null;
while (retryCount >= 0) {
try {
redisResponseList = sendInternal(records);
break;
} catch (RuntimeException e) {

e.printStackTrace();
if (retryCount == 0) {
throw e;
}
instrumentation.logInfo("Backing off for " + connectionRetryBackoffMs + " milliseconds.");
try {
Thread.sleep(connectionRetryBackoffMs);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
instrumentation.logInfo("Attempting to recreate Redis client. Retry attempt count : " + (connectionMaxRetries - retryCount + 1));
this.init();

}
retryCount--;
}
return redisResponseList;
}

public List<RedisResponse> sendInternal(List<RedisRecord> records) {
Pipeline jedisPipelined = jedis.pipelined();
jedisPipelined.multi();
List<RedisStandaloneResponse> responses = records.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public static Map<Long, ErrorInfo> getNonRetryableErrors(List<RedisRecord> redis

public static DefaultJedisClientConfig getJedisConfig(RedisSinkConfig config) {
return DefaultJedisClientConfig.builder()
.connectionTimeoutMillis(config.getSinkRedisConnectionTimeoutMs())
.socketTimeoutMillis(config.getSinkRedisSocketTimeoutMs())
.user(config.getSinkRedisAuthUsername())
.password(config.getSinkRedisAuthPassword())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.gotocompany.depot.redis.client;

import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse;
Expand Down Expand Up @@ -28,8 +27,7 @@ public class RedisStandaloneClientTest {
private RedisTtl redisTTL;
@Mock
private Jedis jedis;
@Mock
private RedisSinkConfig redisSinkConfig;

@Mock
private DefaultJedisClientConfig defaultJedisClientConfig;
@Mock
Expand All @@ -38,7 +36,7 @@ public class RedisStandaloneClientTest {

@Test
public void shouldCloseTheClient() throws IOException {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000);
redisClient.close();

Mockito.verify(instrumentation, Mockito.times(1)).logInfo("Closing Jedis client");
Expand All @@ -47,7 +45,7 @@ public void shouldCloseTheClient() throws IOException {

@Test
public void shouldSendRecordsToJedis() {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis, 0, 2000);
Pipeline pipeline = Mockito.mock(Pipeline.class);
Response response = Mockito.mock(Response.class);
Mockito.when(jedis.pipelined()).thenReturn(pipeline);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.gotocompany.depot.redis.util;

import com.google.common.collect.ImmutableMap;
import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.redis.client.entry.RedisListEntry;
import com.gotocompany.depot.redis.client.response.RedisClusterResponse;
import com.gotocompany.depot.redis.client.response.RedisResponse;
Expand All @@ -8,12 +10,14 @@
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.record.RedisRecord;
import org.aeonbits.owner.ConfigFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import redis.clients.jedis.DefaultJedisClientConfig;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -68,4 +72,28 @@ public void shouldGetEmptyMapWhenNoErrors() {
Map<Long, ErrorInfo> errors = RedisSinkUtils.getErrorsFromResponse(records, responses, new Instrumentation(statsDReporter, RedisSinkUtils.class));
Assert.assertTrue(errors.isEmpty());
}


@Test
public void shouldSetRedisConnectionTimeoutMillis() {

RedisSinkConfig config = ConfigFactory.create(RedisSinkConfig.class, ImmutableMap.of(
"SINK_REDIS_CONNECTION_TIMEOUT_MS", "5000"
));
DefaultJedisClientConfig defaultJedisClientConfig = RedisSinkUtils.getJedisConfig(config);
Assert.assertEquals(5000, defaultJedisClientConfig.getConnectionTimeoutMillis());

}

@Test
public void shouldSetRedisSocketTimeoutMillis() {

RedisSinkConfig config = ConfigFactory.create(RedisSinkConfig.class, ImmutableMap.of(
"SINK_REDIS_SOCKET_TIMEOUT_MS", "7000"
));
DefaultJedisClientConfig defaultJedisClientConfig = RedisSinkUtils.getJedisConfig(config);
Assert.assertEquals(7000, defaultJedisClientConfig.getSocketTimeoutMillis());

}

}

0 comments on commit 081274e

Please sign in to comment.