diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index 18db54765e..9450a4fcc8 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -46,6 +46,12 @@
sdk-common
${project.version}
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
org.apache.httpcomponents
httpclient
@@ -110,6 +116,18 @@
org.dom4j
dom4j
+
+ org.apache.inlong
+ sort-format-common
+ ${project.version}
+ compile
+
+
+ org.apache.inlong
+ sort-format-rowdata-base
+ ${project.version}
+ compile
+
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
new file mode 100644
index 0000000000..3e6ee9fc39
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Map;
+
+@Slf4j
+public class RowDataSourceData implements SourceData {
+
+ private final RowData rowData;
+ private final Map fieldPositionMap;
+ private final RowDataToFieldConverters.RowFieldConverter[] converters;
+
+ public RowDataSourceData(
+ RowData rowData,
+ Map fieldPositionMap,
+ RowDataToFieldConverters.RowFieldConverter[] converters) {
+ this.rowData = rowData;
+ this.fieldPositionMap = fieldPositionMap;
+ this.converters = converters;
+ }
+
+ @Override
+ public int getRowCount() {
+ return 1;
+ }
+
+ @Override
+ public Object getField(int rowNum, String fieldName) {
+ if (rowNum != 0) {
+ return null;
+ }
+ try {
+ int fieldPosition = fieldPositionMap.get(fieldName);
+ return converters[fieldPosition].convert(rowData, fieldPosition);
+ } catch (Throwable e) {
+ log.error("failed to convert field={}", fieldName, e);
+ return null;
+ }
+ }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
new file mode 100644
index 0000000000..fdd6c4ce08
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RowDataSourceDecoder extends SourceDecoder {
+
+ private final Map fieldPositionMap;
+ private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters;
+
+ public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
+ super(sourceInfo.getFields());
+ List fields = sourceInfo.getFields();
+ this.fieldPositionMap = parseFieldPositionMap(fields);
+
+ rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fields.size()];
+ for (int i = 0; i < rowFieldConverters.length; i++) {
+ rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter(
+ TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
+ }
+ }
+
+ private Map parseFieldPositionMap(List fields) {
+ Map map = new HashMap<>();
+ for (int i = 0; i < fields.size(); i++) {
+ map.put(fields.get(i).getName(), i);
+ }
+ return map;
+ }
+
+ @Override
+ public SourceData decode(byte[] srcBytes, Context context) {
+ throw new UnsupportedOperationException("do not support decoding bytes for row data decoder");
+ }
+
+ @Override
+ public SourceData decode(RowData rowData, Context context) {
+ return new RowDataSourceData(rowData, fieldPositionMap, rowFieldConverters);
+ }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index 76e856be14..dafd891435 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -24,6 +24,7 @@
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.pojo.XmlSourceInfo;
import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;
@@ -65,4 +66,8 @@ public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) {
return new YamlSourceDecoder(sourceInfo);
}
+ public static RowDataSourceDecoder createRowDecoder(RowDataSourceInfo sourceInfo) {
+ return new RowDataSourceDecoder(sourceInfo);
+ }
+
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
new file mode 100644
index 0000000000..f2203cb3cd
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RowDataSinkEncoder extends SinkEncoder {
+
+ private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters;
+ private final Map fieldPositionMap;
+
+ public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
+ super(sinkInfo.getFields());
+ this.fieldPositionMap = parseFieldPositionMap(fields);
+
+ fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ fieldToRowDataConverters[i] = FieldToRowDataConverters.createConverter(
+ TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
+ }
+ }
+
+ private Map parseFieldPositionMap(List fields) {
+ Map map = new HashMap<>();
+ for (int i = 0; i < fields.size(); i++) {
+ map.put(fields.get(i).getName(), i);
+ }
+ return map;
+ }
+
+ @Override
+ public RowData encode(SinkData sinkData, Context context) {
+ GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length);
+
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).getName();
+ String fieldValue = sinkData.getField(fieldName);
+ rowData.setField(i, fieldToRowDataConverters[i].convert(fieldValue));
+ }
+
+ return rowData;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 1778aba180..8d82970e7a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -22,6 +22,7 @@
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
public class SinkEncoderFactory {
@@ -45,4 +46,8 @@ public static PbSinkEncoder createPbEncoder(PbSinkInfo pbSinkInfo) {
return new PbSinkEncoder(pbSinkInfo);
}
+ public static RowDataSinkEncoder createRowEncoder(RowDataSinkInfo rowDataSinkInfo) {
+ return new RowDataSinkEncoder(rowDataSinkInfo);
+ }
+
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index fe08d00bb1..eaf1b7a9eb 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.transform.pojo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import lombok.Data;
@@ -28,6 +29,7 @@
public class FieldInfo {
private String name;
+ private FormatInfo formatInfo;
private TypeConverter converter = TypeConverter.DefaultTypeConverter();
public FieldInfo() {
@@ -42,4 +44,9 @@ public FieldInfo(String name, TypeConverter converter) {
this.name = name;
this.converter = converter;
}
+
+ public FieldInfo(String name, TypeConverter converter, FormatInfo formatInfo) {
+ this(name, converter);
+ this.formatInfo = formatInfo;
+ }
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
new file mode 100644
index 0000000000..d88ddbb6eb
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Data
+@SuperBuilder
+public class RowDataSinkInfo extends SinkInfo {
+
+ private List fields;
+
+ public RowDataSinkInfo(String charset, List fields) {
+ super(SinkInfo.ROWDATA, charset);
+ this.fields = fields;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
new file mode 100644
index 0000000000..18aa936bc9
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Getter;
+import lombok.experimental.SuperBuilder;
+
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
+@Getter
+public class RowDataSourceInfo extends SourceInfo {
+
+ private List fields;
+
+ public RowDataSourceInfo(String charset, List fields) {
+ super(charset);
+ this.fields = fields;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 022af73fd5..89cf599d98 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -46,6 +46,7 @@ public abstract class SinkInfo {
public static final String ES_MAP = "es_map";
public static final String PARQUET = "parquet";
public static final String PB = "pb";
+ public static final String ROWDATA = "rowdata";
@JsonIgnore
private String type;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 3322d83199..bc0fb0371e 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.transform.process.processor;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
@@ -48,6 +49,7 @@ protected List getTestFieldList(String... fieldNames) {
for (String fieldName : fieldNames) {
FieldInfo field = new FieldInfo();
field.setName(fieldName);
+ field.setFormatInfo(new StringFormatInfo());
fields.add(field);
}
return fields;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
new file mode 100644
index 0000000000..9bb997e9af
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestRowData2RowDataProcessor extends AbstractProcessorTestBase {
+
+ @Test
+ public void testRowData2RowData() throws Exception {
+ List fields1 = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+ RowDataSourceInfo sourceInfo = new RowDataSourceInfo("utf-8", fields1);
+ List fields2 = this.getTestFieldList("f1", "f2", "f3", "f4");
+ RowDataSinkInfo sinkInfo = new RowDataSinkInfo("utf-8", fields2);
+
+ String transformSql = "select msgTime ,msg, packageID, sid";
+ TransformConfig config = new TransformConfig(transformSql);
+ TransformProcessor processor =
+ TransformProcessor.create(
+ config,
+ SourceDecoderFactory.createRowDecoder(sourceInfo),
+ SinkEncoderFactory.createRowEncoder(sinkInfo));
+
+ RowData sourceRow = createRowData();
+
+ List sinkRow = processor.transform(sourceRow);
+ RowData expectedRow = sinkRow.get(0);
+ Assert.assertEquals("2024-12-19T11:00:55.212", expectedRow.getString(0).toString());
+ Assert.assertEquals("msg111", expectedRow.getString(1).toString());
+ Assert.assertEquals("pack123", expectedRow.getString(2).toString());
+ Assert.assertEquals("s123", expectedRow.getString(3).toString());
+
+ }
+
+ private RowData createRowData() {
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, StringData.fromString("s123"));
+ rowData.setField(1, StringData.fromString("pack123"));
+ rowData.setField(2, StringData.fromString("2024-12-19T11:00:55.212"));
+ rowData.setField(3, StringData.fromString("msg111"));
+ return rowData;
+ }
+}