[INLONG-11616][SDK] Use self-defined Field and RowData conversion utils
vernedeng committed Dec 19, 2024
1 parent 3a297fb commit d7a54ba
Showing 6 changed files with 414 additions and 29 deletions.
RowDataSourceData.java
@@ -17,7 +17,7 @@

package org.apache.inlong.sdk.transform.decode;

-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
@@ -29,12 +29,12 @@ public class RowDataSourceData implements SourceData {

private final RowData rowData;
private final Map<String, Integer> fieldPositionMap;
-private final RowDataToFieldConverters.RowFieldConverter[] converters;
+private final RowToFieldDataUtils.RowFieldConverter[] converters;

public RowDataSourceData(
RowData rowData,
Map<String, Integer> fieldPositionMap,
-RowDataToFieldConverters.RowFieldConverter[] converters) {
+RowToFieldDataUtils.RowFieldConverter[] converters) {
this.rowData = rowData;
this.fieldPositionMap = fieldPositionMap;
this.converters = converters;
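The field-access part of RowDataSourceData is not expanded in this view. As a rough illustration only, assuming the new RowToFieldDataUtils.RowFieldConverter keeps the same convert(RowData, int) shape as the sort-formats converter it replaces (the accessor name below is illustrative, not the actual SourceData contract), the stored converters and position map would be used roughly like this:

// Illustrative sketch only, not the committed implementation.
// Assumes RowFieldConverter exposes convert(RowData row, int pos).
public Object getField(String fieldName) {
    Integer position = fieldPositionMap.get(fieldName);
    if (position == null || position >= converters.length) {
        return null; // unknown field name
    }
    // Delegate to the converter derived from this field's FormatInfo.
    return converters[position].convert(rowData, position);
}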
RowDataSourceDecoder.java
@@ -20,7 +20,7 @@
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;

import org.apache.flink.table.data.RowData;
@@ -32,16 +32,16 @@
public class RowDataSourceDecoder extends SourceDecoder<RowData> {

private final Map<String, Integer> fieldPositionMap;
-private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters;
+private final RowToFieldDataUtils.RowFieldConverter[] rowFieldConverters;

public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
super(sourceInfo.getFields());
List<FieldInfo> fields = sourceInfo.getFields();
this.fieldPositionMap = parseFieldPositionMap(fields);

-rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fields.size()];
+rowFieldConverters = new RowToFieldDataUtils.RowFieldConverter[fields.size()];
for (int i = 0; i < rowFieldConverters.length; i++) {
-rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter(
+rowFieldConverters[i] = RowToFieldDataUtils.createNullableRowFieldConverter(
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}
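RowToFieldDataUtils itself is one of the six changed files but is not expanded in this view. A minimal sketch of the shape it presumably has, assuming it mirrors the FieldToRowDataUtils class added below and the RowDataToFieldConverters it replaces (the method bodies and the exact set of supported types here are assumptions):

// Hypothetical sketch of RowToFieldDataUtils; the committed file is not shown here.
public class RowToFieldDataUtils {

    public interface RowFieldConverter extends Serializable {
        Object convert(RowData row, int pos);
    }

    public static RowFieldConverter createNullableRowFieldConverter(LogicalType type) {
        RowFieldConverter converter = createRowFieldConverter(type);
        // Wrap the converter so null fields pass through as null.
        return (row, pos) -> row.isNullAt(pos) ? null : converter.convert(row, pos);
    }

    private static RowFieldConverter createRowFieldConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return (row, pos) -> row.getBoolean(pos);
            case INTEGER:
                return (row, pos) -> row.getInt(pos);
            case BIGINT:
                return (row, pos) -> row.getLong(pos);
            case CHAR:
            case VARCHAR:
                return (row, pos) -> row.getString(pos).toString();
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}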
RowDataSinkEncoder.java
@@ -17,43 +17,28 @@

package org.apache.inlong.sdk.transform.encode;

-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sdk.transform.utils.FieldToRowDataUtils;
import org.apache.inlong.sort.formats.base.TableFormatUtils;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;

public class RowDataSinkEncoder extends SinkEncoder<RowData> {

-private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters;
-private final Map<String, Integer> fieldPositionMap;
+private final FieldToRowDataUtils.FieldToRowDataConverter[] fieldToRowDataConverters;

public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
super(sinkInfo.getFields());
-this.fieldPositionMap = parseFieldPositionMap(fields);

-fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+fieldToRowDataConverters = new FieldToRowDataUtils.FieldToRowDataConverter[fields.size()];
for (int i = 0; i < fields.size(); i++) {
-fieldToRowDataConverters[i] = FieldToRowDataConverters.createConverter(
+fieldToRowDataConverters[i] = FieldToRowDataUtils.createConverter(
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}

-private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
-Map<String, Integer> map = new HashMap<>();
-for (int i = 0; i < fields.size(); i++) {
-map.put(fields.get(i).getName(), i);
-}
-return map;
-}

@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length);
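The body of encode() is cut off above. A minimal sketch of how it presumably continues, assuming SinkData exposes values by field name (getField is an illustrative accessor here, not a confirmed API):

// Illustrative continuation inside encode(), after the GenericRowData is created.
for (int i = 0; i < fields.size(); i++) {
    // Look up the raw value for this field (hypothetical accessor) and convert it
    // to Flink's internal representation before setting it on the row.
    Object rawValue = sinkData.getField(fields.get(i).getName());
    rowData.setField(i, fieldToRowDataConverters[i].convert(rawValue));
}
return rowData;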
FieldToRowDataUtils.java (new file)
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.utils;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class FieldToRowDataUtils {

private static final long serialVersionUID = 1L;

/**
* Base class of Field To RowData Converters.
*/
public interface FieldToRowDataConverter extends Serializable {

Object convert(Object obj);
}

public static FieldToRowDataConverter createConverter(LogicalType logicalType) {
return wrapIntoNullableConverter(createFieldRowConverter(logicalType));
}

private static FieldToRowDataConverter wrapIntoNullableConverter(
FieldToRowDataConverter converter) {
return obj -> {
if (obj == null) {
return null;
}
return converter.convert(obj);
};
}

private static FieldToRowDataConverter createFieldRowConverter(LogicalType fieldType) {
switch (fieldType.getTypeRoot()) {
case NULL:
return (obj) -> null;
case BOOLEAN:
return obj -> Boolean.parseBoolean(obj.toString());
case TINYINT:
return obj -> Byte.parseByte(obj.toString());
case SMALLINT:
return obj -> Short.parseShort(obj.toString());
case INTERVAL_YEAR_MONTH:
case INTEGER:
return obj -> Integer.parseInt(obj.toString());
case INTERVAL_DAY_TIME:
case BIGINT:
return obj -> Long.parseLong(obj.toString());
case FLOAT:
return obj -> Float.parseFloat(obj.toString());
case DOUBLE:
return obj -> Double.parseDouble(obj.toString());
case BINARY:
case VARBINARY:
return obj -> obj.toString().getBytes();
case CHAR:
case VARCHAR:
return (obj -> StringData.fromString((String) obj));
case DATE:
return (obj -> ((Date) obj).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return (obj -> ((Time) obj).toLocalTime().toSecondOfDay() * 1000);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
return obj -> TimestampData.fromTimestamp((Timestamp) obj);
case DECIMAL:
return obj -> DecimalData.fromBigDecimal(
(BigDecimal) obj,
DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE);
case ARRAY:
return obj -> {
final Object[] array = (Object[]) obj;
FieldToRowDataConverter elementConverter =
createFieldRowConverter(((ArrayType) fieldType).getElementType());
Object[] converted = Arrays.stream(array)
.map(elementConverter::convert)
.toArray();
return new GenericArrayData(converted);
};
case MAP:
return obj -> {
FieldToRowDataConverter keyConverter =
createFieldRowConverter(((MapType) fieldType).getKeyType());
FieldToRowDataConverter valueConverter =
createFieldRowConverter(((MapType) fieldType).getValueType());
Map map = (Map) obj;
Map<Object, Object> internalMap = new HashMap<>();
for (Object k : map.keySet()) {
internalMap.put(keyConverter.convert(k),
valueConverter.convert(map.get(k)));
}
return new GenericMapData(internalMap);
};
case ROW:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + fieldType);
}
}
}
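A short usage sketch of the new utility (the values and LogicalType instances are illustrative; IntType and VarCharType come from org.apache.flink.table.types.logical):

// Derive converters from Flink LogicalTypes, then turn raw field values
// into Flink's internal data structures.
FieldToRowDataUtils.FieldToRowDataConverter intConverter =
        FieldToRowDataUtils.createConverter(new IntType());
Object intValue = intConverter.convert("42");     // Integer 42
Object nullValue = intConverter.convert(null);    // null, handled by the nullable wrapper

FieldToRowDataUtils.FieldToRowDataConverter stringConverter =
        FieldToRowDataUtils.createConverter(new VarCharType());
Object stringValue = stringConverter.convert("hello"); // StringData "hello"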
