diff --git a/UPDATING.md b/UPDATING.md index 27fc3428b9e..f766780773f 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,6 +24,12 @@ assists people when migrating to a new version. ## Next +### Engine spec capability flag: `supports_offset` + +A new `BaseEngineSpec.supports_offset` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support. + +**Note on deep-pagination cost:** Cursor-based engines (including Elasticsearch and OpenDistro) are forward-only, so reaching page `N` of a drill-to-detail view issues `N` round trips to the cluster. Deep pagination is therefore linear in page number; users paginating into the hundreds or thousands will notice added latency compared to `OFFSET`-capable engines. + ### Granular Export Controls A new feature flag `GRANULAR_EXPORT_CONTROLS` introduces three fine-grained permissions that replace the legacy `can_csv` permission: diff --git a/superset/databases/schemas.py b/superset/databases/schemas.py index da0dd3cf340..07fc2f475d2 100644 --- a/superset/databases/schemas.py +++ b/superset/databases/schemas.py @@ -1067,6 +1067,14 @@ class EngineInformationSchema(Schema): supports_oauth2 = fields.Boolean( metadata={"description": "The database supports OAuth2"} ) + supports_offset = fields.Boolean( + metadata={ + "description": ( + "The database supports OFFSET in SQL queries. " + "Engines like Elasticsearch SQL return False." 
+ ) + } + ) class DatabaseConnectionSchema(Schema): diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index bd1e2f9c361..29d44fc233a 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -494,6 +494,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods allows_sql_comments = True allows_escaped_colons = True + # Whether the engine supports OFFSET in SQL queries. Defaults to True; + # engines like Elasticsearch SQL that do not support OFFSET set this to + # False and are expected to implement `fetch_data_with_cursor` for + # pagination via another mechanism (e.g. Elasticsearch's cursor API). + supports_offset = True + # Whether ORDER BY clause can use aliases created in SELECT # that are the same as a source column allows_alias_to_source_column = True @@ -1233,6 +1239,26 @@ def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, .. except Exception as ex: raise cls.get_dbapi_mapped_exception(ex) from ex + @classmethod + def fetch_data_with_cursor( + cls, + database: Database, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str]]: + """ + Fetch a single page of results via engine-native cursor pagination. + + Only called when ``cls.supports_offset`` is False and a non-first + page is requested (see ``superset/views/datasource/utils.py``). + Engines that set ``supports_offset = False`` must override this. 
+ """ + raise NotImplementedError( + f"{cls.__name__} sets supports_offset=False but does not " + "implement fetch_data_with_cursor()" + ) + @classmethod def expand_data( cls, columns: list[ResultSetColumnType], data: list[dict[Any, Any]] @@ -2515,6 +2541,7 @@ def get_public_information(cls) -> dict[str, Any]: "disable_ssh_tunneling": cls.disable_ssh_tunneling, "supports_dynamic_catalog": cls.supports_dynamic_catalog, "supports_oauth2": cls.supports_oauth2, + "supports_offset": cls.supports_offset, } @classmethod diff --git a/superset/db_engine_specs/elasticsearch.py b/superset/db_engine_specs/elasticsearch.py index 447605055ea..df0bc49e1e7 100644 --- a/superset/db_engine_specs/elasticsearch.py +++ b/superset/db_engine_specs/elasticsearch.py @@ -14,9 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + import logging +import re from datetime import datetime -from typing import Any, Optional +from typing import Any, Optional, TYPE_CHECKING from packaging.version import Version from sqlalchemy import types @@ -29,9 +32,100 @@ SupersetDBAPIProgrammingError, ) +if TYPE_CHECKING: + from superset.models.core import Database + logger = logging.getLogger() +def _fetch_page_via_cursor( + database: Database, + sql: str, + page_index: int, + page_size: int, + sql_path: str, + close_path: str, +) -> tuple[list[list[Any]], list[str]]: + """ + Iterate Elasticsearch/OpenSearch SQL cursor pagination to return a single + page of results. + + Executes ``sql`` with ``fetch_size = page_size``, then sends cursor + follow-up requests ``page_index`` times to skip earlier pages. Closes the + cursor when done to release server-side state. Returns + ``(rows, columns)``. + + If the dataset is exhausted before reaching ``page_index``, returns an + empty rows list with the column names from the initial request. 
+ + Note: the Elasticsearch SQL cursor is forward-only, so cost is linear in + ``page_index`` — reaching page N issues N round trips to the cluster. + Deep pagination (hundreds of pages) will therefore be noticeably slower + than on ``OFFSET``-capable engines. This is a protocol limitation, not + an implementation choice. + """ + # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT + # in the submitted statement caps the result set before the cursor can + # page through it. ``fetch_size`` drives pagination instead. + # Assumption: Superset only appends a trailing ``LIMIT N`` for engines + # with ``supports_offset=False``. If that ever changes (e.g. + # ``FETCH FIRST N ROWS`` or ``TOP N``), extend this sanitizer to match. + sanitized_sql = sql.strip().rstrip(";").strip() + sanitized_sql = re.sub( + r"\s+LIMIT\s+\d+\s*$", "", sanitized_sql, flags=re.IGNORECASE + ) + + # The raw transport does not auto-set Content-Type the way the Python + # DB-API driver does; ES rejects POSTs without a JSON content type. + json_headers = {"Content-Type": "application/json"} + with database.get_raw_connection() as conn: + transport = conn.es.transport + response = transport.perform_request( + "POST", + sql_path, + headers=json_headers, + body={"query": sanitized_sql, "fetch_size": page_size}, + ) + columns = [col["name"] for col in response.get("columns", [])] + rows = response.get("rows", []) + cursor = response.get("cursor") + + try: + for _ in range(page_index): + if not cursor: + # Dataset exhausted before reaching the target page — + # no cursor to close (ES returns no cursor on the final + # page). Return immediately with empty rows. + return [], columns + response = transport.perform_request( + "POST", + sql_path, + headers=json_headers, + body={"cursor": cursor}, + ) + rows = response.get("rows", []) + cursor = response.get("cursor") + + return rows, columns + finally: + if cursor: + # Best-effort cleanup. 
If close itself fails we don't want + # to mask the original error (if any) — swallow and log. + try: + transport.perform_request( + "POST", + close_path, + headers=json_headers, + body={"cursor": cursor}, + ) + except Exception: # pylint: disable=broad-except + logger.warning( + "Failed to close Elasticsearch SQL cursor at %s", + close_path, + exc_info=True, + ) + + class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method engine = "elasticsearch" engine_name = "Elasticsearch" @@ -39,6 +133,7 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho allows_joins = False allows_subqueries = True allows_sql_comments = False + supports_offset = False metadata = { "description": ( @@ -136,6 +231,30 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho type_code_map: dict[int, str] = {} # loaded from get_datatype only if needed + SQL_ENDPOINT = "/_sql" + SQL_CLOSE_ENDPOINT = "/_sql/close" + + @classmethod + def fetch_data_with_cursor( + cls, + database: Database, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str]]: + """ + Fetch a single page of results using Elasticsearch cursor pagination. + See ``_fetch_page_via_cursor`` for the protocol. 
+ """ + return _fetch_page_via_cursor( + database=database, + sql=sql, + page_index=page_index, + page_size=page_size, + sql_path=cls.SQL_ENDPOINT, + close_path=cls.SQL_CLOSE_ENDPOINT, + ) + @classmethod def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]: # pylint: disable=import-error,import-outside-toplevel @@ -190,6 +309,7 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method allows_joins = False allows_subqueries = True allows_sql_comments = False + supports_offset = False _time_grain_expressions = { None: "{col}", @@ -204,6 +324,30 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method engine = "odelasticsearch" engine_name = "OpenSearch (OpenDistro)" + SQL_ENDPOINT = "/_opendistro/_sql" + SQL_CLOSE_ENDPOINT = "/_opendistro/_sql/close" + + @classmethod + def fetch_data_with_cursor( + cls, + database: Database, + sql: str, + page_index: int, + page_size: int, + ) -> tuple[list[list[Any]], list[str]]: + """ + Fetch a single page of results using OpenDistro SQL cursor pagination. + Same protocol as ElasticSearchEngineSpec, different endpoint paths. 
+ """ + return _fetch_page_via_cursor( + database=database, + sql=sql, + page_index=page_index, + page_size=page_size, + sql_path=cls.SQL_ENDPOINT, + close_path=cls.SQL_CLOSE_ENDPOINT, + ) + @classmethod def convert_dttm( cls, target_type: str, dttm: datetime, db_extra: Optional[dict[str, Any]] = None diff --git a/superset/models/helpers.py b/superset/models/helpers.py index 71958d8cf83..b94f196a0f2 100644 --- a/superset/models/helpers.py +++ b/superset/models/helpers.py @@ -3285,7 +3285,7 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma if row_limit: qry = qry.limit(row_limit) - if row_offset: + if row_offset and self.database.db_engine_spec.supports_offset: qry = qry.offset(row_offset) if series_limit and groupby_series_columns: diff --git a/superset/views/datasource/utils.py b/superset/views/datasource/utils.py index deec4067a56..8fc3755e22c 100644 --- a/superset/views/datasource/utils.py +++ b/superset/views/datasource/utils.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+from __future__ import annotations + import logging -from typing import Any, Iterable, Optional +from typing import Any, Iterable, Optional, TYPE_CHECKING from flask import current_app as app @@ -28,6 +30,10 @@ from superset.utils.core import QueryStatus from superset.views.datasource.schemas import SamplesPayloadSchema +if TYPE_CHECKING: + from superset.common.query_context import QueryContext + from superset.daos.datasource import Datasource + logger = logging.getLogger(__name__) @@ -169,11 +175,36 @@ def get_samples( # pylint: disable=too-many-arguments if count_star_data.get("status") == QueryStatus.FAILED: raise DatasetSamplesFailedError(count_star_data.get("error")) - sample_data = samples_instance.get_payload()["queries"][0] + engine_spec = datasource.database.db_engine_spec + row_offset = limit_clause["row_offset"] + row_limit = limit_clause["row_limit"] - if sample_data.get("status") == QueryStatus.FAILED: - QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) - raise DatasetSamplesFailedError(sample_data.get("error")) + if not engine_spec.supports_offset and row_offset > 0: + try: + sample_data = _fetch_samples_via_cursor( + datasource=datasource, + samples_instance=samples_instance, + count_star_data=count_star_data, + page_index=row_offset // row_limit, + page_size=row_limit, + ) + except DatasetSamplesFailedError: + raise + except Exception as exc: + QueryCacheManager.delete( + count_star_data.get("cache_key"), CacheRegion.DATA + ) + logger.exception("Cursor-based samples pagination failed") + raise DatasetSamplesFailedError( + "Failed to fetch samples via cursor pagination" + ) from exc + else: + sample_data = samples_instance.get_payload()["queries"][0] + if sample_data.get("status") == QueryStatus.FAILED: + QueryCacheManager.delete( + count_star_data.get("cache_key"), CacheRegion.DATA + ) + raise DatasetSamplesFailedError(sample_data.get("error") or "") sample_data["page"] = page sample_data["per_page"] = per_page @@ -181,3 
+212,68 @@ def get_samples( # pylint: disable=too-many-arguments return sample_data except (IndexError, KeyError) as exc: raise DatasetSamplesFailedError from exc + + +def _fetch_samples_via_cursor( + datasource: Datasource, + samples_instance: QueryContext, + count_star_data: dict[str, Any], + page_index: int, + page_size: int, +) -> dict[str, Any]: + """ + Fetch a single page of samples via engine-spec cursor pagination. + + Used when ``datasource.database.db_engine_spec.supports_offset`` is + False and a non-first page is requested. Reuses the SQL that Superset + already compiled for the normal samples payload, delegates cursor + iteration to the engine spec, and assembles a response dict compatible + with the normal samples path. + + The samples payload is also executed (its SQL is OFFSET-stripped by the + models/helpers.py guard) to obtain the authoritative ``colnames`` and + ``coltypes`` that the frontend grid needs for type-based cell renderers + — ensuring page 2+ renders identically to page 1. The engine spec is + responsible for stripping any trailing ``LIMIT`` from the SQL so the + cursor is not capped to a single page. + + Cost: this path issues one extra "page-1-shaped" samples query on every + request for page ≥ 2, on top of the ``page_index + 1`` cursor round + trips. The extra query is what provides authoritative ``coltypes`` + (derived from the DB-API cursor description) — the ES cursor response + only carries ES SQL type names, which would need a separate translator + to Superset's coltype enum. TODO: extract SQL via + ``datasource.get_query_str(query_obj.to_dict())`` and derive coltypes + from cursor metadata to eliminate the extra execution. + """ + # Execute the normal samples payload to source authoritative colnames + # and coltypes. See the cost note in the docstring — this is deliberate. + # The helpers.py OFFSET guard keeps it to a single page-1-shaped query, + # not a full-table scan, for engines on the cursor path. 
+ sample_payload = samples_instance.get_payload()["queries"][0] + if sample_payload.get("status") == QueryStatus.FAILED: + QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) + raise DatasetSamplesFailedError(sample_payload.get("error") or "") + + sql = sample_payload.get("query") + if not sql: + QueryCacheManager.delete(count_star_data.get("cache_key"), CacheRegion.DATA) + raise DatasetSamplesFailedError("Empty samples query") + + engine_spec = datasource.database.db_engine_spec + rows, cursor_colnames = engine_spec.fetch_data_with_cursor( + database=datasource.database, + sql=sql, + page_index=page_index, + page_size=page_size, + ) + + colnames = sample_payload.get("colnames") or cursor_colnames + coltypes = sample_payload.get("coltypes") or [] + + return { + "data": [dict(zip(colnames, row, strict=False)) for row in rows], + "colnames": colnames, + "coltypes": coltypes, + "status": QueryStatus.SUCCESS, + } diff --git a/tests/unit_tests/databases/test_schemas.py b/tests/unit_tests/databases/test_schemas.py new file mode 100644 index 00000000000..da29a11c144 --- /dev/null +++ b/tests/unit_tests/databases/test_schemas.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +def test_engine_information_schema_includes_supports_offset() -> None: + """ + The frontend consumes EngineInformationSchema to know what the engine + can do. supports_offset is a new field that must pass through. + """ + from superset.databases.schemas import EngineInformationSchema + + schema = EngineInformationSchema() + result = schema.dump( + { + "supports_file_upload": True, + "disable_ssh_tunneling": False, + "supports_dynamic_catalog": False, + "supports_oauth2": False, + "supports_offset": False, + } + ) + + assert result["supports_offset"] is False diff --git a/tests/unit_tests/db_engine_specs/test_base.py b/tests/unit_tests/db_engine_specs/test_base.py index 22ec7d0aa18..bf079b50a8a 100644 --- a/tests/unit_tests/db_engine_specs/test_base.py +++ b/tests/unit_tests/db_engine_specs/test_base.py @@ -1223,3 +1223,26 @@ def test_start_oauth2_dance_falls_back_to_url_for(mocker: MockerFixture) -> None error = exc_info.value.error assert error.extra["redirect_uri"] == fallback_uri + + +def test_base_spec_supports_offset_default_true() -> None: + """ + New engines opt-in to OFFSET support by default. Engines that do not + support OFFSET (like Elasticsearch SQL) opt out explicitly. + """ + from superset.db_engine_specs.base import BaseEngineSpec + + assert BaseEngineSpec.supports_offset is True + + +def test_base_spec_public_information_includes_supports_offset() -> None: + """ + The supports_offset capability is exposed via get_public_information + so the frontend can reason about it (e.g. future UI disablement). 
+ """ + from superset.db_engine_specs.base import BaseEngineSpec + + info = BaseEngineSpec.get_public_information() + + assert "supports_offset" in info + assert info["supports_offset"] is True diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py index 640f4aa39d0..12ccc750f8d 100644 --- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py +++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py @@ -16,6 +16,7 @@ # under the License. from datetime import datetime from typing import Any, Optional +from unittest.mock import MagicMock import pytest from sqlalchemy import column # noqa: F401 @@ -92,3 +93,325 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None: from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec assert OpenDistroEngineSpec.make_label_compatible(original) == expected + + +def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None: + """ + Elasticsearch SQL does not support OFFSET. The spec must opt out so the + query builder does not emit OFFSET clauses that crash the parser. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + assert ElasticSearchEngineSpec.supports_offset is False + + +def test_opendistro_spec_opts_out_of_offset_fetch() -> None: + """ + OpenDistro/OpenSearch SQL also does not support OFFSET. + """ + from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec + + assert OpenDistroEngineSpec.supports_offset is False + + +def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock: + """ + Build a mocked Database whose get_raw_connection() yields a connection + whose es.transport.perform_request returns transport_responses sequentially. 
+ """ + database = MagicMock(name="Database") + + responses_iter = iter(transport_responses) + + def perform_request(method, path, body=None, **_kwargs): + return next(responses_iter) + + transport = MagicMock() + transport.perform_request.side_effect = perform_request + conn = MagicMock() + conn.es.transport = transport + + ctx = MagicMock() + ctx.__enter__ = MagicMock(return_value=conn) + ctx.__exit__ = MagicMock(return_value=False) + database.get_raw_connection.return_value = ctx + database._transport = transport # expose for assertions + return database + + +def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> None: + """ + Page index 0 = return the rows from the initial query, no cursor + iteration needed. The cursor must still be closed if present. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + { + "columns": [{"name": "a"}, {"name": "b"}], + "rows": [[1, "x"], [2, "y"]], + "cursor": "CUR-1", + }, + {}, # close + ] + ) + + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a, b FROM idx", + page_index=0, + page_size=2, + ) + + assert rows == [[1, "x"], [2, "y"]] + assert cols == ["a", "b"] + + calls = database._transport.perform_request.call_args_list + assert len(calls) == 2 + assert calls[0][0][0] == "POST" + assert calls[0][0][1] == "/_sql" + assert calls[0].kwargs["body"] == {"query": "SELECT a, b FROM idx", "fetch_size": 2} + assert calls[1][0][1] == "/_sql/close" + assert calls[1].kwargs["body"] == {"cursor": "CUR-1"} + + +def test_fetch_data_with_cursor_iterates_to_target_page() -> None: + """ + For page_index=2, the code executes the initial query, then sends the + cursor twice. The rows returned belong to the third page. 
+ """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]], "cursor": "C2"}, + {"rows": [[2]], "cursor": "C3"}, + {}, # close + ] + ) + + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=2, + page_size=1, + ) + + assert rows == [[2]] + assert cols == ["a"] + + calls = database._transport.perform_request.call_args_list + assert len(calls) == 4 + assert calls[1].kwargs["body"] == {"cursor": "C1"} + assert calls[2].kwargs["body"] == {"cursor": "C2"} + assert calls[3][0][1] == "/_sql/close" + assert calls[3].kwargs["body"] == {"cursor": "C3"} + + +def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None: + """ + If the dataset has fewer pages than the requested page_index, the + cursor becomes falsy mid-iteration. Return empty rows, do not call + close, do not raise. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]]}, # no cursor → dataset ends here + ] + ) + + rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=5, + page_size=1, + ) + + assert rows == [] + assert cols == ["a"] + assert len(database._transport.perform_request.call_args_list) == 2 + + +def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None: + """ + Some responses (tiny result sets) come back without a cursor token. + The code must not send a close request with a missing cursor. 
+ """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0], [1]]}, + ] + ) + + rows, _ = ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=0, + page_size=50, + ) + + assert rows == [[0], [1]] + assert len(database._transport.perform_request.call_args_list) == 1 + + +def test_fetch_data_with_cursor_closes_cursor_even_if_iteration_raises() -> None: + """ + If an intermediate cursor request raises, the cursor from the most + recent successful response must still be closed. Prevents server-side + cursor leaks on transport errors. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + class BoomError(RuntimeError): + pass + + responses = [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + ] + + call_count = {"n": 0} + recorded_close = {} + + def perform_request(method, path, body=None, **_kwargs): + call_count["n"] += 1 + # calls: 1=initial query, 2=cursor follow-up (raises), 3=close + if call_count["n"] == 2: + raise BoomError("transport blew up") + if path.endswith("/close"): + recorded_close["body"] = body + return {} + return responses[call_count["n"] - 1] + + transport = MagicMock() + transport.perform_request.side_effect = perform_request + conn = MagicMock() + conn.es.transport = transport + ctx = MagicMock() + ctx.__enter__ = MagicMock(return_value=conn) + ctx.__exit__ = MagicMock(return_value=False) + database = MagicMock() + database.get_raw_connection.return_value = ctx + + with pytest.raises(BoomError): + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=3, + page_size=1, + ) + + assert recorded_close.get("body") == {"cursor": "C1"}, ( + "The cursor from the last successful response must be closed " + "even when a later iteration raises." 
+ ) + + +@pytest.mark.parametrize( + "sql_in,expected_query", + [ + # Superset's SQL builder terminates statements with `;` for DB-API + # execution; the ES SQL API rejects that. + ("SELECT a FROM idx;", "SELECT a FROM idx"), + # A trailing LIMIT from the normal samples pipeline would cap the + # cursor to a single fetch; it must be stripped so `fetch_size` + # drives pagination instead. + ("SELECT a FROM idx LIMIT 50", "SELECT a FROM idx"), + ("SELECT a FROM idx LIMIT 50;", "SELECT a FROM idx"), + ("SELECT a FROM idx LIMIT 50 ; ", "SELECT a FROM idx"), + # Case-insensitive LIMIT recognition, since the builder's output is + # not guaranteed to be uppercase across backends. + ("SELECT a FROM idx limit 100", "SELECT a FROM idx"), + # LIMIT that is *not* the final clause (e.g. inside a subquery) must + # not be mangled. + ( + "SELECT * FROM (SELECT a FROM idx LIMIT 10) sub", + "SELECT * FROM (SELECT a FROM idx LIMIT 10) sub", + ), + ], +) +def test_fetch_data_with_cursor_sanitizes_sql(sql_in: str, expected_query: str) -> None: + """ + The Elasticsearch SQL API has two ergonomic landmines for Superset SQL: + a trailing ``;`` is rejected, and a trailing ``LIMIT N`` caps the cursor + to a single fetch. ``_fetch_page_via_cursor`` must strip both before + submitting the query. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database([{"columns": [{"name": "a"}], "rows": []}]) + + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql=sql_in, + page_index=0, + page_size=25, + ) + + first_call = database._transport.perform_request.call_args_list[0] + assert first_call.kwargs["body"]["query"] == expected_query + # fetch_size is independent of the SQL rewrite — it's what actually + # controls the page window. 
+ assert first_call.kwargs["body"]["fetch_size"] == 25 + + +def test_fetch_data_with_cursor_sets_json_content_type_header() -> None: + """ + The raw ES transport does not auto-set Content-Type the way the DB-API + driver does; without it the cluster responds with HTTP 406. Every + perform_request issued by the cursor helper must carry the JSON header. + """ + from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"}, + {"rows": [[1]], "cursor": "C2"}, + {}, # close + ] + ) + + ElasticSearchEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=1, + page_size=1, + ) + + for call in database._transport.perform_request.call_args_list: + assert call.kwargs.get("headers") == {"Content-Type": "application/json"} + + +def test_opendistro_fetch_data_with_cursor_uses_opendistro_endpoints() -> None: + """ + OpenDistro's SQL plugin historically lives at /_opendistro/_sql rather + than /_sql. Verify the classmethod sends requests to the OpenDistro paths. 
+ """ + from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec + + database = _build_fake_database( + [ + {"columns": [{"name": "a"}], "rows": [[42]], "cursor": "OD-1"}, + {}, # close + ] + ) + + rows, cols = OpenDistroEngineSpec.fetch_data_with_cursor( + database=database, + sql="SELECT a FROM idx", + page_index=0, + page_size=1, + ) + + assert rows == [[42]] + assert cols == ["a"] + + calls = database._transport.perform_request.call_args_list + assert calls[0][0][1] == "/_opendistro/_sql" + assert calls[1][0][1] == "/_opendistro/_sql/close" diff --git a/tests/unit_tests/models/test_helpers_offset.py b/tests/unit_tests/models/test_helpers_offset.py new file mode 100644 index 00000000000..7e728defbb0 --- /dev/null +++ b/tests/unit_tests/models/test_helpers_offset.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import ast
+from pathlib import Path
+
+HELPERS_PATH = (
+    Path(__file__).resolve().parents[3] / "superset" / "models" / "helpers.py"
+)
+
+
+def _uses_supports_offset(node: ast.AST) -> bool:
+    """True if any attribute access on `node` references 'supports_offset'."""
+    return any(
+        isinstance(child, ast.Attribute) and child.attr == "supports_offset"
+        for child in ast.walk(node)
+    )
+
+
+def _is_negated(test: ast.expr) -> bool:
+    """True if the `if` test is a negation, e.g. `if not spec.supports_offset:`."""
+    return isinstance(test, ast.UnaryOp) and isinstance(test.op, ast.Not)
+
+
+def _is_qry_offset_assignment(stmt: ast.AST) -> bool:
+    """True if stmt is `qry = qry.offset(...)` (any LHS, call to `.offset`)."""
+    if not isinstance(stmt, ast.Assign):
+        return False
+    call = stmt.value
+    if not isinstance(call, ast.Call):
+        return False
+    func = call.func
+    return isinstance(func, ast.Attribute) and func.attr == "offset"
+
+
+def test_helpers_guards_offset_with_supports_offset_flag() -> None:
+    """
+    Regression guard: the `.offset()` call in get_sqla_query must be wrapped
+    in an `if` that checks `supports_offset`. Without this guard,
+    engines that do not support OFFSET (Elasticsearch SQL) crash drill-
+    to-detail on page 2+.
+
+    We parse the AST rather than grep the source so the test survives
+    Black-style reformatting and trivial refactors.
+    """
+    # Explicit encoding: helpers.py may contain non-ASCII characters and the
+    # platform default codec is not guaranteed to be UTF-8.
+    source = HELPERS_PATH.read_text(encoding="utf-8")
+    assert "supports_offset" in source, (
+        "helpers.py no longer references supports_offset; the OFFSET "
+        "guard is gone — Elasticsearch drill-to-detail will crash on page 2+."
+    )
+
+    tree = ast.parse(source)
+    unguarded: list[int] = []
+
+    class Visitor(ast.NodeVisitor):
+        def __init__(self) -> None:
+            self._in_guarded_if = 0
+
+        def visit_If(self, node: ast.If) -> None:  # noqa: N802
+            if _uses_supports_offset(node.test):
+                # Polarity matters. Under `if spec.supports_offset:` the
+                # *body* is the OFFSET-capable branch; under
+                # `if not spec.supports_offset:` it is the *else* branch.
+                # Treating the body as guarded unconditionally would accept
+                # `if not ...supports_offset: qry = qry.offset(...)` — code
+                # that emits OFFSET exactly when the engine cannot parse it —
+                # and would flag a legitimate offset in the else of a
+                # negated guard.
+                if _is_negated(node.test):
+                    guarded, exposed = node.orelse, node.body
+                else:
+                    guarded, exposed = node.body, node.orelse
+                self._in_guarded_if += 1
+                for child in guarded:
+                    self.visit(child)
+                self._in_guarded_if -= 1
+                for child in exposed:
+                    self.visit(child)
+            else:
+                self.generic_visit(node)
+
+        def visit_Assign(self, node: ast.Assign) -> None:  # noqa: N802
+            if _is_qry_offset_assignment(node) and self._in_guarded_if == 0:
+                unguarded.append(node.lineno)
+            self.generic_visit(node)
+
+    Visitor().visit(tree)
+    assert not unguarded, (
+        f"Unguarded .offset() call(s) in helpers.py at line(s) {unguarded}. "
+        "Wrap with `if ... supports_offset:` to prevent OFFSET emission "
+        "on engines that cannot parse it (e.g. Elasticsearch SQL)."
+    )
diff --git a/tests/unit_tests/views/__init__.py b/tests/unit_tests/views/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/unit_tests/views/datasource/test_utils.py b/tests/unit_tests/views/datasource/test_utils.py
new file mode 100644
index 00000000000..858b4b124db
--- /dev/null
+++ b/tests/unit_tests/views/datasource/test_utils.py
@@ -0,0 +1,309 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Optional
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+
+def _samples_context(query: dict[str, Any]) -> MagicMock:
+    """QueryContext mock whose get_payload returns a single samples query."""
+    ctx = MagicMock()
+    ctx.get_payload.return_value = {"queries": [query]}
+    return ctx
+
+
+def _count_context(total: int, cache_key: Optional[str] = None) -> MagicMock:
+    """QueryContext mock for the COUNT(*) payload issued by get_samples."""
+    query: dict[str, Any] = {"data": [{"COUNT(*)": total}], "status": "success"}
+    if cache_key is not None:
+        query["cache_key"] = cache_key
+    ctx = MagicMock()
+    ctx.get_payload.return_value = {"queries": [query]}
+    return ctx
+
+
+def _patch_dao(utils: Any, datasource: MagicMock):
+    """Patch DatasourceDAO so that get_datasource returns the given mock."""
+    return patch.object(
+        utils, "DatasourceDAO", MagicMock(get_datasource=lambda **kw: datasource)
+    )
+
+
+@pytest.fixture
+def fake_datasource_factory():
+    """Builds a MagicMock datasource whose db_engine_spec is configurable."""
+
+    def _build(supports_offset: bool) -> MagicMock:
+        datasource = MagicMock(name="SqlaTable")
+        datasource.type = "table"
+        datasource.id = 1
+        datasource.database.db_engine_spec.supports_offset = supports_offset
+        return datasource
+
+    return _build
+
+
+def test_get_samples_uses_normal_path_when_engine_supports_offset(
+    fake_datasource_factory,
+):
+    """
+    Engines with supports_offset=True continue to use the existing
+    QueryContext/get_payload path. No cursor-method calls.
+    """
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=True)
+    datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock()
+    samples_ctx = _samples_context(
+        {"data": [{"a": 1}], "colnames": ["a"], "coltypes": [], "status": "success"}
+    )
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, _count_context(42)]
+
+        result = utils.get_samples(
+            datasource_type="table",
+            datasource_id=1,
+            page=2,
+            per_page=50,
+        )
+
+    assert result["data"] == [{"a": 1}]
+    assert result["page"] == 2
+    assert result["per_page"] == 50
+    assert result["total_count"] == 42
+    datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called()
+
+
+def test_get_samples_uses_cursor_path_when_engine_disallows_offset(
+    fake_datasource_factory,
+):
+    """
+    When the engine reports supports_offset=False and the requested
+    page is > 1, get_samples delegates to fetch_data_with_cursor with the
+    SQL that Superset already compiled for the normal samples payload.
+    """
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=False)
+    datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = (
+        [[99]],
+        ["a"],
+    )
+    samples_ctx = _samples_context(
+        {
+            "data": [],
+            "colnames": ["a"],
+            "coltypes": [2],
+            "status": "success",
+            "query": "SELECT a FROM idx LIMIT 50",
+        }
+    )
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, _count_context(200)]
+
+        result = utils.get_samples(
+            datasource_type="table",
+            datasource_id=1,
+            page=3,
+            per_page=50,
+        )
+
+    fetch = datasource.database.db_engine_spec.fetch_data_with_cursor
+    fetch.assert_called_once()
+    kwargs = fetch.call_args.kwargs
+    assert kwargs["page_index"] == 2
+    assert kwargs["page_size"] == 50
+    # The cursor path reuses the already-compiled samples SQL verbatim; the
+    # engine spec is responsible for any sanitation (strip ``;``/``LIMIT``).
+    assert kwargs["sql"] == "SELECT a FROM idx LIMIT 50"
+
+    assert result["data"] == [{"a": 99}]
+    assert result["colnames"] == ["a"]
+    assert result["page"] == 3
+    assert result["per_page"] == 50
+    assert result["total_count"] == 200
+
+
+def test_get_samples_cursor_path_propagates_coltypes_from_samples_payload(
+    fake_datasource_factory,
+):
+    """
+    coltypes from the normal samples payload must flow into the
+    cursor-path response dict so that SamplesPane's useGridColumns() picks
+    up type-based cell renderers on page 2+.
+    """
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=False)
+    datasource.database.db_engine_spec.fetch_data_with_cursor.return_value = (
+        [["x", 1]],
+        ["a", "b"],
+    )
+    samples_ctx = _samples_context(
+        {
+            "data": [],
+            "colnames": ["a", "b"],
+            "coltypes": [2, 1],
+            "status": "success",
+            "query": "SELECT a, b FROM idx LIMIT 50",
+        }
+    )
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, _count_context(2000)]
+
+        result = utils.get_samples(
+            datasource_type="table",
+            datasource_id=1,
+            page=2,
+            per_page=50,
+        )
+
+    assert result["coltypes"] == [2, 1]
+    assert result["colnames"] == ["a", "b"]
+    assert result["data"] == [{"a": "x", "b": 1}]
+
+
+def test_get_samples_cursor_path_cleans_count_cache_on_failure(
+    fake_datasource_factory,
+):
+    """
+    If fetch_data_with_cursor raises, the count-star cache must be
+    evicted (mirroring the normal FAILED path) and the error re-raised as
+    DatasetSamplesFailedError.
+    """
+    from superset.commands.dataset.exceptions import DatasetSamplesFailedError
+    from superset.constants import CacheRegion
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=False)
+    datasource.database.db_engine_spec.fetch_data_with_cursor.side_effect = (
+        RuntimeError("boom: internal es stack trace details")
+    )
+    samples_ctx = _samples_context(
+        {
+            "data": [],
+            "colnames": ["a"],
+            "coltypes": [2],
+            "status": "success",
+            "query": "SELECT a FROM idx LIMIT 50",
+        }
+    )
+    count_ctx = _count_context(200, cache_key="count-cache-key")
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+        patch.object(utils, "QueryCacheManager") as cache_mgr,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, count_ctx]
+
+        with pytest.raises(DatasetSamplesFailedError) as excinfo:
+            utils.get_samples(
+                datasource_type="table",
+                datasource_id=1,
+                page=3,
+                per_page=50,
+            )
+
+    cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA)
+    # Backend-internal error text must not leak into the user-facing message;
+    # only the original exception (chained via ``from exc``) retains the detail.
+    assert "internal es stack trace details" not in str(excinfo.value)
+    assert isinstance(excinfo.value.__cause__, RuntimeError)
+
+
+def test_get_samples_cursor_path_raises_when_sample_payload_has_no_sql(
+    fake_datasource_factory,
+):
+    """
+    If the samples payload is ``success`` but carries no compiled ``query``
+    string, the cursor path has nothing to submit. Fail fast with a
+    descriptive error and evict the count cache, instead of handing an
+    empty statement to the engine driver.
+    """
+    from superset.commands.dataset.exceptions import DatasetSamplesFailedError
+    from superset.constants import CacheRegion
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=False)
+    # No ``query`` key -- simulates a backend that reports success without
+    # emitting SQL (e.g. fully cached empty result).
+    samples_ctx = _samples_context(
+        {"data": [], "colnames": ["a"], "coltypes": [2], "status": "success"}
+    )
+    count_ctx = _count_context(200, cache_key="count-cache-key")
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+        patch.object(utils, "QueryCacheManager") as cache_mgr,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, count_ctx]
+
+        with pytest.raises(DatasetSamplesFailedError):
+            utils.get_samples(
+                datasource_type="table",
+                datasource_id=1,
+                page=2,
+                per_page=50,
+            )
+
+    cache_mgr.delete.assert_called_once_with("count-cache-key", CacheRegion.DATA)
+    datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called()
+
+
+def test_get_samples_cursor_path_unused_for_page_one(fake_datasource_factory):
+    """
+    Page 1 (row_offset = 0) does not need cursor iteration -- the normal
+    path already returns the first page correctly without emitting OFFSET.
+    Keep the fast path.
+    """
+    from superset.views.datasource import utils
+
+    datasource = fake_datasource_factory(supports_offset=False)
+    datasource.database.db_engine_spec.fetch_data_with_cursor = MagicMock()
+    samples_ctx = _samples_context(
+        {"data": [{"a": 1}], "colnames": ["a"], "coltypes": [], "status": "success"}
+    )
+
+    with (
+        _patch_dao(utils, datasource),
+        patch.object(utils, "QueryContextFactory") as qcf,
+    ):
+        qcf.return_value.create.side_effect = [samples_ctx, _count_context(1)]
+
+        utils.get_samples(
+            datasource_type="table",
+            datasource_id=1,
+            page=1,
+            per_page=50,
+        )
+
+    datasource.database.db_engine_spec.fetch_data_with_cursor.assert_not_called()