The compute agent computes field values based on expressions evaluated at runtime. If the field already exists, it will be overwritten.
Given the input:
{
  "key": {
    "compound": {
      "uuid": "uuidValue",
      "timestamp": 1663616014
    }
  },
  "value": {
    "first": "joe",
    "last": "schmoe",
    "rank": 1,
    "address": {
      "zipcode": "abc-def"
    }
  }
}
With an agent configuration of:
- name: "Compute a new record"
  type: "compute"
  input: "input-topic" # optional
  output: "output-topic" # optional
  configuration:
    fields:
      - name: "key.newKeyField"
        expression: "5*3"
        type: "INT32"
        optional: true
      - name: "value.first"
        expression: "fn:concat(value.first, ' ')"
        type: "STRING"
        optional: false
      - name: "value.fullName"
        expression: "fn:concat(value.first, value.last)"
        type: "STRING"
        optional: false
The output would be:
{
  "key": {
    "newKeyField": 15
  },
  "value": {
    "first": "joe ",
    "fullName": "joe schmoe"
  }
}
Label | Type | Description |
---|---|---|
fields | object[] (required) | An array of objects describing how to calculate the field values. Refer to the field table for more info. |
Label | Type | Description |
---|---|---|
name | string (required) | The name of the field to be computed. Prefix with "key." or "value." to compute fields in the key or value part of the message. Example: name: "value.first-name" |
expression | string (required) | An expression in the Expression Language syntax. It is evaluated at runtime and the result is assigned to the field. Do not include mustache brackets; the agent fills in the value correctly. |
type | string (required) | The type of the computed field. This translates to the schema type of the new field in the transformed message. See the type reference below. |
optional | boolean (optional) | If true, marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. The default value is "true". |
Type | Description | Expression examples |
---|---|---|
BOOLEAN | true or false | expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'" |
DATE | a date without a time-zone in the RFC3339 format | expression1: "2021-12-03" |
DATETIME | a date-time with an offset from UTC in the RFC3339 format | expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()" |
DOUBLE | represents 64-bit floating point. | expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1" |
FLOAT | represents 32-bit floating point. | expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1" |
INT32 | represents 32-bit integer. | expression1: "2147483647", expression2: "1 + 1" |
INT64 | represents 64-bit integer. | expression1: "9223372036854775807", expression2: "1 + 1" |
TIME | a time without a time-zone in the RFC3339 format | expression1: "20:15:45" |
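
As a rough illustration of how the type column pairs with expressions, the fragment below declares fields of three different types against the sample input shown earlier. The field names (`value.isTopRanked`, `value.computedAt`, `value.ratio`) are made up for this sketch; the expressions are taken from, or modeled on, the examples in the table.

```yaml
- name: "Compute typed fields"
  type: "compute"
  configuration:
    fields:
      # BOOLEAN: a relational expression on an existing field
      - name: "value.isTopRanked"   # hypothetical field name
        expression: "value.rank == 1"
        type: "BOOLEAN"
      # DATETIME: fn:now() is listed as a valid DATETIME expression
      - name: "value.computedAt"    # hypothetical field name
        expression: "fn:now()"
        type: "DATETIME"
      # DOUBLE: 64-bit floating-point arithmetic
      - name: "value.ratio"         # hypothetical field name
        expression: "1.1 + 1.1"
        type: "DOUBLE"
```

The declared type becomes the schema type of the new field, so it should match what the expression can actually produce.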
To support conditional steps and the compute transform, an expression language is required to evaluate the conditional step's when or the compute step's expression. The syntax is EL, which uses dot notation to access field properties or map keys. It supports the operators and functions described below.
The Expression Language supports the following operators:
- Arithmetic: +, - (binary), *, / and div, % and mod, - (unary)
- Logical: and, &&, or, ||, not, !
- Relational: ==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge
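
The operators listed above could be combined in compute expressions roughly as follows. This is a sketch against the sample input from the top of the page (where `value.rank` is 1); the field names are hypothetical and chosen only for illustration.

```yaml
- name: "Operator examples"
  type: "compute"
  configuration:
    fields:
      # Arithmetic: with rank = 1 this is 1 * 2 + 1
      - name: "value.rankScore"     # hypothetical field name
        expression: "value.rank * 2 + 1"
        type: "INT32"
      # Relational and logical operators in their keyword forms
      - name: "value.isRanked"      # hypothetical field name
        expression: "value.rank ge 1 and value.rank le 10"
        type: "BOOLEAN"
      # The same kinds of operators in their symbolic forms
      - name: "value.hasName"       # hypothetical field name
        expression: "value.first != '' || value.last != ''"
        type: "BOOLEAN"
```

The keyword forms (`ge`, `le`, `and`) and the symbolic forms (`>=`, `<=`, `&&`) are interchangeable, so the choice is mostly a matter of readability inside quoted YAML strings.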
Utility methods are available under the fn namespace. For example, to get the current timestamp, use fn:now(). The Expression Language supports the following functions:
Name (field) | Description |
---|---|
concat(input1, input2) | Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string. |
concat3(input1, input2, input3) | Returns a string concatenation of input1, input2 and input3. If any input is null, it is treated as an empty string. |
contains(input, value) | Returns true if value exists in input. It attempts string conversion on both input and value if either is not a string. If input or value is null, it returns false. Do not use this function on lists. |
coalesce(value, valueIfNull) | Returns value if it is not null, otherwise returns valueIfNull. |
dateadd(input, delta, unit) | Performs date/time arithmetic operations on the input date/time. |
decimalFromNumber(input) | Converts input to a decimal number. |
decimalFromUnscaled(input, scale) | Converts the unscaled input to a decimal number with the given scale. |
filter(collection, expression) | Returns a new collection containing only the elements of collection for which expression is true. For all methods, if a parameter is not of the right type, it is converted using the rules described in Type conversions. |
fromJson(input) | Parses input as JSON. |
lowercase(input) | Converts a string to lowercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
now() | Returns the current epoch millis. |
replace(input, regex, replacement) | Replaces each substring of input that matches the regex regular expression with replacement. See Java's replaceAll. |
str(input) | Converts input to a string. |
toString(input) | Converts input to a string. |
toDouble(input) | Converts the input value to a DOUBLE number. If the input is null, it returns null. |
toInt(input) | Converts the input value to an INTEGER number. If the input is null, it returns null. |
toListOfFloat(input) | Converts the input value to a list of FLOAT numbers (an embeddings vector). If the input is null, it returns null. The input must already be a list. |
emptyList() | Returns a new empty list |
listAdd(list, item) | Returns a new list that contains the contents of a given list plus an item. The input list must be a list and not null (you can use fn:emptyList() to create an empty list) |
emptyMap() | Returns a new empty map |
mapToListOfStructs(map, fields) | Converts a map to a list of one element that contains only a selection of fields from the map. For instance, if your map is {"text":"value","foo":"bar"} and you call mapToListOfStructs(map, 'text'), the result is the list [{"text":"value"}]. |
listToListOfStructs(list, field) | Converts a list of items to a list of maps, each with one element named after field. For instance, if your list is ["this is a question"] and you call listToListOfStructs(list, 'text'), the result is the list [{"text":"this is a question"}]. This is handy when you have to convert a list of texts to a list of structs. |
uppercase(input) | Converts a string to uppercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
toJson(input) | Converts input to a JSON string. |
trim(input) | Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion. |
unpack(input, fieldsList) | Returns a map containing the elements of input, with one entry in the map for each field in fieldsList. If the input is a string, it is converted to a list using the split() function with the ', ' separator. |
split(input, separatorExpression) | Splits the input into a list of strings, internally using the String.split() function. An empty input corresponds to an empty list. The input is converted to a String using the str() function. |
timestampAdd(input, delta, unit) | Returns a timestamp formed by adding delta in unit to the input timestamp. input is the timestamp to add to. delta is a long amount of unit to add to input; it can be negative to perform subtraction. unit is the string unit of time to add or subtract, one of [years, months, days, hours, minutes, seconds, millis]. |
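
To show how a few of these functions might be used together, here is a minimal sketch of a compute agent that applies them to the sample input from the top of the page. The output field names are hypothetical, and the `value.expiresAt` field assumes that the epoch millis returned by `fn:now()` are accepted as the timestamp input of `fn:timestampAdd`.

```yaml
- name: "Function examples"
  type: "compute"
  configuration:
    fields:
      # concat3 joins three inputs: "joe", " " and "schmoe"
      - name: "value.fullName"
        expression: "fn:concat3(value.first, ' ', value.last)"
        type: "STRING"
      # uppercase converts "schmoe" to upper case
      - name: "value.lastUpper"     # hypothetical field name
        expression: "fn:uppercase(value.last)"
        type: "STRING"
      # replace takes a regular expression; here it strips the dash from "abc-def"
      - name: "value.zipCompact"    # hypothetical field name
        expression: "fn:replace(value.address.zipcode, '-', '')"
        type: "STRING"
      # timestampAdd with a positive delta: one hour after the current time
      - name: "value.expiresAt"     # hypothetical field name
        expression: "fn:timestampAdd(fn:now(), 1, 'hours')"
        type: "DATETIME"
```

Every function call carries the `fn:` prefix inside the expression, while the table above lists the bare function names.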
Each step accepts an optional when configuration that is evaluated at step execution time against the current record (as seen by the current step in the transform pipeline). The when condition supports the expression language syntax, which provides access to the record attributes as follows:
Name (field) | Description |
---|---|
key: | the key portion of the record in a KeyValue schema. |
value: | the value portion of the record in a KeyValue schema, or the message payload itself. |
messageKey: | the optional key messages are tagged with (aka. Partition Key). |
topicName: | the optional name of the topic which the record originated from (aka. Input Topic). |
destinationTopic: | the name of the topic on which the transformed record will be sent (aka. Output Topic). |
eventTime: | the optional timestamp attached to the record from its source. For example, the original timestamp attached to the Pulsar message. |
properties: | the optional user-defined properties attached to the record. |
You can use the . operator to access top-level or nested properties on a schema-full key or value. For example, key.keyField1 or value.valueField1.nestedValueField.
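
A when condition using dot notation might look like the sketch below, which only computes the new field for records that match. It assumes that `when` is set under the agent's `configuration` block next to `fields`; this page does not show the exact placement, so check the API Reference before relying on it. The field name `value.zipcode` is hypothetical.

```yaml
- name: "Compute only for matching records"
  type: "compute"
  configuration:
    # Assumption: `when` sits alongside `fields` in the configuration
    when: "value.rank > 0 && key.compound.uuid != ''"
    fields:
      # Copies a nested value property into a top-level value field
      - name: "value.zipcode"       # hypothetical field name
        expression: "value.address.zipcode"
        type: "STRING"
```

With the sample input from the top of the page, both sides of the condition hold (`value.rank` is 1 and `key.compound.uuid` is "uuidValue"), so the step would run.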
Check out the full configuration properties on the API Reference page.