diff --git a/integ-test/build.gradle b/integ-test/build.gradle index d4780f90ea..17b88a901d 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -1328,6 +1328,20 @@ task integTestRemote(type: RestIntegTestTask) { // === Excludes: asserts a Lucene pushdown fragment absent on the AE route === excludeTestsMatching '*CalciteSortCommandIT.testPushdownSortCastToDoubleExpression' + + // === Excludes: CalciteStreamstatsCommandIT route divergences === + // Each test also carries an in-test @RequiresCapability(...) recording the reason. + // - CHAINED_STREAMSTATS_BY: chaining two streamstats where an upstream stage has `by` + // emits two ROW_NUMBER() sequence columns the Substrait converter names identically, + // so the stacked schema has a duplicate/ambiguous field name (500) or, for chained + // window streamstats, non-deterministic values. Fails single- and multi-shard. + excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstats' + excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithWindow' + excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithNull1' + excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithEval' + // - STREAMSTATS_SORT_NOT_HONORED: streamstats computes its window over the backend scan + // order, ignoring a preceding `| sort` (the OVER clause has no explicit ORDER BY). + excludeTestsMatching '*CalciteStreamstatsCommandIT.testStreamstatsAndSort' } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java index 43ede1606b..e70812e3c3 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java @@ -6,7 +6,9 @@ package org.opensearch.sql.calcite.remote; import static org.opensearch.sql.legacy.TestsConstants.*; +import static org.opensearch.sql.util.Capability.CHAINED_STREAMSTATS_BY; import static org.opensearch.sql.util.Capability.DOC_MUTATION; +import static org.opensearch.sql.util.Capability.STREAMSTATS_SORT_NOT_HONORED; import static org.opensearch.sql.util.MatcherUtils.*; import java.io.IOException; @@ -558,6 +560,7 @@ public void testStreamstatsGlobal() throws IOException { } @Test + @RequiresCapability(DOC_MUTATION) public void testStreamstatsGlobalWithNull() throws IOException { final int docId = 7; Request insertRequest = @@ -613,6 +616,7 @@ public void testStreamstatsGlobalWithNull() throws IOException { } @Test + @RequiresCapability(DOC_MUTATION) public void testStreamstatsGlobalWithNullBucket() throws IOException { final int docId = 7; Request insertRequest = @@ -718,6 +722,7 @@ public void testStreamstatsReset() throws IOException { } @Test + @RequiresCapability(DOC_MUTATION) public void testStreamstatsResetWithNull() throws IOException { final int docId = 7; Request insertRequest = @@ -773,6 +778,7 @@ public void testStreamstatsResetWithNull() throws IOException { } @Test + @RequiresCapability(DOC_MUTATION) public void testStreamstatsResetWithNullBucket() throws IOException { final int docId = 7; Request insertRequest = @@ -845,6 +851,7 @@ public void testUnsupportedWindowFunctions() { } @Test + @RequiresCapability(CHAINED_STREAMSTATS_BY) public void testMultipleStreamstats() throws IOException { JSONObject actual = executeQuery( @@ -863,6 +870,7 @@ public void testMultipleStreamstats() throws IOException { } @Test + @RequiresCapability(CHAINED_STREAMSTATS_BY) public void testMultipleStreamstatsWithWindow() throws IOException { // Test case from GitHub issue #4800: chained streamstats with window=2 JSONObject actual = @@ -899,6 +907,7 @@ public void testMultipleStreamstatsWithWindow() throws IOException { // causing Calcite's RelDecorrelator to fail on duplicate correlate references. @Test + @RequiresCapability(CHAINED_STREAMSTATS_BY) public void testMultipleStreamstatsWithNull1() throws IOException { JSONObject actual = executeQuery( @@ -1008,6 +1017,7 @@ public void testStreamstatsAndEventstats() throws IOException { } @Test + @RequiresCapability(STREAMSTATS_SORT_NOT_HONORED) public void testStreamstatsAndSort() throws IOException { JSONObject actual = executeQuery( @@ -1074,6 +1084,7 @@ public void testWhereInWithStreamstatsSubquery() throws IOException { } @Test + @RequiresCapability(CHAINED_STREAMSTATS_BY) public void testMultipleStreamstatsWithEval() throws IOException { JSONObject actual = executeQuery( diff --git a/integ-test/src/test/java/org/opensearch/sql/util/Capability.java b/integ-test/src/test/java/org/opensearch/sql/util/Capability.java index 1ca2b9adee..3bac06e7fc 100644 --- a/integ-test/src/test/java/org/opensearch/sql/util/Capability.java +++ b/integ-test/src/test/java/org/opensearch/sql/util/Capability.java @@ -469,7 +469,37 @@ public enum Capability { LUCENE_PUSHDOWN_EXPLAIN( "A test asserting a Lucene-specific pushdown fragment in the explain plan (e.g. SORT->[...])" + " can't pass on the analytics-engine route: the DataFusion scan produces a different" - + " plan with no Lucene pushdown fragment."); + + " plan with no Lucene pushdown fragment."), + + /** + * Chaining two {@code streamstats} commands where an upstream stage partitions {@code by} a group + * fails on the analytics-engine route. Each {@code streamstats ... by} stage projects a {@code + * ROW_NUMBER() OVER ()} sequence column to order its window; the Calcite plan aliases these + * distinctly ({@code __stream_seq__}), but the Substrait converter names both physical columns + * after the operator ({@code "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"}), + * so the stacked schema has a duplicate/ambiguous field name. Verified: it surfaces as a 500 + * ({@code Schema contains duplicate unqualified field name ...} / streaming-fragment failure) or, + * for chained {@code window} streamstats, non-deterministic window values. A single {@code + * streamstats by} (or a chain where only the final stage has {@code by}) works. + */ + CHAINED_STREAMSTATS_BY( + "Chaining two streamstats where an upstream stage partitions by a group fails on the" + + " analytics-engine route: both stages emit a ROW_NUMBER() sequence column the Substrait" + + " converter names identically, producing a duplicate/ambiguous field name (500) or" + + " non-deterministic window values."), + + /** + * {@code streamstats} computes its running/window aggregate over the backend scan order on the + * analytics-engine route, ignoring a preceding {@code | sort}. The {@code OVER} clause carries no + * explicit {@code ORDER BY} (streamstats orders by encounter order by design), so DataFusion + * evaluates the window in scan order rather than the sorted order the v2/Calcite path honors. + * Verified: {@code sort age | streamstats window=2 avg(age)} yields window values computed in + * insertion order, not age order, so the per-row aggregates diverge. + */ + STREAMSTATS_SORT_NOT_HONORED( + "streamstats computes its window over the backend scan order on the analytics-engine route," + + " ignoring a preceding | sort (the OVER clause has no explicit ORDER BY), so the window" + + " values diverge from the v2/Calcite path which honors the sort."); private final String reason;