Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
e29e7cc
Implement incremental filtering for relations
burnash Apr 23, 2026
c751400
Merge branch 'devel' into feat/3750-relation-incremental
burnash Apr 28, 2026
97ff02e
fix on_cursor_value_missing="include" + tests
burnash Apr 28, 2026
1ed54b1
add clear error when dotted incremental cursor follows .from_loads()/…
burnash Apr 28, 2026
2d6ea64
don't strip order with limit
burnash Apr 29, 2026
baf104f
Merge branch 'devel' into feat/3750-relation-incremental
burnash Apr 29, 2026
aa96ff4
rename the arg
burnash Apr 30, 2026
413a280
emit `IS NOT NULL` for "raise" in on_cursor_value_missing, warn on nu…
burnash Apr 30, 2026
ce2ed0d
parse cursor validation errors, reject double incremental
burnash Apr 30, 2026
9f972ea
refactor cursor path parsing in _parse_incremental_cursor_path
burnash Apr 30, 2026
dc8230d
reject limit and order by on stateful incremental aggregate
burnash Apr 30, 2026
6d84c0a
drop _INCREMENTAL_META_KEY, switch is_incremental to _incremental_ctx…
burnash May 4, 2026
b0648ee
delegate incremental state advancement to ModelIncremental
burnash May 6, 2026
5321c89
Merge remote-tracking branch 'origin/devel' into feat/3750-relation-i…
burnash May 6, 2026
c83bfd9
add tests for ModelIncremental
burnash May 6, 2026
5d03400
rewrite test_simple_incremental as a model-incremental smoke test
burnash May 6, 2026
14b9bdb
make incremental aggregate handle bare and qualified cursors
burnash May 7, 2026
95d3ef4
test cases for incomplete column in projections
rudolfix May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions dlt/dataset/_incremental.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
from __future__ import annotations

import warnings
from dataclasses import dataclass
from typing import Any, Optional, Tuple, Type, TYPE_CHECKING

import sqlglot.expressions as sge
from jsonpath_ng.exceptions import JSONPathError

from dlt.common.jsonpath import extract_simple_field_name
from dlt.common.libs.sqlglot import build_typed_literal, to_sqlglot_type
from dlt.common.schema.typing import TTableSchemaColumns

if TYPE_CHECKING:
from dlt.extract.incremental import Incremental


_AGG_CURSOR_ALIAS = "__dlt_inc_cursor"


@dataclass(frozen=True)
class _RelationIncrementalContext:
    """Private per-relation marker tying a `Relation` back to its `Incremental`.

    Set by `Relation.incremental()` and consumed by downstream lifecycle
    code (e.g. `dlthub` transformations) that needs to advance the cursor
    state after the relation executes.
    """

    # The incremental whose cursor bounds were pushed down into the relation query.
    incremental: Incremental[Any]
    # Resolved cursor column reference (bare or table-qualified) used in that query.
    cursor_column: sge.Column


def _build_incremental_aggregate(
    base_query: sge.Query,
    ctx: _RelationIncrementalContext,
) -> sge.Select:
    """Build `SELECT <func>(alias) FROM (SELECT cursor AS alias FROM <filtered>)`.

    Bare cursor: wraps the base query as a subquery so projections, GROUP BY,
    HAVING and aliased computed cursors are preserved. Qualified cursor
    (`table.column`, from an auto-join): replaces the base query's projection
    list inline so the join qualifier resolves.

    Args:
        base_query (sge.Query): The filtered relation query to aggregate over.
        ctx (_RelationIncrementalContext): Marker carrying the `Incremental`
            and the resolved cursor column.

    Returns:
        sge.Select: Query computing the new cursor boundary value.

    Raises:
        ValueError: If a stateful (no `end_value`) query carries a LIMIT, or
            if `last_value_func` is neither `min` nor `max`.
    """
    # ORDER BY alone is harmless: it still returns every row, so advancing the
    # cursor past the aggregate is safe. Only LIMIT can truncate the window
    # and make state skip rows that were never returned.
    if ctx.incremental.end_value is None and base_query.args.get("limit") is not None:
        raise ValueError(
            "LIMIT isn't supported on stateful `.incremental()` as "
            "state would advance past only the returned rows, silently skipping "
            "the rest on the next run. Remove it, or set `end_value=` for a "
            "bounded read."
        )

    cursor_alias = sge.to_identifier(_AGG_CURSOR_ALIAS, quoted=True)
    if ctx.cursor_column.table:
        # Qualified cursor: rewrite the projection in place so the join
        # qualifier still resolves against the FROM/JOIN clause.
        inner = base_query.copy()
        inner.set(
            "expressions",
            [sge.Alias(this=ctx.cursor_column.copy(), alias=cursor_alias)],
        )
    else:
        # Bare cursor: keep the base query intact as a subquery so computed
        # projections, GROUP BY and HAVING are preserved.
        bare_cursor = sge.Column(this=ctx.cursor_column.this.copy())
        inner = sge.Select(
            expressions=[sge.Alias(this=bare_cursor, alias=cursor_alias)]
        ).from_(base_query.copy().subquery())

    agg_cls: Type[sge.AggFunc]
    if ctx.incremental.last_value_func is max:
        agg_cls = sge.Max
    elif ctx.incremental.last_value_func is min:
        agg_cls = sge.Min
    else:
        raise ValueError(
            "Incremental aggregate can only be built for `min` or `max` "
            f"`last_value_func`, got {ctx.incremental.last_value_func!r}."
        )

    outer_ref = sge.Column(this=cursor_alias.copy())
    return sge.Select(expressions=[agg_cls(this=outer_ref)]).from_(inner.subquery())


