Skip to content

Commit

Permalink
Add ignoreConfigError config
Browse files Browse the repository at this point in the history
  • Loading branch information
luchunliang committed Sep 5, 2024
1 parent ad72398 commit 7587f41
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,34 @@ public class TransformConfig {
@JsonProperty("strictOrder")
private boolean strictOrder = true;

@JsonProperty("ignoreConfigError")
private boolean ignoreConfigError = true;

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql) {
this(transformSql, ImmutableMap.of(), true);
this(transformSql, ImmutableMap.of(), true, true);
}

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration) {
this(transformSql, configuration, true);
this(transformSql, configuration, true, true);
}
@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("strictOrder") boolean strictOrder) {
this(transformSql, ImmutableMap.of(), strictOrder);
this(transformSql, ImmutableMap.of(), strictOrder, true);
}

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration,
@JsonProperty("strictOrder") boolean strictOrder) {
@JsonProperty("strictOrder") boolean strictOrder,
@JsonProperty("ignoreConfigError") boolean ignoreConfigError) {
this.transformSql = Preconditions.checkNotNull(transformSql, "transform sql should not be null");
this.configuration = configuration;
this.strictOrder = strictOrder;
this.ignoreConfigError = ignoreConfigError;
}

/**
Expand All @@ -78,10 +83,15 @@ public Map<String, Object> getConfiguration() {
}

@JsonProperty("strictOrder")
public boolean getStrictOrder() {
public boolean isStrictOrder() {
return strictOrder;
}

@JsonProperty("ignoreConfigError")
public boolean isIgnoreConfigError() {
return ignoreConfigError;
}

/**
* set transformSql
* @param transformSql the transformSql to set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ private TransformProcessor(
}

private void init() throws JSQLParserException {
this.initTransformSql();
if (!config.getStrictOrder() && encoder != null && encoder.getFields() != null) {
if (!config.isStrictOrder() && encoder != null && encoder.getFields() != null) {
List<FieldInfo> fields = encoder.getFields();
this.sinkFieldList = new ArrayList<>(fields.size());
fields.forEach(v -> this.sinkFieldList.add(v.getName()));
}
this.initTransformSql();
}

private void initTransformSql() throws JSQLParserException {
Expand All @@ -102,7 +102,7 @@ private void initTransformSql() throws JSQLParserException {
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
String fieldName = null;
if (config.getStrictOrder() && i < fields.size()) {
if (config.isStrictOrder() && i < fields.size()) {
fieldName = fields.get(i).getName();
}
if (item instanceof SelectExpressionItem) {
Expand All @@ -113,6 +113,10 @@ private void initTransformSql() throws JSQLParserException {
} else {
fieldName = exprItem.getAlias().getName();
}
if (!this.checkSelectField(fieldName)) {
throw new JSQLParserException(
String.format("Field name:%s can not be found in sink field list.", fieldName));
}
}
this.selectItems
.add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression())));
Expand All @@ -124,6 +128,13 @@ private void initTransformSql() throws JSQLParserException {
}
}

public boolean checkSelectField(String fieldName) {
if (config.isIgnoreConfigError()) {
return true;
}
return this.sinkFieldList != null && this.sinkFieldList.contains(fieldName);
}

public List<O> transform(I input) {
return this.transform(input, EMPTY_EXT_PARAMS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;

import net.sf.jsqlparser.JSQLParserException;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -463,4 +464,20 @@ public void testKv2KvForErrorOrder() throws Exception {
Assert.assertEquals(1, output1.size());
Assert.assertEquals("field1=string11&field2=&field3=number12", output1.get(0));
}

@Test
public void testKv2KvForConfigError() throws Exception {
List<FieldInfo> sourceFields = this.getTestFieldList("key1", "key2", "key3", "key4");
KvSourceInfo kvSource = new KvSourceInfo("UTF-8", sourceFields);
List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2", "field3");
KvSinkInfo kvSink = new KvSinkInfo("UTF-8", sinkFields);
String transformSql = "select key4 as field3, key2 as field6, key1 as field1";
TransformConfig config = new TransformConfig(transformSql, new HashMap<>(), false, false);
// case1
Assert.assertThrows(JSQLParserException.class, () -> {
TransformProcessor
.create(config, SourceDecoderFactory.createKvDecoder(kvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
});
}
}

0 comments on commit 7587f41

Please sign in to comment.