-
Notifications
You must be signed in to change notification settings - Fork 534
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[INLONG-11302][SDK] Transform SQL supports "JSON_INSERT" function (#1…
- Loading branch information
Showing
2 changed files
with
331 additions
and
0 deletions.
There are no files selected for viewing
264 changes: 264 additions & 0 deletions
264
...c/main/java/org/apache/inlong/sdk/transform/process/function/json/JsonInsertFunction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.inlong.sdk.transform.process.function.json; | ||
|
||
import org.apache.inlong.sdk.transform.decode.SourceData; | ||
import org.apache.inlong.sdk.transform.process.Context; | ||
import org.apache.inlong.sdk.transform.process.function.TransformFunction; | ||
import org.apache.inlong.sdk.transform.process.operator.OperatorTools; | ||
import org.apache.inlong.sdk.transform.process.parser.ValueParser; | ||
|
||
import com.alibaba.fastjson.JSON; | ||
import com.alibaba.fastjson.JSONArray; | ||
import com.alibaba.fastjson.JSONObject; | ||
import net.sf.jsqlparser.expression.Expression; | ||
import net.sf.jsqlparser.expression.Function; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* JsonInsertFunction -> JSON_INSERT(json_doc, path, val[, path, val, ...]) | ||
* description: | ||
* - return NULL if any argument is NULL or the 'json_doc' argument is not a valid JSON document | ||
* or any path argument is not a valid path expression or contains a * or ** wildcard; | ||
* - return the result of inserting data into 'json_doc'. | ||
*/ | ||
@TransformFunction(names = { | ||
"json_insert"}, parameter = "(String json_doc, String path1, String val1[, String path2, String val2, ...] )", descriptions = { | ||
"- Return \"\" if any argument is NULL or the 'json_doc' argument is not a valid JSON document " + | ||
"or any path argument is not a valid path expression or contains a * or ** wildcard.;", | ||
"- Return the result of inserting data into 'json_doc'." | ||
}, examples = { | ||
"json_insert({\"a\": {\"b\": [1, 2]}, \"c\": [3, 4]}, $.c[1][1], \"2\", \"$.c[1][1][5]\", \"1\") = " + | ||
"{\"a\":{\"b\":[1,2]},\"c\":[3,[4,[\"2\",\"1\"]]]}" | ||
}) | ||
public class JsonInsertFunction implements ValueParser { | ||
|
||
private final ValueParser jsonDocParser; | ||
private final List<ValueParser> pathValuePairsParser; | ||
|
||
public JsonInsertFunction(Function expr) { | ||
List<Expression> expressions = expr.getParameters().getExpressions(); | ||
jsonDocParser = OperatorTools.buildParser(expressions.get(0)); | ||
pathValuePairsParser = new ArrayList<>(); | ||
for (int i = 1; i < expressions.size(); i++) { | ||
pathValuePairsParser.add(OperatorTools.buildParser(expressions.get(i))); | ||
} | ||
} | ||
|
||
@Override | ||
public Object parse(SourceData sourceData, int rowIndex, Context context) { | ||
Object jsonDocObj = jsonDocParser.parse(sourceData, rowIndex, context); | ||
if (jsonDocObj == null) { | ||
return null; | ||
} | ||
ArrayList<Object> pathValuePairs = new ArrayList<>(); | ||
for (ValueParser valueParser : pathValuePairsParser) { | ||
pathValuePairs.add(valueParser.parse(sourceData, rowIndex, context)); | ||
} | ||
return jsonInsert(jsonDocObj.toString(), pathValuePairs); | ||
} | ||
|
||
private static String jsonInsert(String json, ArrayList<Object> pathValuePairs) { | ||
if (json == null || pathValuePairs == null || pathValuePairs.size() % 2 != 0) { | ||
return null; | ||
} | ||
|
||
JSONObject jsonObject; | ||
try { | ||
jsonObject = JSON.parseObject(json); | ||
} catch (Exception e) { | ||
// Invalid JSON document | ||
return null; | ||
} | ||
|
||
for (int i = 0; i < pathValuePairs.size(); i += 2) { | ||
String path = (String) pathValuePairs.get(i); | ||
Object value = pathValuePairs.get(i + 1); | ||
|
||
if (path == null || value == null) { | ||
return null; | ||
} | ||
|
||
if (path.contains("*") || path.contains("**")) { | ||
throw new IllegalArgumentException("Invalid path expression: " + path); | ||
} | ||
|
||
List<Object> tokens = parsePath(path); | ||
if (tokens.isEmpty()) { | ||
continue; | ||
} | ||
|
||
Object current = jsonObject; | ||
Object parent = null, currentToken = null; | ||
|
||
for (int j = 0; j < tokens.size() - 1; j++) { | ||
Object token = tokens.get(j); | ||
parent = current; | ||
currentToken = token; | ||
|
||
if (token instanceof String) { | ||
String key = (String) token; | ||
if (!(current instanceof JSONObject)) { | ||
JSONObject newObj = new JSONObject(); | ||
if (parent instanceof JSONObject) { | ||
((JSONObject) parent).put((String) currentToken, newObj); | ||
} else if (parent instanceof JSONArray) { | ||
((JSONArray) parent).set((Integer) currentToken, newObj); | ||
} | ||
current = newObj; | ||
} | ||
JSONObject jsonObj = (JSONObject) current; | ||
|
||
Object next = jsonObj.get(key); | ||
if (next == null) { | ||
Object nextToken = tokens.get(j + 1); | ||
next = nextToken instanceof Integer ? new JSONArray() : new JSONObject(); | ||
jsonObj.put(key, next); | ||
} | ||
current = next; | ||
} else if (token instanceof Integer) { | ||
int index = (Integer) token; | ||
if (!(current instanceof JSONArray)) { | ||
JSONArray newArr = new JSONArray(); | ||
newArr.add(current); | ||
if (parent instanceof JSONObject) { | ||
((JSONObject) parent).put((String) currentToken, newArr); | ||
} else if (parent instanceof JSONArray) { | ||
((JSONArray) parent).set((Integer) currentToken, newArr); | ||
} | ||
current = newArr; | ||
} | ||
JSONArray jsonArr = (JSONArray) current; | ||
if (jsonArr.size() <= index) { | ||
index = jsonArr.size(); | ||
} | ||
Object next = jsonArr.get(index); | ||
if (next == null) { | ||
Object nextToken = tokens.get(j + 1); | ||
next = nextToken instanceof Integer ? new JSONArray() : new JSONObject(); | ||
jsonArr.set(index, next); | ||
} | ||
current = next; | ||
} | ||
} | ||
|
||
Object lastToken = tokens.get(tokens.size() - 1); | ||
if (lastToken instanceof String) { | ||
String key = (String) lastToken; | ||
if (!(current instanceof JSONObject)) { | ||
JSONObject newObj = new JSONObject(); | ||
if (parent instanceof JSONObject) { | ||
((JSONObject) parent).put((String) currentToken, newObj); | ||
} else if (parent instanceof JSONArray) { | ||
((JSONArray) parent).set((Integer) currentToken, newObj); | ||
} | ||
current = newObj; | ||
} | ||
JSONObject jsonObj = (JSONObject) current; | ||
if (!jsonObj.containsKey(key)) { | ||
jsonObj.put(key, parseValue(value)); | ||
} | ||
} else if (lastToken instanceof Integer) { | ||
int index = (Integer) lastToken; | ||
if (!(current instanceof JSONArray)) { | ||
JSONArray newArr = new JSONArray(); | ||
newArr.add(current); | ||
if (parent instanceof JSONObject) { | ||
((JSONObject) parent).put((String) currentToken, newArr); | ||
} else if (parent instanceof JSONArray) { | ||
((JSONArray) parent).set((Integer) currentToken, newArr); | ||
} | ||
current = newArr; | ||
} | ||
JSONArray jsonArr = (JSONArray) current; | ||
if (index >= jsonArr.size() || jsonArr.get(index) == null) { | ||
if (jsonArr.size() <= index) { | ||
index = jsonArr.size(); | ||
} | ||
jsonArr.set(index, parseValue(value)); | ||
} | ||
} | ||
} | ||
|
||
return jsonObject.toJSONString(); | ||
} | ||
|
||
private static Object parseValue(Object value) { | ||
if (value instanceof String) { | ||
String str = (String) value; | ||
try { | ||
// Try to parse as JSON | ||
Object json = JSON.parse(str); | ||
if (json instanceof JSONObject || json instanceof JSONArray) { | ||
return json; | ||
} | ||
} catch (Exception ignored) { | ||
// Not a JSON string | ||
} | ||
} | ||
return value; | ||
} | ||
|
||
private static List<Object> parsePath(String path) { | ||
if (path == null || !path.startsWith("$")) { | ||
throw new IllegalArgumentException("Invalid path expression: " + path); | ||
} | ||
|
||
List<Object> tokens = new ArrayList<>(); | ||
int i = 1; // Skip the '$' | ||
int len = path.length(); | ||
|
||
while (i < len) { | ||
char c = path.charAt(i); | ||
if (c == '.') { | ||
i++; | ||
int start = i; | ||
while (i < len && path.charAt(i) != '.' && path.charAt(i) != '[') { | ||
i++; | ||
} | ||
if (start == i) { | ||
throw new IllegalArgumentException("Invalid path expression: " + path); | ||
} | ||
String key = path.substring(start, i); | ||
tokens.add(key); | ||
} else if (c == '[') { | ||
i++; | ||
int start = i; | ||
while (i < len && path.charAt(i) != ']') { | ||
i++; | ||
} | ||
if (i == len || path.charAt(i) != ']') { | ||
throw new IllegalArgumentException("Invalid path expression: " + path); | ||
} | ||
String indexStr = path.substring(start, i); | ||
i++; // Skip ']' | ||
try { | ||
int index = Integer.parseInt(indexStr); | ||
tokens.add(index); | ||
} catch (NumberFormatException e) { | ||
throw new IllegalArgumentException("Invalid array index in path: " + path); | ||
} | ||
} else { | ||
throw new IllegalArgumentException("Invalid path expression: " + path); | ||
} | ||
} | ||
return tokens; | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
...st/java/org/apache/inlong/sdk/transform/process/function/json/TestJsonInsertFunction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.inlong.sdk.transform.process.function.json; | ||
|
||
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; | ||
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; | ||
import org.apache.inlong.sdk.transform.pojo.TransformConfig; | ||
import org.apache.inlong.sdk.transform.process.TransformProcessor; | ||
import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase; | ||
|
||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
|
||
public class TestJsonInsertFunction extends AbstractFunctionStringTestBase { | ||
|
||
@Test | ||
public void testJsonInsertFunction() throws Exception { | ||
String transformSql = null, data = null; | ||
TransformConfig config = null; | ||
TransformProcessor<String, String> processor = null; | ||
List<String> output = null; | ||
|
||
String json_doc = "{\\\"a\\\": {\\\"b\\\": [1, 2]}, \\\"c\\\": [3, 4]}"; | ||
transformSql = "select json_insert(string1,string2,string3,numeric1,numeric2) from source"; | ||
config = new TransformConfig(transformSql); | ||
processor = TransformProcessor | ||
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource), | ||
SinkEncoderFactory.createKvEncoder(kvSink)); | ||
|
||
// case1: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, $.c[1][1], 2, "$.c[1][1][5]", 1) | ||
data = json_doc + "|$.c[1][1]|2|$.c[1][1][5]|1"; | ||
output = processor.transform(data, new HashMap<>()); | ||
Assert.assertEquals(1, output.size()); | ||
Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,[4,[\"2\",\"1\"]]]}", output.get(0)); | ||
|
||
// case2: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, $.c[1][1], 2, "$.c[1][1]", 1) | ||
data = json_doc + "|$.c[1][1]|2|$.c[1][1]|1"; | ||
output = processor.transform(data, new HashMap<>()); | ||
Assert.assertEquals(1, output.size()); | ||
Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,[4,\"2\"]]}", output.get(0)); | ||
|
||
// case3: json_insert({"a": {"b": [1, 2]}, "c": [3, 4]}, "$.c", 2, "$.d", 1) | ||
data = json_doc + "|$.c|2|$.d|1"; | ||
output = processor.transform(data, new HashMap<>()); | ||
Assert.assertEquals(1, output.size()); | ||
Assert.assertEquals("result={\"a\":{\"b\":[1,2]},\"c\":[3,4],\"d\":\"1\"}", output.get(0)); | ||
|
||
} | ||
} |