Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -372,13 +373,13 @@ public static List<RelCollation> sort(RelCollation collation) {
/** Helper method to determine a
* {@link org.apache.calcite.rel.core.Window}'s collation.
*
* <p>A Window projects the fields of its input first, followed by the output
* from each of its windows. Assuming (quite reasonably) that the
* implementation does not re-order its input rows, then any collations of its
* input are preserved. */
* <p>A Window operator groups rows by PARTITION BY keys and sorts each
* partition by ORDER BY keys. The output order is therefore not defined by
* a simple collation in the general case, so we conservatively report no
* collations. */
public static @Nullable List<RelCollation> window(RelMetadataQuery mq, RelNode input,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for modifying RelMdCollation.window is that the original window sorting derivation was too optimistic, which would cause the optimizer to mistakenly believe that the window output retained the input order, thus mistakenly deleting the top-level Sort.
The original implementation had the following problem:

  1. Previously, RelMdCollation.window directly returned mq.collations(input), meaning "the window operator will preserve the order of the input rows as is." However, the actual implementation of EnumerableWindow first groups the rows by the PARTITION BY key using SortedMultiMap, and then sorts them within each group by the window ORDER BY key. Therefore, the input order is not preserved; the global output order is PARTITION BY keys + ORDER BY keys, not simply the input order.
    This caused the top-level Sort to be incorrectly optimized away.

  2. When order by empno is written in the SQL, if the window also happens to be sorted by empno, the optimizer will mistakenly assume that the window output is globally ordered, thus deleting the top-level EnumerableSort. The resulting output is grouped by deptno, not sorted by empno.

ImmutableList<Window.Group> groups) {
return mq.collations(input);
return Collections.emptyList();
}

/** Helper method to determine a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Project;
Expand Down Expand Up @@ -52,6 +53,7 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -261,6 +263,7 @@ static class WindowedAggRelSplitter extends CalcRelSplitter {
RelBuilder relBuilder, RelNode input, RexProgram program, List<RelHint> hints) {
checkArgument(program.getCondition() == null,
"WindowedAggregateRel cannot accept a condition");
traitSet = traitSet.replaceIfs(RelCollationTraitDef.INSTANCE, Collections::emptyList);
return LogicalWindow.create(cluster, traitSet, relBuilder, input,
program, hints);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,11 @@ private RexNode convertOver(Blackboard bb, SqlNode node) {
SqlCall call = (SqlCall) node;
bb.getValidator().deriveType(bb.scope, call);
SqlCall aggCall = call.operand(0);
@Nullable SqlNode filter = null;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rarely see the need for @nullable in code.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to satisfy the checkframe null-safety check; otherwise, the calling method would need to be modified, whereas here, simply adding an annotation resolves the issue.

if (aggCall.getKind() == SqlKind.FILTER) {
filter = aggCall.operand(1);
aggCall = aggCall.operand(0);
}
boolean ignoreNulls = false;
switch (aggCall.getKind()) {
case IGNORE_NULLS:
Expand All @@ -2515,6 +2520,22 @@ private RexNode convertOver(Blackboard bb, SqlNode node) {
default:
break;
}
if (filter != null) {
final SqlOperator op = aggCall.getOperator();
if (op instanceof SqlAggFunction
&& !((SqlAggFunction) op).requiresOver()) {
// FILTER on a windowed aggregate can be implemented by wrapping the
// aggregate arguments in CASE expressions, because true aggregates
// ignore NULL inputs. This does not work for window value functions
// (FIRST_VALUE, LAST_VALUE, NTH_VALUE, LEAD, LAG, etc.) which do not
// ignore NULL inputs.
aggCall = applyFilterToAggArgs(aggCall, filter);
bb.getValidator().deriveType(bb.scope, aggCall);
} else {
throw new UnsupportedOperationException(
"FILTER clause is not supported for window function " + op.getName());
}
}

SqlNode windowOrRef = call.operand(1);
final SqlWindow window =
Expand Down Expand Up @@ -2609,6 +2630,47 @@ private RexNode convertOver(Blackboard bb, SqlNode node) {
}
}

/**
* Applies a FILTER clause to the arguments of an aggregate call by wrapping
* each argument in a CASE expression. For example,
* {@code SUM(sal) FILTER (WHERE comm IS NOT NULL)} becomes
* {@code SUM(CASE WHEN comm IS NOT NULL THEN sal END)}.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure if the semantics of functions like FIRST_VALUE, LAST_VALUE, NTH_VALUE, LEAD, and LAG are correct after the rewrite.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your concern is valid. The rewrite is only semantically correct for true aggregate functions like SUM, COUNT, AVG, MIN, and MAX, because those functions ignore NULL inputs.

However filter does not hold for window value functions such as FIRST_VALUE, LAST_VALUE, NTH_VALUE, LEAD, and LAG. such as sql in postgresql would error out:

SELECT
  ename,
  job,
  FIRST_VALUE(sal) FILTER (WHERE job = 'MANAGER') OVER (ORDER BY hiredate, ename) AS first_mgr_sal
FROM emp
ORDER BY hiredate, ename;
psql:commands.sql:81: ERROR:  FILTER is not implemented for non-aggregate window functions
LINE 4:   FIRST_VALUE(sal) FILTER (WHERE job = 'MANAGER') OVER (ORDE...

Therefore, based on the tests conducted so far, I believe there are no issues.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a related test?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*
* <p>This transformation preserves the semantics of the FILTER clause for
* windowed aggregates: rows that do not satisfy the filter contribute NULL
* and are ignored by the aggregate function.
*/
private static SqlCall applyFilterToAggArgs(SqlCall aggCall, SqlNode filter) {
final SqlOperator op = aggCall.getOperator();
final List<SqlNode> operands = aggCall.getOperandList();
final SqlParserPos pos = aggCall.getParserPosition();
final SqlLiteral quantifier = aggCall.getFunctionQuantifier();
final List<SqlNode> newOperands = new ArrayList<>(operands.size());
if (op == SqlStdOperatorTable.COUNT
&& operands.size() == 1
&& operands.get(0) instanceof SqlIdentifier
&& ((SqlIdentifier) operands.get(0)).isStar()) {
// COUNT(*) FILTER (WHERE x) => COUNT(CASE WHEN x THEN 0 END)
newOperands.add(
new SqlCase(pos, null, SqlNodeList.of(filter),
SqlNodeList.of(SqlLiteral.createExactNumeric("0", pos)),
SqlLiteral.createNull(pos)));
} else {
for (SqlNode operand : operands) {
if (operand instanceof SqlIdentifier
&& ((SqlIdentifier) operand).isStar()) {
newOperands.add(operand);
} else {
newOperands.add(
new SqlCase(pos, null, SqlNodeList.of(filter),
SqlNodeList.of(operand),
SqlLiteral.createNull(pos)));
}
}
}
return op.createCall(quantifier, pos, newOperands);
}

protected void convertFrom(
Blackboard bb,
@Nullable SqlNode from) {
Expand Down
22 changes: 12 additions & 10 deletions core/src/test/resources/sql/sub-query.iq
Original file line number Diff line number Diff line change
Expand Up @@ -2430,11 +2430,12 @@ EnumerableCalc(expr#0..5=[{inputs}], expr#6=[RAND()], expr#7=[CAST($t6):INTEGER
EnumerableSort(sort0=[$2], dir0=[ASC])
EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], SAL=[$t5], DEPTNO=[$t7])
EnumerableTableScan(table=[[scott, EMP]])
EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[false], expr#3=[1], expr#4=[<=($t1, $t3)], cs=[$t2], DEPTNO=[$t0], rn=[$t1], $condition=[$t4])
EnumerableWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])], constants=[[false]])
EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
EnumerableTableScan(table=[[scott, DEPT]])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[false], cs=[$t2], DEPTNO=[$t0], rn=[$t1])
EnumerableWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])], constants=[[false]])
EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
EnumerableTableScan(table=[[scott, DEPT]])
!plan
!}

