From 5aa37cd63c25850b924b1fa24b339fb52562da0e Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 15 Oct 2024 17:35:34 +0800 Subject: [PATCH] [INLONG-11302][SDK] Transform SQL supports "JSON_INSERT" function (#11343) --- .../function/json/JsonInsertFunction.java | 264 ++++++++++++++++++ .../function/json/TestJsonInsertFunction.java | 67 +++++ 2 files changed, 331 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonInsertFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonInsertFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonInsertFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonInsertFunction.java new file mode 100644 index 0000000000..69b15d6c3c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonInsertFunction.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.json; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; + +/** + * JsonInsertFunction -> JSON_INSERT(json_doc, path, val[, path, val, ...]) + * description: + * - return NULL if any argument is NULL or the 'json_doc' argument is not a valid JSON document + * or any path argument is not a valid path expression or contains a * or ** wildcard; + * - return the result of inserting data into 'json_doc'. + */ +@TransformFunction(names = { + "json_insert"}, parameter = "(String json_doc, String path1, String val1[, String path2, String val2, ...] )", descriptions = { + "- Return \"\" if any argument is NULL or the 'json_doc' argument is not a valid JSON document " + + "or any path argument is not a valid path expression or contains a * or ** wildcard.;", + "- Return the result of inserting data into 'json_doc'." + }, examples = { + "json_insert({\"a\": {\"b\": [1, 2]}, \"c\": [3, 4]}, $.c[1][1], \"2\", \"$.c[1][1][5]\", \"1\") = " + + "{\"a\":{\"b\":[1,2]},\"c\":[3,[4,[\"2\",\"1\"]]]}" + }) +public class JsonInsertFunction implements ValueParser { + + private final ValueParser jsonDocParser; + private final List pathValuePairsParser; + + public JsonInsertFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + jsonDocParser = OperatorTools.buildParser(expressions.get(0)); + pathValuePairsParser = new ArrayList<>(); + for (int i = 1; i < expressions.size(); i++) { + pathValuePairsParser.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context); + if (jsonDocObj == null) { + return null; + } + ArrayList pathValuePairs = new ArrayList<>(); + for (ValueParser valueParser : pathValuePairsParser) { + pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context)); + } + return jsonInsert(jsonDocObj.toString(), pathValuePairs); + } + + private static String jsonInsert(String json, ArrayList pathValuePairs) { + if (json == null || pathValuePairs == null || pathValuePairs.size() % 2 != 0) { + return null; + } + + JSONObject jsonObject; + try { + jsonObject = JSON.parseObject(json); + } catch (Exception e) { + // Invalid JSON document + return null; + } + + for (int i = 0; i < pathValuePairs.size(); i += 2) { + String path = (String) pathValuePairs.get(i); + Object value = pathValuePairs.get(i + 1); + + if (path == null || value == null) { + return null; + } + + if (path.contains("*") || path.contains("**")) { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + + List tokens = parsePath(path); + if (tokens.isEmpty()) { + continue; + } + + Object current = jsonObject; + Object parent = null, currentToken = null; + + for (int j = 0; j < tokens.size() - 1; j++) { + Object token = tokens.get(j); + parent = current; + currentToken = token; + + if (token instanceof String) { + String key = (String) token; + if (!(current instanceof JSONObject)) { + JSONObject newObj = new JSONObject(); + if (parent instanceof JSONObject) { + ((JSONObject) parent).put((String) currentToken, newObj); + } else if (parent instanceof JSONArray) { + ((JSONArray) parent).set((Integer) currentToken, newObj); + } + current = newObj; + } + JSONObject jsonObj = (JSONObject) current; + + Object next = jsonObj.get(key); + if (next == null) { + Object nextToken = tokens.get(j + 1); + next = nextToken instanceof Integer ? new JSONArray() : new JSONObject(); + jsonObj.put(key, next); + } + current = next; + } else if (token instanceof Integer) { + int index = (Integer) token; + if (!(current instanceof JSONArray)) { + JSONArray newArr = new JSONArray(); + newArr.add(current); + if (parent instanceof JSONObject) { + ((JSONObject) parent).put((String) currentToken, newArr); + } else if (parent instanceof JSONArray) { + ((JSONArray) parent).set((Integer) currentToken, newArr); + } + current = newArr; + } + JSONArray jsonArr = (JSONArray) current; + if (jsonArr.size() <= index) { + index = jsonArr.size(); + } + Object next = jsonArr.get(index); + if (next == null) { + Object nextToken = tokens.get(j + 1); + next = nextToken instanceof Integer ? new JSONArray() : new JSONObject(); + jsonArr.set(index, next); + } + current = next; + } + } + + Object lastToken = tokens.get(tokens.size() - 1); + if (lastToken instanceof String) { + String key = (String) lastToken; + if (!(current instanceof JSONObject)) { + JSONObject newObj = new JSONObject(); + if (parent instanceof JSONObject) { + ((JSONObject) parent).put((String) currentToken, newObj); + } else if (parent instanceof JSONArray) { + ((JSONArray) parent).set((Integer) currentToken, newObj); + } + current = newObj; + } + JSONObject jsonObj = (JSONObject) current; + if (!jsonObj.containsKey(key)) { + jsonObj.put(key, parseValue(value)); + } + } else if (lastToken instanceof Integer) { + int index = (Integer) lastToken; + if (!(current instanceof JSONArray)) { + JSONArray newArr = new JSONArray(); + newArr.add(current); + if (parent instanceof JSONObject) { + ((JSONObject) parent).put((String) currentToken, newArr); + } else if (parent instanceof JSONArray) { + ((JSONArray) parent).set((Integer) currentToken, newArr); + } + current = newArr; + } + JSONArray jsonArr = (JSONArray) current; + if (index >= jsonArr.size() || jsonArr.get(index) == null) { + if (jsonArr.size() <= index) { + index = jsonArr.size(); + } + jsonArr.set(index, parseValue(value)); + } + } + } + + return jsonObject.toJSONString(); + } + + private static Object parseValue(Object value) { + if (value instanceof String) { + String str = (String) value; + try { + // Try to parse as JSON + Object json = JSON.parse(str); + if (json instanceof JSONObject || json instanceof JSONArray) { + return json; + } + } catch (Exception ignored) { + // Not a JSON string + } + } + return value; + } + + private static List parsePath(String path) { + if (path == null || !path.startsWith("$")) { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + + List tokens = new ArrayList<>(); + int i = 1; // Skip the '$' + int len = path.length(); + + while (i < len) { + char c = path.charAt(i); + if (c == '.') { + i++; + int start = i; + while (i < len && path.charAt(i) != '.' && path.charAt(i) != '[') { + i++; + } + if (start == i) { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + String key = path.substring(start, i); + tokens.add(key); + } else if (c == '[') { + i++; + int start = i; + while (i < len && path.charAt(i) != ']') { + i++; + } + if (i == len || path.charAt(i) != ']') { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + String indexStr = path.substring(start, i); + i++; // Skip ']' + try { + int index = Integer.parseInt(indexStr); + tokens.add(index); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid array index in path: " + path); + } + } else { + throw new IllegalArgumentException("Invalid path expression: " + path); + } + } + return tokens; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonInsertFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonInsertFunction.java new file mode 100644 index 0000000000..498093107d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonInsertFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.json; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestJsonInsertFunction extends AbstractFunctionStringTestBase { + + @Test + public void testJsonInsertFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + String json_doc = "{\\\"a\\\": {\\\"b\\\": [1, 2]}, \\\"c\\\": [3, 4]}"; + transformSql = "select json_insert(string1,string2,string3,numeric1,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, $.c[1][1], 2, "$.c[1][1][5]", 1) + data = json_doc + "|$.c[1][1]|2|$.c[1][1][5]|1"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,[4,[\"2\",\"1\"]]]}", output.get(0)); + + // case2: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, $.c[1][1], 2, "$.c[1][1]", 1) + data = json_doc + "|$.c[1][1]|2|$.c[1][1]|1"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,[4,\"2\"]]}", output.get(0)); + + // case3: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, "$.c", 2, "$.d", 1) + data = json_doc + "|$.c|2|$.d|1"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,4],\"d\":\"1\"}", output.get(0)); + + } +}