diff --git a/be/src/connector/connector.h b/be/src/connector/connector.h index 4cebb339d7042..cb20715fe0d08 100644 --- a/be/src/connector/connector.h +++ b/be/src/connector/connector.h @@ -159,7 +159,7 @@ class DataSourceProvider { virtual const TupleDescriptor* tuple_descriptor(RuntimeState* state) const = 0; - virtual bool always_shared_scan() const { return true; } + virtual bool always_shared_scan(TScanRangeParams scan_range) const { return true; } virtual void peek_scan_ranges(const std::vector& scan_ranges) {} diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp index 117ae3be18aed..2f7bb60af83aa 100644 --- a/be/src/connector/hive_connector.cpp +++ b/be/src/connector/hive_connector.cpp @@ -49,6 +49,11 @@ DataSourcePtr HiveDataSourceProvider::create_data_source(const TScanRange& scan_ return std::make_unique(this, scan_range); } +bool HiveDataSourceProvider::always_shared_scan(TScanRangeParams scan_range) const { + return !scan_range.scan_range.hdfs_scan_range.__isset.use_kudu_jni_reader && + !scan_range.scan_range.hdfs_scan_range.use_kudu_jni_reader; +} + const TupleDescriptor* HiveDataSourceProvider::tuple_descriptor(RuntimeState* state) const { return state->desc_tbl().get_tuple_descriptor(_hdfs_scan_node.tuple_id); } diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h index 79b41b9d39e24..5c54767332521 100644 --- a/be/src/connector/hive_connector.h +++ b/be/src/connector/hive_connector.h @@ -44,6 +44,7 @@ class HiveDataSourceProvider final : public DataSourceProvider { HiveDataSourceProvider(ConnectorScanNode* scan_node, const TPlanNode& plan_node); DataSourcePtr create_data_source(const TScanRange& scan_range) override; const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override; + bool always_shared_scan(TScanRangeParams scan_range) const override; void peek_scan_ranges(const std::vector& scan_ranges) override; void default_data_source_mem_bytes(int64_t* min_value, int64_t* max_value) override; diff --git a/be/src/connector/lake_connector.h b/be/src/connector/lake_connector.h index e6fefe0b3ffe0..8a04ca368436c 100644 --- a/be/src/connector/lake_connector.h +++ b/be/src/connector/lake_connector.h @@ -199,7 +199,7 @@ class LakeDataSourceProvider final : public DataSourceProvider { const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override; // always enable shared scan for cloud native table - bool always_shared_scan() const override { return true; } + bool always_shared_scan(TScanRangeParams scan_range) const override { return true; } StatusOr convert_scan_range_to_morsel_queue( const std::vector& scan_ranges, int node_id, int32_t pipeline_dop, diff --git a/be/src/exec/connector_scan_node.cpp b/be/src/exec/connector_scan_node.cpp index 9b8720aca3841..b720b614c48e5 100644 --- a/be/src/exec/connector_scan_node.cpp +++ b/be/src/exec/connector_scan_node.cpp @@ -680,7 +680,7 @@ void ConnectorScanNode::_init_counter() { } bool ConnectorScanNode::always_shared_scan() const { - return _data_source_provider->always_shared_scan(); + return _scan_ranges.size() > 0 && _data_source_provider->always_shared_scan(_scan_ranges[0]); } StatusOr ConnectorScanNode::convert_scan_range_to_morsel_queue( diff --git a/be/test/exec/connector_scan_node_test.cpp b/be/test/exec/connector_scan_node_test.cpp index 11c9fbcb736b9..0bba093c570a7 100644 --- a/be/test/exec/connector_scan_node_test.cpp +++ b/be/test/exec/connector_scan_node_test.cpp @@ -49,7 +49,7 @@ class ConnectorScanNodeTest : public ::testing::Test { std::vector create_scan_ranges_cloud(size_t num); std::shared_ptr create_tplan_node_hive(); - std::vector create_scan_ranges_hive(size_t num); + std::vector create_scan_ranges_hive(size_t num, std::string scanner_type); std::shared_ptr create_tplan_node_stream_load(); std::vector create_scan_ranges_stream_load(RuntimeState* runtime_state, @@ -188,7 +188,7 @@ std::shared_ptr ConnectorScanNodeTest::create_tplan_node_hive() { return tnode; } -std::vector ConnectorScanNodeTest::create_scan_ranges_hive(size_t num) { +std::vector ConnectorScanNodeTest::create_scan_ranges_hive(size_t num, std::string scanner_type) { std::vector scan_ranges; for (int i = 0; i < num; i++) { @@ -196,6 +196,9 @@ std::vector ConnectorScanNodeTest::create_scan_ranges_hive(siz hdfs_scan_range.__set_full_path("file"); hdfs_scan_range.__set_offset(i); hdfs_scan_range.__set_length(1); + if (scanner_type == "kudu") { + hdfs_scan_range.__set_use_kudu_jni_reader(true); + } TScanRange scan_range; scan_range.__set_hdfs_scan_range(hdfs_scan_range); @@ -223,7 +226,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi // dop is 1 and not so much morsels int pipeline_dop = 1; - auto scan_ranges = create_scan_ranges_hive(1); + auto scan_ranges = create_scan_ranges_hive(1, "hive")); ASSIGN_OR_ABORT(auto morsel_queue_factory, scan_node->convert_scan_range_to_morsel_queue_factory( scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, @@ -232,7 +235,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi // dop is 2 and not so much morsels pipeline_dop = 2; - scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator()); + scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator(), "hive")); ASSIGN_OR_ABORT(morsel_queue_factory, scan_node->convert_scan_range_to_morsel_queue_factory( scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, @@ -241,7 +244,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi // dop is 2 and so much morsels pipeline_dop = 2; - scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1); + scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1, "hive"); ASSIGN_OR_ABORT(morsel_queue_factory, scan_node->convert_scan_range_to_morsel_queue_factory( scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, @@ -351,4 +354,45 @@ TEST_F(ConnectorScanNodeTest, test_stream_load_thread_pool) { ASSERT_TRUE(scan_node->use_stream_load_thread_pool()); } +TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_kudu) { + std::shared_ptr runtime_state = create_runtime_state(); + std::vector types; + types.emplace_back(TYPE_INT); + auto* descs = create_table_desc(runtime_state.get(), types); + auto tnode = create_tplan_node_hive(); + auto scan_node = std::make_shared(runtime_state->obj_pool(), *tnode, *descs); + ASSERT_OK(scan_node->init(*tnode, runtime_state.get())); + + bool enable_tablet_internal_parallel = false; + auto tablet_internal_parallel_mode = TTabletInternalParallelMode::type::AUTO; + std::map> no_scan_ranges_per_driver_seq; + + // dop is 1 and not so much morsels + int pipeline_dop = 1; + auto scan_ranges = create_scan_ranges_hive(1, "kudu"); + ASSIGN_OR_ABORT(auto morsel_queue_factory, + scan_node->convert_scan_range_to_morsel_queue_factory( + scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, + enable_tablet_internal_parallel, tablet_internal_parallel_mode)); + ASSERT_TRUE(morsel_queue_factory->is_shared()); + + // dop is 2 and not so much morsels + pipeline_dop = 2; + scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator(), "kudu"); + ASSIGN_OR_ABORT(morsel_queue_factory, + scan_node->convert_scan_range_to_morsel_queue_factory( + scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, + enable_tablet_internal_parallel, tablet_internal_parallel_mode)); + ASSERT_FALSE(morsel_queue_factory->is_shared()); + + // dop is 2 and so much morsels + pipeline_dop = 2; + scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1, "kudu"); + ASSIGN_OR_ABORT(morsel_queue_factory, + scan_node->convert_scan_range_to_morsel_queue_factory( + scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false, + enable_tablet_internal_parallel, tablet_internal_parallel_mode)); + ASSERT_TRUE(morsel_queue_factory->is_shared()); +} + } // namespace starrocks diff --git a/be/test/storage/lake/lake_data_source_test.cpp b/be/test/storage/lake/lake_data_source_test.cpp index 2c5d6f73712ca..5d0ebc9207319 100644 --- a/be/test/storage/lake/lake_data_source_test.cpp +++ b/be/test/storage/lake/lake_data_source_test.cpp @@ -163,7 +163,8 @@ TEST_F(LakeDataSourceTest, test_convert_scan_range_to_morsel_queue) { auto data_source_provider = dynamic_cast(scan_node->data_source_provider()); data_source_provider->set_lake_tablet_manager(_tablet_mgr.get()); - ASSERT_TRUE(data_source_provider->always_shared_scan()); + TScanRangeParams scan_range; + ASSERT_TRUE(data_source_provider->always_shared_scan(scan_range)); config::tablet_internal_parallel_max_splitted_scan_bytes = 32; config::tablet_internal_parallel_min_splitted_scan_rows = 4;