Skip to content

Commit

Permalink
Support extract-headers kafka message transform (#1177)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Aug 2, 2024
1 parent 36016d7 commit 153aac1
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ private <C> KafkaOptionsConfigBuilder<C> injectKafkaTopicOptions(
options
.topic()
.name(channel.address)
.headers(topic.headers)
.transforms()
.extractHeaders(topic.transforms.extractHeaders)
.build()
.inject(t -> injectKafkaTopicKey(t, channel))
.inject(t -> injectKafkaTopicValue(t, channel))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

import java.util.List;
import java.util.Objects;
import java.util.function.Function;

Expand All @@ -30,7 +29,7 @@ public class KafkaTopicConfig
public final KafkaDeltaType deltaType;
public final ModelConfig key;
public final ModelConfig value;
public final List<KafkaTopicHeaderType> headers;
public final KafkaTopicTransformsConfig transforms;

public static KafkaTopicConfigBuilder<KafkaTopicConfig> builder()
{
Expand All @@ -49,14 +48,14 @@ public static <T> KafkaTopicConfigBuilder<T> builder(
KafkaDeltaType deltaType,
ModelConfig key,
ModelConfig value,
List<KafkaTopicHeaderType> headers)
KafkaTopicTransformsConfig transforms)
{
this.name = name;
this.defaultOffset = defaultOffset;
this.deltaType = deltaType;
this.key = key;
this.value = value;
this.headers = headers;
this.transforms = transforms;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
Expand All @@ -28,29 +24,18 @@

public final class KafkaTopicConfigBuilder<T> extends ConfigBuilder<T, KafkaTopicConfigBuilder<T>>
{
private static final String PATH = "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
private static final String INTERNAL_VALUE = "$.%s";
private static final String INTERNAL_PATH = "^\\$\\..*$";
private static final Pattern INTERNAL_PATH_PATTERN = Pattern.compile(INTERNAL_PATH);

private final Matcher matcher;
private final Matcher internalMatcher;
private final Function<KafkaTopicConfig, T> mapper;
private String name;
private KafkaOffsetType defaultOffset;
private KafkaDeltaType deltaType;
private KafkaTopicTransformsConfig transforms;
private ModelConfig key;
private ModelConfig value;
private List<KafkaTopicHeaderType> headers;

KafkaTopicConfigBuilder(
Function<KafkaTopicConfig, T> mapper)
{
this.mapper = mapper;
this.headers = new ArrayList<>();
this.matcher = PATH_PATTERN.matcher("");
this.internalMatcher = INTERNAL_PATH_PATTERN.matcher("");
}

@Override
Expand Down Expand Up @@ -81,56 +66,33 @@ public KafkaTopicConfigBuilder<T> deltaType(
return this;
}

public KafkaTopicConfigBuilder<T> key(
ModelConfig key)
public KafkaTopicTransformsConfigBuilder<KafkaTopicConfigBuilder<T>> transforms()
{
this.key = key;
return this;
return KafkaTopicTransformsConfig.builder(this::transforms);
}

public KafkaTopicConfigBuilder<T> value(
ModelConfig value)
public KafkaTopicConfigBuilder<T> transforms(
KafkaTopicTransformsConfig transforms)
{
this.value = value;
this.transforms = transforms;
return this;
}

public KafkaTopicConfigBuilder<T> headers(
List<KafkaTopicHeaderType> headers)
public KafkaTopicConfigBuilder<T> key(
ModelConfig key)
{
if (headers != null)
{
headers.forEach(this::header);
}
this.key = key;
return this;
}

public KafkaTopicConfigBuilder<T> header(
KafkaTopicHeaderType header)
{
return header(header.name, header.path);
}

public KafkaTopicConfigBuilder<T> header(
String name,
String path)
public KafkaTopicConfigBuilder<T> value(
ModelConfig value)
{
if (this.headers == null)
{
this.headers = new ArrayList<>();
}
if (matcher.reset(path).matches())
{
this.headers.add(new KafkaTopicHeaderType(name,
String.format(INTERNAL_VALUE, matcher.group(1))));
}
else if (internalMatcher.reset(path).matches())
{
this.headers.add(new KafkaTopicHeaderType(name, path));
}
this.value = value;
return this;
}


public <C extends ConfigBuilder<KafkaTopicConfigBuilder<T>, C>> C key(
Function<Function<ModelConfig, KafkaTopicConfigBuilder<T>>, C> key)
{
Expand All @@ -146,6 +108,6 @@ public <C extends ConfigBuilder<KafkaTopicConfigBuilder<T>, C>> C value(
@Override
public T build()
{
return mapper.apply(new KafkaTopicConfig(name, defaultOffset, deltaType, key, value, headers));
return mapper.apply(new KafkaTopicConfig(name, defaultOffset, deltaType, key, value, transforms));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

import java.util.List;
import java.util.function.Function;

public class KafkaTopicTransformsConfig
{
public final List<KafkaTopicHeaderType> extractHeaders;

public static KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> builder()
{
return new KafkaTopicTransformsConfigBuilder<>(KafkaTopicTransformsConfig.class::cast);
}

public static <T> KafkaTopicTransformsConfigBuilder<T> builder(
Function<KafkaTopicTransformsConfig, T> mapper)
{
return new KafkaTopicTransformsConfigBuilder<>(mapper);
}

KafkaTopicTransformsConfig(
List<KafkaTopicHeaderType> extractHeaders)
{
this.extractHeaders = extractHeaders;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public final class KafkaTopicTransformsConfigBuilder<T> extends ConfigBuilder<T, KafkaTopicTransformsConfigBuilder<T>>
{
private static final String PATH = "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
private static final String INTERNAL_VALUE = "$.%s";
private static final String INTERNAL_PATH = "^\\$\\..*$";
private static final Pattern INTERNAL_PATH_PATTERN = Pattern.compile(INTERNAL_PATH);

private final Matcher matcher;
private final Matcher internalMatcher;
private final Function<KafkaTopicTransformsConfig, T> mapper;
private List<KafkaTopicHeaderType> extractHeaders;

KafkaTopicTransformsConfigBuilder(
Function<KafkaTopicTransformsConfig, T> mapper)
{
this.mapper = mapper;
this.extractHeaders = new ArrayList<>();
this.matcher = PATH_PATTERN.matcher("");
this.internalMatcher = INTERNAL_PATH_PATTERN.matcher("");
}

@Override
@SuppressWarnings("unchecked")
protected Class<KafkaTopicTransformsConfigBuilder<T>> thisType()
{
return (Class<KafkaTopicTransformsConfigBuilder<T>>) getClass();
}

public KafkaTopicTransformsConfigBuilder<T> extractHeaders(
List<KafkaTopicHeaderType> extractHeaders)
{
if (extractHeaders != null)
{
extractHeaders.forEach(this::extractHeader);
}
return this;
}

public KafkaTopicTransformsConfigBuilder<T> extractHeader(
KafkaTopicHeaderType header)
{
return extractHeader(header.name, header.path);
}

public KafkaTopicTransformsConfigBuilder<T> extractHeader(
String name,
String path)
{
if (this.extractHeaders == null)
{
this.extractHeaders = new ArrayList<>();
}
if (matcher.reset(path).matches())
{
this.extractHeaders.add(new KafkaTopicHeaderType(name,
String.format(INTERNAL_VALUE, matcher.group(1))));
}
else if (internalMatcher.reset(path).matches())
{
this.extractHeaders.add(new KafkaTopicHeaderType(name, path));
}
return this;
}

@Override
public T build()
{
return mapper.apply(new KafkaTopicTransformsConfig(extractHeaders));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.IntFunction;
Expand All @@ -67,6 +66,7 @@
import org.agrona.io.ExpandableDirectBufferOutputStream;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
Expand Down Expand Up @@ -360,14 +360,14 @@ public void writeEntry(
ConverterHandler convertKey,
ConverterHandler convertValue,
boolean verbose,
List<KafkaTopicHeaderType> headerTypes)
KafkaTopicTransformsConfig transforms)
{
final int valueLength = value != null ? value.sizeof() : -1;
writeEntryStart(context, traceId, bindingId, offset, entryMark, valueMark, timestamp, producerId, key,
valueLength, null, entryFlags, deltaType, value, convertKey, convertValue, verbose);
writeEntryContinue(value);
writeEntryFinish(headers, deltaType, context, traceId, bindingId, FLAGS_COMPLETE, offset, entryMark, valueMark,
convertValue, verbose, headerTypes);
convertValue, verbose, transforms);
}

public void writeEntryStart(
Expand Down Expand Up @@ -545,7 +545,7 @@ public void writeEntryFinish(
MutableInteger valueMark,
ConverterHandler convertValue,
boolean verbose,
List<KafkaTopicHeaderType> headerTypes)
KafkaTopicTransformsConfig transforms)
{
final Node head = sentinel.previous;
assert head != sentinel;
Expand Down Expand Up @@ -600,11 +600,13 @@ public void writeEntryFinish(
context.supplyLocalName(bindingId), topic, id, offset);
}
}
else if (headerTypes != null && !headerTypes.isEmpty())
else if (transforms != null &&
transforms.extractHeaders != null &&
!transforms.extractHeaders.isEmpty())
{
Array32FW.Builder<KafkaHeaderFW.Builder, KafkaHeaderFW> builder =
trailersRW.wrap(trailersRW.buffer(), 0, trailersRW.maxLimit());
for (KafkaTopicHeaderType header : headerTypes)
for (KafkaTopicHeaderType header : transforms.extractHeaders)
{
String32FW name = stringRW.set(header.name, UTF_8).build();
String path = header.path;
Expand Down
Loading

0 comments on commit 153aac1

Please sign in to comment.