Add citus.allow_unsafe_insert_select_pushdown for shard-local batched INSERT ... SELECT#8625

Draft
onurctirtir wants to merge 25 commits into citusdata:main from onurctirtir:onurctirtir-automatic-broccoli

@onurctirtir onurctirtir commented Jun 29, 2026

Member

DESCRIPTION: Adds citus.allow_unsafe_insert_select_pushdown to allow potentially unsafe, shard-local batched INSERT ... SELECT

Some workloads want to call an expensive, batch-oriented UDF from a colocated
INSERT … SELECT — e.g. process_array(text[]), which returns something
per element, in order. The natural way to use it is to batch rows with array_agg
(+ a window function to form batches), call the UDF once per batch, then unnest
the result back to one row per input and insert it.
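
The batch-then-unnest pattern described above can be modelled outside the database. The following is a toy Python sketch, not Citus or PostgreSQL code: `process_array` here is a hypothetical stand-in that upper-cases each element, and `batched_apply` and `batch_size` are names introduced only for this illustration.

```python
from typing import Callable, List, Sequence, Tuple


def process_array(texts: Sequence[str]) -> List[str]:
    """Hypothetical stand-in for the expensive batch UDF: one output per input, in order."""
    return [t.upper() for t in texts]


def batched_apply(
    rows: Sequence[Tuple[int, str]],
    batch_size: int,
    udf: Callable[[Sequence[str]], List[str]] = process_array,
) -> List[Tuple[int, str]]:
    """Model of: bucket rows, array_agg each bucket, call the UDF once, unnest back."""
    out: List[Tuple[int, str]] = []
    ordered = sorted(rows)  # plays the role of ORDER BY inside array_agg
    for start in range(0, len(ordered), batch_size):
        batch = ordered[start:start + batch_size]
        ids = [row_id for row_id, _ in batch]    # array_agg(id ORDER BY id)
        vals = udf([text for _, text in batch])  # one UDF call per batch
        out.extend(zip(ids, vals))               # unnest both arrays side by side
    return out


print(batched_apply([(3, "c"), (1, "a"), (2, "b")], batch_size=2))
# [(1, 'A'), (2, 'B'), (3, 'C')]
```

The ids and the UDF results line up only because both arrays are built from the same ordered batch, which is the ordering responsibility the user keeps when the real query is pushed down.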

On a distributed table this SELECT looks like it "requires a merge" (e.g., GROUP
BY / window on a non-distribution column), so Citus today falls back to
pull-to-coordinator: it pulls every shard's rows to the coordinator, batches
there, and calls the UDF there. That defeats the purpose — the batching and the
expensive call should run on the shards.

What this adds

With citus.allow_unsafe_insert_select_pushdown enabled (default off), Citus
relaxes the merge-step checks that would otherwise force a colocated
INSERT … SELECT into pull-to-coordinator, so the whole SELECT (grouping, window,
and the batch UDF) is pushed down and runs shard-local, one task per shard.

Concretely, when the GUC is on:

  • GROUP BY / aggregates / HAVING / window / DISTINCT on non-distribution columns
    no longer force a merge step — for the top-level SELECT and the subqueries.
  • The INSERT's distribution-column target entry no longer has to match a SELECT
    partition column as a plain Var. Instead it may be a provably shard-local
    batch pass-through of the distribution column, i.e.
    unnest(array_agg(dist_key)) (InsertPartitionColumnMatchesSelect is skipped
    only when this exact pattern is detected). Because those values are read
    straight from this shard's rows and, since source and target are colocated,
    hash right back into this shard's range, routing stays correct. The NOT NULL
    filter Citus normally injects on that column is re-added at runtime (see the
    safety net below) so a NULL-padded row can never be misrouted.
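
The routing argument can be illustrated with a toy model. This is a sketch under loud assumptions: `shard_for` uses a plain modulo as a stand-in for Citus' real hash-range routing, and `SHARD_COUNT`, `shard_for`, and `misrouted` are hypothetical names, not Citus APIs. It only shows why values read unchanged from a shard stay in that shard, while a derived value such as `key + 1` may not.

```python
SHARD_COUNT = 4  # assumed shard count for the illustration


def shard_for(key: int) -> int:
    """Toy routing function; real Citus maps a hash of the value onto shard hash ranges."""
    return key % SHARD_COUNT


def misrouted(shard_id: int, keys, derive):
    """Return the derived keys that would no longer belong to shard_id."""
    return [derive(k) for k in keys if shard_for(derive(k)) != shard_id]


shard_2_keys = [2, 6, 10]  # keys that the toy function routes to shard 2

print(misrouted(2, shard_2_keys, lambda k: k))      # pass-through: []
print(misrouted(2, shard_2_keys, lambda k: k + 1))  # derived: [3, 7, 11]
```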

However, the following are still enforced:

  • The distribution value must be the unnest(array_agg(dist_key)) pass-through
    (or a plain Var). Any other derived distribution value, e.g.
    dist_key + 1, length(text_col), or unnest(f(array_agg(dist_key))), is
    still rejected even with the GUC on, because it could produce values that hash
    to a different shard and silently misroute rows.
  • The pass-through array_agg must emit one value per group row. DISTINCT
    or FILTER (...) on that aggregate — e.g.
    unnest(array_agg(dist_key) FILTER (WHERE ...)) or
    unnest(array_agg(DISTINCT dist_key)) — can emit fewer values than the batch
    has rows. These shapes are rejected and fall back to a coordinator merge, which
    re-routes each row and raises the usual not-NULL partition-column error instead
    of corrupting data. (ORDER BY inside array_agg stays allowed — it only
    reorders values, never drops them.)
  • Colocation of the INSERT target and all SELECT source relations is fully
    enforced. This is what keeps every batch shard-local, and is the reason the
    relaxation is sound rather than arbitrary.
  • Volatile functions are still rejected from pushdown (the batch UDF is
    expected to be immutable, so this ban is not relaxed).
  • LIMIT/OFFSET is still rejected.
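
The distribution-value rules in this list can be summarised as a small shape check. The sketch below is a Python model of the acceptance rule only. It is not the C implementation in the PR, and the `Var`, `ArrayAgg`, and `Call` classes and `is_accepted_dist_value` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Var:
    column: str


@dataclass(frozen=True)
class ArrayAgg:
    arg: "Expr"
    distinct: bool = False
    filter_clause: Optional[str] = None
    order_by: Optional[str] = None  # allowed: reorders values, never drops them


@dataclass(frozen=True)
class Call:
    name: str
    arg: "Expr"


Expr = Union[Var, ArrayAgg, Call]


def is_accepted_dist_value(expr: Expr, dist_column: str) -> bool:
    """True for a plain Var on the distribution column or unnest(array_agg(dist_column))."""
    if isinstance(expr, Var):
        return expr.column == dist_column
    if isinstance(expr, Call) and expr.name == "unnest" and isinstance(expr.arg, ArrayAgg):
        agg = expr.arg
        # DISTINCT or FILTER may emit fewer values than the batch has rows.
        return agg.arg == Var(dist_column) and not agg.distinct and agg.filter_clause is None
    return False


k = Var("dist_key")
print(is_accepted_dist_value(Call("unnest", ArrayAgg(k, order_by="dist_key")), "dist_key"))  # True
print(is_accepted_dist_value(Call("unnest", ArrayAgg(k, distinct=True)), "dist_key"))        # False
print(is_accepted_dist_value(Call("unnest", Call("f", ArrayAgg(k))), "dist_key"))            # False
```

The model covers only the distribution-value rules. Colocation, the volatile-function ban, and the LIMIT/OFFSET ban are separate checks and are not represented here.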

Safety net: drop NULL (mis-routed) partition keys at runtime

Even for the accepted unnest(array_agg(dist_key)) pass-through, a sibling
set-returning target in the same projection can be longer than the
distribution-column set — e.g. an over-emitting batch UDF that returns more
elements than its input. PostgreSQL's ProjectSet then NULL-pads the shorter
distribution-column set, which would insert a NULL (mis-routed) partition key.

Because the batch path skips the NOT NULL filter Citus normally injects, the
pushdown now re-adds a distribution-column IS NOT NULL qual to the shard SELECT
so those NULL-padded rows are dropped, matching how a normal INSERT … SELECT
handles a NULL distribution value. Both shapes are covered:

  • outer-subquery shape — the distribution column surfaces as a plain Var
    (the unnest lives in an inner subquery), so the qual is attached to the
    pushed-down SELECT in place.
  • flat shape — the distribution column is the bare unnest(...)
    set-returning expression projected directly in the SELECT list, so there is no
    Var handle for a WHERE clause. The SELECT is wrapped in a pure pass-through
    subquery (citus_insert_select_subquery) that turns the column into an ordinary
    Var, then filtered. The wrapper never lifts expressions to the outer query, so
    the batch call keeps running on the shard.
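
The NULL-padding hazard and the effect of the re-added filter can be modelled in a few lines. This is a toy Python sketch, not PostgreSQL or Citus code: `project_set` imitates how ProjectSet advances sibling set-returning expressions in lockstep and pads the exhausted ones with NULL, and `drop_null_keys` imitates the IS NOT NULL qual. Both function names are hypothetical.

```python
from itertools import zip_longest


def project_set(*srf_outputs):
    """Model of ProjectSet: run SRFs in lockstep, padding exhausted ones with None (NULL)."""
    return list(zip_longest(*srf_outputs, fillvalue=None))


def drop_null_keys(rows, key_index=0):
    """Model of the re-added `dist_key IS NOT NULL` qual on the shard SELECT."""
    return [row for row in rows if row[key_index] is not None]


keys = [1, 2, 3]                       # unnest(array_agg(dist_key))
vals = ["a", "b", "c", "extra-value"]  # over-emitting sibling set-returning target

padded = project_set(keys, vals)
print(padded)
# [(1, 'a'), (2, 'b'), (3, 'c'), (None, 'extra-value')]
print(drop_null_keys(padded))
# [(1, 'a'), (2, 'b'), (3, 'c')]
```

Every real key keeps its value, and only the padded row is dropped.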

Within those guardrails the user still takes responsibility for making each batch
order-preserving (e.g. array_agg(... ORDER BY ...)) so generated rows line up.

EXPLAIN — before / after

The batch shape: bucket rows into fixed-size batches with row_number()/batch_size,
array_agg each batch (id and text in the same order), call the batch UDF once
per batch, then unnest back to one row per id.

Default (off) — batching happens after pulling every shard's rows to the
coordinator, so the window scan runs a distributed subplan and the grouping +
UDF run in a single coordinator task over the intermediate results:

EXPLAIN (COSTS OFF) INSERT INTO res(text_id, val)
SELECT id, v FROM (
  SELECT
    unnest(array_agg(text_id  ORDER BY text_id)) id,
    unnest(process_array(array_agg(text_col ORDER BY text_id))) v
  FROM (
    SELECT text_id, text_col, (row_number() OVER () - 1) / 100 AS batch FROM dist
  ) q
  GROUP BY batch
) s;

 Custom Scan (Citus INSERT ... SELECT)
   INSERT/SELECT method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         ->  Distributed Subplan XXX_1
               ->  WindowAgg
                     ->  Custom Scan (Citus Adaptive)
                           Task Count: 4
                           Tasks Shown: One of 4
                           ->  Task
                                 ->  Seq Scan on dist_14000000 dist
         Task Count: 1
         ->  Task
               ->  Subquery Scan on s
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: intermediate_result.batch
                                 ->  Sort
                                       Sort Key: intermediate_result.batch, intermediate_result.text_id
                                       ->  Function Scan on read_intermediate_result intermediate_result

With citus.allow_unsafe_insert_select_pushdown = on — the window, grouping, the
batch UDF, and the INSERT all run shard-local, one task per shard, with no
distributed subplan / intermediate results. The distribution-column
IS NOT NULL safety filter appears on the shard SELECT:

 Custom Scan (Citus Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   ->  Task
         ->  Insert on res_14000004 citus_table_alias
               ->  Subquery Scan on s
                     Filter: (s.id IS NOT NULL)
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: q.batch
                                 ->  Sort
                                       Sort Key: q.batch, q.text_id
                                       ->  Subquery Scan on q
                                             ->  WindowAgg
                                                   ->  Seq Scan on dist_14000000 dist

Onur Tirtir and others added 9 commits June 29, 2026 11:49
…tching pushdown

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@codecov

codecov Bot commented Jun 29, 2026


Codecov Report

❌ Patch coverage is 90.00000% with 21 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.72%. Comparing base (efa65fc) to head (e544103).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8625      +/-   ##
==========================================
- Coverage   88.73%   88.72%   -0.02%     
==========================================
  Files         288      288              
  Lines       64384    64499     +115     
  Branches     8108     8136      +28     
==========================================
+ Hits        57133    57228      +95     
- Misses       4909     4917       +8     
- Partials     2342     2354      +12     

Onur Tirtir and others added 2 commits June 30, 2026 10:59
Fixes check-style ci/check_gucs_are_alphabetically_sorted.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap the two EXPLAIN statements in public.explain_filter(..., true) so the
PG18-only "Window:" line is stripped and the plan footer row counts match
across supported Postgres versions. Regenerate the expected output, which
also re-syncs blank lines that had drifted from the .sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir onurctirtir force-pushed the onurctirtir-automatic-broccoli branch from 7e67c16 to f6f01e7 Compare June 30, 2026 08:25
onurctirtir and others added 5 commits June 30, 2026 08:57
Validate the shard-local INSERT..SELECT batching pushdown plan directly on
PG18 (no public.explain_filter), so the PG18-only WindowAgg "Window:" line
is exercised in pg18.sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Cover each relaxed branch (volatile fn, derived insert dist col, GROUP BY
on non-dist col, aggregates and HAVING without GROUP BY, window not on
dist col, DISTINCT without dist col), a few combinations, and the same
constructs nested in subqueries. Add negative tests showing the GUC does
not apply to reference-table targets and never relaxes LIMIT/OFFSET.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The unsafe insert-select pushdown GUC no longer relaxes the volatile
function ban: a batched-embeddings style UDF is immutable, so the GUC only
needs to relax grouping / window / aggregate / DISTINCT on non-distribution
columns and partition-column matching. Volatile functions stay blocked from
shard pushdown and fall back to the coordinator plan as before.

- insert_select_planner.c: always defer on volatile functions
- shared_library_init.c: drop "volatile functions" from the GUC description
- tests: make batch_transform IMMUTABLE PARALLEL SAFE, replace the volatile
  EXPLAIN test with the batched-embeddings benchmark query shape
  (row_number()/batch_size bucketing + array_agg + batch UDF + unnest zip-back)
  as the SELECT of an INSERT .. SELECT, with an id<->val correctness check,
  and a negative test showing a volatile function is still not pushed down
  even with the GUC enabled.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
PG18 emits 'Insert on res_... citus_table_alias' for the shard-local INSERT..SELECT pushdown plan; the committed pg18.out was missing the alias, causing the pg18 test (and all Test flakyness shards) to fail on CI. Match CI output.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir

onurctirtir commented Jul 1, 2026

Member Author

Copilot Suggestion


⚠️ Blocking correctness issue: pushed-down batch pass-through can silently insert NULL distribution keys

The PR treats unnest(array_agg(dist_key)) as a safe shard-local pass-through and then skips AddPartitionKeyNotNullFilterToSelect() for that shape. But PostgreSQL ProjectSet pads shorter set-returning targetlist expressions with NULL when sibling SRFs emit more rows. That means the target distribution column can become NULL even though direct Citus inserts reject NULL partition keys.

Minimal repro from an isolated ~/citus-iso/pr8625-review cluster:

SET citus.allow_unsafe_insert_select_pushdown TO on;
TRUNCATE res;

EXPLAIN (COSTS OFF)
INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k)),
       unnest(array_append(array_agg(v ORDER BY k), 'extra-value'))
FROM dist_a
GROUP BY grp;

INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k)),
       unnest(array_append(array_agg(v ORDER BY k), 'extra-value'))
FROM dist_a
GROUP BY grp;

SELECT count(*) AS total,
       count(*) FILTER (WHERE k IS NULL) AS null_keys
FROM res;

Observed:

Custom Scan (Citus Adaptive)
  Task Count: 4
  -> Insert on res_102016 citus_table_alias
     -> ProjectSet
        -> GroupAggregate

INSERT 0 106
 total | null_keys
-------+-----------
   106 |        26

Direct coordinator routing still rejects the same invalid key shape:

INSERT INTO res(k, val) VALUES (NULL, 'direct-null-still-rejected');
-- ERROR: cannot perform an INSERT with NULL in the partition column

A second variant also reproduces it:

INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k) FILTER (WHERE k % 2 = 0)),
       unnest(array_agg(v ORDER BY k))
FROM dist_a
GROUP BY grp;
-- INSERT 0 80; 40 NULL distribution keys written

Why this matters: the safety argument says the distribution value is read straight from the shard's source rows and hashes back to the same shard. That is false for padded NULL rows produced by ProjectSet. It also bypasses Citus' normal "no NULL partition column" insert invariant unless the user happened to declare the distribution column NOT NULL, in which case the worker shard constraint errors after pushdown.

Relevant code paths:

  • multi_logical_optimizer.c: IsBatchUnnestArrayAggPartitionColumn() documents the expression as “provably shard-local” and explicitly allows ORDER BY / DISTINCT / FILTER modifiers.
  • multi_router_planner.c: AddPartitionKeyNotNullFilterToSelect() skips the partition-key IS NOT NULL filter when the batch pass-through pattern is found.

Recommendation: don't merge until this invariant is addressed. At minimum, add regression tests for value-side extra rows and distribution-side FILTER/shorter SRF. Design-wise, the implementation needs a way to preserve the “no NULL partition key” invariant for batch pass-through; simply trusting unnest(array_agg(dist_key)) is not enough because sibling SRFs can synthesize NULLs at the output row level.

@onurctirtir

This comment was marked as resolved.

@onurctirtir

This comment was marked as resolved.

@onurctirtir

This comment was marked as resolved.

@onurctirtir

This comment was marked as resolved.

Onur Tirtir and others added 7 commits July 2, 2026 11:25
Extract the shared INSERT target-list walk that maps the target table's
distribution column to the SELECT (subquery) target entry into a single
helper, SelectTargetEntryForInsertPartitionColumn(). Both
InsertPartitionColumnMatchesSelect() and
InsertPartitionColumnIsBatchPassThrough() previously duplicated this
lookup; they now call the helper.

The helper optionally reports the matching INSERT target entry so
InsertPartitionColumnMatchesSelect() can still inspect casting on the
distribution column. Pure refactor; no behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
AddPartitionKeyNotNullFilterToSelect() previously consulted the global
GUC AllowUnsafeInsertSelectPushdown (and re-walked the target list) to
decide whether to skip the NOT NULL filter for a batch pass-through
distribution column. That coupled a low-level deparse/router helper to a
planner-level GUC and re-derived a fact the caller already knows.

Instead, take a bool distributionColumnIsBatchPassThrough parameter and
skip the filter when it is set and there is no plain-Var partition
column. Both callers (RouterModifyTaskForShardInterval and
deparse_shard_query) compute the flag via the now-exported
InsertPartitionColumnIsBatchPassThrough(), which inspects the query
shape directly. A batch-shape query with the GUC off is rejected during
planning and never reaches these call sites, so the query-shape
predicate is the correct signal here.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
IsBatchUnnestArrayAggPartitionColumn() lived in multi_logical_optimizer.c
and was exported, but after the previous commit its only caller is
InsertPartitionColumnIsBatchPassThrough() in insert_select_planner.c.

Move it next to its caller as a file-local static and rename it to
IsInsertSelectBatchPassThroughDistributionColumn(), which better
describes what it checks and where it is used. Drop the header export and
add utils/fmgroids.h to insert_select_planner.c for the array_agg/unnest
function OIDs. No behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Several comments described unnest(array_agg(dist_key)) as "provably
shard-local". That overstates what the shape check proves: it only
validates the distribution-column expression, and its safety argument
holds only if the row reaching the INSERT actually carries one of the
original source-row distribution values rather than a targetlist/SRF
artifact (e.g. a NULL-padded row from a set-returning function).

Reword the acceptance-site comment, the moved predicate's doc, the
InsertPartitionColumnIsBatchPassThrough doc, and the router NOT NULL
filter comment to state the precise invariant and explicitly call out
the unguarded SRF NULL-padding gap as the reason the GUC is unsafe.
Comments only; no behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SelectTargetEntryForInsertPartitionColumn() (added when de-duplicating the
INSERT distribution-column lookup) unconditionally dereferences the target
table's distribution column via PartitionColumn(insertRelationId, 1). For
single-shard (null distribution key) target tables that helper returns NULL,
so the subsequent insertPartitionColumn->varattno access segfaulted the
backend.

This path is reached from RouterModifyTaskForShardInterval when a shard-key
source is inserted into a single-shard target (e.g.
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table), because
InsertPartitionColumnIsBatchPassThrough() is now computed unconditionally at
the call site to feed AddPartitionKeyNotNullFilterToSelect(). The earlier
InsertPartitionColumnMatchesSelect() caller was guarded by
HasDistributionKey(targetRelationId), which is why the crash only surfaced on
the batch pass-through path.

Return NULL early when the target has no distribution column. This restores
the pre-refactor behavior: the batch pass-through check reports false and the
plain-Var NOT NULL filter is still added as before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Our convention is to define file-local helper functions after the functions
that call them. SelectTargetEntryForInsertPartitionColumn and
IsInsertSelectBatchPassThroughDistributionColumn were defined before their
callers InsertPartitionColumnIsBatchPassThrough and
InsertPartitionColumnMatchesSelect. Move both helpers after the callers.
Pure reordering; no behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
onurctirtir and others added 2 commits July 3, 2026 16:56
The shard-local batch pass-through unnest(array_agg(<dist col>)) is only
correct when the array_agg emits exactly one value per group row. A DISTINCT
or FILTER clause on that aggregate can emit fewer values, so when a sibling
set-returning target in the same projection is longer PostgreSQL ProjectSet
NULL-pads the shorter distribution-column set. Because the batch path skips
the NOT NULL partition-column filter, that NULL (mis-routed) distribution key
was inserted silently.

Reject aggdistinct/aggfilter in IsInsertSelectBatchPassThroughDistributionColumn
so such a shape is no longer recognized as a pass-through: the INSERT..SELECT
falls back to the coordinator merge, which re-routes each row and raises the
usual not-NULL partition-column error instead of corrupting data. ORDER BY
inside array_agg stays allowed (it only reorders, never drops, values).

Add regression coverage: FILTER and DISTINCT on the distribution-column
array_agg now fall back to a coordinator merge under the GUC, and the FILTER
shape run for real raises the not-NULL partition-column error.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When a sibling set-returning target in the batch projection (e.g. an
over-emitting batch UDF) is longer than the distribution-column
unnest(array_agg(...)), PostgreSQL ProjectSet NULL-pads the shorter
distribution-column set. The batch pushdown path skips
AddPartitionKeyNotNullFilterToSelect, so such a NULL (mis-routed)
partition key would otherwise be inserted silently.

AddBatchPassThroughNotNullFilter now appends a distribution-column
IS NOT NULL qual to the pushed-down SELECT for both shapes: the
outer-subquery shape (distribution column is a plain Var, filtered in
place) and the flat shape (distribution column is the bare unnest
set-returning expression, wrapped in a pass-through subquery so it
becomes a filterable Var). Adds regression coverage asserting no NULL
key is inserted while every real key keeps its value.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir onurctirtir force-pushed the onurctirtir-automatic-broccoli branch from 7260e1f to e544103 Compare July 3, 2026 18:25