From e1884106681a4062555d413f6f2cdc24ef741229 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:07:56 +0800 Subject: [PATCH 1/5] [INLONG-10997][Manager] Incorrect setting of transformSQL in dataflowconfig (#10998) --- .../manager/service/resource/sort/DefaultSortConfigOperator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 116a8f0ea21..17ca4b9bad2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -175,6 +175,7 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream .dataflowId(String.valueOf(sink.getId())) .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink)) .auditTag(String.valueOf(sink.getId())) + .transformSql(sink.getTransformSql()) .sinkConfig(getSinkConfig(groupInfo, streamInfo, sink)) .inlongGroupId(groupInfo.getInlongGroupId()) .inlongStreamId(streamInfo.getInlongStreamId()) From edf93bd547a09df6b1d7a6d57ea3d719d9f63f4f Mon Sep 17 00:00:00 2001 From: ChunLiang Lu Date: Tue, 3 Sep 2024 21:49:46 +0800 Subject: [PATCH 2/5] [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004) * [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL * add more UT Case * fix code format problems * fix pom.xml problem --- .../sdk/transform/encode/CsvSinkEncoder.java | 6 +- .../sdk/transform/encode/KvSinkEncoder.java | 6 +- .../sdk/transform/encode/SinkEncoder.java | 2 + .../transform/process/TransformProcessor.java | 34 ++++++--- .../transform/process/ValueParserNode.java | 34 +++++++++ .../process/TestTransformProcessor.java | 76 +++++++++++++++++++ 6 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index ce47a0072c7..89f6f364a08 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) { } else { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue); + } else { + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + } builder.append(delimiter); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 7460ec95c29..2822374c412 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) { if (fields == null || fields.size() == 0) { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue).append(entryDelimiter); + } else { + builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + } } } else { for (FieldInfo field : fields) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index 7f845a99d61..a63f9702956 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -27,6 +27,8 @@ */ public interface SinkEncoder { + public static final String ALL_SOURCE_FIELD_SIGN = "*"; + Output encode(SinkData sinkData, Context context); List getFields(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 9944268dda6..acb7e62e070 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -31,23 +31,23 @@ import com.google.common.collect.ImmutableMap; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.statement.select.AllColumns; import net.sf.jsqlparser.statement.select.PlainSelect; import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; /** * TransformProcessor - * + * */ public class TransformProcessor { @@ -61,7 +61,9 @@ public class TransformProcessor { private PlainSelect transformSelect; private ExpressionOperator where; - private Map selectItemMap; + private List selectItems; + + private boolean includeAllSourceFields = false; public static TransformProcessor create( TransformConfig config, @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException { this.transformSelect = (PlainSelect) select.getSelectBody(); this.where = OperatorTools.buildOperator(this.transformSelect.getWhere()); List items = this.transformSelect.getSelectItems(); - this.selectItemMap = new HashMap<>(items.size()); + this.selectItems = new ArrayList<>(items.size()); List fields = this.encoder.getFields(); for (int i = 0; i < items.size(); i++) { SelectItem item = items.get(i); @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException { fieldName = exprItem.getAlias().getName(); } } - this.selectItemMap.put(fieldName, - OperatorTools.buildParser(exprItem.getExpression())); + this.selectItems + .add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression()))); + } else if (item instanceof AllColumns) { + fieldName = item.toString(); + this.encoder.getFields().clear(); + this.selectItems.add(new ValueParserNode(fieldName, null)); } } } @@ -137,10 +143,18 @@ public List transform(I input, Map extParams) { // parse value SinkData sinkData = new DefaultSinkData(); - for (Entry entry : this.selectItemMap.entrySet()) { - String fieldName = entry.getKey(); + for (ValueParserNode node : this.selectItems) { + String fieldName = node.getFieldName(); + ValueParser parser = node.getParser(); + if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) { + if (input instanceof String) { + sinkData.addField(fieldName, (String) input); + } else { + sinkData.addField(fieldName, ""); + } + continue; + } try { - ValueParser parser = entry.getValue(); Object fieldValue = parser.parse(sourceData, i, context); sinkData.addField(fieldName, String.valueOf(fieldValue)); } catch (Throwable t) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java new file mode 100644 index 00000000000..e36c0c9c6a9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * ValueParserNode + */ +@AllArgsConstructor +@Data +public class ValueParserNode { + + private String fieldName; + private ValueParser parser; +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 3413f1aca3e..8448260252d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -350,4 +350,80 @@ public void testPb2CsvForNow() throws Exception { List output = processor.transform(srcBytes, new HashMap<>()); Assert.assertEquals(2, output.size()); } + @Test + public void testCsv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo,ftime from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output3 = processor3.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + // case4 + CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\', new ArrayList<>()); + CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + config.setTransformSql("select *,$2,$1 from source where $2='nok'"); + TransformProcessor processor4 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSourceNoField), + SinkEncoderFactory.createCsvEncoder(csvSinkNoField)); + + List output4 = processor4.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + } + + @Test + public void testKv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo e1,ftime f1 from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + List output3 = processor3.transform("ftime=2024-04-28 00:00:00&extinfo=nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "ftime=2024-04-28 00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00"); + } } From 49b07ad68382cbaf50db2b6d04a799c3abff97f3 Mon Sep 17 00:00:00 2001 From: rachely <124853723+Ybszzzziz@users.noreply.github.com> Date: Thu, 5 Sep 2024 09:02:48 +0800 Subject: [PATCH 3/5] [INLONG-10963][SDK] Transform SQL support CONTAINS function. (#10964) * [INLONG-10963][SDK] Transform SQL support CONTAINS function.(#10963) * [INLONG-10963][SDK] Transform SQL support CONTAINS function.(#10963) --- .../process/function/ContainsFunction.java | 56 +++++++++++++++++++ ...TestTransformStringFunctionsProcessor.java | 32 +++++++++++ 2 files changed, 88 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ContainsFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ContainsFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ContainsFunction.java new file mode 100644 index 00000000000..e2905eea07e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ContainsFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * ContainsFunction + * description: contains(left, right) - Returns a boolean. + * The value is True if right is found inside left, otherwise, returns False. + * Both left or right must be of STRING type. + */ +@TransformFunction(names = {"contains"}) +public class ContainsFunction implements ValueParser { + + private ValueParser leftStrParser; + private ValueParser rightStrParser; + + public ContainsFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + leftStrParser = OperatorTools.buildParser(expressions.get(0)); + rightStrParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftStrObj = leftStrParser.parse(sourceData, rowIndex, context); + Object rightStrObj = rightStrParser.parse(sourceData, rowIndex, context); + String leftStr = OperatorTools.parseString(leftStrObj); + String rightStr = OperatorTools.parseString(rightStrObj); + return (leftStr == null || rightStr == null) ? null : leftStr.contains(rightStr); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index a3099d09f2a..f099d728dc7 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -724,4 +724,36 @@ public void testInsertFunction() throws Exception { Assert.assertEquals("result=1278", output5.get(0)); } + @Test + public void testContainsFunction() throws Exception { + String transformSql = "select contains(string1, string2) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: contains('Transform SQL', 'SQL') + List output1 = processor.transform("Transform SQL|SQL", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=true"); + // case2: contains('', 'SQL') + List output2 = processor.transform("|SQL", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=false"); + // case3: contains('Transform SQL', '') + List output3 = processor.transform("Transform SQL|", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=true"); + // case4: contains('Transform SQL', 'Transformer') + List output4 = processor.transform("Transform SQL|Transformer", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=false"); + // case5: contains('Transform SQL', 'm S') + List output5 = processor.transform("Transform SQL|m S", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=true"); + // case6: contains('', '') + List output6 = processor.transform("|", new HashMap<>()); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result=true"); + } } From 4f73aa0c81657ea98e5586cd8ff498e08fb2bc39 Mon Sep 17 00:00:00 2001 From: MOONSakura0614 <151456101+MOONSakura0614@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:55:14 +0800 Subject: [PATCH 4/5] [INLONG-10893][SDK] Transform SQL support FromBase64 function (#11016) --- .../process/function/FromBase64Function.java | 66 ++++++++++++++ ...tTransformFromBase64FunctionProcessor.java | 90 +++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromBase64Function.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformFromBase64FunctionProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromBase64Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromBase64Function.java new file mode 100644 index 00000000000..d64891d4711 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromBase64Function.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.List; + +/** + * FromBase64Function + * description: Returns the base64-decoded result from string; returns NULL if string is NULL + */ +@TransformFunction(names = {"from_base64"}) +public class FromBase64Function implements ValueParser { + + private final ValueParser stringParser; + + public FromBase64Function(Function expr) { + List expressions = expr.getParameters().getExpressions(); + stringParser = OperatorTools.buildParser(expressions.get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj = stringParser.parse(sourceData, rowIndex, context); + if (stringObj == null) { + return null; + } + String encodedString = OperatorTools.parseString(stringObj); + + if (encodedString == null) { + return null; + } + + try { + byte[] decodedBytes = Base64.getDecoder().decode(encodedString); + return new String(decodedBytes, StandardCharsets.UTF_8); + } catch (IllegalArgumentException e) { + // handle decoding exceptions and log exception information + throw new RuntimeException("Invalid Base64 input: " + encodedString, e); + } + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformFromBase64FunctionProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformFromBase64FunctionProcessor.java new file mode 100644 index 00000000000..9684c5fe7cd --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformFromBase64FunctionProcessor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * TestTransformFromBase64FunctionProcessor + * description: test the from_base64 function in transform processor + */ +public class TestTransformFromBase64FunctionProcessor { + + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + + static { + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("string" + i); + srcFields.add(field); + } + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("numeric" + i); + srcFields.add(field); + } + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + @Test + public void testFromBase64Function() throws Exception { + String transformSql = "select from_base64(string1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: from_base64('aGVsbG8gd29ybGQ=') -> 'hello world' + List output1 = processor.transform("aGVsbG8gd29ybGQ=|apple|banana|cloud|1", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=hello world"); + + String transformSql2 = "select from_base64(stringX) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: from_base64(null) -> null + List output2 = processor2.transform("|apple|banana|cloud|1", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=null"); + + // case3: from_base64('QXBhY2hlIEluTG9uZw==') -> 'Apache InLong' + List output3 = processor.transform("QXBhY2hlIEluTG9uZw==|apple|banana|cloud|1", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=Apache InLong"); + } +} From 9bb3175212f210e9f825dfbf8e928db13521ec62 Mon Sep 17 00:00:00 2001 From: MOONSakura0614 <151456101+MOONSakura0614@users.noreply.github.com> Date: Thu, 5 Sep 2024 11:04:34 +0800 Subject: [PATCH 5/5] [INLONG-10940][SDK] Transform SQL support arithmetic functions(Including cot, tanh, cosh, asin, atan and atan2) (#10950) --- .../process/function/AsinFunction.java | 51 ++++++ .../process/function/Atan2Function.java | 58 +++++++ .../process/function/AtanFunction.java | 48 ++++++ .../process/function/CoshFunction.java | 48 ++++++ .../process/function/CotFunction.java | 55 +++++++ .../process/function/TanhFunction.java | 48 ++++++ ...TransformArithmeticFunctionsProcessor.java | 145 +++++++++++++++++- 7 files changed, 452 insertions(+), 1 deletion(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsinFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Atan2Function.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AtanFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CoshFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TanhFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsinFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsinFunction.java new file mode 100644 index 00000000000..ac45933509f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AsinFunction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * AsinFunction + * description: asin(numeric)--returns the arc sine of numeric + */ +@TransformFunction(names = {"asin"}) +public class AsinFunction implements ValueParser { + + private ValueParser numberParser; + + public AsinFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + if (numberObj == null) { + throw new NullPointerException("Parsed number object is null"); + } + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.asin(numberValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Atan2Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Atan2Function.java new file mode 100644 index 00000000000..733a323347c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Atan2Function.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * Atan2Function + * description: atan2(numeric)--returns the arc tangent of a coordinate (numeric1, numeric2). + */ +@TransformFunction(names = {"atan2"}) +public class Atan2Function implements ValueParser { + + private ValueParser xParser; + private ValueParser yParser; + + public Atan2Function(Function expr) { + xParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + yParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object xObj = xParser.parse(sourceData, rowIndex, context); + Object yObj = yParser.parse(sourceData, rowIndex, context); + + if (xObj == null) { + throw new NullPointerException("Parsed number object on the x-axis is null"); + } + + BigDecimal xValue = OperatorTools.parseBigDecimal(xObj); + BigDecimal yValue = OperatorTools.parseBigDecimal(yObj); + + return Math.atan2(xValue.doubleValue(), yValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AtanFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AtanFunction.java new file mode 100644 index 00000000000..13a7d0bac61 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AtanFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * AtanFunction + * description: atan(numeric)--returns the arc tangent of numeric + */ +@TransformFunction(names = {"atan"}) +public class AtanFunction implements ValueParser { + + private ValueParser numberParser; + + public AtanFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.atan(numberValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CoshFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CoshFunction.java new file mode 100644 index 00000000000..0de7e84f03e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CoshFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * CoshFunction + * description: cosh(numeric)--returns the hyperbolic cosine of numeric + */ +@TransformFunction(names = {"cosh"}) +public class CoshFunction implements ValueParser { + + private ValueParser numberParser; + + public CoshFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.cosh(numberValue.doubleValue()); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java new file mode 100644 index 00000000000..34bbe3231c2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/CotFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * CotFunction + * description: cot(numeric) -- returns the cotangent of the numeric (in radians) + */ +@TransformFunction(names = {"cot"}) +public class CotFunction implements ValueParser { + + private final ValueParser valueParser; + + public CotFunction(Function expr) { + this.valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + + BigDecimal value = OperatorTools.parseBigDecimal(valueObj); + + // Calculate tan(x) and take the inverse to find cot(x) + double tanValue = Math.tan(value.doubleValue()); + if (tanValue == 0) { + throw new ArithmeticException("Cotangent undefined for this input, tan(x) is zero."); + } + return 1.0 / tanValue; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TanhFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TanhFunction.java new file mode 100644 index 00000000000..dd19eb57081 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TanhFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * TanhFunction + * description: tanh(numeric)--returns the hyperbolic tangent of numeric + */ +@TransformFunction(names = {"tanh"}) +public class TanhFunction implements ValueParser { + + private ValueParser numberParser; + + public TanhFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.tanh(numberValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index 738ef2e0f6d..b6254eb35d5 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -952,6 +952,150 @@ public void testRandFunction() throws Exception { Assert.assertTrue(result >= 0.0 && result < 1.0); } + @Test + public void testCotFunction() throws Exception { + String transformSql = "select cot(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: cot(1) + List output1 = processor.transform("1|4|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.6420926159343306"); + + // case2: cot(0.5) + List output2 = processor.transform("0.5|4|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1.830487721712452"); + + // case3: cot(-1) + List output3 = processor.transform("-1|4|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=-0.6420926159343306"); + } + + @Test + public void testTanhFunction() throws Exception { + String transformSql = "select tanh(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: tanh(1) + List output1 = processor.transform("1|4|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.7615941559557649"); + + // case2: tanh(0) + List output2 = processor.transform("0|4|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0.0"); + + // case3: tanh(-1) + List output3 = processor.transform("-1|4|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=-0.7615941559557649"); + } + + @Test + public void testCoshFunction() throws Exception { + String transformSql = "select cosh(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: cosh(1) + List output1 = processor.transform("1|4|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1.543080634815244"); + + // case2: cosh(0) + List output2 = processor.transform("0|4|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1.0"); + + // case3: cosh(-1) + List output3 = processor.transform("-1|4|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=1.543080634815244"); + } + + @Test + public void testAsinFunction() throws Exception { + String transformSql = "select asin(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: asin(0.5) + List output1 = processor.transform("0.5|4|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.5235987755982989"); + + // case2: asin(0) + List output2 = processor.transform("0|4|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0.0"); + + // case3: asin(-0.5) + List output3 = processor.transform("-0.5|4|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=-0.5235987755982989"); + } + + @Test + public void testAtanFunction() throws Exception { + String transformSql = "select atan(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: atan(1) + List output1 = processor.transform("1|4|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.7853981633974483"); + + // case2: atan(0) + List output2 = processor.transform("0|4|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0.0"); + + // case3: atan(-1) + List output3 = processor.transform("-1|4|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=-0.7853981633974483"); + } + + @Test + public void testAtan2Function() throws Exception { + String transformSql = "select atan2(numeric1, numeric2) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: atan2(1, 1) + List output1 = processor.transform("1|1|6|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.7853981633974483"); + + // case2: atan2(1, 0) + List output2 = processor.transform("1|0|6|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1.5707963267948966"); + + // case3: atan2(0, -1) + List output3 = processor.transform("0|-1|6|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=3.141592653589793"); + } + @Test public void testHexFunction() throws Exception { String transformSql1 = "select hex(numeric1) from source"; @@ -993,5 +1137,4 @@ public void testPiFunction() throws Exception { Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=3.141592653589793"); } - }