Expand Down Expand Up @@ -2540,11 +2541,12 @@ EnumerableCalc(expr#0..5=[{inputs}], expr#6=[NOT($t3)], expr#7=[IS NOT NULL($t3)
EnumerableSort(sort0=[$2], dir0=[ASC])
EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], SAL=[$t5], DEPTNO=[$t7])
EnumerableTableScan(table=[[scott, EMP]])
EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[false], expr#3=[1], expr#4=[<=($t1, $t3)], cs=[$t2], DEPTNO=[$t0], rn=[$t1], $condition=[$t4])
EnumerableWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])], constants=[[false]])
EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
EnumerableTableScan(table=[[scott, DEPT]])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about the motivation behind this update?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix altered the sorting metadata derivation for the window operator, resulting in a legitimate change to the execution plan structure of two existing !plan tests, but the computation results remain unchanged.

Specific changes: Previously, the collation derivation for EnumerableWindow was optimistic, claiming to retain the input sorting. With CalcRelSplitter inheriting the original Calc's collation, the optimizer would push Sort down onto the Window and merge it with the outer Calc. Therefore, the original plan was:

EnumerableSort
  EnumerableCalc(condition + window output)
      EnumerableWindow

After the fix, EnumerableWindow no longer claims any sorting, and CalcRelSplitter no longer inherits the contaminated collation. Sort can only stop at the Window, wrapped in an outer Calc for projection. The plan becomes:

