(fix) preserve insert_at step ordering across resource clone#3839
(fix) preserve insert_at step ordering across resource clone#3839AyushPatel101 wants to merge 2 commits intodlt-hub:develfrom
Conversation
The eject/inject cycle during pipeline.run() re-added the Incremental step via append_step(), which sorts by placement_affinity and destroys any explicit insert_at positioning. Save the incremental step index on eject and restore it on inject so user-defined step order is preserved.
anuunchin
left a comment
There was a problem hiding this comment.
Thanks for the PR and the repro code in the ticket!
Left a few comments with suggestions :)
| def _remove_incremental_step(self) -> int: | ||
| """Removes incremental step from pipe, returns its previous index (-1 if not found)""" | ||
| return self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper) |
There was a problem hiding this comment.
How about
idx = self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)
return idx if idx >= 0 else Nonehere?
There was a problem hiding this comment.
Should be fixed now
| pass | ||
|
|
||
| def _eject_config(self) -> bool: | ||
| def _eject_config(self) -> Optional[int]: |
There was a problem hiding this comment.
I'd push back on the signature change here, we could delegate the entire incremental remove/add logic to the _set_incremental method instead. So here we'd have:
if not self._pipe.is_empty and not self._args_bound:
orig_gen = getattr(self._pipe.gen, "__GEN__", None)
if orig_gen:
self._pipe.replace_gen(orig_gen)
return True
return False (with proper dosctring adjustments)
There was a problem hiding this comment.
Should be fixed
| if incremental is not None: | ||
| # if isinstance(new_incremental, Mapping): | ||
| # new_incremental = Incremental.ensure_instance(new_incremental) | ||
|
|
||
| if isinstance(new_incremental, IncrementalResourceWrapper): | ||
| # Completely replace the wrapper | ||
| self._remove_incremental_step() | ||
| self.add_step(new_incremental) | ||
| # completely replace the wrapper, preserving position if not given | ||
| prev_idx = self._remove_incremental_step() | ||
| if insert_at is None and prev_idx >= 0: | ||
| insert_at = prev_idx | ||
| self.add_step(new_incremental, insert_at=insert_at) | ||
| elif isinstance(incremental, IncrementalResourceWrapper): | ||
| incremental.set_incremental(new_incremental, from_hints=True) | ||
| else: | ||
| self._remove_incremental_step() | ||
| prev_idx = self._remove_incremental_step() | ||
| if insert_at is None and prev_idx >= 0: | ||
| insert_at = prev_idx | ||
| # re-add the step | ||
| incremental = None | ||
| if incremental is None: | ||
| # if there's no wrapper add incremental as a transform | ||
| if new_incremental: | ||
| if not isinstance(new_incremental, IncrementalResourceWrapper): | ||
| new_incremental = Incremental.ensure_instance(new_incremental) | ||
| self.add_step(new_incremental) | ||
| self.add_step(new_incremental, insert_at=insert_at) |
There was a problem hiding this comment.
We should preferably keep the removal/addition of the incremental step with a position constraint in one place imo:
(next comment is related)
existing_idx: Optional[int] = None
if incremental is not None:
if isinstance(new_incremental, IncrementalResourceWrapper):
# completely replace the wrapper at the same position
existing_idx = self._remove_incremental_step()
self.add_step(new_incremental, insert_at=existing_idx)
elif isinstance(incremental, IncrementalResourceWrapper):
incremental.set_incremental(new_incremental, from_hints=True)
else:
existing_idx = self._remove_incremental_step()
# re-add the step
incremental = None
if incremental is None:
# if there's no wrapper add incremental as a transform
if new_incremental:
if not isinstance(new_incremental, IncrementalResourceWrapper):
new_incremental = Incremental.ensure_instance(new_incremental)
self.add_step(new_incremental, insert_at=existing_idx)There was a problem hiding this comment.
This will need tests for both arg based incremental
def events(updated_at=dlt.sources.incremental("updated_at", initial_value=2))and hint based incremental via apply_hints
There was a problem hiding this comment.
Should be fixed
| for step in p.last_trace.steps: | ||
| if step.step == "extract": | ||
| for load_id, metrics_list in step.step_info.metrics.items(): | ||
| for m in metrics_list: | ||
| for resource_name, resource_m in m["resource_metrics"].items(): | ||
| if resource_m.custom_metrics: | ||
| # metrics step ran after incremental, so only 2 rows pass | ||
| assert resource_m.custom_metrics["row_count"] == 2, ( | ||
| f"Expected 2 rows after incremental filter, " | ||
| f"got {resource_m.custom_metrics['row_count']}" | ||
| ) | ||
|
|
There was a problem hiding this comment.
Let's do
extract_info = p.last_trace.last_extract_info
resource_metrics = extract_info.metrics[extract_info.loads_ids[0]][0]["resource_metrics"]["events"]
assert resource_metrics.custom_metrics["row_count"] == 2(this is an existing pattern)
There was a problem hiding this comment.
Should be fixed
| r = source.events | ||
|
|
||
| # verify step ordering before clone: incremental should be the last step | ||
| from dlt.extract.incremental import IncrementalResourceWrapper |
There was a problem hiding this comment.
This is already imported in this file
There was a problem hiding this comment.
Should be fixed
| ) | ||
|
|
||
|
|
||
| def test_insert_at_ordering_preserved_signature_incremental() -> None: |
There was a problem hiding this comment.
This test is unnecessary imo
There was a problem hiding this comment.
Should be fixed
There was a problem hiding this comment.
Would be great if we compare before and after states around r._clone(), something like:
layout_before = [type(s).__name__ for s in r._pipe._steps]
incr_idx_before = r._pipe.find(Incremental, IncrementalResourceWrapper)
cloned = r._clone()
layout_after = [type(s).__name__ for s in cloned._pipe._steps]
incr_idx_after = cloned._pipe.find(Incremental, IncrementalResourceWrapper)
assert incr_idx_after == incr_idx_before
assert layout_after == layout_before…_incremental - Revert _eject_config to return bool; no longer removes incremental step - _remove_incremental_step returns Optional[int] (None instead of -1) - Consolidate removal/addition with existing_idx in set_incremental - Remove insert_at param from set_incremental and incremental_insert_at from _inject_config - Simplify test metrics assertion using last_extract_info pattern - Replace signature test with arg-based clone layout test - Remove duplicate IncrementalResourceWrapper import - Add clone layout assertions for both hint-based and arg-based incremental
AyushPatel101
left a comment
There was a problem hiding this comment.
Thanks for the feedback! All of this should be fixed now, but let me know if I missed anything
| pass | ||
|
|
||
| def _eject_config(self) -> bool: | ||
| def _eject_config(self) -> Optional[int]: |
There was a problem hiding this comment.
Should be fixed
| if incremental is not None: | ||
| # if isinstance(new_incremental, Mapping): | ||
| # new_incremental = Incremental.ensure_instance(new_incremental) | ||
|
|
||
| if isinstance(new_incremental, IncrementalResourceWrapper): | ||
| # Completely replace the wrapper | ||
| self._remove_incremental_step() | ||
| self.add_step(new_incremental) | ||
| # completely replace the wrapper, preserving position if not given | ||
| prev_idx = self._remove_incremental_step() | ||
| if insert_at is None and prev_idx >= 0: | ||
| insert_at = prev_idx | ||
| self.add_step(new_incremental, insert_at=insert_at) | ||
| elif isinstance(incremental, IncrementalResourceWrapper): | ||
| incremental.set_incremental(new_incremental, from_hints=True) | ||
| else: | ||
| self._remove_incremental_step() | ||
| prev_idx = self._remove_incremental_step() | ||
| if insert_at is None and prev_idx >= 0: | ||
| insert_at = prev_idx | ||
| # re-add the step | ||
| incremental = None | ||
| if incremental is None: | ||
| # if there's no wrapper add incremental as a transform | ||
| if new_incremental: | ||
| if not isinstance(new_incremental, IncrementalResourceWrapper): | ||
| new_incremental = Incremental.ensure_instance(new_incremental) | ||
| self.add_step(new_incremental) | ||
| self.add_step(new_incremental, insert_at=insert_at) |
There was a problem hiding this comment.
Should be fixed
| for step in p.last_trace.steps: | ||
| if step.step == "extract": | ||
| for load_id, metrics_list in step.step_info.metrics.items(): | ||
| for m in metrics_list: | ||
| for resource_name, resource_m in m["resource_metrics"].items(): | ||
| if resource_m.custom_metrics: | ||
| # metrics step ran after incremental, so only 2 rows pass | ||
| assert resource_m.custom_metrics["row_count"] == 2, ( | ||
| f"Expected 2 rows after incremental filter, " | ||
| f"got {resource_m.custom_metrics['row_count']}" | ||
| ) | ||
|
|
There was a problem hiding this comment.
Should be fixed
| r = source.events | ||
|
|
||
| # verify step ordering before clone: incremental should be the last step | ||
| from dlt.extract.incremental import IncrementalResourceWrapper |
There was a problem hiding this comment.
Should be fixed
| ) | ||
|
|
||
|
|
||
| def test_insert_at_ordering_preserved_signature_incremental() -> None: |
There was a problem hiding this comment.
Should be fixed
|
Hi @anuunchin, bumping this in case my fix was missed here, let me know if theres anything else I need to adjust here |
The eject/inject cycle during pipeline.run() re-added the Incremental step via append_step(), which sorts by placement_affinity and destroys any explicit insert_at positioning. Save the incremental step index on eject and restore it on inject so user-defined step order is preserved.
Description
Related Issues
Additional Context