Skip to content

Commit

Permalink
ann configurable CE metadata extension prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
cchayka-campsci committed Nov 13, 2024
1 parent 1616ec3 commit fad5d55
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
public class CloudEventOverridesMutator implements CloudEventMutator {

private final DataPlaneContract.CloudEventOverrides cloudEventOverrides;
private final String ceMetadataExtensionPrefix;

public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) {
public CloudEventOverridesMutator(
final DataPlaneContract.CloudEventOverrides cloudEventOverrides, final String ceMetadataExtensionPrefix) {
this.cloudEventOverrides = cloudEventOverrides;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

@Override
Expand All @@ -49,7 +52,7 @@ private void applyCloudEventOverrides(CloudEventBuilder builder) {
}

private void applyKafkaMetadata(CloudEventBuilder builder, Number partition, Number offset) {
builder.withExtension("knativekafkapartition", partition);
builder.withExtension("knativekafkaoffset", offset);
builder.withExtension(ceMetadataExtensionPrefix + "partition", partition);
builder.withExtension(ceMetadataExtensionPrefix + "offset", offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f
TracingPolicy.PROPAGATE),
Metrics.getRegistry()),
new CloudEventOverridesMutator(
consumerVerticleContext.getResource().getCloudEventOverrides()));
consumerVerticleContext.getResource().getCloudEventOverrides(),
consumerVerticleContext.getCeMetadataExtensionPrefix()));
consumerVerticle.setRecordDispatcher(recordDispatcher);

