Skip to content

Commit

Permalink
[INLONG-11282][SDK] Transform enhance Array-related Collection Functi…
Browse files Browse the repository at this point in the history
…ons (#11288)
  • Loading branch information
emptyOVO authored Oct 11, 2024
1 parent 3da5447 commit a4e2e2e
Show file tree
Hide file tree
Showing 42 changed files with 3,161 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.collection;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.TransformFunction;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
/**
* ArrayAppendFunction
* description: ARRAY_APPEND(array, element)--Appends an element to the end of the array and returns the result.
* If the array itself is null, the function will return null. If an element to add is null, the null
* element will be added to the end of the array.
* for example: array_append(array('he',7,'xxd'), 'cloud')--return[he, 7, xxd, cloud]
*/
@TransformFunction(names = {"array_append"})
public class ArrayAppendFunction implements ValueParser {

private final ValueParser arrayParser;

private final ValueParser elementParser;

public ArrayAppendFunction(Function expr) {
this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
this.elementParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));
}

@SuppressWarnings("unchecked")
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object arrayObj = arrayParser.parse(sourceData, rowIndex, context);
Object elementObj = elementParser.parse(sourceData, rowIndex, context);
if (arrayObj == null) {
return null;
}
if (arrayObj instanceof ArrayList) {
ArrayList<Object> array = (ArrayList<Object>) arrayObj;
if (array.isEmpty()) {
return null;
}
array.add(elementObj);
return array;
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.collection;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.TransformFunction;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
import java.util.List;
/**
* ArrayConcatFunction
* description: ARRAY_CONCAT(array1, …)--Returns an array that is the result of concatenating at least one array.
* This array contains all the elements in the first array, followed by all the elements in the second
* array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL.
* for example: array_concat(array('he',7),array('xxd', 'cloud'))--return [he, 7, xxd, cloud]
*/
@TransformFunction(names = {"array_concat"})
public class ArrayConcatFunction implements ValueParser {

private List<ValueParser> parserList;

public ArrayConcatFunction(Function expr) {
if (expr.getParameters() == null) {
this.parserList = new ArrayList<>();
} else {
List<Expression> params = expr.getParameters().getExpressions();
parserList = new ArrayList<>(params.size());
for (Expression param : params) {
ValueParser node = OperatorTools.buildParser(param);
parserList.add(node);
}
}
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
ArrayList<Object> res = new ArrayList<>();
for (ValueParser valueParser : parserList) {
Object parseObj = valueParser.parse(sourceData, rowIndex, context);
if (parseObj == null) {
return null;
}
if (parseObj instanceof ArrayList) {
ArrayList<?> array = (ArrayList<?>) parseObj;
if (array.isEmpty()) {
return null;
}
res.addAll(array);
}
}
return res;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.collection;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.TransformFunction;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
/**
* ArrayContainsFunction
* description: ARRAY_CONTAINS(haystack, needle)--Returns whether the given element exists in an array. Checking for
* null elements in the array is supported. If the array itself is null, the function will return null.
* for example: array_contains(array('he',7,'xxd'), 'cloud')--return false
* array_contains(array('he',-1,''),'')--return true
*/
@TransformFunction(names = {"array_contains"})
public class ArrayContainsFunction implements ValueParser {

private final ValueParser arrayParser;

private final ValueParser elementParser;

public ArrayContainsFunction(Function expr) {
this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
this.elementParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object arrayObj = arrayParser.parse(sourceData, rowIndex, context);
Object elementObj = elementParser.parse(sourceData, rowIndex, context);
if (arrayObj == null) {
return null;
}
if (arrayObj instanceof ArrayList) {
ArrayList<?> array = (ArrayList<?>) arrayObj;
if (array.isEmpty()) {
return null;
}
for (Object element : array) {
if (element == null && elementObj == null) {
return true;
}
if (element != null && element.equals(elementObj)) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.collection;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.TransformFunction;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* ArrayDistinctFunction
* description: ARRAY_DISTINCT(haystack)--Returns an array with unique elements. If the array itself is null,
* the function will return null. Keeps ordering of elements.
* for example: array_distinct(array('he',-1,'he'))--return [he, -1]
*/
@TransformFunction(names = {"array_distinct"})
public class ArrayDistinctFunction implements ValueParser {

private final ValueParser arrayParser;

public ArrayDistinctFunction(Function expr) {
this.arrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object arrayObj = arrayParser.parse(sourceData, rowIndex, context);
if (arrayObj == null) {
return null;
}
if (arrayObj instanceof ArrayList) {
ArrayList<?> array = (ArrayList<?>) arrayObj;
if (array.isEmpty()) {
return null;
}
Set<Object> distinctSet = new LinkedHashSet<>(array);
return new ArrayList<>(distinctSet);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.collection;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.TransformFunction;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* ArrayExceptFunction
* description: ARRAY_EXCEPT(array1, array2)--Returns an ARRAY that contains the elements from array1 that are not in
* array2, without duplicates. If no elements remain after excluding the elements in array2 from array1,
* the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL.
* The order of the elements from array1 is kept.
* for example: array_except(array('he',7,'xxd'),array('he'))--return [7, xxd]
* array_except(array('he',7,'xxd'),array('cloud'))--return [he, 7, xxd]
*/
@TransformFunction(names = {"array_except"})
public class ArrayExceptFunction implements ValueParser {

private final ValueParser leftArrayParser;

private final ValueParser rightArrayParser;

public ArrayExceptFunction(Function expr) {
this.leftArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
this.rightArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object leftArrayObj = leftArrayParser.parse(sourceData, rowIndex, context);
Object rightArrayObj = rightArrayParser.parse(sourceData, rowIndex, context);
if (leftArrayObj == null || rightArrayObj == null) {
return null;
}
if (leftArrayObj instanceof ArrayList && rightArrayObj instanceof ArrayList) {
ArrayList<?> leftArray = (ArrayList<?>) leftArrayObj;
ArrayList<?> rightArray = (ArrayList<?>) rightArrayObj;
if (leftArray.isEmpty() || rightArray.isEmpty()) {
return null;
}
Set<Object> res = new LinkedHashSet<>();

for (Object value : leftArray) {
if (!rightArray.contains(value)) {
res.add(value);
}
}
return new ArrayList<>(res);
}
return null;
}
}
Loading

0 comments on commit a4e2e2e

Please sign in to comment.