Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ assists people when migrating to a new version.

## Next

### Engine spec capability flag: `supports_offset`

A new `BaseEngineSpec.supports_offset` attribute (default `True`) indicates whether a database engine supports the SQL `OFFSET` clause. Engines that do not support `OFFSET` — such as Elasticsearch SQL and OpenDistro — opt out by setting it to `False`, and Superset uses each engine's cursor API to paginate drill-to-detail samples instead of emitting `OFFSET`. Downstream forks maintaining custom engine specs may set the flag to `False` (and implement `fetch_data_with_cursor`) to avoid crashes when paginated drill-to-detail queries are run against engines without `OFFSET` support.

**Note on deep-pagination cost:** Cursor-based engines (including Elasticsearch and OpenDistro) are forward-only, so reaching page `N` of a drill-to-detail view issues `N` round trips to the cluster. Deep pagination is therefore linear in page number; users who paginate hundreds or thousands of pages deep will notice added latency compared to `OFFSET`-capable engines.

### Granular Export Controls

A new feature flag `GRANULAR_EXPORT_CONTROLS` introduces three fine-grained permissions that replace the legacy `can_csv` permission:
Expand Down
8 changes: 8 additions & 0 deletions superset/databases/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,14 @@ class EngineInformationSchema(Schema):
supports_oauth2 = fields.Boolean(
metadata={"description": "The database supports OAuth2"}
)
supports_offset = fields.Boolean(
metadata={
"description": (
"The database supports OFFSET in SQL queries. "
"Engines like Elasticsearch SQL return False."
)
}
)


class DatabaseConnectionSchema(Schema):
Expand Down
27 changes: 27 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
allows_sql_comments = True
allows_escaped_colons = True

# Whether the engine supports OFFSET in SQL queries. Defaults to True;
# engines like Elasticsearch SQL that do not support OFFSET set this to
# False and are expected to implement `fetch_data_with_cursor` for
# pagination via another mechanism (e.g. Elasticsearch's cursor API).
supports_offset = True

# Whether ORDER BY clause can use aliases created in SELECT
# that are the same as a source column
allows_alias_to_source_column = True
Expand Down Expand Up @@ -1233,6 +1239,26 @@ def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, ..
except Exception as ex:
raise cls.get_dbapi_mapped_exception(ex) from ex

@classmethod
def fetch_data_with_cursor(
    cls,
    database: Database,
    sql: str,
    page_index: int,
    page_size: int,
) -> tuple[list[list[Any]], list[str]]:
    """
    Return one page of results using engine-native cursor pagination.

    This hook is invoked only for engines whose ``supports_offset`` flag
    is False, and only when a page other than the first is requested
    (see ``superset/views/datasource/utils.py``). Engines that opt out of
    OFFSET support must override it; the base implementation always raises.

    :param database: the database to run the query against
    :param sql: the SQL statement to paginate
    :param page_index: zero-based index of the requested page
    :param page_size: number of rows per page
    :returns: a ``(rows, column_names)`` tuple
    :raises NotImplementedError: always, in the base class
    """
    message = (
        f"{cls.__name__} sets supports_offset=False but does not "
        "implement fetch_data_with_cursor()"
    )
    raise NotImplementedError(message)

@classmethod
def expand_data(
cls, columns: list[ResultSetColumnType], data: list[dict[Any, Any]]
Expand Down Expand Up @@ -2515,6 +2541,7 @@ def get_public_information(cls) -> dict[str, Any]:
"disable_ssh_tunneling": cls.disable_ssh_tunneling,
"supports_dynamic_catalog": cls.supports_dynamic_catalog,
"supports_oauth2": cls.supports_oauth2,
"supports_offset": cls.supports_offset,
}

@classmethod
Expand Down
146 changes: 145 additions & 1 deletion superset/db_engine_specs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import re
from datetime import datetime
from typing import Any, Optional
from typing import Any, Optional, TYPE_CHECKING

from packaging.version import Version
from sqlalchemy import types
Expand All @@ -29,16 +32,108 @@
SupersetDBAPIProgrammingError,
)

if TYPE_CHECKING:
from superset.models.core import Database

logger = logging.getLogger()


