-
Notifications
You must be signed in to change notification settings - Fork 671
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PG17 compatibility: add/fix tests with correlated subqueries that can…
… be pulled to a join (#7745) Fix Test Failure in subquery_in_where, set_operations, dml_recursive in PG17 #7741 The test failures are caused by[ this commit in PG17](https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639), which enables correlated subqueries to be pulled up to a join. Prior to this, the correlated subquery was implemented as a subplan. In citus, it is not possible to pushdown a correlated subplan, but with a different plan in PG17 the query can be executed, per the test diff from `subquery_in_where`: ``` 37,39c37,41 < DEBUG: generating subplan XXX_1 for CTE event_id: SELECT user_id AS events_user_id, "time" AS events_time, event_type FROM public.events_table < DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ... < ERROR: correlated subqueries are not supported when the FROM clause contains a CTE or subquery --- > count > --------------------------------------------------------------------- > 0 > (1 row) > ``` This is because with pg17 `= ANY subquery` in the queries can be implemented as a join, instead of as a subplan filter on a table scan. For example, `SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2` (from set_operations) has this plan in pg17; note that the subquery is the inner side of a nested loop join: ``` ┌───────────────────────────────────────────────────┐ │ QUERY PLAN │ ├───────────────────────────────────────────────────┤ │ Sort │ │ Sort Key: a.x, a.y │ │ -> Nested Loop │ │ -> Seq Scan on test a │ │ -> Subquery Scan on "ANY_subquery" │ │ Filter: (a.x = "ANY_subquery".x) │ │ -> HashAggregate │ │ Group Key: b.x │ │ -> Append │ │ -> Seq Scan on test b │ │ -> Seq Scan on test c │ │ Filter: (a.x = x) │ └───────────────────────────────────────────────────┘ ``` and this plan in pg16 (and previous pg versions); the subquery is a correlated subplan filter on a table scan: ``` ┌───────────────────────────────────────────────┐ │ QUERY PLAN │ ├───────────────────────────────────────────────┤ │ Sort │ │ Sort Key: a.x, a.y │ │ -> Seq Scan on test a │ │ Filter: (SubPlan 1) │ │ SubPlan 1 │ │ -> HashAggregate │ │ Group Key: b.x │ │ -> Append │ │ -> Seq Scan on test b │ │ -> Seq Scan on test c │ │ Filter: (a.x = x) │ └───────────────────────────────────────────────┘ ``` The fix Modifies the queries causing the test failures so that an ANY subquery is not folded to a join, preserving the expected output of the tests. A similar approach was taken for existing regress tests in the[ postgres commit](https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639). See the `join `regress test, for example. We also add pg17 specific tests that leverage this improvement in Postgres with Citus distributed planning as well.
- Loading branch information
1 parent
e2865b7
commit d69f8cd
Showing
10 changed files
with
842 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,354 @@ | ||
-- | ||
-- PG17 | ||
-- | ||
SHOW server_version \gset | ||
SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17 | ||
\gset | ||
-- PG17 has the capabilty to pull up a correlated ANY subquery to a join if | ||
-- the subquery only refers to its immediate parent query. Previously, the | ||
-- subquery needed to be implemented as a SubPlan node, typically as a | ||
-- filter on a scan or join node. This PG17 capability enables Citus to | ||
-- run queries with correlated subqueries in certain cases, as shown here. | ||
-- Relevant PG commit: | ||
-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639 | ||
-- This feature is tested for all PG versions, not just PG17; each test query with | ||
-- a correlated subquery should fail with PG version < 17.0, but the test query | ||
-- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0 | ||
CREATE SCHEMA pg17_corr_subq_folding; | ||
SET search_path TO pg17_corr_subq_folding; | ||
SET citus.next_shard_id TO 20240017; | ||
SET citus.shard_count TO 2; | ||
SET citus.shard_replication_factor TO 1; | ||
CREATE TABLE test (x int, y int); | ||
SELECT create_distributed_table('test', 'x'); | ||
create_distributed_table | ||
--------------------------------------------------------------------- | ||
|
||
(1 row) | ||
|
||
INSERT INTO test VALUES (1,1), (2,2); | ||
-- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan | ||
-- this as a nested loop join with the subquery as the inner. The correlation | ||
-- is on the distribution column so the join can be pushed down by Citus. | ||
explain (costs off) | ||
SELECT * | ||
FROM test a | ||
WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) | ||
ORDER BY 1,2; | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
Sort | ||
Sort Key: remote_scan.x, remote_scan.y | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> Nested Loop | ||
-> Seq Scan on test_20240017 a | ||
-> Subquery Scan on "ANY_subquery" | ||
Filter: (a.x = "ANY_subquery".x) | ||
-> HashAggregate | ||
Group Key: b.x | ||
-> Append | ||
-> Seq Scan on test_20240017 b | ||
-> Seq Scan on test_20240017 c | ||
Filter: (a.x = x) | ||
(17 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
SELECT * | ||
FROM test a | ||
WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) | ||
ORDER BY 1,2; | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
x | y | ||
--------------------------------------------------------------------- | ||
1 | 1 | ||
2 | 2 | ||
(2 rows) | ||
|
||
RESET client_min_messages; | ||
-- Query 1 rewritten with subquery pulled up to a join, as done by PG17 planner; | ||
-- this query can be run without issues by Citus with older (pre PG17) PGs. | ||
explain (costs off) | ||
SELECT a.* | ||
FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x | ||
ORDER BY 1,2; | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
Sort | ||
Sort Key: remote_scan.x, remote_scan.y | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> Nested Loop | ||
-> Seq Scan on test_20240017 a | ||
-> Subquery Scan on dt1 | ||
Filter: (a.x = dt1.x) | ||
-> HashAggregate | ||
Group Key: b.x | ||
-> Append | ||
-> Seq Scan on test_20240017 b | ||
-> Seq Scan on test_20240017 c | ||
Filter: (a.x = x) | ||
(17 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
SELECT a.* | ||
FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x | ||
ORDER BY 1,2; | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
x | y | ||
--------------------------------------------------------------------- | ||
1 | 1 | ||
2 | 2 | ||
(2 rows) | ||
|
||
RESET client_min_messages; | ||
CREATE TABLE users (user_id int, time int, dept int, info bigint); | ||
CREATE TABLE events (user_id int, time int, event_type int, payload text); | ||
select create_distributed_table('users', 'user_id'); | ||
create_distributed_table | ||
--------------------------------------------------------------------- | ||
|
||
(1 row) | ||
|
||
select create_distributed_table('events', 'user_id'); | ||
create_distributed_table | ||
--------------------------------------------------------------------- | ||
|
||
(1 row) | ||
|
||
insert into users | ||
select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i; | ||
insert into events | ||
select i % 10 + 1, 2021 + (i % 3), i %11, md5((i*i)::text) from generate_series(1, 100) i; | ||
-- Query 2. In Citus correlated subqueries can not be used in the WHERE | ||
-- clause but if the subquery can be pulled up to a join it becomes possible | ||
-- for Citus to run the query, per this example. Pre PG17 the suqbuery | ||
-- was implemented as a SubPlan filter on the events table scan. | ||
EXPLAIN (costs off) | ||
WITH event_id | ||
AS(SELECT user_id AS events_user_id, | ||
time AS events_time, | ||
event_type | ||
FROM events) | ||
SELECT Count(*) | ||
FROM event_id | ||
WHERE (events_user_id) IN (SELECT user_id | ||
FROM users | ||
WHERE users.time = events_time); | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
Aggregate | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> Aggregate | ||
-> Hash Join | ||
Hash Cond: ((events."time" = users."time") AND (events.user_id = users.user_id)) | ||
-> Seq Scan on events_20240021 events | ||
-> Hash | ||
-> HashAggregate | ||
Group Key: users."time", users.user_id | ||
-> Seq Scan on users_20240019 users | ||
(14 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
WITH event_id | ||
AS(SELECT user_id AS events_user_id, | ||
time AS events_time, | ||
event_type | ||
FROM events) | ||
SELECT Count(*) | ||
FROM event_id | ||
WHERE (events_user_id) IN (SELECT user_id | ||
FROM users | ||
WHERE users.time = events_time); | ||
DEBUG: CTE event_id is going to be inlined via distributed planning | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
count | ||
--------------------------------------------------------------------- | ||
31 | ||
(1 row) | ||
|
||
RESET client_min_messages; | ||
-- Query 2 rewritten with subquery pulled up to a join, as done by pg17 planner. Citus | ||
-- Citus is able to run this query with previous pg versions. Note that the CTE can be | ||
-- disregarded because it is inlined, being only referenced once. | ||
EXPLAIN (COSTS OFF) | ||
SELECT Count(*) | ||
FROM (SELECT user_id AS events_user_id, | ||
time AS events_time, | ||
event_type FROM events) dt1 | ||
INNER JOIN (SELECT distinct user_id, time FROM users) dt | ||
ON events_user_id = dt.user_id and events_time = dt.time; | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
Aggregate | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> Aggregate | ||
-> Hash Join | ||
Hash Cond: ((events.user_id = users.user_id) AND (events."time" = users."time")) | ||
-> Seq Scan on events_20240021 events | ||
-> Hash | ||
-> HashAggregate | ||
Group Key: users.user_id, users."time" | ||
-> Seq Scan on users_20240019 users | ||
(14 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
SELECT Count(*) | ||
FROM (SELECT user_id AS events_user_id, | ||
time AS events_time, | ||
event_type FROM events) dt1 | ||
INNER JOIN (SELECT distinct user_id, time FROM users) dt | ||
ON events_user_id = dt.user_id and events_time = dt.time; | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
count | ||
--------------------------------------------------------------------- | ||
31 | ||
(1 row) | ||
|
||
RESET client_min_messages; | ||
-- Query 3: another example where recursive planning was prevented due to | ||
-- correlated subqueries, but with PG17 folding the subquery to a join it is | ||
-- possible for Citus to plan and run the query. | ||
EXPLAIN (costs off) | ||
SELECT dept, sum(user_id) FROM | ||
(SELECT users.dept, users.user_id | ||
FROM users, events as d1 | ||
WHERE d1.user_id = users.user_id | ||
AND users.dept IN (3,4) | ||
AND users.user_id IN | ||
(SELECT s2.user_id FROM users as s2 | ||
GROUP BY d1.user_id, s2.user_id)) dt | ||
GROUP BY dept; | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
HashAggregate | ||
Group Key: remote_scan.dept | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> GroupAggregate | ||
Group Key: users.dept | ||
-> Sort | ||
Sort Key: users.dept | ||
-> Nested Loop Semi Join | ||
-> Hash Join | ||
Hash Cond: (d1.user_id = users.user_id) | ||
-> Seq Scan on events_20240021 d1 | ||
-> Hash | ||
-> Seq Scan on users_20240019 users | ||
Filter: (dept = ANY ('{3,4}'::integer[])) | ||
-> Subquery Scan on "ANY_subquery" | ||
Filter: (d1.user_id = "ANY_subquery".user_id) | ||
-> HashAggregate | ||
Group Key: s2.user_id | ||
-> Seq Scan on users_20240019 s2 | ||
(23 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
SELECT dept, sum(user_id) FROM | ||
(SELECT users.dept, users.user_id | ||
FROM users, events as d1 | ||
WHERE d1.user_id = users.user_id | ||
AND users.dept IN (3,4) | ||
AND users.user_id IN | ||
(SELECT s2.user_id FROM users as s2 | ||
GROUP BY d1.user_id, s2.user_id)) dt | ||
GROUP BY dept; | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
dept | sum | ||
--------------------------------------------------------------------- | ||
3 | 110 | ||
4 | 130 | ||
(2 rows) | ||
|
||
RESET client_min_messages; | ||
-- Query 3 rewritten in a similar way to how the PG17 pulls up the subquery; | ||
-- the join is on the distribution key so Citus can push down. | ||
EXPLAIN (costs off) | ||
SELECT dept, sum(user_id) FROM | ||
(SELECT users.dept, users.user_id | ||
FROM users, events as d1 | ||
JOIN LATERAL (SELECT s2.user_id FROM users as s2 | ||
GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 | ||
WHERE d1.user_id = users.user_id | ||
AND users.dept IN (3,4) | ||
AND users.user_id = d2.user_id) dt | ||
GROUP BY dept; | ||
QUERY PLAN | ||
--------------------------------------------------------------------- | ||
HashAggregate | ||
Group Key: remote_scan.dept | ||
-> Custom Scan (Citus Adaptive) | ||
Task Count: 2 | ||
Tasks Shown: One of 2 | ||
-> Task | ||
Node: host=localhost port=xxxxx dbname=regression | ||
-> GroupAggregate | ||
Group Key: users.dept | ||
-> Sort | ||
Sort Key: users.dept | ||
-> Nested Loop | ||
-> Hash Join | ||
Hash Cond: (d1.user_id = users.user_id) | ||
-> Seq Scan on events_20240021 d1 | ||
-> Hash | ||
-> Seq Scan on users_20240019 users | ||
Filter: (dept = ANY ('{3,4}'::integer[])) | ||
-> Subquery Scan on d2 | ||
Filter: (d1.user_id = d2.user_id) | ||
-> HashAggregate | ||
Group Key: s2.user_id | ||
-> Result | ||
One-Time Filter: (d1.user_id IS NOT NULL) | ||
-> Seq Scan on users_20240019 s2 | ||
(25 rows) | ||
|
||
SET client_min_messages TO DEBUG2; | ||
SELECT dept, sum(user_id) FROM | ||
(SELECT users.dept, users.user_id | ||
FROM users, events as d1 | ||
JOIN LATERAL (SELECT s2.user_id FROM users as s2 | ||
GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 | ||
WHERE d1.user_id = users.user_id | ||
AND users.dept IN (3,4) | ||
AND users.user_id = d2.user_id) dt | ||
GROUP BY dept; | ||
DEBUG: Router planner cannot handle multi-shard select queries | ||
dept | sum | ||
--------------------------------------------------------------------- | ||
3 | 110 | ||
4 | 130 | ||
(2 rows) | ||
|
||
RESET client_min_messages; | ||
RESET search_path; | ||
RESET citus.next_shard_id; | ||
RESET citus.shard_count; | ||
RESET citus.shard_replication_factor; | ||
DROP SCHEMA pg17_corr_subq_folding CASCADE; | ||
NOTICE: drop cascades to 3 other objects | ||
DETAIL: drop cascades to table pg17_corr_subq_folding.test | ||
drop cascades to table pg17_corr_subq_folding.users | ||
drop cascades to table pg17_corr_subq_folding.events | ||
\if :server_version_ge_17 | ||
\else | ||
\q | ||
\endif | ||
-- PG17-specific tests go here. | ||
-- |
Oops, something went wrong.