EnumerableCalc(projection)
  EnumerableSort
    EnumerableCalc(condition)
      EnumerableWindow

Both are logically equivalent; only the relative positions of Sort and the projected Calc have changed. Therefore, only the expected output of these two !plan blocks was updated, without modifying any result data.This also ensures complete consistency with the PostgreSQL sorting results.

EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[false], cs=[$t2], DEPTNO=[$t0], rn=[$t1])
EnumerableWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])], constants=[[false]])
EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
EnumerableTableScan(table=[[scott, DEPT]])
!plan
!}

Expand Down
124 changes: 80 additions & 44 deletions core/src/test/resources/sql/winagg.iq
Original file line number Diff line number Diff line change
Expand Up @@ -1186,20 +1186,20 @@ order by empno;
+-------+--------+----------------+
| EMPNO | DEPTNO | FILTERED_COUNT |
+-------+--------+----------------+
| 7369 | 20 | 0 |
| 7566 | 20 | 5 |
| 7788 | 20 | 5 |
| 7876 | 20 | 0 |
| 7902 | 20 | 5 |
| 7782 | 10 | 3 |
| 7839 | 10 | 3 |
| 7934 | 10 | 0 |
| 7499 | 30 | 6 |
| 7521 | 30 | 0 |
| 7654 | 30 | 0 |
| 7698 | 30 | 6 |
| 7844 | 30 | 0 |
| 7900 | 30 | 0 |
| 7369 | 20 | 3 |
| 7499 | 30 | 2 |
| 7521 | 30 | 2 |
| 7566 | 20 | 3 |
| 7654 | 30 | 2 |
| 7698 | 30 | 2 |
| 7782 | 10 | 2 |
| 7788 | 20 | 3 |
| 7839 | 10 | 2 |
| 7844 | 30 | 2 |
| 7876 | 20 | 3 |
| 7900 | 30 | 2 |
| 7902 | 20 | 3 |
| 7934 | 10 | 2 |
+-------+--------+----------------+
(14 rows)

Expand All @@ -1214,19 +1214,19 @@ order by empno;
| EMPNO | DEPTNO | FILTERED_SUM |
+-------+--------+--------------+
| 7369 | 20 | |
| 7499 | 30 | 5600.00 |
| 7521 | 30 | 5600.00 |
| 7566 | 20 | |
| 7654 | 30 | 5600.00 |
| 7698 | 30 | 5600.00 |
| 7782 | 10 | |
| 7788 | 20 | |
| 7839 | 10 | |
| 7844 | 30 | 5600.00 |
| 7876 | 20 | |
| 7900 | 30 | 5600.00 |
| 7902 | 20 | |
| 7782 | 10 | |
| 7839 | 10 | |
| 7934 | 10 | |
| 7499 | 30 | 9400.00 |
| 7521 | 30 | 9400.00 |
| 7654 | 30 | 9400.00 |
| 7698 | 30 | |
| 7844 | 30 | 9400.00 |
| 7900 | 30 | |
+-------+--------+--------------+
(14 rows)

Expand All @@ -1241,20 +1241,20 @@ order by empno;
+-------+--------+----------------+-------------+
| EMPNO | DEPTNO | HIGH_SAL_COUNT | LOW_SAL_SUM |
+-------+--------+----------------+-------------+
| 7369 | 20 | 0 | 10875.00 |
| 7566 | 20 | 5 | |
| 7788 | 20 | 5 | |
| 7876 | 20 | 0 | 10875.00 |
| 7902 | 20 | 5 | |
| 7782 | 10 | 3 | |
| 7839 | 10 | 3 | |
| 7934 | 10 | 0 | 8750.00 |
| 7499 | 30 | 6 | |
| 7521 | 30 | 0 | 9400.00 |
| 7654 | 30 | 0 | 9400.00 |
| 7698 | 30 | 6 | |
| 7844 | 30 | 0 | 9400.00 |
| 7900 | 30 | 0 | 9400.00 |
| 7369 | 20 | 3 | 1900.00 |
| 7499 | 30 | 2 | 4950.00 |
| 7521 | 30 | 2 | 4950.00 |
| 7566 | 20 | 3 | 1900.00 |
| 7654 | 30 | 2 | 4950.00 |
| 7698 | 30 | 2 | 4950.00 |
| 7782 | 10 | 2 | 1300.00 |
| 7788 | 20 | 3 | 1900.00 |
| 7839 | 10 | 2 | 1300.00 |
| 7844 | 30 | 2 | 4950.00 |
| 7876 | 20 | 3 | 1900.00 |
| 7900 | 30 | 2 | 4950.00 |
| 7902 | 20 | 3 | 1900.00 |
| 7934 | 10 | 2 | 1300.00 |
+-------+--------+----------------+-------------+
(14 rows)

