Skip to content

Commit

Permalink
[INLONG-10696][Sort] StarRocks sink supports ignore json parse error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Jul 23, 2024
1 parent 63df4f5 commit 58dc220
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public final class Constants {
.defaultValue(false)
.withDescription("Regard upsert delete as insert kind.");

public static final ConfigOption<Boolean> IGNORE_JSON_PARSE_ERROR =
ConfigOptions.key("sink.ignore.json.parse.error")
.booleanType()
.defaultValue(false)
.withDescription("Ignore json parse error.");

public static final ConfigOption<String> SINK_MULTIPLE_FORMAT =
ConfigOptions.key("sink.multiple.format")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.IGNORE_JSON_PARSE_ERROR;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
Expand All @@ -54,14 +55,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
String inlongMetric = options.getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
String auditHostAndPorts = options.getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
String auditKeys = options.getOptional(AUDIT_KEYS).orElse(AUDIT_KEYS.defaultValue());
boolean ignoreJsonParseError =
options.getOptional(IGNORE_JSON_PARSE_ERROR).orElse(IGNORE_JSON_PARSE_ERROR.defaultValue());

// validate some special properties
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options, context.getCatalogTable().getOptions());
sinkOptions.enableUpsertDelete();
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

return new StarRocksDynamicTableSink(sinkOptions,
physicalSchema, inlongMetric, auditHostAndPorts, auditKeys);
physicalSchema, inlongMetric, auditHostAndPorts, auditKeys, ignoreJsonParseError);
}

@Override
Expand Down Expand Up @@ -101,6 +104,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(INLONG_METRIC);
optionalOptions.add(INLONG_AUDIT);
optionalOptions.add(AUDIT_KEYS);
optionalOptions.add(IGNORE_JSON_PARSE_ERROR);
return optionalOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public class StarRocksDynamicTableSink implements DynamicTableSink {
private String inlongMetric;
private String auditHostAndPorts;
private String auditKeys;
private boolean ignoreJsonParseError;

public StarRocksDynamicTableSink(StarRocksSinkOptions sinkOptions, TableSchema schema,
String inlongMetric, String auditHostAndPorts, String auditKeys) {
String inlongMetric, String auditHostAndPorts, String auditKeys, boolean ignoreJsonParseError) {
this.flinkSchema = schema;
this.sinkOptions = sinkOptions;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.auditKeys = auditKeys;
this.ignoreJsonParseError = ignoreJsonParseError;
}

@Override
Expand All @@ -59,7 +61,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction = SinkFunctionFactory.createSinkFunction(
sinkOptions,
flinkSchema,
new StarRocksTableRowTransformer(rowDataTypeInfo),
new StarRocksTableRowTransformer(rowDataTypeInfo, ignoreJsonParseError),
inlongMetric,
auditHostAndPorts,
auditKeys);
Expand All @@ -69,7 +71,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

@Override
public DynamicTableSink copy() {
return new StarRocksDynamicTableSink(sinkOptions, flinkSchema, inlongMetric, auditHostAndPorts, auditKeys);
return new StarRocksDynamicTableSink(sinkOptions, flinkSchema, inlongMetric,
auditHostAndPorts, auditKeys, ignoreJsonParseError);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ public class StarRocksTableRowTransformer implements StarRocksIRowTransformer<Ro
private String[] columnNames;
private DataType[] columnDataTypes;
private Map<String, StarRocksDataType> columns;
private boolean ignoreJsonParseError;
private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");

public StarRocksTableRowTransformer(TypeInformation<RowData> rowDataTypeInfo) {
public StarRocksTableRowTransformer(TypeInformation<RowData> rowDataTypeInfo, boolean ignoreJsonParseError) {
this.rowDataTypeInfo = rowDataTypeInfo;
this.ignoreJsonParseError = ignoreJsonParseError;
}

@Override
Expand Down Expand Up @@ -156,7 +158,14 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
// string is "{"a": 1, "b": 2}", and if input it to JSON.toJSONString directly, the
// result will be "{\"a\": 1, \"b\": 2}" which will not be recognized as a json in
// StarRocks
return JSON.parse(sValue);
try {
return JSON.parse(sValue);
} catch (Throwable t) {
if (!ignoreJsonParseError) {
throw t;
}
return sValue;
}
}
return sValue;
case DATE:
Expand Down Expand Up @@ -273,9 +282,11 @@ private Map<Object, Object> convertNestedMap(MapData mapData, LogicalType type)
}

}

final class BinaryStringDataSerializer implements ObjectSerializer, Serializable {

private static final long serialVersionUID = 1L;

@Override
public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
throws IOException {
Expand All @@ -292,6 +303,7 @@ public void write(JSONSerializer serializer, Object object, Object fieldName, Ty
final class DecimalDataSerializer implements ObjectSerializer, Serializable {

private static final long serialVersionUID = 1L;

@Override
public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
throws IOException {
Expand All @@ -307,6 +319,7 @@ public void write(JSONSerializer serializer, Object object, Object fieldName, Ty
final class TimestampDataSerializer implements ObjectSerializer, Serializable {

private static final long serialVersionUID = 1L;

@Override
public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features)
throws IOException {
Expand Down

0 comments on commit 58dc220

Please sign in to comment.