Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Kudu scanner should be allowed to do non-shared scans. #53123

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class DataSourceProvider {

virtual const TupleDescriptor* tuple_descriptor(RuntimeState* state) const = 0;

virtual bool always_shared_scan() const { return true; }
// Default policy: ignores `scan_range` and always returns true.
// HiveDataSourceProvider and LakeDataSourceProvider override this.
// NOTE(review): `scan_range` is taken by value, so each call copies the
// TScanRangeParams; consider `const TScanRangeParams&` here and in every
// override (they must be changed together to keep `override` valid).
virtual bool always_shared_scan(TScanRangeParams scan_range) const { return true; }

virtual void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}

Expand Down
5 changes: 5 additions & 0 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ DataSourcePtr HiveDataSourceProvider::create_data_source(const TScanRange& scan_
return std::make_unique<HiveDataSource>(this, scan_range);
}

// Returns true unless `scan_range` explicitly asks for the Kudu JNI reader.
//
// The optional Thrift field `use_kudu_jni_reader` only counts when it is both
// present (`__isset`) and true. A range that leaves the field unset, or sets
// it to false, keeps shared scan enabled. The previous condition
// (`!__isset && !value`) returned false whenever the field was set at all,
// including when it was set to false.
//
// @param scan_range scan range whose `hdfs_scan_range` is inspected.
// @return false only for ranges that use the Kudu JNI reader, true otherwise.
bool HiveDataSourceProvider::always_shared_scan(TScanRangeParams scan_range) const {
    const auto& hdfs_scan_range = scan_range.scan_range.hdfs_scan_range;
    const bool uses_kudu_jni_reader =
            hdfs_scan_range.__isset.use_kudu_jni_reader && hdfs_scan_range.use_kudu_jni_reader;
    return !uses_kudu_jni_reader;
}

