Skip to content

Add citus.allow_unsafe_insert_select_pushdown for shard-local batched INSERT ... SELECT#8625

Draft
onurctirtir wants to merge 16 commits into
citusdata:mainfrom
onurctirtir:onurctirtir-automatic-broccoli
Draft

Add citus.allow_unsafe_insert_select_pushdown for shard-local batched INSERT ... SELECT#8625
onurctirtir wants to merge 16 commits into
citusdata:mainfrom
onurctirtir:onurctirtir-automatic-broccoli

Conversation

@onurctirtir

@onurctirtir onurctirtir commented Jun 29, 2026

Copy link
Copy Markdown
Member

DESCRIPTION: Adds citus.allow_unsafe_insert_select_pushdown to allow maybe unsafe shard-local batched INSERT .. SELECT ..

Some workloads want to call an expensive, batch-oriented UDF from a colocated
INSERT … SELECT — e.g. process_array(text[]), which returns something
per element, in order. The natural way to use it is to batch rows with array_agg
(+ a window function to form batches), call the UDF once per batch, then unnest
the result back to one row per input and insert it.

On a distributed table this SELECT looks like it "requires a merge" (e.g., GROUP
BY / window on a non-distribution column), so Citus today falls back to
pull-to-coordinator: it pulls every shard's rows to the coordinator, batches
there, and calls the UDF there. That defeats the purpose — the batching and the
expensive call should run on the shards.

What this adds

With citus.allow_unsafe_insert_select_pushdown (default off), for a
colocated INSERT … SELECT, it relaxes the merge-step checks that would
otherwise cause pull-to-coordinator, so the whole SELECT — grouping, window,
and the batch UDF — is pushed down and runs shard-local, one task per shard.

Concretely, when the GUC is on:

  • GROUP BY / aggregates / HAVING / window / DISTINCT on non-distribution columns
    no longer force a merge step — for the top-level SELECT and the subqueries.
  • The INSERT's distribution-column target entry no longer has to match a SELECT
    partition column as a plain Var. Instead it may be a provably shard-local
    batch pass-through of the distribution column
    , i.e.
    unnest(array_agg(dist_key)) (InsertPartitionColumnMatchesSelect is skipped
    only when this exact pattern is detected). Because those values are read
    straight from this shard's rows and — since source and target are colocated —
    hash right back into this shard's range, routing stays correct. The NOT NULL
    filter Citus normally injects on that column is skipped too, since there is no
    plain Var to attach it to.

However, the following are still enforced:

  • The distribution value must be the unnest(array_agg(dist_key)) pass-through
    (or a plain Var).
    Any other derived distribution value — e.g.
    dist_key + 1, length(text_col), or unnest(f(array_agg(dist_key))) — is
    still rejected even with the GUC on, because it could produce values that hash
    to a different shard and silently misroute rows.
  • Colocation of the INSERT target and all SELECT source relations is fully
    enforced. This is what keeps every batch shard-local, and is the reason the
    relaxation is sound rather than arbitrary.
  • Volatile functions are still rejected from pushdown (the batch UDF is
    expected to be immutable, so this ban is not relaxed).
  • LIMIT/OFFSET is still rejected.

Within those guardrails the user still takes responsibility for making each batch
order-preserving (e.g. array_agg(... ORDER BY ...)) so generated rows line up.

EXPLAIN — before / after

The batch shape: bucket rows into fixed-size batches with row_number()/batch_size,
array_agg each batch (id and text in the same order), call the batch UDF once
per batch, then unnest back to one row per id.

Default (off) — batching happens after pulling every shard's rows to the
coordinator, so the window scan runs a distributed subplan and the grouping +
UDF run in a single coordinator task over the intermediate results:

