Skip to content

Commit

Permalink
fix: temp fix for redis broken connection issue (#35)
Browse files Browse the repository at this point in the history
* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* chore: version bump

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: fix checkstyle

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: temp fix for redis broken connection issue

* fix: fix checkstyle

* fix: temp fix for redis broken connection issue

* tests: add tests for redis sink temp fix
  • Loading branch information
sumitaich1998 authored Nov 24, 2023
1 parent 7f23e57 commit 7063dbe
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.7.2'
version '0.7.3'

repositories {
mavenLocal()
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/gotocompany/depot/error/ErrorType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public enum ErrorType {
SINK_4XX_ERROR,
SINK_5XX_ERROR,
SINK_RETRYABLE_ERROR,
SINK_NON_RETRYABLE_ERROR,
SINK_UNKNOWN_ERROR,
DEFAULT_ERROR //Deprecated
}
28 changes: 25 additions & 3 deletions src/main/java/com/gotocompany/depot/redis/RedisSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class RedisSink implements Sink {
private final RedisClient redisClient;
private final RedisParser redisParser;
private final Instrumentation instrumentation;
private static final int CONNECTION_RETRY = 2;

public RedisSink(RedisClient redisClient, RedisParser redisParser, Instrumentation instrumentation) {
this.redisClient = redisClient;
Expand All @@ -35,15 +36,36 @@ public SinkResponse pushToSink(List<Message> messages) {
List<RedisRecord> validRecords = splitterRecords.get(Boolean.TRUE);
SinkResponse sinkResponse = new SinkResponse();
invalidRecords.forEach(invalidRecord -> sinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));
if (validRecords.size() > 0) {
List<RedisResponse> responses = redisClient.send(validRecords);
Map<Long, ErrorInfo> errorInfoMap = RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation);
if (!validRecords.isEmpty()) {
Map<Long, ErrorInfo> errorInfoMap = send(validRecords);
errorInfoMap.forEach(sinkResponse::addErrors);
instrumentation.logInfo("Pushed a batch of {} records to Redis", validRecords.size());
}
return sinkResponse;
}

private Map<Long, ErrorInfo> send(List<RedisRecord> validRecords) {
List<RedisResponse> responses = null;
RuntimeException exception = null;
int retry = CONNECTION_RETRY;
while (retry > 0) {
try {
responses = redisClient.send(validRecords);
break;
} catch (RuntimeException e) {
exception = e;
e.printStackTrace();
redisClient.init();
}
retry--;
}
if (responses == null) {
return RedisSinkUtils.getNonRetryableErrors(validRecords, exception, instrumentation);
} else {
return RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation);
}
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.client.RedisClient;
import com.gotocompany.depot.redis.client.RedisClientFactory;
import com.gotocompany.depot.redis.parsers.RedisEntryParser;
import com.gotocompany.depot.redis.parsers.RedisEntryParserFactory;
Expand Down Expand Up @@ -76,8 +77,10 @@ public void init() {
* @return RedisSink
*/
public Sink create() {
RedisClient redisClient = RedisClientFactory.getClient(sinkConfig, statsDReporter);
redisClient.init();
return new RedisSink(
RedisClientFactory.getClient(sinkConfig, statsDReporter),
redisClient,
redisParser,
new Instrumentation(statsDReporter, RedisSink.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
*/
public interface RedisClient extends Closeable {
List<RedisResponse> send(List<RedisRecord> records);

void init();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import com.gotocompany.depot.redis.enums.RedisSinkDeploymentType;
import com.gotocompany.depot.redis.ttl.RedisTTLFactory;
import com.gotocompany.depot.redis.ttl.RedisTtl;
import com.gotocompany.depot.redis.util.RedisSinkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
Expand All @@ -29,23 +28,9 @@ public static RedisClient getClient(RedisSinkConfig redisSinkConfig, StatsDRepor
RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig);
return RedisSinkDeploymentType.CLUSTER.equals(redisSinkDeploymentType)
? getRedisClusterClient(redisTTL, redisSinkConfig, statsDReporter)
: getRedisStandaloneClient(redisTTL, redisSinkConfig, statsDReporter);
: new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisSinkConfig);
}

private static RedisStandaloneClient getRedisStandaloneClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {
HostAndPort hostAndPort;
try {
hostAndPort = HostAndPort.parseString(StringUtils.trim(redisSinkConfig.getSinkRedisUrls()));
} catch (IllegalArgumentException e) {
throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls()));
}
DefaultJedisClientConfig jedisConfig = DefaultJedisClientConfig.builder()
.user(redisSinkConfig.getSinkRedisAuthUsername())
.password(redisSinkConfig.getSinkRedisAuthPassword())
.build();
Jedis jedis = new Jedis(hostAndPort, jedisConfig);
return new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisTTL, jedis);
}

private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {
String[] redisUrls = redisSinkConfig.getSinkRedisUrls().split(DELIMITER);
Expand All @@ -57,11 +42,8 @@ private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, Redis
} catch (IllegalArgumentException e) {
throw new ConfigurationException(String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls()));
}
DefaultJedisClientConfig jedisConfig = DefaultJedisClientConfig.builder()
.user(redisSinkConfig.getSinkRedisAuthUsername())
.password(redisSinkConfig.getSinkRedisAuthPassword())
.build();
JedisCluster jedisCluster = new JedisCluster(nodes, jedisConfig, redisSinkConfig.getSinkRedisMaxAttempts(), new GenericObjectPoolConfig<>());

JedisCluster jedisCluster = new JedisCluster(nodes, RedisSinkUtils.getJedisConfig(redisSinkConfig), redisSinkConfig.getSinkRedisMaxAttempts(), new GenericObjectPoolConfig<>());
return new RedisClusterClient(new Instrumentation(statsDReporter, RedisClusterClient.class), redisTTL, jedisCluster);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public List<RedisResponse> send(List<RedisRecord> records) {
.collect(Collectors.toList());
}

@Override
public void init() {
}

@Override
public void close() {
instrumentation.logInfo("Closing Jedis client");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package com.gotocompany.depot.redis.client;

import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.exception.ConfigurationException;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse;
import com.gotocompany.depot.redis.record.RedisRecord;
import com.gotocompany.depot.redis.ttl.RedisTTLFactory;
import com.gotocompany.depot.redis.ttl.RedisTtl;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.redis.util.RedisSinkUtils;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
Expand All @@ -21,7 +28,28 @@ public class RedisStandaloneClient implements RedisClient {

private final Instrumentation instrumentation;
private final RedisTtl redisTTL;
private final Jedis jedis;
private final DefaultJedisClientConfig defaultJedisClientConfig;
private final HostAndPort hostAndPort;
private Jedis jedis;

public RedisStandaloneClient(Instrumentation instrumentation, RedisTtl redisTTL, DefaultJedisClientConfig defaultJedisClientConfig, HostAndPort hostAndPort) {
this.instrumentation = instrumentation;
this.redisTTL = redisTTL;
this.defaultJedisClientConfig = defaultJedisClientConfig;
this.hostAndPort = hostAndPort;
}

public RedisStandaloneClient(Instrumentation instrumentation, RedisSinkConfig config) {
this(instrumentation, RedisTTLFactory.getTTl(config), RedisSinkUtils.getJedisConfig(config), getHostPort(config));
}

private static HostAndPort getHostPort(RedisSinkConfig config) {
try {
return HostAndPort.parseString(StringUtils.trim(config.getSinkRedisUrls()));
} catch (IllegalArgumentException e) {
throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", config.getSinkRedisUrls()));
}
}

/**
* Pushes records in a transaction.
Expand All @@ -30,6 +58,8 @@ public class RedisStandaloneClient implements RedisClient {
* @param records records to send
* @return Custom response containing status of the API calls.
*/


@Override
public List<RedisResponse> send(List<RedisRecord> records) {
Pipeline jedisPipelined = jedis.pipelined();
Expand All @@ -43,9 +73,15 @@ public List<RedisResponse> send(List<RedisRecord> records) {
return responses.stream().map(RedisStandaloneResponse::process).collect(Collectors.toList());
}


@Override
public void close() {
instrumentation.logInfo("Closing Jedis client");
jedis.close();
}

public void init() {
instrumentation.logInfo("Initialising Jedis client: Host: {} Port: {}", hostAndPort.getHost(), hostAndPort.getPort());
jedis = new Jedis(hostAndPort, defaultJedisClientConfig);
}
}
21 changes: 21 additions & 0 deletions src/main/java/com/gotocompany/depot/redis/util/RedisSinkUtils.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.gotocompany.depot.redis.util;

import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.record.RedisRecord;
import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.depot.metrics.Instrumentation;
import redis.clients.jedis.DefaultJedisClientConfig;

import java.util.HashMap;
import java.util.List;
Expand All @@ -27,4 +29,23 @@ public static Map<Long, ErrorInfo> getErrorsFromResponse(List<RedisRecord> redis
);
return errors;
}

public static Map<Long, ErrorInfo> getNonRetryableErrors(List<RedisRecord> redisRecords, RuntimeException e, Instrumentation instrumentation) {
Map<Long, ErrorInfo> errors = new HashMap<>();
for (RedisRecord record : redisRecords) {
instrumentation.logError("Error while inserting to redis for message. Record: {}, Error: {}",
record.toString(), e.getMessage());
errors.put(record.getIndex(), new ErrorInfo(new Exception(e.getMessage()), ErrorType.SINK_NON_RETRYABLE_ERROR));

}
return errors;
}


public static DefaultJedisClientConfig getJedisConfig(RedisSinkConfig config) {
return DefaultJedisClientConfig.builder()
.user(config.getSinkRedisAuthUsername())
.password(config.getSinkRedisAuthPassword())
.build();
}
}
21 changes: 21 additions & 0 deletions src/test/java/com/gotocompany/depot/redis/RedisSinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,25 @@ public void shouldReportNetErrors() {
Assert.assertEquals("failed at 3", sinkResponse.getErrorsFor(3).getException().getMessage());
Assert.assertEquals("failed at 4", sinkResponse.getErrorsFor(4).getException().getMessage());
}

@Test
public void shouldReturnNonRetryableErrors() {
List<Message> messages = new ArrayList<>();
List<RedisRecord> records = new ArrayList<>();
records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 0L, null, null, true));
records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 1L, null, null, true));
records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 2L, null, null, true));
records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 3L, null, null, true));
records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 4L, null, null, true));
when(redisParser.convert(messages)).thenReturn(records);
List<RedisRecord> validRecords = records.stream().filter(RedisRecord::isValid).collect(Collectors.toList());
when(redisClient.send(validRecords)).thenThrow(new ClassCastException("[B cannot be cast to java.util.List"));
RedisSink redisSink = new RedisSink(redisClient, redisParser, instrumentation);
SinkResponse sinkResponse = redisSink.pushToSink(messages);
Assert.assertEquals(5, sinkResponse.getErrors().size());
Assert.assertEquals(ErrorType.SINK_NON_RETRYABLE_ERROR, sinkResponse.getErrorsFor(0).getErrorType());
Assert.assertEquals(ErrorType.SINK_NON_RETRYABLE_ERROR, sinkResponse.getErrorsFor(2).getErrorType());
Assert.assertEquals("[B cannot be cast to java.util.List", sinkResponse.getErrorsFor(3).getException().getMessage());
Assert.assertEquals("[B cannot be cast to java.util.List", sinkResponse.getErrorsFor(4).getException().getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.gotocompany.depot.redis.client;

import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.redis.client.response.RedisResponse;
import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.redis.record.RedisRecord;
import com.gotocompany.depot.redis.ttl.RedisTtl;
import org.junit.Assert;
Expand All @@ -11,9 +12,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.*;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -29,10 +28,17 @@ public class RedisStandaloneClientTest {
private RedisTtl redisTTL;
@Mock
private Jedis jedis;
@Mock
private RedisSinkConfig redisSinkConfig;
@Mock
private DefaultJedisClientConfig defaultJedisClientConfig;
@Mock
private HostAndPort hostAndPort;


@Test
public void shouldCloseTheClient() throws IOException {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, jedis);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis);
redisClient.close();

Mockito.verify(instrumentation, Mockito.times(1)).logInfo("Closing Jedis client");
Expand All @@ -41,7 +47,7 @@ public void shouldCloseTheClient() throws IOException {

@Test
public void shouldSendRecordsToJedis() {
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, jedis);
RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis);
Pipeline pipeline = Mockito.mock(Pipeline.class);
Response response = Mockito.mock(Response.class);
Mockito.when(jedis.pipelined()).thenReturn(pipeline);
Expand Down

0 comments on commit 7063dbe

Please sign in to comment.