Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,45 +470,40 @@ def add_step(
self._pipe.insert_step(item_transform, insert_at)
return self

def _remove_incremental_step(self) -> None:
self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)
def _remove_incremental_step(self) -> Optional[int]:
"""Removes incremental step from pipe, returns its previous index (None if not found)"""
idx = self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)
return idx if idx >= 0 else None

def set_incremental(
self, new_incremental: Union[Incremental[Any], IncrementalResourceWrapper]
self,
new_incremental: Union[Incremental[Any], IncrementalResourceWrapper],
) -> Optional[Union[Incremental[Any], IncrementalResourceWrapper]]:
"""Set/replace the incremental transform for the resource.

Args:
new_incremental: The Incremental instance/hint to set or replace
from_hints: If the incremental is set from hints. Defaults to False.
"""
"""Set/replace the incremental transform for the resource, preserving step position."""
if new_incremental is Incremental.EMPTY:
new_incremental = None
if new_incremental:
# set primary key as dedup key on new incremental
if resource_primary_key := self._hints.get("primary_key"):
new_incremental.set_deduplication_key(resource_primary_key, from_hints=True)
incremental = self.incremental
existing_idx: Optional[int] = None
if incremental is not None:
# if isinstance(new_incremental, Mapping):
# new_incremental = Incremental.ensure_instance(new_incremental)

if isinstance(new_incremental, IncrementalResourceWrapper):
# Completely replace the wrapper
self._remove_incremental_step()
self.add_step(new_incremental)
# completely replace the wrapper at the same position
existing_idx = self._remove_incremental_step()
self.add_step(new_incremental, insert_at=existing_idx)
elif isinstance(incremental, IncrementalResourceWrapper):
incremental.set_incremental(new_incremental, from_hints=True)
else:
self._remove_incremental_step()
# re-add the step
existing_idx = self._remove_incremental_step()
incremental = None
if incremental is None:
# if there's no wrapper add incremental as a transform
if new_incremental:
if not isinstance(new_incremental, IncrementalResourceWrapper):
new_incremental = Incremental.ensure_instance(new_incremental)
self.add_step(new_incremental)
self.add_step(new_incremental, insert_at=existing_idx)
return new_incremental