def _parse_incremental_cursor_path(cursor_path: str) -> Tuple[Optional[str], str]:
    """Split `table.column` into parts, or return `(None, column)` for a bare field.

    Rejects JSONPath expressions (wildcards, array indices, `$` root markers)
    that cannot be pushed down to SQL.
    """
    if not cursor_path:
        raise ValueError("Incremental `cursor_path` must be a non-empty string.")

    # Any JSONPath-only syntax means the cursor can't become a SQL column ref.
    for marker in ("$", "[", "*"):
        if marker in cursor_path:
            raise ValueError(
                f"Incremental `cursor_path={cursor_path!r}` is a JSONPath expression. "
                "`Relation.incremental()` only supports plain `column` or `table.column` cursors."
            )

    invalid_msg = (
        f"Incremental `cursor_path={cursor_path!r}` is not a plain column identifier. "
        "Use `column` or `table.column`."
    )

    table_part: Optional[str] = None
    column_part = cursor_path
    if "." in cursor_path:
        # Split on the last dot: everything before it is the table qualifier.
        table_part, column_part = cursor_path.rsplit(".", 1)
        if not table_part:
            raise ValueError(invalid_msg)

    try:
        column_name = extract_simple_field_name(column_part)
    except JSONPathError as err:
        raise ValueError(invalid_msg) from err
    if column_name is None:
        raise ValueError(invalid_msg)
    return table_part, column_name


