From 30067ad923f381e29e2c40821a7164a482152466 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Sat, 18 Apr 2026 21:59:29 +0200 Subject: [PATCH 01/13] feat(db-engine-specs): add allows_offset_fetch capability flag --- superset/db_engine_specs/base.py | 7 ++++++ tests/unit_tests/db_engine_specs/test_base.py | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index bd1e2f9c3612..347790b3e226 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -494,6 +494,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods allows_sql_comments = True allows_escaped_colons = True + # Whether the engine supports OFFSET in SQL queries. Defaults to True; + # engines like Elasticsearch SQL that do not support OFFSET set this to + # False and are expected to implement `fetch_data_with_cursor` for + # pagination via another mechanism (e.g. Elasticsearch's cursor API). + allows_offset_fetch = True + # Whether ORDER BY clause can use aliases created in SELECT # that are the same as a source column allows_alias_to_source_column = True @@ -2515,6 +2521,7 @@ def get_public_information(cls) -> dict[str, Any]: "disable_ssh_tunneling": cls.disable_ssh_tunneling, "supports_dynamic_catalog": cls.supports_dynamic_catalog, "supports_oauth2": cls.supports_oauth2, + "allows_offset_fetch": cls.allows_offset_fetch, } @classmethod diff --git a/tests/unit_tests/db_engine_specs/test_base.py b/tests/unit_tests/db_engine_specs/test_base.py index 22ec7d0aa185..08f4353e97b1 100644 --- a/tests/unit_tests/db_engine_specs/test_base.py +++ b/tests/unit_tests/db_engine_specs/test_base.py @@ -1223,3 +1223,26 @@ def test_start_oauth2_dance_falls_back_to_url_for(mocker: MockerFixture) -> None error = exc_info.value.error assert error.extra["redirect_uri"] == fallback_uri + + +def test_base_spec_allows_offset_fetch_default_true() -> None: + """ + New engines opt-in to OFFSET support by default. Engines that do not + support OFFSET (like Elasticsearch SQL) opt out explicitly. + """ + from superset.db_engine_specs.base import BaseEngineSpec + + assert BaseEngineSpec.allows_offset_fetch is True + + +def test_base_spec_public_information_includes_allows_offset_fetch() -> None: + """ + The allows_offset_fetch capability is exposed via get_public_information + so the frontend can reason about it (e.g. future UI disablement). + """ + from superset.db_engine_specs.base import BaseEngineSpec + + info = BaseEngineSpec.get_public_information() + + assert "allows_offset_fetch" in info + assert info["allows_offset_fetch"] is True From 435b48140648db77f3c66fa50389718eef70936b Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Sat, 18 Apr 2026 22:01:54 +0200 Subject: [PATCH 02/13] =?UTF-8?q?fix(elasticsearch):=20opt=20out=20of=20OF?= =?UTF-8?q?FSET=20=E2=80=94=20ES=20SQL=20does=20not=20support=20it?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- superset/db_engine_specs/elasticsearch.py | 2 ++ .../db_engine_specs/test_elasticsearch.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 447605055ea3..84f5e3637218 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -39,6 +39,7 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho allows_joins = False allows_subqueries = True allows_sql_comments = False + allows_offset_fetch = False metadata = { "description": ( @@ -190,6 +191,7 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method allows_joins = False allows_subqueries = True allows_sql_comments = False + allows_offset_fetch = False _time_grain_expressions = { None: "{col}", diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index 640f4aa39d0f..90635da6153e 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -92,3 +92,22 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None: from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec assert OpenDistroEngineSpec.make_label_compatible(original) == expected + + +def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None: + """ + Elasticsearch SQL does not support OFFSET. The spec must opt out so the + query builder does not emit OFFSET clauses that crash the parser. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + assert ElasticSearchEngineSpec.allows_offset_fetch is False + + +def test_opendistro_spec_opts_out_of_offset_fetch() -> None: + """ + OpenDistro/OpenSearch SQL also does not support OFFSET. + """ + from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec + + assert OpenDistroEngineSpec.allows_offset_fetch is False From 4cce8a6add559d8343dc8a096ec9fdbf805cad93 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Sat, 18 Apr 2026 22:09:56 +0200 Subject: [PATCH 03/13] fix(drill-detail): skip OFFSET when engine spec disallows it Engines like Elasticsearch SQL do not support OFFSET; emitting the clause crashes the parser with 'mismatched input OFFSET expecting '. Guard the .offset() call with db_engine_spec.allows_offset_fetch. --- superset/models/helpers.py | 2 +- .../unit_tests/models/test_helpers_offset.py | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 tests/unit_tests/models/test_helpers_offset.py diff --git a/superset/models/helpers.py b/superset/models/helpers.py index 71958d8cf837..d9944c109f7e 100644 --- a/superset/models/helpers.py +++ b/superset/models/helpers.py @@ -3285,7 +3285,7 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma if row_limit: qry = qry.limit(row_limit) - if row_offset: + if row_offset and self.database.db_engine_spec.allows_offset_fetch: qry = qry.offset(row_offset) if series_limit and groupby_series_columns: diff --git a/tests/unit_tests/models/test_helpers_offset.py b/tests/unit_tests/models/test_helpers_offset.py new file mode 100644 index 000000000000..4cff384d95b8 --- /dev/null +++ b/tests/unit_tests/models/test_helpers_offset.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pathlib import Path + + +def test_helpers_guards_offset_with_allows_offset_fetch_flag() -> None: + """ + Structural regression test: the .offset() call in get_sqla_query must + be guarded by db_engine_spec.allows_offset_fetch. Removing the guard + regresses Elasticsearch drill-to-detail pagination (crashes on page 2+ + with 'mismatched input OFFSET'). + + We assert on the source file rather than invoking get_sqla_query + directly because the full call path requires an SqlaTable with + columns/metrics/database — overkill for a one-line guard. + """ + source = Path("superset/models/helpers.py").read_text() + + guard_line = ( + "if row_offset and self.database.db_engine_spec.allows_offset_fetch:" + ) + assert guard_line in source, ( + "The OFFSET guard is missing from superset/models/helpers.py — " + "Elasticsearch (and any other engine with allows_offset_fetch=False) " + "will crash when drill-to-detail requests page 2+." + ) From 0598928c67d140d55f46db74d3b8809475e4e558 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Sun, 19 Apr 2026 14:17:15 +0200 Subject: [PATCH 04/13] feat(elasticsearch): paginate drill-detail samples via cursor Engines without SQL OFFSET support (Elasticsearch, OpenDistro) now paginate drill-to-detail samples through the driver's cursor API instead of emitting OFFSET. Adds `fetch_data_with_cursor` on both engine specs and branches `get_samples` on the `allows_offset_fetch` capability flag. Exposes the flag via `EngineInformationSchema` and documents it in UPDATING.md. --- UPDATING.md | 4 + superset/databases/schemas.py | 8 + superset/db_engine_specs/elasticsearch.py | 114 +++++++ superset/views/datasource/utils.py | 89 ++++- tests/unit_tests/databases/test_schemas.py | 37 ++ .../db_engine_specs/test_elasticsearch.py | 234 +++++++++++++ .../unit_tests/models/test_helpers_offset.py | 80 ++++- tests/unit_tests/views/__init__.py | 0 .../unit_tests/views/datasource/test_utils.py | 316 ++++++++++++++++++ 9 files changed, 862 insertions(+), 20 deletions(-) create mode 100644 tests/unit_tests/databases/test_schemas.py create mode 100644 tests/unit_tests/views/__init__.py create mode 100644 tests/unit_tests/views/datasource/test_utils.py diff --git a/UPDATING.md b/UPDATING.md index 27fc3428b9ee..a1ba578ca54b 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,6 +24,10 @@ assists people when migrating to a new version. ## Next +### Engine spec capability flag: `allows_offset_fetch` + +A new `BaseEngineSpec.allows_offset_fetch` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support. + ### Granular Export Controls A new feature flag `GRANULAR_EXPORT_CONTROLS` introduces three fine-grained permissions that replace the legacy `can_csv` permission: diff --git a/superset/databases/schemas.py b/superset/databases/schemas.py index da0dd3cf3403..111cbeaf8eee 100644 --- a/superset/databases/schemas.py +++ b/superset/databases/schemas.py @@ -1067,6 +1067,14 @@ class EngineInformationSchema(Schema): supports_oauth2 = fields.Boolean( metadata={"description": "The database supports OAuth2"} ) + allows_offset_fetch = fields.Boolean( + metadata={ + "description": ( + "The database supports OFFSET in SQL queries. " + "Engines like Elasticsearch SQL return False." + ) + } + ) class DatabaseConnectionSchema(Schema): diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 84f5e3637218..d4674e232f5c 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -32,6 +32,72 @@ logger = logging.getLogger() +def _fetch_page_via_cursor( + database: Any, + sql: str, + page_index: int, + page_size: int, + sql_path: str, + close_path: str, +) -> tuple[list[list[Any]], list[str], None]: + """ + Iterate Elasticsearch/OpenSearch SQL cursor pagination to return a single + page of results. + + Executes ``sql`` with ``fetch_size = page_size``, then sends cursor + follow-up requests ``page_index`` times to skip earlier pages. Closes the + cursor when done to release server-side state. Returns + ``(rows, columns, None)``; the third slot is reserved for a future + cursor-handle API. + + If the dataset is exhausted before reaching ``page_index``, returns an + empty rows list with the column names from the initial request. + """ + with database.get_raw_connection() as conn: + transport = conn.es.transport + response = transport.perform_request( + "POST", + sql_path, + body={"query": sql, "fetch_size": page_size}, + ) + columns = [col["name"] for col in response.get("columns", [])] + rows = response.get("rows", []) + cursor = response.get("cursor") + + try: + for _ in range(page_index): + if not cursor: + # Dataset exhausted before reaching the target page — + # no cursor to close (ES returns no cursor on the final + # page). Return immediately with empty rows. + return [], columns, None + response = transport.perform_request( + "POST", + sql_path, + body={"cursor": cursor}, + ) + rows = response.get("rows", []) + cursor = response.get("cursor") + + return rows, columns, None + finally: + if cursor: + # Best-effort cleanup. If close itself fails we don't want + # to mask the original error (if any) — swallow and log. + try: + transport.perform_request( + "POST", + close_path, + body={"cursor": cursor}, + ) + except Exception: # pylint: disable=broad-except + logger.warning( + "Failed to close Elasticsearch SQL cursor at %s", + close_path, + exc_info=True, + ) + + class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method engine = "elasticsearch" engine_name = "Elasticsearch" @@ -137,6 +203,30 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho type_code_map: dict[int, str] = {} # loaded from get_datatype only if needed + SQL_ENDPOINT = "/_sql" + SQL_CLOSE_ENDPOINT = "/_sql/close" + + @classmethod + def fetch_data_with_cursor( + cls, + database: Any, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str], None]: + """ + Fetch a single page of results using Elasticsearch cursor pagination. + See ``_fetch_page_via_cursor`` for the protocol. + """ + return _fetch_page_via_cursor( + database=database, + sql=sql, + page_index=page_index, + page_size=page_size, + sql_path=cls.SQL_ENDPOINT, + close_path=cls.SQL_CLOSE_ENDPOINT, + ) + @classmethod def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]: # pylint: disable=import-error,import-outside-toplevel @@ -206,6 +296,30 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method engine = "odelasticsearch" engine_name = "OpenSearch (OpenDistro)" + SQL_ENDPOINT = "/_opendistro/_sql" + SQL_CLOSE_ENDPOINT = "/_opendistro/_sql/close" + + @classmethod + def fetch_data_with_cursor( + cls, + database: Any, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str], None]: + """ + Fetch a single page of results using OpenDistro SQL cursor pagination. + Same protocol as ElasticSearchEngineSpec, different endpoint paths. + """ + return _fetch_page_via_cursor( + database=database, + sql=sql, + page_index=page_index, + page_size=page_size, + sql_path=cls.SQL_ENDPOINT, + close_path=cls.SQL_CLOSE_ENDPOINT, + ) + @classmethod def convert_dttm( cls, target_type: str, dttm: datetime, db_extra: Optional[dict[str, Any]] = None diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index deec4067a56a..df583b24b32d 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -169,11 +169,33 @@ def get_samples( # pylint: disable=too-many-arguments if count_star_data.get("status") == QueryStatus.FAILED: raise DatasetSamplesFailedError(count_star_data.get("error")) - sample_data = samples_instance.get_payload()["queries"][0] + engine_spec = datasource.database.db_engine_spec + row_offset = limit_clause["row_offset"] + row_limit = limit_clause["row_limit"] - if sample_data.get("status") == QueryStatus.FAILED: - QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) - raise DatasetSamplesFailedError(sample_data.get("error")) + if not engine_spec.allows_offset_fetch and row_offset > 0: + try: + sample_data = _fetch_samples_via_cursor( + datasource=datasource, + samples_instance=samples_instance, + count_star_data=count_star_data, + page_index=row_offset // row_limit, + page_size=row_limit, + ) + except DatasetSamplesFailedError: + raise + except Exception as exc: + QueryCacheManager.delete( + count_star_data.get("cache_key"), CacheRegion.DATA + ) + raise DatasetSamplesFailedError(str(exc)) from exc + else: + sample_data = samples_instance.get_payload()["queries"][0] + if sample_data.get("status") == QueryStatus.FAILED: + QueryCacheManager.delete( + count_star_data.get("cache_key"), CacheRegion.DATA + ) + raise DatasetSamplesFailedError(sample_data.get("error") or "") sample_data["page"] = page sample_data["per_page"] = per_page @@ -181,3 +203,62 @@ def get_samples( # pylint: disable=too-many-arguments return sample_data except (IndexError, KeyError) as exc: raise DatasetSamplesFailedError from exc + + +def _fetch_samples_via_cursor( + datasource: Any, + samples_instance: Any, + count_star_data: dict[str, Any], + page_index: int, + page_size: int, +) -> dict[str, Any]: + """ + Fetch a single page of samples via engine-spec cursor pagination. + + Used when ``datasource.database.db_engine_spec.allows_offset_fetch`` is + False and a non-first page is requested. Extracts the compiled SQL from + the already-built QueryContext, delegates cursor iteration to the engine + spec, and assembles a response dict compatible with the normal samples + path. + + The samples payload is also executed (its SQL is OFFSET-stripped by the + models/helpers.py guard) to obtain the authoritative ``colnames`` and + ``coltypes`` that the frontend grid needs for type-based cell renderers + — ensuring page 2+ renders identically to page 1. + """ + # Run the normal samples payload once to source authoritative colnames + # and coltypes for the paginated result set. The helpers.py OFFSET guard + # keeps this to a single cheap page-1 query for the cursor-path engine. + sample_payload = samples_instance.get_payload()["queries"][0] + if sample_payload.get("status") == QueryStatus.FAILED: + QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) + raise DatasetSamplesFailedError(sample_payload.get("error") or "") + + query_obj = samples_instance.queries[0] + query_obj_dict = query_obj.to_dict() + # The engine spec's allows_offset_fetch guard in models/helpers.py keeps + # OFFSET out of the SQL already; we ask for a LIMIT large enough for the + # cursor to iterate across pages. + sample_row_limit = app.config.get("SAMPLES_ROW_LIMIT", 1000) + query_obj_dict["row_limit"] = sample_row_limit + query_obj_dict["row_offset"] = 0 + + sql = datasource.get_query_str(query_obj_dict) + + engine_spec = datasource.database.db_engine_spec + rows, cursor_colnames, _ = engine_spec.fetch_data_with_cursor( + database=datasource.database, + sql=sql, + page_index=page_index, + page_size=page_size, + ) + + colnames = sample_payload.get("colnames") or cursor_colnames + coltypes = sample_payload.get("coltypes") or [] + + return { + "data": [dict(zip(colnames, row, strict=False)) for row in rows], + "colnames": colnames, + "coltypes": coltypes, + "status": QueryStatus.SUCCESS, + } diff --git a/tests/unit_tests/databases/test_schemas.py b/tests/unit_tests/databases/test_schemas.py new file mode 100644 index 000000000000..38d203151054 --- /dev/null +++ b/tests/unit_tests/databases/test_schemas.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +def test_engine_information_schema_includes_allows_offset_fetch() -> None: + """ + The frontend consumes EngineInformationSchema to know what the engine + can do. allows_offset_fetch is a new field that must pass through. + """ + from superset.databases.schemas import EngineInformationSchema + + schema = EngineInformationSchema() + result = schema.dump( + { + "supports_file_upload": True, + "disable_ssh_tunneling": False, + "supports_dynamic_catalog": False, + "supports_oauth2": False, + "allows_offset_fetch": False, + } + ) + + assert result["allows_offset_fetch"] is False diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index 90635da6153e..b4ab47404362 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -16,6 +16,7 @@ # under the License. from datetime import datetime from typing import Any, Optional +from unittest.mock import MagicMock import pytest from sqlalchemy import column # noqa: F401 @@ -111,3 +112,236 @@ def test_opendistro_spec_opts_out_of_offset_fetch() -> None: from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec assert OpenDistroEngineSpec.allows_offset_fetch is False + + +def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock: + """ + Build a mocked Database whose get_raw_connection() yields a connection + whose es.transport.perform_request returns transport_responses sequentially. + """ + database = MagicMock(name="Database") + + responses_iter = iter(transport_responses) + + def perform_request(method, path, body=None, **_kwargs): + return next(responses_iter) + + transport = MagicMock() + transport.perform_request.side_effect = perform_request + conn = MagicMock() + conn.es.transport = transport + + ctx = MagicMock() + ctx.__enter__ = MagicMock(return_value=conn) + ctx.__exit__ = MagicMock(return_value=False) + database.get_raw_connection.return_value = ctx + database._transport = transport # expose for assertions + return database + + +def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> None: + """ + Page index 0 = return the rows from the initial query, no cursor + iteration needed. The cursor must still be closed if present. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + { + "columns": [{"name": "a"}, {"name": "b"}], + "rows": [[1, "x"], [2, "y"]], + "cursor": "CUR-1", + }, + {}, # close + ] + ) + + rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a, b FROM idx", + page_index=0, + page_size=2, + ) + + assert rows == [[1, "x"], [2, "y"]] + assert cols == ["a", "b"] + + calls = database._transport.perform_request.call_args_list + assert len(calls) == 2 + assert calls[0][0][0] == "POST" + assert calls[0][0][1] == "/_sql" + assert calls[0].kwargs["body"] == {"query": "SELECT a, b FROM idx", "fetch_size": 2} + assert calls[1][0][1] == "/_sql/close" + assert calls[1].kwargs["body"] == {"cursor": "CUR-1"} + + +def test_fetch_data_with_cursor_iterates_to_target_page() -> None: + """ + For page_index=2, the code executes the initial query, then sends the + cursor twice. The rows returned belong to the third page. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]], "cursor": "C2"}, + {"rows": [[2]], "cursor": "C3"}, + {}, # close + ] + ) + + rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=2, + page_size=1, + ) + + assert rows == [[2]] + assert cols == ["a"] + + calls = database._transport.perform_request.call_args_list + assert len(calls) == 4 + assert calls[1].kwargs["body"] == {"cursor": "C1"} + assert calls[2].kwargs["body"] == {"cursor": "C2"} + assert calls[3][0][1] == "/_sql/close" + assert calls[3].kwargs["body"] == {"cursor": "C3"} + + +def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None: + """ + If the dataset has fewer pages than the requested page_index, the + cursor becomes falsy mid-iteration. Return empty rows, do not call + close, do not raise. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]]}, # no cursor → dataset ends here + ] + ) + + rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=5, + page_size=1, + ) + + assert rows == [] + assert cols == ["a"] + assert len(database._transport.perform_request.call_args_list) == 2 + + +def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None: + """ + Some responses (tiny result sets) come back without a cursor token. + The code must not send a close request with a missing cursor. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0], [1]]}, + ] + ) + + rows, _, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=0, + page_size=50, + ) + + assert rows == [[0], [1]] + assert len(database._transport.perform_request.call_args_list) == 1 + + +def test_fetch_data_with_cursor_closes_cursor_even_if_iteration_raises() -> None: + """ + If an intermediate cursor request raises, the cursor from the most + recent successful response must still be closed. Prevents server-side + cursor leaks on transport errors. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + class BoomError(RuntimeError): + pass + + responses = [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + ] + raises_on_call_index = 1 + + call_count = {"n": 0} + recorded_close = {} + + def perform_request(method, path, body=None, **_kwargs): + call_count["n"] += 1 + # calls: 0=initial query, 1=cursor follow-up (raises), 2=close + if call_count["n"] - 1 == raises_on_call_index: + raise BoomError("transport blew up") + if path.endswith("/close"): + recorded_close["body"] = body + return {} + return responses[call_count["n"] - 1] + + from unittest.mock import MagicMock + + import pytest + + transport = MagicMock() + transport.perform_request.side_effect = perform_request + conn = MagicMock() + conn.es.transport = transport + ctx = MagicMock() + ctx.__enter__ = MagicMock(return_value=conn) + ctx.__exit__ = MagicMock(return_value=False) + database = MagicMock() + database.get_raw_connection.return_value = ctx + + with pytest.raises(BoomError): + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=3, + page_size=1, + ) + + assert recorded_close.get("body") == {"cursor": "C1"}, ( + "The cursor from the last successful response must be closed " + "even when a later iteration raises." + ) + + +def test_opendistro_fetch_data_with_cursor_uses_opendistro_endpoints() -> None: + """ + OpenDistro's SQL plugin historically lives at /_opendistro/_sql rather + than /_sql. Verify the classmethod sends requests to the OpenDistro paths. + """ + from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[42]], "cursor": "OD-1"}, + {}, # close + ] + ) + + rows, cols, _ = OpenDistroEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=0, + page_size=1, + ) + + assert rows == [[42]] + assert cols == ["a"] + + calls = database._transport.perform_request.call_args_list + assert calls[0][0][1] == "/_opendistro/_sql" + assert calls[1][0][1] == "/_opendistro/_sql/close" diff --git a/tests/unit_tests/models/test_helpers_offset.py b/tests/unit_tests/models/test_helpers_offset.py index 4cff384d95b8..c431584c3574 100644 --- a/tests/unit_tests/models/test_helpers_offset.py +++ b/tests/unit_tests/models/test_helpers_offset.py @@ -14,27 +14,75 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import ast from pathlib import Path +HELPERS_PATH = ( + Path(__file__).resolve().parents[3] / "superset" / "models" / "helpers.py" +) + + +def _uses_allows_offset_fetch(node: ast.AST) -> bool: + """True if any attribute access on `node` references 'allows_offset_fetch'.""" + return any( + isinstance(child, ast.Attribute) and child.attr == "allows_offset_fetch" + for child in ast.walk(node) + ) + + +def _is_qry_offset_assignment(stmt: ast.AST) -> bool: + """True if stmt is `qry = qry.offset(...)` (any LHS, call to `.offset`).""" + if not isinstance(stmt, ast.Assign): + return False + call = stmt.value + if not isinstance(call, ast.Call): + return False + func = call.func + return isinstance(func, ast.Attribute) and func.attr == "offset" + def test_helpers_guards_offset_with_allows_offset_fetch_flag() -> None: """ - Structural regression test: the .offset() call in get_sqla_query must - be guarded by db_engine_spec.allows_offset_fetch. Removing the guard - regresses Elasticsearch drill-to-detail pagination (crashes on page 2+ - with 'mismatched input OFFSET'). - - We assert on the source file rather than invoking get_sqla_query - directly because the full call path requires an SqlaTable with - columns/metrics/database — overkill for a one-line guard. - """ - source = Path("superset/models/helpers.py").read_text() + Regression guard: the `.offset()` call in get_sqla_query must be wrapped + in an `if` that checks `allows_offset_fetch`. Without this guard, + engines that do not support OFFSET (Elasticsearch SQL) crash drill- + to-detail on page 2+. - guard_line = ( - "if row_offset and self.database.db_engine_spec.allows_offset_fetch:" + We parse the AST rather than grep the source so the test survives + Black-style reformatting and trivial refactors. + """ + source = HELPERS_PATH.read_text() + assert "allows_offset_fetch" in source, ( + "helpers.py no longer references allows_offset_fetch; the OFFSET " + "guard is gone — Elasticsearch drill-to-detail will crash on page 2+." ) - assert guard_line in source, ( - "The OFFSET guard is missing from superset/models/helpers.py — " - "Elasticsearch (and any other engine with allows_offset_fetch=False) " - "will crash when drill-to-detail requests page 2+." + + tree = ast.parse(source) + unguarded: list[int] = [] + + class Visitor(ast.NodeVisitor): + def __init__(self) -> None: + self._in_guarded_if = 0 + + def visit_If(self, node: ast.If) -> None: # noqa: N802 + if _uses_allows_offset_fetch(node.test): + self._in_guarded_if += 1 + for child in node.body: + self.visit(child) + self._in_guarded_if -= 1 + for child in node.orelse: + self.visit(child) + else: + self.generic_visit(node) + + def visit_Assign(self, node: ast.Assign) -> None: # noqa: N802 + if _is_qry_offset_assignment(node) and self._in_guarded_if == 0: + unguarded.append(node.lineno) + self.generic_visit(node) + + Visitor().visit(tree) + assert not unguarded, ( + f"Unguarded .offset() call(s) in helpers.py at line(s) {unguarded}. " + "Wrap with `if ... allows_offset_fetch:` to prevent OFFSET emission " + "on engines that cannot parse it (e.g. Elasticsearch SQL)." ) diff --git a/tests/unit_tests/views/__init__.py b/tests/unit_tests/views/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py new file mode 100644 index 000000000000..c484626df6af --- /dev/null +++ b/tests/unit_tests/views/datasource/test_utils.py @@ -0,0 +1,316 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from unittest.mock import MagicMock, patch + +import pytest + + +@pytest.fixture +def fake_datasource_factory(): + """Builds a MagicMock datasource whose db_engine_spec is configurable.""" + + def _build(allows_offset_fetch: bool) -> MagicMock: + datasource = MagicMock(name="SqlaTable") + datasource.type = "table" + datasource.id = 1 + datasource.database.db_engine_spec.allows_offset_fetch = allows_offset_fetch + return datasource + + return _build + + +def test_get_samples_uses_normal_path_when_engine_supports_offset( + fake_datasource_factory, +): + """ + Engines with allows_offset_fetch=True continue to use the existing + QueryContext/get_payload path. No cursor-method calls. + """ + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=True) + datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock() + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + ): + samples_ctx = MagicMock() + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [{"a": 1}], + "colnames": ["a"], + "coltypes": [], + "status": "success", + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [{"data": [{"COUNT(*)": 42}], "status": "success"}] + } + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + result = utils.get_samples( + datasource_type="table", + datasource_id=1, + page=2, + per_page=50, + ) + + assert result["data"] == [{"a": 1}] + assert result["page"] == 2 + assert result["per_page"] == 50 + assert result["total_count"] == 42 + datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called() + + +def test_get_samples_uses_cursor_path_when_engine_disallows_offset( + fake_datasource_factory, +): + """ + When the engine reports allows_offset_fetch=False and the requested + page is > 1, get_samples delegates to fetch_data_with_cursor. + """ + from flask import current_app + + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource.get_query_str.return_value = "SELECT a FROM idx LIMIT 1000" + datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( + [[99]], + ["a"], + None, + ) + + samples_ctx = MagicMock() + samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [], + "colnames": ["a"], + "coltypes": [2], + "status": "success", + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [{"data": [{"COUNT(*)": 200}], "status": "success"}] + } + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + ): + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + result = utils.get_samples( + datasource_type="table", + datasource_id=1, + page=3, + per_page=50, + ) + + datasource.database.db_engine_spec.fetch_data_with_cursor.assert_called_once() + kwargs = datasource.database.db_engine_spec.fetch_data_with_cursor.call_args.kwargs + assert kwargs["page_index"] == 2 + assert kwargs["page_size"] == 50 + + # Lock in the SQL-rewrite contract: cursor path must ask datasource for + # SQL with row_offset=0 and row_limit=SAMPLES_ROW_LIMIT, so the compiled + # SQL contains the full page-scan window (no OFFSET). + expected_row_limit = current_app.config.get("SAMPLES_ROW_LIMIT", 1000) + get_query_str_args = datasource.get_query_str.call_args.args + assert len(get_query_str_args) == 1 + query_obj_dict = get_query_str_args[0] + assert query_obj_dict["row_offset"] == 0 + assert query_obj_dict["row_limit"] == expected_row_limit + + assert result["data"] == [{"a": 99}] + assert result["colnames"] == ["a"] + assert result["page"] == 3 + assert result["per_page"] == 50 + assert result["total_count"] == 200 + + +def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( + fake_datasource_factory, +): + """ + Issue 1: coltypes from the normal samples payload must flow into the + cursor-path response dict so that SamplesPane's useGridColumns() picks + up type-based cell renderers on page 2+. + """ + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource.get_query_str.return_value = "SELECT a, b FROM idx LIMIT 1000" + datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( + [["x", 1]], + ["a", "b"], + None, + ) + + samples_ctx = MagicMock() + samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [], + "colnames": ["a", "b"], + "coltypes": [2, 1], + "status": "success", + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [{"data": [{"COUNT(*)": 2000}], "status": "success"}] + } + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + ): + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + result = utils.get_samples( + datasource_type="table", + datasource_id=1, + page=2, + per_page=50, + ) + + assert result["coltypes"] == [2, 1] + assert result["colnames"] == ["a", "b"] + assert result["data"] == [{"a": "x", "b": 1}] + + +def test_get_samples_cursor_path_cleans_count_cache_on_failure( + fake_datasource_factory, +): + """ + Issue 2: if fetch_data_with_cursor raises, the count-star cache must be + evicted (mirroring the normal FAILED path) and the error re-raised as + DatasetSamplesFailedError. + """ + from superset.commands.dataset.exceptions import DatasetSamplesFailedError + from superset.constants import CacheRegion + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource.get_query_str.return_value = "SELECT a FROM idx LIMIT 1000" + datasource.database.db_engine_spec.fetch_data_with_cursor.side_effect = ( + RuntimeError("boom") + ) + + samples_ctx = MagicMock() + samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [], + "colnames": ["a"], + "coltypes": [2], + "status": "success", + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [ + { + "data": [{"COUNT(*)": 200}], + "status": "success", + "cache_key": "count-cache-key", + } + ] + } + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + patch.object(utils, "QueryCacheManager") as cache_mgr, + ): + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + with pytest.raises(DatasetSamplesFailedError): + utils.get_samples( + datasource_type="table", + datasource_id=1, + page=3, + per_page=50, + ) + + cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA) + + +def test_get_samples_cursor_path_unused_for_page_one(fake_datasource_factory): + """ + Page 1 (row_offset = 0) does not need cursor iteration — the normal + path already returns the first page correctly without emitting OFFSET. + Keep the fast path. + """ + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock() + + samples_ctx = MagicMock() + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [{"a": 1}], + "colnames": ["a"], + "coltypes": [], + "status": "success", + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [{"data": [{"COUNT(*)": 1}], "status": "success"}] + } + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + ): + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + utils.get_samples( + datasource_type="table", + datasource_id=1, + page=1, + per_page=50, + ) + + datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called() From 95934b05af538b2930b49977639385743a872393 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 09:26:05 +0200 Subject: [PATCH 05/13] fix(elasticsearch): sanitize samples SQL and send JSON header for cursor paging --- superset/db_engine_specs/elasticsearch.py | 17 +++- superset/views/datasource/utils.py | 26 +++--- .../db_engine_specs/test_elasticsearch.py | 75 ++++++++++++++++ .../unit_tests/views/datasource/test_utils.py | 87 +++++++++++++++---- 4 files changed, 170 insertions(+), 35 deletions(-) diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index d4674e232f5c..258150da109b 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging +import re from datetime import datetime from typing import Any, Optional @@ -53,12 +54,24 @@ def _fetch_page_via_cursor( If the dataset is exhausted before reaching ``page_index``, returns an empty rows list with the column names from the initial request. """ + # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT + # in the submitted statement caps the result set before the cursor can + # page through it. ``fetch_size`` drives pagination instead. + sanitized_sql = sql.strip().rstrip(";").strip() + sanitized_sql = re.sub( + r"\s+LIMIT\s+\d+\s*$", "", sanitized_sql, flags=re.IGNORECASE + ) + + # The raw transport does not auto-set Content-Type the way the Python + # DB-API driver does; ES rejects POSTs without a JSON content type. + json_headers = {"Content-Type": "application/json"} with database.get_raw_connection() as conn: transport = conn.es.transport response = transport.perform_request( "POST", sql_path, - body={"query": sql, "fetch_size": page_size}, + headers=json_headers, + body={"query": sanitized_sql, "fetch_size": page_size}, ) columns = [col["name"] for col in response.get("columns", [])] rows = response.get("rows", []) @@ -74,6 +87,7 @@ def _fetch_page_via_cursor( response = transport.perform_request( "POST", sql_path, + headers=json_headers, body={"cursor": cursor}, ) rows = response.get("rows", []) @@ -88,6 +102,7 @@ def _fetch_page_via_cursor( transport.perform_request( "POST", close_path, + headers=json_headers, body={"cursor": cursor}, ) except Exception: # pylint: disable=broad-except diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index df583b24b32d..26f33f1aa09a 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -216,15 +216,17 @@ def _fetch_samples_via_cursor( Fetch a single page of samples via engine-spec cursor pagination. Used when ``datasource.database.db_engine_spec.allows_offset_fetch`` is - False and a non-first page is requested. Extracts the compiled SQL from - the already-built QueryContext, delegates cursor iteration to the engine - spec, and assembles a response dict compatible with the normal samples - path. + False and a non-first page is requested. Reuses the SQL that Superset + already compiled for the normal samples payload, delegates cursor + iteration to the engine spec, and assembles a response dict compatible + with the normal samples path. The samples payload is also executed (its SQL is OFFSET-stripped by the models/helpers.py guard) to obtain the authoritative ``colnames`` and ``coltypes`` that the frontend grid needs for type-based cell renderers - — ensuring page 2+ renders identically to page 1. + — ensuring page 2+ renders identically to page 1. The engine spec is + responsible for stripping any trailing ``LIMIT`` from the SQL so the + cursor is not capped to a single page. """ # Run the normal samples payload once to source authoritative colnames # and coltypes for the paginated result set. The helpers.py OFFSET guard @@ -234,16 +236,10 @@ def _fetch_samples_via_cursor( QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) raise DatasetSamplesFailedError(sample_payload.get("error") or "") - query_obj = samples_instance.queries[0] - query_obj_dict = query_obj.to_dict() - # The engine spec's allows_offset_fetch guard in models/helpers.py keeps - # OFFSET out of the SQL already; we ask for a LIMIT large enough for the - # cursor to iterate across pages. - sample_row_limit = app.config.get("SAMPLES_ROW_LIMIT", 1000) - query_obj_dict["row_limit"] = sample_row_limit - query_obj_dict["row_offset"] = 0 - - sql = datasource.get_query_str(query_obj_dict) + sql = sample_payload.get("query") + if not sql: + QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) + raise DatasetSamplesFailedError("Empty samples query") engine_spec = datasource.database.db_engine_spec rows, cursor_colnames, _ = engine_spec.fetch_data_with_cursor( diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index b4ab47404362..a5e1156f46b5 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -318,6 +318,81 @@ def perform_request(method, path, body=None, **_kwargs): ) +@pytest.mark.parametrize( + "sql_in,expected_query", + [ + # Superset's SQL builder terminates statements with `;` for DB-API + # execution; the ES SQL API rejects that. + ("SELECT a FROM idx;", "SELECT a FROM idx"), + # A trailing LIMIT from the normal samples pipeline would cap the + # cursor to a single fetch; it must be stripped so `fetch_size` + # drives pagination instead. + ("SELECT a FROM idx LIMIT 50", "SELECT a FROM idx"), + ("SELECT a FROM idx LIMIT 50;", "SELECT a FROM idx"), + ("SELECT a FROM idx LIMIT 50 ; ", "SELECT a FROM idx"), + # Case-insensitive LIMIT recognition, since the builder's output is + # not guaranteed to be uppercase across backends. + ("SELECT a FROM idx limit 100", "SELECT a FROM idx"), + # LIMIT that is *not* the final clause (e.g. inside a subquery) must + # not be mangled. + ( + "SELECT * FROM (SELECT a FROM idx LIMIT 10) sub", + "SELECT * FROM (SELECT a FROM idx LIMIT 10) sub", + ), + ], +) +def test_fetch_data_with_cursor_sanitizes_sql(sql_in: str, expected_query: str) -> None: + """ + The Elasticsearch SQL API has two ergonomic landmines for Superset SQL: + a trailing ``;`` is rejected, and a trailing ``LIMIT N`` caps the cursor + to a single fetch. ``_fetch_page_via_cursor`` must strip both before + submitting the query. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database([{"columns": [{"name": "a"}], "rows": []}]) + + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql=sql_in, + page_index=0, + page_size=25, + ) + + first_call = database._transport.perform_request.call_args_list[0] + assert first_call.kwargs["body"]["query"] == expected_query + # fetch_size is independent of the SQL rewrite — it's what actually + # controls the page window. + assert first_call.kwargs["body"]["fetch_size"] == 25 + + +def test_fetch_data_with_cursor_sets_json_content_type_header() -> None: + """ + The raw ES transport does not auto-set Content-Type the way the DB-API + driver does; without it the cluster responds with HTTP 406. Every + perform_request issued by the cursor helper must carry the JSON header. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]], "cursor": "C2"}, + {}, # close + ] + ) + + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=1, + page_size=1, + ) + + for call in database._transport.perform_request.call_args_list: + assert call.kwargs.get("headers") == {"Content-Type": "application/json"} + + def test_opendistro_fetch_data_with_cursor_uses_opendistro_endpoints() -> None: """ OpenDistro's SQL plugin historically lives at /_opendistro/_sql rather diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py index c484626df6af..8b7bfffa6d45 100644 --- a/tests/unit_tests/views/datasource/test_utils.py +++ b/tests/unit_tests/views/datasource/test_utils.py @@ -87,14 +87,12 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( ): """ When the engine reports allows_offset_fetch=False and the requested - page is > 1, get_samples delegates to fetch_data_with_cursor. + page is > 1, get_samples delegates to fetch_data_with_cursor with the + SQL that Superset already compiled for the normal samples payload. """ - from flask import current_app - from superset.views.datasource import utils datasource = fake_datasource_factory(allows_offset_fetch=False) - datasource.get_query_str.return_value = "SELECT a FROM idx LIMIT 1000" datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [[99]], ["a"], @@ -102,7 +100,6 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( ) samples_ctx = MagicMock() - samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] samples_ctx.get_payload.return_value = { "queries": [ { @@ -110,6 +107,7 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( "colnames": ["a"], "coltypes": [2], "status": "success", + "query": "SELECT a FROM idx LIMIT 50", } ] } @@ -137,16 +135,9 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( kwargs = datasource.database.db_engine_spec.fetch_data_with_cursor.call_args.kwargs assert kwargs["page_index"] == 2 assert kwargs["page_size"] == 50 - - # Lock in the SQL-rewrite contract: cursor path must ask datasource for - # SQL with row_offset=0 and row_limit=SAMPLES_ROW_LIMIT, so the compiled - # SQL contains the full page-scan window (no OFFSET). - expected_row_limit = current_app.config.get("SAMPLES_ROW_LIMIT", 1000) - get_query_str_args = datasource.get_query_str.call_args.args - assert len(get_query_str_args) == 1 - query_obj_dict = get_query_str_args[0] - assert query_obj_dict["row_offset"] == 0 - assert query_obj_dict["row_limit"] == expected_row_limit + # The cursor path reuses the already-compiled samples SQL verbatim; the + # engine spec is responsible for any sanitation (strip ``;``/``LIMIT``). + assert kwargs["sql"] == "SELECT a FROM idx LIMIT 50" assert result["data"] == [{"a": 99}] assert result["colnames"] == ["a"] @@ -166,7 +157,6 @@ def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( from superset.views.datasource import utils datasource = fake_datasource_factory(allows_offset_fetch=False) - datasource.get_query_str.return_value = "SELECT a, b FROM idx LIMIT 1000" datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [["x", 1]], ["a", "b"], @@ -174,7 +164,6 @@ def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( ) samples_ctx = MagicMock() - samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] samples_ctx.get_payload.return_value = { "queries": [ { @@ -182,6 +171,7 @@ def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( "colnames": ["a", "b"], "coltypes": [2, 1], "status": "success", + "query": "SELECT a, b FROM idx LIMIT 50", } ] } @@ -223,13 +213,11 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( from superset.views.datasource import utils datasource = fake_datasource_factory(allows_offset_fetch=False) - datasource.get_query_str.return_value = "SELECT a FROM idx LIMIT 1000" datasource.database.db_engine_spec.fetch_data_with_cursor.side_effect = ( RuntimeError("boom") ) samples_ctx = MagicMock() - samples_ctx.queries = [MagicMock(to_dict=MagicMock(return_value={}))] samples_ctx.get_payload.return_value = { "queries": [ { @@ -237,6 +225,7 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( "colnames": ["a"], "coltypes": [2], "status": "success", + "query": "SELECT a FROM idx LIMIT 50", } ] } @@ -271,6 +260,66 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA) +def test_get_samples_cursor_path_raises_when_sample_payload_has_no_sql( + fake_datasource_factory, +): + """ + If the samples payload is ``success`` but carries no compiled ``query`` + string, the cursor path has nothing to submit. Fail fast with a + descriptive error and evict the count cache, instead of handing an empty + statement to the engine driver. + """ + from superset.commands.dataset.exceptions import DatasetSamplesFailedError + from superset.constants import CacheRegion + from superset.views.datasource import utils + + datasource = fake_datasource_factory(allows_offset_fetch=False) + + samples_ctx = MagicMock() + samples_ctx.get_payload.return_value = { + "queries": [ + { + "data": [], + "colnames": ["a"], + "coltypes": [2], + "status": "success", + # No ``query`` key — simulates a backend that reports success + # without emitting SQL (e.g. fully cached empty result). + } + ] + } + count_ctx = MagicMock() + count_ctx.get_payload.return_value = { + "queries": [ + { + "data": [{"COUNT(*)": 200}], + "status": "success", + "cache_key": "count-cache-key", + } + ] + } + + with ( + patch.object( + utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource) + ), + patch.object(utils, "QueryContextFactory") as qcf, + patch.object(utils, "QueryCacheManager") as cache_mgr, + ): + qcf.return_value.create.side_effect = [samples_ctx, count_ctx] + + with pytest.raises(DatasetSamplesFailedError): + utils.get_samples( + datasource_type="table", + datasource_id=1, + page=2, + per_page=50, + ) + + cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA) + datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called() + + def test_get_samples_cursor_path_unused_for_page_one(fake_datasource_factory): """ Page 1 (row_offset = 0) does not need cursor iteration — the normal From 42fe5a09fe84b4a47f88265c58c22b5ae1b65fd4 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 11:44:18 +0200 Subject: [PATCH 06/13] docs(elasticsearch): note O(N) cost of cursor pagination for deep pages --- UPDATING.md | 2 ++ superset/db_engine_specs/elasticsearch.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index a1ba578ca54b..9739e73af11c 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -28,6 +28,8 @@ assists people when migrating to a new version. A new `BaseEngineSpec.allows_offset_fetch` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support. +**Note on deep-pagination cost:** Cursor-based engines (including Elasticsearch and OpenDistro) are forward-only, so reaching page `N` of a drill-to-detail view issues `N` round trips to the cluster. Deep pagination is therefore linear in page number; users paginating into the hundreds or thousands will notice added latency compared to `OFFSET`-capable engines. + ### Granular Export Controls A new feature flag `GRANULAR_EXPORT_CONTROLS` introduces three fine-grained permissions that replace the legacy `can_csv` permission: diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 258150da109b..5092ab588fdb 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -53,6 +53,12 @@ def _fetch_page_via_cursor( If the dataset is exhausted before reaching ``page_index``, returns an empty rows list with the column names from the initial request. + + Note: the Elasticsearch SQL cursor is forward-only, so cost is linear in + ``page_index`` — reaching page N issues N round trips to the cluster. + Deep pagination (hundreds of pages) will therefore be noticeably slower + than on ``OFFSET``-capable engines. This is a protocol limitation, not + an implementation choice. """ # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT # in the submitted statement caps the result set before the cursor can From 0e41faa7fb06ed188271821646b9cb99bfd98550 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 11:54:27 +0200 Subject: [PATCH 07/13] docs(drill-detail): note extra samples query on cursor pagination path --- superset/views/datasource/utils.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index 26f33f1aa09a..5605ba659ac7 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -227,10 +227,20 @@ def _fetch_samples_via_cursor( — ensuring page 2+ renders identically to page 1. The engine spec is responsible for stripping any trailing ``LIMIT`` from the SQL so the cursor is not capped to a single page. + + Cost: this path issues one extra "page-1-shaped" samples query on every + request for page ≥ 2, on top of the ``page_index + 1`` cursor round + trips. The extra query is what provides authoritative ``coltypes`` + (derived from the DB-API cursor description) — the ES cursor response + only carries ES SQL type names, which would need a separate translator + to Superset's coltype enum. TODO: extract SQL via + ``datasource.get_query_str(query_obj.to_dict())`` and derive coltypes + from cursor metadata to eliminate the extra execution. """ - # Run the normal samples payload once to source authoritative colnames - # and coltypes for the paginated result set. The helpers.py OFFSET guard - # keeps this to a single cheap page-1 query for the cursor-path engine. + # Execute the normal samples payload to source authoritative colnames + # and coltypes. See the cost note in the docstring — this is deliberate. + # The helpers.py OFFSET guard keeps it to a single page-1-shaped query, + # not a full-table scan, for engines on the cursor path. sample_payload = samples_instance.get_payload()["queries"][0] if sample_payload.get("status") == QueryStatus.FAILED: QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) From df67a695b38629f2f4e2ea49939cec181e888976 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 12:03:43 +0200 Subject: [PATCH 08/13] refactor(db-engine-specs): drop speculative third slot from cursor return type --- superset/db_engine_specs/elasticsearch.py | 13 ++++++------- superset/views/datasource/utils.py | 2 +- .../db_engine_specs/test_elasticsearch.py | 10 +++++----- tests/unit_tests/views/datasource/test_utils.py | 2 -- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 5092ab588fdb..c7e210bfe01b 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -40,7 +40,7 @@ def _fetch_page_via_cursor( page_size: int, sql_path: str, close_path: str, -) -> tuple[list[list[Any]], list[str], None]: +) -> tuple[list[list[Any]], list[str]]: """ Iterate Elasticsearch/OpenSearch SQL cursor pagination to return a single page of results. @@ -48,8 +48,7 @@ def _fetch_page_via_cursor( Executes ``sql`` with ``fetch_size = page_size``, then sends cursor follow-up requests ``page_index`` times to skip earlier pages. Closes the cursor when done to release server-side state. Returns - ``(rows, columns, None)``; the third slot is reserved for a future - cursor-handle API. + ``(rows, columns)``. If the dataset is exhausted before reaching ``page_index``, returns an empty rows list with the column names from the initial request. @@ -89,7 +88,7 @@ def _fetch_page_via_cursor( # Dataset exhausted before reaching the target page — # no cursor to close (ES returns no cursor on the final # page). Return immediately with empty rows. - return [], columns, None + return [], columns response = transport.perform_request( "POST", sql_path, @@ -99,7 +98,7 @@ def _fetch_page_via_cursor( rows = response.get("rows", []) cursor = response.get("cursor") - return rows, columns, None + return rows, columns finally: if cursor: # Best-effort cleanup. If close itself fails we don't want @@ -234,7 +233,7 @@ def fetch_data_with_cursor( sql: str, page_index: int, page_size: int, - ) -> tuple[list[list[Any]], list[str], None]: + ) -> tuple[list[list[Any]], list[str]]: """ Fetch a single page of results using Elasticsearch cursor pagination. See ``_fetch_page_via_cursor`` for the protocol. @@ -327,7 +326,7 @@ def fetch_data_with_cursor( sql: str, page_index: int, page_size: int, - ) -> tuple[list[list[Any]], list[str], None]: + ) -> tuple[list[list[Any]], list[str]]: """ Fetch a single page of results using OpenDistro SQL cursor pagination. Same protocol as ElasticSearchEngineSpec, different endpoint paths. diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index 5605ba659ac7..8bafab6cdab1 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -252,7 +252,7 @@ def _fetch_samples_via_cursor( raise DatasetSamplesFailedError("Empty samples query") engine_spec = datasource.database.db_engine_spec - rows, cursor_colnames, _ = engine_spec.fetch_data_with_cursor( + rows, cursor_colnames = engine_spec.fetch_data_with_cursor( database=datasource.database, sql=sql, page_index=page_index, diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index a5e1156f46b5..1421ba8fd22a 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -157,7 +157,7 @@ def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> Non ] ) - rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( database=database, sql="SELECT a, b FROM idx", page_index=0, @@ -192,7 +192,7 @@ def test_fetch_data_with_cursor_iterates_to_target_page() -> None: ] ) - rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( database=database, sql="SELECT a FROM idx", page_index=2, @@ -225,7 +225,7 @@ def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None: ] ) - rows, cols, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( database=database, sql="SELECT a FROM idx", page_index=5, @@ -250,7 +250,7 @@ def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None: ] ) - rows, _, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + rows, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( database=database, sql="SELECT a FROM idx", page_index=0, @@ -407,7 +407,7 @@ def test_opendistro_fetch_data_with_cursor_uses_opendistro_endpoints() -> None: ] ) - rows, cols, _ = OpenDistroEngineSpec.fetch_data_with_cursor( + rows, cols = OpenDistroEngineSpec.fetch_data_with_cursor( database=database, sql="SELECT a FROM idx", page_index=0, diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py index 8b7bfffa6d45..0746f0a149bd 100644 --- a/tests/unit_tests/views/datasource/test_utils.py +++ b/tests/unit_tests/views/datasource/test_utils.py @@ -96,7 +96,6 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [[99]], ["a"], - None, ) samples_ctx = MagicMock() @@ -160,7 +159,6 @@ def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [["x", 1]], ["a", "b"], - None, ) samples_ctx = MagicMock() From 350410eb7cefa9dfdbfef503eae266e83ecc0527 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 12:10:11 +0200 Subject: [PATCH 09/13] refactor(db-engine-specs): type cursor pagination params --- superset/db_engine_specs/base.py | 20 ++++++++++++++++++++ superset/db_engine_specs/elasticsearch.py | 13 +++++++++---- superset/views/datasource/utils.py | 12 +++++++++--- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index 347790b3e226..df975e789b62 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -1239,6 +1239,26 @@ def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, .. except Exception as ex: raise cls.get_dbapi_mapped_exception(ex) from ex + @classmethod + def fetch_data_with_cursor( + cls, + database: Database, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str]]: + """ + Fetch a single page of results via engine-native cursor pagination. + + Only called when ``cls.allows_offset_fetch`` is False and a non-first + page is requested (see ``superset/views/datasource/utils.py``). + Engines that set ``allows_offset_fetch = False`` must override this. + """ + raise NotImplementedError( + f"{cls.__name__} sets allows_offset_fetch=False but does not " + "implement fetch_data_with_cursor()" + ) + @classmethod def expand_data( cls, columns: list[ResultSetColumnType], data: list[dict[Any, Any]] diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index c7e210bfe01b..9bc04a41aa8f 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -14,10 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + import logging import re from datetime import datetime -from typing import Any, Optional +from typing import Any, Optional, TYPE_CHECKING from packaging.version import Version from sqlalchemy import types @@ -30,11 +32,14 @@ SupersetDBAPIProgrammingError, ) +if TYPE_CHECKING: + from superset.models.core import Database + logger = logging.getLogger() def _fetch_page_via_cursor( - database: Any, + database: Database, sql: str, page_index: int, page_size: int, @@ -229,7 +234,7 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho @classmethod def fetch_data_with_cursor( cls, - database: Any, + database: Database, sql: str, page_index: int, page_size: int, @@ -322,7 +327,7 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method @classmethod def fetch_data_with_cursor( cls, - database: Any, + database: Database, sql: str, page_index: int, page_size: int, diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index 8bafab6cdab1..bd99271658c8 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + import logging -from typing import Any, Iterable, Optional +from typing import Any, Iterable, Optional, TYPE_CHECKING from flask import current_app as app @@ -28,6 +30,10 @@ from superset.utils.core import QueryStatus from superset.views.datasource.schemas import SamplesPayloadSchema +if TYPE_CHECKING: + from superset.common.query_context import QueryContext + from superset.daos.datasource import Datasource + logger = logging.getLogger(__name__) @@ -206,8 +212,8 @@ def get_samples( # pylint: disable=too-many-arguments def _fetch_samples_via_cursor( - datasource: Any, - samples_instance: Any, + datasource: Datasource, + samples_instance: QueryContext, count_star_data: dict[str, Any], page_index: int, page_size: int, From e88af892fd0a8ab7605a08915dc151365cb5ec32 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 12:12:28 +0200 Subject: [PATCH 10/13] fix(drill-detail): sanitize cursor pagination error message --- superset/views/datasource/utils.py | 5 ++++- tests/unit_tests/views/datasource/test_utils.py | 8 ++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index bd99271658c8..f92e0719058d 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -194,7 +194,10 @@ def get_samples( # pylint: disable=too-many-arguments QueryCacheManager.delete( count_star_data.get("cache_key"), CacheRegion.DATA ) - raise DatasetSamplesFailedError(str(exc)) from exc + logger.exception("Cursor-based samples pagination failed") + raise DatasetSamplesFailedError( + "Failed to fetch samples via cursor pagination" + ) from exc else: sample_data = samples_instance.get_payload()["queries"][0] if sample_data.get("status") == QueryStatus.FAILED: diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py index 0746f0a149bd..5f38ee88f6af 100644 --- a/tests/unit_tests/views/datasource/test_utils.py +++ b/tests/unit_tests/views/datasource/test_utils.py @@ -212,7 +212,7 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( datasource = fake_datasource_factory(allows_offset_fetch=False) datasource.database.db_engine_spec.fetch_data_with_cursor.side_effect = ( - RuntimeError("boom") + RuntimeError("boom: internal es stack trace details") ) samples_ctx = MagicMock() @@ -247,7 +247,7 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( ): qcf.return_value.create.side_effect = [samples_ctx, count_ctx] - with pytest.raises(DatasetSamplesFailedError): + with pytest.raises(DatasetSamplesFailedError) as excinfo: utils.get_samples( datasource_type="table", datasource_id=1, @@ -256,6 +256,10 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( ) cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA) + # Backend-internal error text must not leak into the user-facing message; + # only the original exception (chained via ``from exc``) retains the detail. + assert "internal es stack trace details" not in str(excinfo.value) + assert isinstance(excinfo.value.__cause__, RuntimeError) def test_get_samples_cursor_path_raises_when_sample_payload_has_no_sql( From 0410908ab3b4e7055c77fa2385480962b31b3a9f Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 12:13:54 +0200 Subject: [PATCH 11/13] docs(elasticsearch): pin trailing-LIMIT assumption in cursor sanitizer --- superset/db_engine_specs/elasticsearch.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 9bc04a41aa8f..48a6f6bcc85f 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -67,6 +67,9 @@ def _fetch_page_via_cursor( # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT # in the submitted statement caps the result set before the cursor can # page through it. ``fetch_size`` drives pagination instead. + # Assumption: Superset only appends a trailing ``LIMIT N`` for engines + # with ``allows_offset_fetch=False``. If that ever changes (e.g. + # ``FETCH FIRST N ROWS`` or ``TOP N``), extend this sanitizer to match. sanitized_sql = sql.strip().rstrip(";").strip() sanitized_sql = re.sub( r"\s+LIMIT\s+\d+\s*$", "", sanitized_sql, flags=re.IGNORECASE From 18967d7c5296c9d5bdeaa575aaee7db23d864274 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 13:11:45 +0200 Subject: [PATCH 12/13] =?UTF-8?q?refactor(db-engine-specs):=20rename=20all?= =?UTF-8?q?ows=5Foffset=5Ffetch=20=E2=86=92=20supports=5Foffset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- UPDATING.md | 4 ++-- superset/databases/schemas.py | 2 +- superset/db_engine_specs/base.py | 10 +++++----- superset/db_engine_specs/elasticsearch.py | 6 +++--- superset/models/helpers.py | 2 +- superset/views/datasource/utils.py | 4 ++-- tests/unit_tests/databases/test_schemas.py | 8 ++++---- tests/unit_tests/db_engine_specs/test_base.py | 12 +++++------ .../db_engine_specs/test_elasticsearch.py | 4 ++-- .../unit_tests/models/test_helpers_offset.py | 18 ++++++++--------- .../unit_tests/views/datasource/test_utils.py | 20 +++++++++---------- 11 files changed, 45 insertions(+), 45 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 9739e73af11c..f766780773fa 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,9 +24,9 @@ assists people when migrating to a new version. ## Next -### Engine spec capability flag: `allows_offset_fetch` +### Engine spec capability flag: `supports_offset` -A new `BaseEngineSpec.allows_offset_fetch` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support. +A new `BaseEngineSpec.supports_offset` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support. **Note on deep-pagination cost:** Cursor-based engines (including Elasticsearch and OpenDistro) are forward-only, so reaching page `N` of a drill-to-detail view issues `N` round trips to the cluster. Deep pagination is therefore linear in page number; users paginating into the hundreds or thousands will notice added latency compared to `OFFSET`-capable engines. diff --git a/superset/databases/schemas.py b/superset/databases/schemas.py index 111cbeaf8eee..07fc2f475d20 100644 --- a/superset/databases/schemas.py +++ b/superset/databases/schemas.py @@ -1067,7 +1067,7 @@ class EngineInformationSchema(Schema): supports_oauth2 = fields.Boolean( metadata={"description": "The database supports OAuth2"} ) - allows_offset_fetch = fields.Boolean( + supports_offset = fields.Boolean( metadata={ "description": ( "The database supports OFFSET in SQL queries. " diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index df975e789b62..29d44fc233ad 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -498,7 +498,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods # engines like Elasticsearch SQL that do not support OFFSET set this to # False and are expected to implement `fetch_data_with_cursor` for # pagination via another mechanism (e.g. Elasticsearch's cursor API). - allows_offset_fetch = True + supports_offset = True # Whether ORDER BY clause can use aliases created in SELECT # that are the same as a source column @@ -1250,12 +1250,12 @@ def fetch_data_with_cursor( """ Fetch a single page of results via engine-native cursor pagination. - Only called when ``cls.allows_offset_fetch`` is False and a non-first + Only called when ``cls.supports_offset`` is False and a non-first page is requested (see ``superset/views/datasource/utils.py``). - Engines that set ``allows_offset_fetch = False`` must override this. + Engines that set ``supports_offset = False`` must override this. """ raise NotImplementedError( - f"{cls.__name__} sets allows_offset_fetch=False but does not " + f"{cls.__name__} sets supports_offset=False but does not " "implement fetch_data_with_cursor()" ) @@ -2541,7 +2541,7 @@ def get_public_information(cls) -> dict[str, Any]: "disable_ssh_tunneling": cls.disable_ssh_tunneling, "supports_dynamic_catalog": cls.supports_dynamic_catalog, "supports_oauth2": cls.supports_oauth2, - "allows_offset_fetch": cls.allows_offset_fetch, + "supports_offset": cls.supports_offset, } @classmethod diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 48a6f6bcc85f..df0bc49e1e74 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -68,7 +68,7 @@ def _fetch_page_via_cursor( # in the submitted statement caps the result set before the cursor can # page through it. ``fetch_size`` drives pagination instead. # Assumption: Superset only appends a trailing ``LIMIT N`` for engines - # with ``allows_offset_fetch=False``. If that ever changes (e.g. + # with ``supports_offset=False``. If that ever changes (e.g. # ``FETCH FIRST N ROWS`` or ``TOP N``), extend this sanitizer to match. sanitized_sql = sql.strip().rstrip(";").strip() sanitized_sql = re.sub( @@ -133,7 +133,7 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho allows_joins = False allows_subqueries = True allows_sql_comments = False - allows_offset_fetch = False + supports_offset = False metadata = { "description": ( @@ -309,7 +309,7 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method allows_joins = False allows_subqueries = True allows_sql_comments = False - allows_offset_fetch = False + supports_offset = False _time_grain_expressions = { None: "{col}", diff --git a/superset/models/helpers.py b/superset/models/helpers.py index d9944c109f7e..b94f196a0f28 100644 --- a/superset/models/helpers.py +++ b/superset/models/helpers.py @@ -3285,7 +3285,7 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma if row_limit: qry = qry.limit(row_limit) - if row_offset and self.database.db_engine_spec.allows_offset_fetch: + if row_offset and self.database.db_engine_spec.supports_offset: qry = qry.offset(row_offset) if series_limit and groupby_series_columns: diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index f92e0719058d..8fc3755e22c4 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -179,7 +179,7 @@ def get_samples( # pylint: disable=too-many-arguments row_offset = limit_clause["row_offset"] row_limit = limit_clause["row_limit"] - if not engine_spec.allows_offset_fetch and row_offset > 0: + if not engine_spec.supports_offset and row_offset > 0: try: sample_data = _fetch_samples_via_cursor( datasource=datasource, @@ -224,7 +224,7 @@ def _fetch_samples_via_cursor( """ Fetch a single page of samples via engine-spec cursor pagination. - Used when ``datasource.database.db_engine_spec.allows_offset_fetch`` is + Used when ``datasource.database.db_engine_spec.supports_offset`` is False and a non-first page is requested. Reuses the SQL that Superset already compiled for the normal samples payload, delegates cursor iteration to the engine spec, and assembles a response dict compatible diff --git a/tests/unit_tests/databases/test_schemas.py b/tests/unit_tests/databases/test_schemas.py index 38d203151054..da29a11c1444 100644 --- a/tests/unit_tests/databases/test_schemas.py +++ b/tests/unit_tests/databases/test_schemas.py @@ -16,10 +16,10 @@ # under the License. -def test_engine_information_schema_includes_allows_offset_fetch() -> None: +def test_engine_information_schema_includes_supports_offset() -> None: """ The frontend consumes EngineInformationSchema to know what the engine - can do. allows_offset_fetch is a new field that must pass through. + can do. supports_offset is a new field that must pass through. """ from superset.databases.schemas import EngineInformationSchema @@ -30,8 +30,8 @@ def test_engine_information_schema_includes_allows_offset_fetch() -> None: "disable_ssh_tunneling": False, "supports_dynamic_catalog": False, "supports_oauth2": False, - "allows_offset_fetch": False, + "supports_offset": False, } ) - assert result["allows_offset_fetch"] is False + assert result["supports_offset"] is False diff --git a/tests/unit_tests/db_engine_specs/test_base.py b/tests/unit_tests/db_engine_specs/test_base.py index 08f4353e97b1..bf079b50a8a7 100644 --- a/tests/unit_tests/db_engine_specs/test_base.py +++ b/tests/unit_tests/db_engine_specs/test_base.py @@ -1225,24 +1225,24 @@ def test_start_oauth2_dance_falls_back_to_url_for(mocker: MockerFixture) -> None assert error.extra["redirect_uri"] == fallback_uri -def test_base_spec_allows_offset_fetch_default_true() -> None: +def test_base_spec_supports_offset_default_true() -> None: """ New engines opt-in to OFFSET support by default. Engines that do not support OFFSET (like Elasticsearch SQL) opt out explicitly. """ from superset.db_engine_specs.base import BaseEngineSpec - assert BaseEngineSpec.allows_offset_fetch is True + assert BaseEngineSpec.supports_offset is True -def test_base_spec_public_information_includes_allows_offset_fetch() -> None: +def test_base_spec_public_information_includes_supports_offset() -> None: """ - The allows_offset_fetch capability is exposed via get_public_information + The supports_offset capability is exposed via get_public_information so the frontend can reason about it (e.g. future UI disablement). """ from superset.db_engine_specs.base import BaseEngineSpec info = BaseEngineSpec.get_public_information() - assert "allows_offset_fetch" in info - assert info["allows_offset_fetch"] is True + assert "supports_offset" in info + assert info["supports_offset"] is True diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index 1421ba8fd22a..bf440fe55677 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -102,7 +102,7 @@ def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None: """ from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec - assert ElasticSearchEngineSpec.allows_offset_fetch is False + assert ElasticSearchEngineSpec.supports_offset is False def test_opendistro_spec_opts_out_of_offset_fetch() -> None: @@ -111,7 +111,7 @@ def test_opendistro_spec_opts_out_of_offset_fetch() -> None: """ from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec - assert OpenDistroEngineSpec.allows_offset_fetch is False + assert OpenDistroEngineSpec.supports_offset is False def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock: diff --git a/tests/unit_tests/models/test_helpers_offset.py b/tests/unit_tests/models/test_helpers_offset.py index c431584c3574..7e728defbb08 100644 --- a/tests/unit_tests/models/test_helpers_offset.py +++ b/tests/unit_tests/models/test_helpers_offset.py @@ -22,10 +22,10 @@ ) -def _uses_allows_offset_fetch(node: ast.AST) -> bool: - """True if any attribute access on `node` references 'allows_offset_fetch'.""" +def _uses_supports_offset(node: ast.AST) -> bool: + """True if any attribute access on `node` references 'supports_offset'.""" return any( - isinstance(child, ast.Attribute) and child.attr == "allows_offset_fetch" + isinstance(child, ast.Attribute) and child.attr == "supports_offset" for child in ast.walk(node) ) @@ -41,10 +41,10 @@ def _is_qry_offset_assignment(stmt: ast.AST) -> bool: return isinstance(func, ast.Attribute) and func.attr == "offset" -def test_helpers_guards_offset_with_allows_offset_fetch_flag() -> None: +def test_helpers_guards_offset_with_supports_offset_flag() -> None: """ Regression guard: the `.offset()` call in get_sqla_query must be wrapped - in an `if` that checks `allows_offset_fetch`. Without this guard, + in an `if` that checks `supports_offset`. Without this guard, engines that do not support OFFSET (Elasticsearch SQL) crash drill- to-detail on page 2+. @@ -52,8 +52,8 @@ def test_helpers_guards_offset_with_allows_offset_fetch_flag() -> None: Black-style reformatting and trivial refactors. """ source = HELPERS_PATH.read_text() - assert "allows_offset_fetch" in source, ( - "helpers.py no longer references allows_offset_fetch; the OFFSET " + assert "supports_offset" in source, ( + "helpers.py no longer references supports_offset; the OFFSET " "guard is gone — Elasticsearch drill-to-detail will crash on page 2+." ) @@ -65,7 +65,7 @@ def __init__(self) -> None: self._in_guarded_if = 0 def visit_If(self, node: ast.If) -> None: # noqa: N802 - if _uses_allows_offset_fetch(node.test): + if _uses_supports_offset(node.test): self._in_guarded_if += 1 for child in node.body: self.visit(child) @@ -83,6 +83,6 @@ def visit_Assign(self, node: ast.Assign) -> None: # noqa: N802 Visitor().visit(tree) assert not unguarded, ( f"Unguarded .offset() call(s) in helpers.py at line(s) {unguarded}. " - "Wrap with `if ... allows_offset_fetch:` to prevent OFFSET emission " + "Wrap with `if ... supports_offset:` to prevent OFFSET emission " "on engines that cannot parse it (e.g. Elasticsearch SQL)." ) diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py index 5f38ee88f6af..858b4b124dbd 100644 --- a/tests/unit_tests/views/datasource/test_utils.py +++ b/tests/unit_tests/views/datasource/test_utils.py @@ -23,11 +23,11 @@ def fake_datasource_factory(): """Builds a MagicMock datasource whose db_engine_spec is configurable.""" - def _build(allows_offset_fetch: bool) -> MagicMock: + def _build(supports_offset: bool) -> MagicMock: datasource = MagicMock(name="SqlaTable") datasource.type = "table" datasource.id = 1 - datasource.database.db_engine_spec.allows_offset_fetch = allows_offset_fetch + datasource.database.db_engine_spec.supports_offset = supports_offset return datasource return _build @@ -37,12 +37,12 @@ def test_get_samples_uses_normal_path_when_engine_supports_offset( fake_datasource_factory, ): """ - Engines with allows_offset_fetch=True continue to use the existing + Engines with supports_offset=True continue to use the existing QueryContext/get_payload path. No cursor-method calls. """ from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=True) + datasource = fake_datasource_factory(supports_offset=True) datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock() with ( @@ -86,13 +86,13 @@ def test_get_samples_uses_cursor_path_when_engine_disallows_offset( fake_datasource_factory, ): """ - When the engine reports allows_offset_fetch=False and the requested + When the engine reports supports_offset=False and the requested page is > 1, get_samples delegates to fetch_data_with_cursor with the SQL that Superset already compiled for the normal samples payload. """ from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource = fake_datasource_factory(supports_offset=False) datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [[99]], ["a"], @@ -155,7 +155,7 @@ def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload( """ from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource = fake_datasource_factory(supports_offset=False) datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = ( [["x", 1]], ["a", "b"], @@ -210,7 +210,7 @@ def test_get_samples_cursor_path_cleans_count_cache_on_failure( from superset.constants import CacheRegion from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource = fake_datasource_factory(supports_offset=False) datasource.database.db_engine_spec.fetch_data_with_cursor.side_effect = ( RuntimeError("boom: internal es stack trace details") ) @@ -275,7 +275,7 @@ def test_get_samples_cursor_path_raises_when_sample_payload_has_no_sql( from superset.constants import CacheRegion from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource = fake_datasource_factory(supports_offset=False) samples_ctx = MagicMock() samples_ctx.get_payload.return_value = { @@ -330,7 +330,7 @@ def test_get_samples_cursor_path_unused_for_page_one(fake_datasource_factory): """ from superset.views.datasource import utils - datasource = fake_datasource_factory(allows_offset_fetch=False) + datasource = fake_datasource_factory(supports_offset=False) datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock() samples_ctx = MagicMock() From 057f04420e0bb0b8ed10fafc8305cf8e5c6fc297 Mon Sep 17 00:00:00 2001 From: Taras Pashkevych Date: Mon, 20 Apr 2026 13:37:39 +0200 Subject: [PATCH 13/13] style(tests): inline single-use constant and drop duplicate imports --- tests/unit_tests/db_engine_specs/test_elasticsearch.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index bf440fe55677..12ccc750f8dc 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -275,25 +275,20 @@ class BoomError(RuntimeError): responses = [ {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, ] - raises_on_call_index = 1 call_count = {"n": 0} recorded_close = {} def perform_request(method, path, body=None, **_kwargs): call_count["n"] += 1 - # calls: 0=initial query, 1=cursor follow-up (raises), 2=close - if call_count["n"] - 1 == raises_on_call_index: + # calls: 1=initial query, 2=cursor follow-up (raises), 3=close + if call_count["n"] == 2: raise BoomError("transport blew up") if path.endswith("/close"): recorded_close["body"] = body return {} return responses[call_count["n"] - 1] - from unittest.mock import MagicMock - - import pytest - transport = MagicMock() transport.perform_request.side_effect = perform_request conn = MagicMock()