def _set_hints(
Expand Down Expand Up @@ -688,20 +683,21 @@ def _set_explicit_args(
def _eject_config(self) -> bool:
"""Unwraps the pipe generator step from config injection and incremental wrappers by restoring the original step.

Removes the step with incremental wrapper. Should be used before a subsequent _inject_config is called on the
same pipe to successfully wrap it with new incremental and config injection.
Should be used before a subsequent _inject_config is called on the same pipe to successfully
wrap it with new incremental and config injection.
Note that resources with bound arguments cannot be ejected.

"""
if not self._pipe.is_empty and not self._args_bound:
orig_gen = getattr(self._pipe.gen, "__GEN__", None)
if orig_gen:
self._remove_incremental_step()
self._pipe.replace_gen(orig_gen)
return True
return False

def _inject_config(self, incremental_from_hints_override: Optional[bool] = None) -> Self:
def _inject_config(
self,
incremental_from_hints_override: Optional[bool] = None,
) -> Self:
"""Wraps the pipe generation step in incremental and config injection wrappers and adds pipe step with
Incremental transform.
"""
Expand Down Expand Up @@ -793,7 +789,9 @@ def _clone(
# this makes sure that we take config values from the right section and that the wrapper has a separate
# instance in the pipeline
if cloned_r._eject_config():
cloned_r._inject_config(incremental_from_hints_override=incremental_from_hints)
cloned_r._inject_config(
incremental_from_hints_override=incremental_from_hints,
)
return cloned_r

def _update_wrapper(self) -> None:
Expand Down
78 changes: 78 additions & 0 deletions tests/extract/test_incremental.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to compare the pipe state before and after r._clone(), something like:

    layout_before = [type(s).__name__ for s in r._pipe._steps]
    incr_idx_before = r._pipe.find(Incremental, IncrementalResourceWrapper)

    cloned = r._clone()

    layout_after = [type(s).__name__ for s in cloned._pipe._steps]
    incr_idx_after = cloned._pipe.find(Incremental, IncrementalResourceWrapper)
    assert incr_idx_after == incr_idx_before
    assert layout_after == layout_before

Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,84 @@ def set_default_updated_at_arrow(records):
assert s["last_value"] == 4


def test_insert_at_ordering_preserved_after_clone() -> None:
    """A step placed with insert_at keeps its position when the resource is cloned.

    Cloning runs an eject/inject cycle on the resource; the list of step types and the index
    of the incremental step must be identical before and after it.
    """

    @dlt.source
    def my_source():
        @dlt.resource
        def events():
            yield [
                {"id": 1, "updated_at": 1},
                {"id": 2, "updated_at": 2},
                {"id": 3, "updated_at": 3},
            ]

        return events

    def count_rows(items, meta, metrics):
        # only list batches contribute to the row count
        if not isinstance(items, list):
            return
        metrics["row_count"] = metrics.get("row_count", 0) + len(items)

    def step_layout(resource):
        """Returns step type names and the index of the incremental step of `resource`"""
        names = [type(step).__name__ for step in resource._pipe._steps]
        incremental_idx = resource._pipe.find(Incremental, IncrementalResourceWrapper)
        return names, incremental_idx

    events_resource = my_source().events
    events_resource.apply_hints(
        incremental=dlt.sources.incremental("updated_at", initial_value=2)
    )
    # insert the metrics step at index len(pipe), computed before the step is added
    events_resource.add_metrics(count_rows, insert_at=len(events_resource._pipe))

    # snapshot the layout, clone, snapshot again: both must match
    names_before, incr_idx_before = step_layout(events_resource)
    names_after, incr_idx_after = step_layout(events_resource._clone())
    assert names_after == names_before
    assert incr_idx_after == incr_idx_before

    pipeline = dlt.pipeline(pipeline_name="p" + uniq_id(), destination="duckdb", dev_mode=True)
    pipeline.run(events_resource)

    extract_info = pipeline.last_trace.last_extract_info
    first_load_id = extract_info.loads_ids[0]
    events_metrics = extract_info.metrics[first_load_id][0]["resource_metrics"]["events"]
    # two of the three yielded rows are expected to reach the metrics step
    assert events_metrics.custom_metrics["row_count"] == 2


def test_insert_at_ordering_preserved_after_clone_arg_based() -> None:
    """A step placed with insert_at keeps its position across clone when the incremental
    is declared as a default argument of the resource function.
    """

    @dlt.source
    def my_source():
        @dlt.resource
        def events(updated_at=dlt.sources.incremental("updated_at", initial_value=2)):
            yield [
                {"id": 1, "updated_at": 1},
                {"id": 2, "updated_at": 2},
                {"id": 3, "updated_at": 3},
            ]

        return events

    def step_layout(resource):
        """Returns step type names and the index of the incremental step of `resource`"""
        names = [type(step).__name__ for step in resource._pipe._steps]
        incremental_idx = resource._pipe.find(Incremental, IncrementalResourceWrapper)
        return names, incremental_idx

    events_resource = my_source().events

    # add a filter after the incremental step
    events_resource.add_filter(
        lambda item: item["id"] != 999, insert_at=len(events_resource._pipe)
    )

    # snapshot the layout, clone, snapshot again: both must match
    names_before, incr_idx_before = step_layout(events_resource)
    names_after, incr_idx_after = step_layout(events_resource._clone())
    assert names_after == names_before
    assert incr_idx_after == incr_idx_before


def test_json_path_cursor() -> None:
@dlt.resource
def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")):
Expand Down
Loading