From 5411897e35999238743ac3f71a086a843ed941a5 Mon Sep 17 00:00:00 2001 From: stdpain <34912776+stdpain@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:23:26 +0800 Subject: [PATCH] [Enhancement] add limit operator before union gather (#53102) Signed-off-by: stdpain --- be/src/exec/union_node.cpp | 8 +++++- .../test_union/R/test_union_all_with_limit | 28 +++++++++++++++++++ .../test_union/T/test_union_all_with_limit | 24 ++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 test/sql/test_union/R/test_union_all_with_limit create mode 100644 test/sql/test_union/T/test_union_all_with_limit diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp index 1d0d16b0fd143..35587da748081 100644 --- a/be/src/exec/union_node.cpp +++ b/be/src/exec/union_node.cpp @@ -420,12 +420,18 @@ pipeline::OpFactories UnionNode::decompose_to_pipeline(pipeline::PipelineBuilder this->init_runtime_filter_for_operator(operators_list[i].back().get(), context, rc_rf_probe_collector); } + if (limit() != -1) { + for (size_t i = 0; i < operators_list.size(); ++i) { + operators_list[i].emplace_back( + std::make_shared(context->next_operator_id(), id(), limit())); + } + } + auto final_operators = context->maybe_gather_pipelines_to_one(runtime_state(), id(), operators_list); if (limit() != -1) { final_operators.emplace_back( std::make_shared(context->next_operator_id(), id(), limit())); } - return final_operators; } diff --git a/test/sql/test_union/R/test_union_all_with_limit b/test/sql/test_union/R/test_union_all_with_limit new file mode 100644 index 0000000000000..2aa59786520a6 --- /dev/null +++ b/test/sql/test_union/R/test_union_all_with_limit @@ -0,0 +1,28 @@ +-- name: test_union_all_with_limit +CREATE TABLE `t0` ( + `c0` int(11) NULL COMMENT "", + `c1` varchar(20) NULL COMMENT "", + `c2` varchar(200) NULL COMMENT "", + `c3` int(11) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`c0`, `c1`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 48 +PROPERTIES ( +"colocate_with" = "${uuid0}", +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"replicated_storage" = "true", +"compression" = "LZ4" +); +-- result: +-- !result +insert into t0 SELECT generate_series, generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 40960)); +-- result: +-- !result +select count(*) from (select c0 from t0 union all select distinct c0 from t0 limit 1)t; +-- result: +1 +-- !result \ No newline at end of file diff --git a/test/sql/test_union/T/test_union_all_with_limit b/test/sql/test_union/T/test_union_all_with_limit new file mode 100644 index 0000000000000..09deac14a1ca8 --- /dev/null +++ b/test/sql/test_union/T/test_union_all_with_limit @@ -0,0 +1,24 @@ +-- name: test_union_all_with_limit + +CREATE TABLE `t0` ( + `c0` int(11) NULL COMMENT "", + `c1` varchar(20) NULL COMMENT "", + `c2` varchar(200) NULL COMMENT "", + `c3` int(11) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`c0`, `c1`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 48 +PROPERTIES ( +"colocate_with" = "${uuid0}", +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"replicated_storage" = "true", +"compression" = "LZ4" +); + +insert into t0 SELECT generate_series, generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 40960)); + +select count(*) from (select c0 from t0 union all select distinct c0 from t0 limit 1)t; \ No newline at end of file