diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java index dfa7f99fb..588e624c4 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java @@ -41,6 +41,55 @@ public static LakeSoulArrowSource create( ); } + public static LakeSoulArrowSource create( + String tableNamespace, + String tableName, + Configuration conf, + List> remainingPartitions + ) throws IOException { + TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, tableNamespace, tableName); + TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace); + RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema())); + DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); + boolean isBounded = conf.getBoolean("IS_BOUNDED", false); + return new LakeSoulArrowSource( + tableInfo, + tableId, + conf.toMap(), + tableRowType, + isBounded, + tablePartitionKeys.primaryKeys, + tablePartitionKeys.rangeKeys, + remainingPartitions + ); + } + + LakeSoulArrowSource( + TableInfo tableInfo, + TableId tableId, + Map optionParams, + RowType tableRowType, + boolean isBounded, + List pkColumns, + List partitionColumns, + List> remainingPartitions + ) { + super( + tableId, + tableRowType, + tableRowType, + tableRowType, + isBounded, + pkColumns, + partitionColumns, + optionParams, + remainingPartitions, + null, + null + ); + this.encodedTableInfo = tableInfo.toByteArray(); + } + LakeSoulArrowSource( TableInfo tableInfo, TableId tableId,