Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions src/backend/distributed/planner/insert_select_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
subqueryRte,
Oid *
selectPartitionColumnTableId);
static bool InsertPartitionColumnIsBatchPassThrough(Query *query,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte);
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
ParamListInfo boundParams);
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
Expand Down Expand Up @@ -771,14 +774,16 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
/* first apply toplevel pushdown checks to SELECT query */
error =
DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext,
true);
true, AllowUnsafeInsertSelectPushdown)
;
if (error)
{
return error;
}

/* then apply subquery pushdown checks to SELECT query */
error = DeferErrorIfCannotPushdownSubquery(subquery, false);
error = DeferErrorIfCannotPushdownSubquery(subquery, false,
AllowUnsafeInsertSelectPushdown);
if (error)
{
return error;
Expand Down Expand Up @@ -821,11 +826,29 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
"table", NULL, NULL);
}

/*
* Ensure that INSERT's partition column comes from SELECT's partition
* column. Normally this requires a plain-Var partition-column match.
* With unsafe INSERT ... SELECT pushdown enabled we additionally accept
* a provably shard-local batch pass-through of the distribution column,
* i.e. unnest(array_agg(dist_key)); those values can only hash back into
* the current shard, so the co-location check further below keeps the
* batch shard-local. Any other derived distribution value is still
* rejected, because it could route rows that actually belong to a different
* shard.
*/
if (HasDistributionKey(targetRelationId))
{
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte,
subqueryRte,
&selectPartitionColumnTableId);
if (error && AllowUnsafeInsertSelectPushdown &&
InsertPartitionColumnIsBatchPassThrough(queryTree, insertRte,
subqueryRte))
{
error = NULL;
}

if (error)
{
return error;
Expand Down Expand Up @@ -1196,6 +1219,56 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
}


/*
 * InsertPartitionColumnIsBatchPassThrough returns true if the SELECT target
 * entry that feeds the INSERT's partition column is an
 * unnest(array_agg(<partition column>)) batch pass-through, as decided by
 * IsBatchUnnestArrayAggPartitionColumn.
 *
 * query is the INSERT ... SELECT query, insertRte is the range table entry of
 * the INSERT target relation and subqueryRte is the range table entry that
 * carries the SELECT as a subquery.
 *
 * The function walks the INSERT target list, skips entries that do not contain
 * exactly one Var, and picks the entry whose resno equals the attribute number
 * of the target relation's partition column. The single Var of that entry is
 * then used as a 1-based position into the SELECT target list. The mapping is
 * meant to mirror the one used by InsertPartitionColumnMatchesSelect.
 *
 * Returns false when no INSERT target entry maps to the partition column or
 * when the Var's attribute number is not a valid position in the SELECT target
 * list. A false result only forgoes the pass-through exception; the caller
 * then keeps the error produced by the regular partition-column check.
 */
static bool
InsertPartitionColumnIsBatchPassThrough(Query *query, RangeTblEntry *insertRte,
										RangeTblEntry *subqueryRte)
{
	Oid insertRelationId = insertRte->relid;
	Var *insertPartitionColumn = PartitionColumn(insertRelationId, 1);
	Query *subquery = subqueryRte->subquery;

	ListCell *targetEntryCell = NULL;
	foreach(targetEntryCell, query->targetList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
		List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry);

		/* only entries that consist of a single column reference are considered */
		if (list_length(insertTargetEntryColumnList) != 1)
		{
			continue;
		}

		Var *insertVar = (Var *) linitial(insertTargetEntryColumnList);

		/* skip processing of target table non-partition columns */
		if (targetEntry->resno != insertPartitionColumn->varattno)
		{
			continue;
		}

		/*
		 * The attribute number is used as a 1-based index into the SELECT
		 * target list. Reject anything outside [1, length]: a whole-row (0)
		 * or system-column (negative) attribute number would otherwise make
		 * list_nth() index before the start of the list.
		 */
		if (insertVar->varattno < 1 ||
			insertVar->varattno > list_length(subquery->targetList))
		{
			return false;
		}

		TargetEntry *subqueryTargetEntry = list_nth(subquery->targetList,
													insertVar->varattno - 1);

		return IsBatchUnnestArrayAggPartitionColumn(subqueryTargetEntry->expr,
													subquery);
	}

	/* the INSERT target list does not set the partition column from a single Var */
	return false;
}


/*
* InsertPartitionColumnMatchesSelect returns NULL if the partition column in the
* table targeted by the INSERT matches any of the SELECTed table's
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(query,
plannerRestrictionContext,
true);
true, false);
if (deferredError)
{
ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning")));
Expand Down
135 changes: 135 additions & 0 deletions src/backend/distributed/planner/multi_logical_optimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -4763,6 +4763,141 @@ IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars)
}


/*
 * IsBatchUnnestArrayAggPartitionColumn returns true if the given SELECT target
 * expression is a provably shard-local "batch pass-through" of the distribution
 * column, i.e. it has the shape
 *
 *     unnest(array_agg(<partition column>))
 *
 * (optionally projected through one or more plain-Var subquery indirections, and
 * with array_agg allowed to carry ORDER BY / DISTINCT / FILTER modifiers).
 *
 * expr is the target expression to inspect and query is the query whose range
 * table expr's Vars refer to.
 *
 * Such an expression only ever emits distribution-column values that were read
 * from rows of the current shard, so - given source and target are colocated -
 * every produced value hashes back into this shard's range. That makes it safe
 * to push a colocated INSERT ... SELECT down to the shards even though the
 * distribution value is technically a derived expression rather than a plain
 * Var. Any intermediate transformation (e.g. unnest(array_agg(dist_key + 1)) or
 * unnest(f(array_agg(dist_key)))) is rejected because it could produce values
 * that would hash to a different shard.
 *
 * Only the array_agg(anynonarray) variant is accepted. array_agg(anyarray)
 * stacks its input arrays into an array of one higher dimension, and unnest()
 * of that result emits the individual scalar elements rather than the original
 * array values, so that combination is not a pass-through.
 *
 * A false result only means the pattern was not recognized; callers then fall
 * back to their regular handling.
 */
