Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions src/backend/distributed/planner/multi_physical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -2183,6 +2183,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int minShardOffset = INT_MAX;
int prevShardCount = 0;
Bitmapset *taskRequiredForShardIndex = NULL;
Bitmapset *distributedTableIndex = NULL;

/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(query);
Expand All @@ -2198,9 +2199,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,

RelationRestriction *relationRestriction = NULL;
List *prunedShardList = NULL;

forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList)
foreach_ptr(relationRestriction,
relationRestrictionContext->relationRestrictionList)
{
Oid relationId = relationRestriction->relationId;

Expand All @@ -2220,6 +2220,54 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
return NIL;
}
prevShardCount = cacheEntry->shardIntervalArrayLength;
distributedTableIndex = bms_add_member(distributedTableIndex,
relationRestriction->index);
}

bool noDistTables = bms_is_empty(distributedTableIndex);
bool hasRefOrSchemaShardedTable = false;

forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList)
{
Oid relationId = relationRestriction->relationId;

CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
if (noDistTables && !hasRefOrSchemaShardedTable)
{
/*
* Before continuing, check if we're looking at a reference or schema-
* sharded table. If so, and it is the first such table we've seen, we
* add a task for shard index 0; all reference and schema sharded tables
* have shard index 0 so we can hard-code the value rather than looking at
* the shardIndex in pruned shard list, as is done further on down for
* distributed tables.
*
* Note that this only needs to be done once, regardless of how many
* reference or schema sharded tables there are; they all have the
* same shard index (0), and will require just one task.
*
* Also note that this is only done if there are no distributed tables
* involved. When distributed tables are present, their pruned shard lists
* supply the relevant shard indexes, and furthermore we don't want to
* incorrectly add shard index 0 if, for example, a left outer join between
* a reference table and a distributed table also has a restriction that
* prunes out shard index 0 of the distributed table.
*/
CitusTableType currentTableType = GetCitusTableType(cacheEntry);
hasRefOrSchemaShardedTable = currentTableType == REFERENCE_TABLE ||
currentTableType == SINGLE_SHARD_DISTRIBUTED;
if (hasRefOrSchemaShardedTable)
{
taskRequiredForShardIndex = bms_add_member(taskRequiredForShardIndex,
0);
minShardOffset = 0;
}
}

continue;
}

/*
* For left joins we don't care about the shards pruned for the right hand side.
Expand Down Expand Up @@ -2277,6 +2325,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
++taskIdIndex;
}

/*
* If we detected a reference or schema sharded table then there
* should be no distributed tables involved and exactly one task.
*/
Assert(!hasRefOrSchemaShardedTable || (noDistTables &&
list_length(sqlTaskList) == 1));

/* If it is a modify task with multiple tables */
if (taskType == MODIFY_TASK && list_length(
relationRestrictionContext->relationRestrictionList) > 1)
Expand Down
307 changes: 307 additions & 0 deletions src/test/regress/expected/issue_8243.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
-- Test the fix for https://github.com/citusdata/citus/issues/8243
-- Fix empty list for worker subquery tasks when the query has no
-- distributed table but at least one reference table or schema
-- sharded table
SET citus.next_shard_id TO 580000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor to 1;
CREATE SCHEMA issue_8243;
SET search_path TO issue_8243;
-- DDL for the test; we need some schema sharded tables
CREATE SCHEMA schmshrd;
SELECT citus_schema_distribute('schmshrd');
NOTICE: distributing the schema schmshrd
citus_schema_distribute
---------------------------------------------------------------------

(1 row)

CREATE TABLE schmshrd.t1(id bigserial PRIMARY KEY, name text, val int);
CREATE TABLE schmshrd.t2(id bigserial PRIMARY KEY, name text, val int);
CREATE TABLE schmshrd.t3(id bigserial PRIMARY KEY, name text, val int);
-- and some reference tables
CREATE TABLE ref1(id bigserial PRIMARY KEY, name text);
CREATE TABLE ref2(id bigserial PRIMARY KEY, name text);
CREATE TABLE ref3(id bigserial PRIMARY KEY, name text);
SELECT create_reference_table('ref1');
create_reference_table
---------------------------------------------------------------------

(1 row)

SELECT create_reference_table('ref2');
create_reference_table
---------------------------------------------------------------------

(1 row)

SELECT create_reference_table('ref3');
create_reference_table
---------------------------------------------------------------------

(1 row)

