feat: Add Retryable Configuration for GRPC Sink (using CEL) #44

Merged on Jul 24, 2024 (39 commits)

Commits
a186c50
[feat] Add CEL evaluator and add it to grpc sink
ekawinataa Jul 4, 2024
c9a6ac0
Remove blank check
ekawinataa Jul 4, 2024
0cbe4fa
remove unintended change
ekawinataa Jul 4, 2024
2b46bbb
[feat] add success field checking
ekawinataa Jul 5, 2024
7cf9caf
[feat] add evaluator method
ekawinataa Jul 5, 2024
75a5b4a
[feat] handle descriptor update
ekawinataa Jul 5, 2024
a0a9e7b
Add test for evaluator
ekawinataa Jul 5, 2024
9c0eafa
Update test
ekawinataa Jul 5, 2024
9c56c64
Update SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION default value
ekawinataa Jul 5, 2024
764d0dd
Add test for GrpcSink
ekawinataa Jul 7, 2024
91ebabc
Rename descriptor to payloadDescriptor
ekawinataa Jul 7, 2024
2b4c984
Checkstyle update
ekawinataa Jul 7, 2024
927ccc3
Refactor instantiation logic to separate method
ekawinataa Jul 8, 2024
b350110
Remove schema refresh
ekawinataa Jul 8, 2024
8485e9d
Remove projectnessie and use implementation from cel-java
ekawinataa Jul 8, 2024
949dab9
update checkstyle
ekawinataa Jul 8, 2024
e4f4080
Add comment
ekawinataa Jul 8, 2024
70d3a5e
Update docs
ekawinataa Jul 8, 2024
44e3ee6
Move the evaluator instantiation to factory method
ekawinataa Jul 8, 2024
188d258
Remove unused sink config
ekawinataa Jul 9, 2024
3c33f10
Add more testcases
ekawinataa Jul 9, 2024
9f994e7
revert protoc version
ekawinataa Jul 9, 2024
61bc693
Add more test cases
ekawinataa Jul 9, 2024
cea966e
Add more comprehensive documentation
ekawinataa Jul 9, 2024
08adb94
Rename default class and update docs
ekawinataa Jul 9, 2024
a22d7ab
Refactor typical cel functionality to util class
ekawinataa Jul 12, 2024
e963eba
Add checking for expression result
ekawinataa Jul 15, 2024
93590c8
Use built in UnsupportedOperationException
ekawinataa Jul 18, 2024
b7f8cec
Update build-info-extractor
ekawinataa Jul 18, 2024
6283b9c
Update to classpath("org.jfrog.buildinfo:build-info-extractor-gradle:…
ekawinataa Jul 18, 2024
3caa4c0
Remove OperationNotSupportedException.java
ekawinataa Jul 18, 2024
7927c58
Remove jfrog build info on dependencies
ekawinataa Jul 18, 2024
02d39be
- Tidy up tests
ekawinataa Jul 19, 2024
312cc13
Bump version
ekawinataa Jul 19, 2024
6a706cb
Makes error type for retryable error configurable through env
ekawinataa Jul 19, 2024
e3a4c3f
Add 1 more test case
ekawinataa Jul 19, 2024
2faa366
Update default value
ekawinataa Jul 19, 2024
1631890
Resolve conflict
ekawinataa Jul 22, 2024
8f2d33d
Use default value of true on CEL Expression config to retry on defaul…
ekawinataa Jul 23, 2024
43 changes: 43 additions & 0 deletions docs/docs/sinks/grpc-sink.md
@@ -82,3 +82,46 @@ Defines the amount of time (in milliseconds) gRPC clients are willing to wait fo

- Example value: `1000`
- Type: `optional`

### `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION`

Defines the CEL (Common Expression Language) expression used to decide, based on the gRPC response, whether an unsuccessful gRPC sink call should be retried.
The expression must evaluate to a boolean value. If it evaluates to `true`, the unsuccessful gRPC sink call is retried; otherwise it is not.
All standard CEL macros are currently supported: `has`, `all`, `exists`, `exists_one`, `map`, `map_filter`, `filter`.
For more information about CEL, refer to the language definition: https://github.com/google/cel-spec/blob/master/doc/langdef.md

- Example value: `com.gotocompany.generic.GenericResponse.success == false && com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)`
- Type: `optional`
- Default value: `true`
- Use cases:
Example response proto:
```
syntax = "proto3";
package com.gotocompany.generic;

message GenericResponse {
    bool success = 1;
    repeated Error errors = 2;
}

message Error {
    string code = 1;
    string reason = 2;
}
```

Example retry rules:
- Retry on a specific error code: `com.gotocompany.generic.GenericResponse.errors.exists(e, e.code == "400")`
- Retry on a range of error codes: `com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)`
- Retry on error codes outside a specific set: `com.gotocompany.generic.GenericResponse.errors.exists(e, !(int(e.code) in [400, 500, 600]))`
- Disable retry in all cases: `false`
- Retry on all error codes: `true`
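
To make the semantics of the `exists` rules above concrete, here is a minimal plain-Java sketch of the same three checks. It does not use the CEL engine; `RetryRuleSketch` and `ErrorEntry` are hypothetical names introduced only for this illustration, with `ErrorEntry` standing in for the `Error` message of the example proto.

```java
import java.util.List;
import java.util.Set;

final class RetryRuleSketch {
    /** Hypothetical stand-in for the Error message in the example proto. */
    static final class ErrorEntry {
        final String code;
        final String reason;

        ErrorEntry(String code, String reason) {
            this.code = code;
            this.reason = reason;
        }
    }

    // Plain-Java reading of: errors.exists(e, e.code == "400")
    static boolean hasCode(List<ErrorEntry> errors, String code) {
        return errors.stream().anyMatch(e -> e.code.equals(code));
    }

    // Plain-Java reading of: errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)
    static boolean hasCodeInRange(List<ErrorEntry> errors, int low, int high) {
        return errors.stream()
                .mapToInt(e -> Integer.parseInt(e.code))
                .anyMatch(c -> c >= low && c <= high);
    }

    // Plain-Java reading of: errors.exists(e, !(int(e.code) in [400, 500, 600]))
    static boolean hasCodeOutside(List<ErrorEntry> errors, Set<Integer> excluded) {
        return errors.stream()
                .mapToInt(e -> Integer.parseInt(e.code))
                .anyMatch(c -> !excluded.contains(c));
    }
}
```

Because `exists` only needs one matching element, an empty `errors` list never satisfies any of these rules, so a response without errors would not be retried by them.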

### `SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE`

Defines the `ErrorType` assigned to a retryable error. It is used in conjunction with `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION` and `ERROR_TYPES_FOR_RETRY`.
The value must be a constant defined in `com.gotocompany.depot.error.ErrorType`.

- Example value: `SINK_RETRYABLE_ERROR`
- Type: `optional`
- Default value: `DEFAULT_ERROR`
11 changes: 11 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java
@@ -1,6 +1,8 @@
package com.gotocompany.firehose.config;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcMetadataConverter;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import org.aeonbits.owner.Config;

import java.util.Map;
@@ -31,6 +33,15 @@ public interface GrpcSinkConfig extends AppConfig {
    @Config.Key("SINK_GRPC_ARG_DEADLINE_MS")
    Long getSinkGrpcArgDeadlineMS();

    @Config.Key("SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION")
    @DefaultValue("true")
    String getSinkGrpcResponseRetryCELExpression();

    @Config.Key("SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE")
    @DefaultValue("DEFAULT_ERROR")
    @ConverterClass(GrpcSinkRetryErrorTypeConverter.class)
    ErrorType getSinkGrpcRetryErrorType();

    @Key("SINK_GRPC_METADATA")
    @DefaultValue("")
    @ConverterClass(GrpcMetadataConverter.class)
@@ -0,0 +1,14 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.depot.error.ErrorType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Locale;

public class GrpcSinkRetryErrorTypeConverter implements Converter<ErrorType> {
    @Override
    public ErrorType convert(Method method, String s) {
        return ErrorType.valueOf(s.trim().toUpperCase(Locale.ROOT));
    }
}
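
The normalization done by the converter above (trim, upper-case with `Locale.ROOT`, then `valueOf`) can be tried in isolation with the sketch below. `ErrorTypeParsingSketch` and `SketchErrorType` are hypothetical names; the enum is a stand-in that lists only the three `ErrorType` constants mentioned in this PR, not the real `com.gotocompany.depot.error.ErrorType`.

```java
import java.util.Locale;

final class ErrorTypeParsingSketch {
    /** Hypothetical stand-in for a few constants of the real ErrorType enum. */
    enum SketchErrorType { DEFAULT_ERROR, SINK_RETRYABLE_ERROR, SINK_NON_RETRYABLE_ERROR }

    // Same normalization as the converter: surrounding whitespace and letter case
    // in the configured value are ignored; unknown names make valueOf throw
    // IllegalArgumentException.
    static SketchErrorType parse(String raw) {
        return SketchErrorType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Upper-casing with `Locale.ROOT` keeps the result independent of the JVM default locale. Under a Turkish default locale, for example, a plain `toUpperCase()` maps `i` to the dotted capital `İ`, which would make a lower-case value such as `sink_retryable_error` fail to match.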
@@ -0,0 +1,61 @@
package com.gotocompany.firehose.evaluator;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.gotocompany.firehose.utils.CelUtils;
import dev.cel.common.types.CelKind;
import dev.cel.compiler.CelCompiler;
import dev.cel.runtime.CelRuntime;
import dev.cel.runtime.CelRuntimeFactory;
import lombok.extern.slf4j.Slf4j;

/**
 * Implementation of PayloadEvaluator that evaluates gRPC responses using CEL (Common Expression Language).
 */
@Slf4j
public class GrpcResponseCelPayloadEvaluator implements PayloadEvaluator<Message> {

    private final Descriptors.Descriptor descriptor;
    private CelRuntime.Program celProgram;

    /**
     * Constructs a GrpcResponseCelPayloadEvaluator with the specified descriptor and CEL expression.
     *
     * @param descriptor the descriptor of the gRPC message
     * @param celExpression the CEL expression to evaluate against the message
     */
    public GrpcResponseCelPayloadEvaluator(Descriptors.Descriptor descriptor, String celExpression) {
        this.descriptor = descriptor;
        buildCelEnvironment(celExpression);
    }

    /**
     * Evaluates the given gRPC message payload using the CEL program.
     *
     * @param payload the gRPC message to be evaluated
     * @return true if the payload passes the evaluation, false otherwise
     */
    @Override
    public boolean evaluate(Message payload) {
        if (!descriptor.getFullName().equals(payload.getDescriptorForType().getFullName())) {
            throw new IllegalArgumentException(String.format("Payload %s does not match descriptor %s",
                    payload.getDescriptorForType().getFullName(), descriptor.getFullName()));
        }
        return (boolean) CelUtils.evaluate(this.celProgram, payload);
    }

    /**
     * Builds the CEL environment required to evaluate the CEL expression.
     *
     * @param celExpression the CEL expression to evaluate against the message
     * @throws IllegalArgumentException if the CEL expression is invalid or if the evaluator cannot be constructed
     */
    private void buildCelEnvironment(String celExpression) {
        CelCompiler celCompiler = CelUtils.initializeCelCompiler(this.descriptor);
        CelRuntime celRuntime = CelRuntimeFactory.standardCelRuntimeBuilder()
                .build();
        this.celProgram = CelUtils.initializeCelProgram(celExpression, celRuntime, celCompiler,
                celType -> celType.kind().equals(CelKind.BOOL));
    }

}
@@ -0,0 +1,16 @@
package com.gotocompany.firehose.evaluator;

/**
 * A generic interface for evaluating payloads.
 *
 * @param <T> the type of payload to be evaluated
 */
public interface PayloadEvaluator<T> {
    /**
     * Evaluates the given payload.
     *
     * @param payload the payload to be evaluated
     * @return true if the payload passes the evaluation, false otherwise
     */
    boolean evaluate(T payload);
}
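
Since the interface above declares a single abstract method, it can be implemented with a lambda, which is convenient for stubbing an evaluator in tests. The sketch below is an illustration only: it redeclares the interface locally so that it compiles on its own, and `PayloadEvaluatorSketch`, `NON_EMPTY`, and `both` are hypothetical names that are not part of this PR.

```java
final class PayloadEvaluatorSketch {
    /** Local copy of the PR's interface so the sketch is self-contained. */
    interface PayloadEvaluator<T> {
        boolean evaluate(T payload);
    }

    // Hypothetical evaluator: a string payload passes when it is non-empty.
    static final PayloadEvaluator<String> NON_EMPTY = payload -> !payload.isEmpty();

    // Hypothetical helper: the combined evaluator passes only when both inputs pass.
    static <T> PayloadEvaluator<T> both(PayloadEvaluator<T> first, PayloadEvaluator<T> second) {
        return payload -> first.evaluate(payload) && second.evaluate(payload);
    }
}
```

A constant stub such as `payload -> true` corresponds to the `true` retry expression, and `payload -> false` to the `false` one.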
29 changes: 26 additions & 3 deletions src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java
@@ -1,7 +1,11 @@
package com.gotocompany.firehose.sink.grpc;



import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.exception.DefaultException;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
@@ -21,13 +25,21 @@
public class GrpcSink extends AbstractSink {

private final GrpcClient grpcClient;
private final StencilClient stencilClient;
private final GrpcSinkConfig grpcSinkConfig;
private List<Message> messages;
private StencilClient stencilClient;
private PayloadEvaluator<com.google.protobuf.Message> retryEvaluator;

public GrpcSink(FirehoseInstrumentation firehoseInstrumentation, GrpcClient grpcClient, StencilClient stencilClient) {
public GrpcSink(FirehoseInstrumentation firehoseInstrumentation,
GrpcClient grpcClient,
StencilClient stencilClient,
GrpcSinkConfig grpcSinkConfig,
PayloadEvaluator<com.google.protobuf.Message> retryEvaluator) {
super(firehoseInstrumentation, "grpc");
this.grpcClient = grpcClient;
this.stencilClient = stencilClient;
this.grpcSinkConfig = grpcSinkConfig;
this.retryEvaluator = retryEvaluator;
}

@Override
@@ -43,6 +55,7 @@ protected List<Message> execute() throws Exception {
if (!success) {
getFirehoseInstrumentation().logWarn("Grpc Service returned error");
failedMessages.add(message);
setRetryableErrorInfo(message, response);
}
}
getFirehoseInstrumentation().logDebug("Failed messages count: {}", failedMessages.size());
@@ -60,4 +73,14 @@ public void close() throws IOException {
this.messages = new ArrayList<>();
stencilClient.close();
}

private void setRetryableErrorInfo(Message message, DynamicMessage dynamicMessage) {
boolean eligibleToRetry = retryEvaluator.evaluate(dynamicMessage);
if (eligibleToRetry) {
getFirehoseInstrumentation().logDebug("Retrying grpc service");
message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), grpcSinkConfig.getSinkGrpcRetryErrorType()));
return;
}
message.setErrorInfo(new ErrorInfo(new DefaultException("Non Retryable gRPC Error"), ErrorType.SINK_NON_RETRYABLE_ERROR));
}
}
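
The branch in `setRetryableErrorInfo` above reduces to a small decision: if the evaluator accepts the response, the message gets the configured retry error type, otherwise it gets `SINK_NON_RETRYABLE_ERROR`. The sketch below isolates that decision under stated assumptions: `RetryClassificationSketch`, `SketchErrorType`, and `classify` are hypothetical names, the enum is a stand-in for the real `ErrorType`, and a `java.util.function.Predicate` takes the place of `PayloadEvaluator`.

```java
import java.util.function.Predicate;

final class RetryClassificationSketch {
    /** Hypothetical stand-in for a few constants of the real ErrorType enum. */
    enum SketchErrorType { DEFAULT_ERROR, SINK_RETRYABLE_ERROR, SINK_NON_RETRYABLE_ERROR }

    // Returns the configured type when the evaluator marks the response as
    // retryable, and SINK_NON_RETRYABLE_ERROR otherwise.
    static <R> SketchErrorType classify(R response,
                                        Predicate<R> retryEvaluator,
                                        SketchErrorType configuredRetryType) {
        return retryEvaluator.test(response)
                ? configuredRetryType
                : SketchErrorType.SINK_NON_RETRYABLE_ERROR;
    }
}
```

With the defaults in this PR (expression `true`, error type `DEFAULT_ERROR`), every failed response is classified as `DEFAULT_ERROR`; whether that type is actually retried then depends on `ERROR_TYPES_FOR_RETRY`, as the documentation in this PR notes.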
@@ -1,8 +1,11 @@
package com.gotocompany.firehose.sink.grpc;


import com.google.protobuf.Message;
import com.gotocompany.firehose.config.AppConfig;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.GrpcResponseCelPayloadEvaluator;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.proto.ProtoToMetadataMapper;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
@@ -32,15 +35,21 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass());
firehoseInstrumentation.logDebug(grpcSinkConfig);
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();
AppConfig appConfig = ConfigFactory.create(AppConfig.class, configuration);
ProtoToMetadataMapper protoToMetadataMapper = new ProtoToMetadataMapper(stencilClient.get(appConfig.getInputSchemaProtoClass()), grpcConfig.getSinkGrpcMetadata());
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient, protoToMetadataMapper);
firehoseInstrumentation.logInfo("GRPC connection established");
PayloadEvaluator<Message> grpcResponseRetryEvaluator = instantiatePayloadEvaluator(grpcConfig, stencilClient);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig, grpcResponseRetryEvaluator);
}

return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient);
private static PayloadEvaluator<Message> instantiatePayloadEvaluator(GrpcSinkConfig grpcSinkConfig, StencilClient stencilClient) {
return new GrpcResponseCelPayloadEvaluator(
stencilClient.get(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass()),
grpcSinkConfig.getSinkGrpcResponseRetryCELExpression());
}

}
@@ -0,0 +1,34 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;


public class GrpcSinkRetryErrorTypeConverterTest {
    @Test
    public void shouldConvertToAppropriateEnumType() {
        Map<String, ErrorType> stringToExpectedValue = Arrays.stream(ErrorType.values())
                .collect(Collectors.toMap(ErrorType::toString, Function.identity()));
        GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter();

        stringToExpectedValue.keySet().stream()
                .forEach(key -> {
                    ErrorType expectedValue = stringToExpectedValue.get(key);
                    ErrorType actualValue = grpcSinkRetryErrorTypeConverter.convert(null, key);
                    Assertions.assertEquals(expectedValue, actualValue);
                });
    }

    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowExceptionForInvalidValue() {
        GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter();
        grpcSinkRetryErrorTypeConverter.convert(null, "ErrorType.UNREGISTERED");
    }
}