final var partitionRevokedHandlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ConsumerVerticleContext {

private Tags tags;

private String ceMetadataExtensionPrefix;

public ConsumerVerticleContext withConsumerConfigs(final Map<String, Object> consumerConfigs) {
this.consumerConfigs = new HashMap<>(consumerConfigs);
return this;
Expand Down Expand Up @@ -170,6 +172,11 @@ public ConsumerVerticleContext withProducerFactory(
return this;
}

public ConsumerVerticleContext withCeMetadataExtensionPrefix(final String ceMetadataExtensionPrefix) {
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
return this;
}

public ConsumerVerticleContext withEventTypeListerFactory(EventTypeListerFactory eventTypeListerFactory) {
this.eventTypeListerFactory = eventTypeListerFactory;
return this;
Expand Down Expand Up @@ -247,6 +254,10 @@ public ReactiveProducerFactory<String, CloudEvent> getProducerFactory() {
return this.producerFactory;
}

public String getCeMetadataExtensionPrefix() {
return this.ceMetadataExtensionPrefix;
}

public EventTypeCreator getEventTypeCreator() {
return this.eventTypeCreator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory {
private final ReactiveProducerFactory reactiveProducerFactory;
private final EventTypeCreator eventTypeCreator;
private final EventTypeListerFactory eventTypeListerFactory;
private final String ceMetadataExtensionPrefix;

/**
* All args constructor.
Expand All @@ -62,7 +63,8 @@ public ConsumerVerticleFactoryImpl(
final ReactiveConsumerFactory reactiveConsumerFactory,
final ReactiveProducerFactory reactiveProducerFactory,
EventTypeCreator eventTypeCreator,
EventTypeListerFactory eventTypeListerFactory) {
EventTypeListerFactory eventTypeListerFactory,
final String ceMetadataExtensionPrefix) {
this.eventTypeCreator = eventTypeCreator;
this.eventTypeListerFactory = eventTypeListerFactory;

Expand All @@ -83,6 +85,7 @@ public ConsumerVerticleFactoryImpl(
this.metricsRegistry = metricsRegistry;
this.reactiveConsumerFactory = reactiveConsumerFactory;
this.reactiveProducerFactory = reactiveProducerFactory;
this.ceMetadataExtensionPrefix = ceMetadataExtensionPrefix;
}

/**
Expand All @@ -103,7 +106,8 @@ public ConsumerVerticle get(final EgressContext egressContext) {
.withConsumerFactory(reactiveConsumerFactory)
.withProducerFactory(reactiveProducerFactory)
.withEventTypeCreator(eventTypeCreator)
.withEventTypeListerFactory(eventTypeListerFactory))
.withEventTypeListerFactory(eventTypeListerFactory)
.withCeMetadataExtensionPrefix(ceMetadataExtensionPrefix))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dev.knative.eventing.kafka.broker.dispatcher.main;

import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;

import dev.knative.eventing.kafka.broker.core.utils.BaseEnv;
import java.util.function.Function;
Expand All @@ -31,12 +32,17 @@ public class DispatcherEnv extends BaseEnv {
public static final String EGRESSES_INITIAL_CAPACITY = "EGRESSES_INITIAL_CAPACITY";
private final int egressesInitialCapacity;

public static final String CE_METADATA_EXTENSION_PREFIX = "CE_METADATA_EXTENSION_PREFIX";
private final String ceMetadataExtensionPrefix;

public DispatcherEnv(Function<String, String> envProvider) {
super(envProvider);

this.consumerConfigFilePath = requireNonNull(envProvider.apply(CONSUMER_CONFIG_FILE_PATH));
this.webClientConfigFilePath = requireNonNull(envProvider.apply(WEBCLIENT_CONFIG_FILE_PATH));
this.egressesInitialCapacity = Integer.parseInt(requireNonNull(envProvider.apply(EGRESSES_INITIAL_CAPACITY)));
this.ceMetadataExtensionPrefix =
requireNonNullElse(envProvider.apply(CE_METADATA_EXTENSION_PREFIX), "knativekafka");
}

public String getConsumerConfigFilePath() {
Expand All @@ -51,6 +57,10 @@ public int getEgressesInitialCapacity() {
return egressesInitialCapacity;
}

public String getCeMetadataExtensionPrefix() {
return ceMetadataExtensionPrefix;
}

@Override
public String toString() {
return "DispatcherEnv{" + "consumerConfigFilePath='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public static void start(
reactiveConsumerFactory,
reactiveProducerFactory,
eventTypeCreator,
eventTypeListerFactory),
eventTypeListerFactory,
env.getCeMetadataExtensionPrefix()),
env.getEgressesInitialCapacity());

// Deploy the consumer deployer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public void shouldAddExtensions() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(extensions)
.build();
final var ceMetadataExtensionPrefix = "knativekafka";

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix);

final var given = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
Expand All @@ -50,8 +51,8 @@ public void shouldAddExtensions() {

final var expected = CloudEventBuilder.from(given);
extensions.forEach(expected::withExtension);
expected.withExtension("knativekafkaoffset", 1L);
expected.withExtension("knativekafkapartition", 1);
expected.withExtension(ceMetadataExtensionPrefix + "offset", 1L);
expected.withExtension(ceMetadataExtensionPrefix + "partition", 1);

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

Expand All @@ -67,7 +68,7 @@ public void shouldNotThrowOnInvalidCloudEvent() {
.putAllExtensions(extensions)
.build();

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, "knativekafka");

final var given = new InvalidCloudEvent(null);

Expand All @@ -87,8 +88,9 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(Map.of())
.build();
final var ceMetadataExtensionPrefix = "knativekafka";

final var mutator = new CloudEventOverridesMutator(ceOverrides);
final var mutator = new CloudEventOverridesMutator(ceOverrides, ceMetadataExtensionPrefix);

final var given = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
Expand All @@ -98,8 +100,8 @@ public void shouldAddKafkaExtensionsWhenNoOverrides() {
.build();

final var expected = CloudEventBuilder.from(given)
.withExtension("knativekafkaoffset", 1L)
.withExtension("knativekafkapartition", 1)
.withExtension(ceMetadataExtensionPrefix + "offset", 1L)
.withExtension(ceMetadataExtensionPrefix + "partition", 1)
.build();

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void shouldAlwaysSucceed() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -79,7 +81,8 @@ public void shouldAlwaysSucceed() {
new MockReactiveConsumerFactory<>(),
new dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down Expand Up @@ -117,6 +120,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -126,7 +131,8 @@ public void shouldAlwaysSucceedWhenPassingResourceReference() {
new MockReactiveConsumerFactory<>(),
new MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down Expand Up @@ -168,6 +174,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() {
producerConfigs.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
producerConfigs.setProperty(INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final var ceMetadataExtensionPrefix = "knativekafka";

final var verticleFactory = new ConsumerVerticleFactoryImpl(
consumerProperties,
new WebClientOptions(),
Expand All @@ -177,7 +185,8 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDeadLetterSink() {
new MockReactiveConsumerFactory<>(),
new MockReactiveProducerFactory<>(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var egress = DataPlaneContract.Egress.newBuilder()
.setConsumerGroup("1234")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());

final var producerConfigs = producerConfigs();
final var ceMetadataExtensionPrefix = "knativekafka";

final var consumerVerticleFactory = new ConsumerVerticleFactoryImpl(
consumerConfigs,
Expand All @@ -363,7 +364,8 @@ private ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final VertxT
getReactiveConsumerFactory(),
getReactiveProducerFactory(),
mock(EventTypeCreator.class),
mock(EventTypeListerFactory.class));
mock(EventTypeListerFactory.class),
ceMetadataExtensionPrefix);

final var verticle = new ConsumerDeployerVerticle(consumerVerticleFactory, 10);

Expand Down

0 comments on commit fad5d55

Please sign in to comment.