Expand All @@ -1269,22 +1269,58 @@ order by empno;
| EMPNO | DEPTNO | SAL | RUNNING_SUM |
+-------+--------+---------+-------------+
| 7369 | 20 | 800.00 | |
| 7566 | 20 | 2975.00 | 3775.00 |
| 7788 | 20 | 3000.00 | 6775.00 |
| 7876 | 20 | 1100.00 | 7875.00 |
| 7902 | 20 | 3000.00 | 10875.00 |
| 7782 | 10 | 2450.00 | 2450.00 |
| 7839 | 10 | 5000.00 | 7450.00 |
| 7934 | 10 | 1300.00 | 8750.00 |
| 7499 | 30 | 1600.00 | 1600.00 |
| 7521 | 30 | 1250.00 | 2850.00 |
| 7566 | 20 | 2975.00 | 2975.00 |
| 7654 | 30 | 1250.00 | 4100.00 |
| 7698 | 30 | 2850.00 | 6950.00 |
| 7782 | 10 | 2450.00 | 2450.00 |
| 7788 | 20 | 3000.00 | 5975.00 |
| 7839 | 10 | 5000.00 | 7450.00 |
| 7844 | 30 | 1500.00 | 8450.00 |
| 7900 | 30 | 950.00 | |
| 7876 | 20 | 1100.00 | 7075.00 |
| 7900 | 30 | 950.00 | 8450.00 |
| 7902 | 20 | 3000.00 | 10075.00 |
| 7934 | 10 | 1300.00 | 8750.00 |
+-------+--------+---------+-------------+
(14 rows)

!ok

# Test 5: FILTER with OVER and running window without PARTITION BY
select ename, job, hiredate,
avg(sal) over (order by hiredate, ename rows 3 preceding) as avg_sal,
avg(sal) filter (where job = 'MANAGER') over (order by hiredate, ename rows 3 preceding)
as avg_mgr_sal
from emp
order by hiredate, ename;
+--------+-----------+------------+---------+-------------+
| ENAME | JOB | HIREDATE | AVG_SAL | AVG_MGR_SAL |
+--------+-----------+------------+---------+-------------+
| SMITH | CLERK | 1980-12-17 | 800.00 | |
| BLAKE | MANAGER | 1981-01-05 | 1825.00 | 2850.00 |
| JONES | MANAGER | 1981-02-04 | 2208.33 | 2912.50 |
| ALLEN | SALESMAN | 1981-02-20 | 2056.25 | 2912.50 |
| WARD | SALESMAN | 1981-02-22 | 2168.75 | 2912.50 |
| CLARK | MANAGER | 1981-06-09 | 2068.75 | 2712.50 |
| TURNER | SALESMAN | 1981-09-08 | 1700.00 | 2450.00 |
| MARTIN | SALESMAN | 1981-09-28 | 1612.50 | 2450.00 |
| KING | PRESIDENT | 1981-11-17 | 2550.00 | 2450.00 |
| FORD | ANALYST | 1981-12-03 | 2687.50 | |
| JAMES | CLERK | 1981-12-03 | 2550.00 | |
| MILLER | CLERK | 1982-01-23 | 2562.50 | |
| SCOTT | ANALYST | 1987-04-19 | 2062.50 | |
| ADAMS | CLERK | 1987-05-23 | 1587.50 | |
+--------+-----------+------------+---------+-------------+
(14 rows)

!ok

# Test 6: FILTER on window value functions is not supported
select first_value(sal) filter (where job = 'MANAGER') over (order by hiredate)
from emp;
java.sql.SQLException: Error while executing SQL "select first_value(sal) filter (where job = 'MANAGER') over (order by hiredate)
from emp": FILTER clause is not supported for window function FIRST_VALUE
!error

# End winagg.iq
Loading