From a4e2e2e37c7d5209650a5ab71d1c246ea91a6bab Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Fri, 11 Oct 2024 16:33:31 +0800 Subject: [PATCH] [INLONG-11282][SDK] Transform enhance Array-related Collection Functions (#11288) --- .../collection/ArrayAppendFunction.java | 66 ++++++++ .../collection/ArrayConcatFunction.java | 74 +++++++++ .../collection/ArrayContainsFunction.java | 71 ++++++++ .../collection/ArrayDistinctFunction.java | 62 +++++++ .../collection/ArrayExceptFunction.java | 76 +++++++++ .../collection/ArrayIntersectFunction.java | 75 +++++++++ .../collection/ArrayJoinFunction.java | 111 +++++++++++++ .../function/collection/ArrayMaxFunction.java | 67 ++++++++ .../function/collection/ArrayMinFunction.java | 68 ++++++++ .../collection/ArrayPositionFunction.java | 75 +++++++++ .../collection/ArrayPrependFunction.java | 75 +++++++++ .../collection/ArrayRemoveFunction.java | 71 ++++++++ .../collection/ArrayReverseFunction.java | 62 +++++++ .../collection/ArraySliceFunction.java | 85 ++++++++++ .../collection/ArraySortFunction.java | 154 ++++++++++++++++++ .../collection/ArrayUnionFunction.java | 72 ++++++++ .../collection/CardinalityFunction.java | 57 +++++++ .../function/collection/ElementFunction.java | 63 +++++++ .../{ => json}/JsonArrayInsertFunction.java | 3 +- .../function/{ => string}/EltFunction.java | 3 +- .../{ => string}/EndsWithFunction.java | 3 +- .../{ => string}/StartsWithFunction.java | 3 +- .../process/operator/OperatorTools.java | 8 + .../collection/TestArrayAppendFunction.java | 89 ++++++++++ .../collection/TestArrayConcatFunction.java | 89 ++++++++++ .../collection/TestArrayContainsFunction.java | 101 ++++++++++++ .../collection/TestArrayDistinctFunction.java | 90 ++++++++++ .../collection/TestArrayExceptFunction.java | 97 +++++++++++ .../TestArrayIntersectFunction.java | 92 +++++++++++ .../collection/TestArrayJoinFunction.java | 103 ++++++++++++ .../collection/TestArrayMaxFunction.java | 103 ++++++++++++ .../collection/TestArrayMinFunction.java | 103 ++++++++++++ .../collection/TestArrayPositionFunction.java | 115 +++++++++++++ .../collection/TestArrayPrependFunction.java | 102 ++++++++++++ .../collection/TestArrayRemoveFunction.java | 96 +++++++++++ .../collection/TestArrayReverseFunction.java | 78 +++++++++ .../collection/TestArraySliceFunction.java | 78 +++++++++ .../collection/TestArraySortFunction.java | 108 ++++++++++++ .../collection/TestArrayUnionFunction.java | 79 +++++++++ .../collection/TestCardinalityFunction.java | 147 +++++++++++++++++ .../collection/TestElementFunction.java | 89 ++++++++++ .../TestJsonArrayInsertFunction.java | 3 +- 42 files changed, 3161 insertions(+), 5 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayAppendFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayConcatFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayContainsFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayDistinctFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayExceptFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayIntersectFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayJoinFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMaxFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMinFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPositionFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPrependFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayRemoveFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayReverseFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySliceFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySortFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayUnionFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/CardinalityFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ElementFunction.java rename inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{ => json}/JsonArrayInsertFunction.java (97%) rename inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{ => string}/EltFunction.java (94%) rename inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{ => string}/EndsWithFunction.java (95%) rename inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/{ => string}/StartsWithFunction.java (95%) create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayAppendFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayConcatFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayContainsFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayDistinctFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayExceptFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayIntersectFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayJoinFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMaxFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMinFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPositionFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPrependFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayRemoveFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayReverseFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySliceFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySortFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayUnionFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestCardinalityFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestElementFunction.java rename inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/{string => json}/TestJsonArrayInsertFunction.java (96%) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayAppendFunction.java new file mode 100644 index 00000000000..055aaef7bc7 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayAppendFunction.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArrayAppendFunction + * description: ARRAY_APPEND(array, element)--Appends an element to the end of the array and returns the result. + * If the array itself is null, the function will return null. If an element to add is null, the null + * element will be added to the end of the array. + * for example: array_append(array('he',7,'xxd'), 'cloud')--return[he, 7, xxd, cloud] + */ +@TransformFunction(names = {"array_append"}) +public class ArrayAppendFunction implements ValueParser { + + private final ValueParser arrayParser; + + private final ValueParser elementParser; + + public ArrayAppendFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.elementParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @SuppressWarnings("unchecked") + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object elementObj = elementParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + array.add(elementObj); + return array; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayConcatFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayConcatFunction.java new file mode 100644 index 00000000000..046bf4fb778 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayConcatFunction.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; +/** + * ArrayConcatFunction + * description: ARRAY_CONCAT(array1, …)--Returns an array that is the result of concatenating at least one array. + * This array contains all the elements in the first array, followed by all the elements in the second + * array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL. + * for example: array_concat(array('he',7),array('xxd', 'cloud'))--return [he, 7, xxd, cloud] + */ +@TransformFunction(names = {"array_concat"}) +public class ArrayConcatFunction implements ValueParser { + + private List parserList; + + public ArrayConcatFunction(Function expr) { + if (expr.getParameters() == null) { + this.parserList = new ArrayList<>(); + } else { + List params = expr.getParameters().getExpressions(); + parserList = new ArrayList<>(params.size()); + for (Expression param : params) { + ValueParser node = OperatorTools.buildParser(param); + parserList.add(node); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + ArrayList res = new ArrayList<>(); + for (ValueParser valueParser : parserList) { + Object parseObj = valueParser.parse(sourceData, rowIndex, context); + if (parseObj == null) { + return null; + } + if (parseObj instanceof ArrayList) { + ArrayList array = (ArrayList) parseObj; + if (array.isEmpty()) { + return null; + } + res.addAll(array); + } + } + return res; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayContainsFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayContainsFunction.java new file mode 100644 index 00000000000..f583ced4acc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayContainsFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArrayContainsFunction + * description: ARRAY_CONTAINS(haystack, needle)--Returns whether the given element exists in an array. Checking for + * null elements in the array is supported. If the array itself is null, the function will return null. + * for example: array_contains(array('he',7,'xxd'), 'cloud')--return false + * array_contains(array('he',-1,''),'')--return true + */ +@TransformFunction(names = {"array_contains"}) +public class ArrayContainsFunction implements ValueParser { + + private final ValueParser arrayParser; + + private final ValueParser elementParser; + + public ArrayContainsFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.elementParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object elementObj = elementParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + for (Object element : array) { + if (element == null && elementObj == null) { + return true; + } + if (element != null && element.equals(elementObj)) { + return true; + } + } + } + return false; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayDistinctFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayDistinctFunction.java new file mode 100644 index 00000000000..d601d531df1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayDistinctFunction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.Set; +/** + * ArrayDistinctFunction + * description: ARRAY_DISTINCT(haystack)--Returns an array with unique elements. If the array itself is null, + * the function will return null. Keeps ordering of elements. + * for example: array_distinct(array('he',-1,'he'))--return [he, -1] + */ +@TransformFunction(names = {"array_distinct"}) +public class ArrayDistinctFunction implements ValueParser { + + private final ValueParser arrayParser; + + public ArrayDistinctFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + Set distinctSet = new LinkedHashSet<>(array); + return new ArrayList<>(distinctSet); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayExceptFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayExceptFunction.java new file mode 100644 index 00000000000..2ca0f7e3cd2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayExceptFunction.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.Set; +/** + * ArrayExceptFunction + * description: ARRAY_EXCEPT(array1, array2)--Returns an ARRAY that contains the elements from array1 that are not in + * array2, without duplicates. If no elements remain after excluding the elements in array2 from array1, + * the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL. + * The order of the elements from array1 is kept. + * for example: array_except(array('he',7,'xxd'),array('he'))--return [7, xxd] + * array_except(array('he',7,'xxd'),array('cloud'))--return [he, 7, xxd] + */ +@TransformFunction(names = {"array_except"}) +public class ArrayExceptFunction implements ValueParser { + + private final ValueParser leftArrayParser; + + private final ValueParser rightArrayParser; + + public ArrayExceptFunction(Function expr) { + this.leftArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.rightArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftArrayObj = leftArrayParser.parse(sourceData, rowIndex, context); + Object rightArrayObj = rightArrayParser.parse(sourceData, rowIndex, context); + if (leftArrayObj == null || rightArrayObj == null) { + return null; + } + if (leftArrayObj instanceof ArrayList && rightArrayObj instanceof ArrayList) { + ArrayList leftArray = (ArrayList) leftArrayObj; + ArrayList rightArray = (ArrayList) rightArrayObj; + if (leftArray.isEmpty() || rightArray.isEmpty()) { + return null; + } + Set res = new LinkedHashSet<>(); + + for (Object value : leftArray) { + if (!rightArray.contains(value)) { + res.add(value); + } + } + return new ArrayList<>(res); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayIntersectFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayIntersectFunction.java new file mode 100644 index 00000000000..b123fedeab5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayIntersectFunction.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.stream.Collectors; +/** + * ArrayIntersectFunction + * description: ARRAY_INTERSECT(array1, array2)--Returns an ARRAY that contains the elements from array1 that are also + * in array2, without duplicates. If no elements that are both in array1 and array2, the function returns + * an empty ARRAY. If any of the array is null, the function will return null. The order of the elements + * from array1 is kept. + * for example: array_intersect(array('he',7,'xxd'),array('he'))--return [he] + * array_intersect(array('he',7,'xxd'),array('cloud'))--return [] + */ +@TransformFunction(names = {"array_intersect"}) +public class ArrayIntersectFunction implements ValueParser { + + private final ValueParser leftArrayParser; + + private final ValueParser rightArrayParser; + + public ArrayIntersectFunction(Function expr) { + this.leftArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.rightArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftArrayObj = leftArrayParser.parse(sourceData, rowIndex, context); + Object rightArrayObj = rightArrayParser.parse(sourceData, rowIndex, context); + if (leftArrayObj == null || rightArrayObj == null) { + return null; + } + if (leftArrayObj instanceof ArrayList && rightArrayObj instanceof ArrayList) { + ArrayList leftArray = (ArrayList) leftArrayObj; + ArrayList rightArray = (ArrayList) rightArrayObj; + if (leftArray.isEmpty() || rightArray.isEmpty()) { + return null; + } + Set res = new LinkedHashSet<>(rightArray); + + return leftArray.stream() + .filter(res::contains) + .distinct() + .collect(Collectors.toList()); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayJoinFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayJoinFunction.java new file mode 100644 index 00000000000..9c0ebe02781 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayJoinFunction.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; +/** + * ArrayJoinFunction + * description: ARRAY_JOIN(array, delimiter[, nullReplacement])--Returns a string that represents the concatenation of + * the elements in the given array and the elements’ data type in the given array is string. The delimiter + * is a string that separates each pair of consecutive elements of the array. The optional nullReplacement + * is a string that replaces null elements in the array. If nullReplacement is not specified, null elements + * in the array will be omitted from the resulting string. Returns null if input array or delimiter or + * nullReplacement are null. + * for example: array_join(array('he',7,'xxd'),'~')--return he~7~xxd + * array_join(array('he',3,''),'~','oo')--return he~3~oo + */ +@TransformFunction(names = {"array_join"}) +public class ArrayJoinFunction implements ValueParser { + + private ValueParser arrayParser; + + private ValueParser delimiterParser; + + private ValueParser nullReplacementParser; + + public ArrayJoinFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + if (expressions.size() >= 2) { + this.arrayParser = OperatorTools.buildParser(expressions.get(0)); + this.delimiterParser = OperatorTools.buildParser(expressions.get(1)); + if (expressions.size() >= 3) { + this.nullReplacementParser = OperatorTools.buildParser(expressions.get(2)); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object delimiterObj = delimiterParser.parse(sourceData, rowIndex, context); + if (arrayObj == null || delimiterObj == null) { + return null; + } + String delimiter = OperatorTools.parseString(delimiterObj); + if (delimiter.isEmpty()) { + return null; + } + String nullReplacement = parseNullReplacement(sourceData, rowIndex, context); + + if (arrayObj instanceof ArrayList) { + return joinArrayWithDelimiter((ArrayList) arrayObj, delimiter, nullReplacement); + } + + return null; + } + + private String joinArrayWithDelimiter(ArrayList array, String delimiter, String nullReplacement) { + StringBuilder result = new StringBuilder(); + + for (int i = 0; i < array.size(); i++) { + String element = (String) array.get(i); + + if (element == null || element.isEmpty()) { + if (nullReplacement != null && !nullReplacement.isEmpty()) { + result.append(nullReplacement); + } + } else { + result.append(element); + } + + if (i < array.size() - 1) { + result.append(delimiter); + } + } + + return result.toString(); + } + + private String parseNullReplacement(SourceData sourceData, int rowIndex, Context context) { + if (nullReplacementParser != null) { + Object nullReplacementObj = nullReplacementParser.parse(sourceData, rowIndex, context); + return OperatorTools.parseString(nullReplacementObj); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMaxFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMaxFunction.java new file mode 100644 index 00000000000..4fd075ccb8b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMaxFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArrayMaxFunction + * description: ARRAY_MAX(array)--Returns the maximum value from the array, if array itself is null + * , the function returns null. + * for example: array_max(array(4,3,56))--return 56 + */ +@TransformFunction(names = {"array_max"}) +public class ArrayMaxFunction implements ValueParser { + + private final ValueParser arrayParser; + + public ArrayMaxFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));; + } + + @SuppressWarnings("unchecked") + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + int max = Integer.parseInt(String.valueOf(array.get(0))); + for (Object element : array) { + int elementInt = Integer.parseInt((String) element); + if (elementInt - max > 0) { + max = elementInt; + } + } + return max; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMinFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMinFunction.java new file mode 100644 index 00000000000..b0fcfb716d5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayMinFunction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArrayMinFunction + * description: ARRAY_MIN(array)--Returns the minimum value from the array, if array itself is null + * , the function returns null. + * for example: array_max(array(4,3,56))--return 3 + */ +@TransformFunction(names = {"array_min"}) +public class ArrayMinFunction implements ValueParser { + + private final ValueParser arrayParser; + + public ArrayMinFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));; + } + + @SuppressWarnings("unchecked") + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + int min = Integer.parseInt(String.valueOf(array.get(0))); + for (Object element : array) { + int elementInt = Integer.parseInt((String) element); + if (elementInt - min < 0) { + min = elementInt; + } + } + return min; + } + return null; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPositionFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPositionFunction.java new file mode 100644 index 00000000000..08d9e3f7d85 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPositionFunction.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArrayPositionFunction + * description: ARRAY_POSITION(haystack, needle)--Returns the position of the first occurrence of element in the given + * array as int. Returns 0 if the given value could not be found in the array. Returns null if either of + * the arguments are null. And this is not zero based, but 1-based index. The first element in the array + * has index 1. + * for example: array_position(array('he',7,'xxd'),'he')--return 1 + * array_position(array('he',7,''),'_')--return 0 + */ +@TransformFunction(names = {"array_position"}) +public class ArrayPositionFunction implements ValueParser { + + private final ValueParser arrayParser; + + private final ValueParser needleParser; + + public ArrayPositionFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));; + this.needleParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));; + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object needleObj = needleParser.parse(sourceData, rowIndex, context); + if (arrayObj == null || needleObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + for (int i = 0; i < array.size(); i++) { + if (array.get(i) == null) { + continue; + } + if (array.get(i).equals(needleObj)) { + return i + 1; + } + } + + return 0; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPrependFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPrependFunction.java new file mode 100644 index 00000000000..5975195e817 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayPrependFunction.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; +/** + * ArrayPrependFunction + * description: ARRAY_PREPEND(array, element)--Appends an element to the beginning of the array and returns the result. + * If the array itself is null, the function will return null. If an element to add is null, the null + * element will be added to the beginning of the array. + * for example: array_prepend(array(4,3),3)--return [3, 4, 3] + */ +@TransformFunction(names = {"array_prepend"}) +public class ArrayPrependFunction implements ValueParser { + + private final ValueParser arrayParser; + + private ValueParser elementParser; + + public ArrayPrependFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.arrayParser = OperatorTools.buildParser(expressions.get(0)); + if (expressions.size() > 1) { + this.elementParser = OperatorTools.buildParser(expressions.get(1)); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object elementObj = elementParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + + if (array.isEmpty()) { + return null; + } + + List result = new ArrayList<>(array.size() + 1); + + result.add(elementObj); + result.addAll(array); + return result; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayRemoveFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayRemoveFunction.java new file mode 100644 index 00000000000..bf86e7e6b3d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayRemoveFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; +/** + * ArrayRemoveFunction + * description: ARRAY_REMOVE(haystack, needle)--Removes all elements that equal to element from array. If the array + * itself is null, the function will return null. Keeps ordering of elements. + * for example: array_remove(array('he',7,'xxd'),'he')--return [7, xxd] + */ +@TransformFunction(names = {"array_remove"}) +public class ArrayRemoveFunction implements ValueParser { + + private final ValueParser arrayParser; + + private ValueParser needleParser; + + public ArrayRemoveFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.arrayParser = OperatorTools.buildParser(expressions.get(0)); + if (expressions.size() > 1) { + this.needleParser = OperatorTools.buildParser(expressions.get(1)); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object needleObj = needleParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + + array.removeIf(element -> (element == null && needleObj == null) + || (element != null && element.equals(needleObj))); + return array; + } + return null; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayReverseFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayReverseFunction.java new file mode 100644 index 00000000000..ad8d4fad618 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayReverseFunction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.Collections; +/** + * ArrayReverseFunction + * description: ARRAY_REVERSE(haystack)--Returns an array in reverse order. If the array itself is null, + * the function will return null. + * for example: array_reverse(array('he',7,'xxd'))--return [xxd, 7, he] + */ +@TransformFunction(names = {"array_reverse"}) +public class ArrayReverseFunction implements ValueParser { + + private final ValueParser arrayParser; + + public ArrayReverseFunction(Function expr) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + if (arrayObj == null) { + return null; + } + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + + Collections.reverse(array); + return array; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySliceFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySliceFunction.java new file mode 100644 index 00000000000..95151022f31 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySliceFunction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ArraySliceFunction + * description: ARRAY_SLICE(array, start_offset[, end_offset])--Returns a subarray of the input array between + * ‘start_offset’ and ’end_offset’ inclusive. The offsets are 1-based however 0 is also treated as the + * beginning of the array. Positive values are counted from the beginning of the array while negative from + * the end. If ’end_offset’ is omitted then this offset is treated as the length of the array. If ‘ + * start_offset’ is after ’end_offset’ or both are out of array bounds an empty array will be returned. + * Returns null if any input is null. + * for example: array_slice(array('he',7,'xxd'),1,2)--return [he, 7] + * array_slice(array('he',3,'b'),-2,-1)--return [3, xxd] + */ +@TransformFunction(names = {"array_slice"}) +public class ArraySliceFunction implements ValueParser { + + private ValueParser arrayParser; + + private ValueParser startOffsetParser; + + private ValueParser endOffsetParser; + + public ArraySliceFunction(Function expr) { + if (expr.getParameters().getExpressions().size() >= 3) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));; + this.startOffsetParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));; + this.endOffsetParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(2));; + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object startOffsetObj = startOffsetParser.parse(sourceData, rowIndex, context); + Object endOffsetObj = endOffsetParser.parse(sourceData, rowIndex, context); + if (arrayObj == null || startOffsetObj == null || endOffsetObj == null) { + return null; + } + int startOffset = OperatorTools.parseBigDecimal(startOffsetObj).intValue(); + int endOffset = OperatorTools.parseBigDecimal(endOffsetObj).intValue(); + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + int arrayLength = array.size(); + + int startIndex = startOffset > 0 ? startOffset - 1 : startOffset == 0 ? 0 : arrayLength + startOffset; + int endIndex = endOffset > 0 ? endOffset - 1 : endOffset == 0 ? 0 : arrayLength + endOffset; + + if (startIndex < 0 || endIndex < 0 || endIndex >= arrayLength || startIndex > endIndex) { + return new ArrayList<>(); + } + + return new ArrayList<>(array.subList(startIndex, endIndex + 1)); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySortFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySortFunction.java new file mode 100644 index 00000000000..73680c98d9a --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArraySortFunction.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +/** + * ArraySortFunction + * description: ARRAY_SORT(array[, ascending_order[, null_first]])--Returns the array in sorted order.The function sorts + * an array, defaulting to ascending order with NULLs at the start when only the array is input. Specifying + * ascending_order as true orders the array in ascending with NULLs first, and setting it to false orders + * it in descending with NULLs last. Independently, null_first as true moves NULLs to the beginning, and + * as false to the end, irrespective of the sorting order. The function returns null if any input is null. + * for example: array_sort(array('he',7,'xxd'))--return [7, he, xxd] + * array_sort(array(3,7,5))--return [3, 5, 7] + * array_sort(array(,3,7),false,false)--return [7, 3, ] + * array_sort(array(3,7,),true,false)--return [3, 7, ] + */ +@TransformFunction(names = {"array_sort"}) +public class ArraySortFunction implements ValueParser { + + private ValueParser arrayParser; + + private ValueParser ascendingOrderParser; + + private ValueParser nullFirstParser; + + public ArraySortFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + if (!expressions.isEmpty()) { + this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + if (expressions.size() >= 2) { + this.ascendingOrderParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + if (expressions.size() >= 3) { + this.nullFirstParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(2)); + } + } + } + } + + @SuppressWarnings("unchecked") + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object arrayObj = arrayParser.parse(sourceData, rowIndex, context); + Object ascendingOrderObj = null; + Object nullFirstObj = null; + if (ascendingOrderParser != null) { + ascendingOrderObj = ascendingOrderParser.parse(sourceData, rowIndex, context); + } + if (nullFirstParser != null) { + nullFirstObj = nullFirstParser.parse(sourceData, rowIndex, context); + } + if (arrayObj == null) { + return null; + } + // default value + boolean ascendingOrder = ascendingOrderObj == null || OperatorTools.parseBoolean(ascendingOrderObj); + boolean nullFirst = nullFirstObj == null || OperatorTools.parseBoolean(nullFirstObj); + + if (arrayObj instanceof ArrayList) { + ArrayList array = (ArrayList) arrayObj; + if (array.isEmpty()) { + return null; + } + // sort array + return arraySort(array, ascendingOrder, nullFirst); + } + return null; + } + + @SuppressWarnings("unchecked") + private List arraySort(List array, Boolean ascendingOrder, Boolean nullFirst) { + if (array == null || ascendingOrder == null || nullFirst == null) { + return null; + } + + // Separate the null and non-null elements + List nonNullElements = new ArrayList<>(); + List nullElements = new ArrayList<>(); + + for (Object element : array) { + if (element == null || element.equals("")) { + nullElements.add(element); + } else { + if (element instanceof Comparable) { + nonNullElements.add(element); + } else { + throw new IllegalArgumentException("Array contains non-comparable elements."); + } + } + } + + // Sort the non-null elements based on the ascendingOrder flag + if (ascendingOrder) { + Collections.sort(nonNullElements, new Comparator() { + + @SuppressWarnings("unchecked") + @Override + public int compare(Object o1, Object o2) { + return ((Comparable) o1).compareTo(o2); + } + }); + } else { + Collections.sort(nonNullElements, new Comparator() { + + @SuppressWarnings("unchecked") + @Override + public int compare(Object o1, Object o2) { + return ((Comparable) o2).compareTo(o1); + } + }); + } + + // Combine null and non-null elements based on nullFirst flag + List sortedArray = new ArrayList<>(); + if (nullFirst) { + // NULLs go first + sortedArray.addAll(nullElements); + sortedArray.addAll(nonNullElements); + } else { + // NULLs go last + sortedArray.addAll(nonNullElements); + sortedArray.addAll(nullElements); + } + + return sortedArray; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayUnionFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayUnionFunction.java new file mode 100644 index 00000000000..4bf4d62cb07 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ArrayUnionFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.Set; +/** + * ArrayConcatFunction + * description: ARRAY_CONCAT(array1, …)--Returns an array that is the result of concatenating at least one array. + * This array contains all the elements in the first array, followed by all the elements in the second + * array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL. + * for example: array_concat(array('he',7),array('xxd', 'cloud'))--return [he, 7, xxd, cloud] + */ +@TransformFunction(names = {"array_union"}) +public class ArrayUnionFunction implements ValueParser { + + private final ValueParser leftArrayParser; + + private final ValueParser rightArrayParser; + + public ArrayUnionFunction(Function expr) { + this.leftArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.rightArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftArrayObj = leftArrayParser.parse(sourceData, rowIndex, context); + Object rightArrayObj = rightArrayParser.parse(sourceData, rowIndex, context); + if (leftArrayObj == null || rightArrayObj == null) { + return null; + } + if (leftArrayObj instanceof ArrayList && rightArrayObj instanceof ArrayList) { + ArrayList leftArray = (ArrayList) leftArrayObj; + ArrayList rightArray = (ArrayList) rightArrayObj; + if (leftArray.isEmpty() || rightArray.isEmpty()) { + return null; + } + + Set res = new LinkedHashSet<>(); + res.addAll(leftArray); + res.addAll(rightArray); + + return new ArrayList<>(res); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/CardinalityFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/CardinalityFunction.java new file mode 100644 index 00000000000..9fbc4d7847c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/CardinalityFunction.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.Map; +/** + * CardinalityFunction + * description: CARDINALITY(array)--Returns the number of elements in array. + * CARDINALITY(map)--Returns the number of entries in map. + * for example: cardinality(array('he',7,'xxd'))--return 3 + * cardinality(map('he',7,'xxd',3))--return 2 + */ +@TransformFunction(names = {"cardinality"}) +public class CardinalityFunction implements ValueParser { + + private final ValueParser valueParser; + + public CardinalityFunction(Function expr) { + valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + if (valueObj instanceof ArrayList) { + return ((ArrayList) valueObj).size(); + + } else if (valueObj instanceof Map) { + return ((Map) valueObj).size(); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ElementFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ElementFunction.java new file mode 100644 index 00000000000..c1c0d4493fa --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/collection/ElementFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +/** + * ElementFunction + * description: ELEMENT(array)--Returns the sole element of array (whose cardinality should be one); returns NULL if + * array is empty. Throws an exception if array has more than one element. + * for example: element(array('he'))--return he + */ +@Slf4j +@TransformFunction(names = {"element"}) +public class ElementFunction implements ValueParser { + + private final ValueParser valueParser; + + public ElementFunction(Function expr) { + valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + if (valueObj instanceof ArrayList) { + ArrayList array = (ArrayList) valueObj; + if (array.isEmpty()) { + return null; + } else if (array.size() == 1) { + return array.get(0); + } else { + log.warn("Array contains more than one element", new IllegalArgumentException()); + return null; + } + } + return null; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java similarity index 97% rename from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java rename to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java index 4c8e722dcc8..a49e145d198 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonArrayInsertFunction.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.function; +package org.apache.inlong.sdk.transform.process.function.json; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EltFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EltFunction.java similarity index 94% rename from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EltFunction.java rename to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EltFunction.java index 4eae49423c8..85acc9bf266 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EltFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EltFunction.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.function; +package org.apache.inlong.sdk.transform.process.function.string; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EndsWithFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EndsWithFunction.java similarity index 95% rename from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EndsWithFunction.java rename to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EndsWithFunction.java index 3a78a17ac5f..1a609995b38 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EndsWithFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/EndsWithFunction.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.function; +package org.apache.inlong.sdk.transform.process.function.string; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StartsWithFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StartsWithFunction.java similarity index 95% rename from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StartsWithFunction.java rename to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StartsWithFunction.java index ae7937eb883..ff2b747b461 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/StartsWithFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StartsWithFunction.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.function; +package org.apache.inlong.sdk.transform.process.function.string; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index df991e7c724..0f273cd1eb4 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -157,6 +157,14 @@ public static byte[] parseBytes(Object value) { } } + public static boolean parseBoolean(Object value) { + if (value instanceof Boolean) { + return (Boolean) value; + } else { + return Boolean.parseBoolean(String.valueOf(value)); + } + } + /** * compareValue * @param left diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayAppendFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayAppendFunction.java new file mode 100644 index 00000000000..c69ef660d8f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayAppendFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayAppendFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayAppendFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_append(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_append(array('he',7,'xxd'), 'cloud') + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, xxd, cloud]", output.get(0)); + + // case2: array_append(array('he',-1,'xxd'),'') + data = "he|xxd||-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, -1, xxd, ]", output.get(0)); + + transformSql = "select array_append(array(string1), array(numeric1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_append(array('he'), array(5)) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, [5]]", output.get(0)); + + transformSql = "select array_append(array(),numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_append(array(),5) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case5: array_append(array(),'') + data = "||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayConcatFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayConcatFunction.java new file mode 100644 index 00000000000..9f0d56c1b73 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayConcatFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayConcatFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayConcatFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_concat(array(string1,numeric1),array(string2, string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_concat(array('he',7),array('xxd', 'cloud')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, xxd, cloud]", output.get(0)); + + // case2: array_append(array('he',-1),array('xxd','')) + data = "he|xxd||-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, -1, xxd, ]", output.get(0)); + + transformSql = "select array_concat(array(string1), array(numeric1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_concat(array('he'), array(5)) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 5]", output.get(0)); + + transformSql = "select array_concat(array(),numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_append(array(),5) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case5: array_append(array(),'') + data = "||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayContainsFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayContainsFunction.java new file mode 100644 index 00000000000..021280ea5c5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayContainsFunction.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayContainsFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayContainsFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_contains(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_contains(array('he',7,'xxd'), 'cloud') + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + + // case2: array_contains(array('he',-1,''),'') + data = "he|||-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=true", output.get(0)); + + // case3: array_contains(array('he',-1,'xxd'),'') + data = "he|xxd||-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + + transformSql = "select array_contains(array(array(string1)), array(string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_contains(array(array('he')), array('he')) + data = "he|he|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=true", output.get(0)); + + // case5: array_contains(array(array('he')), array('xxd')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + + transformSql = "select array_contains(array(),numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array_contains(array(),5) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case7: array_contains(array(),'') + data = "||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayDistinctFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayDistinctFunction.java new file mode 100644 index 00000000000..ca6a739a0b0 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayDistinctFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayDistinctFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayDistinctFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_distinct(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_distinct(array('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, xxd]", output.get(0)); + + // case2: array_distinct(array('he',-1,'he')) + data = "he|he||-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, -1]", output.get(0)); + + transformSql = "select array_distinct(array(array(string1), array(string2), array(numeric1))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_distinct(array(array('he'), array('xxd'), array(5)) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he], [xxd], [5]]", output.get(0)); + + // case4: array_distinct(array(array('he'), array('he'), array(5)) + data = "he|he|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he], [5]]", output.get(0)); + + transformSql = "select array_distinct(array()) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_append(array()) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayExceptFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayExceptFunction.java new file mode 100644 index 00000000000..a6ec9a903be --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayExceptFunction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayExceptFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayExceptFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_except(array(string1,numeric1,string2),array(string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_except(array('he',7,'xxd'),array('he')) + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[7, xxd]", output.get(0)); + + // case2: array_except(array('he',7,'xxd'),array('cloud')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, xxd]", output.get(0)); + + // case2: array_except(array('xxd',7,'xxd'),array('cloud')) + data = "xxd|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd, 7]", output.get(0)); + + transformSql = + "select array_except(array(array(string1), array(string2), array(numeric1)),array(array(string3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_except(array(array('he'), array('xxd'), array(5)),array(array('cloud'))) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he], [xxd], [5]]", output.get(0)); + + // case4: array_except(array(array('he'), array('xxd'), array(5)),array(array('he'))) + data = "he|xxd|he|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[xxd], [5]]", output.get(0)); + + transformSql = "select array_except(array(),array(string1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_except(array(),array('he')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayIntersectFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayIntersectFunction.java new file mode 100644 index 00000000000..61410a9f248 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayIntersectFunction.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayIntersectFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayIntersectFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_intersect(array(string1,numeric1,string2),array(string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_intersect(array('he',7,'xxd'),array('he')) + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he]", output.get(0)); + + // case2: array_intersect(array('he',7,'xxd'),array('cloud')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[]", output.get(0)); + + transformSql = "select array_intersect(array(array(string1), array(string2), array(numeric1))" + + ",array(array(string3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_intersect(array(array('he'), array('xxd'), array(5)),array(array('cloud'))) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[]", output.get(0)); + + // case4: array_intersect(array(array('he'), array('xxd'), array(5)),array(array('he'))) + data = "he|xxd|he|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he]]", output.get(0)); + + transformSql = "select array_except(array(),array(string1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_intersect(array(),array('he')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayJoinFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayJoinFunction.java new file mode 100644 index 00000000000..23adb0def3b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayJoinFunction.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayJoinFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayJoinFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_join(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_join(array('he',7,'xxd'),'~') + data = "he|xxd|~|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=he~7~xxd", output.get(0)); + + // case2: array_join(array('he',7,''),'_') + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=he_7_", output.get(0)); + + transformSql = "select array_join(array(string1,numeric1,numeric2),string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_join(array('he',3,''),'~','oo') + data = "he|~||3|oo|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=he~3~oo", output.get(0)); + + // case4: array_join(array('he',3,5),',','oo') + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=he,3,5", output.get(0)); + + transformSql = "select array_join(array(),string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_join(array(),'xxd','cloud') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_join(array(string1),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array_join(array('he'),'') + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMaxFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMaxFunction.java new file mode 100644 index 00000000000..0983b024f58 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMaxFunction.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayMaxFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayMaxFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_max(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_max(array('he',7,'xxd')) + data = "he|xxd|~|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case2: array_max(array('he',7,'')) + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_max(array(numeric1,numeric2,numeric3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_max(array(4,3,56)) + data = "3|2|54|4|3|56"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=56", output.get(0)); + + // case4: array_max(array(3,5,3)) + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=5", output.get(0)); + + transformSql = "select array_max(array(),string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_max(array(),'xxd','cloud') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_max(array(string1),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array_max(array('he'),'') + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMinFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMinFunction.java new file mode 100644 index 00000000000..82ba6879f16 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayMinFunction.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayMinFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayMinFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_min(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_min(array('he',7,'xxd')) + data = "he|xxd|~|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case2: array_min(array('he',7,'')) + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_min(array(numeric1,numeric2,numeric3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_max(array(4,3,56)) + data = "3|2|54|4|3|56"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + // case4: array_min(array(3,5,3)) + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + transformSql = "select array_min(array(),string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_min(array(),'xxd','cloud') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_min(array(string1),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array_min(array('he'),'') + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPositionFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPositionFunction.java new file mode 100644 index 00000000000..bd90e523433 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPositionFunction.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayPositionFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayPositionFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_position(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_position(array('he',7,'xxd'),'he') + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case2: array_position(array('he',7,''),'_') + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + + // case3: array_position(array('he',7,''),'') + data = "he|||7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + transformSql = "select array_position(array(numeric1,numeric2),numeric3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_position(array(4,3),3) + data = "3|2|54|4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + // case5: array_position(array(3,5),3) + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case6: array_position(array(3,5),4) + data = "he|,|oo|3|5|4"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + + transformSql = "select array_position(array(),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case7: array_position(array(),'xxd') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select array_position(array(string1),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case8: array_position(array('he'),'') + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPrependFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPrependFunction.java new file mode 100644 index 00000000000..451017670c3 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayPrependFunction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayPrependFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayPrependFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_prepend(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_prepend(array('he',7,'xxd'),'he') + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, he, 7, xxd]", output.get(0)); + + // case2: array_prepend(array('he',7,''),'_') + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[_, he, 7, ]", output.get(0)); + + // case3: array_prepend(array('he',7,''),'') + data = "he|||7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[, he, 7, ]", output.get(0)); + + transformSql = "select array_prepend(array(numeric1,numeric2),numeric3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_prepend(array(4,3),3) + data = "3|2|54|4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[3, 4, 3]", output.get(0)); + + // case5: array_prepend(array(3,5),3) + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[3, 3, 5]", output.get(0)); + + // case6: array_prepend(array(3,5),4) + data = "he|,|oo|3|5|4"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[4, 3, 5]", output.get(0)); + + transformSql = "select array_prepend(array(),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case7: array_prepend(array(),'xxd') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayRemoveFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayRemoveFunction.java new file mode 100644 index 00000000000..a75425ee2d9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayRemoveFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayRemoveFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayRemoveFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_remove(array(string1,numeric1,string2),string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_remove(array('he',7,'xxd'),'he') + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[7, xxd]", output.get(0)); + + // case2: array_remove(array('he',7,''),'_') + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, ]", output.get(0)); + + // case3: array_remove(array('he','',''),'') + data = "he||||3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he]", output.get(0)); + + transformSql = "select array_remove(array(numeric1,numeric2),numeric3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_remove(array(4,3),3) + data = "3|2|54|4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[4]", output.get(0)); + + // case5: array_remove(array(3,5),3) + data = "he|,|oo|3|5|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[5]", output.get(0)); + + transformSql = "select array_remove(array(),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array_remove(array(),'xxd') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayReverseFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayReverseFunction.java new file mode 100644 index 00000000000..30b91b55f79 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayReverseFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayReverseFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayReverseFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_reverse(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_reverse(array('he',7,'xxd')) + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd, 7, he]", output.get(0)); + + // case2: array_reverse(array('he',7,'')) + data = "he||_|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[, 7, he]", output.get(0)); + + // case3: array_reverse(array('he','','')) + data = "he||||3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[, , he]", output.get(0)); + + transformSql = "select array_remove(array()) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_reverse(array()) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySliceFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySliceFunction.java new file mode 100644 index 00000000000..0ea36101aea --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySliceFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArraySliceFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArraySliceFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_slice(array(string1,numeric1,string2),numeric2,numeric3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_slice(array('he',7,'xxd'),1,2) + data = "he|xxd|he|7|1|2"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7]", output.get(0)); + + // case2: array_slice(array('he',7,''),1,1) + data = "he||_|7|1|1"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he]", output.get(0)); + + // case3: array_slice(array('he',3,'b'),-2,-1) + data = "he|xxd|b|3|-2|-1"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[3, xxd]", output.get(0)); + + transformSql = "select array_slice(array(),1,2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_slice(array()) + data = "he|xxd|cloud|1|2|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySortFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySortFunction.java new file mode 100644 index 00000000000..d0824b1a564 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArraySortFunction.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArraySortFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArraySortFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_sort(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_sort(array('he',7,'xxd')) + data = "he|xxd|he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[7, he, xxd]", output.get(0)); + + // case2: array_sort(array('he',7,'')) + data = "he||he|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[, 7, he]", output.get(0)); + + transformSql = "select array_sort(array(numeric1,numeric2,numeric3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: array_sort(array(3,7,5)) + data = "he|||3|7|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[3, 5, 7]", output.get(0)); + + // case4: array_sort(array(3,7,)) + data = "he|||3|7|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[, 3, 7]", output.get(0)); + + transformSql = "select array_sort(array(numeric1,numeric2,numeric3),string1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array_sort(array(,3,7),false,false) + data = "false|false|||3|7"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[7, 3, ]", output.get(0)); + + // case6: array_sort(array(3,7,),true,false) + data = "true|false||3|7|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[3, 7, ]", output.get(0)); + + transformSql = "select array_sort(array()) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case7: array_sort(array()) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayUnionFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayUnionFunction.java new file mode 100644 index 00000000000..0eab341a5ea --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestArrayUnionFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayUnionFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayUnionFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select array_union(array(string1,numeric1),array(string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array_union(array('he', 7),array('cloud','ha')) + data = "he|cloud|ha|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, cloud, ha]", output.get(0)); + + // case2: array_union(array('he',7,''),array('')) + data = "he|||7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, ]", output.get(0)); + + // case3: array_union(array('he',7),array('he','ha')) + data = "he|he|ha|7|7|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, ha]", output.get(0)); + + transformSql = "select array_union(array(numeric1,numeric2,numeric3),array()) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: array_union(array(3,7,5),array()) + data = "he|||3|7|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestCardinalityFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestCardinalityFunction.java new file mode 100644 index 00000000000..43914dadbd2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestCardinalityFunction.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestCardinalityFunction extends AbstractFunctionStringTestBase { + + @Test + public void testCardinalityForArrayFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select cardinality(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: cardinality(array('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + // case2: cardinality(array('he', 1, '')) + data = "he||cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + // case3: cardinality(array('he',-1,'xxd')) + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + transformSql = "select cardinality(array(array(string1,numeric1),string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: cardinality(array(array('he',5),'xxd')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + // case5: cardinality(array(array('he',5),'')) + data = "he||cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + transformSql = "select cardinality(array(array(string1,numeric1),array(string2,string3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: cardinality(array(array('he',5),array('xxd','cloud'))) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + // case7: cardinality(array(array('he',5),array('',''))) + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + } + + @Test + public void testCardinalityForMapFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select cardinality(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: cardinality(map('he',-1,'xxd')) + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select cardinality(map(string1,numeric1,string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: cardinality(map('he', 5, 'xxd', 'cloud')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + transformSql = + "select cardinality(map(string1,numeric1,map(string2,string3),map(numeric2,numeric3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: cardinality(map('he', 5, map('xxd', 'cloud'),map(3, 3))) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestElementFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestElementFunction.java new file mode 100644 index 00000000000..f47db2cc28d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/collection/TestElementFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.collection; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestElementFunction extends AbstractFunctionStringTestBase { + + @Test + public void testElementFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select element(array(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: element(array('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + // case2: element(array('he',-1,'xxd')) + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select element(array(array(string1,numeric1))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: element(array(array('he',5))) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 5]", output.get(0)); + + transformSql = "select element(array(string1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: element(array('he')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=he", output.get(0)); + + // case5: element(array('')) + data = "|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java similarity index 96% rename from inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java rename to inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java index 82f30282e26..27f0bf7222a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonArrayInsertFunction.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.function.string; +package org.apache.inlong.sdk.transform.process.function.json; import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; import org.junit.Assert; import org.junit.Test;