// Looks up the tuple descriptor for this provider's HDFS scan node in the
// descriptor table owned by `state`, keyed by the scan node's tuple id.
const TupleDescriptor* HiveDataSourceProvider::tuple_descriptor(RuntimeState* state) const {
    const auto tuple_id = _hdfs_scan_node.tuple_id;
    auto&& descriptor_table = state->desc_tbl();
    return descriptor_table.get_tuple_descriptor(tuple_id);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/connector/hive_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HiveDataSourceProvider final : public DataSourceProvider {
HiveDataSourceProvider(ConnectorScanNode* scan_node, const TPlanNode& plan_node);
DataSourcePtr create_data_source(const TScanRange& scan_range) override;
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;
bool always_shared_scan(TScanRangeParams scan_range) const override;

void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void default_data_source_mem_bytes(int64_t* min_value, int64_t* max_value) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/connector/lake_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class LakeDataSourceProvider final : public DataSourceProvider {
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;

// always enable shared scan for cloud native table
bool always_shared_scan() const override { return true; }
// Cloud native tables always use shared scan: `scan_range` is ignored and the
// result is unconditionally true.
bool always_shared_scan(TScanRangeParams scan_range) const override { return true; }

StatusOr<pipeline::MorselQueuePtr> convert_scan_range_to_morsel_queue(
const std::vector<TScanRangeParams>& scan_ranges, int node_id, int32_t pipeline_dop,
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ void ConnectorScanNode::_init_counter() {
}

// Reports whether this scan node must always use shared scan.
//
// The decision is delegated to the data source provider, which is asked about
// the first entry of `_scan_ranges`. The stale zero-argument provider call has
// been removed: it returned first, which made the range-aware call unreachable
// and no longer matched the provider's signature.
//
// NOTE(review): only `_scan_ranges[0]` is consulted; this presumably relies on
// all ranges of one node using the same reader. Confirm against the planner.
// NOTE(review): with no scan ranges this returns false, even for providers
// whose override returns true unconditionally. Confirm that is intended.
bool ConnectorScanNode::always_shared_scan() const {
    if (_scan_ranges.empty()) {
        return false;
    }
    return _data_source_provider->always_shared_scan(_scan_ranges[0]);
}

StatusOr<pipeline::MorselQueuePtr> ConnectorScanNode::convert_scan_range_to_morsel_queue(
Expand Down
54 changes: 49 additions & 5 deletions be/test/exec/connector_scan_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ConnectorScanNodeTest : public ::testing::Test {
std::vector<TScanRangeParams> create_scan_ranges_cloud(size_t num);

std::shared_ptr<TPlanNode> create_tplan_node_hive();
std::vector<TScanRangeParams> create_scan_ranges_hive(size_t num);
std::vector<TScanRangeParams> create_scan_ranges_hive(size_t num, std::string scanner_type);

std::shared_ptr<TPlanNode> create_tplan_node_stream_load();
std::vector<TScanRangeParams> create_scan_ranges_stream_load(RuntimeState* runtime_state,
Expand Down Expand Up @@ -188,14 +188,17 @@ std::shared_ptr<TPlanNode> ConnectorScanNodeTest::create_tplan_node_hive() {
return tnode;
}

std::vector<TScanRangeParams> ConnectorScanNodeTest::create_scan_ranges_hive(size_t num) {
std::vector<TScanRangeParams> ConnectorScanNodeTest::create_scan_ranges_hive(size_t num, std::string scanner_type) {
std::vector<TScanRangeParams> scan_ranges;

for (int i = 0; i < num; i++) {
THdfsScanRange hdfs_scan_range;
hdfs_scan_range.__set_full_path("file");
hdfs_scan_range.__set_offset(i);
hdfs_scan_range.__set_length(1);
if (scanner_type == "kudu") {
hdfs_scan_range.__set_use_kudu_jni_reader(true);
}

TScanRange scan_range;
scan_range.__set_hdfs_scan_range(hdfs_scan_range);
Expand Down Expand Up @@ -223,7 +226,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 1 and not so much morsels
int pipeline_dop = 1;
auto scan_ranges = create_scan_ranges_hive(1);
auto scan_ranges = create_scan_ranges_hive(1, "hive");
ASSIGN_OR_ABORT(auto morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand All @@ -232,7 +235,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 2 and not so much morsels
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator());
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator(), "hive");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand All @@ -241,7 +244,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 2 and so much morsels
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1);
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1, "hive");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand Down Expand Up @@ -351,4 +354,45 @@ TEST_F(ConnectorScanNodeTest, test_stream_load_thread_pool) {
ASSERT_TRUE(scan_node->use_stream_load_thread_pool());
}

// Checks whether the morsel queue factory produced by ConnectorScanNode is
// shared when the scan ranges are built with the "kudu" scanner type (which
// makes create_scan_ranges_hive set use_kudu_jni_reader on every range).
// Three cases are covered: a single range at dop 1, exactly
// dop * io_tasks_per_scan_operator() ranges at dop 2, and one range more than
// that at dop 2.
TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_kudu) {
// Build a connector scan node from the Hive plan node over a single INT column.
std::shared_ptr<RuntimeState> runtime_state = create_runtime_state();
std::vector<TypeDescriptor> types;
types.emplace_back(TYPE_INT);
auto* descs = create_table_desc(runtime_state.get(), types);
auto tnode = create_tplan_node_hive();
auto scan_node = std::make_shared<starrocks::ConnectorScanNode>(runtime_state->obj_pool(), *tnode, *descs);
ASSERT_OK(scan_node->init(*tnode, runtime_state.get()));

// Tablet-internal parallelism is disabled, and no ranges are pre-assigned per driver sequence.
bool enable_tablet_internal_parallel = false;
auto tablet_internal_parallel_mode = TTabletInternalParallelMode::type::AUTO;
std::map<int32_t, std::vector<TScanRangeParams>> no_scan_ranges_per_driver_seq;

// Case 1: dop is 1 with a single scan range; the factory is expected to be shared.
int pipeline_dop = 1;
auto scan_ranges = create_scan_ranges_hive(1, "kudu");
ASSIGN_OR_ABORT(auto morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
enable_tablet_internal_parallel, tablet_internal_parallel_mode));
ASSERT_TRUE(morsel_queue_factory->is_shared());

// Case 2: dop is 2 with exactly dop * io_tasks_per_scan_operator() scan ranges;
// the factory is expected NOT to be shared.
// NOTE(review): presumably this is the case the non-always-shared Kudu path is
// meant to change; confirm against convert_scan_range_to_morsel_queue_factory.
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator(), "kudu");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
enable_tablet_internal_parallel, tablet_internal_parallel_mode));
ASSERT_FALSE(morsel_queue_factory->is_shared());

// Case 3: dop is 2 with one scan range more than dop * io_tasks_per_scan_operator();
// the factory is expected to be shared again.
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1, "kudu");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
enable_tablet_internal_parallel, tablet_internal_parallel_mode));
ASSERT_TRUE(morsel_queue_factory->is_shared());
}

} // namespace starrocks
3 changes: 2 additions & 1 deletion be/test/storage/lake/lake_data_source_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ TEST_F(LakeDataSourceTest, test_convert_scan_range_to_morsel_queue) {
auto data_source_provider = dynamic_cast<connector::LakeDataSourceProvider*>(scan_node->data_source_provider());
data_source_provider->set_lake_tablet_manager(_tablet_mgr.get());

ASSERT_TRUE(data_source_provider->always_shared_scan());
TScanRangeParams scan_range;
ASSERT_TRUE(data_source_provider->always_shared_scan(scan_range));

config::tablet_internal_parallel_max_splitted_scan_bytes = 32;
config::tablet_internal_parallel_min_splitted_scan_rows = 4;
Expand Down
Loading