From 7063dbea57e9fbae312b4970a4ee27c9b5356a17 Mon Sep 17 00:00:00 2001 From: Sumit Aich Date: Fri, 24 Nov 2023 20:59:25 +0530 Subject: [PATCH] fix: temp fix for redis broken connection issue (#35) * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * chore: version bump * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: fix checkstyle * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: temp fix for redis broken connection issue * fix: fix checkstyle * fix: temp fix for redis broken connection issue * tests: add tests for redis sink temp fix --- build.gradle | 2 +- .../gotocompany/depot/error/ErrorType.java | 1 + .../gotocompany/depot/redis/RedisSink.java | 28 +++++++++++-- .../depot/redis/RedisSinkFactory.java | 5 ++- .../depot/redis/client/RedisClient.java | 2 + .../redis/client/RedisClientFactory.java | 26 ++---------- .../redis/client/RedisClusterClient.java | 4 ++ .../redis/client/RedisStandaloneClient.java | 40 ++++++++++++++++++- .../depot/redis/util/RedisSinkUtils.java | 21 ++++++++++ .../depot/redis/RedisSinkTest.java | 21 ++++++++++ .../client/RedisStandaloneClientTest.java | 18 ++++++--- 11 files changed, 133 insertions(+), 35 deletions(-) diff --git a/build.gradle b/build.gradle index c5ae0d53..ecff76ec 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ plugins { } group 'com.gotocompany' -version '0.7.2' +version '0.7.3' repositories { mavenLocal() diff --git a/src/main/java/com/gotocompany/depot/error/ErrorType.java b/src/main/java/com/gotocompany/depot/error/ErrorType.java index 592a31e6..5f098f79 100644 --- a/src/main/java/com/gotocompany/depot/error/ErrorType.java +++ b/src/main/java/com/gotocompany/depot/error/ErrorType.java @@ -7,6 +7,7 @@ public enum ErrorType { SINK_4XX_ERROR, SINK_5XX_ERROR, SINK_RETRYABLE_ERROR, + SINK_NON_RETRYABLE_ERROR, SINK_UNKNOWN_ERROR, DEFAULT_ERROR //Deprecated } diff --git a/src/main/java/com/gotocompany/depot/redis/RedisSink.java b/src/main/java/com/gotocompany/depot/redis/RedisSink.java index 3658d2e3..586d8cac 100644 --- a/src/main/java/com/gotocompany/depot/redis/RedisSink.java +++ b/src/main/java/com/gotocompany/depot/redis/RedisSink.java @@ -20,6 +20,7 @@ public class RedisSink implements Sink { private final RedisClient redisClient; private final RedisParser redisParser; private final Instrumentation instrumentation; + private static final int CONNECTION_RETRY = 2; public RedisSink(RedisClient redisClient, RedisParser redisParser, Instrumentation instrumentation) { this.redisClient = redisClient; @@ -35,15 +36,36 @@ public SinkResponse pushToSink(List messages) { List validRecords = splitterRecords.get(Boolean.TRUE); SinkResponse sinkResponse = new SinkResponse(); invalidRecords.forEach(invalidRecord -> sinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo())); - if (validRecords.size() > 0) { - List responses = redisClient.send(validRecords); - Map errorInfoMap = RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation); + if (!validRecords.isEmpty()) { + Map errorInfoMap = send(validRecords); errorInfoMap.forEach(sinkResponse::addErrors); instrumentation.logInfo("Pushed a batch of {} records to Redis", validRecords.size()); } return sinkResponse; } + private Map send(List validRecords) { + List responses = null; + RuntimeException exception = null; + int retry = CONNECTION_RETRY; + while (retry > 0) { + try { + responses = redisClient.send(validRecords); + break; + } catch (RuntimeException e) { + exception = e; + e.printStackTrace(); + redisClient.init(); + } + retry--; + } + if (responses == null) { + return RedisSinkUtils.getNonRetryableErrors(validRecords, exception, instrumentation); + } else { + return RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation); + } + } + @Override public void close() throws IOException { diff --git a/src/main/java/com/gotocompany/depot/redis/RedisSinkFactory.java b/src/main/java/com/gotocompany/depot/redis/RedisSinkFactory.java index f14c7aa8..ed5070b3 100644 --- a/src/main/java/com/gotocompany/depot/redis/RedisSinkFactory.java +++ b/src/main/java/com/gotocompany/depot/redis/RedisSinkFactory.java @@ -9,6 +9,7 @@ import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode; import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.metrics.StatsDReporter; +import com.gotocompany.depot.redis.client.RedisClient; import com.gotocompany.depot.redis.client.RedisClientFactory; import com.gotocompany.depot.redis.parsers.RedisEntryParser; import com.gotocompany.depot.redis.parsers.RedisEntryParserFactory; @@ -76,8 +77,10 @@ public void init() { * @return RedisSink */ public Sink create() { + RedisClient redisClient = RedisClientFactory.getClient(sinkConfig, statsDReporter); + redisClient.init(); return new RedisSink( - RedisClientFactory.getClient(sinkConfig, statsDReporter), + redisClient, redisParser, new Instrumentation(statsDReporter, RedisSink.class)); } diff --git a/src/main/java/com/gotocompany/depot/redis/client/RedisClient.java b/src/main/java/com/gotocompany/depot/redis/client/RedisClient.java index 92f0f8cf..6eacd0f2 100644 --- a/src/main/java/com/gotocompany/depot/redis/client/RedisClient.java +++ b/src/main/java/com/gotocompany/depot/redis/client/RedisClient.java @@ -11,4 +11,6 @@ */ public interface RedisClient extends Closeable { List send(List records); + + void init(); } diff --git a/src/main/java/com/gotocompany/depot/redis/client/RedisClientFactory.java b/src/main/java/com/gotocompany/depot/redis/client/RedisClientFactory.java index 89bc62ea..35104896 100644 --- a/src/main/java/com/gotocompany/depot/redis/client/RedisClientFactory.java +++ b/src/main/java/com/gotocompany/depot/redis/client/RedisClientFactory.java @@ -8,11 +8,10 @@ import com.gotocompany.depot.redis.enums.RedisSinkDeploymentType; import com.gotocompany.depot.redis.ttl.RedisTTLFactory; import com.gotocompany.depot.redis.ttl.RedisTtl; +import com.gotocompany.depot.redis.util.RedisSinkUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import java.util.HashSet; @@ -29,23 +28,9 @@ public static RedisClient getClient(RedisSinkConfig redisSinkConfig, StatsDRepor RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); return RedisSinkDeploymentType.CLUSTER.equals(redisSinkDeploymentType) ? getRedisClusterClient(redisTTL, redisSinkConfig, statsDReporter) - : getRedisStandaloneClient(redisTTL, redisSinkConfig, statsDReporter); + : new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisSinkConfig); } - private static RedisStandaloneClient getRedisStandaloneClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { - HostAndPort hostAndPort; - try { - hostAndPort = HostAndPort.parseString(StringUtils.trim(redisSinkConfig.getSinkRedisUrls())); - } catch (IllegalArgumentException e) { - throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls())); - } - DefaultJedisClientConfig jedisConfig = DefaultJedisClientConfig.builder() - .user(redisSinkConfig.getSinkRedisAuthUsername()) - .password(redisSinkConfig.getSinkRedisAuthPassword()) - .build(); - Jedis jedis = new Jedis(hostAndPort, jedisConfig); - return new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisTTL, jedis); - } private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { String[] redisUrls = redisSinkConfig.getSinkRedisUrls().split(DELIMITER); @@ -57,11 +42,8 @@ private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, Redis } catch (IllegalArgumentException e) { throw new ConfigurationException(String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls())); } - DefaultJedisClientConfig jedisConfig = DefaultJedisClientConfig.builder() - .user(redisSinkConfig.getSinkRedisAuthUsername()) - .password(redisSinkConfig.getSinkRedisAuthPassword()) - .build(); - JedisCluster jedisCluster = new JedisCluster(nodes, jedisConfig, redisSinkConfig.getSinkRedisMaxAttempts(), new GenericObjectPoolConfig<>()); + + JedisCluster jedisCluster = new JedisCluster(nodes, RedisSinkUtils.getJedisConfig(redisSinkConfig), redisSinkConfig.getSinkRedisMaxAttempts(), new GenericObjectPoolConfig<>()); return new RedisClusterClient(new Instrumentation(statsDReporter, RedisClusterClient.class), redisTTL, jedisCluster); } } diff --git a/src/main/java/com/gotocompany/depot/redis/client/RedisClusterClient.java b/src/main/java/com/gotocompany/depot/redis/client/RedisClusterClient.java index 9cddf250..6dd0131d 100644 --- a/src/main/java/com/gotocompany/depot/redis/client/RedisClusterClient.java +++ b/src/main/java/com/gotocompany/depot/redis/client/RedisClusterClient.java @@ -27,6 +27,10 @@ public List send(List records) { .collect(Collectors.toList()); } + @Override + public void init() { + } + @Override public void close() { instrumentation.logInfo("Closing Jedis client"); diff --git a/src/main/java/com/gotocompany/depot/redis/client/RedisStandaloneClient.java b/src/main/java/com/gotocompany/depot/redis/client/RedisStandaloneClient.java index ff04ad14..bd2e6b33 100644 --- a/src/main/java/com/gotocompany/depot/redis/client/RedisStandaloneClient.java +++ b/src/main/java/com/gotocompany/depot/redis/client/RedisStandaloneClient.java @@ -1,11 +1,18 @@ package com.gotocompany.depot.redis.client; +import com.gotocompany.depot.config.RedisSinkConfig; +import com.gotocompany.depot.exception.ConfigurationException; +import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.redis.client.response.RedisResponse; import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse; import com.gotocompany.depot.redis.record.RedisRecord; +import com.gotocompany.depot.redis.ttl.RedisTTLFactory; import com.gotocompany.depot.redis.ttl.RedisTtl; -import com.gotocompany.depot.metrics.Instrumentation; +import com.gotocompany.depot.redis.util.RedisSinkUtils; import lombok.AllArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; @@ -21,7 +28,28 @@ public class RedisStandaloneClient implements RedisClient { private final Instrumentation instrumentation; private final RedisTtl redisTTL; - private final Jedis jedis; + private final DefaultJedisClientConfig defaultJedisClientConfig; + private final HostAndPort hostAndPort; + private Jedis jedis; + + public RedisStandaloneClient(Instrumentation instrumentation, RedisTtl redisTTL, DefaultJedisClientConfig defaultJedisClientConfig, HostAndPort hostAndPort) { + this.instrumentation = instrumentation; + this.redisTTL = redisTTL; + this.defaultJedisClientConfig = defaultJedisClientConfig; + this.hostAndPort = hostAndPort; + } + + public RedisStandaloneClient(Instrumentation instrumentation, RedisSinkConfig config) { + this(instrumentation, RedisTTLFactory.getTTl(config), RedisSinkUtils.getJedisConfig(config), getHostPort(config)); + } + + private static HostAndPort getHostPort(RedisSinkConfig config) { + try { + return HostAndPort.parseString(StringUtils.trim(config.getSinkRedisUrls())); + } catch (IllegalArgumentException e) { + throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", config.getSinkRedisUrls())); + } + } /** * Pushes records in a transaction. @@ -30,6 +58,8 @@ public class RedisStandaloneClient implements RedisClient { * @param records records to send * @return Custom response containing status of the API calls. */ + + @Override public List send(List records) { Pipeline jedisPipelined = jedis.pipelined(); @@ -43,9 +73,15 @@ public List send(List records) { return responses.stream().map(RedisStandaloneResponse::process).collect(Collectors.toList()); } + @Override public void close() { instrumentation.logInfo("Closing Jedis client"); jedis.close(); } + + public void init() { + instrumentation.logInfo("Initialising Jedis client: Host: {} Port: {}", hostAndPort.getHost(), hostAndPort.getPort()); + jedis = new Jedis(hostAndPort, defaultJedisClientConfig); + } } diff --git a/src/main/java/com/gotocompany/depot/redis/util/RedisSinkUtils.java b/src/main/java/com/gotocompany/depot/redis/util/RedisSinkUtils.java index 4de529e0..1ec6c62c 100644 --- a/src/main/java/com/gotocompany/depot/redis/util/RedisSinkUtils.java +++ b/src/main/java/com/gotocompany/depot/redis/util/RedisSinkUtils.java @@ -1,10 +1,12 @@ package com.gotocompany.depot.redis.util; +import com.gotocompany.depot.config.RedisSinkConfig; import com.gotocompany.depot.redis.client.response.RedisResponse; import com.gotocompany.depot.redis.record.RedisRecord; import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; import com.gotocompany.depot.metrics.Instrumentation; +import redis.clients.jedis.DefaultJedisClientConfig; import java.util.HashMap; import java.util.List; @@ -27,4 +29,23 @@ public static Map getErrorsFromResponse(List redis ); return errors; } + + public static Map getNonRetryableErrors(List redisRecords, RuntimeException e, Instrumentation instrumentation) { + Map errors = new HashMap<>(); + for (RedisRecord record : redisRecords) { + instrumentation.logError("Error while inserting to redis for message. Record: {}, Error: {}", + record.toString(), e.getMessage()); + errors.put(record.getIndex(), new ErrorInfo(new Exception(e.getMessage()), ErrorType.SINK_NON_RETRYABLE_ERROR)); + + } + return errors; + } + + + public static DefaultJedisClientConfig getJedisConfig(RedisSinkConfig config) { + return DefaultJedisClientConfig.builder() + .user(config.getSinkRedisAuthUsername()) + .password(config.getSinkRedisAuthPassword()) + .build(); + } } diff --git a/src/test/java/com/gotocompany/depot/redis/RedisSinkTest.java b/src/test/java/com/gotocompany/depot/redis/RedisSinkTest.java index 963d5274..17a4aa80 100644 --- a/src/test/java/com/gotocompany/depot/redis/RedisSinkTest.java +++ b/src/test/java/com/gotocompany/depot/redis/RedisSinkTest.java @@ -145,4 +145,25 @@ public void shouldReportNetErrors() { Assert.assertEquals("failed at 3", sinkResponse.getErrorsFor(3).getException().getMessage()); Assert.assertEquals("failed at 4", sinkResponse.getErrorsFor(4).getException().getMessage()); } + + @Test + public void shouldReturnNonRetryableErrors() { + List messages = new ArrayList<>(); + List records = new ArrayList<>(); + records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 0L, null, null, true)); + records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 1L, null, null, true)); + records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 2L, null, null, true)); + records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 3L, null, null, true)); + records.add(new RedisRecord(new RedisListEntry("key1", "val1", null), 4L, null, null, true)); + when(redisParser.convert(messages)).thenReturn(records); + List validRecords = records.stream().filter(RedisRecord::isValid).collect(Collectors.toList()); + when(redisClient.send(validRecords)).thenThrow(new ClassCastException("[B cannot be cast to java.util.List")); + RedisSink redisSink = new RedisSink(redisClient, redisParser, instrumentation); + SinkResponse sinkResponse = redisSink.pushToSink(messages); + Assert.assertEquals(5, sinkResponse.getErrors().size()); + Assert.assertEquals(ErrorType.SINK_NON_RETRYABLE_ERROR, sinkResponse.getErrorsFor(0).getErrorType()); + Assert.assertEquals(ErrorType.SINK_NON_RETRYABLE_ERROR, sinkResponse.getErrorsFor(2).getErrorType()); + Assert.assertEquals("[B cannot be cast to java.util.List", sinkResponse.getErrorsFor(3).getException().getMessage()); + Assert.assertEquals("[B cannot be cast to java.util.List", sinkResponse.getErrorsFor(4).getException().getMessage()); + } } diff --git a/src/test/java/com/gotocompany/depot/redis/client/RedisStandaloneClientTest.java b/src/test/java/com/gotocompany/depot/redis/client/RedisStandaloneClientTest.java index 221220d9..35994f42 100644 --- a/src/test/java/com/gotocompany/depot/redis/client/RedisStandaloneClientTest.java +++ b/src/test/java/com/gotocompany/depot/redis/client/RedisStandaloneClientTest.java @@ -1,8 +1,9 @@ package com.gotocompany.depot.redis.client; +import com.gotocompany.depot.config.RedisSinkConfig; +import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.redis.client.response.RedisResponse; import com.gotocompany.depot.redis.client.response.RedisStandaloneResponse; -import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.redis.record.RedisRecord; import com.gotocompany.depot.redis.ttl.RedisTtl; import org.junit.Assert; @@ -11,9 +12,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; +import redis.clients.jedis.*; import java.io.IOException; import java.util.ArrayList; @@ -29,10 +28,17 @@ public class RedisStandaloneClientTest { private RedisTtl redisTTL; @Mock private Jedis jedis; + @Mock + private RedisSinkConfig redisSinkConfig; + @Mock + private DefaultJedisClientConfig defaultJedisClientConfig; + @Mock + private HostAndPort hostAndPort; + @Test public void shouldCloseTheClient() throws IOException { - RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, jedis); + RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis); redisClient.close(); Mockito.verify(instrumentation, Mockito.times(1)).logInfo("Closing Jedis client"); @@ -41,7 +47,7 @@ public void shouldCloseTheClient() throws IOException { @Test public void shouldSendRecordsToJedis() { - RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, jedis); + RedisClient redisClient = new RedisStandaloneClient(instrumentation, redisTTL, defaultJedisClientConfig, hostAndPort, jedis); Pipeline pipeline = Mockito.mock(Pipeline.class); Response response = Mockito.mock(Response.class); Mockito.when(jedis.pipelined()).thenReturn(pipeline);