Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
luchunliang authored Sep 5, 2024
2 parents 9484b4c + 9bb3175 commit 50d430c
Show file tree
Hide file tree
Showing 18 changed files with 843 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
.auditTag(String.valueOf(sink.getId()))
.transformSql(sink.getTransformSql())
.sinkConfig(getSinkConfig(groupInfo, streamInfo, sink))
.inlongGroupId(groupInfo.getInlongGroupId())
.inlongStreamId(streamInfo.getInlongStreamId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) {
} else {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue);
} else {
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
}
builder.append(delimiter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) {
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue).append(entryDelimiter);
} else {
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
} else {
for (FieldInfo field : fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public interface SinkEncoder<Output> {

public static final String ALL_SOURCE_FIELD_SIGN = "*";

Output encode(SinkData sinkData, Context context);

List<FieldInfo> getFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@
import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* TransformProcessor
*
*
*/
public class TransformProcessor<I, O> {

Expand All @@ -61,7 +61,9 @@ public class TransformProcessor<I, O> {

private PlainSelect transformSelect;
private ExpressionOperator where;
private Map<String, ValueParser> selectItemMap;
private List<ValueParserNode> selectItems;

private boolean includeAllSourceFields = false;

public static <I, O> TransformProcessor<I, O> create(
TransformConfig config,
Expand Down Expand Up @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException {
this.transformSelect = (PlainSelect) select.getSelectBody();
this.where = OperatorTools.buildOperator(this.transformSelect.getWhere());
List<SelectItem> items = this.transformSelect.getSelectItems();
this.selectItemMap = new HashMap<>(items.size());
this.selectItems = new ArrayList<>(items.size());
List<FieldInfo> fields = this.encoder.getFields();
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
Expand All @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException {
fieldName = exprItem.getAlias().getName();
}
}
this.selectItemMap.put(fieldName,
OperatorTools.buildParser(exprItem.getExpression()));
this.selectItems
.add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression())));
} else if (item instanceof AllColumns) {
fieldName = item.toString();
this.encoder.getFields().clear();
this.selectItems.add(new ValueParserNode(fieldName, null));
}
}
}
Expand Down Expand Up @@ -137,10 +143,18 @@ public List<O> transform(I input, Map<String, Object> extParams) {

// parse value
SinkData sinkData = new DefaultSinkData();
for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) {
String fieldName = entry.getKey();
for (ValueParserNode node : this.selectItems) {
String fieldName = node.getFieldName();
ValueParser parser = node.getParser();
if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
if (input instanceof String) {
sinkData.addField(fieldName, (String) input);
} else {
sinkData.addField(fieldName, "");
}
continue;
}
try {
ValueParser parser = entry.getValue();
Object fieldValue = parser.parse(sourceData, i, context);
sinkData.addField(fieldName, String.valueOf(fieldValue));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process;

import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* ValueParserNode
*/
@AllArgsConstructor
@Data
public class ValueParserNode {

private String fieldName;
private ValueParser parser;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.math.BigDecimal;

/**
* AsinFunction
* description: asin(numeric)--returns the arc sine of numeric
*/
@TransformFunction(names = {"asin"})
public class AsinFunction implements ValueParser {

private ValueParser numberParser;

public AsinFunction(Function expr) {
numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object numberObj = numberParser.parse(sourceData, rowIndex, context);
if (numberObj == null) {
throw new NullPointerException("Parsed number object is null");
}
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.asin(numberValue.doubleValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.math.BigDecimal;

/**
* Atan2Function
* description: atan2(numeric)--returns the arc tangent of a coordinate (numeric1, numeric2).
*/
@TransformFunction(names = {"atan2"})
public class Atan2Function implements ValueParser {

private ValueParser xParser;
private ValueParser yParser;

public Atan2Function(Function expr) {
xParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
yParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object xObj = xParser.parse(sourceData, rowIndex, context);
Object yObj = yParser.parse(sourceData, rowIndex, context);

if (xObj == null) {
throw new NullPointerException("Parsed number object on the x-axis is null");
}

BigDecimal xValue = OperatorTools.parseBigDecimal(xObj);
BigDecimal yValue = OperatorTools.parseBigDecimal(yObj);

return Math.atan2(xValue.doubleValue(), yValue.doubleValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

import java.math.BigDecimal;

/**
* AtanFunction
* description: atan(numeric)--returns the arc tangent of numeric
*/
@TransformFunction(names = {"atan"})
public class AtanFunction implements ValueParser {

private ValueParser numberParser;

public AtanFunction(Function expr) {
numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object numberObj = numberParser.parse(sourceData, rowIndex, context);
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
return Math.atan(numberValue.doubleValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.List;

/**
* ContainsFunction
* description: contains(left, right) - Returns a boolean.
* The value is True if right is found inside left, otherwise, returns False.
* Both left or right must be of STRING type.
*/
@TransformFunction(names = {"contains"})
public class ContainsFunction implements ValueParser {

private ValueParser leftStrParser;
private ValueParser rightStrParser;

public ContainsFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
leftStrParser = OperatorTools.buildParser(expressions.get(0));
rightStrParser = OperatorTools.buildParser(expressions.get(1));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object leftStrObj = leftStrParser.parse(sourceData, rowIndex, context);
Object rightStrObj = rightStrParser.parse(sourceData, rowIndex, context);
String leftStr = OperatorTools.parseString(leftStrObj);
String rightStr = OperatorTools.parseString(rightStrObj);
return (leftStr == null || rightStr == null) ? null : leftStr.contains(rightStr);
}
}
Loading

0 comments on commit 50d430c

Please sign in to comment.