def _build_incremental_condition(
    incremental: Incremental[Any],
    column_ref: sge.Column,
    sqlglot_type: Optional[sge.DataType],
) -> Optional[sge.Expression]:
    """Build the WHERE condition for an Incremental cursor on `column_ref`.

    Operator matrix (closed/open bounds):

    - `max` + closed start -> `>=`, open start -> `>`
    - `max` + open end -> `<`, closed end -> `<=`
    - `min` + closed start -> `<=`, open start -> `<`
    - `min` + open end -> `>`, closed end -> `>=`

    Args:
        incremental (Incremental): The incremental carrying cursor bounds, range, and
            `on_cursor_value_missing` policy.
        column_ref (sge.Column): Reference to the cursor column in the target query.
        sqlglot_type (Optional[sge.DataType]): SQLGlot data type used to CAST the
            bound literals; pass `None` to skip casting.

    Returns:
        Optional[sge.Expression]: A boolean expression ready to be attached via
            `.where(...)`, or `None`.

    Raises:
        ValueError: If `incremental.last_value_func` is not `min` or `max`, or if
            `on_cursor_value_missing` is not one of `"include"`, `"exclude"`, `"raise"`.
    """
    last_value_func = incremental.last_value_func
    if last_value_func is max:
        lower_cls: Type[sge.Binary] = sge.GTE if incremental.range_start == "closed" else sge.GT
        upper_cls: Type[sge.Binary] = sge.LT if incremental.range_end == "open" else sge.LTE
    elif last_value_func is min:
        lower_cls = sge.LTE if incremental.range_start == "closed" else sge.LT
        upper_cls = sge.GT if incremental.range_end == "open" else sge.GTE
    else:
        raise ValueError(
            f"Incremental `last_value_func={last_value_func!r}` cannot be pushed "
            "down to SQL. Only `min` and `max` are supported by `Relation.incremental()`."
        )

    missing_policy = incremental.on_cursor_value_missing
    if missing_policy not in ("include", "exclude", "raise"):
        raise ValueError(
            "Incremental `on_cursor_value_missing="
            f"{missing_policy!r}` is not supported by "
            "`Relation.incremental()`. Expected one of: 'include', 'exclude', 'raise'."
        )

    # Combine whichever bounds are present into a single AND-ed expression.
    condition: Optional[sge.Expression] = None
    if incremental.last_value is not None:
        lower_literal = build_typed_literal(incremental.last_value, sqlglot_type)
        condition = lower_cls(this=column_ref.copy(), expression=lower_literal)
    if incremental.end_value is not None:
        upper_literal = build_typed_literal(incremental.end_value, sqlglot_type)
        upper = upper_cls(this=column_ref.copy(), expression=upper_literal)
        condition = upper if condition is None else sge.And(this=condition, expression=upper)

    null_check = sge.Is(this=column_ref.copy(), expression=sge.Null())
    if missing_policy == "include":
        # NULL cursors ride along with the bounded rows (OR IS NULL).
        if condition is None:
            return None
        return sge.Or(this=condition, expression=null_check)

    # "exclude" or "raise" both pin nulls out via IS NOT NULL.
    # "raise" can't raise mid-query in SQL pushdown; so we warn users
    not_null = sge.Not(this=null_check)
    if condition is None:
        return not_null
    return sge.And(this=condition, expression=not_null)


def _maybe_warn_on_cursor_missing_raise(
incremental: Incremental[Any],
columns_schema: TTableSchemaColumns,
column_name: str,
) -> None:
"""Warn when `on_cursor_value_missing="raise"` is bound against a nullable cursor."""
if incremental.on_cursor_value_missing != "raise":
return
column_schema = columns_schema.get(column_name) or {}
if column_schema.get("nullable") is False:
return
warnings.warn(
"Can't raise on NULL cursor values; rows with NULL "
"cursors will be excluded. Set on_cursor_value_missing explicitly "
"to silence.",
UserWarning,
stacklevel=3,
)