-- and distributed tables
CREATE TABLE dist1_8243(id bigserial PRIMARY KEY, name text, val int);
CREATE TABLE dist2_8243(id bigserial PRIMARY KEY, name text, val int);
SELECT create_distributed_table('dist1_8243', 'id');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist2_8243', 'id');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Test some queries that would previously end up with empty subquery tasks on workers.
-- Query characteristics:
-- - no distributed tables
-- - at least one reference table or schema sharded table
-- - some property that prevents a router plan (e.g. nextval() on a citus table in select targets)
-- - worker subquery task(s) needed; the Postgres plan has a subquery scan node
-- Test 1: schema sharded table only; exactly 1 task in the query plan.
EXPLAIN (verbose, costs off)
SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, name
FROM (select name from schmshrd.t2 group by name) sub ;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t2.name FROM schmshrd.t2_580001 t2 GROUP BY t2.name) sub) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: t2.name
Group Key: t2.name
-> Seq Scan on schmshrd.t2_580001 t2
Output: t2.id, t2.name, t2.val
(12 rows)

-- Test 2: bunch of schema sharded tables; still expect 1 task
EXPLAIN (verbose, costs off)
SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, sub.name
FROM (select t1.name as name from schmshrd.t1 t1, schmshrd.t2 t2, schmshrd.t3 t3 where t1.id = t2.id and t3.name = t2.name group by t1.name) sub ;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t1.name FROM schmshrd.t1_580000 t1, schmshrd.t2_580001 t2, schmshrd.t3_580002 t3 WHERE ((t1.id OPERATOR(pg_catalog.=) t2.id) AND (t3.name OPERATOR(pg_catalog.=) t2.name)) GROUP BY t1.name) sub) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: t1.name
Group Key: t1.name
-> Merge Join
Output: t1.name
Merge Cond: (t2.name = t3.name)
-> Sort
Output: t1.name, t2.name
Sort Key: t2.name
-> Hash Join
Output: t1.name, t2.name
Inner Unique: true
Hash Cond: (t1.id = t2.id)
-> Seq Scan on schmshrd.t1_580000 t1
Output: t1.id, t1.name, t1.val
-> Hash
Output: t2.id, t2.name
-> Seq Scan on schmshrd.t2_580001 t2
Output: t2.id, t2.name
-> Sort
Output: t3.name
Sort Key: t3.name
-> Seq Scan on schmshrd.t3_580002 t3
Output: t3.name
(31 rows)

-- Test 3: reference table only; exactly 1 task in the query plan.
EXPLAIN (verbose, costs off)
SELECT nextval('ref1_id_seq'::regclass) AS id, name
FROM (select name from ref2 group by name) sub ;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: nextval('ref1_id_seq'::regclass), remote_scan.name
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT ref2.name FROM issue_8243.ref2_580004 ref2 GROUP BY ref2.name) sub) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: ref2.name
Group Key: ref2.name
-> Seq Scan on issue_8243.ref2_580004 ref2
Output: ref2.id, ref2.name
(12 rows)

-- Test 4: bunch of reference tables; still expect 1 task
EXPLAIN (verbose, costs off)
SELECT nextval('ref1_id_seq'::regclass) AS id, sub.name
FROM (select r1.name as name from ref1 r1, ref2 r2, ref3 r3 where r1.id = r2.id and r3.name = r2.name group by r1.name) sub ;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: nextval('ref1_id_seq'::regclass), remote_scan.name
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT r1.name FROM issue_8243.ref1_580003 r1, issue_8243.ref2_580004 r2, issue_8243.ref3_580005 r3 WHERE ((r1.id OPERATOR(pg_catalog.=) r2.id) AND (r3.name OPERATOR(pg_catalog.=) r2.name)) GROUP BY r1.name) sub) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: r1.name
Group Key: r1.name
-> Merge Join
Output: r1.name
Merge Cond: (r2.name = r3.name)
-> Sort
Output: r1.name, r2.name
Sort Key: r2.name
-> Hash Join
Output: r1.name, r2.name
Inner Unique: true
Hash Cond: (r1.id = r2.id)
-> Seq Scan on issue_8243.ref1_580003 r1
Output: r1.id, r1.name
-> Hash
Output: r2.id, r2.name
-> Seq Scan on issue_8243.ref2_580004 r2
Output: r2.id, r2.name
-> Sort
Output: r3.name
Sort Key: r3.name
-> Seq Scan on issue_8243.ref3_580005 r3
Output: r3.name
(31 rows)

-- Test 5: mix of schema sharded and reference tables; exactly 1 task in the query plan.
EXPLAIN (verbose, costs off)
SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id , sub.name
FROM (select t1.name as name from schmshrd.t1 t1, ref1 r1 where t1.id = r1.id and r1.id IN (select id from ref3) group by t1.name) sub ;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t1.name FROM schmshrd.t1_580000 t1, issue_8243.ref1_580003 r1 WHERE ((t1.id OPERATOR(pg_catalog.=) r1.id) AND (r1.id OPERATOR(pg_catalog.=) ANY (SELECT ref3.id FROM issue_8243.ref3_580005 ref3))) GROUP BY t1.name) sub) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: t1.name
Group Key: t1.name
-> Hash Join
Output: t1.name
Inner Unique: true
Hash Cond: (t1.id = ref3.id)
-> Hash Join
Output: t1.name, t1.id, r1.id
Inner Unique: true
Hash Cond: (r1.id = t1.id)
-> Seq Scan on issue_8243.ref1_580003 r1
Output: r1.id, r1.name
-> Hash
Output: t1.name, t1.id
-> Seq Scan on schmshrd.t1_580000 t1
Output: t1.name, t1.id
-> Hash
Output: ref3.id
-> Seq Scan on issue_8243.ref3_580005 ref3
Output: ref3.id
(28 rows)

