Implement incremental filtering for dlt.Relation #3889
Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | docs | 95d3ef4 | May 08 2026, 11:44 AM |
Co-authored-by: Copilot <copilot@github.com>
```python
@property
def is_incremental(self) -> bool:
    """True if any clause on this relation was produced by `.incremental()`."""
    # TODO: leaks True on aggregate relations because the inner subquery still
```
The issue (#3750) recommends storing this as a flag in sqlglot meta, but this IMO has downsides: there would be no single source of truth, and `_incremental_ctx` would also need to survive transformations. So I'd drop the flag.
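The single-source-of-truth argument can be sketched with a minimal stand-in for `dlt.Relation` (everything except the `_incremental_ctx` attribute and `is_incremental` property names is an illustrative assumption, not dlt's actual code): deriving `is_incremental` from the stored context means there is no second flag that can drift out of sync, and copies only need to carry the context object.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional

@dataclass
class _RelationIncrementalContext:
    # stand-in for the context that .incremental() would store
    incremental: Any
    cursor_column: str

@dataclass
class Relation:
    query: str
    _incremental_ctx: Optional[_RelationIncrementalContext] = None

    @property
    def is_incremental(self) -> bool:
        # derived from the stored context: single source of truth, no extra flag
        return self._incremental_ctx is not None

    def incremental(self, cursor: str) -> "Relation":
        ctx = _RelationIncrementalContext(incremental=object(), cursor_column=cursor)
        return replace(self, _incremental_ctx=ctx)

rel = Relation(query="SELECT * FROM events")
inc_rel = rel.incremental("updated_at")
print(rel.is_incremental, inc_rel.is_incremental)  # False True
```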
rudolfix left a comment:
this looks pretty good! 99% of use cases will be datetime cursors (this includes `_dlt_load_id`, which we'll join to `_dlt_loads` to use `inserted_at` as cursor - that's why we need this autojoin)

- it makes sense to reuse parts of `to_sqlglot_filter` - it got tested across many destinations, which surfaced sqlglot problems with datetime literals
- taking a sqlglot Column and type is a good idea - better than `to_sqlglot_filter`
- `lower, upper = incremental.resolve_bounds(apply_lag=apply_lag)` - this gets the correct range from incremental - also from ad hoc instances (not only inside a resource), i.e. without `end_value` set (I added this exactly due to this problem)
- code that formats datetime literals depending on destination caps should be ported
- there are unit tests: you'll probably have your own
- end to end tests in `test_read_interfaces` are worth preserving - they test datetime literals on all destinations - it was not easy to make them work on all of them
- OK to drop this meta tag for now... let's see when we need it
- we may still want to expose something similar to `to_sqlglot_filter` from the incremental module - for users that want to generate expressions themselves and pass them to `where()`; `_parse_incremental_cursor_path` will also be pretty useful. let's sync on those max/min values - datetime values will come mostly from external schedulers but we should still support "standalone" incrementals
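The `resolve_bounds` behavior described above can be sketched in plain Python (the signature and semantics here are inferred from the comment, not copied from dlt's implementation): the function turns incremental state into a `(lower, upper)` filter range, works without `end_value`, and optionally widens the lower bound by the configured lag.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Optional, Tuple

def resolve_bounds(
    last_value: Optional[Any],
    end_value: Optional[Any],
    last_value_func: Callable = max,
    lag: float = 0.0,
    apply_lag: bool = True,
) -> Tuple[Optional[Any], Optional[Any]]:
    """Sketch: derive (lower, upper) filter bounds from incremental state.

    With last_value_func=max the cursor advances upward, so last_value is the
    lower bound; with min the bounds are flipped. Lag widens the lower bound
    so late-arriving rows near the boundary are re-read.
    """
    if last_value_func is max:
        lower, upper = last_value, end_value
        if apply_lag and lag and isinstance(lower, datetime):
            lower = lower - timedelta(seconds=lag)
        return lower, upper
    # min cursor: the range is inverted
    return end_value, last_value

lower, upper = resolve_bounds(datetime(2026, 5, 8, 12, 0), None, max, lag=60.0)
print(lower, upper)  # 2026-05-08 11:59:00 None
```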
```python
def test_incremental_dotted_cursor_runtime_columns_base_only(
    incremental_dataset: dlt.Dataset,
) -> None:
    inc: dlt.sources.incremental[Any] = dlt.sources.incremental(
```
very cool! this will require an end to end test (processing subsequent package ids)
rudolfix left a comment:
ModalIncremental looks good - see the other PR for more details on that.
I've found two places where incomplete columns are allowed (raw reads of columns from the table schema); the other is an old one in autojoin (dlt/dataset/_join.py:316). I included two tests that surface those.
At this point we IMO need at least one smoke test for Incremental working on all destinations - there's still no end to end test that runs a pipeline a few times and observes the right data being selected via incremental.
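The missing end to end scenario can be illustrated with a destination-free simulation (all names below are illustrative, none of them are dlt APIs): each "run" selects rows above the stored cursor and then advances it, so subsequent runs pick up only new data and a run with no new data selects nothing.

```python
from typing import Any, Dict, List

def run_incremental(
    rows: List[Dict[str, Any]], state: Dict[str, Any], cursor: str
) -> List[Dict[str, Any]]:
    """One simulated pipeline run: filter rows above the stored cursor, advance state."""
    last = state.get("last_value")
    selected = [r for r in rows if last is None or r[cursor] > last]
    if selected:
        state["last_value"] = max(r[cursor] for r in selected)
    return selected

state: Dict[str, Any] = {}
batch1 = [{"id": 1, "ts": 10}, {"id": 2, "ts": 20}]
batch2 = batch1 + [{"id": 3, "ts": 30}]  # the source grows between runs

first = run_incremental(batch1, state, "ts")   # initial load: everything
second = run_incremental(batch2, state, "ts")  # only the new row
third = run_incremental(batch2, state, "ts")   # nothing new: empty
print([r["id"] for r in first], [r["id"] for r in second], [r["id"] for r in third])
# [1, 2] [3] []
```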
| """ | ||
|
|
||
| def __call__(self, relation: TDataItem) -> Tuple[Optional[TDataItem], bool, bool]: | ||
| ctx = getattr(relation, "_incremental_ctx", None) |
please check the dlthub review - here we should apply incremental automatically if not yet applied
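The "apply automatically if not yet applied" idea can be sketched like this (the `Relation` stand-in and the transform class are illustrative assumptions, not dlt's actual classes): the transform inspects `_incremental_ctx` and, when the user has not already called `.incremental()`, applies the filter itself instead of returning the relation unfiltered.

```python
from typing import Any, Optional

class Relation:
    """Minimal stand-in for dlt.Relation for this sketch."""

    def __init__(self, ctx: Optional[Any] = None) -> None:
        self._incremental_ctx = ctx

    def incremental(self, cursor: str) -> "Relation":
        return Relation(ctx={"cursor": cursor})

class IncrementalTransform:
    def __init__(self, cursor: str) -> None:
        self.cursor = cursor

    def __call__(self, relation: Relation) -> Relation:
        ctx = getattr(relation, "_incremental_ctx", None)
        if ctx is None:
            # not yet applied by the user: apply the incremental filter here
            relation = relation.incremental(self.cursor)
        return relation

transform = IncrementalTransform("updated_at")
out = transform(Relation())
print(out._incremental_ctx)  # {'cursor': 'updated_at'}
```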
```python
    list inline so the join qualifier resolves.
    """
    if ctx.incremental.end_value is None and (
        base_query.args.get("limit") is not None or base_query.args.get("order") is not None
```
what is wrong with "ORDER"? order alone returns all rows and is acceptable
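Why a pre-existing LIMIT is unsafe without `end_value` while ORDER BY alone is acceptable can be shown with a small simulation (plain Python, no dlt/sqlglot): the incremental WHERE is applied on top of the base query, so a LIMIT changes which rows the filter even sees, while ORDER BY only changes row order and still exposes all rows.

```python
from typing import Any, Dict, List, Optional

rows = [{"id": i, "ts": i} for i in range(1, 6)]  # ts values 1..5

def base_query(order_desc: bool = False, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Simulate the user's base query with optional ORDER BY and LIMIT."""
    out = sorted(rows, key=lambda r: r["ts"], reverse=order_desc)
    return out[:limit] if limit is not None else out

def incremental_filter(selected: List[Dict[str, Any]], last_value: int) -> List[Dict[str, Any]]:
    # the incremental WHERE clause is applied on top of the base query
    return [r for r in selected if r["ts"] > last_value]

# ORDER BY alone still returns all rows, so the filter sees everything
print([r["id"] for r in incremental_filter(base_query(order_desc=True), last_value=3)])  # [5, 4]

# LIMIT applied before the filter hides exactly the rows incremental should pick up
print([r["id"] for r in incremental_filter(base_query(limit=3), last_value=3)])  # []
```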
```python
    `last_value_func` is not `min` or `max`.

    Notes:
        Aggregate (GROUP BY) cursors with `range_start="open"`: late
```
- This is a rare edge case - worth describing - but maybe in our docs. Relation docs are not yet updated - it belongs there
- the case described here is intentional: `range_start="open"` is an explicit setting and it means: ignore late arriving data at the boundary. so this is intentional, correct behavior
```python
        )
        return rel

    if not self._table_name:
```
you could easily extract the part below into a helper function and reuse the incremental application logic in the `if table_name is None:` path - the incremental building like:

```python
sqlglot_type = _sqlglot_type_for_column(columns, column_name)
_maybe_warn_on_cursor_missing_raise(incremental, columns, column_name)
condition = _build_incremental_condition(incremental, column_ref, sqlglot_type)
rel = self.__copy__()
rel._sqlglot_expression = query.where(condition) if condition is not None else query
rel._incremental_ctx = _RelationIncrementalContext(
    incremental=incremental,
    cursor_column=column_ref.copy(),
)
return rel
```

is identical for both
```python
            this=sge.to_identifier(column_name, quoted=True),
            table=sge.to_identifier(target_qualifier, quoted=False),
        )
        target_columns = self._dataset.schema.tables[table_name].get("columns", {})
```
- materialized tables have columns
- this expression also returns incomplete/not materialized columns
- use `get_table_columns`

having such columns is a byproduct of letting our users define columns partially (i.e. saying a column with this name should be null, without setting the data type). maybe we should clean up schemas when they are used in the Relation class... but that's not something we can do now
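What "incomplete" means here can be shown in plain Python (a sketch of the filtering that `get_table_columns` is asked to do; the helper below is illustrative, not dlt's actual function): a partial user-defined column hint has a name and maybe nullability, but no `data_type`, so it cannot be used to build a cursor filter.

```python
from typing import Any, Dict

def complete_columns(columns: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    # a column is complete only when its data type is known
    return {name: col for name, col in columns.items() if col.get("data_type")}

table_columns = {
    "updated_at": {"name": "updated_at", "data_type": "timestamp"},
    # partial user-defined hint: nullability set, but no data type yet
    "maybe_col": {"name": "maybe_col", "nullable": True},
}
print(list(complete_columns(table_columns)))  # ['updated_at']
```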
| """Gets transform implementation that handles particular data item type""" | ||
| # Lazy import to avoid failure with a partially-initialised | ||
| # `dlt.extract` during dlt startup. | ||
| # TODO: we should consider creating a registry for transforms |
I agree. we are reworking item detection in other pull requests. there should be a universal dispatch function (well - it is there but not used) that will return the item type from which the right incremental type could be derived
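A transform registry built on type dispatch could be sketched like this (pure Python; `functools.singledispatch` stands in for the "universal dispatch function" mentioned above, and the transform class names plus the `bytes` stand-in for an arrow-like item are illustrative assumptions):

```python
from functools import singledispatch
from typing import Any

class ArrowIncremental: ...
class JsonIncremental: ...

@singledispatch
def incremental_transform_for(item: Any) -> type:
    """Fallback: treat unknown items as json-like python objects."""
    return JsonIncremental

@incremental_transform_for.register
def _(item: list) -> type:
    # lists of dicts take the json path
    return JsonIncremental

# a real registry would register pyarrow.Table / pandas.DataFrame here;
# bytes stands in for an arrow-like item type in this sketch
@incremental_transform_for.register
def _(item: bytes) -> type:
    return ArrowIncremental

print(incremental_transform_for([{"id": 1}]).__name__)  # JsonIncremental
print(incremental_transform_for(b"arrow").__name__)     # ArrowIncremental
```

New item types then only need a `register` call instead of another branch in the detection code.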
Closes #3750