diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 1f3f9d9cfb..2064b69593 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -470,18 +470,16 @@ def add_step( self._pipe.insert_step(item_transform, insert_at) return self - def _remove_incremental_step(self) -> None: - self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper) + def _remove_incremental_step(self) -> Optional[int]: + """Removes incremental step from pipe, returns its previous index (None if not found)""" + idx = self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper) + return idx if idx >= 0 else None def set_incremental( - self, new_incremental: Union[Incremental[Any], IncrementalResourceWrapper] + self, + new_incremental: Union[Incremental[Any], IncrementalResourceWrapper], ) -> Optional[Union[Incremental[Any], IncrementalResourceWrapper]]: - """Set/replace the incremental transform for the resource. - - Args: - new_incremental: The Incremental instance/hint to set or replace - from_hints: If the incremental is set from hints. Defaults to False. - """ + """Set/replace the incremental transform for the resource, preserving step position.""" if new_incremental is Incremental.EMPTY: new_incremental = None if new_incremental: @@ -489,26 +487,23 @@ def set_incremental( if resource_primary_key := self._hints.get("primary_key"): new_incremental.set_deduplication_key(resource_primary_key, from_hints=True) incremental = self.incremental + existing_idx: Optional[int] = None if incremental is not None: - # if isinstance(new_incremental, Mapping): - # new_incremental = Incremental.ensure_instance(new_incremental) - if isinstance(new_incremental, IncrementalResourceWrapper): - # Completely replace the wrapper - self._remove_incremental_step() - self.add_step(new_incremental) + # completely replace the wrapper at the same position + existing_idx = self._remove_incremental_step() + self.add_step(new_incremental, insert_at=existing_idx) elif isinstance(incremental, IncrementalResourceWrapper): incremental.set_incremental(new_incremental, from_hints=True) else: - self._remove_incremental_step() - # re-add the step + existing_idx = self._remove_incremental_step() incremental = None if incremental is None: # if there's no wrapper add incremental as a transform if new_incremental: if not isinstance(new_incremental, IncrementalResourceWrapper): new_incremental = Incremental.ensure_instance(new_incremental) - self.add_step(new_incremental) + self.add_step(new_incremental, insert_at=existing_idx) return new_incremental def _set_hints( @@ -688,20 +683,21 @@ def _set_explicit_args( def _eject_config(self) -> bool: """Unwraps the pipe generator step from config injection and incremental wrappers by restoring the original step. - Removes the step with incremental wrapper. Should be used before a subsequent _inject_config is called on the - same pipe to successfully wrap it with new incremental and config injection. + Should be used before a subsequent _inject_config is called on the same pipe to successfully + wrap it with new incremental and config injection. Note that resources with bound arguments cannot be ejected. - """ if not self._pipe.is_empty and not self._args_bound: orig_gen = getattr(self._pipe.gen, "__GEN__", None) if orig_gen: - self._remove_incremental_step() self._pipe.replace_gen(orig_gen) return True return False - def _inject_config(self, incremental_from_hints_override: Optional[bool] = None) -> Self: + def _inject_config( + self, + incremental_from_hints_override: Optional[bool] = None, + ) -> Self: """Wraps the pipe generation step in incremental and config injection wrappers and adds pipe step with Incremental transform. """ @@ -793,7 +789,9 @@ def _clone( # this makes sure that a take config values from a right section and wrapper has a separated # instance in the pipeline if cloned_r._eject_config(): - cloned_r._inject_config(incremental_from_hints_override=incremental_from_hints) + cloned_r._inject_config( + incremental_from_hints_override=incremental_from_hints, + ) return cloned_r def _update_wrapper(self) -> None: diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index fbc5b522f7..738f48c5fc 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -1240,6 +1240,84 @@ def set_default_updated_at_arrow(records): assert s["last_value"] == 4 +def test_insert_at_ordering_preserved_after_clone() -> None: + """Step ordering via insert_at must survive the eject/inject cycle triggered by resource clone""" + + @dlt.source + def my_source(): + @dlt.resource + def events(): + yield [ + {"id": 1, "updated_at": 1}, + {"id": 2, "updated_at": 2}, + {"id": 3, "updated_at": 3}, + ] + + return events + + def count_rows(items, meta, metrics): + if isinstance(items, list): + metrics["row_count"] = metrics.get("row_count", 0) + len(items) + + source = my_source() + r = source.events + r.apply_hints(incremental=dlt.sources.incremental("updated_at", initial_value=2)) + r.add_metrics(count_rows, insert_at=len(r._pipe)) + + # verify step layout is preserved across clone + layout_before = [type(s).__name__ for s in r._pipe._steps] + incr_idx_before = r._pipe.find(Incremental, IncrementalResourceWrapper) + + cloned = r._clone() + + layout_after = [type(s).__name__ for s in cloned._pipe._steps] + incr_idx_after = cloned._pipe.find(Incremental, IncrementalResourceWrapper) + assert layout_after == layout_before + assert incr_idx_after == incr_idx_before + + p = dlt.pipeline(pipeline_name="p" + uniq_id(), destination="duckdb", dev_mode=True) + p.run(r) + + extract_info = p.last_trace.last_extract_info + resource_metrics = extract_info.metrics[extract_info.loads_ids[0]][0]["resource_metrics"][ + "events" + ] + assert resource_metrics.custom_metrics["row_count"] == 2 + + +def test_insert_at_ordering_preserved_after_clone_arg_based() -> None: + """Step ordering via insert_at must survive clone with argument-based incremental""" + + @dlt.source + def my_source(): + @dlt.resource + def events(updated_at=dlt.sources.incremental("updated_at", initial_value=2)): + yield [ + {"id": 1, "updated_at": 1}, + {"id": 2, "updated_at": 2}, + {"id": 3, "updated_at": 3}, + ] + + return events + + source = my_source() + r = source.events + + # add a filter after the incremental step + r.add_filter(lambda item: item["id"] != 999, insert_at=len(r._pipe)) + + # verify step layout is preserved across clone + layout_before = [type(s).__name__ for s in r._pipe._steps] + incr_idx_before = r._pipe.find(Incremental, IncrementalResourceWrapper) + + cloned = r._clone() + + layout_after = [type(s).__name__ for s in cloned._pipe._steps] + incr_idx_after = cloned._pipe.find(Incremental, IncrementalResourceWrapper) + assert layout_after == layout_before + assert incr_idx_after == incr_idx_before + + def test_json_path_cursor() -> None: @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")):