diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java new file mode 100644 index 00000000000..b6b8b3c0af1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.List; + +/** + * ModuloFunction + * description: MOD(NUMERIC1, NUMERIC2) : Return the remainder of numeric1 divided by numeric2. + */ +public class ModuloFunction implements ValueParser { + + private ValueParser dividendParser; + private ValueParser divisorParser; + + public ModuloFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + dividendParser = OperatorTools.buildParser(expressions.get(0)); + divisorParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object dividendObj = dividendParser.parse(sourceData, rowIndex, context); + Object divisorObj = divisorParser.parse(sourceData, rowIndex, context); + BigDecimal dividend = OperatorTools.parseBigDecimal(dividendObj); + BigDecimal divisor = OperatorTools.parseBigDecimal(divisorObj); + return dividend.remainder(divisor); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index b47d39e298a..2305d5165ca 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -34,6 +34,7 @@ import org.apache.inlong.sdk.transform.process.function.Log10Function; import org.apache.inlong.sdk.transform.process.function.Log2Function; import org.apache.inlong.sdk.transform.process.function.LogFunction; +import org.apache.inlong.sdk.transform.process.function.ModuloFunction; import org.apache.inlong.sdk.transform.process.function.NowFunction; import org.apache.inlong.sdk.transform.process.function.PowerFunction; import org.apache.inlong.sdk.transform.process.function.ReplicateFunction; @@ -53,24 +54,30 @@ import org.apache.inlong.sdk.transform.process.parser.ColumnParser; import org.apache.inlong.sdk.transform.process.parser.DateParser; import org.apache.inlong.sdk.transform.process.parser.DivisionParser; +import org.apache.inlong.sdk.transform.process.parser.DoubleParser; import org.apache.inlong.sdk.transform.process.parser.LongParser; +import org.apache.inlong.sdk.transform.process.parser.ModuloParser; import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser; import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser; +import org.apache.inlong.sdk.transform.process.parser.SignParser; import org.apache.inlong.sdk.transform.process.parser.StringParser; import org.apache.inlong.sdk.transform.process.parser.SubtractionParser; import org.apache.inlong.sdk.transform.process.parser.TimestampParser; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.DateValue; +import net.sf.jsqlparser.expression.DoubleValue; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; import net.sf.jsqlparser.expression.LongValue; import net.sf.jsqlparser.expression.NotExpression; import net.sf.jsqlparser.expression.Parenthesis; +import net.sf.jsqlparser.expression.SignedExpression; import net.sf.jsqlparser.expression.StringValue; import net.sf.jsqlparser.expression.TimestampValue; import net.sf.jsqlparser.expression.operators.arithmetic.Addition; import net.sf.jsqlparser.expression.operators.arithmetic.Division; +import net.sf.jsqlparser.expression.operators.arithmetic.Modulo; import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; @@ -92,7 +99,7 @@ /** * OperatorTools - * + * */ public class OperatorTools { @@ -144,6 +151,7 @@ public class OperatorTools { functionMap.put("from_unixtime", FromUnixTimeFunction::new); functionMap.put("unix_timestamp", UnixTimestampFunction::new); functionMap.put("to_timestamp", ToTimestampFunction::new); + functionMap.put("mod", ModuloFunction::new); functionMap.put("to_base64", ToBase64Function::new); functionMap.put("length", LengthFunction::new); } @@ -180,6 +188,10 @@ public static ValueParser buildParser(Expression expr) { return new StringParser((StringValue) expr); } else if (expr instanceof LongValue) { return new LongParser((LongValue) expr); + } else if (expr instanceof DoubleValue) { + return new DoubleParser((DoubleValue) expr); + } else if (expr instanceof SignedExpression) { + return new SignParser((SignedExpression) expr); } else if (expr instanceof Parenthesis) { return new ParenthesisParser((Parenthesis) expr); } else if (expr instanceof Addition) { @@ -190,6 +202,8 @@ public static ValueParser buildParser(Expression expr) { return new MultiplicationParser((Multiplication) expr); } else if (expr instanceof Division) { return new DivisionParser((Division) expr); + } else if (expr instanceof Modulo) { + return new ModuloParser((Modulo) expr); } else if (expr instanceof DateValue) { return new DateParser((DateValue) expr); } else if (expr instanceof TimestampValue) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java new file mode 100644 index 00000000000..2f50b5311b2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; + +import net.sf.jsqlparser.expression.DoubleValue; + +/** + * LongParser + */ +public class DoubleParser implements ValueParser { + + private final Double value; + + public DoubleParser(DoubleValue expr) { + this.value = expr.getValue(); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + return value; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java new file mode 100644 index 00000000000..ca0a8411709 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Modulo; + +import java.math.BigDecimal; + +/** + * ModuloParser + * description: analyze the % expression + */ +public class ModuloParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public ModuloParser(Modulo expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.remainder(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java new file mode 100644 index 00000000000..360fc0d0f6e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.SignedExpression; + +import java.math.BigDecimal; + +/** + * SignParser + * + */ +public class SignParser implements ValueParser { + + private final Integer sign; + private final ValueParser number; + + public SignParser(SignedExpression expr) { + sign = expr.getSign() == '-' ? -1 : 1; + number = OperatorTools.buildParser(expr.getExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObject = number.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObject); + return numberValue.multiply(new BigDecimal(sign)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index e725345558d..f6596cda1af 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -55,6 +55,153 @@ public class TestTransformArithmeticFunctionsProcessor { kvSink = new KvSinkInfo("UTF-8", dstFields); } + @Test + public void testModuloFunction() throws Exception { + String transformFunctionSql = "select mod(numeric1,100) from source"; + String transformExpressionSql = "select numeric1 % 100 from source"; + List output1, output2; + String data; + TransformConfig functionConfig = new TransformConfig(transformFunctionSql); + TransformProcessor functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + TransformConfig expressionConfig = new TransformConfig(transformExpressionSql); + TransformProcessor expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: "mod(3.1415926,100)" and "3.1415926 % 100" + data = "3.1415926|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=3.1415926", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=3.1415926", output2.get(0)); + + // case2: "mod(-3.1415926,100)" and "-3.1415926 % 100" + data = "-3.1415926|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3.1415926", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3.1415926", output2.get(0)); + + // case3: "mod(320,100)" and "320 % 100" + data = "320|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=20", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=20", output2.get(0)); + + // case4: "mod(-320,100)" and "-320 % 100" + data = "-320|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-20", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-20", output2.get(0)); + + transformFunctionSql = "select mod(numeric1,-10) from source"; + transformExpressionSql = "select numeric1 % -10 from source"; + functionConfig = new TransformConfig(transformFunctionSql); + functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + expressionConfig = new TransformConfig(transformExpressionSql); + expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: "mod(9,-10)" and "9 % -10" + data = "9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=9", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=9", output2.get(0)); + + // case6: "mod(-13,-10)" and "-13 % -10" + data = "-13|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3", output2.get(0)); + + // case7: "mod(-13.14,-10)" and "-13.14 % -10" + data = "-13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3.14", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3.14", output2.get(0)); + + // case8: "mod(13.14,-10)" and "13.14 % -10" + data = "13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=3.14", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=3.14", output2.get(0)); + + transformFunctionSql = "select mod(numeric1,-3.14) from source"; + transformExpressionSql = "select numeric1 % -3.14 from source"; + functionConfig = new TransformConfig(transformFunctionSql); + functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + expressionConfig = new TransformConfig(transformExpressionSql); + expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case9: "mod(9,-3.14)" and "9 % -3.14" + data = "9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=2.72", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=2.72", output2.get(0)); + + // case10: "mod(-9,-3.14)" and "-9 % -3.14" + data = "-9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-2.72", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-2.72", output2.get(0)); + + // case11: "mod(-13.14,-3.14)" and "-13.14 % -3.14" + data = "-13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-0.58", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-0.58", output2.get(0)); + + // case12: "mod(13.14,-3.14)" and "13.14 % -3.14" + data = "13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=0.58", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=0.58", output2.get(0)); + + } + @Test public void testRoundFunction() throws Exception { String transformSql = "select round(numeric1) from source";