[Flink] Support batch mode for arrow source (#559)
* support batch merge source

Signed-off-by: fphantam <[email protected]>

* add bounded param for arrow source

Signed-off-by: chenxu <[email protected]>

* add limit push down for table source in both batch and stream mode; it does not take effect together with an ORDER BY clause because of Flink's planner (see the usage sketch below)

---------

Signed-off-by: fphantam <[email protected]>
Signed-off-by: chenxu <[email protected]>
Co-authored-by: fphantam <[email protected]>
Co-authored-by: chenxu <[email protected]>
Co-authored-by: maosen <[email protected]>
4 people authored Nov 14, 2024
1 parent 54e7198 commit 60a55e9
Showing 5 changed files with 102 additions and 3 deletions.
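
For reference, a minimal usage sketch of the new limit push-down in batch mode, mirroring the test added in this commit. It assumes a LakeSoul table named user_info_1 is already registered in the current catalog; the class name and data are illustrative only.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LimitPushDownSketch {
    public static void main(String[] args) {
        // Batch execution: the LakeSoul source is created as a bounded source.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Assumed: a LakeSoul table user_info_1 registered in the current catalog.
        // The planner calls LakeSoulTableSource#applyLimit(2), which stores the
        // 'lakesoul.limit' option; each split reader then stops after emitting
        // at most that many records.
        tEnv.executeSql("SELECT * FROM user_info_1 LIMIT 2").print();
    }
}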
@@ -32,6 +32,7 @@
import java.util.stream.Collectors;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BATCH_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LIMIT;

public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowData>, AutoCloseable {

@@ -79,6 +80,9 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowData>, AutoCloseable {

private final Plan filter;

private long totalRead = 0;
private long limit = Long.MAX_VALUE;

public LakeSoulOneSplitRecordsReader(Configuration conf,
LakeSoulPartitionSplit split,
RowType tableRowType,
@@ -106,6 +110,7 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
this.partitionSchema = new Schema(partitionFields);
this.partitionValues = DBUtil.parsePartitionDesc(split.getPartitionDesc());
this.filter = filter;
this.limit = conf.getLong(LIMIT, LIMIT.defaultValue());
initializeReader();
recoverFromSkipRecord();
}
@@ -187,9 +192,11 @@ private void recoverFromSkipRecord() throws Exception {
}
this.currentVCR = this.reader.nextResultVectorSchemaRoot();
skipRowCount += this.currentVCR.getRowCount();

}
skipRowCount -= currentVCR.getRowCount();
curRecordIdx = (int) (skipRecords - skipRowCount);
totalRead = skipRecords;
} else {
if (this.reader.hasNext()) {
this.currentVCR = this.reader.nextResultVectorSchemaRoot();
@@ -217,6 +224,11 @@ public RowData nextRecordFromSplit() {
return null;
}
while (true) {
if (totalRead >= this.limit) {
this.reader.close();
LOG.info("Reach limit condition {}", split);
return null;
}
if (curRecordIdx >= currentVCR.getRowCount()) {
if (this.reader.hasNext()) {
this.currentVCR = this.reader.nextResultVectorSchemaRoot();
@@ -262,6 +274,7 @@ public RowData nextRecordFromSplit() {
rd = this.curArrowReaderRequestedSchema.read(rowId);
// change rowkind if needed
rd.setRowKind(rk);
totalRead++;
return rd;
}
}
@@ -274,6 +287,7 @@ public Set<String> finishedSplits() {

@Override
public void close() throws Exception {
LOG.info("Close reader split {}, read num {}", splitId, totalRead);
if (this.currentVCR != null) {
this.currentVCR.close();
this.currentVCR = null;
@@ -31,20 +31,72 @@ public static LakeSoulArrowSource create(
TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace);
RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema()));
DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
boolean isBounded = conf.getBoolean("IS_BOUNDED", false);
return new LakeSoulArrowSource(
tableInfo,
tableId,
conf.toMap(),
isBounded,
tableRowType,
tablePartitionKeys.primaryKeys,
tablePartitionKeys.rangeKeys
);
}

public static LakeSoulArrowSource create(
String tableNamespace,
String tableName,
Configuration conf,
List<Map<String, String>> remainingPartitions
) throws IOException {
TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, tableNamespace, tableName);
TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace);
RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema()));
DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
boolean isBounded = conf.getBoolean("IS_BOUNDED", false);
return new LakeSoulArrowSource(
tableInfo,
tableId,
conf.toMap(),
isBounded,
tableRowType,
tablePartitionKeys.primaryKeys,
tablePartitionKeys.rangeKeys,
remainingPartitions
);
}

LakeSoulArrowSource(
TableInfo tableInfo,
TableId tableId,
Map<String, String> optionParams,
boolean isBounded,
RowType tableRowType,
List<String> pkColumns,
List<String> partitionColumns,
List<Map<String, String>> remainingPartitions
) {
super(
tableId,
tableRowType,
tableRowType,
tableRowType,
isBounded,
pkColumns,
partitionColumns,
optionParams,
remainingPartitions,
null,
null
);
this.encodedTableInfo = tableInfo.toByteArray();
}

LakeSoulArrowSource(
TableInfo tableInfo,
TableId tableId,
Map<String, String> optionParams,
boolean isBounded,
RowType tableRowType,
List<String> pkColumns,
List<String> partitionColumns
@@ -54,7 +106,7 @@ public static LakeSoulArrowSource create(
tableRowType,
tableRowType,
tableRowType,
false,
isBounded,
pkColumns,
partitionColumns,
optionParams,
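A minimal sketch, assuming the four-argument create overload above, of how the new bounded flag might be used to build an arrow source for a batch read; the namespace, table name, and empty partition filter are hypothetical, and the LakeSoulArrowSource import is elided (use this repository's package).

import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// import for LakeSoulArrowSource omitted; it lives in this repository's flink module
public class BoundedArrowSourceSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Newly added switch: create(...) reads "IS_BOUNDED" (default false) and
        // passes it through, so the source can run as a bounded (batch) read.
        conf.setBoolean("IS_BOUNDED", true);

        // Hypothetical namespace/table; the partition filter is left empty here.
        LakeSoulArrowSource source = LakeSoulArrowSource.create(
                "default", "user_info_1", conf, Collections.emptyList());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "LakeSoul Arrow Source (bounded)")
                .print();
        env.execute("bounded arrow read");
    }
}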
@@ -25,6 +25,7 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -47,7 +48,7 @@

public class LakeSoulTableSource
implements SupportsFilterPushDown, SupportsProjectionPushDown, ScanTableSource,
SupportsRowLevelModificationScan {
SupportsRowLevelModificationScan, SupportsLimitPushDown {

private static final Logger LOG = LoggerFactory.getLogger(LakeSoulTableSource.class);

@@ -351,4 +352,9 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
public LakeSoulRowLevelModificationScanContext getModificationContext() {
return modificationContext;
}

@Override
public void applyLimit(long limit) {
this.optionParams.put(LakeSoulSinkOptions.LIMIT.key(), String.valueOf(limit));
}
}
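For clarity, a small illustration of how the pushed-down limit travels from applyLimit to the split readers, using only the option defined in LakeSoulSinkOptions below; the wrapper class here is illustrative. Note that the reader counts records per split (totalRead), so the pushed-down value is an upper bound per split, and the final LIMIT is still enforced by Flink's own limit operator.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;

public class LimitFlowSketch {
    public static void main(String[] args) {
        // What applyLimit(2) effectively does: keep the limit in the source options.
        Configuration conf = new Configuration();
        conf.setString(LakeSoulSinkOptions.LIMIT.key(), String.valueOf(2L));

        // What LakeSoulOneSplitRecordsReader does on the reader side: an absent
        // option falls back to Long.MAX_VALUE, i.e. "no limit".
        long limit = conf.getLong(LakeSoulSinkOptions.LIMIT, LakeSoulSinkOptions.LIMIT.defaultValue());
        System.out.println(limit); // 2
    }
}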
@@ -11,7 +11,7 @@

import java.time.Duration;

public class LakeSoulSinkOptions {

public static final String FACTORY_IDENTIFIER = "lakesoul";

@@ -231,6 +231,11 @@ public class LakeSoulSinkOptions {
.booleanType()
.defaultValue(false)
.withDescription("If true, lakesoul sink will auto change sink table's schema");
public static final ConfigOption<Long> LIMIT = ConfigOptions
.key("lakesoul.limit")
.longType()
.defaultValue(Long.MAX_VALUE)
.withDescription("limit io read num");
}


@@ -284,6 +284,28 @@ public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"});
}

@Test
public void testSelectWithLimit() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
createLakeSoulSourceTableUserWithRange(tEnv);
tEnv.executeSql(
"INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
.await();
String testSelect = "select * from user_info_1 limit 2";
List<Row> result = CollectionUtil.iteratorToList(tEnv.executeSql(testSelect).collect());
assert result.size() == 2;
String testSelect1 = "select * from user_info_1 limit 9";
List<Row> result1 = CollectionUtil.iteratorToList(tEnv.executeSql(testSelect1).collect());
assert result1.size() == 4;
StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
assert results.size() == 2;
TableImpl flinkTable1 = (TableImpl) streamEnv.sqlQuery(testSelect1);
List<Row> result2 = CollectionUtil.iteratorToList(flinkTable1.execute().collect());
assert result2.size() == 4;
}

@Test
public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