def _sqlglot_type_for_column(
columns: TTableSchemaColumns, column_name: str
) -> Optional[sge.DataType]:
"""Resolve the SQLGlot data type for `column_name` from a dlt columns schema."""
column_schema = columns.get(column_name)
if not column_schema:
return None
data_type = column_schema.get("data_type")
if data_type is None:
return None
return to_sqlglot_type(
dlt_type=data_type,
precision=column_schema.get("precision"),
timezone=column_schema.get("timezone"),
nullable=column_schema.get("nullable"),
)
70 changes: 45 additions & 25 deletions dlt/dataset/_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,26 @@ def _discover_join_params(
return joins, target_qualifier


def _normalize_left_projection(query: sge.Select, left_table: str) -> list[sge.Expression]:
    """Qualify the left-side projection so an added JOIN can't leak right-side columns.

    Bare `Star` becomes `<left_table>.*`; unqualified `Column`s get their
    `table` set to `<left_table>`.
    """
    left_ident = sge.to_identifier(left_table, quoted=False)
    qualified: list[sge.Expression] = []
    for select_expr in query.selects:
        if isinstance(select_expr, sge.Star):
            # Pin the star to the left table so it won't expand over joins.
            qualified.append(sge.Column(table=left_ident, this=sge.Star()))
            continue
        if isinstance(select_expr, sge.Column) and select_expr.args.get("table") is None:
            bound = select_expr.copy()
            bound.set("table", left_ident)
            qualified.append(bound)
            continue
        qualified.append(select_expr)
    return qualified


def _apply_join_projection(
query: sge.Select,
*,
Expand All @@ -275,28 +295,17 @@ def _apply_join_projection(
projection_prefix: str,
allow_existing_target_projection: bool,
) -> None:
"""Apply join projection contract onto ``query``.
"""Apply join projection contract onto `query`.
Preserves the left-side projection and appends only columns from the explicitly
joined ``target_table`` as ``{projection_prefix}__{column}`` aliases.
joined `target_table` as `{projection_prefix}__{column}` aliases.
``allow_existing_target_projection`` is used for idempotent re-joins: when a
`allow_existing_target_projection` is used for idempotent re-joins: when a
join call contributes no new join edges, all target-prefixed columns may already
exist in the left projection and should be accepted as a no-op instead of raising
a collision error.
"""
# Unbound columns must refer to the origin table so bind them to it
origin_identifier = sge.to_identifier(left_table, quoted=False)
normalized_left_expressions: list[sge.Expression] = []
for expr in query.selects:
if isinstance(expr, sge.Star):
normalized_left_expressions.append(sge.Column(table=origin_identifier, this=sge.Star()))
elif isinstance(expr, sge.Column) and expr.args.get("table") is None:
expr_copy = expr.copy()
expr_copy.set("table", origin_identifier)
normalized_left_expressions.append(expr_copy)
else:
normalized_left_expressions.append(expr)
normalized_left_expressions = _normalize_left_projection(query, left_table)

existing_projection_column_names = {
expr.output_name
Expand Down Expand Up @@ -343,8 +352,14 @@ def _apply_join(
right_table: str,
projection_prefix: str,
kind: TJoinType = "inner",
project: bool = True,
) -> sge.Select:
"""Apply schema-driven join(s) to ``expression`` and return the new query."""
"""Apply schema-driven join(s) to `expression` and return the new query.
When `project` is `False` the JOIN is added to the query but the SELECT
list is left untouched (filter-only join). Use this for join targets whose
columns must be referenced in WHERE/ON predicates without being projected.
"""
if left_table not in schema.tables:
raise ValueError(f"Table `{left_table}` not found in dataset schema")
if right_table not in schema.tables:
Expand Down Expand Up @@ -374,13 +389,18 @@ def _apply_join(
)
query = query.join(join_expr)

_apply_join_projection(
query,
schema=schema,
left_table=left_table,
target_table=right_table,
target_qualifier=target_qualifier,
projection_prefix=projection_prefix,
allow_existing_target_projection=not join_params,
)
if project:
_apply_join_projection(
query,
schema=schema,
left_table=left_table,
target_table=right_table,
target_qualifier=target_qualifier,
projection_prefix=projection_prefix,
allow_existing_target_projection=not join_params,
)
else:
# Filter-only join: qualify the left projection so a bare `*` does not
# expand across the joined table and leak right-side columns at runtime.
query.set("expressions", _normalize_left_projection(query, left_table))
return query
26 changes: 21 additions & 5 deletions dlt/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
if TYPE_CHECKING:
from dlt.common.libs.ibis import ir
from dlt.common.libs.ibis import BaseBackend as IbisBackend
from dlt.extract.incremental import Incremental


class Dataset:
Expand Down Expand Up @@ -262,9 +263,22 @@ def __call__(
return self.query(query, query_dialect, _execute_raw_query=_execute_raw_query)

def table(
self, table_name: str, *, load_ids: Optional[Collection[str]] = None, **kwargs: Any
self,
table_name: str,
*,
load_ids: Optional[Collection[str]] = None,
incremental: Optional[Incremental[Any]] = None,
**kwargs: Any,
) -> dlt.Relation:
"""Get a `dlt.Relation` associated with a table from the dataset."""
"""Get a `dlt.Relation` associated with a table from the dataset.

Args:
table_name (str): Name of the table in the dataset schema.
load_ids (Optional[Collection[str]]): If provided, restrict rows to the
given load ids via `Relation.from_loads()`.
incremental (Optional[Incremental[Any]]): If provided, apply the cursor
range as a `WHERE` clause via `Relation.incremental()`.
"""
if table_name not in self.tables:
# TODO: raise TableNotFound
raise ValueError(f"Table `{table_name}` not found. Available table(s): {self.tables}")
Expand All @@ -277,10 +291,12 @@ def table(
" Ibis Table."
)

relation = dlt.Relation(dataset=self, table_name=table_name)
if load_ids:
return dlt.Relation(dataset=self, table_name=table_name).from_loads(load_ids)
else:
return dlt.Relation(dataset=self, table_name=table_name)
relation = relation.from_loads(load_ids)
if incremental is not None:
relation = relation.incremental(incremental)
return relation

def loads_table(self) -> dlt.Relation:
"""Get `_dlt_loads` table from the dataset."""
Expand Down
Loading
Loading