Skip to content

Commit

Permalink
[Enhancement] Kudu scanner should be allowed to do non-share scan.
Browse files Browse the repository at this point in the history
Signed-off-by: Song Jiacheng <[email protected]>
  • Loading branch information
Jcnessss committed Nov 22, 2024
1 parent 5411897 commit f648756
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class DataSourceProvider {

virtual const TupleDescriptor* tuple_descriptor(RuntimeState* state) const = 0;

// Default policy hook: reports that shared scan is always used.
// NOTE(review): this zero-argument form appears to be the line the commit removes.
virtual bool always_shared_scan() const { return true; }
// Same default, but receives a scan range so that an override can decide based on it.
// NOTE(review): the parameter is taken by value, so every call copies the TScanRangeParams;
// a const reference would avoid the copy but requires changing all overrides in lockstep.
virtual bool always_shared_scan(TScanRangeParams scan_range) const { return true; }

// Hook that receives a list of scan ranges; the base implementation does nothing.
virtual void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}

Expand Down
5 changes: 5 additions & 0 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ DataSourcePtr HiveDataSourceProvider::create_data_source(const TScanRange& scan_
return std::make_unique<HiveDataSource>(this, scan_range);
}

// Decides whether the given scan range must be executed with shared scan.
//
// Shared scan is required for every range except one that explicitly asks for the
// Kudu JNI reader, i.e. the optional thrift field `use_kudu_jni_reader` is both
// set and true.
//
// @param scan_range  scan range parameters; only `scan_range.hdfs_scan_range` is inspected.
// @return false only when the Kudu JNI reader is requested, true otherwise.
bool HiveDataSourceProvider::always_shared_scan(TScanRangeParams scan_range) const {
    const auto& hdfs_scan_range = scan_range.scan_range.hdfs_scan_range;
    // The previous form `!isset && !value` returned false whenever the field was
    // explicitly set to false, which wrongly disabled shared scan for non-Kudu ranges.
    // The value is only meaningful when the field is set, so require both.
    const bool use_kudu_jni_reader =
            hdfs_scan_range.__isset.use_kudu_jni_reader && hdfs_scan_range.use_kudu_jni_reader;
    return !use_kudu_jni_reader;
}

// Resolves the tuple descriptor of this provider's HDFS scan node by looking up the
// node's tuple id in the descriptor table held by the runtime state.
const TupleDescriptor* HiveDataSourceProvider::tuple_descriptor(RuntimeState* state) const {
    const auto& scan_tuple_id = _hdfs_scan_node.tuple_id;
    return state->desc_tbl().get_tuple_descriptor(scan_tuple_id);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/connector/hive_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HiveDataSourceProvider final : public DataSourceProvider {
HiveDataSourceProvider(ConnectorScanNode* scan_node, const TPlanNode& plan_node);
DataSourcePtr create_data_source(const TScanRange& scan_range) override;
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;
bool always_shared_scan(TScanRangeParams scan_range) const override;

void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void default_data_source_mem_bytes(int64_t* min_value, int64_t* max_value) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/connector/lake_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class LakeDataSourceProvider final : public DataSourceProvider {
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;

// Always enable shared scan for cloud native tables. Both forms below return true
// unconditionally; the scan_range argument of the second form is ignored.
// NOTE(review): the zero-argument form appears to be the line the commit removes.
bool always_shared_scan() const override { return true; }
bool always_shared_scan(TScanRangeParams scan_range) const override { return true; }

StatusOr<pipeline::MorselQueuePtr> convert_scan_range_to_morsel_queue(
const std::vector<TScanRangeParams>& scan_ranges, int node_id, int32_t pipeline_dop,
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ void ConnectorScanNode::_init_counter() {
}

// Reports whether this scan node must always use shared scan, as decided by its
// data source provider.
bool ConnectorScanNode::always_shared_scan() const {
// NOTE(review): this unconditional delegation appears to be the line the commit removes;
// as written it would make the statement below unreachable.
return _data_source_provider->always_shared_scan();
// Yields false when the node has no scan ranges; otherwise asks the provider about the
// first scan range only.
// NOTE(review): presumably all ranges of one node use the same reader type, so checking
// _scan_ranges[0] is representative; confirm against the planner.
return _scan_ranges.size() > 0 && _data_source_provider->always_shared_scan(_scan_ranges[0]);
}

StatusOr<pipeline::MorselQueuePtr> ConnectorScanNode::convert_scan_range_to_morsel_queue(
Expand Down
3 changes: 2 additions & 1 deletion be/test/storage/lake/lake_data_source_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ TEST_F(LakeDataSourceTest, test_convert_scan_range_to_morsel_queue) {
auto data_source_provider = dynamic_cast<connector::LakeDataSourceProvider*>(scan_node->data_source_provider());
data_source_provider->set_lake_tablet_manager(_tablet_mgr.get());

ASSERT_TRUE(data_source_provider->always_shared_scan());
TScanRangeParams scan_range;
ASSERT_TRUE(data_source_provider->always_shared_scan(scan_range));

config::tablet_internal_parallel_max_splitted_scan_bytes = 32;
config::tablet_internal_parallel_min_splitted_scan_rows = 4;
Expand Down

0 comments on commit f648756

Please sign in to comment.