EXPLAIN (COSTS OFF) INSERT INTO res(text_id, val)
SELECT id, v FROM (
  SELECT
    unnest(array_agg(text_id  ORDER BY text_id)) id,
    unnest(process_array(array_agg(text_col ORDER BY text_id))) v
  FROM (
    SELECT text_id, text_col, (row_number() OVER () - 1) / 100 AS batch FROM dist
  ) q
  GROUP BY batch
) s;

 Custom Scan (Citus INSERT ... SELECT)
   INSERT/SELECT method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         ->  Distributed Subplan XXX_1
               ->  WindowAgg
                     ->  Custom Scan (Citus Adaptive)
                           Task Count: 4
                           Tasks Shown: One of 4
                           ->  Task
                                 ->  Seq Scan on dist_14000000 dist
         Task Count: 1
         ->  Task
               ->  Subquery Scan on s
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: intermediate_result.batch
                                 ->  Sort
                                       Sort Key: intermediate_result.batch, intermediate_result.text_id
                                       ->  Function Scan on read_intermediate_result intermediate_result

With citus.allow_unsafe_insert_select_pushdown = on — the window, grouping, the
batch UDF, and the INSERT all run shard-local, one task per shard, with no
distributed subplan / intermediate results:

 Custom Scan (Citus Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   ->  Task
         ->  Insert on res_14000004 citus_table_alias
               ->  Subquery Scan on s
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: q.batch
                                 ->  Sort
                                       Sort Key: q.batch, q.text_id
                                       ->  Subquery Scan on q
                                             ->  WindowAgg
                                                   ->  Seq Scan on dist_14000000 dist

Onur Tirtir and others added 9 commits June 29, 2026 11:49
…tching pushdown

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@codecov

codecov Bot commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 78.88889% with 19 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.71%. Comparing base (efa65fc) to head (aa9725c).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8625      +/-   ##
==========================================
- Coverage   88.73%   88.71%   -0.03%     
==========================================
  Files         288      288              
  Lines       64384    64446      +62     
  Branches     8108     8126      +18     
==========================================
+ Hits        57133    57175      +42     
- Misses       4909     4918       +9     
- Partials     2342     2353      +11     
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Onur Tirtir and others added 2 commits June 30, 2026 10:59
Fixes check-style ci/check_gucs_are_alphabetically_sorted.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap the two EXPLAIN statements in public.explain_filter(..., true) so the
PG18-only "Window:" line is stripped and the plan footer row counts match
across supported Postgres versions. Regenerate the expected output, which
also re-syncs blank lines that had drifted from the .sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir onurctirtir force-pushed the onurctirtir-automatic-broccoli branch from 7e67c16 to f6f01e7 Compare June 30, 2026 08:25
onurctirtir and others added 5 commits June 30, 2026 08:57
Validate the shard-local INSERT..SELECT batching pushdown plan directly on
PG18 (no public.explain_filter), so the PG18-only WindowAgg "Window:" line
is exercised in pg18.sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Cover each relaxed branch (volatile fn, derived insert dist col, GROUP BY
on non-dist col, aggregates and HAVING without GROUP BY, window not on
dist col, DISTINCT without dist col), a few combinations, and the same
constructs nested in subqueries. Add negative tests showing the GUC does
not apply to reference-table targets and never relaxes LIMIT/OFFSET.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The unsafe insert-select pushdown GUC no longer relaxes the volatile
function ban: a batched-embeddings style UDF is immutable, so the GUC only
needs to relax grouping / window / aggregate / DISTINCT on non-distribution
columns and partition-column matching. Volatile functions stay blocked from
shard pushdown and fall back to the coordinator plan as before.

- insert_select_planner.c: always defer on volatile functions
- shared_library_init.c: drop "volatile functions" from the GUC description
- tests: make batch_transform IMMUTABLE PARALLEL SAFE, replace the volatile
  EXPLAIN test with the batched-embeddings benchmark query shape
  (row_number()/batch_size bucketing + array_agg + batch UDF + unnest zip-back)
  as the SELECT of an INSERT .. SELECT, with an id<->val correctness check,
  and a negative test showing a volatile function is still not pushed down
  even with the GUC enabled.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
PG18 emits 'Insert on res_... citus_table_alias' for the shard-local INSERT..SELECT pushdown plan; the committed pg18.out was missing the alias, causing the pg18 test (and all Test flakyness shards) to fail on CI. Match CI output.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant