Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,20 @@ task integTestRemote(type: RestIntegTestTask) {

// === Excludes: asserts a Lucene pushdown fragment absent on the AE route ===
excludeTestsMatching '*CalciteSortCommandIT.testPushdownSortCastToDoubleExpression'

// === Excludes: CalciteStreamstatsCommandIT route divergences ===
// Each test also carries an in-test @RequiresCapability(...) recording the reason.
// - CHAINED_STREAMSTATS_BY: chaining two streamstats where an upstream stage has `by`
// emits two ROW_NUMBER() sequence columns the Substrait converter names identically,
// so the stacked schema has a duplicate/ambiguous field name (500) or, for chained
// window streamstats, non-deterministic values. Fails single- and multi-shard.
excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstats'
excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithWindow'
excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithNull1'
excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithEval'
// - STREAMSTATS_SORT_NOT_HONORED: streamstats computes its window over the backend scan
// order, ignoring a preceding `| sort` (the OVER clause has no explicit ORDER BY).
excludeTestsMatching '*CalciteStreamstatsCommandIT.testStreamstatsAndSort'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.*;
import static org.opensearch.sql.util.Capability.CHAINED_STREAMSTATS_BY;
import static org.opensearch.sql.util.Capability.DOC_MUTATION;
import static org.opensearch.sql.util.Capability.STREAMSTATS_SORT_NOT_HONORED;
import static org.opensearch.sql.util.MatcherUtils.*;

import java.io.IOException;
Expand Down Expand Up @@ -558,6 +560,7 @@ public void testStreamstatsGlobal() throws IOException {
}

@Test
@RequiresCapability(DOC_MUTATION)
public void testStreamstatsGlobalWithNull() throws IOException {
final int docId = 7;
Request insertRequest =
Expand Down Expand Up @@ -613,6 +616,7 @@ public void testStreamstatsGlobalWithNull() throws IOException {
}

@Test
@RequiresCapability(DOC_MUTATION)
public void testStreamstatsGlobalWithNullBucket() throws IOException {
final int docId = 7;
Request insertRequest =
Expand Down Expand Up @@ -718,6 +722,7 @@ public void testStreamstatsReset() throws IOException {
}

@Test
@RequiresCapability(DOC_MUTATION)
public void testStreamstatsResetWithNull() throws IOException {
final int docId = 7;
Request insertRequest =
Expand Down Expand Up @@ -773,6 +778,7 @@ public void testStreamstatsResetWithNull() throws IOException {
}

@Test
@RequiresCapability(DOC_MUTATION)
public void testStreamstatsResetWithNullBucket() throws IOException {
final int docId = 7;
Request insertRequest =
Expand Down Expand Up @@ -845,6 +851,7 @@ public void testUnsupportedWindowFunctions() {
}

@Test
@RequiresCapability(CHAINED_STREAMSTATS_BY)
public void testMultipleStreamstats() throws IOException {
JSONObject actual =
executeQuery(
Expand All @@ -863,6 +870,7 @@ public void testMultipleStreamstats() throws IOException {
}

@Test
@RequiresCapability(CHAINED_STREAMSTATS_BY)
public void testMultipleStreamstatsWithWindow() throws IOException {
// Test case from GitHub issue #4800: chained streamstats with window=2
JSONObject actual =
Expand Down Expand Up @@ -899,6 +907,7 @@ public void testMultipleStreamstatsWithWindow() throws IOException {
// causing Calcite's RelDecorrelator to fail on duplicate correlate references.

@Test
@RequiresCapability(CHAINED_STREAMSTATS_BY)
public void testMultipleStreamstatsWithNull1() throws IOException {
JSONObject actual =
executeQuery(
Expand Down Expand Up @@ -1008,6 +1017,7 @@ public void testStreamstatsAndEventstats() throws IOException {
}

@Test
@RequiresCapability(STREAMSTATS_SORT_NOT_HONORED)
public void testStreamstatsAndSort() throws IOException {
JSONObject actual =
executeQuery(
Expand Down Expand Up @@ -1074,6 +1084,7 @@ public void testWhereInWithStreamstatsSubquery() throws IOException {
}

@Test
@RequiresCapability(CHAINED_STREAMSTATS_BY)
public void testMultipleStreamstatsWithEval() throws IOException {
JSONObject actual =
executeQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,37 @@ public enum Capability {
LUCENE_PUSHDOWN_EXPLAIN(
"A test asserting a Lucene-specific pushdown fragment in the explain plan (e.g. SORT->[...])"
+ " can't pass on the analytics-engine route: the DataFusion scan produces a different"
+ " plan with no Lucene pushdown fragment.");
+ " plan with no Lucene pushdown fragment."),

/**
* Chaining two {@code streamstats} commands where an upstream stage partitions {@code by} a group
* fails on the analytics-engine route. Each {@code streamstats ... by} stage projects a {@code
* ROW_NUMBER() OVER ()} sequence column to order its window; the Calcite plan aliases these
* distinctly ({@code __stream_seq__}), but the Substrait converter names both physical columns
* after the operator ({@code "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"}),
* so the stacked schema has a duplicate/ambiguous field name. Verified: it surfaces as a 500
* ({@code Schema contains duplicate unqualified field name ...} / streaming-fragment failure) or,
* for chained {@code window} streamstats, non-deterministic window values. A single {@code
* streamstats by} (or a chain where only the final stage has {@code by}) works.
*/
CHAINED_STREAMSTATS_BY(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, let's think about how to maintain the real backend capabilities here later. Or maybe there is a better mechanism to make our ITs work with various engines that have different capabilities. Thanks.

"Chaining two streamstats where an upstream stage partitions by a group fails on the"
+ " analytics-engine route: both stages emit a ROW_NUMBER() sequence column the Substrait"
+ " converter names identically, producing a duplicate/ambiguous field name (500) or"
+ " non-deterministic window values."),

/**
* {@code streamstats} computes its running/window aggregate over the backend scan order on the
* analytics-engine route, ignoring a preceding {@code | sort}. The {@code OVER} clause carries no
* explicit {@code ORDER BY} (streamstats orders by encounter order by design), so DataFusion
* evaluates the window in scan order rather than the sorted order the v2/Calcite path honors.
* Verified: {@code sort age | streamstats window=2 avg(age)} yields window values computed in
* insertion order, not age order, so the per-row aggregates diverge.
*/
STREAMSTATS_SORT_NOT_HONORED(
"streamstats computes its window over the backend scan order on the analytics-engine route,"
+ " ignoring a preceding | sort (the OVER clause has no explicit ORDER BY), so the window"
+ " values diverge from the v2/Calcite path which honors the sort.");

private final String reason;

Expand Down
Loading