def _fetch_page_via_cursor(
    database: Database,
    sql: str,
    page_index: int,
    page_size: int,
    sql_path: str,
    close_path: str,
) -> tuple[list[list[Any]], list[str]]:
    """
    Iterate Elasticsearch/OpenSearch SQL cursor pagination to return a single
    page of results.

    Executes ``sql`` with ``fetch_size = page_size``, then sends cursor
    follow-up requests ``page_index`` times to skip earlier pages. Closes the
    cursor when done to release server-side state. Returns
    ``(rows, columns)``.

    If the dataset is exhausted before reaching ``page_index``, returns an
    empty rows list with the column names from the initial request.

    Note: the Elasticsearch SQL cursor is forward-only, so cost is linear in
    ``page_index`` — reaching page N issues N round trips to the cluster.
    Deep pagination (hundreds of pages) will therefore be noticeably slower
    than on ``OFFSET``-capable engines. This is a protocol limitation, not
    an implementation choice.
    """
    # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT
    # in the submitted statement caps the result set before the cursor can
    # page through it. ``fetch_size`` drives pagination instead.
    # Assumption: Superset only appends a trailing ``LIMIT N`` for engines
    # with ``supports_offset=False``. If that ever changes (e.g.
    # ``FETCH FIRST N ROWS`` or ``TOP N``), extend this sanitizer to match.
    sanitized_sql = sql.strip().rstrip(";").strip()
    sanitized_sql = re.sub(
        r"\s+LIMIT\s+\d+\s*$", "", sanitized_sql, flags=re.IGNORECASE
    )

    # The raw transport does not auto-set Content-Type the way the Python
    # DB-API driver does; ES rejects POSTs without a JSON content type.
    json_headers = {"Content-Type": "application/json"}
    with database.get_raw_connection() as conn:
        transport = conn.es.transport
        response = transport.perform_request(
            "POST",
            sql_path,
            headers=json_headers,
            body={"query": sanitized_sql, "fetch_size": page_size},
        )
        columns = [col["name"] for col in response.get("columns", [])]
        rows = response.get("rows", [])
        cursor = response.get("cursor")

        try:
            for _ in range(page_index):
                if not cursor:
                    # Dataset exhausted before reaching the target page —
                    # no cursor to close (ES returns no cursor on the final
                    # page). Return immediately with empty rows.
                    return [], columns
                response = transport.perform_request(
                    "POST",
                    sql_path,
                    headers=json_headers,
                    body={"cursor": cursor},
                )
                rows = response.get("rows", [])
                cursor = response.get("cursor")

            return rows, columns
        finally:
            if cursor:
                # Best-effort cleanup. If close itself fails we don't want
                # to mask the original error (if any) — swallow and log.
                try:
                    transport.perform_request(
                        "POST",
                        close_path,
                        headers=json_headers,
                        body={"cursor": cursor},
                    )
                except Exception:  # pylint: disable=broad-except
                    logger.warning(
                        "Failed to close Elasticsearch SQL cursor at %s",
                        close_path,
                        exc_info=True,
                    )


class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "elasticsearch"
engine_name = "Elasticsearch"
time_groupby_inline = True
allows_joins = False
allows_subqueries = True
allows_sql_comments = False
supports_offset = False

metadata = {
"description": (
Expand Down Expand Up @@ -136,6 +231,30 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho

type_code_map: dict[int, str] = {} # loaded from get_datatype only if needed

# REST paths for the Elasticsearch SQL API and its cursor-close endpoint.
SQL_ENDPOINT = "/_sql"
SQL_CLOSE_ENDPOINT = "/_sql/close"

@classmethod
def fetch_data_with_cursor(
    cls,
    database: Database,
    sql: str,
    page_index: int,
    page_size: int,
) -> tuple[list[list[Any]], list[str]]:
    """
    Return one page of results via Elasticsearch cursor pagination.

    Thin adapter binding this engine's endpoint paths; the pagination
    protocol itself lives in ``_fetch_page_via_cursor``.
    """
    return _fetch_page_via_cursor(
        database,
        sql,
        page_index,
        page_size,
        sql_path=cls.SQL_ENDPOINT,
        close_path=cls.SQL_CLOSE_ENDPOINT,
    )

@classmethod
def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]:
# pylint: disable=import-error,import-outside-toplevel
Expand Down Expand Up @@ -190,6 +309,7 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
allows_joins = False
allows_subqueries = True
allows_sql_comments = False
supports_offset = False

_time_grain_expressions = {
None: "{col}",
Expand All @@ -204,6 +324,30 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "odelasticsearch"
engine_name = "OpenSearch (OpenDistro)"

# REST paths for the OpenDistro SQL plugin and its cursor-close endpoint.
SQL_ENDPOINT = "/_opendistro/_sql"
SQL_CLOSE_ENDPOINT = "/_opendistro/_sql/close"

@classmethod
def fetch_data_with_cursor(
    cls,
    database: Database,
    sql: str,
    page_index: int,
    page_size: int,
) -> tuple[list[list[Any]], list[str]]:
    """
    Return one page of results via OpenDistro SQL cursor pagination.

    Same protocol as ``ElasticSearchEngineSpec`` — only the endpoint
    paths differ; see ``_fetch_page_via_cursor`` for details.
    """
    return _fetch_page_via_cursor(
        database,
        sql,
        page_index,
        page_size,
        sql_path=cls.SQL_ENDPOINT,
        close_path=cls.SQL_CLOSE_ENDPOINT,
    )

@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[dict[str, Any]] = None
Expand Down
2 changes: 1 addition & 1 deletion superset/models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3285,7 +3285,7 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma

if row_limit:
qry = qry.limit(row_limit)
if row_offset:
if row_offset and self.database.db_engine_spec.supports_offset:
qry = qry.offset(row_offset)

if series_limit and groupby_series_columns:
Expand Down
Loading