From 0d55d154e6a048376cb0080446c2babad482cd74 Mon Sep 17 00:00:00 2001 From: ZKpLo <14148880+zkplo@user.noreply.gitee.com> Date: Thu, 10 Oct 2024 10:23:39 +0800 Subject: [PATCH] [INLONG-11301][SDK] Transform SQL supports "JSON_ARRAY_INSERT" function --- .../function/JsonArrayInsertFunction.java | 124 ++++++++++++++++++ .../string/TestJsonArrayInsertFunction.java | 89 +++++++++++++ 2 files changed, 213 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java new file mode 100644 index 0000000000..e88bfaaee5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/JsonArrayInsertFunction.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSONPath; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; + +/** + * JsonArrayInsertFunction -> JSON_ARRAY_INSERT(json_doc, path, val[, path, val] ...) + * description: + * - return NULL if any argument is NULL; + * - return the document inserted into the array. + * Note: If multiple groups of parameters are passed in, the parameter subscripts of the latter groups + * need to be based on the document subscripts after the previous group of parameters are updated. + */ +@TransformFunction(names = {"json_array_insert"}) +public class JsonArrayInsertFunction implements ValueParser { + + private ValueParser jsonDocParser; + private List pathValuePairsParser; + + public JsonArrayInsertFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + jsonDocParser = OperatorTools.buildParser(expressions.get(0)); + pathValuePairsParser = new ArrayList<>(); + for (int i = 1; i < expressions.size(); i++) { + pathValuePairsParser.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context); + if (jsonDocObj == null) { + return null; + } + ArrayList pathValuePairs = new ArrayList<>(); + for (ValueParser valueParser : pathValuePairsParser) { + pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context)); + } + return jsonArrayInsert(jsonDocObj.toString(), pathValuePairs); + } + + public static String jsonArrayInsert(String jsonDoc, ArrayList pathValuePairs) { + if (jsonDoc == null || pathValuePairs == null || pathValuePairs.size() % 2 != 0) { + return null; + } + + Object jsonObject = JSON.parse(jsonDoc); + + for (int i = 0; i < pathValuePairs.size(); i += 2) { + String path = (String) pathValuePairs.get(i); + Object value = pathValuePairs.get(i + 1); + + if (!path.endsWith("]")) { + throw new IllegalArgumentException("Path must end with an array index: " + path); + } + + // Find the parent path + String parentPath = path.substring(0, path.lastIndexOf('[')); + Object parentNode = JSONPath.eval(jsonObject, parentPath); + + if (parentNode instanceof JSONArray) { + // If the parent path is an array, perform the insert operation + insertIntoArray((JSONArray) parentNode, path, value); + } else if (parentNode instanceof JSONObject) { + // If the parent path is an object, try inserting it into the array inside the object + String arrayIndexPart = path.substring(path.lastIndexOf('['), path.lastIndexOf(']') + 1); + handleArrayInsertionInObject((JSONObject) parentNode, arrayIndexPart, value); + } else { + throw new IllegalArgumentException("Invalid path or target node is not an array or object: " + path); + } + } + + return JSON.toJSONString(jsonObject); + } + + private static void insertIntoArray(JSONArray array, String path, Object value) { + String indexPart = path.substring(path.lastIndexOf('[') + 1, path.lastIndexOf(']')); + int index = Integer.parseInt(indexPart); + + // If the index exceeds the length of the array, insert at the end + if (index >= array.size()) { + array.add(value); + } else { + array.add(index, value); + } + } + + private static void handleArrayInsertionInObject(JSONObject jsonObject, String arrayPart, Object value) { + String arrayField = arrayPart.substring(1, arrayPart.length() - 1); + JSONArray array = jsonObject.getJSONArray(arrayField); + if (array != null) { + insertIntoArray(array, arrayPart, value); + } + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java new file mode 100644 index 0000000000..82f30282e2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestJsonArrayInsertFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestJsonArrayInsertFunction extends AbstractFunctionStringTestBase { + + @Test + public void testJsonArrayInsertFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select json_array_insert(string1,string2,string3) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: json_array_append(["a", {"b": [1, 2]}, [3, 4]], $[1], x) + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[1]|x|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"a\",\"x\",{\"b\":[1,2]},[3,4]]", output.get(0)); + + // case2: json_array_append(["a", {"b": [1, 2]}, [3, 4]], $[100], x) + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[100]|x|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"a\",{\"b\":[1,2]},[3,4],\"x\"]", output.get(0)); + + // case3: json_array_append(["a", {"b": [1, 2]}, [3, 4]], $[100], x) + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[1].b[0]|x|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"a\",{\"b\":[\"x\",1,2]},[3,4]]", output.get(0)); + + // case4: json_array_append(["a", {"b": [1, 2]}, [3, 4]], $[100], y) + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[2][1]|y|"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"a\",{\"b\":[1,2]},[3,\"y\",4]]", output.get(0)); + + transformSql = "select json_array_insert(string1,string2,string3,numeric1,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: json_array_append(["a", {"b": [1, 2]}, [3, 4]], "$[0]", "x", "$[2][1]", "y") + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[0]|x|$[2][1]|y"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"x\",\"a\",{\"b\":[1,2]},[3,4]]", output.get(0)); + + // case6: json_array_append(["a", {"b": [1, 2]}, [3, 4]], "$[0]", "x", "$[3][1]", "y") + data = "[\\\"a\\\", {\\\"b\\\": [1, 2]}, [3, 4]]|$[0]|x|$[3][1]|y"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[\"x\",\"a\",{\"b\":[1,2]},[3,\"y\",4]]", output.get(0)); + + } +}