bool
IsBatchUnnestArrayAggPartitionColumn(Expr *expr, Query *query)
{
	Query *leafQuery = query;

	/*
	 * Peel plain-Var subquery projection indirection down to the underlying
	 * expression. We only follow the simple "SELECT <col> FROM (subquery)"
	 * projection form; anything else makes us conservatively bail out.
	 */
	for (;;)
	{
		expr = (Expr *) strip_implicit_coercions((Node *) expr);

		if (!IsA(expr, Var))
		{
			break;
		}

		Var *var = (Var *) expr;

		/* outer references, whole-row Vars and system columns are not followed */
		if (var->varlevelsup != 0 || var->varattno <= InvalidAttrNumber)
		{
			return false;
		}

		if (var->varno <= 0 || var->varno > list_length(leafQuery->rtable))
		{
			return false;
		}

		/* a Var that is not a subquery output column cannot hide an unnest() */
		RangeTblEntry *rte = rt_fetch(var->varno, leafQuery->rtable);
		if (rte->rtekind != RTE_SUBQUERY)
		{
			return false;
		}

		Query *subquery = rte->subquery;
		if (var->varattno > list_length(subquery->targetList))
		{
			return false;
		}

		/* descend one level: continue with the subquery's own target expression */
		TargetEntry *targetEntry = list_nth(subquery->targetList, var->varattno - 1);
		expr = targetEntry->expr;
		leafQuery = subquery;
	}

	/* the leaf expression must be unnest(...) over a single array argument */
	if (!IsA(expr, FuncExpr))
	{
		return false;
	}

	FuncExpr *unnestExpr = (FuncExpr *) expr;
	if (unnestExpr->funcid != F_UNNEST_ANYARRAY ||
		list_length(unnestExpr->args) != 1)
	{
		return false;
	}

	/* the unnest argument must be array_agg(...) with no wrapping transform */
	Expr *unnestArg = (Expr *) strip_implicit_coercions(
		(Node *) linitial(unnestExpr->args));
	if (!IsA(unnestArg, Aggref))
	{
		return false;
	}

	/*
	 * Accept only array_agg(anynonarray). With array_agg(anyarray) the
	 * subsequent unnest() would flatten the aggregated arrays into their
	 * scalar elements, which are not the values that were aggregated.
	 */
	Aggref *arrayAgg = (Aggref *) unnestArg;
	if (arrayAgg->aggfnoid != F_ARRAY_AGG_ANYNONARRAY)
	{
		return false;
	}

	/*
	 * Locate the single aggregated value argument. ORDER BY keys that differ
	 * from the aggregated value appear as additional resjunk target entries;
	 * they only reorder the (shard-local) values and do not affect routing.
	 */
	TargetEntry *aggValueTargetEntry = NULL;
	TargetEntry *aggArgTargetEntry = NULL;
	foreach_declared_ptr(aggArgTargetEntry, arrayAgg->args)
	{
		if (aggArgTargetEntry->resjunk)
		{
			continue;
		}

		if (aggValueTargetEntry != NULL)
		{
			/*
			 * array_agg (the OID we matched above) is a single-argument
			 * aggregate, so a second non-resjunk entry cannot occur for a
			 * well-formed tree. Assert to catch a broken invariant in debug
			 * builds, but still bail out gracefully in production: a false
			 * result only forgoes the optimization, it is never unsafe.
			 */
			Assert(false);
			return false;
		}

		aggValueTargetEntry = aggArgTargetEntry;
	}

	if (aggValueTargetEntry == NULL)
	{
		return false;
	}

	/* the aggregated value must be the untransformed source partition column */
	bool skipOuterVars = false;
	return IsPartitionColumn(aggValueTargetEntry->expr, leafQuery, skipOuterVars);
}


/*
* FindReferencedTableColumn recursively traverses query tree to find actual relation
* id, and column that columnExpression refers to. If columnExpression is a
Expand Down
23 changes: 22 additions & 1 deletion src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,27 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery)
}
}

/*
* Normally the SELECT projects the distribution column as a plain Var. With
* unsafe INSERT ... SELECT pushdown the distribution column may instead be a
* provably shard-local batch pass-through, i.e. unnest(array_agg(dist_col)).
* In that case there is no plain Var to attach a NOT NULL filter to, and the
* batch stays shard-local, so the filter is unnecessary; skip it. Any other
* derived distribution value is rejected earlier during planning, so if we
* reach here without a plain-Var partition column it must be this pattern.
*/
if (targetPartitionColumnVar == NULL && AllowUnsafeInsertSelectPushdown)
{
TargetEntry *batchTargetEntry = NULL;
foreach_declared_ptr(batchTargetEntry, targetList)
{
if (IsBatchUnnestArrayAggPartitionColumn(batchTargetEntry->expr, subqery))
{
return;
}
}
}

/* we should have found target partition column */
Assert(targetPartitionColumnVar != NULL);

Expand Down Expand Up @@ -1342,7 +1363,7 @@ MultiShardUpdateDeleteSupported(Query *originalQuery,
errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(
originalQuery,
plannerRestrictionContext,
true);
true, false);
}

return errorMessage;
Expand Down
Loading
Loading