Skip to content

Commit

Permalink
feat: Dynamic GRPC Metadata (#46)
Browse files Browse the repository at this point in the history
Criteria :

Enable dynamic gRPC metadata based on payload
Payload is parsed using CEL expression and mapped to simple key value header
Metadata could contains both static and dynamic key-value
Using CEL Expression to parse the Proto payload, for details please refer https://github.com/google/cel-spec/blob/master/doc/langdef.md. The reason of using CEL expression is because it natively support proto/DynamicMessage without the need to parse it to JSON first
Should use full qualified proto class name for the CEL Expression
Only supports primitive value as the key and value for the metadata
Reason for change :

Some of EGLC users that want to migrate are expecting Firehose to have the feature to insert gRPC metadata based on the payload/event
Configuration would be something like this :

SINK_GRPC_METADATA=staticKey : $Payload.field_one,Payload.field_two: $Payload.fields[0].name

Commits: 
* Initial commit

* streamline Cel program pipeline

* Refactor code

* trim metadata key

* Rename method

* Add assertions for buildGrpcMetadata

* Add tests

* Add tests and refactor classes

* Add proto for test

* Add mock config

* Add more test

* Refactor to use existing config

* Add test for empty config

* Test unmapped field

* Update grpc-sink.md

* Rename several variables to be more clear

* Add inline documentation

* Refactor several CEL functionalities to the utility class

* Refactor naming

* Checkstyle

* - Add more unit test
- Add checking on initialization for type support

* - Checkstyle

* Use built in exception for unsupported operation

* Update to classpath("org.jfrog.buildinfo:build-info-extractor-gradle:4.33.1")

* Remove parentheses on classpath

* - Reduce try catch block
- Use annotation to assert exception in test

* Bump version

* Bump version
  • Loading branch information
ekawinataa authored Jul 19, 2024
1 parent 5178fd9 commit a368c89
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 46 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ buildscript {
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17'
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.4.7"
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.33.1"
classpath "org.ajoberstar:gradle-git:1.6.0"
}
}
Expand Down Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.3'
version '0.10.4'

def projName = "firehose"

Expand Down Expand Up @@ -87,7 +87,6 @@ dependencies {
}
implementation 'io.confluent:monitoring-interceptors:3.0.0'
implementation 'io.grpc:grpc-all:1.53.0'
implementation group: 'org.jfrog.buildinfo', name: 'build-info-extractor', version: '2.6.3'
implementation group: 'com.google.gradle', name: 'osdetector-gradle-plugin', version: '1.2.1'
implementation group: 'org.apache.ivy', name: 'ivy', version: '2.2.0'
implementation group: 'org.mongodb', name: 'mongo-java-driver', version: '3.12.8'
Expand All @@ -103,6 +102,7 @@ dependencies {
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.1'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
23 changes: 20 additions & 3 deletions docs/docs/sinks/grpc-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,29 @@ Defines the URL of the GRPC method that needs to be called.

### `SINK_GRPC_METADATA`

Defines the GRPC additional static Metadata that allows clients to provide information to server that is associated with an RPC.
Defines the GRPC additional static and dynamic Metadata that allows clients to provide information to server that is associated with an RPC.
Dynamic metadata is populated by using CEL expression applied to the input payload. CEL expression should be flagged by '$' and use fully qualified package name.
Config format is CSV key-value pair, separated by colon. String, numeric, boolean are the dynamic values supported. Refer to official CEL documentation https://github.com/google/cel-spec.

Note - final metadata will be generated with merging static metadata and the kafka record header.
Note - final metadata will be generated with merging metadata and the kafka record header.

- Example value: `authorization:token,dlq:true`
- Example value: `authorization:token,dlq:true,$com.goto.company.GenericPayload.field:staticvalue,$com.goto.company.GenericPayload.field_two:$(com.goto.company.GenericPayload.id + '' + com.goto.company.GenericPayload.code)`
- Type: `optional`
- Use case :
Example Proto
```
package com.goto.company
message GenericPayload {
string field = "field_name";
string field_two = "field_two";
string id = "123";
int code = 400;
}
```
Example config : `$com.goto.company.GenericPayload.field: $(com.goto.company.GenericPayload.field_two + '_' + string(com.goto.company.GenericPayload.code))`
This would result in : `field_name:field_two_400`


### `SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.GrpcMetadataConverter;
import io.grpc.Metadata;
import org.aeonbits.owner.Config;

import java.util.Map;


public interface GrpcSinkConfig extends AppConfig {

Expand Down Expand Up @@ -33,5 +34,6 @@ public interface GrpcSinkConfig extends AppConfig {
@Key("SINK_GRPC_METADATA")
@DefaultValue("")
@ConverterClass(GrpcMetadataConverter.class)
Metadata getSinkGrpcMetadata();
Map<String, String> getSinkGrpcMetadata();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.AbstractMap;
import java.util.Arrays;
import io.grpc.Metadata;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class GrpcMetadataConverter implements Converter<Metadata> {
import org.apache.commons.lang.StringUtils;

public class GrpcMetadataConverter implements Converter<Map<String, String>> {

private static final String PAIR_DELIMITER = ",";
private static final String KEY_VALUE_DELIMITER = ":";
private static final int KEY_INDEX = 0;
private static final int VALUE_INDEX = 1;

@Override
public Metadata convert(Method method, String input) {
Metadata metadata = new Metadata();
Arrays.stream(input.split(","))
.filter(metadataKeyValue -> !metadataKeyValue.trim().isEmpty())
.map(metadataKeyValue -> metadataKeyValue.split(":", 2))
.forEach(keyValue -> {
if (keyValue.length != 2) {
throw new IllegalArgumentException(String.format("provided metadata %s is invalid", input));
public Map<String, String> convert(Method method, String input) {
if (StringUtils.isBlank(input)) {
return new HashMap<>();
}
return Arrays.stream(input.split(PAIR_DELIMITER))
.filter(StringUtils::isNotBlank)
.map(pair -> {
String[] split = pair.split(KEY_VALUE_DELIMITER);
if (split.length < 2 || StringUtils.isBlank(split[KEY_INDEX])) {
throw new IllegalArgumentException("Invalid metadata entry: " + pair);
}
metadata.put(Metadata.Key.of(keyValue[0].trim(), Metadata.ASCII_STRING_MARSHALLER), keyValue[1].trim());
});

return metadata;
return new AbstractMap.SimpleEntry<>(split[KEY_INDEX].trim(), split[VALUE_INDEX].trim());
})
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.gotocompany.firehose.proto;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.utils.CelUtils;
import dev.cel.compiler.CelCompiler;
import dev.cel.runtime.CelRuntime;
import dev.cel.runtime.CelRuntimeFactory;
import io.grpc.Metadata;
import org.apache.commons.collections.MapUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Class responsible for mapping Protobuf messages to gRPC metadata using CEL expressions.
*/
public class ProtoToMetadataMapper {

private static final Pattern CEL_EXPRESSION_MARKER = Pattern.compile("^\\$(.+)");
private static final int EXACT_CEL_EXPRESSION_GROUP_INDEX = 1;

private final Map<String, CelRuntime.Program> celExpressionToProgramMap;
private final Map<String, String> metadataTemplate;
private final Descriptors.Descriptor descriptor;

/**
* Constructor for ProtoToMetadataMapper.
*
* @param descriptor the Protobuf descriptor of the message type
* @param metadataTemplate a map of metadata keys and values that may contain CEL expressions
*/
public ProtoToMetadataMapper(Descriptors.Descriptor descriptor, Map<String, String> metadataTemplate) {
this.metadataTemplate = metadataTemplate;
this.descriptor = descriptor;
this.celExpressionToProgramMap = initializeCelPrograms();
}

/**
* Builds gRPC metadata from a Protobuf message in byte array format.
*
* @param message the Protobuf message as a byte array
* @return gRPC metadata
* @throws DeserializerException if the Protobuf message cannot be parsed
*/
public Metadata buildGrpcMetadata(byte[] message) {
if (MapUtils.isEmpty(metadataTemplate)) {
return new Metadata();
}
try {
return buildGrpcMetadata(DynamicMessage.parseFrom(descriptor, message));
} catch (InvalidProtocolBufferException e) {
throw new DeserializerException("Failed to parse protobuf message", e);
}
}

/**
* Builds gRPC metadata from a Protobuf message.
*
* @param message the Protobuf message
* @return gRPC metadata
*/
private Metadata buildGrpcMetadata(Message message) {
Metadata metadata = new Metadata();
for (Map.Entry<String, String> entry : metadataTemplate.entrySet()) {
String updatedKey = evaluateExpression(entry.getKey(), message).toString();
Object updatedValue = evaluateExpression(entry.getValue(), message);
metadata.put(Metadata.Key.of(updatedKey.trim(), Metadata.ASCII_STRING_MARSHALLER), updatedValue.toString());
}
return metadata;
}

/**
* Evaluates a CEL expression or returns the input string if it's not a CEL expression.
*
* @param input the expression to evaluate
* @param message the Protobuf message used for evaluation
* @return the evaluated result or the original expression if not a CEL expression
*/
private Object evaluateExpression(String input, Message message) {
Matcher matcher = CEL_EXPRESSION_MARKER.matcher(input);
if (!matcher.find()) {
return input;
}
String celExpression = matcher.group(EXACT_CEL_EXPRESSION_GROUP_INDEX);
return Optional.ofNullable(celExpressionToProgramMap.get(celExpression))
.map(program -> CelUtils.evaluate(program, message)).orElse(input);
}

/**
* Initializes CEL programs for the metadata template.
*
* @return a map of CEL expressions to their corresponding programs
*/
private Map<String, CelRuntime.Program> initializeCelPrograms() {
CelRuntime celRuntime = CelRuntimeFactory.standardCelRuntimeBuilder().build();
CelCompiler celCompiler = CelUtils.initializeCelCompiler(this.descriptor);
return this.metadataTemplate.entrySet()
.stream()
.filter(entry -> Objects.nonNull(entry.getValue()))
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()))
.map(string -> {
Matcher matcher = CEL_EXPRESSION_MARKER.matcher(string);
if (matcher.find()) {
return matcher.group(EXACT_CEL_EXPRESSION_GROUP_INDEX);
}
return null;
})
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toMap(Function.identity(), celExpression ->
CelUtils.initializeCelProgram(celExpression, celRuntime, celCompiler, celType -> celType.kind()
.isPrimitive())));
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gotocompany.firehose.sink.grpc;


import com.gotocompany.firehose.config.AppConfig;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.proto.ProtoToMetadataMapper;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.sink.AbstractSink;
Expand All @@ -29,13 +31,13 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
String grpcSinkConfig = String.format("\n\tService host: %s\n\tService port: %s\n\tMethod url: %s\n\tResponse proto schema: %s",
grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass());
firehoseInstrumentation.logDebug(grpcSinkConfig);

ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();

GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient);
AppConfig appConfig = ConfigFactory.create(AppConfig.class, configuration);
ProtoToMetadataMapper protoToMetadataMapper = new ProtoToMetadataMapper(stencilClient.get(appConfig.getInputSchemaProtoClass()), grpcConfig.getSinkGrpcMetadata());
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient, protoToMetadataMapper);
firehoseInstrumentation.logInfo("GRPC connection established");

return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.gotocompany.firehose.metrics.Metrics;

import com.gotocompany.firehose.proto.ProtoToMetadataMapper;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Channel;
Expand Down Expand Up @@ -38,24 +39,28 @@ public class GrpcClient {
private ManagedChannel managedChannel;
private final MethodDescriptor<byte[], byte[]> methodDescriptor;
private final DynamicMessage emptyResponse;
private final Metadata grpcStaticMetadata;
private final ProtoToMetadataMapper protoToMetadataMapper;

public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfig grpcSinkConfig, ManagedChannel managedChannel, StencilClient stencilClient) {
public GrpcClient(FirehoseInstrumentation firehoseInstrumentation,
GrpcSinkConfig grpcSinkConfig,
ManagedChannel managedChannel,
StencilClient stencilClient,
ProtoToMetadataMapper protoToMetadataMapper) {
this.firehoseInstrumentation = firehoseInstrumentation;
this.grpcSinkConfig = grpcSinkConfig;
this.stencilClient = stencilClient;
this.managedChannel = managedChannel;
this.protoToMetadataMapper = protoToMetadataMapper;
MethodDescriptor.Marshaller<byte[]> marshaller = getMarshaller();
this.methodDescriptor = MethodDescriptor.newBuilder(marshaller, marshaller)
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl())
.build();
this.emptyResponse = DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build();
this.grpcStaticMetadata = grpcSinkConfig.getSinkGrpcMetadata();
}

public DynamicMessage execute(byte[] logMessage, Headers headers) {
Metadata metadata = buildMetadata(headers);
Metadata metadata = buildMetadata(headers, logMessage);
try {
Channel decoratedChannel = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(metadata));
Expand All @@ -78,12 +83,13 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) {
return emptyResponse;
}

protected Metadata buildMetadata(Headers headers) {
protected Metadata buildMetadata(Headers headers, byte[] logMessage) {
Metadata metadata = new Metadata();
for (Header header : headers) {
metadata.put(Metadata.Key.of(header.key(), Metadata.ASCII_STRING_MARSHALLER), new String(header.value()));
}
metadata.merge(grpcStaticMetadata);
Metadata externalizedMetadata = protoToMetadataMapper.buildGrpcMetadata(logMessage);
metadata.merge(externalizedMetadata);
return metadata;
}

Expand Down
Loading

0 comments on commit a368c89

Please sign in to comment.