Skip to content

Commit

Permalink
Makes error type for retryable error configurable through env
Browse files Browse the repository at this point in the history
  • Loading branch information
ekawinataa committed Jul 19, 2024
1 parent 312cc13 commit 6a706cb
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.gotocompany.firehose.config;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcMetadataConverter;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import io.grpc.Metadata;
import org.aeonbits.owner.Config;

Expand Down Expand Up @@ -34,6 +36,11 @@ public interface GrpcSinkConfig extends AppConfig {
@DefaultValue("")
String getSinkGrpcResponseRetryCELExpression();

@Config.Key("SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE")
@DefaultValue("ErrorType.DEFAULT_ERROR")
@ConverterClass(GrpcSinkRetryErrorTypeConverter.class)
ErrorType getSinkGrpcRetryErrorType();

@Key("SINK_GRPC_METADATA")
@DefaultValue("")
@ConverterClass(GrpcMetadataConverter.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.depot.error.ErrorType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Locale;

public class GrpcSinkRetryErrorTypeConverter implements Converter<ErrorType> {
@Override
public ErrorType convert(Method method, String s) {
return ErrorType.valueOf(s.trim().toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.exception.DefaultException;
import com.gotocompany.firehose.exception.DeserializerException;
Expand All @@ -25,16 +26,19 @@ public class GrpcSink extends AbstractSink {

private final GrpcClient grpcClient;
private final StencilClient stencilClient;
private final GrpcSinkConfig grpcSinkConfig;
private List<Message> messages;
private PayloadEvaluator<com.google.protobuf.Message> retryEvaluator;

public GrpcSink(FirehoseInstrumentation firehoseInstrumentation,
GrpcClient grpcClient,
StencilClient stencilClient,
GrpcSinkConfig grpcSinkConfig,
PayloadEvaluator<com.google.protobuf.Message> retryEvaluator) {
super(firehoseInstrumentation, "grpc");
this.grpcClient = grpcClient;
this.stencilClient = stencilClient;
this.grpcSinkConfig = grpcSinkConfig;
this.retryEvaluator = retryEvaluator;
}

Expand Down Expand Up @@ -74,7 +78,7 @@ private void setRetryableErrorInfo(Message message, DynamicMessage dynamicMessag
boolean eligibleToRetry = retryEvaluator.evaluate(dynamicMessage);
if (eligibleToRetry) {
getFirehoseInstrumentation().logDebug("Retrying grpc service");
message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), ErrorType.SINK_RETRYABLE_ERROR));
message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), grpcSinkConfig.getSinkGrpcRetryErrorType()));
return;
}
message.setErrorInfo(new ErrorInfo(new DefaultException("Non Retryable gRPC Error"), ErrorType.SINK_NON_RETRYABLE_ERROR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient);
firehoseInstrumentation.logInfo("GRPC connection established");
PayloadEvaluator<Message> grpcResponseRetryEvaluator = instantiatePayloadEvaluator(grpcConfig, stencilClient);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcResponseRetryEvaluator);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig, grpcResponseRetryEvaluator);
}

private static PayloadEvaluator<Message> instantiatePayloadEvaluator(GrpcSinkConfig grpcSinkConfig, StencilClient stencilClient) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;


public class GrpcSinkRetryErrorTypeConverterTest {
@Test
public void shouldConvertToAppropriateEnumType() {
Map<String, ErrorType> stringToExpectedValue = Arrays.stream(ErrorType.values())
.collect(Collectors.toMap(ErrorType::toString, Function.identity()));
GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter();

stringToExpectedValue.keySet().stream()
.forEach(key -> {
ErrorType expectedValue = stringToExpectedValue.get(key);
ErrorType actualValue = grpcSinkRetryErrorTypeConverter.convert(null, key);
Assertions.assertEquals(expectedValue, actualValue);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public class GrpcSinkTest {
@Before
public void setUp() {
initMocks(this);
when(grpcSinkConfig.getSinkGrpcRetryErrorType()).thenReturn(ErrorType.SINK_RETRYABLE_ERROR);
this.grpcResponsePayloadEvaluator = new GrpcResponseCelPayloadEvaluator(
GenericResponse.getDescriptor(),
"GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"4000\")"
);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, new DefaultGrpcResponsePayloadEvaluator());
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, new DefaultGrpcResponsePayloadEvaluator());
}

@Test
Expand Down Expand Up @@ -102,23 +103,23 @@ public void shouldReturnBackListOfFailedMessages() throws IOException, Deseriali

@Test
public void shouldCloseStencilClient() throws IOException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, this.grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator);

sink.close();
verify(stencilClient, times(1)).close();
}

@Test
public void shouldLogWhenClosingConnection() throws IOException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, this.grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator);

sink.close();
verify(firehoseInstrumentation, times(1)).logInfo("GRPC connection closing");
}

@Test
public void shouldReturnFailedMessagesWithRetryableErrorsWhenCELExpressionMatches() throws InvalidProtocolBufferException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, this.grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator);
Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1);
GenericResponse response = GenericResponse.newBuilder()
.setSuccess(false)
Expand All @@ -135,7 +136,7 @@ public void shouldReturnFailedMessagesWithRetryableErrorsWhenCELExpressionMatche
);
when(grpcClient.execute(any(), any()))
.thenReturn(dynamicMessage);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, grpcResponsePayloadEvaluator);

List<Message> result = sink.pushMessage(Collections.singletonList(payload));

Expand All @@ -145,7 +146,7 @@ public void shouldReturnFailedMessagesWithRetryableErrorsWhenCELExpressionMatche

@Test
public void shouldReturnFailedMessagesWithNonRetryableErrorsWhenCELExpressionDoesntMatch() throws InvalidProtocolBufferException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, this.grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, this.grpcResponsePayloadEvaluator);
Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1);
GenericResponse response = GenericResponse.newBuilder()
.setSuccess(false)
Expand All @@ -162,7 +163,7 @@ public void shouldReturnFailedMessagesWithNonRetryableErrorsWhenCELExpressionDoe
);
when(grpcClient.execute(any(), any()))
.thenReturn(dynamicMessage);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcResponsePayloadEvaluator);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig, grpcResponsePayloadEvaluator);

List<Message> result = sink.pushMessage(Collections.singletonList(payload));

Expand Down

0 comments on commit 6a706cb

Please sign in to comment.