-- Test 6: sanity tests - the fix does not interfere with outer join between reference and distributed table
-- where a restriction prunes out shard index 0 of the distributed table.
SET enable_memoize TO off;
-- Plan has 3 tasks
EXPLAIN (verbose, costs off)
SELECT x1.name, dist1_8243.val
FROM ref2 x1 left outer join dist1_8243 using (id)
WHERE dist1_8243.id IN (1, 10001, 999989);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.name, remote_scan.val
Task Count: 3
Tasks Shown: One of 3
-> Task
Query: SELECT worker_column_1 AS name, worker_column_2 AS val FROM (SELECT x1.name AS worker_column_1, dist1_8243.val AS worker_column_2 FROM (issue_8243.ref2_580004 x1(id, name) LEFT JOIN issue_8243.dist1_8243_580007 dist1_8243(id, name, val) USING (id)) WHERE (dist1_8243.id OPERATOR(pg_catalog.=) ANY (ARRAY[(1)::bigint, (10001)::bigint, (999989)::bigint]))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Nested Loop
Output: x1.name, dist1_8243.val
Inner Unique: true
-> Bitmap Heap Scan on issue_8243.dist1_8243_580007 dist1_8243
Output: dist1_8243.id, dist1_8243.name, dist1_8243.val
Recheck Cond: (dist1_8243.id = ANY ('{1,10001,999989}'::bigint[]))
-> Bitmap Index Scan on dist1_8243_pkey_580007
Index Cond: (dist1_8243.id = ANY ('{1,10001,999989}'::bigint[]))
-> Index Scan using ref2_pkey_580004 on issue_8243.ref2_580004 x1
Output: x1.id, x1.name
Index Cond: (x1.id = dist1_8243.id)
(18 rows)

-- Plan has 2 tasks
EXPLAIN (verbose, costs off)
SELECT x1.name, dist2_8243.val
FROM ref3 x1 left outer join dist2_8243 using (id)
WHERE dist2_8243.id IN (10001, 999989);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.name, remote_scan.val
Task Count: 2
Tasks Shown: One of 2
-> Task
Query: SELECT worker_column_1 AS name, worker_column_2 AS val FROM (SELECT x1.name AS worker_column_1, dist2_8243.val AS worker_column_2 FROM (issue_8243.ref3_580005 x1(id, name) LEFT JOIN issue_8243.dist2_8243_580041 dist2_8243(id, name, val) USING (id)) WHERE (dist2_8243.id OPERATOR(pg_catalog.=) ANY (ARRAY[(10001)::bigint, (999989)::bigint]))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Nested Loop
Output: x1.name, dist2_8243.val
Inner Unique: true
-> Bitmap Heap Scan on issue_8243.dist2_8243_580041 dist2_8243
Output: dist2_8243.id, dist2_8243.name, dist2_8243.val
Recheck Cond: (dist2_8243.id = ANY ('{10001,999989}'::bigint[]))
-> Bitmap Index Scan on dist2_8243_pkey_580041
Index Cond: (dist2_8243.id = ANY ('{10001,999989}'::bigint[]))
-> Index Scan using ref3_pkey_580005 on issue_8243.ref3_580005 x1
Output: x1.id, x1.name
Index Cond: (x1.id = dist2_8243.id)
(18 rows)

RESET enable_memoize;
-- Test 7: failing query from https://github.com/citusdata/citus/issues/8243
SET search_path TO schmshrd;
INSERT INTO t2 (name) VALUES ('user1'), ('user2'), ('user3'), ('user1'), ('user2'), ('user1');
INSERT INTO t1 (name) SELECT name FROM (SELECT name FROM t2 GROUP BY name) sub;
SELECT id, name FROM t1 ORDER BY id;
id | name
---------------------------------------------------------------------
1 | user3
2 | user2
3 | user1
(3 rows)

-- and for reference tables
SET search_path TO issue_8243;
INSERT INTO ref2 (name) VALUES ('user1'), ('user2'), ('user3'), ('user1'), ('user2'), ('user1');
INSERT INTO ref1 (name) SELECT name FROM (SELECT name FROM ref2 GROUP BY name) sub;
SELECT id, name FROM ref1 ORDER BY id;
id | name
---------------------------------------------------------------------
1 | user3
2 | user2
3 | user1
(3 rows)

--- clean up:
SET client_min_messages TO WARNING;
DROP SCHEMA schmshrd CASCADE;
DROP SCHEMA issue_8243 CASCADE;
RESET citus.next_shard_id;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
Loading
Loading